Making confluent_kafka support SASL_PLAINTEXT

A colleague had been developing with kafka-python. After we turned on ACLs, we discovered that kafka-python's SASL_PLAINTEXT support is limited to the PLAIN mechanism (no SCRAM):

https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html
sasl_mechanism (str) – string picking sasl mechanism when security_protocol is SASL_PLAINTEXT or SASL_SSL. Currently only PLAIN is supported. Default: None

A quick look showed that confluent-kafka does support it, but librdkafka needs to be recompiled with the right SASL support; otherwise it fails with:

KafkaException: KafkaError{code=_INVALID_ARG,val=-186,str="Failed to create producer: No provider for SASL mechanism GSSAPI: recompile librdkafka with libsasl2 or openssl support. Current build options: PLAIN SASL_SCRAM"}
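The error string itself lists which SASL providers the bundled librdkafka was compiled with ("Current build options: ..."). A small hypothetical helper (my own sketch, assuming the message format shown above) to pull those out and decide whether a rebuild is needed:

```python
import re

def supported_sasl_mechanisms(error_text):
    """Parse 'Current build options: ...' out of a librdkafka error string."""
    m = re.search(r"Current build options: ([A-Z0-9_ -]+)", error_text)
    return m.group(1).split() if m else []

err = ('Failed to create producer: No provider for SASL mechanism GSSAPI: '
       'recompile librdkafka with libsasl2 or openssl support. '
       'Current build options: PLAIN SASL_SCRAM')

print(supported_sasl_mechanisms(err))              # ['PLAIN', 'SASL_SCRAM']
print('GSSAPI' in supported_sasl_mechanisms(err))  # False
```

Note the build options name provider families (e.g. SASL_SCRAM), not individual mechanisms like SCRAM-SHA-256; the helper only tells you what the library was built with.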

Install confluent-kafka

pip install confluent-kafka
Collecting confluent-kafka
Downloading https://files.pythonhosted.org/packages/2a/ba/dccb27376453f91ad8fa57f75a7ba5dc188023700c1789273dec976477b2/confluent_kafka-0.11.6-cp37-cp37m-manylinux1_x86_64.whl (3.9MB)
100% |████████████████████████████████| 3.9MB 984kB/s

Install librdkafka

Clone the repository:

[root@node004110 18:21:05 /tmp]
git clone https://github.com/edenhill/librdkafka.git
Cloning into 'librdkafka'...
remote: Enumerating objects: 213, done.
remote: Counting objects: 100% (213/213), done.
remote: Compressing objects: 100% (111/111), done.
remote: Total 18133 (delta 133), reused 149 (delta 98), pack-reused 17920
Receiving objects: 100% (18133/18133), 11.15 MiB | 946.00 KiB/s, done.
Resolving deltas: 100% (13758/13758), done.

Check the dependencies:

rpm -qa| grep openssl
openssl-1.0.2k-16.el7.x86_64
openssl-libs-1.0.2k-16.el7.x86_64
openssl-devel-1.0.2k-16.el7.x86_64

The GNU toolchain
GNU make
pthreads
zlib-dev (optional, for gzip compression support)
libssl-dev (optional, for SSL and SASL SCRAM support) – on CentOS this is openssl-devel
libsasl2-dev (optional, for SASL GSSAPI support)
libzstd-dev (optional, for ZStd compression support)
Build and install:

./configure
make && make install

Note: when building, just make sure these libssl checks report ok:

checking for libssl (by pkg-config)... ok
checking for libssl (by compile)... ok (cached)

Replace the bundled library

Locate it:

find / -name "librdkafka*"
/root/.pyenv/versions/3.7.2/lib/python3.7/site-packages/confluent_kafka/.libs/librdkafka.so.1

Replace it:

#cd /root/.pyenv/versions/3.7.2/lib/python3.7/site-packages/confluent_kafka/.libs/
[root@node004110 18:24:03 ~/.pyenv/versions/3.7.2/lib/python3.7/site-packages/confluent_kafka/.libs]
#ll
total 10316
-rwxr-xr-x 1 root root 2903144 Mar 8 18:20 libcrypto-4c524931.so.1.0.0
-rwxr-xr-x 1 root root 6944424 Mar 8 18:20 librdkafka.so.1
-rwxr-xr-x 1 root root 584072 Mar 8 18:20 libssl-01b7eff1.so.1.0.0
-rwxr-xr-x 1 root root 87848 Mar 8 18:20 libz-a147dcb0.so.1.2.3
-rw-r--r-- 1 root root 35336 Mar 8 18:20 monitoring-interceptor.so
[root@node004110 18:24:04 ~/.pyenv/versions/3.7.2/lib/python3.7/site-packages/confluent_kafka/.libs]
#mv librdkafka.so.1 librdkafka.so.1.bak
[root@node004110 18:24:11 ~/.pyenv/versions/3.7.2/lib/python3.7/site-packages/confluent_kafka/.libs]
#ln -s /usr/local/lib/librdkafka.so.1 librdkafka.so.1
[root@node004110 18:24:23 ~/.pyenv/versions/3.7.2/lib/python3.7/site-packages/confluent_kafka/.libs]
#ll
total 10316
-rwxr-xr-x 1 root root 2903144 Mar 8 18:20 libcrypto-4c524931.so.1.0.0
lrwxrwxrwx 1 root root 30 Mar 8 18:24 librdkafka.so.1 -> /usr/local/lib/librdkafka.so.1
-rwxr-xr-x 1 root root 6944424 Mar 8 18:20 librdkafka.so.1.bak
-rwxr-xr-x 1 root root 584072 Mar 8 18:20 libssl-01b7eff1.so.1.0.0
-rwxr-xr-x 1 root root 87848 Mar 8 18:20 libz-a147dcb0.so.1.2.3
-rw-r--r-- 1 root root 35336 Mar 8 18:20 monitoring-interceptor.so
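Before moving on, it is worth double-checking that the symlink really resolves to the freshly built library. A throwaway stdlib-only sketch of that check (the temporary directory here stands in for the .libs directory and /usr/local/lib above):

```python
import os
import tempfile

def resolves_to(link_path, expected_target):
    """True if link_path ultimately points at expected_target."""
    return os.path.realpath(link_path) == os.path.realpath(expected_target)

# Demo: a temp file stands in for the rebuilt /usr/local/lib/librdkafka.so.1
with tempfile.TemporaryDirectory() as d:
    built = os.path.join(d, "librdkafka.so.1.built")
    open(built, "w").close()
    link = os.path.join(d, "librdkafka.so.1")
    os.symlink(built, link)          # mirrors the `ln -s` step above
    print(resolves_to(link, built))  # True
```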

Verify

#python
Python 3.7.2 (default, Mar 4 2019, 16:55:21)
[GCC 4.8.5 20150623 (Red Hat 4.8.5-36)] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> from confluent_kafka import Producer
>>> p = Producer({'bootstrap.servers': '192.168.4.114:9092', 'security.protocol': 'SASL_PLAINTEXT', 'sasl.mechanism':'SCRAM-SHA-256','sasl.username':'admin','sasl.password':'your-admin-pass'})
>>> def delivery_report(err, msg):
...     if err is not None:
...         print('Message delivery failed: {}'.format(err))
...     else:
...         print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))
...
>>> for data in ['hello','word']:
...     p.produce('test_acl', data.encode('utf-8'), callback=delivery_report)
...
>>> p.poll(10)
Message delivered to test_acl [0]
Message delivered to test_acl [0]
1
>>> p.flush()
0
>>> quit()

The messages are received successfully:

[root@node004114 kafka]# bin/kafka-console-consumer.sh --bootstrap-server 192.168.4.114:9092 --topic test_acl --consumer.config config/client-sasl.properties --from-beginning
hello
word
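The SASL settings from the interactive session above can be shared between producer and consumer by building the config dict in one place. A sketch (the broker address, username, and password are the example values used above; the `group.id` is a made-up name):

```python
def sasl_scram_config(bootstrap, username, password, **extra):
    """librdkafka client config for SASL_PLAINTEXT + SCRAM-SHA-256."""
    conf = {
        'bootstrap.servers': bootstrap,
        'security.protocol': 'SASL_PLAINTEXT',
        'sasl.mechanism': 'SCRAM-SHA-256',
        'sasl.username': username,
        'sasl.password': password,
    }
    conf.update(extra)
    return conf

producer_conf = sasl_scram_config('192.168.4.114:9092', 'admin', 'your-admin-pass')
consumer_conf = sasl_scram_config('192.168.4.114:9092', 'admin', 'your-admin-pass',
                                  **{'group.id': 'test-acl-group',
                                     'auto.offset.reset': 'earliest'})
# Producer(producer_conf) / Consumer(consumer_conf) would then be created
# exactly as in the session above.
print(consumer_conf['sasl.mechanism'])  # SCRAM-SHA-256
```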

What should you do if a broker in a Kafka cluster dies and cannot be recovered?

This question occurred to me on the subway. As I recall, the book says that adding a new broker does not automatically sync existing data to it.

The brute-force approach

Environment

A three-broker cluster, with ZooKeeper and Kafka co-located on each node:

| broker | IP | broker.id |
|---------|---------------|-----------|
| broker1 | 172.18.12.211 | 211 |
| broker2 | 172.18.12.212 | 212 |
| broker3 | 172.18.12.213 | 213 |


Kafka SASL_SCRAM + ACL: dynamic user creation and permission control

I spent a while studying Kafka's access control. I had been looking at SASL_PLAINTEXT the whole time, only to find that it cannot create users dynamically. Unwilling to give up, I searched again today and found someone asking the same question:
https://stackoverflow.com/questions/54147460/kafka-adding-sasl-users-dynamically-without-cluster-restart
So I looked into SCRAM and wrote this complete walkthrough.

This document uses a self-hosted ZooKeeper; no special ZooKeeper configuration is required.


MGR vs PXC Benchmark

This benchmark compares MGR with PXC: it measures MGR's performance under different flow-control thresholds, and compares MGR and PXC under the same workload. The benchmarking tool is sysbench 1.1.0-431660d.

The base environment is shown below; two clusters (on ports 3306 and 3307) are deployed across the three test servers:

| Host | IP | MGR | PXC |
|----------|---------------|------|------|
| data-191 | 192.168.8.191 | 3306 | 3307 |
| data-219 | 192.168.8.219 | 3306 | 3307 |
| data-220 | 192.168.8.220 | 3306 | 3307 |


MySQL Group Replication: understanding Flow Control [translation]

When using MySQL Group Replication, some members may lag behind the group, due to load, hardware limitations, and so on. This lag can make it hard to keep certification performing well and to keep certification failures to a minimum: the larger the applying queue, the greater the risk of conflicting with transactions that have not yet been applied (which is a problem on Multi-Primary Groups).

Galera users are already familiar with this concept (translator's note: flow control). MySQL Group Replication's implementation differs from Galera's in two main ways:

  • the Group is never totally stalled (it never completely stops accepting writes)

  • the node having issues doesn't send flow control messages to the rest of the group asking it to slow down

In practice, every member of the group sends the other members some statistics about its queues (applier queue and certification queue). Each node then decides whether to slow down if it sees that some node has reached one of the queue thresholds:

group_replication_flow_control_applier_threshold (default is 25000)
group_replication_flow_control_certifier_threshold (default is 25000)

So if group_replication_flow_control_mode is set to QUOTA on a node, and one of the other members of the cluster is seen to be lagging (a threshold has been reached), writes are throttled to the minimum quota. This quota is calculated from the number of transactions applied in the last second, and is then reduced below that by subtracting the "over the quota" messages from the last period.


This means that, contrary to Galera, where the flow-control threshold is dictated by the lagging node, in MySQL Group Replication the node writing transactions checks its own flow-control threshold values and compares them with the statistics sent by the other nodes to decide whether to throttle.
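As a toy illustration of the arithmetic described above (my own sketch of the behavior as the article explains it, not Group Replication's actual implementation; the default threshold of 25000 is from the variables listed earlier):

```python
def throttled_quota(applied_last_second, over_quota_msgs_last_period,
                    peer_queue_sizes, threshold=25000):
    """Toy model: if any peer's queue exceeds the threshold, throttle writes
    to (transactions applied in the last second) minus the 'over the quota'
    messages from the last period; otherwise apply no flow control."""
    if all(q <= threshold for q in peer_queue_sizes):
        return None  # no member is lagging: no throttling
    return max(applied_last_second - over_quota_msgs_last_period, 0)

print(throttled_quota(10000, 1500, peer_queue_sizes=[30000, 120]))  # 8500
print(throttled_quota(10000, 1500, peer_queue_sizes=[200, 120]))    # None
```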

You can find more information about Group Replication's flow control in Vitor's article, Zooming-in on Group Replication Performance.



Copyright © 2013 - 2019 Fan() All Rights Reserved.
