侧边栏壁纸
博主头像
Leokoの小破站博主等级

行动起来,活在当下

  • 累计撰写 18 篇文章
  • 累计创建 10 个标签
  • 累计收到 0 条评论

目 录CONTENT

文章目录

kafka

Leoko
2024-12-19 / 0 评论 / 0 点赞 / 16 阅读 / 29072 字

安装

docker 安装 kafka

前提是已经安装好 zookeeper

docker run -d --name kafka -p 9092:9092 --link zookeeper:zookeeper \
--env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \
--env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://服务器ip地址:9092 \
--env KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
--env KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
docker.rainbond.cc/bitnami/kafka
​

架构

image-20240903101713661

为了数据可靠性,可以将数据进行备份,但是 Kafka 没有备份的概念,Kafka 称之为 副本,多个副本同时只有一个副本(leader)提供数据的读写操作,其他副本(follower)只是用于备份。

ControllerKafka Broker 集群中的管理节点(master)。

常用命令

主题相关

# 在 localhost:9092 这个kafka 服务创建名字为 test 的 topic 
kafka-topics.sh --bootstrap-server localhost:9092 --topic test --create
# 详细信息
kafka-topics.sh --bootstrap-server localhost:9092 --topic test --describe
# 删除
kafka-topics.sh --bootstrap-server localhost:9092 --topic test --delete
# 获取 topic 列表
kafka-topicc.sh --bootstrap-server localhost:9092 --list

生产者相关

# 往 test 主题发送消息
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test

消费者相关

# 订阅 test 主题
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test

流程分析

Broker 启动流程

  • 注册 Broker 节点(即 Zookeeper/brokers/ids 节点);

  • 监听 /controller 节点;

    • 当前集群没有 Controller 时,则注册 /controller 节点,选举为 Controller,并监听 /brokers/ids 节点来感知集群内 Broker 节点的变化,当 Broker 节点增动态变化时,需要连接所有的 Broker 来传输集群数据。

    • 当前集群有 Controller 时,只监听,当前 Controller 宕机后,会竞争选举。选举成功后,也需要连接所有的 Broker 来传输更新集群数据。

主题创建流程

image-20241215220458867

  • 使用 Admin 创建主题时,先与配置的 broker 进行连接通信,获取集群元数据,从元数据中找到 Controller 并真正发送创建主题的请求;

  • Controller接收到请求后,转发给 Zookeeper处理,Zookeeper 会创建对应主题的节点,且 Controller 会监听改节点;

  • 一旦主题发生变化,Controller 中的分区状态机和副本状态机会执行与分区、副本相关的流程,并通知集群中其他 Broker 进行更新操作。

数据应答 ACKS(生产者配置)

  • ACKS = 0:优先考虑发送效率,生产者发送消息后,不等待任何确认。

  • ACKS = -1 / ACKS = all:生产者等待所有同步副本ISRIn-Sync Replicas)都确认消息已经成功写入,才认为消息写入成功。

  • ACKS = 1:生产者等待分区的领导者Leader)确认消息已成功写入。

幂等

kafka 生产者重试机制可能会导致消息的重复以及乱序(ACKS =1 或 -1),kafka 提供幂等机制来避免这种情况,幂等操作的要求:

  • ACKS = -1

  • 启用生产者的幂等性(enable.idempotence=true

  • 开启重试机制

  • 在途请求缓冲区(max.in.flight.requests.per.connection)的数量不能大于 5(默认值为 5)

// 生产者开启幂等
map.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
// acks 需要设置为 -1
map.put(ProducerConfig.ACKS_CONFIG, "-1");
// 重试次数设置
map.put(ProducerConfig.RETRIES_CONFIG, 3);

原理kafka 在启用幂等性时,会为每个生产者实例分配一个唯一的 生产者 ID消息序列号(分区唯一)。每个消息都有一个与其关联的序列号(producerIdsequenceNumber)。生产者在发送每个消息时,都会发送该消息的序列号和当前生产者的 IDkafka 会使用这些信息来判断该消息是否已经成功写入。

事务

在开启幂等的情况下,消息会有 producerId,但是生产者重启后,producerId 会重新生成,这样的话还是可能会导致消息的重复。而事务机制就是为了避免这种情况的,当开启事务时,kafka 会为每个事务生成一个 事务 IDtransactional.id)。该 ID 保证了生产者即使重启也能保持一致的事务上下文。

