欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 新闻 > 社会 > Kafka 性能提升秘籍:涵盖配置、迁移与深度巡检的综合方案

Kafka 性能提升秘籍:涵盖配置、迁移与深度巡检的综合方案

2025/1/3 11:31:32 来源:https://blog.csdn.net/qq_40477248/article/details/144830451  浏览:    关键词:Kafka 性能提升秘籍:涵盖配置、迁移与深度巡检的综合方案

文章目录

  • 1.1.网络和io操作线程配置优化
  • 1.2.log数据文件刷盘策略
  • 1.3.日志保留策略配置
  • 1.4.replica复制配置
  • 1.5.配置jmx服务
  • 1.6.系统I/O参数优化
    • 1.6.1.网络性能优化
    • 1.6.2.常见痛点以及优化方案
    • 1.6.4.优化参数
  • 1.7.版本升级
  • 1.8.数据迁移
    • 1.8.1.同集群broker之间迁移
    • 1.8.2.跨集群迁移
  • 1.9.深度巡检
    • 1.9.1.集群状态检查
    • 1.9.2.查看消费者组
    • 1.9.3.查看消费偏移量

Kafka配置优化其实都是修改server.properties文件中参数值

1.1.网络和io操作线程配置优化

num.network.threads=xxx # broker处理消息的最大线程
num.io.threads=xxx # broker处理磁盘IO的线程数

建议配置:
一般num.network.threads主要处理网络io,读写缓冲区数据,基本没有io等待,配置线程数量为cpu核数加1.
num.io.threads主要进行磁盘io操作,高峰期可能有些io等待,因此配置需要大些。配置线程数量为cpu核数2倍,最大不超过3倍.

1.2.log数据文件刷盘策略

为了大幅度提高producer写入吞吐量,需要定期批量写文件。

建议配置:
每当producer写入10000条消息时,刷数据到磁盘
log.flush.interval.messages=10000
每间隔1秒钟时间,刷数据到磁盘
log.flush.interval.ms=1000

1.3.日志保留策略配置

当kafka server的被写入海量消息后,会生成很多数据文件,且占用大量磁盘空间,如果不及时清理,可能磁盘空间不够用,kafka默认是保留7天。

建议配置:
保留三天,也可以更短
log.retention.hours=72
段文件配置1GB,有利于快速回收磁盘空间,重启kafka加载也会加快(如果文件过小,则文件数量比较多,
kafka启动时是单线程扫描目录(log.dir)下所有数据文件)
log.segment.bytes=1073741824

1.4.replica复制配置

每个follow从leader拉取消息进行同步数据,follow同步性能由这几个参数决定,分别为拉取线程数(num.replica.fetchers)、最小字节数(replica.fetch.min.bytes)、最大字节数(replica.fetch.max.bytes)、最大等待时间(replica.fetch.wait.max.ms)

建议配置:
num.replica.fetchers 配置多可以提高follower的I/O并发度,单位时间内leader持有跟多请求,相应负载会增大,需要根据机器硬件资源做权衡
replica.fetch.min.bytes=1 默认配置为1字节,否则读取消息不及时
replica.fetch.max.bytes= 5 * 1024 * 1024 默认为1MB,这个值太小,5MB为宜,根据业务情况调整
replica.fetch.wait.max.ms follow拉取频率,频率过高,会导致cpu飙升,因为leader无数据同步,leader会积压大量无效请求情况,又因为0.8.2.x版本存在bug,定时器超时检查比较消耗CPU,使用者需要做好权衡

1.5.配置jmx服务

kafka server中默认是不启动jmx端口的,需要用户自己配置

1.6.系统I/O参数优化

vm.dirty_background_ratio是内存可以填充脏页的百分比,当脏页总大小达到这个比例后,系统后台进程就会开始将脏页刷磁盘(vm.dirty_background_bytes类似,只不过是通过字节数来设置)
vm.dirty_ratio是绝对的脏数据限制,内存里的脏数据百分比不能超过这个值。如果脏数据超过这个数量,新的IO请求将会被阻挡,直到脏数据被写进磁盘
vm.dirty_writeback_centisecs指定多长时间做一次脏数据写回操作,单位为百分之一秒
vm.dirty_expire_centisecs指定脏数据能存活的时间,单位为百分之一秒,比如这里设置为30秒,在操作系统进行写回操作时,如果脏数据在内存中超过30秒时,就会被写回磁盘

