Canal dynamicTopic Problem

My colleague 未来's Canal deployment, which had been running fine for months, suddenly threw the error below. The setup uses dynamicTopic, which I had never used myself, so all I could do was dig through the project's issues.

Notes on dynamicTopic and partitionHash

canal.mq.dynamicTopic expression syntax

>Since canal 1.1.3, the supported formats are schema or schema.table; separate multiple entries with commas or semicolons.

>Example 1: test\\.test matches a single table, whose events go to a topic named test_test
>Example 2: .*\\..* matches every table; each table goes to a topic named after that table
>Example 3: test matches a whole schema; all of its tables go to a topic named after the schema
>Example 4: test\\.* matches by expression; every matched table goes to a topic named after that table
>Example 5: test,test1\\.test1 specifies multiple expressions: tables in schema test go to the test topic, test1\\.test1 goes to the test1_test1 topic, and all remaining tables go to the default canal.mq.topic
>For more flexibility, a rule can also name the target topic explicitly, in the format topicName:schema or topicName:schema.table

>Example 1: test:test\\.test matches a single table and sends it to the topic named test
>Example 2: test:.*\\..* matches every table; since a topic is named, every table goes to the test topic
>Example 3: test:test matches a whole schema; all of its tables go to the test topic
>Example 4: testA:test\\.* matches by expression; matched tables go to the testA topic
>Example 5: test0:test,test1:test1\\.test1 specifies multiple expressions: tables in schema test go to the test0 topic, test1\\.test1 goes to the test1 topic, and all remaining tables go to the default canal.mq.topic

>Set matching rules to fit your own business needs; it is recommended to enable automatic topic creation on the MQ side.
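To make the two formats concrete, here is a small sketch of my own (the schema and topic names are invented, not from any real setup):

# instance.properties (illustrative names only)
# every table of ordersdb goes to a topic named "ordersdb";
# the single table auditdb.audit_log goes to a topic named "audit_topic"
canal.mq.dynamicTopic=ordersdb,audit_topic:auditdb\\.audit_log
# anything that matches no rule falls back to the default topic
canal.mq.topic=fallback_topic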

canal.mq.partitionHash expression syntax

>Since canal 1.1.3, the supported format is schema.table:pk1^pk2; separate multiple entries with commas.

>Example 1: test\\.test:pk1^pk2 matches a single table, hashing on the fields pk1 + pk2
>Example 2: .*\\..*:id matches by regex; every matched table hashes on its id field
>Example 3: .*\\..*:$pk$ matches by regex; every matched table hashes on its primary key (looked up automatically)
>Example 4: with no matching rule at all, everything goes to partition 0
>Example 5: .*\\..* , a regex match with no pk info, hashes every matched table on its table name
>Hashing by table: all data of a single table lands in the same partition, while different tables are spread across partitions (a hot table can make one partition oversized)
>Example 6: test\\.test:id,.*\\..* hashes the test table by id and every remaining table by table name

Note: set matching rules to fit your own business needs; multiple rules are evaluated in order, and the first match wins.
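To see why hashing by table name pins one table's data to a single partition, here is a conceptual sketch of the mapping in Java; this is my own simplification, not Canal's actual implementation:

// Conceptual sketch (NOT Canal's real code): map a hash key to a partition.
// The hash key is the pk value(s) when a rule specifies pk1^pk2 or $pk$,
// or the table name when the rule carries no pk info.
public class PartitionHashSketch {

    static int choosePartition(String hashKey, int partitionsNum) {
        // take % before abs() so Integer.MIN_VALUE from hashCode() cannot overflow
        return Math.abs(hashKey.hashCode() % partitionsNum);
    }

    public static void main(String[] args) {
        int partitionsNum = 3; // canal.mq.partitionsNum=3

        // hashing by primary key: one table's rows spread over partitions,
        // but all changes to the same row stay in the same partition
        System.out.println(choosePartition("1001", partitionsNum));
        System.out.println(choosePartition("1002", partitionsNum));

        // hashing by table name: ALL rows of the table share one partition,
        // which is exactly the "hot table makes one partition oversized" caveat
        System.out.println(choosePartition("fanboshi.orders", partitionsNum));
    }
}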

Other detailed parameters can be found in the Canal AdminGuide.

canal.log reports INVALID_TOPIC_EXCEPTION

2020-01-10 15:58:31.552 [canal-instance-scan-0] INFO  com.alibaba.otter.canal.deployer.CanalController - auto notify start example successful.
2020-01-10 16:37:26.946 [canal-instance-scan-0] INFO com.alibaba.otter.canal.deployer.CanalController - auto notify start mgr_fanboshi successful.
2020-01-10 16:38:03.992 [kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Error while fetching metadata with correlation id 10 : {=INVALID_TOPIC_EXCEPTION}
2020-01-10 16:38:04.096 [kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Error while fetching metadata with correlation id 11 : {=INVALID_TOPIC_EXCEPTION}
2020-01-10 16:38:04.200 [kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Error while fetching metadata with correlation id 12 : {=INVALID_TOPIC_EXCEPTION}
2020-01-10 16:38:04.304 [kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Error while fetching metadata with correlation id 13 : {=INVALID_TOPIC_EXCEPTION}
2020-01-10 16:38:04.409 [kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Error while fetching metadata with correlation id 14 : {=INVALID_TOPIC_EXCEPTION}

The instance log reports errors as well:

2020-01-10 16:37:27.159 [destination = mgr_fanboshi , address = /172.18.8.200:3399 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> find start position successfully, EntryPosition[included=false,journalName=0080323399-mysql-bin.000004,position=1065960584,serverId=<null>,gtid=69ce1dcb-1b67-5e4b-9945-7cc64c64a14f:1-566281:1000009-2590159:3005066-3017688,
ce947b65-6b70-539b-8e6a-59e9bcde28a0:206083443-206083524:207812793-207825891:207921053-207921085,
d58aa5c2-b773-5944-bfe9-0a1142ef87f4:1706527:2447252-2447264,
f5d4a39c-7800-52ec-b181-8c751ba2d078:868915-869106:1968590-1968591,timestamp=<null>] cost : 2ms , the next step is binlog dump
2020-01-10 16:39:03.987 [pool-3-thread-2] ERROR com.alibaba.otter.canal.kafka.CanalKafkaProducer - java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
java.lang.RuntimeException: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
at com.alibaba.otter.canal.kafka.CanalKafkaProducer.produce(CanalKafkaProducer.java:215) ~[canal.server-1.1.4.jar:na]
at com.alibaba.otter.canal.kafka.CanalKafkaProducer.send(CanalKafkaProducer.java:179) ~[canal.server-1.1.4.jar:na]
at com.alibaba.otter.canal.kafka.CanalKafkaProducer.send(CanalKafkaProducer.java:117) ~[canal.server-1.1.4.jar:na]
at com.alibaba.otter.canal.server.CanalMQStarter.worker(CanalMQStarter.java:183) [canal.server-1.1.4.jar:na]
at com.alibaba.otter.canal.server.CanalMQStarter.access$500(CanalMQStarter.java:23) [canal.server-1.1.4.jar:na]
at com.alibaba.otter.canal.server.CanalMQStarter$CanalMQRunnable.run(CanalMQStarter.java:225) [canal.server-1.1.4.jar:na]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_111]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_111]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_111]
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
at org.apache.kafka.clients.producer.KafkaProducer$FutureFailure.<init>(KafkaProducer.java:1150) ~[kafka-clients-1.1.1.jar:na]
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:846) ~[kafka-clients-1.1.1.jar:na]
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:784) ~[kafka-clients-1.1.1.jar:na]
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:671) ~[kafka-clients-1.1.1.jar:na]
at com.alibaba.otter.canal.kafka.CanalKafkaProducer.produce(CanalKafkaProducer.java:199) ~[canal.server-1.1.4.jar:na]
... 8 common frames omitted
Caused by: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.

Searching turned up an issue that gave me some clues: https://github.com/alibaba/canal/issues/1716

The configuration suggested in the issue, in instance.properties:

# schemas to sync
canal.instance.filter.regex=A\\..*,B\\..*,C\\..*

# MQ config: route schema and table by regex
# format: topic:schema.table,topic:schema.table,topic:schema.table
canal.mq.dynamicTopic=topic_A:A,topic_B:B,topic_C:C
# It is best to also configure canal.mq.topic as a default. In practice, some binlog from the
# mysql schema gets read in as well (CREATE TABLE statements, GRANT statements, and so on);
# without the default topic you get a "cannot find partition" error, and Canal stops writing.
canal.mq.topic=a_default_topic
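If the default topic (or any of the dynamic ones) does not exist yet, the broker-side switch behind the earlier advice to "enable automatic topic creation" is Kafka's auto.create.topics.enable; note this is a broker setting, not a Canal one:

# Kafka broker server.properties (not a Canal file)
auto.create.topics.enable=true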

Partition settings:

  1. If every topic has a single partition:
    canal.mq.partition=0
  2. If topics have multiple partitions and you want rows hashed by table name:
    # hash partition config
    canal.mq.partitionsNum=3
    canal.mq.partitionHash=.*\\..*

A follow-up question from the issue: so canal.mq.topic can point at any existing topic, and the real routing is done by canal.mq.dynamicTopic=topic_A:A,topic_B:B,topic_C:C, right?

And the answer: yes. canal.mq.topic acts as a default; binlog from any schema or table that matches no rule ends up in that default topic. When consuming, you only care about the dynamic topics you configured.

My understanding: messages that fall outside our matching rules had no default topic configured, so there was no topic to write them to (note the empty topic name before the = in {=INVALID_TOPIC_EXCEPTION} above), hence the INVALID_TOPIC_EXCEPTION.
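You can reproduce the symptom outside of Canal with a bare Kafka producer writing to an empty topic name; a minimal sketch under that assumption (broker address is a placeholder), which on my reading should surface the same metadata warnings and the 60s TimeoutException seen in the stack trace:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class EmptyTopicRepro {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "127.0.0.1:9092"); // placeholder broker
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // An event that matches no dynamicTopic rule, with canal.mq.topic left
            // empty, effectively has "" as its destination topic.
            producer.send(new ProducerRecord<>("", "some-binlog-event")).get();
        } catch (Exception e) {
            // Expected: TimeoutException "Failed to update metadata after 60000 ms",
            // preceded by {=INVALID_TOPIC_EXCEPTION} warnings from the NetworkClient.
            e.printStackTrace();
        }
    }
}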

Although my colleague 未来 had configured canal.instance.filter.regex=broker\\..*, statements like GRANT could presumably still slip through and trigger the problem.
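If such statements are indeed what falls through, one knob worth trying (a guess on my part, not something I have verified) is the pair of query-event filters that also appear in canal.properties below, both false in my config; even then, setting a default canal.mq.topic remains the more robust fix:

# canal.properties: drop DCL / DDL query events at parse time
canal.instance.filter.query.dcl = true
canal.instance.filter.query.ddl = true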

To close, here is my own 1.1.4 configuration.

canal.properties

#################################################
######### common argument #############
#################################################
# tcp bind ip
canal.ip = 172.18.8.32
# register ip to zookeeper
canal.register.ip = 172.18.8.32
canal.port = 11111
canal.metrics.pull.port = 11112
# canal instance user/passwd
# canal.user = canal
# canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458

# canal admin config
#canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441

canal.zkServers = 172.18.12.212:2181,172.18.12.213:2181
# flush data to zk
canal.zookeeper.flush.period = 1000
canal.withoutNetty = true
# tcp, kafka, RocketMQ
canal.serverMode = kafka
# flush meta cursor/parse position to file
canal.file.data.dir = ${canal.conf.dir}
canal.file.flush.period = 1000
## memory store RingBuffer size, should be Math.pow(2,n)
canal.instance.memory.buffer.size = 16384
## memory store RingBuffer used memory unit size , default 1kb
canal.instance.memory.buffer.memunit = 1024
## memory store gets mode used MEMSIZE or ITEMSIZE
canal.instance.memory.batch.mode = MEMSIZE
canal.instance.memory.rawEntry = true

## detecting config
canal.instance.detecting.enable = false
#canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now()
canal.instance.detecting.sql = select 1
canal.instance.detecting.interval.time = 3
canal.instance.detecting.retry.threshold = 3
canal.instance.detecting.heartbeatHaEnable = false

# support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery
canal.instance.transaction.size = 1024
# when falling back to a newly connected master, rewind this many seconds
canal.instance.fallbackIntervalInSeconds = 60

# network config
canal.instance.network.receiveBufferSize = 16384
canal.instance.network.sendBufferSize = 16384
canal.instance.network.soTimeout = 30

# binlog filter config
canal.instance.filter.druid.ddl = true
canal.instance.filter.query.dcl = false
canal.instance.filter.query.dml = false
canal.instance.filter.query.ddl = false
canal.instance.filter.table.error = false
canal.instance.filter.rows = false
canal.instance.filter.transaction.entry = false

# binlog format/image check
canal.instance.binlog.format = ROW,STATEMENT,MIXED
canal.instance.binlog.image = FULL,MINIMAL,NOBLOB

# binlog ddl isolation
canal.instance.get.ddl.isolation = false

# parallel parser config
canal.instance.parser.parallel = true
## concurrent thread number, default 60% available processors, suggest not to exceed Runtime.getRuntime().availableProcessors()
#canal.instance.parser.parallelThreadSize = 16
## disruptor ringbuffer size, must be power of 2
canal.instance.parser.parallelBufferSize = 256

# table meta tsdb info
canal.instance.tsdb.enable = false
canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:}
canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
canal.instance.tsdb.dbUsername = canal
canal.instance.tsdb.dbPassword = canal
# dump snapshot interval, default 24 hour
canal.instance.tsdb.snapshot.interval = 24
# purge snapshot expire , default 360 hour(15 days)
canal.instance.tsdb.snapshot.expire = 360

# aliyun ak/sk , support rds/mq
canal.aliyun.accessKey =
canal.aliyun.secretKey =

#################################################
######### destinations #############
#################################################
canal.destinations =
# conf root dir
canal.conf.dir = ../conf
# auto scan instance dir add/remove and start/stop instance
canal.auto.scan = true
canal.auto.scan.interval = 5

canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
#canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml

canal.instance.global.mode = spring
canal.instance.global.lazy = false
canal.instance.global.manager.address = ${canal.admin.manager}
#canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
#canal.instance.global.spring.xml = classpath:spring/file-instance.xml
canal.instance.global.spring.xml = classpath:spring/default-instance.xml

##################################################
######### MQ #############
##################################################
canal.mq.servers = 172.18.12.212:9092
canal.mq.retries = 0
canal.mq.batchSize = 16384
canal.mq.maxRequestSize = 33554432
canal.mq.lingerMs = 100
canal.mq.bufferMemory = 33554432
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 100
canal.mq.flatMessage = true
canal.mq.compressionType = none
canal.mq.acks = all
#canal.mq.properties. =
canal.mq.transaction = false
canal.mq.extraParams.enable.idempotence=true
canal.mq.extraParams.max.in.flight.requests.per.connection=1
#canal.mq.producerGroup = test
# Set this value to "cloud", if you want open message trace feature in aliyun.
#canal.mq.accessChannel = local
# aliyun mq namespace
#canal.mq.namespace =

##################################################
######### Kafka Kerberos Info #############
##################################################
#canal.mq.kafka.kerberos.enable = false
#canal.mq.kafka.kerberos.krb5FilePath = "../conf/kerberos/krb5.conf"
#canal.mq.kafka.kerberos.jaasFilePath = "../conf/kerberos/jaas.conf"

instance.properties

#################################################
## mysql serverId , v1.0.26+ will autoGen
# canal.instance.mysql.slaveId=0

# enable gtid use true/false
canal.instance.gtidon=true

# position info
canal.instance.master.address=172.18.8.200:3372
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=

# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=

# table meta tsdb info
canal.instance.tsdb.enable=false
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal

#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=

# username/password
canal.instance.dbUsername=canal_r
canal.instance.dbPassword=canal1234!@#$
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==

# table regex
canal.instance.filter.regex=fanboshi\\..*
# table black regex
canal.instance.filter.black.regex=
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch

# mq config
canal.mq.topic=
#canal.mq.topic=cherry_default
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.dynamicTopic= cherry:fanboshi
#canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#################################################
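To verify the routing end to end, here is a minimal consumer sketch for the cherry topic defined above (the group id is made up; with canal.mq.flatMessage = true in canal.properties, each record value is a FlatMessage JSON string):

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class CherryTopicCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "172.18.12.212:9092"); // from canal.mq.servers above
        props.put("group.id", "cherry-check"); // made-up group id
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "earliest");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("cherry"));
            while (true) {
                // poll(long) matches the kafka-clients 1.1.1 seen in the stack traces
                ConsumerRecords<String, String> records = consumer.poll(1000L);
                for (ConsumerRecord<String, String> r : records) {
                    System.out.printf("partition=%d value=%s%n", r.partition(), r.value());
                }
            }
        }
    }
}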
