Flink write to StarRocks fails

【Details】Flink write to StarRocks fails
【Background】I'm writing to StarRocks with flink-connector-starrocks, but no data shows up in StarRocks. Flink version is 1.11.
【Business Impact】
【StarRocks Version】2.1.3
【Cluster Size】e.g. 1 FE (1 follower) + 5 BE (FE and BE co-located)
【Machine Info】8C/16G/10GbE
【Attachments】
{"Status":"Fail","BeginTxnTimeMs":0,"Message":"Cancelled FileScanNode::get_next","NumberUnselectedRows":0,"CommitAndPublishTimeMs":0,"Label":"dc2dfdfb-71fc-440f-b0c9-6fe8fce3b808","LoadBytes":94415231,"StreamLoadPutTimeMs":4,"NumberTotalRows":0,"WriteDataTimeMs":74429,"TxnId":1832,"LoadTimeMs":74434,"ReadDataTimeMs":84,"NumberLoadedRows":0,"NumberFilteredRows":0}
{}

at com.starrocks.connector.flink.manager.StarRocksStreamLoadVisitor.doStreamLoad(StarRocksStreamLoadVisitor.java:104)
at com.starrocks.connector.flink.manager.StarRocksSinkManager.asyncFlush(StarRocksSinkManager.java:324)
at com.starrocks.connector.flink.manager.StarRocksSinkManager.lambda$startAsyncFlushing$0(StarRocksSinkManager.java:161)
at java.lang.Thread.run(Thread.java:748)

Can anyone help me take a look? It's urgent.

It looks like a node's service is unhealthy and the flush failed.

But when I run SHOW PROC '/backends' \G, every node's isAlive attribute is true, so they all look healthy.

Try a newer Flink version.

No luck. I just upgraded to 1.13 and I still get the same error.

What method are you using? What are the steps?

I'm using the Flink-to-StarRocks write method from the official docs:

StarRocksSinkOptions.builder()
            .withProperty("jdbc-url", "jdbc:mysql://fe1_ip:query_port,fe2_ip:query_port,fe3_ip:query_port?xxxxx")
            .withProperty("load-url", "fe1_ip:http_port;fe2_ip:http_port;fe3_ip:http_port")
            .withProperty("username", "xxx")
            .withProperty("password", "xxx")
            .withProperty("table-name", "xxx")
            .withProperty("database-name", "xxx")
            .withProperty("sink.properties.format", "json")
            .withProperty("sink.properties.strip_outer_array", "true")
            .build()
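
For context, a minimal job around those options looks roughly like this. This is a sketch, not my exact job: the inline JSON source, the demo field names, and the host/credential values are placeholders, and the StarRocksSinkOptions/StarRocksSink package paths can differ across connector versions.

import com.starrocks.connector.flink.StarRocksSink;
import com.starrocks.connector.flink.table.StarRocksSinkOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class StarRocksSinkDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Built exactly as above; connection values are placeholders.
        StarRocksSinkOptions sinkOptions = StarRocksSinkOptions.builder()
                .withProperty("jdbc-url", "jdbc:mysql://fe1_ip:query_port")
                .withProperty("load-url", "fe1_ip:http_port")
                .withProperty("username", "xxx")
                .withProperty("password", "xxx")
                .withProperty("table-name", "xxx")
                .withProperty("database-name", "xxx")
                .withProperty("sink.properties.format", "json")
                .withProperty("sink.properties.strip_outer_array", "true")
                .build();

        // Each element is one JSON object; with format=json and strip_outer_array=true
        // the connector batches them into a JSON array per Stream Load request.
        env.fromElements(
                "{\"id\": 1, \"col1\": \"a\"}",
                "{\"id\": 2, \"col1\": \"b\"}")
           .addSink(StarRocksSink.sink(sinkOptions));

        env.execute("starrocks-sink-demo");
    }
}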

The table has a lot of columns, 400 in total, and its properties include:
PROPERTIES (
"replication_num" = "1",
"dynamic_partition.enable" = "true",
"dynamic_partition.time_unit" = "DAY",
"dynamic_partition.end" = "3",
"dynamic_partition.prefix" = "p",
"dynamic_partition.buckets" = "32"
)

I noticed something: if I trim the table down to 100 columns and also lower the Flink parallelism, inserts work fine. For StarRocks tables with many columns, is there some parameter that needs tuning?

With that many fields, the columns header of the Stream Load request can exceed the HTTP header size limit. If you load in JSON format you can skip setting the columns header entirely. A load interval of 10s or more is also recommended.
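
A rough sketch of the arithmetic behind that limit, assuming column names average about 20 characters (a hypothetical figure; the real cap depends on the HTTP stack in front of the BE, but 8 KB is a common default):

public class ColumnsHeaderSize {
    public static void main(String[] args) {
        int columnCount = 400;                 // columns in the wide table
        int avgNameBytes = 20;                 // assumed average column-name length
        int headerBytes = columnCount * (avgNameBytes + 1); // +1 per comma separator
        // Prints ~8400 bytes: already past a typical 8 KB header cap
        // before any other headers are counted.
        System.out.println("columns header ~" + headerBytes + " bytes");
    }
}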

I've already set:
'sink.buffer-flush.max-rows' = '1000000',
'sink.buffer-flush.max-bytes' = '300000000',
'sink.buffer-flush.interval-ms' = '15000',
'sink.max-retries' = '3',
'sink.parallelism' = '3',
'sink.buffer-flush.enqueue-timeout-ms' = '3600000',
'sink.properties.format' = 'json',
'sink.properties.strip_outer_array' = 'true',
'sink.properties.ignore_json_size' = 'true'

But it still fails with an error.

Error message:
Caused by: com.starrocks.connector.flink.manager.StarRocksStreamLoadFailedException: Failed to flush data to StarRocks, Error response:

{"Status":"Fail","BeginTxnTimeMs":1,"Message":"Cancelled FileScanNode::get_next","NumberUnselectedRows":0,"CommitAndPublishTimeMs":0,"Label":"62283ddb-e557-4ebd-a237-367665c4d52a","LoadBytes":168998670,"StreamLoadPutTimeMs":1,"NumberTotalRows":0,"WriteDataTimeMs":182757,"TxnId":5015,"LoadTimeMs":182760,"ReadDataTimeMs":727,"NumberLoadedRows":0,"NumberFilteredRows":0}

{}