参考配置如下:

vm.dirty_background_ratio = 10
vm.dirty_background_bytes = 0
vm.dirty_ratio = 20
vm.dirty_bytes = 0
vm.dirty_writeback_centisecs = 500
vm.dirty_expire_centisecs = 3000

1.6.1.网络性能优化

参考配置:
net.core.optmem_max 增大每个套接字的缓冲区大小 参考设置81920
net.core.rmem_max 增大套接字接收缓冲区大小 参考设置513920
net.core.wmem_max 发送缓冲区大小 参考设置513920
net.ipv4.tcp_rmem 增大 TCP 接收缓冲区大小 参考设置4096 87380 16777216
net.ipv4.tcp_wmem 发送缓冲区大小 参考设置4096 65535 16777216
net.ipv4.udp_mem 增大UDP缓冲区范围 参考设置188562 251418 377124
net.ipv4.tcp_max_tw_buckets 增大TIME_WAIT 状态的连接数量 参考设置 1048576
net.netfilter.nf_conntrack_max 连接跟踪表大小 参考设置 1048576
net.ipv4.tcp_fin_timeout 减少连接超时时间 参考设置 15
net.netfilter.nf_conntrack_tcp_timeout_time_wait 缩短连接跟踪表中处于TIME_WAIT状态连接的超时时间 参考设置 30
net.ipv4.tcp_tw_reuse 开启端口复用
net.ipv4.ip_local_port_range 增大本地端口范围 参考设置 10000 65000
fs.nr_open 设置最大文件描述符数 参考设置 1048576
net.ipv4.tcp_max_syn_backlog 增加半连接数最大数量 参考设置 16384
net.ipv4.tcp_syncookies 开启SYN Cookies 参考设置 1
net.ipv4.ip_forward 开启ip转发 参考设置 1

说明:
tcp_rmem 和 tcp_wmem 的三个数值分别是 min,default,max,系统会根据这些设置,自动调整 TCP 接收 / 发送缓冲区的大小
udp_mem 的三个数值分别是 min,pressure,max,系统会根据这些设置,自动调整 UDP 发送缓冲区的大小

1.6.2.常见痛点以及优化方案

1.集群⽊桶效应,broker雪崩
痛点:
当整个集群当leader和follower分布不均衡时,这可能导致流量分布不均衡。⼀部分节点⽐较空闲,⼀部分节点负载过⾼(这⾥当负载主要是磁盘IO与⽹络带宽,CPU基本上不会成为Kafka的瓶颈)。最后导致出现⼤量副本缺失,直⾄broker挂掉后,流量压⼒转移到另外⼀个broker节点,很可能这个节点也会因为负载过⼤⽽被打挂。

优化⽅案:
1) 实现⼀套⾃动负载均衡程序,⾃动⽣成均衡计划,逐个topic进⾏均衡。【推荐】
2)⼿动切换分区leader或者进⾏副本迁移;【效率低,不推荐】

2.集群扩容⽆法⾃动负载均衡
痛点:
当扩容集群时,topic的分区⽣产消费⽆法落在新加⼊的broker节点上。这样相当于已经存在的topic读写数据⽆法落在新broker节点上,从⽽新节点⽆法得到充分利⽤。
优化⽅案:
1)实现⼀套⾃动负载均衡程序,⾃动⽣成均衡计划,逐个topic进⾏均衡。【推荐】
2)⼿动切换分区leader或者进⾏副本迁移;【效率低,不推荐】

3.集群副本迁移影响集群稳定,迁移任务不可控
痛点:
当需要迁移的分区数据量⽐较⼤时(单分区数据量超过100GB以上),这将导致迁移任务启动的新副本会从leader所在的broker节点⼤量拉取历史数据。这可能带来以下问题:
1)broker的磁盘IO被持续打满;
2)操作系统pagecache受到污染,导致⽣产消费延迟;
3)出现⼤量副本缺失;
4)迁移任务⼀旦开启便⽆法停⽌,只有让其执⾏失败或者成果才能结束。

优化⽅案:
1)改造源码,从最新偏移量开始同步数据,实现增量副本迁移;
2)增量迁移,新副本加⼊isr列表的时机可以根据当前分区是否有消费延迟和指定同步时间两个⽅⾯去考虑;
3)修改源码,对迁移任务添加可⼿动终⽌功能;

