主题

  • 创建主题

    1. –zookeeper 会绕过 Kafka 的安全体系。这就是说,即使你为 Kafka 集群设置了安全认证,限制了主题的创建,如果你使用 –zookeeper 的命令,依然能成功创建任意主题,不受认证体系的约束
    1
    bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic test
    1. 直接使用–bootstrap-server 与集群进行交互
    1
    bin/kafka-topics.sh --bootstrap-server broker_host:port --create --topic my_topic_name  --partitions 1 --replication-factor 1
  • 发送消息

    1
    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
  • 查询指定topic

    1
    bin/kafka-topics.sh --zookeeper localhost:2181 --topic test --describe
  • 查询所有主题的列表

    仍然区分使用--zookeeper--broker-list,如果指定了 –bootstrap-server,那么这条命令就会受到安全认证体系的约束,即对 发起者进行权限验证,然后返回它能看到的主题。否则,如果指定 –zookeeper 参数,那么默认会返回集群中所有的主题详细数据

    1
    bin/kafka-topics.sh --zookeeper localhost:2181 broker_host:port --list
  • 查询topic最小分片最小偏移量(–time -1 是最大变异量)

    1
    bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092  --topic test --time -2
  • 删除主题

    异步删除

    1
    ./bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic test
  • 查询主题offset

    –time参数

    • -1:最大offset
    • -2:最小offset
    1
    ./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 -topic test --time -2
  • 修改主题分区(分区扩容)

    1
    ./bin/kafka-topics.sh --alter --zookeeper localhost:2181 --topic test --partitions 3

消息

  • 主题消息总数

    最大偏移减去最小偏移量值

1
2
3
4
5
6
7
8
9
10
11
#最小偏移
$ bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list kafka-host:port --time -2 --topic test-topic

test-topic:0:0
test-topic:1:0

#最大偏移
$ bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list kafka-host:port --time -1 --topic test-topic

test-topic:0:5500000
test-topic:1:5500000
  • 消息文件数据

    查看消息文件信息

1
2
3
4
5
6
7
$ bin/kafka-dump-log.sh --files ../data_dir/kafka_1/test-topic-1/00000000000000000000.log

Dumping ../data_dir/kafka_1/test-topic-1/00000000000000000000.log
Starting offset: 0
baseOffset: 0 lastOffset: 14 count: 15 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 0 CreateTime: 1561597044933 size: 1237 magic: 2 compresscodec: LZ4 crc: 646766737 isvalid: true
baseOffset: 15 lastOffset: 29 count: 15 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 1237 CreateTime: 1561597044934 size: 1237 magic: 2 compresscodec: LZ4 crc: 3751986433 isvalid: true
......

​ 查询消息文件实际内容

1
$ bin/kafka-dump-log.sh --files ../data_dir/kafka_1/test-topic-1/00000000000000000000.log --deep-iteration --print-data-log

配置

  • 查看这个 topic 设置的参数

    设置常规的主题级别参数,还是使用 --zookeeper。设置动态参数指定 --bootstrap-server 参数

    1
    ./bin/kafka-configs.sh --zookeeper localhost:2181  --entity-type topics --entity-name test-topic --describe 
  • 设置配置参数

    1
    ./bin/kafka-configs.sh --zookeeper localhost:2181  --entity-type topics --entity-name test-topic --alter --add-config max.message.bytes=10485760
  • 变更副本数

  • 修改主题限速

    主题test,让该主题各个分区的 Leader 副本和 Follower 副本在处理副本同步时,不得占用超过 100MBps 的带宽

    1. broker限速配置

      Leader 副本和 Follower 副本使用的带宽。有时候,我们想要让某个主题的副本在执行副本同步机制时,不要消耗过多的带宽

    1
    bin/kafka-configs.sh --zookeeper zookeeper_host:port --alter --add-config 'leader.replication.throttled.rate=104857600,follower.replication.throttled.rate=104857600' --entity-type brokers --entity-name 0
    • –entity-type:限速类型为broker
    • –entity-name : Broker ID,倘若该主题的副本分别在 0、1、2、3 多个 Broker 上,那么还要依次为 Broker 1、2、3 执行这条命令
    1. 主题限速

      我们想要为所有副本都设置限速,因此统一使用通配符 *

    1
    bin/kafka-configs.sh --zookeeper zookeeper_host:port --alter --add-config 'leader.replication.throttled.replicas=*,follower.replication.throttled.replicas=*' --entity-type topics --entity-name test
    • –entity-type:限速类型为主题
    • –entity-name :主题名称
  • 主题分区迁移

