If a broker in a Kafka cluster goes down and cannot be recovered, how should you handle it? This question came to me while riding the subway; I remembered reading that simply adding a new broker does not automatically replicate the old data to it.
## The brute-force way

### Environment

A three-broker cluster, with ZooKeeper and Kafka co-located on each node:

| broker  | IP            | broker.id |
|---------|---------------|-----------|
| broker1 | 172.18.12.211 | 211       |
| broker2 | 172.18.12.212 | 212       |
| broker3 | 172.18.12.213 | 213       |
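For reference, each broker's `server.properties` would look roughly like the sketch below. This is not the cluster's actual config; the listener port and ZooKeeper connect string are assumed defaults, and only `broker.id` (and the listener address) differs per node.

```
# Hypothetical server.properties for broker1 (172.18.12.211); port 9092 is an assumption.
broker.id=211
listeners=PLAINTEXT://172.18.12.211:9092
log.dirs=/usr/local/kafka/kafka-logs
zookeeper.connect=172.18.12.211:2181,172.18.12.212:2181,172.18.12.213:2181
```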
### Create a test topic

```
#./bin/kafka-topics.sh --zookeeper 172.18.12.212:2181 --create --topic test1 --replication-factor 3 --partitions 1
Created topic "test1".
```
Describe it:

```
#./bin/kafka-topics.sh --zookeeper 172.18.12.212:2181 --describe --topic test1
Topic:test1  PartitionCount:1  ReplicationFactor:3  Configs:
    Topic: test1  Partition: 0  Leader: 213  Replicas: 213,212,211  Isr: 213,212,211
```
Note that currently `Replicas: 213,212,211` and `Isr: 213,212,211`.
### Produce some messages

```
#./bin/kafka-console-producer.sh --broker-list 172.18.12.212:9092 --topic test1
>1
>2
>3
```
### Kill broker2

```
[root@node024212 ~]# ps -ef | grep kafka
root     17633     1  1 Feb17 ?  00:55:18 /usr/local/java/bin/java -server -Xmx2g -Xms2g ... kafka.Kafka config/server.properties
root     21806 21651  0 11:27 pts/2 00:00:00 grep --color=auto kafka
[root@node024212 ~]# kill -9 17633
[root@node024212 ~]# ps -ef | grep kafka
root     21875 21651  0 11:27 pts/2 00:00:00 grep --color=auto kafka
```
Wait a moment, then describe test1 again:

```
#./bin/kafka-topics.sh --zookeeper 172.18.12.212:2181 --describe --topic test1
Topic:test1  PartitionCount:1  ReplicationFactor:3  Configs:
    Topic: test1  Partition: 0  Leader: 213  Replicas: 213,212,211  Isr: 213,211
```
The replica assignment is still `Replicas: 213,212,211`, but the ISR has shrunk to `Isr: 213,211`.
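As a quick way to spot partitions in this state across the whole cluster, `kafka-topics.sh` also accepts an `--under-replicated-partitions` flag, which lists only the partitions whose ISR is smaller than their replica list. A sketch, using the same ZooKeeper address as above:

```
# Lists every partition whose ISR is currently smaller than its replica list.
./bin/kafka-topics.sh --zookeeper 172.18.12.212:2181 --describe --under-replicated-partitions
```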
### Start a new broker on 212

Create a new config file and start a new broker with it. Only these two parameters are changed:

```
broker.id=218
log.dirs=/DATA21/kafka/kafka-logs,/DATA22/kafka/kafka-logs,/DATA23/kafka/kafka-logs,/DATA24/kafka/kafka-logs
```
Create the corresponding directories:

```
mkdir -p /DATA21/kafka/kafka-logs
mkdir -p /DATA22/kafka/kafka-logs
mkdir -p /DATA23/kafka/kafka-logs
mkdir -p /DATA24/kafka/kafka-logs
```
Start the new broker:

```
./bin/kafka-server-start.sh -daemon config/server2.properties
```
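To confirm the new broker has actually registered with the cluster, one option is to list the live broker ids in ZooKeeper with the bundled `zookeeper-shell.sh` (a sketch; the ZooKeeper address is the same one used above):

```
# /brokers/ids holds one child znode per live broker; 218 should appear after startup.
./bin/zookeeper-shell.sh 172.18.12.212:2181 ls /brokers/ids
```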
Wait a moment, then check the status of test1 again:

```
#./bin/kafka-topics.sh --zookeeper 172.18.12.212:2181 --describe --topic test1
Topic:test1  PartitionCount:1  ReplicationFactor:3  Configs:
    Topic: test1  Partition: 0  Leader: 213  Replicas: 213,212,211  Isr: 213,218,211
```
The replicas of test1 are still `Replicas: 213,212,211`, while the ISR shows `Isr: 213,218,211`. In other words, the missing replica is not automatically migrated to the new broker.
### Reassign partitions with kafka-reassign-partitions.sh

Remove 212 from the assignment and add 218:

```
[root@node024211 12:04:48 /usr/local/kafka]
#echo '{"version":1,"partitions":[{"topic":"test1","partition":0,"replicas":[211,213,218]}]}' > increase-replication-factor.json
[root@node024211 12:58:30 /usr/local/kafka]
#./bin/kafka-reassign-partitions.sh --zookeeper 172.18.12.211:2181 --reassignment-json-file increase-replication-factor.json --execute
Current partition replica assignment

{"version":1,"partitions":[{"topic":"test1","partition":0,"replicas":[213,212,211],"log_dirs":["any","any","any"]}]}

Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions.
[root@node024211 12:58:49 /usr/local/kafka]
#./bin/kafka-reassign-partitions.sh --zookeeper 172.18.12.211:2181 --reassignment-json-file increase-replication-factor.json --verify
Status of partition reassignment:
Reassignment of partition test1-0 completed successfully
```
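Writing the JSON by hand is fine for a single partition, but for many topics the tool can also propose an assignment with its `--generate` mode. A sketch, assuming a topics-to-move file named `topics.json` (a name chosen here for illustration):

```
# topics.json lists the topics whose partitions should be spread over the given brokers.
echo '{"version":1,"topics":[{"topic":"test1"}]}' > topics.json
./bin/kafka-reassign-partitions.sh --zookeeper 172.18.12.211:2181 \
  --topics-to-move-json-file topics.json --broker-list "211,213,218" --generate
# The command prints the current assignment and a proposed one; save the proposal
# to a file and feed it back with --execute as above.
```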
Check the topic info:

```
#./bin/kafka-topics.sh --zookeeper 172.18.12.212:2181 --describe --topic test1
Topic:test1  PartitionCount:1  ReplicationFactor:3  Configs:
    Topic: test1  Partition: 0  Leader: 213  Replicas: 211,213,218  Isr: 213,211,218
```
### Verify that 218 has all the data

The replica list now includes 218, but does 218 actually contain the old messages? My approach was to kill 211 and 213, then consume from 218 with `--from-beginning`. In practice this worked:

```
#./bin/kafka-console-consumer.sh --bootstrap-server 172.18.12.212:9092 --topic test1 --from-beginning
1
2
3
4
5
6
7
8
9
10
11
11
```
The log files on 211 and 218 are also the same size:

```
[2019-02-21 13:29:19]#ls -l /DATA22/kafka/kafka-logs/test1-0/
[2019-02-21 13:29:19]total 8
[2019-02-21 13:29:19]-rw-r--r--. 1 root root 10485760 Feb 21 12:58 00000000000000000000.index
[2019-02-21 13:29:19]-rw-r--r--. 1 root root      381 Feb 21 13:00 00000000000000000000.log
[2019-02-21 13:29:19]-rw-r--r--. 1 root root 10485756 Feb 21 12:58 00000000000000000000.timeindex
[2019-02-21 13:29:19]-rw-r--r--. 1 root root       16 Feb 21 13:00 leader-epoch-checkpoint
```
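Another way to check, without killing the other brokers, is to compare end offsets: `GetOffsetShell` reports the latest offset per partition, and the number returned by the new broker should match what the original replicas report. A sketch, querying the new broker:

```
# --time -1 requests the latest offset for each partition of test1.
./bin/kafka-run-class.sh kafka.tools.GetOffsetShell \
  --broker-list 172.18.12.212:9092 --topic test1 --time -1
```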
## The simpler way

Reading the documentation, I found this in the FAQ: https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-Howtoreplaceafailedbroker?
> **How to replace a failed broker?** When a broker fails, Kafka doesn't automatically re-replicate the data on the failed broker to other brokers. This is because in the common case, one brings down a broker to apply code or config changes, and will bring up the broker quickly afterward. Re-replicating the data in this case will be wasteful. In the rarer case that a broker fails completely, one will need to bring up another broker with the same broker id on a new server. The new broker will automatically replicate the missing data.
In other words, if the server really is dead, you only need to start a new broker with `broker.id` set to the id of the failed broker, and the missing data will be replicated to it automatically.
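For the scenario above, that would mean something like the following on the replacement machine (a sketch; the config path and the `sed` edit are illustrative assumptions, not the exact commands I ran):

```
# On the replacement server: reuse the failed broker's id (212), keep zookeeper.connect
# the same, point log.dirs at an empty data directory, then start the broker. It rejoins
# the cluster as broker 212 and re-replicates the partitions it is supposed to host.
sed -i 's/^broker.id=.*/broker.id=212/' config/server.properties
./bin/kafka-server-start.sh -daemon config/server.properties
```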
I tested this in practice, and the data was indeed recovered.