// 开启幂等的情况下设置事务 ID
map.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "自定义事务ID");
​
KafkaProducer<String, String> producer = new KafkaProducer<>(map);
// 初始化事务
producer.initTransaction();
​
try {
    producer.send(xxx);
    // 提交事务
    producer.commitTransaction();
} catch (Exception e) {
    // 终止事务
    producer.abortTransaction();
} finally {
    producer.close();
}

流程分析

  • 生产者发送 FIND_COORDINATOR 请求给 Broker 查找事务管理器所在的节点;

  • 根据自定义的事务 IDhashCode 和 50 取模,取模结果为 /topics/__transaction_state 节点下的分区,获取该分区的 Leader 副本所在节点(该节点的事务管理器来事务的处理;),并返回给生产者;

  • 生产者发送 INIT_PRODUCER_ID 请求初始化 producerId

  • 生产者发送 ADD_PARTITIONS_TO_TXN 请求将数据的分区信息发送给事务管理器;

  • 生产数据(commitTransaction);

  • Leader 副本所在的 Broker 通知事务管理器保存成功的分区信息,并返回 ACK 给生产者;

  • 生产者结束事务;

  • 事务管理器修改 /topics/__transaction_state 节点的事务状态为 PrepareCommit

  • 事务管理器通知 Broker 发送消息标记,表示可以让消费者消费数据了;

  • 事务管理器修改 /topics/__transaction_state 节点的事务状态为 CompleteCommit

数据存储

流程分析

  1. ACKS 校验:生产者将数据发送给 Broker 时,会告知 Broker 当前生产者的数据生产场景,从而要求 Kafka对数据请求进行应答响应确认数据的接收情况。根据 ACKS 的值,做不同的操作。

    • ACKS = 0:将数据发送到网络输出流中,此时 kafka 就会进行响应,无法保证 Broker 节点能够接收到消息。

    • ACKS = 1:生产者将数据发送到 Broker 中,并保存到当前节点的数据日志文件中,kafka 就会进行确认收到数据的响应。

    • ACKS = -1:此时不仅仅保证 Leader 副本保存数据成功,还需要同步到 Follower 副本(ISR),一旦这些副本数据同步完毕后,kafka 再对生产者进行收到数据的确认。

  2. 内部主题校验:生产者向 Broker 发送数据时,是必须指定主题的,但是这个主题的名称不能是 kafka 的内部主题名称。kafka 为了管理的需要,创建了2个内部主题,一个是用于事务处理的 __transaction_state 内部主题,还有一个是用于处理消费者偏移量的 __consumer_offsets 内部主题。生产者是无法对这两个主题生产数据的,所以在存储数据之前,需要对主题名称进行校验有效性校验。

  3. 日志文件滚动判断:据存储到文件中,如果数据文件太大,对于查询性能是会有很大影响的,所以副本数据文件并不是一个完整的大的数据文件,而是根据某些条件分成很多的小文件,每个小文件我们称之为文件段。其中的一个条件就是文件大小,参数名为:log.segment.bytes。默认值为1G。如果当前日志段剩余容量可能无法容纳新消息集合,因此有必要创建一个新的日志段来保存待写入的所有消息。此时日志文件就需要滚动生产新的。

    除了文件大小外,还有时间间隔,如果文件段第一批数据有时间戳,那么当前批次数据的时间戳和第一批数据的时间戳间隔大于滚动阈值,那么日志文件也会滚动生产新的。如果文件段第一批数据没有时间戳,那么就用当前时间戳和文件创建时间戳进行比对,如果大于滚动阈值,那么日志文件也会滚动生产新的。这个阈值参数名为:log.roll.hours,默认为7天。如果时间到达,但是文件不满 1G,依然会滚动生产新的数据文件。

    如果索引文件或时间索引文件满了,或者索引文件无法存放当前索引数据了,那么日志文件也会滚动生产新的。

  4. 数据校验:因为 kafka 允许生产者进行数据重试操作,所以因为一些特殊的情况,就会导致数据请求被 kafka 重复获取导致数据重复或者乱序,需要在 Broker 端对数据进行校验。

  5. 数据存储:将数据写入日志文件,写入完成后,更新当前日志文件的数据偏移量。

存储文件

数据日志文件(.log)

kafka 数据是使用 log 文件进行保存的文件名长度为20位长度的数字字符串,数字含义为当前日志文件的第一批数据的基础偏移量,也就是文件中保存的第一条数据偏移量。字符串数字位数不够的,前面补0。数据主要分为两部分:批次头 + 数据体。批次头总的字节数为:61 byte,数据体的字节数根据消息体计算。

