Hive to StarRocks

Hive to StarRocks

将Hive 中的数据如何导入到 StarRocks 中,官网上给出了好多导入数据的方案,本篇主要讲下Broker Load的方式导入。在Broker Load模式下,通过部署的Broker程序,StarRocks可读取对应数据源(如HDFS, S3)上的数据,利用自身的计算资源对数据进行预处理和导入。这是一种异步的导入方式,用户需要通过MySQL协议创建导入,并通过查看导入命令检查导入结果。有关Broker Load 的详细解释你可以查阅官网的解说。

需求:

每天将数仓中跑完的Hive的相关表导入到StarRocks。

场景:

  • 不更新历史数据

    1. 如果是分区表,我们增量导入到 StarRocks 中即可
    2. 非分区表全量导入
  • 更新历史数据

    这种情况主要存在分区表中,往往会更改前几个月数据或者时间更久的数据,这种情况下,就不的不将该表重新同步一边,使StarRocks中的数据和hive中的数据保持一致。

  • hive中表的元数据发生变化,和StarRocks中的表结构不一致

    这种情况下,就需要我们删除重新建表,重新同步数据

实操

这里直接通过编写shell 脚本,来完成数据的导入

function common(){
        database_name=$1
        table_name=$2
         #判断数据文件格式
         hive_file_type=$(echo `impala-shell -i $impala_host -c -q "describe formatted ${database_name}.${table_name};"`)
         type=$(echo "${hive_file_type#*InputFormat}" | awk -F \| '{print $2}')
         result=$(echo "$type" | grep "parquet")
         #这里只判断了两种格式,如果你们hive中的文件类型由多种,你都需要判断
         if [[ "$result" != "" ]]
                then
                        hive_file_type="parquet"
                        echo "file type: $hive_file_type"
                else
                        hive_file_type="csv"
                        echo "file type: $hive_file_type"
                fi

                #查询表的字段
                select_col_name_sql="desc ${table_name};"
                select_col_name=`mysql -h${StarRocks_ip}  -P${StarRocks_port}  -u${StarRocks_user} -p${StarRocks_password} ${database_name} -Bse "${select_col_name_sql}"`
                # 获取要同步的表的字段
                if [[ -f $CURRENT_DIR/tmp.txt ]]
                then
                        rm -rf $CURRENT_DIR/tmp.txt
                fi
				#这里将表的字段写入到了文件中,你也可以存储在数组或者其他容器中
                echo "$select_col_name" | awk '{print $1}' > ${CURRENT_DIR}/col_tmp.txt
                cat ${CURRENT_DIR}/col_tmp.txt | while read line
                do
                        echo "\`$line\`" >> ${CURRENT_DIR}/tmp.txt
                done
				# 指定该表的数据文件
                table_data_file="${hdfs_url}$database_name.db/$table_name/dt=*"
}

function trun_table(){
        database_name=$1
        table_name=$2
        truncate_table_sql="truncate table $table_name;"
        truncate_table=`mysql -h${StarRocks_ip}  -P${StarRocks_port}  -u$StarRocks_user} -p${StarRocks_password} $1 -Bse "${truncate_table_sql}"`
}

不更新历史数据

