mysql+canal+kafka+SparkStreaming+StarRocks导入示例

mysql+canal+kafka+SparkStreaming+StarRocks

环境:

集群已有mysql+zookeeper+kafka,未有组件请自行百度安装~~~

mysql

确认mysql是否开启binlog

show variables like '%log_bin%';

未开启binlog

vim /etc/my.cnf 添加以下内容

server_id=1918
log_bin = mysql-bin
binlog_format = ROW
expire_logs_days = 7
max_binlog_size = 100M

重启mysql再次查看相关参数

CREATE USER canal IDENTIFIED BY '123456';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
ALTER USER 'canal'@'%' IDENTIFIED WITH mysql_native_password BY '123456';
CREATE DATABASE canal;

安装canal

下载安装

安装目录:/home/disk1/sr/app/canal-1.1.4

wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz

解压后目录如下:

- bin   # 运维脚本
- conf  # 配置文件  
    canal_local.properties  # canal本地配置,一般不需要动  
    canal.properties        # canal服务配置  
    logback.xml             # logback日志配置  
    metrics                 # 度量统计配置  
    spring                  # spring-实例配置,主要和binlog位置计算、一些策略配置相关,可以在canal.properties选用其中的任意一个配置文件  
    example                 # 实例配置文件夹,一般认为单个数据库对应一个独立的实例配置文件夹    
        instance.properties   # 实例配置,一般指单个数据库的配置
- lib   # 服务依赖包
- logs  # 日志文件输出目录

配置编辑

编辑canal.properties文件 vim conf/canal.properties

去掉canal.instance.parser.parallelThreadSize = 16

这个配置项的 注释 ,也就是启用此配置项,和实例解析器的线程数相关,不配置会表现为阻塞或者不进行解析。

canal.serverMode 配置项指定为 kafka ,可选值有 tcpkafkarocketmqmaster 分支或者最新的的 v1.1.5-alpha-1 版本,可以选用 rabbitmq ),默认是 kafka

canal.mq.servers 配置需要指定为 Kafka 服务或者集群 Broker 的地址,这里配置为 127.0.0.1:9092

编辑instance.properties文件 vim conf/example/instance.properties

canal.instance.mysql.slaveId

#需要配置一个和

Master

节点的服务

ID

完全不同的值,这里配置为

654321

配置数据源实例,包括地址、用户、密码和目标数据库:

canal.instance.master.address #这里指定为 127.0.0.1:3306

canal.instance.dbUsername #这里指定为 canal

canal.instance.dbPassword #这里指定为123456。

新增 canal.instance.defaultDatabaseName #这里指定为canal(需要在 MySQL 中建立一个 test 数据库,见前面的流程)。

Kafka 相关配置,这里暂时使用静态 topic 和单个 partition

canal.mq.topic #这里指定为canal test也就是解析完的 binlog 结构化数据会发送到 Kafka 的命名为 test topic

canal.mq.partition #这里指定为 0

配置修改完成后启动canal

Sh bin/startup.sh

DDL操作