数据索引文件(.index)

kafka 的基础设置中,数据日志文件到达1G才会滚动生产新的文件。那么从1G文件中想要快速获取我们想要的数据,效率还是比较低的。为了定位方便 kafka 在提供日志文件保存数据的同时,还提供了用于数据定位的索引文件,索引文件中保存的就是逻辑偏移量和数据物理存储位置(偏移量)的对应关系。

但是要注意,kafka 的索引文件是不连续的,也就是不是对每一条数据都保存对应的索引,那是因为如果每条数据如果都把偏移量的定位保存下来,数据量也不小。且 Kafka 的底层实现采用的是虚拟内存映射技术 mmap,将内存和文件进行双向映射,操作内存数据就等同于操作文件,所以效率是非常高的,但是因为是基于内存的操作,所以并不稳定,容易丢数据,因此 kafka 的索引文件中的索引信息是不连续的,而且为了效率,kafka 默认情况下,4kb 的日志数据才会记录一次索引,但是这个是可以进行配置修改的,参数为 log.index.interval.bytes,默认值为4096。所以我们有的时候会将 kafka 的不连续索引数据称之为稀疏索引。

数据时间索引文件(.timeindex)

某些场景中,我们不想根据顺序(偏移量)获取 kafka的数据,而是想根据时间来获取的数据。这个时候,没有对应的偏移量来定位数据,那么查找的效率就非常低了,因此 kafka 还提供了时间索引文件。

image-20241218152500550

数据刷写

Linux 系统中,kafka 把数据写入文件系统之后,其实数据在操作系统的 PageCache(页缓冲)里面,并没有刷到磁盘上。如果操作系统挂了,数据就丢失了。一方面,应用程序可以调用 fsync 这个系统调用来强制刷盘,另一方面,操作系统有后台线程,定时刷盘。频繁调用 fsync 会影响性能,需要在性能和可靠性之间进行权衡。kafka 提供了参数进行数据的刷写:

  • log.flush.interval.messages:达到消息数量时,会将数据 flush 到日志文件中;

  • log.flush.interval.ms:间隔多少时间(ms),执行一次强制的 flush 操作;

  • flush.scheduler.interval.ms:所有日志刷新到磁盘的频率。

官方不建议通过上述的三个参数来强制写盘,数据的可靠性应该通过 replica 来保证,而强制 flush 数据到磁盘会对整体性能产生影响。

数据一致性

kafka 的分区副本有两种:LeaderFollower,但是只有 Leader 负责读写,Follower 只负责从 Leader 同步数据。当 Leader 所在的 Broker 宕机后,就需要从 Follower 副本中(ISR 列表)选举出一个成为 Leader(默认按 ISR 列表顺序选举)。这样就提升了分区可用性,但是相对的,在提升了分区可用性的同时,也就牺牲了数据的一致性。

来看这样的一个场景:一个分区有3个副本,一个 Leader 和两个 FollowerLeader 副本作为数据的读写副本,所以生产者的数据都会发送给 Leader 副本,而两个 Follower 副本会周期性地同步 Leader 副本的数据,但是因为网络,资源等因素的制约,同步数据的过程是有一定延迟的,所以3个副本之间的数据可能是不同的。

image-20241218154136017

此时,假设 Leader 副本因为意外原因宕掉了,此时会选择2个 Follower 副本中的一个作为 Leader 对外提供数据服务(假设是上面那个 Follower)。此时我们就会发现,对于消费者而言,之前 Leader 副本能访问的数据是 d,但是重新选择 Leader 副本后,能访问的数据就变成了 c,这样消费者就会认为数据丢失了,也就是所谓的数据不一致了。

为了提升数据的一致性,kafka 引入了高水位(HW :High Watermark)机制,kafka 在不同的副本之间维护了一个水位线的机制(其实也是一个偏移量的概念),消费者只能读取到水位线以下的的数据,这个高水位其实就是数据最少的那个副本的最后数据位置。以上面那张图来说,高水位就是 c 所在的偏移量,所以虽然 Leader 副本中已经有 a、b、c、d 4条数据,但是由于高水位线的限制,所以也只能消费到 a、b 这两条数据。这样即使 Leader 挂掉了,但是对于消费者来讲,消费到的数据其实还是一样的,因为它能看到的数据是一样的,也就是说,消费者不会认为数据不一致。

