Consumer 需要向 Kafka 汇报自己的位移数据,这个汇报过程被称为提交位移(Committing Offsets)。Consumer 需要为分配给它的每个分区提交各自的位移数据。

一、位移提交方式

从用户的角度来说,位移提交分为自动提交和手动提交;从 Consumer 端的角度来说,位移提交分为同步提交和异步提交。

1. 位移提交控制

enable.auto.commit:Consumer 端位移提交配置,默认值true,自动提交位移。

auto.commit.interval.ms:它的默认值是 5 秒,表明 Kafka 每 5 秒会为你自动提交一次位移

2. 自动提交位移

Kafka 会保证在开始调用 poll 方法时,提交上次 poll 返回的所有消息。从顺序上来说,poll 方法的逻辑是先提交上一批消息的位移,再处理下一批消息,因此它能保证不出现消费丢失的情况。但自动提交位移的一个问题在于,它可能会出现重复消费

1
2
3
4
5
6
7
8
9
10
11
12
13
14
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "2000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(),record.value());
}

3. 手动提交位移

  • 同步提交

    该方法会提交 KafkaConsumer#poll() 返回的最新位移

1
2
3
4
5
6
7
8
9
10
while (true) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofSeconds(1));
process(records); // 处理消息
try {
consumer.commitSync();
} catch (CommitFailedException e) {
handle(e); // 处理提交失败异常
}
}
  • 异步提交

    调用 commitAsync() 之后,它会立即返回,不会阻塞,因此不会影响 Consumer 应用的 TPS,commitAsync 是不会自动重试的。

1
2
3
4
5
6
7
8
9
while (true) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofSeconds(1));
process(records); // 处理消息
consumer.commitAsync((offsets, exception) -> {
if (exception != null)
handle(exception);
});
}
  • 同步异步结合

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    try {
    while(true) {
    ConsumerRecords<String, String> records =
    consumer.poll(Duration.ofSeconds(1));
    process(records); // 处理消息
    commitAysnc(); // 使用异步提交规避阻塞
    }
    } catch(Exception e) {
    handle(e); // 处理异常
    } finally {
    try {
    consumer.commitSync(); // 最后一次提交使用同步阻塞式提交
    } finally {
    consumer.close();
    }
    }
  • 批量提交位点

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    private Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
    int count = 0;
    ……
    while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
    for (ConsumerRecord<String, String> record: records) {
    process(record); // 处理消息
    offsets.put(new TopicPartition(record.topic(), record.partition()),
    new OffsetAndMetadata(record.offset() + 1);
    if(count % 100 == 0
    consumer.commitAsync(offsets, null); // 回调处理逻辑是null
    count++;
    }
    }

Standalone Consumer

  1. 很多流处理框架的Kafka connector都没有使用consumer group,而是直接使用standalone consumer,因为group机制不好把控
  2. standalone consumer没有rebalance,也没有group提供的负载均衡,你需要自己实现。其他方面(比如位移提交)和group没有太大的不同

二、查看消费进度

1. 需要关注消费进度引起的问题

消费速度小于主题发送消息速度,当消费延迟越来越大,导致消费的数据已经不在操作系统的页缓存中,消费组者不得不从磁盘读取它们,这样就进一步拉大延迟

2. 消费延迟查看方式

  1. Kafka 自带的命令行工具 kafka-consumer-groups 脚本

    1
    $ bin/kafka-consumer-groups.sh --bootstrap-server <Kafka broker连接信息> --describe --group <group名称>

三、常见异常

CommitFailedException

自动commit 不会抛 CommitFailedException

  • 原因
    是消费者组已经开启了 Rebalance 过程,并且将要提交位移的分区分配给了另一个消费者实例。出现这个情况的原因是,你的消费者实例连续两次调用 poll 方法的时间间隔超过了期望的 max.poll.interval.ms 参数值。这通常表明,你的消费者实例花费了太长的时间进行消息处理,耽误了调用 poll 方法。如:Consumer 端参数 max.poll.interval.ms=5 秒,最后在循环调用 KafkaConsumer.poll 方法之间,插入 Thread.sleep(6000) 和手动提交位移,会出现CommitFailedException

  • 解决方法

    1. 客户端优化

      • 缩短单条消息处理的时间
      • 下游系统使用多线程来加速消费
    2. 客户端配置

      • 增加期望的时间间隔 max.poll.interval.ms 参数值。

      • 减少 poll 方法一次性返回的消息数量,即减少 max.poll.records 参数值。

    其他导致CommitFailedException异常的情况

    设置相同group.id 值的消费者组程序和Standalone Consumer 的独立消费程序,那么当独立消费者程序手动提交位移时,Kafka 就会立即抛出 CommitFailedException 异常