Kafka 的分片和副本机制

2024/04/15

我们在使用 Kafka 生产和消费消息的时候,肯定是希望能够将数据均匀地分配到所有服务器上。比如在日志收集场景,数据量是非常巨大的,例如大批量的集群每分钟产生的日志都能以 GB 计,所以如何将这么大的数据量均匀地分配到 Kafka 的各个 Broker 上,就是一个非常重要的问题。

除了应对吞吐量的问题,为了确保系统的高可用性和消息的高持久性,Kafka 还会有副本机制来保障数据的高持久性,同时也可以提供读操作高伸缩性的能力,解决单个节点的读取瓶颈。

我们下面依次来对分片和副本的基本运作机制进行总结和分析。

1.为什么要分区

我们都知道 Kafka 的 Topic 是承载数据的逻辑容器,在每个 Topic 下面会划分为多个分区,可以说分区是数据的物理载体。所以 Kafka 的消息组织方式从上到下实际是:Topic -> Partition -> Message,每条消息只会保存在某一个分区中,不会跨分区保存多份。

image

这张图片来自 Kafka 官网,这里有 P1 ~ P4 这 4 个分区,两个不同的生产者客户端通过网络将事件写入 Topic 对应的分区,并且多个生产者发送数据是相互独立的,彼此不需要感知到对方的存在,数据也可以同时写入一个分区,而这些操作都由 Kafka Broker 来完成,这种分布式的写入方式对 Kafka 的可扩展性至关重要。

分区的主要作用就是提供负载均衡的能力,从而实现系统的高伸缩性。数据读写操作的最小粒度是分区,而分区可以被放置到不同节点的机器上,所以每个机器的分区都可以各地独立并行地执行读写处理,可以非常方便地通过添加新的机器实现横向扩展,提升系统整体的吞吐。

不同的分布式系统对分区的概念定义略有不同,比如 Elasticsearch 或 ClickHouse 中叫做分片(Shard),在 HBase 中叫做 Region,虽然从叫法不同,但是对数据做分布式划分的基本思想是相通的。

除了最核心的负载均衡功能之外,利用分区也可以实现一些业务方面的特殊需求,比如消息的顺序问题等。

2.常见的分区策略

分区策略就是决定生产者将消息发送到哪个分区的算法,除了默认的策略外 Kafka 还支持自定义分区策略。

对于自定义分区策略,我们只需要实现 org.apache.kafka.clients.producer.Partitioner 这个接口即可,其实就是里面的 partition 方法和 close 方法,对于 partition 方法签名如下:

int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);

其中 topic,key,keyBytes,value,valueBytes 是消息数据本身,cluster 是集群信息,比如 Kafka 集群的 Broker 数量等信息,Kafka 将这些信息给到我们,我们可以充分利用这些信息实现自定义的分区算法。

实现好分区算法之后我们在创建生产者示例的时候指定 partitioner.class 参数为我们自己实现的类,类的写法是以 Full Qualified Name 格式传输,也就是全限定的名称字符串,这样 Kafka 生产者客户端在发送消息时就会调用我们实现的分区类将数据投递到计算后的分区上去。

不过我们绝大多数情况下都不需要自己实现分区策略,而是采用 Kafka 提供的几种标准的分区策略,这些策略都非常经典并且可以满足大多数场景的使用情况,我们下面分别来看一下。

2.1 轮询(Round-robin)策略

轮询策略是 Kafka 标准分区策略中最常用的一种,也就是顺序分配。假如一个 Topic 下面有 3 个分区,那么第 1 条消息会发送到分区 0,第 2 条消息会发送到分区 1,第 3 条消息会发送到分区 2,第 4 条消息又会发送到分区 0,就这样循环往复,这就是轮询策略。