还有一个 LEO需要了解,LEOLog End Offset)日志末端位移,表示下一条待写入消息的 offset每个分区副本都会记录自己的 LEO

高水位更新机制

HW 高水位线会随着 Follower 的数据同步操作,而不断上涨,也就是说,Follower 同步的数据越多,那么水位线也就越高,那么消费者能访问的数据也就越多。接下来,我们就看一看,Follower 在同步数据时 HW 的变化。

  1. 首先,初始状态下,LeaderFollower 都没有数据,所以和偏移量相关的值都是初始值0,而由于 Leader 需要管理 Follower,所以也包含着 Follower 的相关偏移量(LEO)数据。

image-20241218155731478

  1. 生产者向 Leader 发送两条数据,此时 Leader 收到数据并保存成功,故 LEO 值被更新为2。

image-20241218160011383

  1. 接下来,Follower 开始同步 Leader 的数据,同步数据时,会将自身的 LEO 值作为参数传递给 Leader 。此时,Leader 会将数据传递给 Follower,且同时 Leader 会根据所有副本的 LEO 值更新 HW

image-20241218160144565

  1. 由于两个 Follower 的数据拉取速率不一致,所以 Follower-1 抓取了 2 条数据,而 Follower-2 抓取了 1 条数据。Follower 收到数据后,会将数据写入文件,并更新自身的偏移量信息。Follower-1 更新 LEO 为 2,Follower-2 更新 LEO 为 1。此时,LeaderFollower 副本的 LEO 都不为 0,但各自的高水位依然是0,还没有被更新。它们需要在下一轮的拉取中被更新

image-20241218160326026

  1. Leader 收到了生产者的数据 c,更新自身的 LEO 为 3 。Follower 接着向 Leader 发送 Fetch 请求,同样会将最新的 LEO 作为参数传递给 LeaderLeaderFollower 中取最小的那个作为高水位值,即更新 HW 为 1。

image-20241218160643383

  1. 然后,Leader 会将数据发送给 Follower,同时也会将 HW 一起发送。

image-20241218160832091

  1. Follower 收到数据后,会将数据写入文件,并更新自身偏移量信息。

image-20241218161003405

  1. 因为 Follower 会不断重复 Fetch 数据的过程,所以前面的操作会不断地重复。最终,Follower 副本和 Leader 副本的数据和偏移量是保持一致的。

Leader Epoch

依托于高水位,kafka 既界定了消息的对外可见性,又实现了异步的副本同步机制,但是这个真的一点问题都没有吗?

从刚才的分析中,我们知道,Follower 副本的高水位更新需要一轮额外的拉取请求才能实现,那么就可能出现这样一种情况:假设现在一个分区只有 LeaderFollower 两个副本,且 LeaderFollower 都写入了两条消息,而且 Leader 副本的高水位也已经更新了,但 Follower 副本高水位还未更新(相当于还没执行第6、7步),此时,LeaderFollower 的情况:Leader {LEO = 2, HW = 2}Follower{LEO = 2, HW = 0}

倘若此时 Follower 所在的 Broker 宕机,当它重启回来后,执行日志截断操作,将 LEO 值调整为之前的高水位值,也就是 0。这就是说,位移值为0,1的消息会被磁盘删除,此时 Follower 副本的底层磁盘文件中没有保存消息。

当执行完截断操作后,Follower 开始从 Leader 拉取消息,执行正常的消息同步。如果此时, Leader 所在的Broker 宕机了,那么 kafka 就别无选择,只能让 Follower 成为新的 Leader,此时,当旧 Leader 回来后,需要执行相同的日志截断操作,其 LEO 值依然为 2。但只要它向新 Leader发起同步请求后就会更新其 HW 为0(计算min(Follower LEO, Leader HW)),数据也被截断。这样操作之后,位移值为 0 和 1 的两条消息就从这两个副本中被永远地抹掉了。这个就是数据丢失场景。

严格来说,这个场景发生的前提是 Broker 端参数 min.insync.replicas 设置为1。即一旦消息被写入到Leader 副本的磁盘,就会被认为是“已提交状态”,生产者不会因 Follower 没有同步成功而重新发送消息,这两条消息也就永远丢失了。

基于此,社区在0.11版本正式引入了 Leader Epoch 概念,引入 Leader Epoch 后,Follower 就不再参考HW,而是根据 Leader Epoch 信息来截断 Leader 中不存在的消息。