特殊主题的管理与运维

  • __consumer_offsets

    副本数

    Kafka 0.11 之前:__consumer_offsets主题的副本数为:min(Broker台数 , Broker端参数offsets.topic.replication.factor 值)。如果出现Broker的offsets.topic.replication.factor 设置为3,但是__consumer_offsets主题的副本数是1,是因为这个主题是在只有一台 Broker 启动时被创建
    Kafka-0.11 版本之后:Kafka 会严格遵守 offsets.topic.replication.factor值。如果当前运行的 Broker 数量小于 offsets.topic.replication.factor 值,Kafka 会创建主题失败,并显式抛出异常

  • 将副本数从1增加到3

    1. 创建一个 json 文件,显式提供 50 个分区对应的副本数

      1
      2
      3
      4
      5
      6
      7
      8
      {"version":1, "partitions":[
      {"topic":"__consumer_offsets","partition":0,"replicas":[0,1,2]},
      {"topic":"__consumer_offsets","partition":1,"replicas":[0,2,1]},
      {"topic":"__consumer_offsets","partition":2,"replicas":[1,0,2]},
      {"topic":"__consumer_offsets","partition":3,"replicas":[1,2,0]},
      ...
      {"topic":"__consumer_offsets","partition":49,"replicas":[0,1,2]}
      ]}`
    2. 执行 kafka-reassign-partitions 脚本

      1
      bin/kafka-reassign-partitions.sh --zookeeper zookeeper_host:port --reassignment-json-file reassign.json --execute
  • 查看消费者组提交的位移数据

    1
    bin/kafka-console-consumer.sh --bootstrap-server kafka_host:port --topic __consumer_offsets --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --from-beginning --max-messages 10
  • 消费者组的状态信息

    1
    bin/kafka-console-consumer.sh --bootstrap-server kafka_host:port --topic __consumer_offsets --formatter "kafka.coordinator.group.GroupMetadataManager\$GroupMetadataMessageFormatter" --from-beginning --max-messages 10

对于内部主题 __transaction_state 而言,方法是相同,只需要指定 kafka.coordinator.transaction.TransactionLog\$TransactionLogMessageFormatter 即可

常见主题错误处理

  • 主题删除失败

    现象:执行删除主题命令后,磁盘的主题分区并没有被删除

    原因:

    1. 副本所在的 Broker 宕机了

      解决方法:重启对应的 Broker 之后,删除操作就能自动恢复

    2. 待删除主题的部分分区依然在执行迁移过程

      解决方法:

      1. 手动删除 ZooKeeper 节点 /admin/delete_topics 下以待删除主题为名的 znode
      2. 手动删除该主题在磁盘上的分区目录
      3. 在 ZooKeeper 中执行 rmr /controller,触发 Controller 重选举,刷新 Controller 缓存
        可能造成大面积的分区 Leader 重选举,事实上,仅仅执行前两步也是可以的,只是 Controller 缓存中没有清空待删除主题罢了,不影响使用
  • __consumer_offsets 占用太多的磁盘

    原因:
    jstack 命令查看一下 kafka-log-cleaner-thread 前缀的线程状态。通常情况下,这都是因为该线程挂掉了,无法及时清理此内部主题
    解决方案:重启相应的 Broker

思考

  • 为什么 Kafka 不允许减少分区数
    因为多个broker节点都冗余有分区的数据,减少分区数需要操作多个broker且需要迁移该分区数据到其他分区。offset确认中,还在发过来的路上,结果分区没了。这个分区所有的历史消息数据也要被迁移到别的Broker上,千亿规模太大的话,会造成Broker卡顿,而且这些数据跟目标Broker上的原有数据还要排序重写,否则就破坏了Kafka分区内的消息的有序性

消费组

  • 消费消息

    1
    bin/kafka-console-consumer.sh  --bootstrap-server localhost:9092 --topic test --from-beginning
  • 消息回溯

    • Earliest
    1
    bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-earliest –execute
    • Latest
    1
    bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-latest --execute
    • 当前位置
    1
    bin/kafka-consumer-groups.sh --bootstrap-server 10.196.39.133:9092 --group test_consumer --reset-offsets --topic test_topic --to-current --execute
    • Specified-Offset
    1
    bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-offset <offset> --execute
    • 多少消息之前
    1
    bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --shift-by <offset_N> --execute
    • 指定时间
    1
    bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --to-datetime 2019-06-20T20:00:00.000 --execute
    • 多长时间之前
    1
    bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --by-duration PT0H30M0S --execute
  • 查询所有消费组

    1
    bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
  • 查询指定消费组状态

    1
    bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group cainiao11_forecast --describe
  • 查看位点消费组提交offset明细

    1
    bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic  __consumer_offsets --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter"

压测

  • 测试生产者性能
1
bin/kafka-producer-perf-test.sh --topic test-topic --num-records 10000000 --throughput -1 --record-size 1024 --producer-props bootstrap.servers=kafka-host:port acks=-1 linger.ms=2000 compression.type=lz4
  • 测试消费着性能
1
bin/kafka-consumer-perf-test.sh --broker-list kafka-host:port --messages 10000000 --topic test-topic