4.异常流量打挂集群
痛点:
当出现⼊流量当突增或出流量当突增时,可能造成broker节点负载过⼤(经常时磁盘IO被持续打满到100%)。以下情况容易造成流量突增:
1)新业务上线,数据源加⼊了新的⼤业务,⽣产者写⼊流量突增;
2)消费端程序从历史最早位置开始消费,拉去⼤量历史数据;
3)消费端程序停⽌服务⼀段时间后,重启追数;
4)⼤topic在消费端程序有新的消费者组加⼊,出流量突增;
1)根据⽤户维度,对各集群⽤户的出⼊流量进⾏限制;保证单个broker节点上所有⽤户的出⼊流量之和在broker的处理能⼒范围内;

5.⼀个业务异常影响整个集群稳定
痛点:
当某个业务的部分topic异常时,可能会影响到集群上的其他业务。
优化⽅案:
根据不同业务线,以资源组为单位对集群进⾏物理隔离,让各业务线的topic都是分布在⾃⼰的资源组内。不受其他业务线影响

6.pagecache污染及优化
痛点:
1)⼤量拉去历史数据,导致pagecache污染,造成⽣产消费延迟;

优化⽅案:
1)对pagecache参数进⾏调优;⽂章地址:
2)修改源码,对kafka的cache进⾏改造。⾃定义⼀套cache,专门⽤来做消费cache。保证副本同步和拉取历史数据不会污染最近⽣产的数据。

7.磁盘故障或者坏道,整个broker半死不活
痛点:
1)当整块磁盘故障或出现磁盘坏道情况,整个broker部分分区可以读写,部分分区⽆法读写;broker进程不会主动推出,坏道所影响的分区消费⼀直被卡住,同时造成数据丢失。

优化⽅案:
1)当整块磁盘故障或者坏道时,消费者消费会在服务端抛出⼀些特定关键字符串当异常信息,⽤程序扫描异常信息,检测到后,把对应的磁盘从 log.dirs 中剔除;

8.同⼀个消费者组消费多个topic问题
痛点:
1)当同⼀个消费者组消费多个topic,⽆法回收消费者组下某个topic的读权限,只能对整个组下⾯的topic进⾏读权限回收;
2)消费组内多个topic间经常出现join操作,导致topic分区重新分配,影响整个组下⾯topic消费;

优化⽅案:
1)限定⼀个组只能消费⼀个topic;
1.6.3.ack机制
ack不同设置的区别:

  • acks=1, 默认值1,表示只要分区的leader副本成功写入就算成功;
  • acks=0,生产者不需要等待任何服务端的响应,可能会丢失数据;
  • acks=-1或acks=all,需要全部处于同步状态的副本确认写入成功,可靠性最强,性能也差

不同的ack机制可能产生的问题:
在这里插入图片描述

  • ack为-1时吞吐量吞吐最低,数据最安全,可能发生重复
  • ack为1时吞吐量,安全性最均衡
  • ack为0时吞吐最高,数据安全性最低

ack为-1的重复问题
在这里插入图片描述

1.6.4.优化参数

1)Broker参数配置(server.properties)
1、网络和io操作线程配置优化
broker处理消息的最大线程数(默认为3)
num.network.threads=cpu核数+1
broker处理磁盘IO的线程数
num.io.threads=cpu核数*2

2、log数据文件刷盘策略
每当producer写入10000条消息时,刷数据到磁盘
log.flush.interval.messages=10000
每间隔1秒钟时间,刷数据到磁盘
log.flush.interval.ms=1000

3、日志保留策略配置
保留三天,也可以对个别主题单独设置 (log.cleaner.delete.retention.ms)
log.retention.hours=72

4、Replica相关配置
offsets.topic.replication.factor:3
这个参数指新创建一个topic时,默认的Replica数量,Replica过少会影响数据的可用性,太多则会白白浪费存储资源,一般建议在2~3为宜

2)Producer优化(producer.properties)
buffer.memory:33554432 (32m)
在Producer端用来存放尚未发送出去的Message的缓冲区大小。缓冲区满了之后可以选择阻塞发送或抛出异常,由block.on.buffer.full的配置来决定。

compression.type:none
默认发送不进行压缩,推荐配置一种适合的压缩算法,可以大幅度的减缓网络压力和Broker的存储压力。

