'''
Usage:
  canal_kafka_protobuf_consume.py --bootstrap-servers=<host:port,host2:port2..> [--k_user=<user>] [--from-beginning=<false> | --from-end=<false>] --topic=<topic_name> [--partition=<partition_number>] [--verbose=<0>]
  canal_kafka_protobuf_consume.py -h | --help
  canal_kafka_protobuf_consume.py --version

Options:
  -h --help                                       Print this help message.
  --version                                       Show version information.
  --bootstrap-servers=<host:port,host2:port2..>   Kafka bootstrap servers.
  --from-beginning=<false>                        Consume from the beginning of the partition [default: False]
  --from-end=<false>                              Consume from the end of the partition [default: True]
  --k_user=<user>                                 Kafka user, optional.
  --topic=<topic_name>                            Topic name.
  --partition=<partition_number>                  Topic partition number [default: 0]
  --verbose=<0>                                   Verbosity: 0, 1 or 2; 0 prints nothing extra [default: 0]
'''
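# Example invocation (broker addresses and topic name below are hypothetical;
# adjust to your environment):
#
#   python canal_kafka_protobuf_consume.py \
#       --bootstrap-servers=10.0.0.1:9092,10.0.0.2:9092 \
#       --topic=example_binlog --partition=0 --from-beginning=true --verbose=1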
 import getpass
 
 from docopt import docopt
 from canal.protocol import CanalProtocol_pb2
 from canal.protocol import EntryProtocol_pb2
 from confluent_kafka import Consumer, KafkaError, TopicPartition, OFFSET_END, OFFSET_BEGINNING
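
# The protobuf bindings come from the canal-python package; the Kafka client
# is confluent-kafka and the CLI parsing is docopt (PyPI package names assumed).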
 
 
class DocOptArgs:
    def __init__(self, args):
        self.topic = args['--topic']
        self.k_user = args['--k_user']
        self.verbose = int(args['--verbose'])
        self.partition = int(args['--partition'])
        self.bootstrap_servers = args['--bootstrap-servers']
        # Parse the boolean flags without eval(); docopt delivers them as strings.
        self.from_end = args['--from-end'].lower() == 'true'
        self.from_beginning = args['--from-beginning'].lower() == 'true'

        if not self.k_user:
            self.k_password = None
        elif self.k_user == 'admin':
            self.k_password = 'superSecurt'
        else:
            self.k_password = getpass.getpass("please enter kafka password: ")
 
 
class MyConsumer(DocOptArgs):
    def __init__(self, docopt_args):
        self.args = docopt_args
        DocOptArgs.__init__(self, self.args)

        if self.verbose >= 1:
            print(self.args)

    def _on_send_response(self, err, partitions):
        # Commit callback: confluent_kafka passes an error (or None) and the
        # list of TopicPartition objects that were committed.
        pt = partitions[0]
        if isinstance(err, KafkaError):
            print('Topic {} offset {} commit failed: {}'.format(pt.topic, pt.offset, err))
            raise Exception(err)
 
    def messages(self, offset_end=True):
        config = {'bootstrap.servers': self.bootstrap_servers,
                  'group.id': self.topic,
                  'enable.auto.commit': True,
                  'fetch.wait.max.ms': 3000,
                  'max.poll.interval.ms': 60000,
                  'session.timeout.ms': 60000,
                  'on_commit': self._on_send_response,
                  # Only applies when the group has no committed offset.
                  'auto.offset.reset': 'latest'}
        if self.k_user and self.k_password:
            config['security.protocol'] = 'SASL_PLAINTEXT'
            config['sasl.mechanism'] = 'SCRAM-SHA-256'
            config['sasl.username'] = self.k_user
            config['sasl.password'] = self.k_password

        consumer = Consumer(config)
        offset = OFFSET_END if offset_end else OFFSET_BEGINNING
        pt = TopicPartition(self.topic, self.partition, offset)
        consumer.assign([pt])

        try:
            while True:
                # consume() returns a (possibly empty) list, never None.
                ret = consumer.consume(num_messages=100, timeout=0.1)
                if not ret:
                    continue
                for msg in ret:
                    if msg.error() is None:
                        yield msg.value()
                    elif msg.error().code() == KafkaError._PARTITION_EOF:
                        continue
                    else:
                        raise Exception(msg.error())
        except KeyboardInterrupt:
            pass
        except Exception as e:
            print(e)
        finally:
            consumer.close()
 
 
class Decoder:

    @staticmethod
    def create_canal_message(kafka_message):
        # A Canal Kafka payload is a Packet whose body contains Messages;
        # each raw message in turn deserializes into an Entry.
        packet = CanalProtocol_pb2.Packet()
        packet.MergeFromString(kafka_message)

        message = dict(id=0, entries=[])

        messages = CanalProtocol_pb2.Messages()
        messages.MergeFromString(packet.body)

        for item in messages.messages:
            entry = EntryProtocol_pb2.Entry()
            entry.MergeFromString(item)
            message['entries'].append(entry)

        return message
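
# Offline decoding sketch (hypothetical file 'packet.bin' holding one raw
# Canal packet, e.g. dumped from Kafka by another tool):
#
#   with open('packet.bin', 'rb') as f:
#       msg = Decoder.create_canal_message(f.read())
#   print('{} entries decoded'.format(len(msg['entries'])))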
 
 
if __name__ == '__main__':
    version = 'canal_kafka_protobuf_consume 0.1.0'
    arguments = docopt(__doc__, version=version)

    consumer = MyConsumer(arguments)

    # Honour --from-beginning instead of always tailing the partition.
    for message in consumer.messages(offset_end=not consumer.from_beginning):
        canal_message = Decoder.create_canal_message(message)
        for entry in canal_message['entries']:
            entry_type = entry.entryType
            # Transaction boundary records carry no row data; skip them.
            if entry_type in [EntryProtocol_pb2.EntryType.TRANSACTIONBEGIN,
                              EntryProtocol_pb2.EntryType.TRANSACTIONEND]:
                continue
            row_change = EntryProtocol_pb2.RowChange()
            row_change.MergeFromString(entry.storeValue)

            header = entry.header
            database = header.schemaName
            table = header.tableName
            binlog_file = header.logfileName
            binlog_pos = header.logfileOffset
            characterset = header.serverenCode  # field name is misspelled in Canal's proto
            es = header.executeTime
            gtid = header.gtid
            event_type = header.eventType
            for row in row_change.rowDatas:
                format_data = dict()
                if event_type == EntryProtocol_pb2.EventType.DELETE:
                    for column in row.beforeColumns:
                        format_data[column.name] = column.value
                elif event_type == EntryProtocol_pb2.EventType.INSERT:
                    for column in row.afterColumns:
                        format_data[column.name] = column.value
                else:
                    # UPDATE (and anything else): keep both row images.
                    format_data['before'] = dict()
                    format_data['after'] = dict()
                    for column in row.beforeColumns:
                        format_data['before'][column.name] = column.value
                    for column in row.afterColumns:
                        format_data['after'][column.name] = column.value
                data = dict(
                    db=database,
                    table=table,
                    event_type=EntryProtocol_pb2.EventType.Name(event_type),
                    is_ddl=row_change.isDdl,
                    binlog_file=binlog_file,
                    binlog_pos=binlog_pos,
                    characterset=characterset,
                    es=es,
                    gtid=gtid,
                    data=format_data,
                )
                print(data)
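
# Illustrative shape of one printed record (all values are made up):
# {'db': 'test', 'table': 'users', 'event_type': 'INSERT', 'is_ddl': False,
#  'binlog_file': 'mysql-bin.000003', 'binlog_pos': 4, 'characterset': 'utf8',
#  'es': 1600000000000, 'gtid': '', 'data': {'id': '1', 'name': 'alice'}}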
 