Canal2Kafka2StarRocks

Background

While working with customers, we have found that some companies already have fairly mature data pipelines and do not allow applications to consume the MySQL binlog directly; all data consumption goes through Kafka. This document shows how to consume canal-format data from Kafka and write it into StarRocks, implementing CDC.

Data Flow

MySQL Binlog -> Canal -> Kafka -> Flink SQL -> StarRocks

Environment Setup

  1. Canal

Canal configuration is not covered in detail here; for installing and configuring Canal and its dependencies, see the guide 使用canal从mysql同步binlog导入StarRocks. The sketch below shows only the Kafka-related settings.
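
For orientation, here is a minimal sketch of those settings, assuming Canal 1.1.x in Kafka serverMode (key names differ slightly between Canal versions; the topic name canal_test matches the examples later in this document):

# conf/canal.properties -- ship binlog events to Kafka instead of the TCP server
canal.serverMode = kafka
# broker list; Canal 1.1.5+ renames this key to kafka.bootstrap.servers
canal.mq.servers = $kafka_host:9092

# conf/example/instance.properties -- route this instance's events to a topic
canal.mq.topic = canal_test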

  2. Flink

Here we use Flink SQL to read and write the data, so you need a running Flink service. Below is a standalone Flink setup. (Skip this part if your company already runs a Flink cluster.)

Start the Flink service:

cd flink-xxx
./bin/start-cluster.sh
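
Note that the SQL below relies on the Kafka connector and the StarRocks connector, so the corresponding JARs must be in Flink's lib/ directory before the cluster starts. A sketch, assuming Flink 1.13 with Scala 2.11 (the file names below are placeholders; substitute the releases matching your Flink build):

# placeholder versions -- use the artifacts matching your Flink/Scala build
cp flink-sql-connector-kafka_2.11-1.13.x.jar flink-xxx/lib/
cp flink-connector-starrocks-x.x.x_flink-1.13_2.11.jar flink-xxx/lib/
# restart so the new JARs are picked up
./bin/stop-cluster.sh && ./bin/start-cluster.sh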

DDL

Sample data in Kafka

{
    "data":[
        {
            "id":"2f2192e9-f8b5-4332-a96f-192b05c9e6bc",
            "agent_id":"16",
            "http_port":"8031",
            "rpc_port":"9020",
            "query_port":"8306",
            "edit_log_port":"9010",
            "meta_dir":"",
            "absolute_meta_dir":"/home/disk1/sr/data/sr/meta",
            "log_dir":"",
            "absolute_log_dir":"/home/disk1/sr/starrocks-manager-20211008/fe-2f2192e9-f8b5-4332-a96f-192b05c9e6bc/log",
            "role":"FOLLOWER",
            "install_path":"/home/disk1/sr/starrocks-manager-20211008",
            "absolute_migrate_path":"/home/disk1/sr/app/StarRocks/SE/StarRocks-1.18.3/fe",
            "deleted":"0",
            "deleted_at":"0",
            "created_at":"1633759183484",
            "updated_at":"1634240355691"
        }
    ],
    "database":"test",
    "es":1634240355000,
    "id":1076,
    "isDdl":false,
    "mysqlType":{
        "id":"varchar(48)",
        "agent_id":"int(11)",
        "http_port":"int(11)",
        "rpc_port":"int(11)",
        "query_port":"int(11)",
        "edit_log_port":"int(11)",
        "meta_dir":"text",
        "absolute_meta_dir":"text",
        "log_dir":"text",
        "absolute_log_dir":"text",
        "role":"varchar(32)",
        "install_path":"text",
        "absolute_migrate_path":"text",
        "deleted":"tinyint(1)",
        "deleted_at":"bigint(20)",
        "created_at":"bigint(20)",
        "updated_at":"bigint(20)"
    },
    "old":[
        {
            "updated_at":"1634240295633"
        }
    ],
    "pkNames":[
        "id"
    ],
    "sql":"",
    "sqlType":{
        "id":12,
        "agent_id":4,
        "http_port":4,
        "rpc_port":4,
        "query_port":4,
        "edit_log_port":4,
        "meta_dir":2005,
        "absolute_meta_dir":2005,
        "log_dir":2005,
        "absolute_log_dir":2005,
        "role":12,
        "install_path":2005,
        "absolute_migrate_path":2005,
        "deleted":-7,
        "deleted_at":-5,
        "created_at":-5,
        "updated_at":-5
    },
    "table":"fe_instances",
    "ts":1634240355886,
    "type":"UPDATE"
}
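
Before writing any Flink SQL, you can confirm that messages like this are actually arriving on the topic with Kafka's console consumer:

# run from the Kafka installation directory
bin/kafka-console-consumer.sh \
  --bootstrap-server $kafka_host:9092 \
  --topic canal_test \
  --from-beginning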

StarRocks

CREATE DATABASE IF NOT EXISTS canaltest;
CREATE TABLE IF NOT EXISTS `canaltest`.`canal_test_sink` (
  `id` STRING NOT NULL,
  `agent_id` int(11) NULL,
  `http_port` int(11) NULL,
  `rpc_port` int(11) NULL,
  `query_port` int(11),
  `edit_log_port` int(11),
  `meta_dir` STRING,
  `absolute_meta_dir` STRING,
  `log_dir` STRING,
  `absolute_log_dir` STRING,
  `role` varchar(32),
  `install_path` STRING,
  `absolute_migrate_path` STRING,
  `deleted` tinyint(1),
  `deleted_at` bigint(20),
  `created_at` bigint(20),
  `updated_at` bigint(20)
) ENGINE=OLAP 
PRIMARY KEY(`id`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`id`) BUCKETS 3 
PROPERTIES (
"replication_num" = "3",
"in_memory" = "false",
"storage_format" = "DEFAULT"
);
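
The DDL above is executed against a FE node over the MySQL protocol, e.g.:

# 9030 is the FE's MySQL-protocol query port
mysql -h $fe_host -P 9030 -u root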

Flink SQL

Save the following statements into a file named flink-create.sql:

CREATE DATABASE IF NOT EXISTS `testdb`;

CREATE TABLE IF NOT EXISTS `testdb`.`canal_test_source` (
  `id` STRING NOT NULL,
  `agent_id` int NULL,
  `http_port` int NULL,
  `rpc_port` int NULL,
  `query_port` int,
  `edit_log_port` int,
  `meta_dir` STRING,
  `absolute_meta_dir` STRING,
  `log_dir` STRING,
  `absolute_log_dir` STRING,
  `role` STRING,
  `install_path` STRING,
  `absolute_migrate_path` STRING,
  `deleted` tinyint,
  `deleted_at` bigint,
  `created_at` bigint,
  `updated_at` bigint
) with (
 'connector' = 'kafka',
 'topic' = 'canal_test',  -- Kafka topic name
 'properties.bootstrap.servers' = '$kafka_host:9092',  -- Kafka broker address
 'properties.group.id' = 'canal_group',  -- Kafka consumer group
 'format' = 'canal-json'  -- use the canal-json format
);

CREATE TABLE IF NOT EXISTS `testdb`.`canal_test_sink` (
  `id` STRING NOT NULL,
  `agent_id` int NULL,
  `http_port` int NULL,
  `rpc_port` int NULL,
  `query_port` int NULL,
  `edit_log_port` int,
  `meta_dir` STRING,
  `absolute_meta_dir` STRING,
  `log_dir` STRING,
  `absolute_log_dir` STRING,
  `role` STRING,
  `install_path` STRING,
  `absolute_migrate_path` STRING,
  `deleted` tinyint,
  `deleted_at` bigint,
  `created_at` bigint,
  `updated_at` bigint,
   PRIMARY KEY (`id`) NOT ENFORCED
) with (
  'load-url' = '$fe_host:8030',
  'sink.properties.row_delimiter' = '\x02',
  'username' = 'root',
  'database-name' = 'canaltest',
  'sink.properties.column_separator' = '\x01',
  'jdbc-url' = 'jdbc:mysql://$fe_host:9030',
  'password' = '',
  'sink.buffer-flush.interval-ms' = '15000',
  'connector' = 'starrocks',
  'table-name' = 'canal_test_sink'  -- table name in StarRocks
);

INSERT INTO `testdb`.`canal_test_sink` SELECT * FROM `testdb`.`canal_test_source`;

Launch the job and test it

cd flink-xxx
./bin/sql-client.sh -f flink-create.sql
# Check job status
./bin/flink list
# Output like the following means the job started successfully
Waiting for response...
------------------ Running/Restarting Jobs -------------------
18.03.2022 09:48:34 : 4a2c5035ca292fef9691524c731122c2 : insert-into_default_catalog.test.canal_test_sink (RUNNING)
--------------------------------------------------------------
No scheduled jobs.

Verify that the data has been loaded into StarRocks:

select * from canaltest.canal_test_sink;

Troubleshooting common issues

1. The Flink job reports no errors (but data is not flowing)

Step 1: Confirm that the binlog is enabled; you can check with SHOW VARIABLES LIKE 'log_bin';
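
Run the check on the source MySQL; Canal also requires the binlog to be in ROW format:

SHOW VARIABLES LIKE 'log_bin';        -- expect: ON
SHOW VARIABLES LIKE 'binlog_format';  -- expect: ROW (required by Canal)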

Step 2: Confirm that the versions of Flink, flink-cdc, the flink-starrocks-connector, and MySQL (5.7 and 8.0.x are supported) meet the requirements; the major versions of Flink, flink-cdc, and the flink-starrocks-connector must match, e.g. all 1.13.

Step 3: Narrow down whether the problem is reading the source table or writing to StarRocks. The SQL below, taken from the flink-create.sql file we wrote above, walks through this.

From the Flink installation directory, run the following to enter the Flink SQL client:

bin/sql-client.sh

First, verify that reading from the source table works:

-- Paste in each statement below in turn to determine whether the problem lies in querying the source table or in writing to StarRocks
CREATE DATABASE IF NOT EXISTS `testdb`;

CREATE TABLE IF NOT EXISTS `testdb`.`canal_test_source` (
  `id` STRING NOT NULL,
  `agent_id` int NULL,
  `http_port` int NULL,
  `rpc_port` int NULL,
  `query_port` int,
  `edit_log_port` int,
  `meta_dir` STRING,
  `absolute_meta_dir` STRING,
  `log_dir` STRING,
  `absolute_log_dir` STRING,
  `role` STRING,
  `install_path` STRING,
  `absolute_migrate_path` STRING,
  `deleted` tinyint,
  `deleted_at` bigint,
  `created_at` bigint,
  `updated_at` bigint
) with (
 'connector' = 'kafka',
 'topic' = 'canal_test',  -- Kafka topic name
 'properties.bootstrap.servers' = '$kafka_host:9092',
 'properties.group.id' = 'canal_group',
 'format' = 'canal-json'  -- use the canal-json format
);
-- Verify that the source is readable
select * from `testdb`.`canal_test_source`;

Then verify that writing to StarRocks works:

CREATE TABLE IF NOT EXISTS `testdb`.`canal_test_sink` (
  `id` STRING NOT NULL,
  `agent_id` int NULL,
  `http_port` int NULL,
  `rpc_port` int NULL,
  `query_port` int NULL,
  `edit_log_port` int,
  `meta_dir` STRING,
  `absolute_meta_dir` STRING,
  `log_dir` STRING,
  `absolute_log_dir` STRING,
  `role` STRING,
  `install_path` STRING,
  `absolute_migrate_path` STRING,
  `deleted` tinyint,
  `deleted_at` bigint,
  `created_at` bigint,
  `updated_at` bigint,
   PRIMARY KEY (`id`) NOT ENFORCED
) with (
  'load-url' = '$fe_host:8030',
  'sink.properties.row_delimiter' = '\x02',
  'username' = 'root',
  'database-name' = 'canaltest',
  'sink.properties.column_separator' = '\x01',
  'jdbc-url' = 'jdbc:mysql://$fe_host:9030',
  'password' = '',
  'sink.buffer-flush.interval-ms' = '15000',
  'connector' = 'starrocks',
  'table-name' = 'canal_test_sink'
);

INSERT INTO `testdb`.`canal_test_sink` SELECT * FROM `testdb`.`canal_test_source`;

2. The Flink job fails with an error

Step 1: Confirm that the Flink cluster is actually running; a locally downloaded Flink may not have been started yet, in which case run ./bin/start-cluster.sh.

Step 2: Dig into the specific error message and analyze from there.

We have 600+ tables. Once the number of Flink jobs grows, loads into StarRocks fail with "close index channel failed" and "intolerable failure in opening node channels" errors.
With that many Flink jobs the load frequency is hard to control, especially after Kafka has backlogged for a few days: the data volume is huge, and when Flink restarts, every job resumes at once and StarRocks errors out repeatedly.
Is there a more reliable, convenient approach?

1. Throttle each sink to one batch every 5-10 s (see the sketch below).
2. Check whether each bucket holds roughly 1 GB of data; if there are too many buckets, consider lowering the bucket count.
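
For point 1, the batch interval is controlled per sink table in its WITH clause. A sketch of the relevant flink-connector-starrocks options (the values are illustrative, not tuned recommendations):

  'sink.buffer-flush.interval-ms' = '10000',   -- flush roughly every 10 s
  'sink.buffer-flush.max-bytes' = '67108864',  -- also flush once the buffer holds 64 MB
  'sink.buffer-flush.max-rows' = '500000',     -- or 500k rows, whichever comes first
  'sink.max-retries' = '3'                     -- retry a failed Stream Load batch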