function sync_ordinary(){
        database_name=$1
        table_name=$2
        # 判断该文件夹是否存在,如果存在级即为分区表,否为非分区表
        hadoop fs -test -d $table_data_file
        if [ $? -eq 0 ] ;then
                table_data_file="${hdfs_url}${database_name}.db/${table_name}/dt*/*"
                # 获取要同步的表的字段
                sed -i '1d' ${CURRENT_DIR}/tmp.txt
                echo `cat "${CURRENT_DIR}/tmp.txt"` |  tr ' ' ',' > ${CURRENT_DIR}/col_name.txt
                col_name=`cat "${CURRENT_DIR}/col_name.txt"`
                #load data 
                load_data_sql="LOAD LABEL ${database_name}.${table_name}_${sync_time} ( DATA INFILE ("\"${table_data_file}\"")  INTO TABLE "\`${table_name}\`" COLUMNS TERMINATED BY \"\\\x01\" FORMAT AS \"${hive_file_type}\"  (${col_name}) COLUMNS FROM PATH as (dt) ) WITH BROKER $broker_name ( \"hadoop.security.authentication\"="\"${authentication}\"", \"username\"="\"${hdfs_username}\"", \"password\"="\"${hdfs_password}\"", \"dfs.nameservices\"=\"ns\", \"dfs.ha.namenodes.ns\"=\"nn1,nn2\",\"dfs.namenode.rpc-address.ns.nn1\"="\"$hdfs_nn1\"", \"dfs.namenode.rpc-address.ns.nn2\"="\"$hdfs_nn2\"", \"dfs.client.failover.proxy.provider\"=\"org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider\" ) PROPERTIES ( \"timeout\" = \"3600\", \"timezone\" = \"Asia/Shanghai\" );"        else                table_data_file="${hdfs_url}$database_name.db/$table_name/*"
                # 获取要同步的表的字段
                echo `cat "${CURRENT_DIR}/tmp.txt"` |  tr ' ' ',' > ${CURRENT_DIR}/col_name.txt
                col_name=`cat "${CURRENT_DIR}/col_name.txt"`
                # truncate table
                trun_table ${database_name} ${table_name}
                load_data_sql="LOAD LABEL ${database_name}.${table_name}_${sync_time} (DATA INFILE ("\"${table_data_file}\"") INTO TABLE "\`${table_name}\`"  COLUMNS TERMINATED BY \"\\\x01\" FORMAT AS \"parquet\" (${col_name}) ) WITH BROKER $broker_name ( \"hadoop.security.authentication\"="\"${authentication}\"", \"username\"="\"${hdfs_username}\"", \"password\"="\"${hdfs_password}\"", \"dfs.nameservices\"=\"ns\", \"dfs.ha.namenodes.ns\"=\"nn1,nn2\",\"dfs.namenode.rpc-address.ns.nn1\"="\"$hdfs_nn1\"", \"dfs.namenode.rpc-address.ns.nn2\"="\"$hdfs_nn2\"", \"dfs.client.failover.proxy.provider\"=\"org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider\" ) PROPERTIES ( \"timeout\" = \"3600\", \"timezone\" = \"Asia/Shanghai\" );"        fi        #load data
        echo "$load_data_sql"
        load_data=`mysql -h${StarRocks_ip}  -P${StarRocks_port}  -u${StarRocks_user} -p${StarRocks_password} ${database_name} -Bse "${load_data_sql}"`
        
 }

更新历史数据

这种情况下就比较麻烦了,因为有的表需要更新前几个月的数据,有的表需要更新前几天的数据,这种情况下我的处理方法是将这些要更新历史数据的表做了统计梳理,写入到一个文件中,每天全量同步这些数据即可,如果表的数据量太大,这种情况下就需要特殊处理,不建议全量同步。

脚本的编写基本和上面的脚本一致,只不过在同步之前首先要清空表,然后再重新全量同步即可。

元数据不一致

当hive中表的元数据发生变化,比如增加、删除、修改字段等时,和StarRocks中的表结构不一致,这种情况下,就需要我手动干预,需要删除重新建表,重新同步数据至StarRocks中。

关于hive中的数据同步至 StarRocks 中,基本上就着几类问题,当然也有一些细节问题,比如hive 表中有多个字段作为分区等一些情况,但是StarRocks中也给出了很好的解决方案,利用 COLUMNS FROM PATH as (dt) 这种形式去获得hive中的分区字段,在同步的过程中需要做一些数据类型的转换等,这些细节问题官网的文档写的也非常详细,这里不再赘述了。

不足

总结目前线上hive中的数据同步至 StarRocks 中的问题:

  • hive中的元数据发生改变需要人为手动干预
  • string字符串的长度限制,导致部份表导入失败

如果大家有更好的解决方案,希望各位积极探讨。

2赞