轮询策略也是 Java 以及大部分语言中生产者 API 默认提供的分区策略,这种策略可以让数据分布的非常均匀,所以有非常优秀的负载均衡表现,我们认为它是通常情况下最合理的分配策略,而且大规模的数据写入情况下性能表现也非常好,没有特殊需要的话我们直接选择这种策略即可。

2.2 随机(Randomness)策略

随机策略非常简单,也就是对于每条消息随机投放到其中一个分区上,这个策略的源码实现非常简单:

List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return ThreadLocalRandom.current().nextInt(partitions.size());

我们只需要根据 Topic 所得到分区的数量,然后随机生成 [0, partitions.size()) 范围内的整数作为要写入到的分区编号。

随机策略也可以将数据比较好的分散到每个分区上,但是从实际表现上看,最平均的仍然是轮询策略,随机策略只能随着数据量的增长使得每个分区上的数据量趋于平均,所以随机策略基本上很少使用了。

2.3 按 key 分区(散列)策略

Kafka 允许为每条消息定义一个 key,这个 key 可以有明确的业务含义,比如用户 ID、身份信息标识等,也可以用来表示消息的元数据。

在 Kafka 早期的版本消息默认不支持时间戳,很多公司在使用的时候会将时间戳封装到消息的 key 中,这样每条消息都可以拿到对应的时间戳元数据,当然这不是目前最常见的用法,目前大家主流的用法是在业务场景中保证同一个 key 的数据被分配到相同的分区中,从而保证消息的有序性以及可压缩性,以此来满足业务需要。

当然这个分区方法也不复杂,实现的代码如下:

List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return Math.abs(key.hashCode()) % partitions.size();

也就是将消息 key 的 hashCode 对分区数量取余,得到消息对应的分区编号,所以消息的 key 相同,那么所写入的分区也是相同的,并且一个分区下的消息必然是严格有序的。

以上 3 种分区策略是 Kafka 中最常用的分区策略,不过目前随机分区策略用的比较少,通常如果消息不指定 key 那么默认采用轮询策略,如果指定 key 那么默认就是散列策略,最常用的其实就这两种。

如果我们在业务中要求数据存在全局的顺序性,那么最简单的方法是采用单个分区,但是这样会失去 Kafka 集群的高吞吐以及负载均衡的优势,这种情况下我们就可以对消息设计合适的 key,区分哪些消息是需要保证顺序的,而哪些消息之前没有关系,可以并行。设计好之后我们就可以放心地创建多个分区,在生产消息时为消息设置好合适的 key,这样既可以保证同一个分区下的数据有序,又可以实现分区间的并行,可以将整体性能提升好多倍。

在实际的业务上,很多大厂其实还存在一种地理分区的策略,对于大规模的 Kafka 集群,可以将所有的分区跨城市分布,这样可以实现向不同地区推送不同的消息。

比如不同地区的用户兴趣爱好可能存在不同,推荐系统就可以集中进行算法的推理并根据地区推送不同的结果,然后每个地区运行消费者就近消费当地分区的数据,这样就可以实现个性化的推荐效果。再或者要实现给不同地区的用户发放当地的优惠券,每个地区的终端只能消费到当前地区的优惠券消息,并推送给当地的用户,实现跨地区的优惠活动等。

3.消息压缩

上面主要谈了分片对于大规模消息带来的负载均衡和高吞吐的能力,那么这里为什么要提到压缩?压缩其实是一种时间换空间的思想,就是用 CPU 时间去换取磁盘存储和网络传输的空间,因为现代的压缩算法性能都比较强劲,可以用较小的 CPU 开销来节约更多的存储成本和更少的网络传输成本,基本上对客户端的性能没有什么影响,在大规模的数据分片上能够显著降低成本。

比如在 i7-2600k at 4.4GHz 这样的 CPU 上面,常见的一些压缩算法性能比较如下:

