Kafka位点主题:_consumer_offsets是什么
和创建的其他主题一样,位移主题就是普通的 Kafka 主题
位移主题中的消息本质为KV结构,
- key保存了3部分内容:group ID、主题名、分区号。
- value保存格式有3种
- 位移值
- 用于保存 Consumer Group 信息:用来注册 Consumer Group
- 删除 Group 过期位移甚至是删除 Group 的消息:
tombstone 消息,即墓碑消息,也称 delete mark。一旦某个 Consumer Group 下的所有 Consumer 实例都停止了,而且它们的位移数据都已被删除时,Kafka 会向位移主题的对应分区写入 tombstone 消息,表明要彻底删除这个 Group 的信息。
位移主题创建时机
当 Kafka 集群中的第一个 Consumer 程序启动时,Kafka 会自动创建位移主题。
分区数:如果是 Kafka 自动创建的,按照Broker 端参数 offsets.topic.num.partitions
的取值,它的默认值是 50。
副本数: Broker 端另一个参数 offsets.topic.replication.factor
,默认值是 3
注:可以选择手动创建位移主题,具体方法就是,在 Kafka 集群尚未启动任何 Consumer 之前,使用 Kafka API 创建它
客户端提交位点方式
自动提交位
Consumer 端有个参数叫 enable.auto.commit,如果值是 true,则 Consumer 在后台默默地为你定期提交位移,提交间隔由一个专属的参数 auto.commit.interval.ms 来控制。
只要 Consumer 一直启动着,它就会无限期地向位移主题写入消息。这就要求 Kafka 必须要有针对位移主题消息特点的消息删除策略,否则这种消息会越来越多,最终撑爆整个磁盘
手动提交位移
设置 enable.auto.commit = false,Kafka Consumer API 为你提供了位移提交的方法,如 consumer.commitSync 等。当调用这些方法时,Kafka 会向位移主题写入相应的消息。
清理位点消息
Kafka 使用 Compact 策略来删除位移主题中的过期消息,避免该主题无限期膨胀。那么应该如何定义 Compact 策略中的过期呢?对于同一个 Key 的两条消息 M1 和 M2,如果 M1 的发送时间早于 M2,那么 M1 就是过期消息。Compact 的过程就是扫描日志的所有消息,剔除那些过期的消息,然后把剩下的消息整理在一起
Kafka 提供了专门的后台线程定期地巡检待 Compact 的主题,看看是否存在满足条件的可删除数据。这个后台线程叫 Log Cleaner。很多实际生产环境中都出现过位移主题无限膨胀占用过多磁盘空间的问题,如果你的环境中也有这个问题,我建议你去检查一下 Log Cleaner 线程的状态,通常都是这个线程挂掉了导致的。
案例
log cleaner线程挂掉还有可能导致消费端出现:Marking Coordinator Dead!
原因大概如下:
log cleaner线程挂掉之后会导致磁盘上位移主题的文件越来越多(当然,大部分是过期数据,只是依旧存在),broker内存中会维护offsetMap,从名字上看这个map就是维护消费进度的,而这个map和位移主题的文件有关联,文件越来越多会导致offsetMap越来越大,甚至导致offsetMap构建失败(为什么会失败没有搞明白),offsetMap构建失败之后broker不会承认自己是coordinator。 消费者组找coordinator的逻辑很简单:abs(consumer_groupName.hashCode) % __consumer_offset.partition.num对应的partition所在的broker就是这个group的coordinate,一旦这个broker的offsetMap构建失败,那么这个broker就不承认自己是这个group的coordinate,这个group的消费就无法继续进行,会出现Marking Coordinator Dead错误。 此时需要删除过期的位移主题的文件(根据文件名很容易确定哪个是最新的),重启broker。重启过程中需要关注log cleaner是否会再次挂掉。