3)Consumer优化
num.consumer.fetchers:1
启动Consumer的个数,适当增加可以提高并发度。

fetch.min.bytes:1
每次Fetch Request至少要拿到多少字节的数据才可以返回。

fetch.wait.max.ms:100
在Fetch Request获取的数据至少达到fetch.min.bytes之前,允许等待的最大时长。对应上面说到的Purgatory中请求的超时时间。

4)Kafka内存调整(kafka-server-start.sh)
默认内存1个G,生产环境尽量不要超过6个G。
export KAFKA_HEAP_OPTS=“-Xms4g -Xmx4g”

1.7.版本升级

FromToAction
0.8.x~a.b.xa.b < n.m n.m.01. 编辑server.properties
(1)<0.11.0:inter.broker.protocol.version=…log.messages.format.version=…
(2) >=0.11.0:inter.broker.protocol.version=…
2. 停止服务, 更新代码, 重启服务
3. 升级完毕, 重新编辑server.properties
4. 改为n.m
5. 重启服务
0.8.x~0.11.x1.0.01. 编辑server.properties
(1) <0.11.0:inter.broker.protocol.version=…log.messages.format.version=…
(2) >=0.11.0:inter.broker.protocol.version=0.11.0
log.messages.format.version=0.11.0
2. 停止服务, 更新代码, 重启服务
3. 升级完毕, 重新编辑server.properties inter.broker.protocol.version=1.0
4. 重启服务
0.8.x~0.10.x0.11.0.01. 编辑server.properties
inter.broker.protocol.version=…
log.messages.format.version=…
2. 停止服务, 更新代码, 重启服务
3. 升级完毕, 重新编辑server.properties
inter.broker.protocol.version=0.11.0
4. 重启服务
0.8.x.x0.9.0.01. 编辑server.properties
增加行:inter.broker.protocol.version=0.8.2.X
2.停止服务, 更新代码, 重启服务
3.升级完毕, 重新编辑server.properties
0.8.2.x改为0.9.0.0
4.重启服务
0.8.10.8.2完全兼容
0.8.00.8.1完全兼容
0.70.8.0不兼容

1.8.数据迁移

Kafka两种迁移场景,分别是同集群数据迁移、跨集群数据迁移。

1.8.1.同集群broker之间迁移

在这里插入图片描述
1.8.1.1.应用场景
broker 迁移 主要使用的场景是broker 上线,下线,或者扩容等.基于同一套zookeeper的操作.
实践:
将需要新添加的broker 列表一并添加到kafka的集群中。(启动新的kafka指定同一套zk)
Kafka由之前的三节点,扩容至四节点
在这里插入图片描述
1.8.1.2.数据迁移
查询信息:
(四个分区分布在三台机器上)
在这里插入图片描述

新建json文件:
cat topic-to-move.json
{"topics": [{"topic": "test-topic"}],"version":1}

获取重新分配方案:
kafka-reassign-partitions.sh --zookeeper node1:2181,node2:2181,node3:2181 --topics-to-move-json-file topics-to-move.json --broker-list “150,151,155,159” –generate

通过kafka-reassign-partitions.sh 获取重新分配方案,–broker-lsit 的参数 “150,151,155,159"是指集群中每个broker的id,由于我们是需要将所有topic均匀分配到扩完结点的4台机器上,所以要指定。同理,当业务改变为将原来的所有数据从旧节点(0,5,9)迁移到新节点(1)实现数据平滑迁移,这时的参数应"4”
,执行后会出现以下内容:
在这里插入图片描述
复制新的方案到一个json文件 assignplan.json (文件名不重要,文件格式也不一定要以json为 结尾,只要保证内容是json即可)

 ./kafka-reassign-partitions.sh --zookeeper node1:2181 --reassignment-json-file assignplan.json --execute

在这里插入图片描述
完成后查看topic信息:(四个分区分布在四台机器上)
在这里插入图片描述

1.8.2.跨集群迁移

在这里插入图片描述
1.8.2.1.应用场景
主要用于kafka 集群的变更. 将数据同步等操作. 相当于是实现了双打.等各项数据消费端在零误差的对接好了后,可以停掉就集群

1.8.2.2.数据迁移

  • 方案一:MirrorMaker
    修改kafka配置
    consumer.properties(内容为原始集群信息)
    在这里插入图片描述
