Using Canal + ClickHouse to Analyze MySQL Transactions in Real Time

As a DBA, sometimes we want a more concrete picture of what a core production database actually looks like, for example:
What are the main DML types in this database?
What do its transaction sizes, execution times, and affected row counts roughly look like?
This information may not seem valuable in itself, but the impact of large transactions on replication needs no introduction, and it becomes genuinely important when we want to upgrade the current primary-replica architecture to a high-availability solution such as MGR or PXC (after all, arguing with data carries more weight).
Large transactions are unfriendly to both MGR and PXC, especially MGR, where (at least in 5.7) severe cases can hang the entire cluster. Galera 4.0's new Streaming Replication feature provides much better support for large transactions.
Of course, some will say this can all be done by analyzing the binlog directly, and the simplest way is a shell script, such as the method described in Identifying Useful Info from MySQL Row-Based Binary Logs (that method is fairly simple but slow to run; analysis_binlog is worth a try). There are plenty of other tools as well, such as infobin.
Personally, though, I find those approaches somewhat cumbersome, and with ClickHouse growing ever more popular, using it for this job is also a good opportunity to learn ClickHouse better.
First, a look at the end result.
Implementation
canal + kafka
Deploy canal and subscribe to the production database's binlog, writing the events to Kafka. I did not use flatMessage (canal.mq.flatMessage = false), so the messages written to Kafka are binary protobuf; you can also enable flatMessage, in which case the messages written to Kafka are JSON.
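A minimal sketch of the canal settings this step relies on; exact key names vary a bit between canal 1.1.x releases, and the topic/host values here are made up, so treat it as illustrative rather than definitive:

```properties
# canal.properties -- illustrative; key names differ slightly across canal 1.1.x releases
canal.serverMode = kafka              # ship binlog events to Kafka instead of serving TCP clients
canal.mq.servers = kafka1:9092,kafka2:9092
canal.mq.flatMessage = false          # binary protobuf messages; true would emit JSON instead

# conf/example/instance.properties
canal.instance.master.address = mysql-master:3306   # hypothetical source instance
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
canal.mq.topic = broker_binlog                      # hypothetical topic name
```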
Consume the binlog messages from Kafka and persist them to ClickHouse.
The full code is available at https://github.com/Fanduzi/Use_clickhouse_2_analyze_mysql_binlog
ClickHouse tables
Base table, written to directly by the canal consumer
```sql
CREATE TABLE mysql_monitor.broker_binlog_local
(
    `schema` String COMMENT 'database name',
    `table` String COMMENT 'table name',
    `event_type` String COMMENT 'statement type',
    `is_ddl` UInt8 COMMENT 'DDL 1 else 0',
    `binlog_file` String COMMENT 'binlog file name',
    `binlog_pos` String COMMENT 'binlog pos',
    `characterset` String COMMENT 'character set',
    `execute_time` DateTime COMMENT 'execution time',
    `gtid` String COMMENT 'gtid',
    `single_statement_affected_rows` UInt32 COMMENT 'rows affected by this statement',
    `single_statement_size` String DEFAULT '0' COMMENT 'size of this statement in bytes',
    `ctime` DateTime DEFAULT now() COMMENT 'time written to ClickHouse'
)
ENGINE = ReplicatedMergeTree('/clickhouse/mysql_monitor/tables/{layer}-{shard}/broker_binlog', '{replica}')
PARTITION BY toDate(execute_time)
ORDER BY (execute_time, gtid, table, schema)
TTL execute_time + toIntervalMonth(30)
SETTINGS index_granularity = 8192;

CREATE TABLE mysql_monitor.broker_binlog
(
    `schema` String COMMENT 'database name',
    `table` String COMMENT 'table name',
    `event_type` String COMMENT 'statement type',
    `is_ddl` UInt8 COMMENT 'DDL 1 else 0',
    `binlog_file` String COMMENT 'binlog file name',
    `binlog_pos` String COMMENT 'binlog pos',
    `characterset` String COMMENT 'character set',
    `execute_time` DateTime COMMENT 'execution time',
    `gtid` String COMMENT 'gtid',
    `single_statement_affected_rows` UInt32 COMMENT 'rows affected by this statement',
    `single_statement_size` String DEFAULT '0' COMMENT 'size of this statement in bytes',
    `ctime` DateTime DEFAULT now() COMMENT 'time written to ClickHouse'
)
ENGINE = Distributed('ch_cluster_all', 'mysql_monitor', 'broker_binlog_local', rand());
```
Aggregation tables
SummingMergeTree
ClickHouse replaces all rows that share the same primary key (more precisely, the same sorting key) with a single row containing summed values for the columns with numeric data types.
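A minimal, self-contained sketch of that behavior (the demo table is made up and is not part of the monitoring schema). Because merges happen asynchronously in the background, queries should still re-aggregate with sum()/GROUP BY rather than rely on the collapse having happened:

```sql
-- Hypothetical demo table, for illustration only
CREATE TABLE demo_summing
(
    `day` Date,
    `event_type` String,
    `event_count` UInt64
)
ENGINE = SummingMergeTree()
ORDER BY (day, event_type);

-- Two inserts create two parts with the same sorting key
INSERT INTO demo_summing VALUES ('2020-01-01', 'INSERT', 10);
INSERT INTO demo_summing VALUES ('2020-01-01', 'INSERT', 5);

-- Merges are asynchronous, so always re-aggregate at query time:
SELECT day, event_type, sum(event_count) AS event_count
FROM demo_summing
GROUP BY day, event_type;
-- -> 2020-01-01  INSERT  15
```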
Daily counts per binlog event_type, used to chart each event type's share of the overall daily binlog
```sql
CREATE TABLE mysql_monitor.broker_daily_binlog_event_count_local ON CLUSTER ch_cluster_all
(
    `day` Date,
    `event_type` String,
    `event_count` UInt64
)
ENGINE = ReplicatedSummingMergeTree('/clickhouse/mysql_monitor/tables/{layer}-{shard}/broker_daily_binlog_event_count', '{replica}')
PARTITION BY day
ORDER BY (day, event_type)
TTL day + toIntervalMonth(30)
SETTINGS index_granularity = 8192;

CREATE MATERIALIZED VIEW mysql_monitor.broker_daily_binlog_event_count_mv_local ON CLUSTER ch_cluster_all
TO mysql_monitor.broker_daily_binlog_event_count_local
(
    `day` Date,
    `event_type` String,
    `event_count` UInt64
)
AS SELECT
    toDate(execute_time) AS day,
    event_type,
    count(*) AS event_count
FROM mysql_monitor.broker_binlog_local
GROUP BY day, event_type
ORDER BY day ASC, event_type ASC;

CREATE TABLE mysql_monitor.broker_daily_binlog_event_count_mv ON CLUSTER ch_cluster_all
(
    `day` Date,
    `event_type` String,
    `event_count` UInt64
)
ENGINE = Distributed('ch_cluster_all', 'mysql_monitor', 'broker_daily_binlog_event_count_mv_local', rand());
```
Grafana query
```sql
SELECT
    day AS t,
    event_type,
    sum(event_count)
FROM mysql_monitor.${prefix}_daily_binlog_event_count_mv
WHERE day = yesterday()
  AND event_type != 'QUERY'
  AND event_type != 'EVENTTYPECOMPATIBLEPROTO2'
GROUP BY day, event_type
```
Daily top DML tables
```sql
CREATE TABLE mysql_monitor.broker_daily_binlog_event_count_by_table_local ON CLUSTER ch_cluster_all
(
    `day` Date,
    `schema` String,
    `table` String,
    `event_type` String,
    `event_count` UInt64
)
ENGINE = ReplicatedSummingMergeTree('/clickhouse/mysql_monitor/tables/{layer}-{shard}/broker_daily_binlog_event_count_by_table', '{replica}')
PARTITION BY day
ORDER BY (day, table, schema, event_type)
TTL day + toIntervalMonth(30)
SETTINGS index_granularity = 8192;

CREATE MATERIALIZED VIEW mysql_monitor.broker_daily_binlog_event_count_by_table_mv_local ON CLUSTER ch_cluster_all
TO mysql_monitor.broker_daily_binlog_event_count_by_table_local
(
    `day` Date,
    `schema` String,
    `table` String,
    `event_type` String,
    `event_count` UInt64
)
AS SELECT
    toDate(execute_time) AS day,
    schema,
    table,
    event_type,
    count(*) AS event_count
FROM mysql_monitor.broker_binlog_local
GROUP BY day, schema, table, event_type
ORDER BY day ASC, schema ASC, table ASC, event_type DESC;

CREATE TABLE mysql_monitor.broker_daily_binlog_event_count_by_table_mv ON CLUSTER ch_cluster_all
(
    `day` Date,
    `schema` String,
    `table` String,
    `event_type` String,
    `event_count` UInt64
)
ENGINE = Distributed('ch_cluster_all', 'mysql_monitor', 'broker_daily_binlog_event_count_by_table_mv_local', rand());
```
Grafana query (top 10 tables by DML count)
```sql
SELECT
    day AS t,
    table,
    sum(event_count) AS count
FROM mysql_monitor.${prefix}_daily_binlog_event_count_by_table_mv
WHERE day = yesterday()
  AND event_type != 'QUERY'
  AND event_type != 'EVENTTYPECOMPATIBLEPROTO2'
GROUP BY day, table
ORDER BY count DESC
LIMIT 10
```
Grafana query (event-type breakdown for the top 3 tables)
```sql
SELECT
    day AS t,
    table,
    event_type,
    sum(event_count) AS count
FROM mysql_monitor.${prefix}_daily_binlog_event_count_by_table_mv
WHERE day = yesterday()
  AND event_type != 'QUERY'
  AND event_type != 'EVENTTYPECOMPATIBLEPROTO2'
  AND table GLOBAL IN
  (
      SELECT table
      FROM
      (
          SELECT table, sum(event_count) AS count
          FROM mysql_monitor.${prefix}_daily_binlog_event_count_by_table_mv
          WHERE day = yesterday()
          GROUP BY table
          ORDER BY count DESC
          LIMIT 3
      )
  )
GROUP BY day, table, event_type
ORDER BY count DESC
```
Total DML executed over the past week
```sql
WITH
(
    SELECT sum(event_count)
    FROM mysql_monitor.${prefix}_daily_binlog_event_count_mv
    WHERE (day >= (today() - 6))
      AND (event_type != 'QUERY')
      AND (event_type != 'EVENTTYPECOMPATIBLEPROTO2')
) AS total_statement_count
SELECT
    multiIf(
        toDayOfWeek(day) = 1, 'Monday',
        toDayOfWeek(day) = 2, 'Tuesday',
        toDayOfWeek(day) = 3, 'Wednesday',
        toDayOfWeek(day) = 4, 'Thursday',
        toDayOfWeek(day) = 5, 'Friday',
        toDayOfWeek(day) = 6, 'Saturday',
        toDayOfWeek(day) = 7, 'Sunday',
        'N/A') AS w,
    day AS t,
    sum(event_count) AS statement_count,
    bar(statement_count, 0, total_statement_count, 500) AS bar
FROM mysql_monitor.${prefix}_daily_binlog_event_count_mv
WHERE (day >= (today() - 6))
  AND (event_type != 'QUERY')
  AND (event_type != 'EVENTTYPECOMPATIBLEPROTO2')
GROUP BY day
ORDER BY toDate(t) DESC
```
Transaction statistics
Track the transactions that affect the most rows, generate the most binlog, and take the longest to execute.
```sql
CREATE TABLE mysql_monitor.broker_largest_transaction_local ON CLUSTER ch_cluster_all
(
    `end_time` DateTime COMMENT 'end_time from the collection query',
    `invertal` String COMMENT 'collection interval in seconds',
    `gtid` String COMMENT 'gtid',
    `transaction_spend_time` Int32 COMMENT 'transaction duration',
    `transaction_size` Int64 COMMENT 'transaction size',
    `single_statement_affected_rows` UInt64 COMMENT 'rows affected by the transaction'
)
ENGINE = ReplicatedSummingMergeTree('/clickhouse/mysql_monitor/tables/{layer}-{shard}/broker_largest_transaction', '{replica}')
PARTITION BY toDate(end_time)
ORDER BY gtid
TTL toDate(end_time) + toIntervalMonth(30)
SETTINGS index_granularity = 8192;

CREATE TABLE mysql_monitor.broker_largest_transaction ON CLUSTER ch_cluster_all
(
    `end_time` DateTime COMMENT 'end_time from the collection query',
    `invertal` String COMMENT 'collection interval in seconds',
    `gtid` String COMMENT 'gtid',
    `transaction_spend_time` Int32 COMMENT 'transaction duration',
    `transaction_size` Int64 COMMENT 'transaction size',
    `single_statement_affected_rows` UInt64 COMMENT 'rows affected by the transaction'
)
ENGINE = Distributed('ch_cluster_all', 'mysql_monitor', 'broker_largest_transaction_local', rand());

CREATE TABLE mysql_monitor.broker_most_time_consuming_transaction_local ON CLUSTER ch_cluster_all
(
    `end_time` DateTime COMMENT 'end_time from the collection query',
    `invertal` String COMMENT 'collection interval in seconds',
    `gtid` String COMMENT 'gtid',
    `transaction_spend_time` Int32 COMMENT 'transaction duration',
    `transaction_size` Int64 COMMENT 'transaction size',
    `single_statement_affected_rows` UInt64 COMMENT 'rows affected by the transaction'
)
ENGINE = ReplicatedSummingMergeTree('/clickhouse/mysql_monitor/tables/{layer}-{shard}/broker_most_time_consuming_transaction', '{replica}')
PARTITION BY toDate(end_time)
ORDER BY gtid
TTL toDate(end_time) + toIntervalMonth(30)
SETTINGS index_granularity = 8192;

CREATE TABLE mysql_monitor.broker_most_time_consuming_transaction ON CLUSTER ch_cluster_all
(
    `end_time` DateTime COMMENT 'end_time from the collection query',
    `invertal` String COMMENT 'collection interval in seconds',
    `gtid` String COMMENT 'gtid',
    `transaction_spend_time` Int32 COMMENT 'transaction duration',
    `transaction_size` Int64 COMMENT 'transaction size',
    `single_statement_affected_rows` UInt64 COMMENT 'rows affected by the transaction'
)
ENGINE = Distributed('ch_cluster_all', 'mysql_monitor', 'broker_most_time_consuming_transaction_local', rand());

CREATE TABLE mysql_monitor.broker_most_affected_rows_transaction_local ON CLUSTER ch_cluster_all
(
    `end_time` DateTime COMMENT 'end_time from the collection query',
    `invertal` String COMMENT 'collection interval in seconds',
    `gtid` String COMMENT 'gtid',
    `transaction_spend_time` Int32 COMMENT 'transaction duration',
    `transaction_size` Int64 COMMENT 'transaction size',
    `single_statement_affected_rows` UInt64 COMMENT 'rows affected by the transaction'
)
ENGINE = ReplicatedSummingMergeTree('/clickhouse/mysql_monitor/tables/{layer}-{shard}/broker_most_affected_rows_transaction', '{replica}')
PARTITION BY toDate(end_time)
ORDER BY gtid
TTL toDate(end_time) + toIntervalMonth(30)
SETTINGS index_granularity = 8192;

CREATE TABLE mysql_monitor.broker_most_affected_rows_transaction ON CLUSTER ch_cluster_all
(
    `end_time` DateTime COMMENT 'end_time from the collection query',
    `invertal` String COMMENT 'collection interval in seconds',
    `gtid` String COMMENT 'gtid',
    `transaction_spend_time` Int32 COMMENT 'transaction duration',
    `transaction_size` Int64 COMMENT 'transaction size',
    `single_statement_affected_rows` UInt64 COMMENT 'rows affected by the transaction'
)
ENGINE = Distributed('ch_cluster_all', 'mysql_monitor', 'broker_most_affected_rows_transaction_local', rand());
```
After some thought, the only workable approach was to create three tables and write a script that periodically finds the transactions with the largest size, the longest duration, and the most affected rows, and inserts them into those tables; a sketch of the collection idea follows.
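The actual collector lives in the repository linked above; as a hedged sketch of the idea (the window length, interval value, and LIMIT here are made up for illustration), each cycle aggregates the per-statement rows of the base table into per-transaction totals by gtid and keeps the top transaction:

```sql
-- Illustrative collection query, not the exact script from the repo:
-- roll up per-statement rows into per-transaction totals over one window,
-- then keep the transaction with the largest binlog footprint.
INSERT INTO mysql_monitor.broker_largest_transaction
SELECT
    now() AS end_time,
    '300' AS invertal,                                        -- collection interval in seconds (assumed)
    gtid,
    max(execute_time) - min(execute_time) AS transaction_spend_time,
    sum(toInt64(single_statement_size)) AS transaction_size,  -- column is stored as String
    sum(single_statement_affected_rows) AS single_statement_affected_rows
FROM mysql_monitor.broker_binlog
WHERE execute_time >= now() - 300
GROUP BY gtid
ORDER BY transaction_size DESC
LIMIT 1;
```

The other two tables would be fed the same way, changing only the ORDER BY column.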
The display queries are roughly as follows. Because Grafana requires a DateTime column, a toDateTime('{end}') is added, taking the high-water mark of each collection window. The three "yesterday" queries differ only in their ORDER BY.
```sql
-- Largest transaction size yesterday
SELECT
    $timeSeries AS t,
    sum(transaction_size) AS sum_transaction_size
FROM mysql_monitor.${prefix}_largest_transaction
WHERE end_time >= toDateTime(yesterday()) AND end_time < toDateTime(today())
GROUP BY t, gtid
ORDER BY sum_transaction_size DESC
LIMIT 1

-- Longest transaction execution time yesterday
SELECT
    $timeSeries AS t,
    sum(transaction_spend_time) AS sum_transaction_spend_time
FROM mysql_monitor.${prefix}_most_time_consuming_transaction
WHERE end_time >= toDateTime(yesterday()) AND end_time < toDateTime(today())
GROUP BY t, gtid
ORDER BY sum_transaction_spend_time DESC
LIMIT 1

-- Most rows affected by a single transaction yesterday
SELECT
    $timeSeries AS t,
    sum(single_statement_affected_rows) AS transaction_affected_rows
FROM mysql_monitor.${prefix}_most_affected_rows_transaction
WHERE end_time >= toDateTime(yesterday()) AND end_time < toDateTime(today())
GROUP BY t, gtid
ORDER BY transaction_affected_rows DESC
LIMIT 1

-- Largest transaction size per day over the past week
SELECT t, max(sum_transaction_size) AS max_transaction_size
FROM
(
    SELECT
        toDate(end_time) AS t,
        gtid,
        sum(transaction_size) AS sum_transaction_size
    FROM mysql_monitor.${prefix}_largest_transaction
    WHERE end_time >= (today() - 6)
    GROUP BY t, gtid
)
GROUP BY t
ORDER BY t DESC

-- Longest transaction execution time per day over the past week
SELECT t, max(sum_transaction_spend_time) AS max_transaction_spend_time
FROM
(
    SELECT
        toDate(end_time) AS t,
        gtid,
        sum(transaction_spend_time) AS sum_transaction_spend_time
    FROM mysql_monitor.${prefix}_most_time_consuming_transaction
    WHERE end_time >= (today() - 6)
    GROUP BY t, gtid
)
GROUP BY t
ORDER BY t DESC

-- Most rows affected per day over the past week
SELECT t, max(transaction_affected_rows) AS max_transaction_affected_rows
FROM
(
    SELECT
        toDate(end_time) AS t,
        gtid,
        sum(single_statement_affected_rows) AS transaction_affected_rows
    FROM mysql_monitor.${prefix}_most_affected_rows_transaction
    WHERE end_time >= (today() - 6)
    GROUP BY t, gtid
)
GROUP BY t
ORDER BY t DESC

-- Near-real-time transaction size chart
SELECT
    $timeSeries AS t,
    sum(transaction_size) AS transaction_size
FROM mysql_monitor.${prefix}_largest_transaction
WHERE $timeFilter
GROUP BY t, gtid
ORDER BY t

-- Near-real-time transaction duration chart
SELECT
    $timeSeries AS t,
    sum(transaction_spend_time) AS transaction_spend_time
FROM mysql_monitor.${prefix}_most_time_consuming_transaction
WHERE $timeFilter
GROUP BY t, gtid
ORDER BY t

-- Near-real-time affected rows chart
SELECT
    $timeSeries AS t,
    sum(single_statement_affected_rows) AS transaction_affected_rows
FROM mysql_monitor.${prefix}_most_affected_rows_transaction
WHERE $timeFilter
GROUP BY t, gtid
ORDER BY t
```
Summary

Having actually done this, some queries really can be optimized with materialized views, but it is annoying that Grafana always needs a DateTime column, so there may still be room to improve the materialized views. For near-real-time statistics requirements like the daily_binlog chart shown below, a day-granularity materialized view cannot support fine-grained queries (and querying the base table directly is too slow, so a materialized view is a must).
So how do we build a finer-grained materialized view? It comes down to aggregating by time bucket, say one aggregate every 5 minutes. ClickHouse currently has no window functions like Oracle's.
Oracle, for example, allows:

sum(...) over (partition by gtid order by execute_time range between interval '1' day preceding and interval '1' day following)
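Absent window functions, one workable ClickHouse substitute is to bucket time inside the materialized view itself, e.g. with toStartOfFiveMinute. A minimal sketch under that assumption (the view name is made up, and a production version would need the replicated/distributed wrapping used above):

```sql
-- Illustrative 5-minute rollup; not part of the deployed schema above
CREATE MATERIALIZED VIEW mysql_monitor.binlog_event_count_5m_mv
ENGINE = SummingMergeTree()
PARTITION BY toDate(bucket)
ORDER BY (bucket, event_type)
AS SELECT
    toStartOfFiveMinute(execute_time) AS bucket,  -- 5-minute bucket instead of toDate()
    event_type,
    count(*) AS event_count
FROM mysql_monitor.broker_binlog_local
GROUP BY bucket, event_type;
```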
For the queries that are hard to optimize, a workload like Mafengwo's, where a database generates 100+ GB of binlog a day, might genuinely be too much for this setup. More shards would probably be the only way out, though I am not sure how memory-hungry the final aggregation would be; the real question is whether this system is worth the monetary cost of the extra nodes.