Making confluent_kafka support SASL_PLAINTEXT

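A colleague had been developing with kafka-python. After we enabled ACLs, it turned out that kafka-python could not handle our SASL_PLAINTEXT setup: at the time, its documentation listed PLAIN as the only supported SASL mechanism.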

https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html
sasl_mechanism (str) – string picking sasl mechanism when security_protocol is SASL_PLAINTEXT or SASL_SSL. Currently only PLAIN is supported. Default: None

After a quick look, confluent-kafka does support it, but librdkafka has to be recompiled.
Otherwise it fails with this error:

KafkaException: KafkaError{code=_INVALID_ARG,val=-186,str="Failed to create producer: No provider for SASL mechanism GSSAPI: recompile librdkafka with libsasl2 or openssl support. Current build options: PLAIN SASL_SCRAM"}
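
The error text itself says which SASL mechanism was requested and which providers the current build contains. As a rough sketch (the helper name `parse_sasl_provider_error` is made up for this post, and the regular expression only assumes the message layout shown above), the two pieces can be pulled out like this:

```python
import re

# Error text copied from the exception shown above (the str=... part only).
ERROR = (
    "Failed to create producer: No provider for SASL mechanism GSSAPI: "
    "recompile librdkafka with libsasl2 or openssl support. "
    "Current build options: PLAIN SASL_SCRAM"
)


def parse_sasl_provider_error(message):
    """Return (requested_mechanism, available_providers), or None if the
    message does not look like the 'No provider' error above."""
    match = re.search(
        r"No provider for SASL mechanism (\S+):.*Current build options: (.*)$",
        message,
    )
    if match is None:
        return None
    return match.group(1), match.group(2).split()


if __name__ == "__main__":
    print(parse_sasl_provider_error(ERROR))
```

Here the requested mechanism is GSSAPI, while the build only lists PLAIN and SASL_SCRAM.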

Installing confluent-kafka

pip install confluent-kafka
Collecting confluent-kafka
Downloading https://files.pythonhosted.org/packages/2a/ba/dccb27376453f91ad8fa57f75a7ba5dc188023700c1789273dec976477b2/confluent_kafka-0.11.6-cp37-cp37m-manylinux1_x86_64.whl (3.9MB)
100% |████████████████████████████████| 3.9MB 984kB/s
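
The wheel filename in the pip output is worth a second look: `manylinux1_x86_64` marks a prebuilt binary wheel, which is consistent with the bundled `librdkafka.so.1` that turns up later under `.libs`. A small sketch that splits the filename into its fields, assuming the standard wheel naming scheme without the optional build tag (the function name is just for illustration):

```python
def parse_wheel_filename(filename):
    """Split a wheel filename of the form
    name-version-pythontag-abitag-platformtag.whl into its fields.
    Filenames carrying the optional build tag are not handled here."""
    if not filename.endswith(".whl"):
        raise ValueError("not a wheel filename: %r" % filename)
    parts = filename[: -len(".whl")].split("-")
    if len(parts) != 5:
        raise ValueError("unexpected number of fields: %r" % filename)
    name, version, python_tag, abi_tag, platform_tag = parts
    return {
        "name": name,
        "version": version,
        "python": python_tag,
        "abi": abi_tag,
        "platform": platform_tag,
    }


if __name__ == "__main__":
    info = parse_wheel_filename(
        "confluent_kafka-0.11.6-cp37-cp37m-manylinux1_x86_64.whl"
    )
    print(info)
```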

Installing librdkafka

Clone the repository

[root@node004110 18:21:05 /tmp]
git clone https://github.com/edenhill/librdkafka.git
Cloning into 'librdkafka'...
remote: Enumerating objects: 213, done.
remote: Counting objects: 100% (213/213), done.
remote: Compressing objects: 100% (111/111), done.
remote: Total 18133 (delta 133), reused 149 (delta 98), pack-reused 17920
Receiving objects: 100% (18133/18133), 11.15 MiB | 946.00 KiB/s, done.
Resolving deltas: 100% (13758/13758), done.

Check dependencies

rpm -qa| grep openssl
openssl-1.0.2k-16.el7.x86_64
openssl-libs-1.0.2k-16.el7.x86_64
openssl-devel-1.0.2k-16.el7.x86_64
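
If the same check has to be repeated on several machines, the `rpm -qa` output can be compared against a list of required packages. The sketch below is only an illustration: both helper names are made up, and `cyrus-sasl-devel` is my assumption for the CentOS package that corresponds to libsasl2-dev (it is only needed for GSSAPI).

```python
# Output of `rpm -qa | grep openssl` as shown above.
RPM_OUTPUT = """\
openssl-1.0.2k-16.el7.x86_64
openssl-libs-1.0.2k-16.el7.x86_64
openssl-devel-1.0.2k-16.el7.x86_64
"""


def installed_package_names(rpm_output):
    """Extract package names from lines shaped like name-version-release.arch."""
    names = set()
    for line in rpm_output.splitlines():
        # The name is everything before the last two hyphen-separated fields.
        parts = line.strip().rsplit("-", 2)
        if len(parts) == 3:
            names.add(parts[0])
    return names


def missing_packages(rpm_output, required):
    """Return the required package names that do not appear in rpm_output."""
    return sorted(set(required) - installed_package_names(rpm_output))


if __name__ == "__main__":
    # cyrus-sasl-devel is an assumed package name, see the note above.
    print(missing_packages(RPM_OUTPUT, ["openssl-devel", "cyrus-sasl-devel"]))
```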

The GNU toolchain
GNU make
pthreads
zlib-dev (optional, for gzip compression support)
libssl-dev (optional, for SSL and SASL SCRAM support); on CentOS the package is openssl-devel
libsasl2-dev (optional, for SASL GSSAPI support)
libzstd-dev (optional, for ZStd compression support)

Build and install

./configure
make && make install

Make sure the libssl checks in the ./configure output report ok:

checking for libssl (by pkg-config)... ok
checking for libssl (by compile)... ok (cached)
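
The configure output scrolls by quickly, so it can help to save it to a file and scan it afterwards. A minimal sketch, assuming every check line looks like the two lines above (`checking for NAME (by METHOD)... STATUS`); the helper names are hypothetical:

```python
import re

# The two lines from the configure output above.
CONFIGURE_OUTPUT = """\
checking for libssl (by pkg-config)... ok
checking for libssl (by compile)... ok (cached)
"""

CHECK_LINE = re.compile(r"^checking for (\S+) \(by ([^)]+)\)\.\.\. (\S+)")


def check_results(output):
    """Map each checked name to a list of (method, status) pairs."""
    results = {}
    for line in output.splitlines():
        match = CHECK_LINE.match(line.strip())
        if match:
            name, method, status = match.groups()
            results.setdefault(name, []).append((method, status))
    return results


def was_found(output, name):
    """True if at least one detection method reported 'ok' for name."""
    return any(status == "ok" for _, status in check_results(output).get(name, []))


if __name__ == "__main__":
    print(check_results(CONFIGURE_OUTPUT))
    print("libssl found:", was_found(CONFIGURE_OUTPUT, "libssl"))
```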

Replace the library file

Locate it

find / -name "librdkafka*"
/root/.pyenv/versions/3.7.2/lib/python3.7/site-packages/confluent_kafka/.libs/librdkafka.so.1
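
Running `find /` walks the whole filesystem. A narrower alternative is to ask Python where the package lives and look only in its `.libs` directory. This is a sketch that assumes the wheel layout shown above (a `.libs` folder inside the package directory); the function names are made up for this post.

```python
import importlib.util
from pathlib import Path


def confluent_kafka_dir():
    """Return the confluent_kafka package directory, or None if the package
    is not importable in the current interpreter."""
    spec = importlib.util.find_spec("confluent_kafka")
    if spec is None or spec.origin is None:
        return None
    return Path(spec.origin).parent


def bundled_librdkafka(package_dir):
    """List librdkafka* files inside <package_dir>/.libs (assumed layout)."""
    return sorted((Path(package_dir) / ".libs").glob("librdkafka*"))


if __name__ == "__main__":
    package_dir = confluent_kafka_dir()
    if package_dir is None:
        print("confluent_kafka is not installed for this interpreter")
    else:
        for path in bundled_librdkafka(package_dir):
            print(path)
```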

Replace it

#cd /root/.pyenv/versions/3.7.2/lib/python3.7/site-packages/confluent_kafka/.libs/
[root@node004110 18:24:03 ~/.pyenv/versions/3.7.2/lib/python3.7/site-packages/confluent_kafka/.libs]
#ll
total 10316
-rwxr-xr-x 1 root root 2903144 Mar 8 18:20 libcrypto-4c524931.so.1.0.0
-rwxr-xr-x 1 root root 6944424 Mar 8 18:20 librdkafka.so.1
-rwxr-xr-x 1 root root 584072 Mar 8 18:20 libssl-01b7eff1.so.1.0.0
-rwxr-xr-x 1 root root 87848 Mar 8 18:20 libz-a147dcb0.so.1.2.3
-rw-r--r-- 1 root root 35336 Mar 8 18:20 monitoring-interceptor.so
[root@node004110 18:24:04 ~/.pyenv/versions/3.7.2/lib/python3.7/site-packages/confluent_kafka/.libs]
#mv librdkafka.so.1 librdkafka.so.1.bak
[root@node004110 18:24:11 ~/.pyenv/versions/3.7.2/lib/python3.7/site-packages/confluent_kafka/.libs]
#ln -s /usr/local/lib/librdkafka.so.1 librdkafka.so.1
[root@node004110 18:24:23 ~/.pyenv/versions/3.7.2/lib/python3.7/site-packages/confluent_kafka/.libs]
#ll
total 10316
-rwxr-xr-x 1 root root 2903144 Mar 8 18:20 libcrypto-4c524931.so.1.0.0
lrwxrwxrwx 1 root root 30 Mar 8 18:24 librdkafka.so.1 -> /usr/local/lib/librdkafka.so.1
-rwxr-xr-x 1 root root 6944424 Mar 8 18:20 librdkafka.so.1.bak
-rwxr-xr-x 1 root root 584072 Mar 8 18:20 libssl-01b7eff1.so.1.0.0
-rwxr-xr-x 1 root root 87848 Mar 8 18:20 libz-a147dcb0.so.1.2.3
-rw-r--r-- 1 root root 35336 Mar 8 18:20 monitoring-interceptor.so
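
The `mv` plus `ln -s` pair above can also be scripted, which makes it harder to overwrite an existing backup by accident. A minimal sketch, assuming the same two paths as in the session; the function name and its `.bak` default are illustrative, not part of any library.

```python
from pathlib import Path


def swap_in_system_library(bundled, system_lib, backup_suffix=".bak"):
    """Rename the bundled library to <name>.bak and put a symlink to
    system_lib in its place. Returns the backup path, or None if the
    bundled path is already a symlink (nothing to do)."""
    bundled = Path(bundled)
    system_lib = Path(system_lib)
    if not system_lib.exists():
        raise FileNotFoundError(str(system_lib))
    if bundled.is_symlink():
        return None
    backup = bundled.with_name(bundled.name + backup_suffix)
    if backup.exists():
        # Refuse to clobber an earlier backup.
        raise FileExistsError(str(backup))
    bundled.rename(backup)           # same as: mv librdkafka.so.1 librdkafka.so.1.bak
    bundled.symlink_to(system_lib)   # same as: ln -s <system_lib> librdkafka.so.1
    return backup
```

With the paths from this post, the call would be `swap_in_system_library("/root/.pyenv/versions/3.7.2/lib/python3.7/site-packages/confluent_kafka/.libs/librdkafka.so.1", "/usr/local/lib/librdkafka.so.1")`.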

Verify

#python
Python 3.7.2 (default, Mar 4 2019, 16:55:21)
[GCC 4.8.5 20150623 (Red Hat 4.8.5-36)] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> from confluent_kafka import Producer
>>> p = Producer({'bootstrap.servers': '192.168.4.114:9092', 'security.protocol': 'SASL_PLAINTEXT', 'sasl.mechanism':'SCRAM-SHA-256','sasl.username':'admin','sasl.password':'your-admin-pass'})
>>> def delivery_report(err, msg):
...     if err is not None:
...         print('Message delivery failed: {}'.format(err))
...     else:
...         print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))
...
>>> for data in ['hello','word']:
...     p.produce('test_acl', data.encode('utf-8'), callback=delivery_report)
...
>>> p.poll(10)
Message delivered to test_acl [0]
Message delivered to test_acl [0]
1
>>> p.flush()
0
>>> quit()
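
For repeated testing, the interactive session above can be turned into a small script. This is a sketch, not a definitive implementation: `scram_config`, `send_all` and `main` are names invented here, the broker address, topic and credentials are the placeholders from the session, and `confluent_kafka` is imported inside `main()` so that the helpers can be tried without the package installed.

```python
def scram_config(bootstrap_servers, username, password, mechanism="SCRAM-SHA-256"):
    """Build the producer configuration used in the session above."""
    return {
        "bootstrap.servers": bootstrap_servers,
        "security.protocol": "SASL_PLAINTEXT",
        "sasl.mechanism": mechanism,
        "sasl.username": username,
        "sasl.password": password,
    }


def delivery_report(err, msg):
    """Delivery callback: report success or failure for each message."""
    if err is not None:
        print("Message delivery failed: {}".format(err))
    else:
        print("Message delivered to {} [{}]".format(msg.topic(), msg.partition()))


def send_all(producer, topic, values):
    """Produce every value, serving callbacks along the way, and return
    whatever flush() reports (messages still waiting in the queue)."""
    for value in values:
        producer.produce(topic, value.encode("utf-8"), callback=delivery_report)
        producer.poll(0)  # serve delivery callbacks without blocking
    return producer.flush()


def main():
    # Imported here so the helpers above do not require confluent_kafka.
    from confluent_kafka import Producer

    config = scram_config("192.168.4.114:9092", "admin", "your-admin-pass")
    remaining = send_all(Producer(config), "test_acl", ["hello", "word"])
    print("messages still queued:", remaining)
```

Calling `main()` on a machine that can reach the broker should print the same two delivery lines as the session above.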

The messages were received successfully

[root@node004114 kafka]# bin/kafka-console-consumer.sh --bootstrap-server 192.168.4.114:9092 --topic test_acl --consumer.config config/client-sasl.properties --from-beginning
hello
word

Copyright © 2013 - 2019 Fan() All Rights Reserved.