#config/consumer.properties 在网上看到有在此配置zookeeper的应该是之前的老版本。kafka_2.11-2.4.1中不需要
bootstrap.servers=kafka-cluster1:9092,kafka-cluster1:9093 # source-cluster的broker list
group.id=test-consumer-group1 # 自定义一个消费者的group id
auto.offset.reset= # latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据; earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费; none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常

producer.properties(内容为新集群信息)
在这里插入图片描述

#config/producer.properties 在网上看到有在此配置zookeeper的应该是之前的老版本。kafka_2.11-2.4.1中不需要
bootstrap.servers=kafka-cluster2:9092,kafka-cluster2:9093 # destination-cluster的broker list
compression.type=none # 数据压缩方式none, gzip, snappy, lz4, zstd
partitioner.class= # 指定分区程序路径,默认为随机分区
request.timeout.ms= # 请求超时时间
max.block.ms= # KafkaProducer.send and KafkaProducer.partitionsFor 阻塞时间
linger.ms= # 等待指定时间后批量发送
max.request.size= # 发送消息最大字节数
batch.size= # 单次批量处理的字节数
buffer.memory= # 指定等待发送消息的缓冲区大小

执行操作:

./kafka-mirror-maker.sh --consumer.config ../config/consumer.properties --producer.config ../config/producer.properties --whitelist 'test'

说明:
1、–num.streams: 指定流就是指定消费者,所有消费者公用一个生产者。
2、–whitelist: 表明需要同步的白名单,可以使用”|”来连接多个topic,还可以使用正则表达式。可设置黑名单。

  • 方案二:zk迁移
  1. zk迁移就比较简单了,起新节点加入zk集群,稳定后关停旧节点。
  2. 新增broker加入集群,将所有topic分区只分配给新broker,执行分配任务后,kafka将旧broker的分区数据复制到新broker,新broker成为各分区的leader,随后kafka删除旧broker上的分区数据;
  3. 整个过程中客户端应用正常生产消费消息,执行结束后使用新的消费者组从头消费可以获取到全部历史消息。
  4. 停止旧broker后,正在运行的客户端应用正常生产消费消息,新建客户端连接旧broker失败,连接新broker正常

1.9.深度巡检

1.9.1.集群状态检查

1.9.1.1.查看kafka节点的集群角色
使用kafka自带的zookeeper shell工具查看:

[root@db3 bin]# ./zookeeper-shell.sh db3:2184,db3:2185,db3:2186
get /controller
{"version":1,"brokerid":0,"timestamp":"1525847148171"}
#当前kafka集群的中央控制器为broker 0
quit
#退出zookeeper shell

还可以使用zookeeper的zkCli工具查看:

./zkCli.sh -server localhost:2184,localhost:2185,localhost:2186 
get /controller
{"version":1,"brokerid":0,"timestamp":"1531967020325"}
cZxid = 0x1200000046
ctime = Thu Jul 19 10:23:40 CST 2018
mZxid = 0x1200000046
mtime = Thu Jul 19 10:23:40 CST 2018
pZxid = 0x1200000046
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x30355dd23ee0000
dataLength = 54
numChildren = 0
#当前kafka集群的中央控制器为broker 0
quit
#退出zkCli

zookeeper事务日志中各个字段的含义:
cZxid:创建节点的事务id
ctime:创建节点的时间
mZxid:修改节点的事务id
mtime:修改节点的时间
pZxid:子节点列表最后一次修改的事务id。删除或添加子节点,不包含修改子节点的数据
cversion:子节点的版本号,删除或添加子节点,版本号会自增
dataVersion:节点数据版本号,数据写入操作,版本号会递增
aclVersion:节点ACL权限版本,权限写入操作,版本号会递增
ephemeralOwner:临时节点创建时的事务id,如果节点是永久节点,则它的值为0
dataLength:节点数据长度(单位:byte),中文占3个byte
numChildren:子节点数量

1.9.1.2.查看kafka集群中所有topic

./kafka-topics.sh --zookeeper db3:2184,db3:2185,db3:2186 --list –force

在这里插入图片描述
1.9.1.3.查看所有topic的分区和副本信息

./kafka-topics.sh --zookeeper db3:2184,db3:2185,db3:2186 --describe –force

在这里插入图片描述
1.9.1.4.查看test的topic的分区和副本信息

