Consuming canal protobuf-format data with Python

canal -> kafka -> consumer, with flatMessage=False.
Based on the canal Python client.
Since the canal Python client is designed to connect directly to canal's port 11111 and consume as a canal client, rather than consume from kafka, its example cannot be used as-is and needs a few modifications.
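Concretely, with flatMessage=False each kafka message value is a serialized CanalProtocol Packet whose body is a Messages object wrapping serialized Entry objects, so the unwrapping chain is the part that carries over from the client example. A minimal sketch (assuming raw is one kafka message value as bytes; this mirrors the Decoder class in the full script below):

from canal.protocol import CanalProtocol_pb2, EntryProtocol_pb2

def unwrap(raw: bytes) -> list:
    # kafka value -> Packet -> Messages -> [Entry, ...]
    packet = CanalProtocol_pb2.Packet()
    packet.MergeFromString(raw)
    messages = CanalProtocol_pb2.Messages()
    messages.MergeFromString(packet.body)
    entries = []
    for item in messages.messages:
        entry = EntryProtocol_pb2.Entry()
        entry.MergeFromString(item)
        entries.append(entry)
    return entries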

Python 3.7.4

requirements

backcall==0.1.0
bleach==3.1.0
canal-python==0.4
certifi==2019.6.16
chardet==3.0.4
confluent-kafka==1.3.0
decorator==4.4.2
docopt==0.6.2
docutils==0.15.2
idna==2.8
ipython==7.13.0
ipython-genutils==0.2.0
jedi==0.16.0
parso==0.6.2
pexpect==4.8.0
pickleshare==0.7.5
pkginfo==1.5.0.1
prompt-toolkit==3.0.4
protobuf==3.9.1
ptyprocess==0.6.0
Pygments==2.4.2
readme-renderer==24.0
requests==2.22.0
requests-toolbelt==0.9.1
six==1.12.0
tqdm==4.34.0
traitlets==4.3.3
twine==1.13.0
urllib3==1.25.3
wcwidth==0.1.8
webencodings==0.5.1
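
Most of that freeze output is incidental tooling (IPython, twine, and friends); presumably only the client libraries themselves are needed:

pip install canal-python==0.4 confluent-kafka==1.3.0 docopt==0.6.2 protobuf==3.9.1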

Main reference

https://github.com/haozi3156666/canal-python/blob/master/canal/client.py
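
For contrast, the upstream client consumes from the canal server directly, roughly as below (a sketch based on the canal-python example; host, destination, and filter values are placeholders):

import time
from canal.client import Client

# direct TCP connection to the canal server itself, not to kafka
client = Client()
client.connect(host='127.0.0.1', port=11111)
client.check_valid(username=b'', password=b'')
client.subscribe(client_id=b'1001', destination=b'example', filter=b'.*\\..*')

while True:
    message = client.get(100)
    for entry in message['entries']:   # entries arrive already parsed here
        print(entry.header.schemaName, entry.header.tableName)
    time.sleep(1)

The kafka variant below keeps the same Entry handling, but swaps this connection layer for confluent_kafka and does the Packet/Messages unwrapping itself.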

# -*- coding: utf8 -*-
# __author__ = 'Fan()'
# Date: 2020-03-18

'''
Usage:
    canal_kafka_protobuf_consume.py --bootstrap-servers=<host:port,host2:port2..> [--k_user=<user>] [--from-beginning=<false> | --from-end=<false>] --topic=<topic_name> [--partition=<partition_number>] [--verbose=<0>]
    canal_kafka_protobuf_consume.py -h | --help
    canal_kafka_protobuf_consume.py --version

Options:
    -h --help                                      Show this help message.
    --version                                      Show version.
    --bootstrap-servers=<host:port,host2:port2..>  kafka servers
    --from-beginning=<false>                       Consume from the earliest offset [default: False]
    --from-end=<false>                             Consume from the latest offset [default: True]
    --k_user=<user>                                kafka user, optional
    --topic=<topic_name>                           topic name
    --partition=<partition_number>                 topic partition number [default: 0]
    --verbose=<0>                                  Verbosity 0,1,2; 0 prints nothing extra [default: 0]
'''
import getpass

from docopt import docopt
from canal.protocol import CanalProtocol_pb2
from canal.protocol import EntryProtocol_pb2
from confluent_kafka import Consumer, KafkaError, TopicPartition, OFFSET_END, OFFSET_BEGINNING


class DocOptArgs:
    def __init__(self, args):
        self.topic = args['--topic']
        self.k_user = args['--k_user']
        self.verbose = int(args['--verbose'])
        self.partition = int(args['--partition'])
        self.bootstrap_servers = args['--bootstrap-servers']
        # docopt hands back strings; a plain comparison is safer than eval()
        self.from_end = args['--from-end'].lower() == 'true'
        self.from_beginning = args['--from-beginning'].lower() == 'true'

        if not self.k_user:
            self.k_password = None
        elif self.k_user == 'admin':
            self.k_password = 'superSecurt'
        else:
            self.k_password = getpass.getpass("please enter kafka password: ")


class MyConsumer(DocOptArgs):
    def __init__(self, docopt_args):
        self.args = docopt_args
        DocOptArgs.__init__(self, self.args)

        if self.verbose >= 1:
            print(self.args)

    def _on_send_response(self, err, partitions):
        # offset-commit callback: surface any failed async commit
        pt = partitions[0]
        if isinstance(err, KafkaError):
            print('Topic {} offset {} commit failed. {}'.format(pt.topic, pt.offset, err))
            raise Exception(err)

    def messages(self, offset_end=True):
        config = {'bootstrap.servers': self.bootstrap_servers,
                  'group.id': self.topic,
                  'enable.auto.commit': True,
                  'fetch.wait.max.ms': 3000,
                  'max.poll.interval.ms': 60000,
                  'session.timeout.ms': 60000,
                  'on_commit': self._on_send_response,
                  'default.topic.config': {'auto.offset.reset': 'latest'}}
        if self.k_user and self.k_password:
            config['security.protocol'] = 'SASL_PLAINTEXT'
            config['sasl.mechanism'] = 'SCRAM-SHA-256'
            config['sasl.username'] = self.k_user
            config['sasl.password'] = self.k_password

        consumer = Consumer(config)
        offset = OFFSET_END if offset_end else OFFSET_BEGINNING
        # honor --partition instead of hardcoding partition 0
        pt = TopicPartition(self.topic, self.partition, offset)
        consumer.assign([pt])
        # consumer.seek(pt)

        try:
            while True:
                # consume() returns a (possibly empty) list, never None
                ret = consumer.consume(num_messages=100, timeout=0.1)
                if not ret:
                    continue
                for msg in ret:
                    if msg.error() is None:
                        # protobuf binary
                        yield msg.value()
                    elif msg.error():
                        if msg.error().code() == KafkaError._PARTITION_EOF:
                            continue
                        else:
                            raise Exception(msg.error())
        except Exception as e:
            print(e)
            consumer.close()
        except KeyboardInterrupt:
            consumer.close()


class Decoder:

    @staticmethod
    def create_canal_message(kafka_message):
        data = kafka_message
        packet = CanalProtocol_pb2.Packet()
        packet.MergeFromString(data)

        message = dict(id=0, entries=[])
        # Everything canal writes to kafka is a MESSAGES packet, so the
        # type check should always hold and is left commented out:
        # if packet.type == CanalProtocol_pb2.PacketType.MESSAGES:
        messages = CanalProtocol_pb2.Messages()
        messages.MergeFromString(packet.body)

        for item in messages.messages:
            entry = EntryProtocol_pb2.Entry()
            entry.MergeFromString(item)
            message['entries'].append(entry)

        return message


if __name__ == '__main__':
    version = 'canal_kafka_protobuf_consume 0.1.0'
    arguments = docopt(__doc__, version=version)

    consumer = MyConsumer(arguments)

    # --from-beginning overrides the default "start from the end" behavior
    for message in consumer.messages(offset_end=not consumer.from_beginning):
        canal_message = Decoder.create_canal_message(message)
        entries = canal_message['entries']
        for entry in entries:
            entry_type = entry.entryType
            if entry_type in [EntryProtocol_pb2.EntryType.TRANSACTIONBEGIN, EntryProtocol_pb2.EntryType.TRANSACTIONEND]:
                continue
            row_change = EntryProtocol_pb2.RowChange()
            row_change.MergeFromString(entry.storeValue)
            # event_type = row_change.eventType
            header = entry.header
            database = header.schemaName
            table = header.tableName
            binlog_file = header.logfileName
            binlog_pos = header.logfileOffset
            characterset = header.serverenCode
            es = header.executeTime
            gtid = header.gtid
            event_type = header.eventType
            for row in row_change.rowDatas:
                format_data = dict()
                if event_type == EntryProtocol_pb2.EventType.DELETE:
                    # DELETE only has a before image
                    for column in row.beforeColumns:
                        format_data.update({
                            column.name: column.value
                        })
                elif event_type == EntryProtocol_pb2.EventType.INSERT:
                    # INSERT only has an after image
                    for column in row.afterColumns:
                        format_data.update({
                            column.name: column.value
                        })
                else:
                    # UPDATE: keep both the before and after images
                    format_data['before'] = dict()
                    format_data['after'] = dict()
                    for column in row.beforeColumns:
                        format_data['before'][column.name] = column.value
                    for column in row.afterColumns:
                        format_data['after'][column.name] = column.value
                data = dict(
                    db=database,
                    table=table,
                    event_type=EntryProtocol_pb2.EventType.Name(event_type),
                    is_ddl=row_change.isDdl,
                    binlog_file=binlog_file,
                    binlog_pos=binlog_pos,
                    characterset=characterset,
                    es=es,
                    gtid=gtid,
                    data=format_data,
                )
                print(data)

Example run

#python canal_kafka_protobuf_consume.py --bootstrap-servers=172.16.xx.xx:9092,172.16.xx.xx:9092,172.16.xx.xx:9092 --topic=fanboshi.monitor_delay
{'db': 'fanboshi', 'table': 'monitor_delay', 'event_type': 'INSERT', 'is_ddl': False, 'binlog_file': 'mysql-bin.000006', 'binlog_pos': 469896982, 'characterset': 'UTF-8', 'es': 1584535911000, 'gtid': 'c30c6a02-4e32-11ea-84ec-fa163edcd14e:1-2940100', 'data': {'id': '5', 'ctime': '2020-03-18 20:51:51'}}
{'db': 'fanboshi', 'table': 'monitor_delay', 'event_type': 'INSERT', 'is_ddl': False, 'binlog_file': 'mysql-bin.000006', 'binlog_pos': 469897261, 'characterset': 'UTF-8', 'es': 1584535912000, 'gtid': 'c30c6a02-4e32-11ea-84ec-fa163edcd14e:1-2940101', 'data': {'id': '6', 'ctime': '2020-03-18 20:51:52'}}
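
If the cluster requires SASL/SCRAM authentication, add --k_user as well (the user name here is a placeholder); for any user other than the hardcoded admin, the script prompts for the password:

# python canal_kafka_protobuf_consume.py --bootstrap-servers=172.16.xx.xx:9092 --topic=fanboshi.monitor_delay --k_user=someuser
please enter kafka password: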