C Size ratio% C MB/s D MB/s Name
52509931 52.5 290.96 347.16 brotli 1
52928477 52.9 69.17 276.75 zlib 1
52983490 52.9 393.67 984.00 zstd 1
57606731 57.6 386.90 3948.64 lzturbo 11
61455711 61.4 800.71 4003.54 lzturbo 10
61938605 61.9 730.46 3330.40 lz4 1
100098564 100.0 8647.84 8408.10 memcpy

基线是以 memcpy 系统调用为 100% 计算,其他更完整的评测可以参考下面的链接:

  1. https://morotti.github.io/
  2. https://github.com/powturbo/TurboBench

Kafka 主要有两大类消息格式,简单来说就是 V1 和 V2 两大版本,V2 版本是从 Kafka 0.11.0.0 开始正式引入的。

首先要明确 Kafka 的消息分为两个层次,分别是消息集合和消息项,之所以这么分是为了减小网络传输、存储等调用的开销,将一定数量的消息封装到一起进行操作,这也是计算机中最常用的摊销思想。

V2 相比 V1 的主要优化是在消息集合元数据上的提升,在 V1 版本中每条消息都需要 CRC 校验,但是在 Broker 端会为消息时间戳进行更新,所以需要重新计算 CRC,或者在 Broker 端执行其他转换时,也会带来 CRC 值的变化。因为有这些情况,对于每条消息执行 CRC 校验会带来比较大的开销,所以在 V2 版本就在消息集合的级别进行 CRC,既能降低 CPU 占用也能减少消息总大小。另外在 V1 版本上是将每条消息压缩后在封装成消息集合,而 V2 版本直接对整个消息集合进行压缩,所以整体的压缩效率和压缩比都会更高。

综合来说 V2 版本相比 V1 在 CPU 效率和存储成本上都有很大提升,当启用压缩时,提升效果也更加明显。

消息压缩会发生在两个地方,分别是生产者端和 Broker 端,最常用的是在生产者端进行压缩,这样可以降低 Broker 的 CPU 占用,而且压缩是分散到多个生产者并行执行的,在 Java Producer API 中可以通过设置 compression.type 参数来指定具体的压缩算法:

 Properties props = new Properties();
 props.put("bootstrap.servers", "localhost:9092");
 props.put("acks", "all");
 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
 // 生产者使用 Gzip 压缩
 props.put("compression.type", "gzip");
 
 Producer<String, String> producer = new KafkaProducer<>(props);

上面的代码中 props.put("compression.type", "gzip") 表示生产者使用 Gzip 压缩算法对消息进行压缩,压缩后传输到 Broker 时就可以节省传输的带宽。

另外在 Broker 端也可能会发生压缩,因为在 Broker 端也存在配置参数 compression.type,不过这个参数在默认的含义是接收到消息后会原封不动的存储并不会对其进行任何修改,我们来看看官网上的说明:

image

这个参数支持 gzip,snappy,lz4,zstd 等几种压缩算法,也可以指定 uncompressed 表示不进行压缩,不过默认值是 producer 表示会继承生产者的压缩算法。

对于这几类压缩算法而言,吞吐量方面的关系是:LZ4 > Snappy > zstd > Gzip,压缩比方面的关系是:zstd > LZ4 > Gzip > Snappy,要根据吞吐量和存储空间来综合选择最合适的算法。

假如这个值被修改并且配置了和生产者不同的压缩算法,例如我们上面生产者配置了 Gzip 算法,但是 Broker 端配置了 Snappy 算法,这种情况在 Broker 端会使用 Gzip 算法对消息解压缩后再采用 Snappy 算法对消息进行压缩,这样会导致 Broker 端 CPU 的飙升,会拖慢 Kafka 的处理性能,所以 Broker 端的压缩参数建议保持默认,不要做任何修改。