./kafka-topics.sh --zookeeper db3:2184,db3:2185,db3:2186 --describe --force --topic test

返回信息示例:(test队列有3个分区,分别是0、1、2,对应broker0、broker1、broker2,每个分区都有一个broker作为读写分区的leader节点)
Topic:test PartitionCount:3 ReplicationFactor:3 Configs:
Topic: test Partition: 0 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
Topic: test Partition: 1 Leader: 1 Replicas: 1,2,0 Isr: 0,1,2
Topic: test Partition: 2 Leader: 2 Replicas: 2,0,1 Isr: 0,1,2

1.9.1.5.从消费者查看test的topic中的所有消息

kafka-console-consumer.sh --bootstrap-server db3:9092,db3:9093,db3:9094 --from-beginning --topic test
#将输出test队列中的所有消息到控制台,退出请按Ctrl+C

1.9.1.6.查看kafka的LogSegment
一个Partition日志又被划分为多个日志段(LogSegment),日志段是Kafka日志对象分片的最小单位。一个日志段对应磁盘上一个“.log”的日志文件,一个“.index”的消息偏移量索引文件和一个“.timeindex”的消息时间戳索引文件。

查看指定的kafka节点的syslog队列的分区0的日志段:
kafka-run-class.sh kafka.tools.DumpLogSegments --files /opt/kafka_2.12-1.1.0/logs/kafka-logs/syslog-0/00000000000006239341.log --deep-iteration --print-data-log
截取部分数据日志:
offset: 6239505 position: 45001 CreateTime: -1 isvalid: true keysize: -1 valuesize: 240 magic: 2 compresscodec: GZIP producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] payload: {"@timestamp":"2018-05-11T06:01:10.654Z","beat":{"name":"proxy1"},"input_type":"log","message":"May 11 14:01:01 proxy1 systemd: Starting Session 1066 of user root.","offset":21415,"source":"/var/log/messages","tags":["syslog"],"type":"log"}

1.9.2.查看消费者组

消费者组类型有zookeeper和kafka两种,基于kafka类型消费者组是通过kafka中名为__consumer_offsets的topic来记录每个消费者组的OffsetMetadata信息。
相同消费者组中的不同消费者可以对topic中的消息进行负载均衡的消费(单播),不同的消费者组可以同时对队列中的消息进行消费(多播)。

1.9.2.1.查看基于zookeeper管理的消费者组
kafka-consumer-groups.sh --zookeeper db3:2184,db3:2185,db3:2186 --list

在这里插入图片描述

1.9.2.2.查看基于kafka管理的消费者组
kafka-consumer-groups.sh --bootstrap-server db3:9092,db3:9093,db3:9094 –list
在这里插入图片描述
1.9.2.3.查看消费者组的详细信息
消费者组的客户端ID、消费者ID、主机、队列、分区

./kafka-consumer-groups.sh --bootstrap-server db3:9092,db3:9093,db3:9094 --group logstash --describe –verbose

在这里插入图片描述

1.9.3.查看消费偏移量

1.9.3.1.查看指定消费者组的所有topic的partition的消费offset
kafka-consumer-groups.sh --bootstrap-server db3:9092,db3:9093,db3:9094 --describe --group logstash –offsets
在这里插入图片描述
1.9.3.2.查看指定topic的消费偏移量
kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list db3:9092,db3:9093,db3:9094 --topic test --time -1
在这里插入图片描述
1.9.3.3.查看__consumer_offsets的消息

查看"__consumer_offsets"主题的分区6的消息:
kafka-simple-consumer-shell.sh --topic __consumer_offsets --broker-list db3:9092,db3:9093,db3:9094 --partition 6 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter"截取部分OffsetMetadata消息:
[console-consumer-15932,heartbeat,0]::[OffsetMetadata[1274,NO_METADATA],CommitTime 1524656104986,ExpirationTime 1524742504986]
[console-consumer-94057,__consumer_offsets,26]::[OffsetMetadata[9,NO_METADATA],CommitTime 1525859391291,ExpirationTime 1525945791291]
[logstash,syslog,2]::[OffsetMetadata[5,NO_METADATA],CommitTime 1532407710812,ExpirationTime 1532494110812]
[logstash,syslog,0]::[OffsetMetadata[4,NO_METADATA],CommitTime 1532407710827,ExpirationTime 1532494110827]

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com