Kafka消费组如何进行重平衡
一、概念
Rebalance 就是让一个 Consumer Group 下所有的 Consumer 实例就如何消费订阅主题的所有分区达成共识的过程。在 Rebalance 过程中,所有 Consumer 实例共同参与,在协调者组件的帮助下,完成订阅主题分区的分配。在整个过程中,所有实例都不能消费任何消息,因此它对 Consumer 的 TPS 影响很大。
Coordinator
它专门为 Consumer Group 服务,负责为 Group 执行 Rebalance 以及提供位移管理和组成员管理等。Consumer 端应用程序在提交位移时,其实是向 Coordinator 所在的 Broker 提交位移。同样地,当 Consumer 应用启动时,也是向 Coordinator 所在的 Broker 发送各种请求,然后由 Coordinator 负责执行消费者组的注册、成员管理记录等元数据管理操作。
所有 Broker 都有各自的 Coordinator 组件,从broker上找到当前分组的协调器, 你可以理解为找到位移主题所在的broker lead副本所存在的节点。
Consumer Group 确定 Coordinator 所在的 Broker 的算法
- 确定由位移主题的哪个分区来保存该 Group 数据:
partitionId=Math.abs(groupId.hashCode()%offsetsTopicPartitionCount)
。 - 找出该分区 Leader 副本所在的 Broker,该 Broker 即为对应的 Coordinator
实战: 当 Consumer Group 出现问题,需要快速排查 Broker 端日志时,我们能够根据这个算法准确定位 Coordinator 对应的 Broker,不必一台 Broker 一台 Broker 地盲查
二、Consumer感知Rebalance流程
Consumer从broker拉取消息,具体使用
KafkaConsumer#poll
方法
并且此方法会受到客户端rebalance影响,直到Rebalance成功后才会继续拉取消息,也就是说poll方法可能会超时,直到Rebalance成功向ConsumerCoordinator请求协调器事件,同时确保ConsumerCoordinator已经知晓消费者加入此消费组。具体做了下面的操作
(1)更新上次轮询时间的心跳,以便心跳线程不会由于应用程序不活动离开组,即使(例如)找不到协调员。如果需要,唤醒检测信号线程(上次心跳时间距现在已经超过心跳检测间隔时间)
(2)判断消费客户端是否需要触发ReJoin消费组,满足下面其中一个条件触发ReJoin消费组
- 如果我们执行了任务并且元数据已更改
- 如果我们的订阅自上次加入以来发生了变化
- ReJoin请求已在运行中,或者客户端ReJoin状态为true
再次检查与ConsumerCoordinator的链接状态是否正常,如果正常则继续,否则重复第2步
初始化心跳线程(只会初始化一次)
持续向broker发送FindCoordinator请求,直至与coordinator连接成功,超时时间为
KafkaConsumer#poll
设置时间执行
ConsumerRebalanceListener#onPartitionsRevoked
方法,通知Consumer客户端队列即将重平衡关闭心跳线程,使其无法干扰Join组操作
向coordinator发送JoinGroup请求,并修改Consumer状态:UNJOINED => REBALANCING,根据JoinGroup响应做出处理:
成功
Consumer状态:REBALANCING => STABLE
开启心跳线程
执行
ConsumerRebalanceListener#onPartitionsAssigned
方法,重平衡完成
日志:
1
Setting newly assigned partitions:
失败
REBALANCING => UNJOINED
重平衡失败,如果是因为异常不是
RetriableException
,则抛出异常,否则重试步骤2
拉下队列消息
三、如何避免 Rebalance
1. Rebalance 的弊端
Rebalance 影响 Consumer 端 TPS
在 Rebalance 期间,Consumer 会停下手头的事情,什么也干不了Rebalance 很慢
几百个 Consumer 实例,Rebalance 一次可能需要几个小时Rebalance 效率不高
当前 Kafka 的设计机制决定了每次 Rebalance 时,Group 下的所有成员都要参与进来,而且通常不会考虑局部性原理,但局部性原理对提升系统性能是特别重要的。(0.11.0.0 版本推出了 StickyAssignor,即有粘性的分区分配策略。所谓的有粘性,是指每次 Rebalance 时,该策略会尽可能地保留之前的分配方案,尽量实现分区分配的最小变动)
2. Rebalance 发生的时机
- 组成员数量发生变化
- 增加 Consumer 实例
- 减少Consumer实例数
- 订阅主题数量发生变化
- 订阅主题的分区数发生变化
组成员数量发生变化
增加 Consumer 实例
Consumer实例数减少
现象:
broker端日志会频繁输出类似:(Re)join group
原因:
Consumer 实例会被 Coordinator 错误地认为“已停止”从而被“踢出”Group
每个 Consumer 实例都会定期地向 Coordinator 发送心跳请求,表明它还存活着。某个 Consumer 实例不能及时地发送心跳请求,Coordinator 就会认为该 Consumer 已经“死”了,从而将其从 Group 中移除,然后开启新一轮 Rebalance。两个 关键参数:
session.timeout.ms
:默认值是 10,如果 Coordinator 在 10 秒之内没有收到 Group 下某 Consumer 实例的心跳,它就会认为这个 Consumer 实例已经挂了heartbeat.interval.ms
:这个值设置得越小,Consumer 实例发送心跳请求的频率就越高。频繁地发送心跳请求会额外消耗带宽资源,但好处是能够更加快速地知晓当前是否开启 Rebalance。 Coordinator通知consumer进行rebalence的方法是将REBALANCE_NEEDED
标志封装在心跳包的响应体里。max.poll.interval.ms
:它限定了 Consumer 端应用程序两次调用 poll 方法的最大时间间隔。它的默认值是 5 分钟,表示你的 Consumer 程序如果在 5 分钟之内无法消费完 poll 方法返回的消息,那么 Consumer 会主动发起“离开组”的请求,Coordinator 也会开启新一轮 Rebalance。
如何避免:
未能及时发送心跳:- 设置
session.timeout.ms
= 6s - 设置
heartbeat.interval.ms
= 2s - 要保证 Consumer 实例在被判定为“dead”之前,能够发送至少 3 轮的心跳请求,即 session.timeout.ms >= 3 * heartbeat.interval.ms。
Consumer 消费时间过长:
max.poll.interval.ms
设置时间要大于消费消息处理耗时,如业务处理消息时间为5分钟,那么max.poll.interval.ms要设置为6分钟
Consumer 端的 GC 表现
- 客户端出现了频繁的 Full GC 导致的长时间停顿,从而引发了 Rebalance