Flink-connector 是能支持主键模型的部分列更新功能

【StarRocks版本】例如:2.3
Flink-connector 是否支持主键模型的部分列更新功能

目前有没有加入这个功能的计划

您好,当前是支持的。

sr版本:2.3.0-RC01 e14245a,connnector版本:1.2.3_flink-1.15,sink属性增加partial_update=true,提示不支持该参数。

您好,方便的话发一下sink端的配置看一下

create table test_sink_starrocks(
id string,
database_name string,
table_name string,
primary key(id) not enforced
)WITH(
‘connector’ = ‘starrocks’,
‘jdbc-url’=‘jdbc:mysql://xx’,
‘load-url’=‘xx’,
‘database-name’ = ‘test_db’,
‘table-name’ = ‘xx’,
‘username’ = ‘xxx’,
‘password’ = ‘xxxx’,
‘sink.buffer-flush.max-rows’ = ‘64000’,
‘sink.buffer-flush.interval-ms’ = ‘1000’,
‘partial_update’=‘true’
)

sink.properties.partial_update=true这样试一下看看,另外如果是csv格式数据需要指定列名,json格式不需要指定

建表 : image

不指定列明报错,指定列明可以支持部分列更新

2022-08-05 15:37:27       WARN (com.starrocks.connector.flink.manager.StarRocksSinkManager:asyncFlush) - Failed to flush batch data to StarRocks, retry times = 0
com.starrocks.connector.flink.manager.StarRocksStreamLoadFailedException: Failed to flush data to StarRocks, Error response: 
{"Status":"Fail","BeginTxnTimeMs":0,"Message":"key column id not in partial update columns","NumberUnselectedRows":0,"CommitAndPublishTimeMs":0,"Label":"9f8d2991-1524-4681-b56c-b74987276718","LoadBytes":0,"StreamLoadPutTimeMs":0,"NumberTotalRows":0,"WriteDataTimeMs":0,"TxnId":925692,"LoadTimeMs":0,"ReadDataTimeMs":0,"NumberLoadedRows":0,"NumberFilteredRows":0}
{}

	at com.starrocks.connector.flink.manager.StarRocksStreamLoadVisitor.doStreamLoad(StarRocksStreamLoadVisitor.java:104)
	at com.starrocks.connector.flink.manager.StarRocksSinkManager.asyncFlush(StarRocksSinkManager.java:324)
	at com.starrocks.connector.flink.manager.StarRocksSinkManager.lambda$startAsyncFlushing$0(StarRocksSinkManager.java:159)
	at java.lang.Thread.run(Thread.java:748)

您好,能帮看下这个问题吗

您好,您是把红框中的那段配置注释下去之后报了下面的错对吧?

image 部分列更新也是有一些限制的,这部分关注一下,应该是这个问题

这样是可以更新部分列的,如果上边的指定列名去掉,就会报错,数据里边是有 主键列 id的

如果是两条数据,一条数据两列,一条数据三列,这样是不可以的吗

您好,这个是不行的必须保持列一致,上面我发的那个图片中有说明的。另外刚才确认了一下,json格式也需要指定columns。

您好,想请问一下,针对json格式后边会支持不指定columns吗?我们暂时是有这样的需求的。

您好,我理解的话如果没有指定columns的话没有办法做到部分列更新吧…不知道您是基于什么场景下需要这个需求呢?还是从易用性来讲不指定columns对用户操作更友好呢?

我们现在需要拿到用户埋点更新用户数据,针对某一属性(例如 :充值金额 6元),对于这个用户其他属性是不需要做改变的,我们只需要在累计金额的基础上累加 6 就可以了,用户表需要改变的属性可能是不相同的。

好的,我大概理解您的意思了,您的意思是针对一张表,可能不是批次针对某一列进行更新?比如说针对一张表,第一行我想更新第二列数据,第二行数据我想更新第三列数据?

这种情况,flink-connector支持吗,或者说我应该在哪里改比较好?