所谓 Leader Epoch,大致可以认为是 Leader版本。它由两部分数据组成:

  • Epoch。一个单调增加的版本号。每当副本领导权发生变更时,都会增加该版本号。小版本号的 Leader 被认为是过期 Leader,不能再行使 Leader 权力。

  • 起始位移(Start Offset)。Leader 副本在该 Epoch 值上写入的首条消息的位移。

假设现在有两个 Leader Epoch <0, 0> 和 <1, 120>,那么,第一个Leader Epoch 表示版本号是0,这个版本的Leader 从位移0开始保存消息,一共保存了120条消息。之后,Leader 发生了变更,版本号增加到1,新版本的起始位移是120。

Kafka Broker 会在内存中为每个分区都缓存 Leader Epoch 数据,同时它还会定期地将这些信息持久化到一个checkpoint 文件中。当 Leader 副本写入消息到磁盘时,Broker 会尝试更新这部分缓存。如果该 Leader 是首次写入消息,那么 Broker 会向缓存中增加一个 Leader Epoch 条目,否则就不做更新。这样,每次有 Leader 变更时,新的 Leader 副本会查询这部分缓存,取出对应的 Leader Epoch 的起始位移,以避免数据丢失和不一致的情况。

来看下引入 Leader Epoch 后上面例子的情况:

  • 开始 LeaderFollowerLeader Epoch 都为 [Epoch=0, Offset=0]

  • Follower 副本重启回来后,需要向 Leader 发送一个特殊的请求去获取 LeaderLEO值,为 2。

  • 当获知到Leader LEO=2后,Follower 发现该 LEO 值不比它自己的 LEO 值小,而且缓存中也没有保存任何起始位移值 > 2的 Epoch 条目,因此无需执行任何日志截断操作。

  • Leader 所在的 Broker 宕机后,Follower 成为新 Leader,旧 Leader 重启回来后,执行相同的判断逻辑,发现也不用执行日志截断,两条消息在两个副本中均得到保留。

  • 后面当生产者程序向新 Leader写入新消息时,它所在的 Broker 缓存中,会生成新的 Leader Epoch 条目:[Epoch=1, Offset=2]

消息消费

消费者调度分配

消费者想要拉取数据,首先必须要加入到一个组中,成为消费组中的一员,同样道理,如果消费者出现了问题,也应该从消费者组中剥离。而这种加入组和退出组的处理,都应该由专门的管理组件进行处理,这个组件在 kafka 中称之为消费者组调度器(协调)(Group Coordinator),是 Broker 上的一个组件,用于管理和调度消费者组的成员、状态、分区分配、偏移量等信息。每个 Broker 都有一个Group Coordinator 对象,负责管理多个消费者组,但每个消费者组只有一个 Group Coordinator 。分配流程大致分为两大步:加入组(JoinGroup)和组同步(SyncGroup)。

消费者组的每个消费者到底消费哪一个主题分区,这个分配策略其实是由消费者组中的 Leader 决定的,这个 LeaderGroup Coordinator 指定的,组中其他消费者我们称之为 Follower,称呼上有点类似与分区的 LeaderFollower

  • 消费者根据设定的 group.id 向当前负载最小的 Broker 节点发送请求查找消费调度器,查询算法为 group.id.hashCode % __consumer_offests分区数量

  • 找到消费调度器后,消费者向调度器所在 Broker 节点发出 JOIN_GROUP 请求,加入消费者组。

  • 调度器然后将订阅分区信息发给 LeaderLeader 根据消费者配置中分配策略设计分区分配方案,并将分配好的方案告知调度器。这里的告知是所有消费者(包括 Leader )向 Group Coordinator 发送SyncGroupRequest 请求。需要注意的是,只有 Leader 成员发送的请求中包含了订阅分区消费分配方案,在其他成员发送的请求中,这部分的内容为空。

  • Group Coordinator 接收到分配方案后,会通过向成员发送响应的方式,通知各个成员要消费哪些分区。