另外还有一种情况会导致消息在 Broker 端发生转换,就是 V1 和 V2 版本的转换,这种情况主要是为了兼容老版本的消费者所进行的转换。当有消费者消费数据时,Kafka 发现消费者使用的版本比较老(V1),而 Broker 端保存的消息版本比较新(V2),为了兼容旧版本的客户端,Broker 会将 V2 版本的消息转换为 V1,当然这个过程也涉及到消息的解压缩和重新压缩,也会消耗很多的 CPU 资源,同时也无法利用零拷贝(Zero-Copy)这种特性。

所谓零拷贝,简单来说就是当数据在磁盘和网络进行传输时,可以绕过昂贵的内核态到用户态的数据拷贝,从而不需要经过程序的地址空间就可以完成数据的快速传输,假如出现了消息转换或者重新解压缩,这些操作必须由程序代码在用户态完成,因此性能就会大打折扣。

image

这种就是通常情况下 readwrite 系统调用的过程,对于消息消费来说首先是 Kafka 发起 read 请求,数据先从磁盘通过 DMA 控制器拷贝到内核缓冲区,然后再通过 CPU 将数据从内核缓冲区拷贝到程序的缓冲区,这其中涉及到两次空间切换和一次 CPU 拷贝,然后 Kafka 对消息转换完毕后又需要将数据发送给消费者,需要调用 write 将数据先通过 CPU 将数据从用户缓冲区拷贝到内核的 Socket 缓冲区,然后再通过 DMA 控制器将数据从内核拷贝到网卡完成发送,这其中同样涉及到两次空间切换和一次 CPU 拷贝。

综合来说一共发生了 2 次系统调用带来了 4 次空间的切换和 2 次 CPU 拷贝,这个开销还是比较大的。

所以 Kafka 在这个过程中采用零拷贝的特性来进行优化,当然零拷贝有多种实现方式,例如:mmap 和 sendfile,其中 mmap 仍然涉及到 2 次系统调用和 1 次 CPU 拷贝,很多数据库都是采用这种方式实现文件的读写,sendfile 在 Linux 2.1 版本引入的,直接调用一次就可以完成 read + write 两次调用的效果,比 mmap 直接降低了一次系统调用,到 Linux 2.6 内核对 sendfile 进一步优化,直接通过 DMA 控制器实现内核缓冲区的共享,直接省去了 CPU 拷贝过程,真正实现了 “零”拷贝,其实在 Linux 2.6 内核还引入了 splice 系统调用并且不需要硬件支持,直接实现两个文件之间的零拷贝。对于 Kafka 来说是采用的是 Java NIO 中的 transferTo ,底层的调用就是 sendfile ,其定义如下:

// 来源:https://man7.org/linux/man-pages/man2/sendfile.2.html
#include <sys/sendfile.h>
ssize_t sendfile(int out_fd, int in_fd, off_t *_Nullable offset,
                 size_t count);

其中 out_fd 在 Linux 2.6.33 版本之前必须是 Socket,之后可以是任何文件,但是 in_fd 只支持文件,不能是 Socket,但是 splice 不会限制 out_fd 的类型。其实现示意如下:

image

可以看到发起 sendfile 调用后数据先从磁盘通过 DMA 拷贝到内核缓冲区然后 DMA 控制器根据传入的偏移直接从原有的内核缓冲区直接拷贝到网卡发送出去,最后返回拷贝的字节数,整个过程比较高效但是不允许对数据进行修改。

如果 Kafka 消费者和生产者保持在平衡状态,即生产的消息立刻被消费而没有积压的情况下,这个时候数据直接在操作系统的 cache 中,消费的时候数据直接从内存读取,所以实时消费可以达到极为出色的性能。

总结一下,如果想利用零拷贝和 V2 版本消息的优势,一定要使用较新版本的 Kafka (推荐 2.x)并且消费者客户端的版本一定要和 Kafka Broker 版本一致,就可以充分解锁 Kafka 的高性能特性。

