SparkStreaming2StarRocks

Background

Use real-time computation to serve real-time statistics: aggregate site-visit events as they arrive instead of in periodic batch jobs.

Data flow

Kafka -> Spark Streaming -> Stream Load -> StarRocks
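
The heavy lifting happens in the Spark Streaming job shipped in SparkDemo: it consumes the JSON events from Kafka, extracts the per-site fields, and flushes each micro-batch into StarRocks through Stream Load. The sketch below only shows the general shape of such a job, not the exact code in SparkDemo; the batch interval, group id, and the parse/streamLoad helpers are assumptions.

// Minimal sketch only -- see SparkDemo for the real implementation.
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

object StreamingSketch {
  // Hypothetical helpers: extract the needed fields from one JSON event,
  // and flush a micro-batch to StarRocks (see the Stream Load sketch further down).
  def parse(line: String): String = line
  def streamLoad(rows: Array[String]): Unit = ()

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("spark_demo1").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(30)) // 30-second micro-batches

    val kafkaParams = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "localhost:9092",
      ConsumerConfig.GROUP_ID_CONFIG -> "spark_demo1_group",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer]
    )

    // Read the raw JSON events produced by demo1_data_gen.py
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc, PreferConsistent, Subscribe[String, String](Seq("spark_demo1_src"), kafkaParams))

    stream
      .map(record => parse(record.value()))
      .foreachRDD { rdd =>
        val rows = rdd.collect()            // demo batches are small, so collect is fine
        if (rows.nonEmpty) streamLoad(rows) // push the batch into StarRocks
      }

    ssc.start()
    ssc.awaitTermination()
  }
}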

Environment preparation

  1. Create the Kafka topic spark_demo1_src

./bin/kafka-topics.sh --create --zookeeper localhost:2182 --replication-factor 1 --partitions 4 --topic spark_demo1_src
  2. Data-generation scripts

demo1_data_gen.py

#!/bin/python
# Copyright (c) 2020 Beijing Dingshi Zongheng Technology Co., Ltd. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import sys
import random
import time

def genUid(s = 10000):
    # Random user id in [1, s]
    return random.randint(1, s)

def getSite():
    # Pick one of the demo site URLs at random
    site_scope = ['https://www.starrocks.com/', 'https://trial.starrocks.com/', 'https://docs.starrocks.com/']
    idx = random.randint(0, len(site_scope) - 1)
    return site_scope[idx]

def getTm():
    # Shift the event time up to ~1.5 hours into the past to simulate late-arriving data
    delay_jitter = random.randint(-1800, 0)
    chance = random.randint(0, 3)
    return int(time.time() + delay_jitter * chance)

"""
{uid:1, site: https://www.starrocks.com/, time: 1621410635}
"""
def gen():
    data = """{ "uid":%d, "site": "%s", "time": %d } """ % (genUid(), getSite(), getTm())
    return data

def main():
    # Emit a random number of events, bounded by the first command-line argument
    lines = random.randint(1, int(sys.argv[1]))
    for x in range(lines):
        data = gen()
        print(data)

if __name__ == '__main__':
    main()

gen_data.sh

#!/bin/bash
# Copyright (c) 2020 Beijing Dingshi Zongheng Technology Co., Ltd. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

echo "Usage: $0 interval lines topicName"

interval=$1
lines=$2
topic=$3

echo "Sending time data to ${topic:=starrocks_t1_src} every ${interval:=15} seconds..."
while true ; do
  python demo1_data_gen.py  $lines| ./bin/kafka-console-producer.sh --bootstrap-server 0.0.0.0:9092 --topic spark_demo1_src
  sleep "${interval:=15}"
done
  3. Install Zeppelin

This walkthrough uses Docker; for a non-Docker deployment, follow the Zeppelin documentation.

docker pull apache/zeppelin:0.9.0  # Pull the image; configure a proxy if the download is slow

"""挂载本地方式启动,一直没搞成功
docker run -p 8089:8080  -v /Users/dmzgxl/opt/zep:/opt/zeppelin -v /Users/dmzgxl/opt/zep/logs:/logs -v /Users/dmzgxl/opt/zep/notebooks:/notebook   -e ZEPPELIN_LOG_DIR='/logs'  -e ZEPPELIN_NOTEBOOK_DIR='/notebook' -v /etc/localtime:/etc/localtime -v /Users/dmzgxl/opt/zep/deps:/deps --rm  -d --name zeppelin apache/zeppelin:0.9.0; sleep 10; open http://localhost:8089
"""

# Start Zeppelin
docker run -d --name zeppelin -p 8089:8080 apache/zeppelin:0.9.0
# Mounting local directories would make it easier to load dependencies, but the container
# would not start with the mounts added, so the jar is copied into the container below instead.
# Zeppelin needs the MySQL JDBC driver to connect to StarRocks, so download it first
wget https://cdn.mysql.com//archives/mysql-connector-java-5.1/mysql-connector-java-5.1.46.zip
unzip mysql-connector-java-5.1.46.zip
cd mysql-connector-java-5.1.46
# Copy the jar into the container (find the container id with `docker ps`)
docker cp mysql-connector-java-5.1.46-bin.jar <container_id>:/opt/zeppelin

Build the code

Fork https://github.com/StarRocks/demo.git into your own repository, e.g. https://github.com/LittleBeeBee/demo.git

Option 1: the server can access the Internet

mkdir project
cd project
git clone https://github.com/LittleBeeBee/demo.git
cd demo/SparkDemo
# Build; this produces target/SparkDemo-1.0-SNAPSHOT-jar-with-dependencies.jar
mvn clean scala:compile compile package

Option 2: the server cannot access the Internet

Local host: your own machine. Server: the online machine that will run the job.

Build and package on the local host, then push the result to the server through a bare Git repository.

  1. Create a bare repo on the server
mkdir project/git/starrocks-demo.git -p
cd project/git/starrocks-demo.git
git init --bare
  2. Link the local clone to that remote repo, build, and push
git clone https://github.com/LittleBeeBee/demo.git
cd demo/
git checkout -b dev # create a dev branch
git remote add upstream ssh://jingdan@starrocks-sandbox04:/home/disk2/jingdan/project/git/starrocks-demo.git
cd SparkDemo
mvn clean scala:compile compile package
git add *
git commit -m "compile"
git push upstream dev

  3. Pull the code on the server
cd project
git clone -b dev ssh://jingdan@starrocks-sandbox04:/home/disk2/jingdan/project/git/starrocks-demo.git
cd starrocks-demo/SparkDemo

Generate test data

Start the data generator

cd /home/disk1/starrocks/thirdparty/kafka_2.12-2.5.0/script
sh gen_data.sh 2 10 spark_demo1_src

Start the Spark job

cd starrocks-demo/SparkDemo
java -cp target/SparkDemo-1.0-SNAPSHOT-jar-with-dependencies.jar com.starrocks.spark.SparkStreaming2starrocks
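Each micro-batch ends up in StarRocks through the Stream Load HTTP interface: an HTTP PUT against the FE, which redirects the request to a BE for ingestion. The sketch below shows the general mechanics with Apache HttpClient; the FE host and port, credentials, database and table names are placeholders, the rows are assumed to already be CSV lines matching the table columns, and the real SparkDemo code may organize this differently.

// Hedged sketch of a Stream Load call; not the exact code shipped in SparkDemo.
import java.nio.charset.StandardCharsets
import java.util.{Base64, UUID}
import org.apache.http.client.methods.HttpPut
import org.apache.http.entity.StringEntity
import org.apache.http.impl.client.{DefaultRedirectStrategy, HttpClients}
import org.apache.http.util.EntityUtils

object StreamLoadSketch {
  def streamLoad(rows: Seq[String], feHost: String = "127.0.0.1", db: String = "demo",
                 table: String = "demo1_spark_tb0", user: String = "root", password: String = ""): Unit = {
    // Stream Load answers the PUT with a 307 redirect from FE to BE,
    // so the client must be willing to follow redirects for PUT requests.
    val client = HttpClients.custom()
      .setRedirectStrategy(new DefaultRedirectStrategy {
        override def isRedirectable(method: String): Boolean = true
      })
      .build()

    val put = new HttpPut(s"http://$feHost:8030/api/$db/$table/_stream_load")
    val token = Base64.getEncoder.encodeToString(s"$user:$password".getBytes(StandardCharsets.UTF_8))
    put.setHeader("Authorization", s"Basic $token")
    put.setHeader("Expect", "100-continue")
    put.setHeader("label", s"spark_demo1_${UUID.randomUUID()}") // unique label per batch
    put.setHeader("column_separator", ",")                      // rows are CSV lines here
    put.setEntity(new StringEntity(rows.mkString("\n"), StandardCharsets.UTF_8))

    val response = client.execute(put)
    try {
      // The response body is a JSON document whose Status field should be "Success"
      println(EntityUtils.toString(response.getEntity))
    } finally {
      response.close()
      client.close()
    }
  }
}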

Check that the data has been written into StarRocks

mysql> select * from demo1_spark_tb0 limit 10;
+-----------------------------+------------+------+--------+------+
| site                        | date       | hour | minute | uv   |
+-----------------------------+------------+------+--------+------+
| https://docs.starrocks.com/ | 2021-07-22 |   19 |     56 | NULL |
| https://docs.starrocks.com/ | 2021-07-22 |   19 |     58 | NULL |
| https://docs.starrocks.com/ | 2021-07-22 |   20 |      1 | NULL |
| https://docs.starrocks.com/ | 2021-07-22 |   20 |      4 | NULL |
| https://docs.starrocks.com/ | 2021-07-22 |   20 |      5 | NULL |
| https://docs.starrocks.com/ | 2021-07-22 |   20 |      7 | NULL |
| https://docs.starrocks.com/ | 2021-07-22 |   20 |      8 | NULL |
| https://docs.starrocks.com/ | 2021-07-22 |   20 |      9 | NULL |
| https://docs.starrocks.com/ | 2021-07-22 |   20 |     11 | NULL |
| https://docs.starrocks.com/ | 2021-07-22 |   20 |     12 | NULL |
+-----------------------------+------------+------+--------+------+
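
The uv column is presumably a BITMAP aggregate column (the demo counts distinct uids), which is why its raw value renders as NULL; aggregate it with bitmap_union_count() to read the actual UV. Below is a small sketch of reading the aggregated result over the MySQL protocol, assuming the database is named demo and the FE query port is the default 9030; it needs the same mysql-connector-java jar used for Zeppelin on the classpath.

// Hedged sketch: query per-minute UV over JDBC (host, port, credentials, db name are placeholders).
import java.sql.DriverManager

object QueryUvSketch {
  def main(args: Array[String]): Unit = {
    val conn = DriverManager.getConnection("jdbc:mysql://127.0.0.1:9030/demo", "root", "")
    try {
      val rs = conn.createStatement().executeQuery(
        """SELECT site, `date`, `hour`, `minute`, bitmap_union_count(uv) AS uv
          |FROM demo1_spark_tb0
          |GROUP BY site, `date`, `hour`, `minute`
          |ORDER BY `date`, `hour`, `minute`
          |LIMIT 10""".stripMargin)
      while (rs.next()) {
        // Print one aggregated row per site/minute
        println(s"${rs.getString("site")} ${rs.getString("date")} " +
          s"${rs.getInt("hour")}:${rs.getInt("minute")} uv=${rs.getLong("uv")}")
      }
    } finally {
      conn.close()
    }
  }
}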

Visualization demo

Open http://localhost:8089 in a browser

Configure the MySQL JDBC interpreter (point the connection URL at the StarRocks FE MySQL port, e.g. jdbc:mysql://<fe_host>:9030, using the mysql-connector jar copied into the container earlier)

Create a new notebook

Final result