use `canal`;
CREATE TABLE `order`(    
id BIGINT UNIQUE PRIMARY KEY AUTO_INCREMENT COMMENT '主键',    
order_id    VARCHAR(64)    NOT NULL COMMENT '订单ID',    
amount      DECIMAL(10, 2) NOT NULL DEFAULT 0 COMMENT '订单金额',    
create_time DATETIME       NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',    UNIQUE uniq_order_id (`order_id`)) COMMENT '订单表';

INSERT INTO `order`(order_id, amount) VALUES ('10086', 999);UPDATE `order` SET amount = 10087 WHERE order_id = '10086';DELETE  FROM `order` WHERE order_id = '10086';

查看canal日志

Tail -30f /home/disk1/sr/app/canal-1.1.4/logs/canal/example/example.log 日志如下

(刚开始有个错误mysql链接失败的截图忘了截图,排查cs01集群的mysql,无法用密码登录,确认密码无误,卡了半天最后发现mysql库里的user表有两行user和authentication_string是空,导致任何一个用户都无法用密码登录。。。只能免密登录,确认两个空行是无用信息后删除了,这个卡了好久!!!参考资料:https://www.cnblogs.com/zhaoxd07/p/5580543.html)

解决mysql问题后example.log日志如下,已经正常读取binlog。

另一个日志报错显示无法连接kafka:

Tail -30f /home/disk1/sr/app/canal-1.1.4/logs/canal/canal.log

去查kafka配置文件

vim /home/disk1/sr/app/kafka_2.12-2.5.0/config/server.properties

修改canal配置文件相应参数 vim canal.properties

canal.mq.servers = xxx:9092

重启canal查看日志

忘了创建topic

cd /home/disk1/sr/app/kafka_2.12-2.5.0/bin
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 3 --topic canaltest

创建分区后日志已不刷新上述错误

查看kafka topic数据

sh /home/disk1/sr/app/kafka_2.12-2.5.0/bin/kafka-console-consumer.sh --bootstrap-server 172.26.194.184:9092 --from-beginning --topic canaltest

数据已成功写入kafka

安装maven

官网地址: http://maven.apache.org/download.cgi下载apache-maven-3.8.3-bin.tar.gz
## 上传到/usr/local
cd /usr/local 
## 解压
tar -zxvf apache-maven-3.8.3-bin.tar.gz
## 配置环境变量
vi /etc/profile
export MAVEN_HOME=/usr/local/apache-maven-3.8.3
export PATH=$MAVEN_HOME/bin:$PATH 
## 刷新环境变量
source /etc/profile

SparkStream2StarRocks

导入样例数据:

{“data”:[{“host”:“172.26.194.184”,“collect_time”:“1634257382332000000”}],“database”:“test”,“es”:1634257382000,“id”:1466,“isDdl”:false,“mysqlType”:{“host”:“varchar(32)”,“collect_time”:“bigint(20)”},“old”:[{“collect_time”:“1634257102008000000”}],“pkNames”:[“host”],“sql”:"",“sqlType”:{“host”:12,“collect_time”:-5},“table”:“query_collect_time”,“ts”:1634257382785,“type”:“UPDATE”}

DDL

CREATE TABLE sparkstreamig(
 `database` varchar(50) NULL COMMENT "",
 `es` varchar(50) NULL COMMENT "",
 `id` varchar(50) NULL COMMENT "",
 `table` string NULL COMMENT ""
) ENGINE=OLAP 
DUPLICATE KEY(`database`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`database`) BUCKETS 3 
PROPERTIES (
"replication_num" = "1",
"in_memory" = "false",
"storage_format" = "DEFAULT"
);

代码构建:

https://github.com/StarRocks/demo fork到自己仓库 https://github.com/qcydorisdb/demo

修改SparkStreaming2StarRocks.scala文件中的database,topic,brokers等信息,示例如下:

SparkStreaming2StarRocks.scala (5.0 KB)

mkdir project
cd project
## clone修改后的文件到服务器
git clone https://github.com/qcydorisdb/demo

cd demo/SparkDemo

编译

mvn clean scala:compile compile package

启动spark 程序

cd /home/disk1/qichaoyang/project/demo/SparkDemo
java -cp target/SparkDemo-1.0-SNAPSHOT-jar-with-dependencies.jar com.starrocks.spark.SparkStreaming2StarRocks

执行任务报错:

分析:database列和table列是关键词导入失败,

修改SparkStream2StarRocks内容,columns添加反引号。

修改前:

val columns = "database,es,id,table"

val database = jsObj.getString("database")
val es = jsObj.getString("es")
val id = jsObj.getString("id")
val table = jsObj.getString("table")

修改后

val columns = "`database`,`es`,`id`,`table`"

val `database` = jsObj.getString("database")
val `es` = jsObj.getString("es")
val `id` = jsObj.getString("id")
val `table` = jsObj.getString("table")

再次编译并执行demo

cd /home/disk1/qichaoyang/project/demo/SparkDemo
mvn clean scala:compile compile package
java -cp target/SparkDemo-1.0-SNAPSHOT-jar-with-dependencies.jar com.starrocks.spark.SparkStreaming2StarRocks

插入成功。

查询表数据:

参考资料:

https://github.com/alibaba/canal/wiki/Canal-Kafka-RocketMQ-QuickStart

https://github.com/alibaba/canal/wiki/AdminGuide

基于Canal和Kafka实现MySQL的Binlog近实时同步 - throwable - 博客园

1赞

这个是支持insert/update/delete的吗

这个是只支持insert,关于upsert语义及使用可以看下官网1.19的新增的cdc内容https://docs.starrocks.com/zh-cn/main/release_notes/release-1.19

看过了,感谢!期待