最后再来看一下解压缩,通常解压缩是发生在 Consumer 端,就像刚才说的 Producer 压缩后将消息写入 Broker,Broker 原样保存,当 Consumer 消费时会直接将原始数据发送到 Consumer 端,由 Consumer 来解压缩并使用,这样设计和生产者的原理是一致的,一方面可以降低传输的带宽占用,还可以将 CPU 操作放到 Consumer 端执行,降低 Broker 端的 CPU 占用。

所以目前最佳的压缩实现方案就是:Producer 端压缩、Broker 端原样保存、Consumer 端解压缩,并且为了避免重新解压缩的开销,在 Broker 端不要设置压缩算法种类,由 Producer 设置压缩算法,同时 Consumer 端也不需要指定压缩算法,因为 Broker 给到 Consumer 的数据中就封装了压缩算法的类型,Consumer 只需要按照消息中的类型去解压缩即可。

当然如果 Producer 和 Consumer 所在的服务器已经比较繁忙,但是带宽比较充足,那么这种情况下不建议开启压缩,直接传输反而是性价比更高的方案,因此是否选择压缩以及选择何种压缩算法都要根据实际的机器环境和业务需要去权衡。

4.副本(Replication)机制基础

介绍完分片以及相关的压缩技术之后,我们来看一下副本机制,其实就是一种实时备份的实现方案,可以在多个节点上保存相同的数据拷贝,通用的副本概念主要的优势如下:

  1. 提供数据冗余,即使系统部分节点失效,系统依然能够正常运转,数据不会丢失或不可用,这可以增加整体的可用性和数据持久性。
  2. 提供高伸缩性或弹性,支持横向扩展副本节点,可以直接通过增加节点的方式来提升读性能,客户端可以从多个副本上并行读,这可以增加整体的吞吐量。
  3. 改善数据局部性,可以将副本分配到和用户地理位置相近的地方,从而降低用户访问延迟。

但是我们在 Kafka 中最常见的副本优势就是第 1 条,其余的优势 Kafka 本身并没有提供,所以我们重点来明确一下在 Kafka 中副本的特性。

在 Kafka 中副本的复制对象是分区,也就是说每个分区对应若干个副本,副本只能追加写消息,一个分区下的所有副本保存相同的消息序列,这些副本可以分散到多个 Broker 上,从而在一部分 Broker 失效时仍然可以保证分区的可用性。

在实际的生产环境中,每个 Broker 其实是同时保存了不同 Topic 下不同分区的副本,所以单个 Broker 可以保存上百甚至上千个副本,例如下面的示例:

image

这里是在 3 个 Broker 组成的 Kafka 集群上创建了一个 3 分区 3 副本的 Topic,颜色相同的表示一个分区的多个副本,可以看到每个分区的副本都分散到了 3 个 Broker 上面,而每个 Broker 上面都存在不同分区的不同副本,这样就实现了数据冗余。

5.副本角色机制

上面说到既然分区下可以创建多个副本,并且这些副本的消息要保持同步,那么该如何确保数据的一致性就成为非常重要的问题。

针对这个问题,Kafka 采用的是基于领导者的副本机制,也叫 Leader-based,其工作机制的实现原理如下:

image

在 Kafka 中同一个分区的副本也是分为两类,分别是领导者副本(Leader Replica)和追随者副本(Follower Replica),每个分区的副本在创建后都会通过 ZooKeeper 进行选举,投票选举出 Leader,然后其余的副本自动成为追随者副本。

并且 Kafka 中的副本机制比较严格,Follower Replica 是不对外提供服务的,也就是说所有的 Follower Replica 都不能响应消费者和生产者的读写请求,所有的请求必须由 Leader Replica 来处理,Follower Replica 唯一的任务就是从其对应的 Leader Replica 上异步拉取消息并写入到自己本地的日志段中,以此来实现和 Leader Replica 的同步。

如果当 Leader Replica 挂掉了,Kafka 依托于 ZooKeeper 提供的监控能力可以感知到变化,并且重新开启 Leader Replica 的选举,从剩余的 Follower Replica 中选举一个新的 Leader Replica,如果之后原来的 Leader Replica 重启后,会自动转为 Follower Replica 加入进来并开启数据复制。