消费分配策略

  • 轮询分配。每个消费者组中的消费者都会含有一个自动生产的 UUID 作为 memberid,轮询策略中会将每个消费者按照 memberid 进行排序分配。

  • 范围分配。按照每个主题的分区数量计算出每个消费者应该分配的分区数量,然后分配,分配的原则就是一个主题的分区尽可能的平均分,如果不能平均分,那就按顺序向前补齐即可。

  • 粘性分配(StickyAssignor)。在第一次分配后,每个组成员都保留分配给自己的分区信息。如果有消费者加入或退出,那么在进行分区再分配时(一般情况下,消费者退出45s后,才会进行再分配,因为需要考虑可能又恢复的情况),尽可能保证消费者原有的分区不变,重新对加入或退出消费者的分区进行分配。

  • CooperativeStickyAssignor。前面的三种分配策略再进行重分配时使用的是 EAGER 协议,会让当前的所有消费者放弃当前分区,关闭连接,资源清理,重新加入组和等待分配策略。明显效率是比较低的,所以从kafka2.4 版本开始,在粘性分配策略的基础上,优化了重分配的过程,使用的是 COOPERATIVE 协议,COOPERATIVE 协议将一次全局重平衡,改成每次小规模重平衡,直至最终收敛平衡的过程。

例如:一个Topic(T0,三个分区),两个 consumer(consumer1、consumer2)均订阅Topic(T0)。如果consumer 订阅信息为:

consumer1

T0P0、T0P2

consumer2

T0P1

此时,新的 consumer3 加入消费者组,那么基于eager协议的分区重分配策略流程如下:

  • consumer1consumer2 正常发送心跳信息到 Group Coordinator

  • 随着 consumer3 加入,Group Coordinator 收到对应的 Join Group 请求,Group Coordinator 确认有新成员需要加入消费者组。

  • Group Coordinator 通知 consumer1consumer2,需要 Rebalance(再平衡)了。

  • consumer1consumer2 放弃当前各自持有的已有分区,重新发送 Join Group 请求到 Group Coordinator

  • Group Coordinator 依据指定的分区分配策略的处理逻辑,生成新的分区分配方案,然后通过 Sync Group 请求,将新的分区分配方案发送给 consumer1consumer2consumer3

  • 所有 consumer 按照新的分区分配,重新开始消费数据。

基于cooperative协议的分区分配策略的流程如下:

  • consumer1consumer2 正常发送心跳信息到 Group Coordinator

  • 随着 consumer3 加入,Group Coordinator 收到对应的 Join Group 请求,Group Coordinator 确认有新成员需要加入消费者组。

  • Group Coordinator 通知 consumer1consumer2,需要 Rebalance(再平衡)了。

  • consumer1consumer2 通过 Join Group 请求将已经持有的分区发送给 Group Coordinator。注意:并没有放弃已有分区。

  • Group Coordinator 取消 consumer1 对分区 p2 的消费,然后发送 Sync Group请求给 consumer1consumer2

  • consumer1consumer2 接收到分区分配方案,重新开始消费。至此,一次 Rebalance 完成。

  • 当前 p2 也没有被消费,再次触发下一轮 Rebalance,将 p2 分配给 consumer3 消费。

脑裂

在集群环境下,如果有 Leader 概念的话,一般都会有脑裂问题,kafkaController 也不例外。

kafka controller 相当于整个 kafka 集群的 master,负责 topic 的创建、删除、以及 partition 的状态机转换,Broker 的上线、下线等。前面已经分析过 Controller 是如何选举的,有一种特殊的情况,就是Controller 节点并没有宕掉,而是因为网络的抖动,不稳定,导致和 ZooKeeper 之间的会话超时,那么此时,整个 Kafka 集群就会认为之前的 Controller 已经下线(退出)从而选举出新的 Controller ,而之前的Controller 的网络又恢复了,以为自己还是 Controller 了,继续管理整个集群。那么现在集群内就有两个 Controller 了,这样破坏系统的一致性和可靠性。

为了解决这个问题,kafka 引入了 epoch (纪元)概念,每次选举 Controllerepoch+1,该信息在 zookeeper/controller_epoch 节点中维护。假设存在脑裂的情况,那么旧 Controller 给其他 Broker 发送请求时,会带上当前 Controllerepoch,当其他 Broker 接收到请求后,会对比最新 Controller epoch版本号与接收到的 epoch,如果接收到的版本号小,那么直接拒绝请求。当旧的 Controller 一旦发现集群中有新任 Controller 的时候,自身会进行卸任。

注意:不要和前面提到过的 leader_epoch 搞混了。

常见问题

消费者启动后消费不到消息

进入zookeeper客户端,将brokers节点下的topics节点下的__consumer_offsets删除就可。

deleteall /brokers/topics/__consumer_offsets

参考资料

0

评论区