更新表同步kafka数据 数据不准确 版本 3.2.3

我是从一个源表清洗数据到另外一个维度表 我跑了3次数据 我sum了一个金额指标 每次都不一样
建表语句:
CREATE TABLE app_user_revenue (
day date NULL COMMENT “”,
ad_id varchar(200) NULL COMMENT “”,
app varchar(200) NULL COMMENT “”,
ad_revenue_unit varchar(200) NULL COMMENT “”,
ad_revenue_network varchar(200) NULL COMMENT “”,
ad_revenue_placement varchar(200) NULL COMMENT “”,
app_version varchar(200) NULL COMMENT “”,
ad_type varchar(200) NULL DEFAULT “unknown” COMMENT “”,
scene_name varchar(200) NULL DEFAULT “unknown” COMMENT “”,
statusvpn varchar(200) NULL DEFAULT “unknown” COMMENT “”,
id bigint(20) NULL COMMENT “”,
package_name varchar(200) NULL COMMENT “”,
idfa varchar(200) NULL COMMENT “”,
revenue float NULL DEFAULT “0” COMMENT “”,
impressions int(11) NULL COMMENT “”,
ad_id_name varchar(200) NULL COMMENT “”,
country_code varchar(200) NULL COMMENT “”,
create_by varchar(200) NULL COMMENT “”,
update_by varchar(200) NULL COMMENT “”,
update_time datetime NULL COMMENT “”,
del_flag varchar(65533) NULL DEFAULT “0” COMMENT “”,
created_at bigint(20) NULL DEFAULT “0” COMMENT “”,
create_time datetime NULL COMMENT “”,
match_count int(11) NULL DEFAULT “0” COMMENT “”,
req_count int(11) NULL DEFAULT “0” COMMENT “”,
click_count int(11) NULL DEFAULT “0” COMMENT “”
) ENGINE=OLAP
UNIQUE KEY(day, ad_id, app, ad_revenue_unit, ad_revenue_network, ad_revenue_placement, app_version, ad_type, scene_name, statusvpn)
PARTITION BY date_trunc(‘day’, day)
DISTRIBUTED BY HASH(day)
PROPERTIES (
“replication_num” = “1”,
“in_memory” = “false”,
“enable_persistent_index” = “false”,
“replicated_storage” = “true”,
“compression” = “LZ4”
);

查询源表语句:
SELECT
day,
array_agg(DISTINCT idfa)[1] as idfa,
ad_id,
sum(revenue_usd) AS revenue_usd,
sum(ad_impressions_count) AS ad_impressions_count,
array_agg(DISTINCT package_name)[1] as package_name,
array_agg(DISTINCT ad_id_name)[1] as ad_id_name,
app,
app_version,
array_agg(DISTINCT country_code)[1] as country_code,
ad_revenue_unit,
ad_revenue_network,
ad_revenue_placement,
app_version,
ad_type,
scene_name,
statusvpn,
array_agg(DISTINCT created_at)[1] as created_at,
COUNT(CASE WHEN event_name = ‘ad_req’ THEN 1 END) AS reqCount,
COUNT(CASE WHEN event_name = ‘ad_match’ THEN 1 END) AS matchCount,
COUNT(CASE WHEN event_name = ‘ad_click’ THEN 1 END) AS clickCount
from
source_user_adjust
where

 day BETWEEN  '2024-03-07' and '2024-03-07'

and (activity_kind = 'ad_revenue'
	or event_name in('ad_req', 'ad_click', 'ad_match'))

group by
day,
ad_id,
app,
ad_revenue_unit,
ad_revenue_network,
ad_revenue_placement,
app_version,
ad_type,
scene_name,
statusvpn

创建ROUTINE LOAD:
CREATE ROUTINE LOAD kafka_xx ON talbe
PROPERTIES
(
“format” = “json”
)
FROM KAFKA
(
“kafka_broker_list” = “localhost:9092”,
“kafka_topic” = “kafka_topic”,
“property.kafka_default_offsets” = “OFFSET_BEGINNING”,
“property.group.id” = “group”
);

三次执行时,kafka中数据是否有自动删除,从OFFSET_BEGINNING开始消费,如果kafka中数据有自动淘汰的话,可能导入后的数据不一样

kafka目前是会保留好几天的数据的

offset_beginning 的数据是什么时候写入到kafka的,比如kafka保留3天,那么在 11号 1点整 导入到sr的数据是从8号1点的数据开始的,11号1点20分 导入到sr的数据是从8号1点20分开始到11号1点20分的数据,可能会有数据差。

我刚刚用clickhouse数据库和starRocks 同步读取同一个topic 两边读取下来的数量是对不上的 clickhouse读取下来的数量是正确的 ck用的是 MergeTree引擎

如果数据确定没问题的话,看下 array_agg 函数的问题,在sql中加上 order by 看看

kafka中的数据是否有实时变动,如何判断的数据条数不对

kafka数据是通过定时任务去生产的 不是实时的 我是查询了 某一天的全部行 两边一个行数是不一致的

我用excel去重了我的数据 显示没有重复值 是不是我的字段什么问题导致数据被去重掉了
image

kafka 数据中是否有以上这些字段值相同的多条数据,sr中使用unique key 会对相同key列的数据进行合并,ck是否使用主键去重的ReplacingMergeTree

我再去检查一下我的查询SQL看看 也去试试
ReplacingMergeTree