所以根据上面的叙述,由于 Follower Replica 不对外提供服务,所以 Kafka 其实也就无法提供多副本横向扩展以及局部性低延迟的能力,只能实现数据冗余以及高可用性,不过 Kafka 设计既然这样设计也有其独特的优势:

1. 消息可见性

当使用生产者向 Kafka 写入消息成功后,马上使用消费者读取可以立即读取到刚生产的消息,这也叫做 “Read-your-writes”。比如我们发朋友圈,发完之后肯定希望立即可以看到,而不是过一会才能看到,如果允许 Follower Replica 对外提供服务的话,由于副本同步是异步的,因此有可能出现 Follower 还没有同步消息消费者就来消费的情况,这样最新写入的消息是有可能消费不到的。

2. 方便实现单调读

单调读简单来说就是对于消费者来说,多次重复消费会看到一条新的消息一会在一会又不在了,就好像时光倒流一样,这也是分布式系统里面经典的问题之一,在分布式数据库中假如用户从其中一个库查询到了某条消息,又刷新了一次页面,这个时候可能会从另外一个库查询消息,如果另外一个库还没有同步,那么后读的数据反而是少的,就出现了这种情况。单调读其实是介于强一致性和最终一致性之间的一种一致性模型。

对于 Kafka 来说,如果有两个 Follower 分别是 F1 和 F2,这两个副本都是异步拉取数据,如果 F1 已经拉取更新而 F2 还没有更新,如果一个消费者读取完 F1 之后又读取 F2,那么就会出现消费到的消息又不见了这种情况,要实现单调读就复杂得多。而 Kafka 选择了所有的请求都是由 Leader 来处理,那么 Kafka 就自然而然地实现了单调读的能力。

6.In-sync Replicas

上面主要说了 Kafka Follower Replica 是不对外提供服务而只会异步拉取 Leader Replica 更新的消息,既然是异步拉取,那么就会存在和 Leader 不同步的风险,所以我们必须明确定义同步的含义,以及 Follower 到底在什么情况下是和 Leader 同步的。

在 Kafka 中是引入了 In-sync Replicas 机制,简称 ISR,表示与 Leader 保持同步的副本集合。也可以说 ISR 中的副本都是与 Leader 保持同步的副本,需要特别注意的是 ISR 中是包含 Leader 的,相反地,不在 ISR 中的 Follower 就被认为是和 Leader 不同步的。平常我们使用 Kafka 的工具 kafka-topics.sh 查看 Topic 详情时就可以看到 ISR 的列表:

bin/kafka-topics.sh --zookeeper localhost:2181/kafka --topic testMessage --describe
Topic: testMessage      PartitionCount: 4       ReplicationFactor: 2    Configs: cleanup.policy=delete
        Topic: testMessage      Partition: 0    Leader: 1004    Replicas: 1004,1003     Isr: 1003,1004
        Topic: testMessage      Partition: 1    Leader: 1002    Replicas: 1002,1004     Isr: 1002,1004
        Topic: testMessage      Partition: 2    Leader: 1001    Replicas: 1001,1002     Isr: 1001,1002
        Topic: testMessage      Partition: 3    Leader: 1003    Replicas: 1003,1001     Isr: 1001,1003

例如上面的这个示例可以看到 Isr 和 Replicas 完全一致,说明集群中所有的副本都是保持同步的,状态都比较良好。

那么下面我们看怎么样才算是同步,可以看下下图:

image

我们可以看到 Leader 存在两个 Follower,Leader 当前写入了 10 条消息,Follower 1 同步了 7 条消息,Follower 2 同步了 3 条消息,那么哪个 Follower 和 Leader 是同步的?或者都没有同步?

