Kafka消费组如何提交位移
Consumer 需要向 Kafka 汇报自己的位移数据,这个汇报过程被称为提交位移(Committing Offsets)。Consumer 需要为分配给它的每个分区提交各自的位移数据。
一、位移提交方式
从用户的角度来说,位移提交分为自动提交和手动提交;从 Consumer 端的角度来说,位移提交分为同步提交和异步提交。
1. 位移提交控制
enable.auto.commit
:Consumer 端位移提交配置,默认值true,自动提交位移。
auto.commit.interval.ms
:它的默认值是 5 秒,表明 Kafka 每 5 秒会为你自动提交一次位移
2. 自动提交位移
Kafka 会保证在开始调用 poll 方法时,提交上次 poll 返回的所有消息。从顺序上来说,poll 方法的逻辑是先提交上一批消息的位移,再处理下一批消息,因此它能保证不出现消费丢失的情况。但自动提交位移的一个问题在于,它可能会出现重复消费
1 | Properties props = new Properties(); |
3. 手动提交位移
同步提交
该方法会提交 KafkaConsumer#poll() 返回的最新位移
1 | while (true) { |
异步提交
调用 commitAsync() 之后,它会立即返回,不会阻塞,因此不会影响 Consumer 应用的 TPS,commitAsync 是不会自动重试的。
1 | while (true) { |
同步异步结合
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16try {
while(true) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofSeconds(1));
process(records); // 处理消息
commitAysnc(); // 使用异步提交规避阻塞
}
} catch(Exception e) {
handle(e); // 处理异常
} finally {
try {
consumer.commitSync(); // 最后一次提交使用同步阻塞式提交
} finally {
consumer.close();
}
}批量提交位点
1
2
3
4
5
6
7
8
9
10
11
12
13
14private Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
int count = 0;
……
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record: records) {
process(record); // 处理消息
offsets.put(new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1);
if(count % 100 == 0)
consumer.commitAsync(offsets, null); // 回调处理逻辑是null
count++;
}
}
Standalone Consumer
- 很多流处理框架的Kafka connector都没有使用consumer group,而是直接使用standalone consumer,因为group机制不好把控
- standalone consumer没有rebalance,也没有group提供的负载均衡,你需要自己实现。其他方面(比如位移提交)和group没有太大的不同
二、查看消费进度
1. 需要关注消费进度引起的问题
消费速度小于主题发送消息速度,当消费延迟越来越大,导致消费的数据已经不在操作系统的页缓存中,消费组者不得不从磁盘读取它们,这样就进一步拉大延迟
2. 消费延迟查看方式
Kafka 自带的命令行工具 kafka-consumer-groups 脚本
1
bin/kafka-consumer-groups.sh --bootstrap-server <Kafka broker连接信息> --describe --group <group名称>
三、常见异常
CommitFailedException
自动commit 不会抛 CommitFailedException
原因
是消费者组已经开启了 Rebalance 过程,并且将要提交位移的分区分配给了另一个消费者实例。出现这个情况的原因是,你的消费者实例连续两次调用 poll 方法的时间间隔超过了期望的 max.poll.interval.ms 参数值。这通常表明,你的消费者实例花费了太长的时间进行消息处理,耽误了调用 poll 方法。如:Consumer 端参数max.poll.interval.ms=5
秒,最后在循环调用 KafkaConsumer.poll 方法之间,插入Thread.sleep(6000)
和手动提交位移,会出现CommitFailedException
解决方法
客户端优化
- 缩短单条消息处理的时间
- 下游系统使用多线程来加速消费
客户端配置
增加期望的时间间隔
max.poll.interval.ms
参数值。减少 poll 方法一次性返回的消息数量,即减少
max.poll.records
参数值。
其他导致CommitFailedException异常的情况
设置相同group.id 值的消费者组程序和Standalone Consumer 的独立消费程序,那么当独立消费者程序手动提交位移时,Kafka 就会立即抛出 CommitFailedException 异常