Kafka 创建时未指定多个副本或者副本数量过少,都可以在后期手动添加,另外如果副本过多也可以减少,当前调整基于 Kafka 的版本是 2.5.1,但是估计 2.1 ~ 2.5 应该都是兼容的。
下面先来操作一下 Topic 副本减少的过程,首先查看 Kafka Topic 的详情:
# 新版本的 Kafka 建议使用 --bootstrap-server 不建议再使用 --zookeeper
bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic test-topic
比如输出如下:
Topic: test-topic PartitionCount: 4 ReplicationFactor: 2 Configs: cleanup.policy=delete
Topic: test-topic Partition: 0 Leader: 1002 Replicas: 1002,1004 Isr: 1002,1004
Topic: test-topic Partition: 1 Leader: 1001 Replicas: 1001,1002 Isr: 1001,1002
Topic: test-topic Partition: 2 Leader: 1003 Replicas: 1003,1001 Isr: 1001,1003
Topic: test-topic Partition: 3 Leader: 1004 Replicas: 1004,1003 Isr: 1003,1004
然后我们看每个 partition 其实是有两个副本的,我们接下来准备缩减一下,那么我们尽量只留 Leader 副本就可以了,干掉 Follower 副本,我们首先编写下面的配置文件:
{"version":1,
"partitions":[
{"topic":"test-topic","partition":0,"replicas":[1002]},
{"topic":"test-topic","partition":1,"replicas":[1001]},
{"topic":"test-topic","partition":2,"replicas":[1003]},
{"topic":"test-topic","partition":3,"replicas":[1004]}
]}
然后将上面的 JSON 保存到 reassign.json 文件,这个配置相当于每个分区只保留一个 Leader 副本,然后我们执行命令减少副本:
# 注意这里需要指定 ZooKeeper 地址,2.5 之前的都是这样,最新的好像是支持 --bootstrap-server 参数
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181/kafka --reassignment-json-file reassign.json --execute
执行完成之后,Kafka 会给我们提示开始执行,然后我们可以用下面的命令查看进度:
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181/kafka --reassignment-json-file reassign.json --verify
等进度完成后,我们再次查看 Topic 的详情:
bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic test-topic
这个时候分区就减少了:
Topic: test-topic PartitionCount: 4 ReplicationFactor: 1 Configs: cleanup.policy=delete,segment.bytes=1073741824
Topic: test-topic Partition: 0 Leader: 1002 Replicas: 1002 Isr: 1002
Topic: test-topic Partition: 1 Leader: 1001 Replicas: 1001 Isr: 1001
Topic: test-topic Partition: 2 Leader: 1003 Replicas: 1003 Isr: 1003
Topic: test-topic Partition: 3 Leader: 1004 Replicas: 1004 Isr: 1004
然后这个时候分区的底层文件会先标记为删除,并在稍后进行清理。正常情况下所有的生产者和消费者都不会受到影响而导致中断。
然后我们还可以增加分区的副本数量,同样需要编写文件如下:
{"version":1,
"partitions":[
{"topic":"test-topic","partition":0,"replicas":[1002, 1004]},
{"topic":"test-topic","partition":1,"replicas":[1001, 1002]},
{"topic":"test-topic","partition":2,"replicas":[1003, 1001]},
{"topic":"test-topic","partition":3,"replicas":[1004, 1003]}
]}
Kafka 会使用 replicas
下的第一个 broker id 作为 Leader 副本,其余的作为 Follower 副本,所以我们尽量保证现有的 Leader 和变化后的 Leader 一致,这样不会由于 Rebalance 导致消费者阻塞,从而保证服务不中断,如果后期确实分配不均再考虑调整,然后将上面 JSON 保存成文件开始操作:
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181/kafka --reassignment-json-file reassign.json --execute
然后查看进度:
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181/kafka --reassignment-json-file reassign.json --verify
等执行完,再次查看 Topic 详情,确定 Isr 中包含所有的副本就说明已经同步了。
然后在当前状态下,还可以让 Kafka 给我们生成比较好的副本方案,这样比较均衡,我们可以创建一个包含 Topic 的文件:
{
"topics": [
{
"topic": "test-topic"
}
],
"version": 1
}
这个文件可以指定多个 Topic,然后我们保存文件为:topic-generate.json,并让 Kafka 给我们生成分配方案:
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181/kafka --topics-to-move-json-file topic-generate.json --broker-list "1001,1002,1003,1004" --generate
注意上面 –broker-list 要输入集群所有的 broker id,一定不要漏,然后执行完成 Kafka 就会输出一个基于当前副本数量的分配方案,前面是当前的,后面是 Kafka 为我们生成的,然后我们可以保存后面的 JSON 文件然后执行它即可。
还有一种比较常用的场景是 Kafka 集群添加了一些节点,但是之前的分区没有重新分配,我们这个时候也需要这样来执行重分配,首先也是按照上面的命令指定全部的 broker id,包括新添加的节点,然后生成分配的方案,生成后执行即可。
但是需要注意的是,如果我们原来的分区数量就比较少,比如每个机器只有一个分区,那么这个时候重分配无论如何也无法均衡,我们需要调用 Kafka 命令手动添加分区,然后分区数量足够的条件下再执行重分配即可。当然假如我们开始的分区比较多,那么直接重新分配也是没问题的,并且可以同时调整分区跟副本。另外还有一些技巧,比如尽量保证 Leader replica 前后不变,可以做到尽量不影响生产者和消费者的运行等。