安装
docker 安装 kafka
前提是已经安装好 zookeeper
。
docker run -d --name kafka -p 9092:9092 --link zookeeper:zookeeper \
--env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \
--env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://服务器ip地址:9092 \
--env KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
--env KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
docker.rainbond.cc/bitnami/kafka
架构
为了数据可靠性,可以将数据进行备份,但是 Kafka
没有备份的概念,Kafka
称之为 副本,多个副本同时只有一个副本(leader
)提供数据的读写操作,其他副本(follower
)只是用于备份。
Controller
:Kafka Broker
集群中的管理节点(master
)。
常用命令
主题相关
# 在 localhost:9092 这个kafka 服务创建名字为 test 的 topic
kafka-topics.sh --bootstrap-server localhost:9092 --topic test --create
# 详细信息
kafka-topics.sh --bootstrap-server localhost:9092 --topic test --describe
# 删除
kafka-topics.sh --bootstrap-server localhost:9092 --topic test --delete
# 获取 topic 列表
kafka-topicc.sh --bootstrap-server localhost:9092 --list
生产者相关
# 往 test 主题发送消息
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test
消费者相关
# 订阅 test 主题
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test
流程分析
Broker 启动流程
注册
Broker
节点(即Zookeeper
上/brokers/ids
节点);监听
/controller
节点;当前集群没有
Controller
时,则注册/controller
节点,选举为Controller
,并监听/brokers/ids
节点来感知集群内Broker
节点的变化,当Broker
节点增动态变化时,需要连接所有的Broker
来传输集群数据。当前集群有
Controller
时,只监听,当前Controller
宕机后,会竞争选举。选举成功后,也需要连接所有的Broker
来传输更新集群数据。
主题创建流程
使用
Admin
创建主题时,先与配置的broker
进行连接通信,获取集群元数据,从元数据中找到Controller
并真正发送创建主题的请求;Controller
接收到请求后,转发给Zookeeper
处理,Zookeeper
会创建对应主题的节点,且Controller
会监听改节点;一旦主题发生变化,
Controller
中的分区状态机和副本状态机会执行与分区、副本相关的流程,并通知集群中其他Broker
进行更新操作。
数据应答 ACKS(生产者配置)
ACKS = 0:优先考虑发送效率,生产者发送消息后,不等待任何确认。
ACKS = -1 / ACKS = all:生产者等待所有同步副本(
ISR
,In-Sync Replicas
)都确认消息已经成功写入,才认为消息写入成功。ACKS = 1:生产者等待分区的领导者(
Leader
)确认消息已成功写入。
幂等
kafka
生产者重试机制可能会导致消息的重复以及乱序(ACKS =1 或 -1
),kafka
提供幂等机制来避免这种情况,幂等操作的要求:
ACKS = -1
启用生产者的幂等性(
enable.idempotence=true
)开启重试机制
在途请求缓冲区(
max.in.flight.requests.per.connection
)的数量不能大于 5(默认值为 5)
// 生产者开启幂等
map.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
// acks 需要设置为 -1
map.put(ProducerConfig.ACKS_CONFIG, "-1");
// 重试次数设置
map.put(ProducerConfig.RETRIES_CONFIG, 3);
原理:kafka
在启用幂等性时,会为每个生产者实例分配一个唯一的 生产者 ID 和 消息序列号(分区唯一)。每个消息都有一个与其关联的序列号(producerId
和 sequenceNumber
)。生产者在发送每个消息时,都会发送该消息的序列号和当前生产者的 ID
。kafka
会使用这些信息来判断该消息是否已经成功写入。
事务
在开启幂等的情况下,消息会有 producerId
,但是生产者重启后,producerId
会重新生成,这样的话还是可能会导致消息的重复。而事务机制就是为了避免这种情况的,当开启事务时,kafka
会为每个事务生成一个 事务 ID(transactional.id
)。该 ID
保证了生产者即使重启也能保持一致的事务上下文。
// 开启幂等的情况下设置事务 ID
map.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "自定义事务ID");
KafkaProducer<String, String> producer = new KafkaProducer<>(map);
// 初始化事务
producer.initTransaction();
try {
producer.send(xxx);
// 提交事务
producer.commitTransaction();
} catch (Exception e) {
// 终止事务
producer.abortTransaction();
} finally {
producer.close();
}
流程分析
生产者发送
FIND_COORDINATOR
请求给Broker
查找事务管理器所在的节点;根据自定义的事务
ID
的hashCode
和 50 取模,取模结果为/topics/__transaction_state
节点下的分区,获取该分区的Leader
副本所在节点(该节点的事务管理器来事务的处理;),并返回给生产者;生产者发送
INIT_PRODUCER_ID
请求初始化producerId
;生产者发送
ADD_PARTITIONS_TO_TXN
请求将数据的分区信息发送给事务管理器;生产数据(
commitTransaction
);Leader
副本所在的Broker
通知事务管理器保存成功的分区信息,并返回ACK
给生产者;生产者结束事务;
事务管理器修改
/topics/__transaction_state
节点的事务状态为PrepareCommit
;事务管理器通知
Broker
发送消息标记,表示可以让消费者消费数据了;事务管理器修改
/topics/__transaction_state
节点的事务状态为CompleteCommit
。
数据存储
流程分析
ACKS
校验:生产者将数据发送给Broker
时,会告知Broker
当前生产者的数据生产场景,从而要求Kafka
对数据请求进行应答响应确认数据的接收情况。根据ACKS
的值,做不同的操作。ACKS = 0
:将数据发送到网络输出流中,此时kafka
就会进行响应,无法保证Broker
节点能够接收到消息。ACKS = 1
:生产者将数据发送到Broker
中,并保存到当前节点的数据日志文件中,kafka
就会进行确认收到数据的响应。ACKS = -1
:此时不仅仅保证Leader
副本保存数据成功,还需要同步到Follower
副本(ISR),一旦这些副本数据同步完毕后,kafka
再对生产者进行收到数据的确认。
内部主题校验:生产者向
Broker
发送数据时,是必须指定主题的,但是这个主题的名称不能是kafka
的内部主题名称。kafka
为了管理的需要,创建了2个内部主题,一个是用于事务处理的__transaction_state
内部主题,还有一个是用于处理消费者偏移量的__consumer_offsets
内部主题。生产者是无法对这两个主题生产数据的,所以在存储数据之前,需要对主题名称进行校验有效性校验。日志文件滚动判断:据存储到文件中,如果数据文件太大,对于查询性能是会有很大影响的,所以副本数据文件并不是一个完整的大的数据文件,而是根据某些条件分成很多的小文件,每个小文件我们称之为文件段。其中的一个条件就是文件大小,参数名为:
log.segment.bytes
。默认值为1G。如果当前日志段剩余容量可能无法容纳新消息集合,因此有必要创建一个新的日志段来保存待写入的所有消息。此时日志文件就需要滚动生产新的。除了文件大小外,还有时间间隔,如果文件段第一批数据有时间戳,那么当前批次数据的时间戳和第一批数据的时间戳间隔大于滚动阈值,那么日志文件也会滚动生产新的。如果文件段第一批数据没有时间戳,那么就用当前时间戳和文件创建时间戳进行比对,如果大于滚动阈值,那么日志文件也会滚动生产新的。这个阈值参数名为:
log.roll.hours
,默认为7天。如果时间到达,但是文件不满 1G,依然会滚动生产新的数据文件。如果索引文件或时间索引文件满了,或者索引文件无法存放当前索引数据了,那么日志文件也会滚动生产新的。
数据校验:因为
kafka
允许生产者进行数据重试操作,所以因为一些特殊的情况,就会导致数据请求被kafka
重复获取导致数据重复或者乱序,需要在Broker
端对数据进行校验。数据存储:将数据写入日志文件,写入完成后,更新当前日志文件的数据偏移量。
存储文件
数据日志文件(.log)
kafka
数据是使用 log
文件进行保存的文件名长度为20位长度的数字字符串,数字含义为当前日志文件的第一批数据的基础偏移量,也就是文件中保存的第一条数据偏移量。字符串数字位数不够的,前面补0。数据主要分为两部分:批次头 + 数据体。批次头总的字节数为:61 byte,数据体的字节数根据消息体计算。
数据索引文件(.index)
kafka
的基础设置中,数据日志文件到达1G才会滚动生产新的文件。那么从1G文件中想要快速获取我们想要的数据,效率还是比较低的。为了定位方便 kafka
在提供日志文件保存数据的同时,还提供了用于数据定位的索引文件,索引文件中保存的就是逻辑偏移量和数据物理存储位置(偏移量)的对应关系。
但是要注意,kafka
的索引文件是不连续的,也就是不是对每一条数据都保存对应的索引,那是因为如果每条数据如果都把偏移量的定位保存下来,数据量也不小。且 Kafka
的底层实现采用的是虚拟内存映射技术 mmap
,将内存和文件进行双向映射,操作内存数据就等同于操作文件,所以效率是非常高的,但是因为是基于内存的操作,所以并不稳定,容易丢数据,因此 kafka
的索引文件中的索引信息是不连续的,而且为了效率,kafka
默认情况下,4kb 的日志数据才会记录一次索引,但是这个是可以进行配置修改的,参数为 log.index.interval.bytes
,默认值为4096。所以我们有的时候会将 kafka
的不连续索引数据称之为稀疏索引。
数据时间索引文件(.timeindex)
某些场景中,我们不想根据顺序(偏移量)获取 kafka
的数据,而是想根据时间来获取的数据。这个时候,没有对应的偏移量来定位数据,那么查找的效率就非常低了,因此 kafka
还提供了时间索引文件。
数据刷写
在 Linux
系统中,kafka
把数据写入文件系统之后,其实数据在操作系统的 PageCache
(页缓冲)里面,并没有刷到磁盘上。如果操作系统挂了,数据就丢失了。一方面,应用程序可以调用 fsync
这个系统调用来强制刷盘,另一方面,操作系统有后台线程,定时刷盘。频繁调用 fsync
会影响性能,需要在性能和可靠性之间进行权衡。kafka
提供了参数进行数据的刷写:
log.flush.interval.messages
:达到消息数量时,会将数据flush
到日志文件中;log.flush.interval.ms
:间隔多少时间(ms),执行一次强制的flush
操作;flush.scheduler.interval.ms
:所有日志刷新到磁盘的频率。
官方不建议通过上述的三个参数来强制写盘,数据的可靠性应该通过 replica
来保证,而强制 flush
数据到磁盘会对整体性能产生影响。
数据一致性
kafka
的分区副本有两种:Leader
和 Follower
,但是只有 Leader
负责读写,Follower
只负责从 Leader
同步数据。当 Leader
所在的 Broker
宕机后,就需要从 Follower
副本中(ISR
列表)选举出一个成为 Leader
(默认按 ISR
列表顺序选举)。这样就提升了分区可用性,但是相对的,在提升了分区可用性的同时,也就牺牲了数据的一致性。
来看这样的一个场景:一个分区有3个副本,一个 Leader
和两个 Follower
。Leader
副本作为数据的读写副本,所以生产者的数据都会发送给 Leader
副本,而两个 Follower
副本会周期性地同步 Leader
副本的数据,但是因为网络,资源等因素的制约,同步数据的过程是有一定延迟的,所以3个副本之间的数据可能是不同的。
此时,假设 Leader
副本因为意外原因宕掉了,此时会选择2个 Follower
副本中的一个作为 Leader
对外提供数据服务(假设是上面那个 Follower
)。此时我们就会发现,对于消费者而言,之前 Leader
副本能访问的数据是 d
,但是重新选择 Leader
副本后,能访问的数据就变成了 c
,这样消费者就会认为数据丢失了,也就是所谓的数据不一致了。
为了提升数据的一致性,kafka
引入了高水位(HW :High Watermark
)机制,kafka
在不同的副本之间维护了一个水位线的机制(其实也是一个偏移量的概念),消费者只能读取到水位线以下的的数据,这个高水位其实就是数据最少的那个副本的最后数据位置。以上面那张图来说,高水位就是 c
所在的偏移量,所以虽然 Leader
副本中已经有 a、b、c、d
4条数据,但是由于高水位线的限制,所以也只能消费到 a、b
这两条数据。这样即使 Leader
挂掉了,但是对于消费者来讲,消费到的数据其实还是一样的,因为它能看到的数据是一样的,也就是说,消费者不会认为数据不一致。
还有一个
LEO
需要了解,LEO
(Log End Offset
)日志末端位移,表示下一条待写入消息的offset
,每个分区副本都会记录自己的LEO
。
高水位更新机制
HW
高水位线会随着 Follower
的数据同步操作,而不断上涨,也就是说,Follower
同步的数据越多,那么水位线也就越高,那么消费者能访问的数据也就越多。接下来,我们就看一看,Follower
在同步数据时 HW
的变化。
首先,初始状态下,
Leader
和Follower
都没有数据,所以和偏移量相关的值都是初始值0,而由于Leader
需要管理Followe
r,所以也包含着Follower
的相关偏移量(LEO
)数据。
生产者向
Leader
发送两条数据,此时Leader
收到数据并保存成功,故LEO
值被更新为2。
接下来,
Follower
开始同步Leader
的数据,同步数据时,会将自身的LEO
值作为参数传递给Leader
。此时,Leader
会将数据传递给Follower
,且同时Leader
会根据所有副本的LEO
值更新HW
。
由于两个
Follower
的数据拉取速率不一致,所以Follower-1
抓取了 2 条数据,而Follower-2
抓取了 1 条数据。Follower
收到数据后,会将数据写入文件,并更新自身的偏移量信息。Follower-1
更新LEO
为 2,Follower-2
更新LEO
为 1。此时,Leader
和Follower
副本的LEO
都不为 0,但各自的高水位依然是0,还没有被更新。它们需要在下一轮的拉取中被更新。
Leader
收到了生产者的数据c
,更新自身的LEO
为 3 。Follower
接着向Leader
发送Fetch
请求,同样会将最新的LEO
作为参数传递给Leader
,Leader
从Follower
中取最小的那个作为高水位值,即更新HW
为 1。
然后,
Leader
会将数据发送给Follower
,同时也会将HW
一起发送。
Follower
收到数据后,会将数据写入文件,并更新自身偏移量信息。
因为
Follower
会不断重复Fetch
数据的过程,所以前面的操作会不断地重复。最终,Follower
副本和Leader
副本的数据和偏移量是保持一致的。
Leader Epoch
依托于高水位,kafka
既界定了消息的对外可见性,又实现了异步的副本同步机制,但是这个真的一点问题都没有吗?
从刚才的分析中,我们知道,Follower
副本的高水位更新需要一轮额外的拉取请求才能实现,那么就可能出现这样一种情况:假设现在一个分区只有 Leader
和 Follower
两个副本,且 Leader
和 Follower
都写入了两条消息,而且 Leader
副本的高水位也已经更新了,但 Follower
副本高水位还未更新(相当于还没执行第6、7步),此时,Leader
和 Follower
的情况:Leader {LEO = 2, HW = 2}
,Follower{LEO = 2, HW = 0}
。
倘若此时 Follower
所在的 Broker
宕机,当它重启回来后,执行日志截断操作,将 LEO
值调整为之前的高水位值,也就是 0。这就是说,位移值为0,1的消息会被磁盘删除,此时 Follower
副本的底层磁盘文件中没有保存消息。
当执行完截断操作后,Follower
开始从 Leader
拉取消息,执行正常的消息同步。如果此时, Leader
所在的Broker
宕机了,那么 kafka
就别无选择,只能让 Follower
成为新的 Leader
,此时,当旧 Leader
回来后,需要执行相同的日志截断操作,其 LEO
值依然为 2。但只要它向新 Leader
发起同步请求后就会更新其 HW
为0(计算min(Follower LEO, Leader HW)
),数据也被截断。这样操作之后,位移值为 0 和 1 的两条消息就从这两个副本中被永远地抹掉了。这个就是数据丢失场景。
严格来说,这个场景发生的前提是
Broker
端参数min.insync.replicas
设置为1。即一旦消息被写入到Leader
副本的磁盘,就会被认为是“已提交状态”,生产者不会因Follower
没有同步成功而重新发送消息,这两条消息也就永远丢失了。
基于此,社区在0.11版本正式引入了 Leader Epoch
概念,引入 Leader Epoch
后,Follower
就不再参考HW
,而是根据 Leader Epoch
信息来截断 Leader
中不存在的消息。
所谓 Leader Epoch
,大致可以认为是 Leader
版本。它由两部分数据组成:
Epoch
。一个单调增加的版本号。每当副本领导权发生变更时,都会增加该版本号。小版本号的Leader
被认为是过期Leader
,不能再行使Leader
权力。起始位移(
Start Offset
)。Leader
副本在该Epoch
值上写入的首条消息的位移。
假设现在有两个 Leader Epoch
<0, 0> 和 <1, 120>,那么,第一个Leader Epoch
表示版本号是0,这个版本的Leader
从位移0开始保存消息,一共保存了120条消息。之后,Leader
发生了变更,版本号增加到1,新版本的起始位移是120。
Kafka Broker
会在内存中为每个分区都缓存 Leader Epoch
数据,同时它还会定期地将这些信息持久化到一个checkpoint
文件中。当 Leader
副本写入消息到磁盘时,Broker
会尝试更新这部分缓存。如果该 Leader
是首次写入消息,那么 Broker
会向缓存中增加一个 Leader Epoch
条目,否则就不做更新。这样,每次有 Leader
变更时,新的 Leader
副本会查询这部分缓存,取出对应的 Leader Epoch
的起始位移,以避免数据丢失和不一致的情况。
来看下引入 Leader Epoch
后上面例子的情况:
开始
Leader
和Follower
的Leader Epoch
都为[Epoch=0, Offset=0]
。Follower
副本重启回来后,需要向Leader
发送一个特殊的请求去获取Leader
的LEO
值,为 2。当获知到
Leader
LEO
=2后,Follower
发现该LEO
值不比它自己的LEO
值小,而且缓存中也没有保存任何起始位移值 > 2的Epoch
条目,因此无需执行任何日志截断操作。Leader
所在的Broker
宕机后,Follower
成为新Leader
,旧Leader
重启回来后,执行相同的判断逻辑,发现也不用执行日志截断,两条消息在两个副本中均得到保留。后面当生产者程序向新
Leader
写入新消息时,它所在的Broker
缓存中,会生成新的Leader Epoch
条目:[Epoch=1, Offset=2]
。
消息消费
消费者调度分配
消费者想要拉取数据,首先必须要加入到一个组中,成为消费组中的一员,同样道理,如果消费者出现了问题,也应该从消费者组中剥离。而这种加入组和退出组的处理,都应该由专门的管理组件进行处理,这个组件在 kafka
中称之为消费者组调度器(协调)(Group Coordinator
),是 Broker
上的一个组件,用于管理和调度消费者组的成员、状态、分区分配、偏移量等信息。每个 Broker
都有一个Group Coordinator
对象,负责管理多个消费者组,但每个消费者组只有一个 Group Coordinator
。分配流程大致分为两大步:加入组(JoinGroup
)和组同步(SyncGroup
)。
消费者组的每个消费者到底消费哪一个主题分区,这个分配策略其实是由消费者组中的 Leader
决定的,这个 Leader
是 Group Coordinator
指定的,组中其他消费者我们称之为 Follower
,称呼上有点类似与分区的 Leader
和 Follower
。
消费者根据设定的
group.id
向当前负载最小的Broker
节点发送请求查找消费调度器,查询算法为group.id.hashCode % __consumer_offests分区数量
。找到消费调度器后,消费者向调度器所在
Broker
节点发出JOIN_GROUP
请求,加入消费者组。调度器然后将订阅分区信息发给
Leader
,Leader
根据消费者配置中分配策略设计分区分配方案,并将分配好的方案告知调度器。这里的告知是所有消费者(包括Leader
)向Group Coordinator
发送SyncGroupRequest
请求。需要注意的是,只有Leader
成员发送的请求中包含了订阅分区消费分配方案,在其他成员发送的请求中,这部分的内容为空。当
Group Coordinator
接收到分配方案后,会通过向成员发送响应的方式,通知各个成员要消费哪些分区。
消费分配策略
轮询分配。每个消费者组中的消费者都会含有一个自动生产的
UUID
作为memberid
,轮询策略中会将每个消费者按照memberid
进行排序分配。范围分配。按照每个主题的分区数量计算出每个消费者应该分配的分区数量,然后分配,分配的原则就是一个主题的分区尽可能的平均分,如果不能平均分,那就按顺序向前补齐即可。
粘性分配(
StickyAssignor
)。在第一次分配后,每个组成员都保留分配给自己的分区信息。如果有消费者加入或退出,那么在进行分区再分配时(一般情况下,消费者退出45s后,才会进行再分配,因为需要考虑可能又恢复的情况),尽可能保证消费者原有的分区不变,重新对加入或退出消费者的分区进行分配。CooperativeStickyAssignor
。前面的三种分配策略再进行重分配时使用的是EAGER
协议,会让当前的所有消费者放弃当前分区,关闭连接,资源清理,重新加入组和等待分配策略。明显效率是比较低的,所以从kafka2.4
版本开始,在粘性分配策略的基础上,优化了重分配的过程,使用的是COOPERATIVE
协议,COOPERATIVE
协议将一次全局重平衡,改成每次小规模重平衡,直至最终收敛平衡的过程。
例如:一个Topic
(T0,三个分区),两个 consumer
(consumer1、consumer2
)均订阅Topic(T0)
。如果consumer
订阅信息为:
consumer1 | T0P0、T0P2 |
---|---|
consumer2 | T0P1 |
此时,新的 consumer3
加入消费者组,那么基于eager协议的分区重分配策略流程如下:
consumer1
、consumer2
正常发送心跳信息到Group Coordinator
。随着
consumer3
加入,Group Coordinator
收到对应的Join Group
请求,Group Coordinator
确认有新成员需要加入消费者组。Group Coordinator
通知consumer1
和consumer2
,需要Rebalance
(再平衡)了。consumer1
和consumer2
放弃当前各自持有的已有分区,重新发送Join Group
请求到Group Coordinator
。Group Coordinator
依据指定的分区分配策略的处理逻辑,生成新的分区分配方案,然后通过Sync Group
请求,将新的分区分配方案发送给consumer1
、consumer2
、consumer3
。所有
consumer
按照新的分区分配,重新开始消费数据。
而基于cooperative协议的分区分配策略的流程如下:
consumer1
、consumer2
正常发送心跳信息到Group Coordinator
。随着
consumer3
加入,Group Coordinator
收到对应的Join Group
请求,Group Coordinator
确认有新成员需要加入消费者组。Group Coordinator
通知consumer1
和consumer2
,需要Rebalance
(再平衡)了。consumer1
、consumer2
通过Join Group
请求将已经持有的分区发送给Group Coordinator
。注意:并没有放弃已有分区。Group Coordinator
取消consumer1
对分区p2
的消费,然后发送Sync Group
请求给consumer1
、consumer2
。consumer1
、consumer2
接收到分区分配方案,重新开始消费。至此,一次Rebalance
完成。当前
p2
也没有被消费,再次触发下一轮Rebalance
,将p2
分配给consumer3
消费。
脑裂
在集群环境下,如果有 Leader
概念的话,一般都会有脑裂问题,kafka
的 Controller
也不例外。
kafka controller
相当于整个 kafka
集群的 master,负责 topic
的创建、删除、以及 partition
的状态机转换,Broker
的上线、下线等。前面已经分析过 Controller
是如何选举的,有一种特殊的情况,就是Controller
节点并没有宕掉,而是因为网络的抖动,不稳定,导致和 ZooKeeper
之间的会话超时,那么此时,整个 Kafka
集群就会认为之前的 Controller
已经下线(退出)从而选举出新的 Controller
,而之前的Controller
的网络又恢复了,以为自己还是 Controller
了,继续管理整个集群。那么现在集群内就有两个 Controller
了,这样破坏系统的一致性和可靠性。
为了解决这个问题,kafka
引入了 epoch
(纪元)概念,每次选举 Controller
,epoch+1
,该信息在 zookeeper
的 /controller_epoch
节点中维护。假设存在脑裂的情况,那么旧 Controller
给其他 Broker
发送请求时,会带上当前 Controller
的 epoch
,当其他 Broker
接收到请求后,会对比最新 Controller epoch
版本号与接收到的 epoch
,如果接收到的版本号小,那么直接拒绝请求。当旧的 Controller
一旦发现集群中有新任 Controller
的时候,自身会进行卸任。
注意:不要和前面提到过的
leader_epoch
搞混了。
常见问题
消费者启动后消费不到消息
进入zookeeper客户端,将brokers节点下的topics节点下的__consumer_offsets删除就可。
deleteall /brokers/topics/__consumer_offsets
评论区