Kafka常用命令,日常运维必备
主题
创建主题
- –zookeeper 会绕过 Kafka 的安全体系。这就是说,即使你为 Kafka 集群设置了安全认证,限制了主题的创建,如果你使用 –zookeeper 的命令,依然能成功创建任意主题,不受认证体系的约束
1
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic test
- 直接使用–bootstrap-server 与集群进行交互
1
bin/kafka-topics.sh --bootstrap-server broker_host:port --create --topic my_topic_name --partitions 1 --replication-factor 1
发送消息
1
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
查询指定topic
1
bin/kafka-topics.sh --zookeeper localhost:2181 --topic test --describe
查询所有主题的列表
仍然区分使用
--zookeeper
和--broker-list
,如果指定了 –bootstrap-server,那么这条命令就会受到安全认证体系的约束,即对 发起者进行权限验证,然后返回它能看到的主题。否则,如果指定 –zookeeper 参数,那么默认会返回集群中所有的主题详细数据1
bin/kafka-topics.sh --zookeeper localhost:2181 broker_host:port --list
查询topic最小分片最小偏移量(–time -1 是最大变异量)
1
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic test --time -2
删除主题
异步删除
1
./bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic test
查询主题offset
–time参数
- -1:最大offset
- -2:最小offset
1
./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 -topic test --time -2
修改主题分区(分区扩容)
1
./bin/kafka-topics.sh --alter --zookeeper localhost:2181 --topic test --partitions 3
消息
主题消息总数
最大偏移减去最小偏移量值
1 | 最小偏移 |
消息文件数据
查看消息文件信息
1 | bin/kafka-dump-log.sh --files ../data_dir/kafka_1/test-topic-1/00000000000000000000.log |
查询消息文件实际内容
1 | bin/kafka-dump-log.sh --files ../data_dir/kafka_1/test-topic-1/00000000000000000000.log --deep-iteration --print-data-log |
配置
查看这个 topic 设置的参数
设置常规的主题级别参数,还是使用
--zookeeper
。设置动态参数指定--bootstrap-server
参数1
./bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name test-topic --describe
设置配置参数
1
./bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name test-topic --alter --add-config max.message.bytes=10485760
变更副本数
修改主题限速
主题test,让该主题各个分区的 Leader 副本和 Follower 副本在处理副本同步时,不得占用超过 100MBps 的带宽
broker限速配置
Leader 副本和 Follower 副本使用的带宽。有时候,我们想要让某个主题的副本在执行副本同步机制时,不要消耗过多的带宽
1
bin/kafka-configs.sh --zookeeper zookeeper_host:port --alter --add-config 'leader.replication.throttled.rate=104857600,follower.replication.throttled.rate=104857600' --entity-type brokers --entity-name 0
- –entity-type:限速类型为broker
- –entity-name : Broker ID,倘若该主题的副本分别在 0、1、2、3 多个 Broker 上,那么还要依次为 Broker 1、2、3 执行这条命令
主题限速
我们想要为所有副本都设置限速,因此统一使用通配符 *
1
bin/kafka-configs.sh --zookeeper zookeeper_host:port --alter --add-config 'leader.replication.throttled.replicas=*,follower.replication.throttled.replicas=*' --entity-type topics --entity-name test
- –entity-type:限速类型为主题
- –entity-name :主题名称
主题分区迁移
特殊主题的管理与运维
__consumer_offsets
副本数
Kafka 0.11 之前:__consumer_offsets主题的副本数为:min(Broker台数 , Broker端参数
offsets.topic.replication.factor
值)。如果出现Broker的offsets.topic.replication.factor
设置为3,但是__consumer_offsets主题的副本数是1,是因为这个主题是在只有一台 Broker 启动时被创建
Kafka-0.11 版本之后:Kafka 会严格遵守offsets.topic.replication.factor
值。如果当前运行的 Broker 数量小于 offsets.topic.replication.factor 值,Kafka 会创建主题失败,并显式抛出异常将副本数从1增加到3
创建一个 json 文件,显式提供 50 个分区对应的副本数
1
2
3
4
5
6
7
8{"version":1, "partitions":[
{"topic":"__consumer_offsets","partition":0,"replicas":[0,1,2]},
{"topic":"__consumer_offsets","partition":1,"replicas":[0,2,1]},
{"topic":"__consumer_offsets","partition":2,"replicas":[1,0,2]},
{"topic":"__consumer_offsets","partition":3,"replicas":[1,2,0]},
...
{"topic":"__consumer_offsets","partition":49,"replicas":[0,1,2]}
]}`执行 kafka-reassign-partitions 脚本
1
bin/kafka-reassign-partitions.sh --zookeeper zookeeper_host:port --reassignment-json-file reassign.json --execute
查看消费者组提交的位移数据
1
bin/kafka-console-consumer.sh --bootstrap-server kafka_host:port --topic __consumer_offsets --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --from-beginning --max-messages 10
消费者组的状态信息
1
bin/kafka-console-consumer.sh --bootstrap-server kafka_host:port --topic __consumer_offsets --formatter "kafka.coordinator.group.GroupMetadataManager\$GroupMetadataMessageFormatter" --from-beginning --max-messages 10
对于内部主题 __transaction_state
而言,方法是相同,只需要指定 kafka.coordinator.transaction.TransactionLog\$TransactionLogMessageFormatter
即可
常见主题错误处理
主题删除失败
现象:执行删除主题命令后,磁盘的主题分区并没有被删除
原因:
副本所在的 Broker 宕机了
解决方法:重启对应的 Broker 之后,删除操作就能自动恢复
待删除主题的部分分区依然在执行迁移过程
解决方法:
- 手动删除 ZooKeeper 节点 /admin/delete_topics 下以待删除主题为名的 znode
- 手动删除该主题在磁盘上的分区目录
- 在 ZooKeeper 中执行 rmr /controller,触发 Controller 重选举,刷新 Controller 缓存
可能造成大面积的分区 Leader 重选举,事实上,仅仅执行前两步也是可以的,只是 Controller 缓存中没有清空待删除主题罢了,不影响使用
__consumer_offsets 占用太多的磁盘
原因:
jstack 命令查看一下 kafka-log-cleaner-thread 前缀的线程状态。通常情况下,这都是因为该线程挂掉了,无法及时清理此内部主题
解决方案:重启相应的 Broker
思考
- 为什么 Kafka 不允许减少分区数
因为多个broker节点都冗余有分区的数据,减少分区数需要操作多个broker且需要迁移该分区数据到其他分区。offset确认中,还在发过来的路上,结果分区没了。这个分区所有的历史消息数据也要被迁移到别的Broker上,千亿规模太大的话,会造成Broker卡顿,而且这些数据跟目标Broker上的原有数据还要排序重写,否则就破坏了Kafka分区内的消息的有序性
消费组
消费消息
1
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
消息回溯
- Earliest
1
bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-earliest –execute
- Latest
1
bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-latest --execute
- 当前位置
1
bin/kafka-consumer-groups.sh --bootstrap-server 10.196.39.133:9092 --group test_consumer --reset-offsets --topic test_topic --to-current --execute
- Specified-Offset
1
bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-offset <offset> --execute
- 多少消息之前
1
bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --shift-by <offset_N> --execute
- 指定时间
1
bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --to-datetime 2019-06-20T20:00:00.000 --execute
- 多长时间之前
1
bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --by-duration PT0H30M0S --execute
查询所有消费组
1
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
查询指定消费组状态
1
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group cainiao11_forecast --describe
查看位点消费组提交offset明细
1
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic __consumer_offsets --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter"
压测
- 测试生产者性能
1 | bin/kafka-producer-perf-test.sh --topic test-topic --num-records 10000000 --throughput -1 --record-size 1024 --producer-props bootstrap.servers=kafka-host:port acks=-1 linger.ms=2000 compression.type=lz4 |
- 测试消费着性能
1 | bin/kafka-consumer-perf-test.sh --broker-list kafka-host:port --messages 10000000 --topic test-topic |