首先直观上看,Follower 1 的消息比较接近 Leader,Follower 2 的消息最少,我们可能感觉 Follower 1 是最有可能同步的,而 Follower 2 是最有可能不同步的。

事实上,答案是这两个 Follower 每个都有可能和 Leader 同步,也都有可能和 Leader 不同步,也就是说图中提供的信息即相差的消息条数不能作为我们判断副本是否同步的的依据,对于 Kafka 来说是有另外的判断依据。

这个依据就是在 Broker 中配置的参数 replica.lag.time.max.ms 值,这个值在 Kafka 2.5 之前默认是 10s,但是从 2.5 开始默认为 30s,原因是为了增加各种网络环境中的弹性,所以在大多数情况下我们设置这个值为 30s 都是运行良好的。那么这个值的含义就是表示 Follower 能够落后 Leader 的最长时间间隔,也就是说只要 Follower 连续拉取 Leader 的时间间隔不超过这个配置值,那么 Kafka 就认为该副本是同步的,反之,如果超过这个值,那么 Kafka 就认为该副本已经落后于 Leader 并会从 ISR 集合中移除,就会出现我们在网络繁忙的状态下查看 Topic 状态中 Isr 数量少于 Replicas 数量的情况。

虽然有些副本落后 Kafka 消息数量比较多,但是只要一直在拉取,那么最终是可以追上来的,这也是 Kafka 为什么不以条数来判断是否同步的原因之一,其实最早 Kafka 是有这个最大落后条数参数的,参数名叫 replica.lag.max.messages ,这个参数从 Kafka 0.9.0.0 版本就移除了,主要应该也是因为在运行中发现这个参数并不是太合理所以进行了删除。

虽然目前落后比较多的副本会被移出 ISR 集合,但是如果副本后面逐渐追上了 Leader 的进度,那么 Kafka 会将其继续加入 ISR 集合,总之 ISR 这个集合是随着副本同步的情况动态调整的。

7.Unclean Leader Election

通常来说如果 Leader 在运行中挂掉,那么 Kafka 会从 ISR 集合剩余的 Follower 中重新选举出一个作为 Leader,因为 ISR 集合中所有的副本都认为是同步的。但是在某些极端的情况下,大多数副本都不在 ISR 中,如果此时 Leader 挂掉那么 ISR 可能就为空了,这种情况将无法选举出新的 Leader 提供服务,这就出现了我们线上最棘手的问题之一,Kafka 明明运行良好,但是消费者确报错没有 Leader,因此也无法消费到消息,一直处于阻塞状态。

因为 Kafka 认为所有不在 ISR 中的副本都是不同步的,因为通常这些副本都落后于 Leader 超过前面提到的时间阈值,消息更新的差距也比较大,假如 Kafka 选择这些不同步的副本作为新的 Leader,就有可能出现比较多的数据丢失,所以干脆就不对外提供服务了。

但是实际情况中,我们可能会考虑为了高可用性允许一定的数据丢失,那么这种情况下就需要用到 Unclean Leader Election 过程了,这个过程叫做 Unclean 领导者选举,可以通过在 Broker 端开启参数 unclean.leader.election.enable 允许开启 Unclean 领导者选举,默认这个参数是关闭的。

开启参数后,Kafka 会从 ISR 集合之外的其他不同步副本中选举一个作为 Leader,虽然会导致一段时间的数据丢失,但是 Leader 是可以保持一直存在的,不至于直接对外停止服务,所以这个参数的主要作用就是在可用性和消息容错性上进行权衡,牺牲消息容错性或一致性来换取高可用性。

但是绝大多数情况下我们建议保持这个参数默认,即优先保证数据的完整性,当出现问题时想办法分析副本为什么没有同步,是网络拥挤还是机器繁忙导致了这个问题?然后有针对性地解决这些问题,或者想办法恢复原有的 Leader 来重新提供服务。

以上就是 Kafka 分区和副本的设计原理和基本的实现机制,感谢您的耐心阅读!