Real-Time Slow SQL Collection for DorisDB

1. Preface
We have been running DorisDB for quite a while now, with around 15 clusters deployed. Many business lines have moved from early testing into heavy production use.

But as business volume grows, performance questions start to surface, for example: how efficient are the SQL queries?

So, with most of the basic automation done, we have recently been working on automating the slow SQL workflow.

The goal: collect and display slow SQL so that DBAs and developers can review and optimize it on a regular basis.

2. Approach
2.1 Considerations
How do we collect, analyze, and display slow SQL?

Log format:

fe/log/fe.audit.log

This file contains two kinds of entries: [query] and [slow_query].

A slow SQL entry looks like this:

2021-08-15 09:05:49,223 [slow_query] |Client=10.1.1.2:42141|User=default_cluster:xxx|Db=default_cluster:xxx|State=EOF|Time=10|ScanBytes=1339|ScanRows=1|ReturnRows=1|StmtId=43542666|QueryId=edbdd4e3-fd64-11eb-b176-0ab2114db0b3|IsQuery=true|feIp=10.1.1.1|Stmt=SELECT 1
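
For reference, whether a statement is also written as a [slow_query] entry is controlled by the FE configuration item qe_slow_log_ms (to my knowledge it defaults to 5000 ms). Assuming the DorisDB version in use supports changing FE configs at runtime, the threshold can be adjusted with something like:

ADMIN SET FRONTEND CONFIG ("qe_slow_log_ms" = "5000");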

Requirements:

  • Real-time slow SQL collection

  • Load into a database for easy analysis, e.g. the number of slow SQL statements and their average execution time within a time window (see the example query after this list)

  • Good write performance and scalability
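
For instance, once the data lands in the dorisdb_slow table defined in section 4.2, the analysis we want is a plain aggregation. A sketch (assuming the Time field from the audit log is in milliseconds):

-- slow SQL count and average execution time per database for one day
SELECT db_name,
       COUNT(*)    AS slow_cnt,
       AVG(`time`) AS avg_time_ms
FROM dorisdb_slow
WHERE slow_time >= '2021-08-14 00:00:00'
  AND slow_time <  '2021-08-15 00:00:00'
GROUP BY db_name
ORDER BY slow_cnt DESC;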

Solution:

  • Use a general-purpose log collector: filebeat

  • Use DorisDB as the underlying storage, convenient for analysis

  • Ingest through kafka: fast and efficient

  • SQL fingerprinting has no easy solution yet:

<1> Regex processing at filebeat collection time? This may cost some performance, and complex SQL is hard to handle with regexes.

<2> After the data enters kafka, consume it ourselves, parse with the TiDB Parser, then write into DorisDB? And how to write it, flink? kafka? The pipeline gets rather long.

<3> Analyze after loading, using some other tool, e.g. the TiDB Parser? To be investigated and tested later.

<4> Official support? This would be the most convenient option; we will ask the vendor whether it is supported.

<5> Daily batch analysis of slow SQL with a script; efficiency is somewhat worse.

In summary: implement real-time collection for DorisDB first, and work out SQL fingerprinting and aggregate analysis later.

2.2 Architecture
[filebeat filtered collection] --> [kafka] --> [DorisDB]
(Slow SQL architecture diagram)

3. Implementation
3.1 Kafka preparation
[Apply for a kafka topic on the company platform]:

topic: dorisdb_slow_log

client_id: dorisdb_slow_log-123

hosts: ["10.1.1.1:666","10.1.1.2:666","10.1.1.3:666","10.1.1.4:666","10.1.1.5:666"]

3.2 Preparing the filebeat environment
Download filebeat, modify the configuration file, and start collection.

3.3 filebeat configuration

The tricky parts here are:

  • collecting only the slow_query entries

  • splitting the line into columns

[Configuration]:

path.home: /opt/soft/filebeat/filebeat_999-1
path.config: /opt/soft/filebeat/filebeat_999-1/conf
path.data: /opt/soft/filebeat/filebeat_999-1/data
path.logs: /opt/soft/filebeat/filebeat_999-1/log
#=============================================================================
filebeat.inputs:

- type: log
  enabled: true
  ignore_older: 5m
  # include_lines entries are regexes: escape the brackets to match the literal tag
  include_lines: ['\[slow_query\]']
  paths:
    - /opt/soft/dorisdb999/fe/log/fe.audit.log
  fields:
    log_topics: dorisdb_slow_log
  processors:
    - script:
        lang: javascript
        id: my_filter
        tag: enable
        # literal block scalar (|) keeps line breaks, so the comments below stay valid
        source: |
          function process(event) {
              var str = event.Get("message");
              // the first 19 characters are the timestamp: "2021-08-15 09:05:49"
              var slow_time = str.substr(0, 19);
              // skip the fixed prefix 'YYYY-MM-DD HH:MM:SS,mmm [slow_query] |' (38 chars)
              var detail_query = str.substr(38);
              var js_arr = detail_query.split("|");
              var keys = ["Client", "User", "Db", "State", "Time", "ScanBytes",
                          "ScanRows", "ReturnRows", "StmtId", "QueryId", "IsQuery", "feIp"];
              for (var i = 0; i < keys.length; i++) {
                  // strip the 'Name=' prefix from each field and publish it on the event
                  event.Put(keys[i], js_arr[i].replace(keys[i] + "=", ""));
              }
              // Stmt is the last field and may itself contain '|', so rejoin the remainder
              var Stmt = js_arr.slice(12).join("|").replace("Stmt=", "");
              // truncate to fit the varchar(65533) column with some headroom
              event.Put("Stmt", Stmt.substring(0, 65530));
              event.Put("slow_time", slow_time);
              // metadata identifying this DorisDB instance
              event.Put("igid", "999-1");
              event.Put("port", 999);
              event.Put("dorisdb_fe_ip", "10.10.10.10");
          }
    - drop_fields:
        fields: ["ecs", "agent", "message", "log", "host"]

output.kafka:
  enabled: true
  hosts: ["10.1.1.1:666","10.1.1.2:666","10.1.1.3:666","10.1.1.4:666","10.1.1.5:666"]
  topic: '%{[fields][log_topics]}'
  worker: 1
  timeout: 30s
  broker_timeout: 10s
  keep_alive: 0
  compression: gzip
  required_acks: 1
  client_id: dorisdb_slow_log-123

3.4 Starting filebeat
/usr/bin/nohup filebeat -c filebeat_999-1.yml -e >> filebeat_999-1.log 2>&1 &

4. DorisDB Environment
4.1 Creating the cluster

dorisdb_manage cluster deploy 999-1 --db_name=dorisdb_slow --user_name=xxx --user_password=xxx --auth_ip=10.1.1.1 --domain_name=xxx

Note: this is the DBA team's in-house deployment automation.

4.2 Creating the slow SQL table
CREATE TABLE dorisdb_slow (
slow_time datetime COMMENT "slow time",
igid varchar(40) COMMENT "",
db_name varchar(300) COMMENT "db name",
fe_ip varchar(30) COMMENT "fe ip",
query_id varchar(200) COMMENT "QueryId",
time bigint(20) COMMENT "SQL run time",
client varchar(30) COMMENT "client ip",
user varchar(200) COMMENT "user name",
state varchar(200) COMMENT "State",
scan_bytes bigint(20) COMMENT "ScanBytes",
scan_rows bigint(20) COMMENT "ScanRows",
return_rows bigint(20) COMMENT "ReturnRows",
stmt_id bigint(20) COMMENT "StmtId",
is_query varchar(50) COMMENT "IsQuery",
stmt varchar(65533) COMMENT "Stmt, query detail"
) ENGINE=OLAP
DUPLICATE KEY(slow_time, igid, db_name, fe_ip, query_id)
COMMENT "DorisDB slow SQL table"
PARTITION BY RANGE(slow_time)
(PARTITION p20210813 VALUES [('2021-08-13 00:00:00'), ('2021-08-14 00:00:00')),
PARTITION p20210814 VALUES [('2021-08-14 00:00:00'), ('2021-08-15 00:00:00')),
...
PARTITION p20211230 VALUES [('2021-12-30 00:00:00'), ('2021-12-31 00:00:00')),
PARTITION p20211231 VALUES [('2021-12-31 00:00:00'), ('2022-01-01 00:00:00')))
DISTRIBUTED BY HASH(igid) BUCKETS 48
PROPERTIES (
"replication_num" = "1",
"in_memory" = "false",
"storage_format" = "DEFAULT"
);
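
Note: pre-creating partitions only through 2021-12-31 means they must be extended by hand before year end. A possible alternative, assuming the DorisDB version in use supports dynamic partitioning, is to let the system create (and expire) daily partitions via table properties, roughly:

-- sketch: daily dynamic partitions, 90-day retention, 3 days pre-created
ALTER TABLE dorisdb_slow SET (
"dynamic_partition.enable" = "true",
"dynamic_partition.time_unit" = "DAY",
"dynamic_partition.start" = "-90",
"dynamic_partition.end" = "3",
"dynamic_partition.prefix" = "p",
"dynamic_partition.buckets" = "48"
);

Here dynamic_partition.start = "-90" would also drop partitions older than 90 days, so adjust it to the desired retention.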

4.3 Creating the kafka routine load job
CREATE ROUTINE LOAD dorisdb_slow_load_20210814 ON dorisdb_slow
columns (slow_time,igid,db_name,fe_ip,query_id,time,client,user,state,scan_bytes,scan_rows,return_rows,stmt_id,is_query,stmt)
PROPERTIES (
"format" = "json",
"jsonpaths" = "[\"$.slow_time\",\"$.igid\",\"$.Db\",\"$.feIp\",\"$.QueryId\",\"$.Time\",\"$.Client\",\"$.User\",\"$.State\",\"$.ScanBytes\",\"$.ScanRows\",\"$.ReturnRows\",\"$.StmtId\",\"$.IsQuery\",\"$.Stmt\"]",
"desired_concurrent_number" = "8",
"max_error_number" = "9999999999",
"max_batch_rows" = "200000",
"max_batch_size" = "104857600",
"strict_mode" = "false"
)
FROM KAFKA
(
"kafka_broker_list" = "10.1.1.1:666,10.1.1.2:666,10.1.1.3:666,10.1.1.4:666,10.1.1.5:666",
"kafka_topic" = "dorisdb_slow_log",
"property.kafka_default_offsets" = "OFFSET_END",
"property.client.id" = "dorisdb_slow_log-123",
"property.group.id" = "dorisdb_slow_load_20210814"
);
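
max_error_number is deliberately set very high so that the occasional unparsable audit line does not abort the job. If the job does stop, or its properties need changing, a routine load job can be paused and resumed with the standard statements:

PAUSE ROUTINE LOAD FOR dorisdb_slow_load_20210814;
RESUME ROUTINE LOAD FOR dorisdb_slow_load_20210814;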

4.4 Checking the load job status

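The job state and consumption progress can be checked with, for example:

SHOW ROUTINE LOAD FOR dorisdb_slow_load_20210814\G

The State field should be RUNNING, and the statistics show totals such as loaded and error rows.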

4.5 Viewing the slow SQL

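For example, the top 10 slowest statements of a given day (a sketch; `time` is the execution time in ms taken from the audit log):

SELECT slow_time, db_name, `user`, `time`, stmt
FROM dorisdb_slow
WHERE slow_time >= '2021-08-15 00:00:00'
  AND slow_time <  '2021-08-16 00:00:00'
ORDER BY `time` DESC
LIMIT 10;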

Great idea and great practice.
DorisDB itself should integrate this capability.