夯实 kafka 系列|第四章:实战-亿级消息处理
文章目录
- 夯实 kafka 系列|第四章:实战-亿级消息处理
- 1.前言
- 2.服务端
- 2.1 Zookeeper 集群配置
- 2.2 Brokers 集群配置
- 3.生产者
- 3.1 Topic 设计
- 3.1.1 分区数量
- 3.1.2 副本数量
- 3.1.3 消息留存策略
- 3.1.4 cleanup.policy
- 3.1.4.1 delete(默认)
- 3.1.4.2. compact
- 3.1.4.3 delete,compact(组合策略)
- 3.1.5 compression.type=producer
- 3.1.5.1 工作原理
- 3.1.5.2 优点
- 3.1.5.3 缺点
- 4.消费者
- 4.1 集群消费者数量
- 4.2 参数配置
- 4.2.1. `fetch-min-size=1024000`(对应 `fetch.min.bytes`)
- 4.2.2. `fetch-max-wait=300`(对应 `fetch.max.wait.ms`)
- 4.2.3. `max-poll-records=1000`
- 4.2.4. `enable-auto-commit=false` 与 `ack-mode=manual`
- 4.2.5. `auto-offset-reset=latest`
- 4.2.6. `listener.concurrency=6`(`重点`)
- 4.3 代码层面(`重点`)
- 4.3.1 缓存(本地缓存)
- 4.3.2 禁止查询数据库
- 4.3.3 远程调用
- 4.3.4 分布式设计
- 5.监控
- 5.1 kafka-ui(相对简单)
- 5.2 Kafka-exporter+Prometheus+Grafana
- 5.2.1 Kafka-Exporter:数据采集与指标暴露
- 5.2.2 Prometheus:监控数据存储与告警
- 5.2.3 Grafana:数据可视化与告警通知
- 5.2.4 三者协作流程
- 5.2.5 总结对比
1.前言
亿级 kafka 消息 该如何处理,需要注意的点有哪些,本文将和大家一起讨论。
分为以下四部分进行讨论讲解
- kafka 服务端
- 集群规模、参数调优与资源隔离
- 生产者
- Topic 设计、压缩策略与流量控制
- 消费者
- 并行度控制、防积压策略与容错机制
- 监控
- 指标采集、可视化与告警联动
2.服务端
2.1 Zookeeper 集群配置
- 集群规模:至少 3 节点(奇数节点),避免单点故障,建议与 Kafka Broker 物理隔离,部署在不同的服务器。
- 磁盘隔离:单独挂载 NVMe SSD(如 1TB),
dataLogDir
与dataDir
分离,避免 ZK 事务日志与快照 IO 竞争
2.2 Brokers 集群配置
-
节点规模:至少 3 Broker,根据吞吐量动态扩展(例如每台 Broker 承载 50MB/s 写入)。
-
相关参数-具体参数含义参考之前的文章,这里就不赘述了
- 夯实 kafka 系列|第二章:kafka 常用参数配置
- 实际项目中运行的 kafka docker 镜像中配置的环境变量参数,其中实际服务器的 HostName 换成了 127.0.0.1
- 以下这些环境变量(大写),在 docker 启动的过程中,会配置到 kafka 对应的参数中进行启动
ALLOW_PLAINTEXT_LISTENER='yes'
KAFKA_CFG_LISTENERS=PLAINTEXT://:23310
KAFKA_CFG_ADVERTISED_HOST_NAME=127.0.0.1
KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:16712
KAFKA_CFG_ZOOKEEPER_CONNECT=127.0.0.1:23010,127.0.0.2:23010,127.0.0.3:23010/kafka
KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR='1'
KAFKA_BROKER_ID=1
KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE='false'
KAFKA_CFG_LOG_CLEANUP_POLICY='compact'
KAFKA_CFG_LOG_CLEANER_MIN_COMPACTION_LAG_MS='604800000'
KAFKA_CFG_NUM_IO_THREADS='6'
KAFKA_CFG_NUM_NETWORK_THREADS='20'
KAFKA_CFG_NUM_RECOVERY_THREADS_PER_DATA_DIR='3'
KAFKA_HEAP_OPTS='-Xms6144m -Xmx6144m -XX:MetaspaceSize=96m -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M -XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80'
JMX_PORT='23311'
KAFKA_JMX_OPTS='-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=127.0.0.1 -Dcom.sun.management.jmxremote.rmi.port=23311'
PORT_MAPPING=16712:23310
3.生产者
3.1 Topic 设计
3.1.1 分区数量
- 计算公式:
分区数 = 峰值吞吐量(MB/s) / 单分区吞吐(约 10MB/s)
- 示例:1 亿/天 ≈ 1157 条/秒,若每条 1KB,需
1157 * 1KB/10MB ≈ 12 分区
。 - 如图:设置的10个分区,根据实际情况进行调整
- 分区后期可以扩容
- 同时分区数量也决定了,消费者的数量,后面会提到
3.1.2 副本数量
- 生产环境建议
replication.factor=3
,保障数据高可用。
3.1.3 消息留存策略
- 时间策略:
retention.ms=604800000
(保留 7 天)。 - 大小策略:
retention.bytes=1073741824
(单个分区最大 1GB)。 - 压缩策略:对日志类数据启用
compression.type=snappy
,减少磁盘占用。 - 清理策略:
cleanup.policy=compact
(压缩日志)- 这里也可以选择 delete 策略,表示7天之后删除旧数据
3.1.4 cleanup.policy
3.1.4.1 delete(默认)
- 作用:根据时间或大小删除旧数据(默认策略)。
- 适用场景:
- 日志类数据(如应用日志、审计日志),无需长期保留。
- 实时流处理场景(如订单流水),只需保留最近一段时间的数据。
- 需要严格控制磁盘占用的 Topic。
3.1.4.2. compact
- 作用:压缩日志,保留每个 Key 的最新 Value(需结合消息 Key 使用)。
- 适用场景:
- 状态更新:如用户信息变更、订单状态更新(每个 Key 的最新状态即完整状态)。
- 事件溯源:通过 Key 回溯最新事件(如设备配置更新)。
- 去重场景:避免重复消费相同 Key 的旧数据(如唯一 ID 生成)。
3.1.4.3 delete,compact(组合策略)
- 作用:先压缩日志,再根据时间或大小删除旧数据(需 Kafka ≥ 2.0)。
- 适用场景:
- 需要保留 Key 的最新状态,但同时对历史数据有保留时间限制。
- 例如:用户行为日志需按 Key 压缩,但超过 30 天的数据仍需删除。
3.1.5 compression.type=producer
compression.type=producer
是一种特殊的压缩配置,表示 Kafka Broker 会直接使用生产者发送的压缩格式,而不会对消息进行重新压缩。这种配置在性能和资源利用方面具有显著优势。
3.1.5.1 工作原理
- 生产者发送的消息已经压缩(如
gzip
、snappy
、lz4
或zstd
)。 - Broker 接收到消息后,直接存储压缩后的数据,不会解压或重新压缩。
- 消费者拉取消息时,Broker 直接将压缩数据发送给消费者,由消费者进行解压。
3.1.5.2 优点
- 减少 CPU 开销:Broker 避免了重复压缩和解压操作,节省了计算资源。
- 降低延迟:Broker 直接转发压缩数据,减少了处理时间。
- 节省磁盘和网络带宽:压缩后的消息占用更少的存储和传输资源。
3.1.5.3 缺点
- 依赖生产者的压缩配置:如果生产者未启用压缩,Broker 也不会压缩消息。
- 消费者需支持压缩格式:消费者必须能够解压生产者使用的压缩格式。
4.消费者
4.1 集群消费者数量
- 消费者数 ≤ 分区数(避免资源浪费)。
- 示例:12 分区最多部署 12 消费者实例。
- 因为一个消费者只能消费一个分区
4.2 参数配置
消费者部分关键参数如下:
kafka.config.consumer.fetch-min-size=1024000
kafka.config.consumer.fetch-max-wait=300
kafka.config.consumer.max-poll-records=1000
kafka.config.consumer.enable-auto-commit=false
kafka.config.consumer.auto-offset-reset=latest
kafka.config.listener.concurrency=6
kafka.config.listener.ack-mode=manual
4.2.1. fetch-min-size=1024000
(对应 fetch.min.bytes
)
- 含义:消费者单次从 Broker 拉取数据的最小字节数(默认 1 字节),设置为 1MB 表示 Broker 需累积足够数据才会返回。
- 作用:
- 优化吞吐量:减少网络请求次数,提升整体消费效率
- 适用场景:高吞吐需求(如日志处理、批量数据同步),牺牲实时性换取吞吐提升。
- 注意:若 Broker 数据量长期不足,可能因
fetch-max-wait
超时导致频繁空轮询。
4.2.2. fetch-max-wait=300
(对应 fetch.max.wait.ms
)
- 含义:当
fetch-min-size
未满足时,Broker 等待数据的最大时间(默认 500ms),设置为 300ms。 - 作用:
- 平衡延迟与吞吐:避免长时间等待导致消费延迟过高
- 适用场景:需兼顾实时性和吞吐量的场景(如实时监控、流式计算)。
- 联动效果:与
fetch-min-size
共同控制批量拉取策略,优先满足时间或字节数条件。
4.2.3. max-poll-records=1000
- 含义:单次
poll()
调用返回的最大消息数(默认 500),设置为 1000 条。 - 作用:
- 减少处理开销:单次拉取更多消息,降低
poll()
调用频率。 - 适用场景:批量处理任务(如数据清洗、ETL),需注意内存占用和消息处理耗时。
- 减少处理开销:单次拉取更多消息,降低
- 风险:若消费者处理能力不足,可能导致
max.poll.interval.ms
超时触发 Rebalance
4.2.4. enable-auto-commit=false
与 ack-mode=manual
- 含义:禁用自动提交消费位移(默认
true
),需手动提交。 - 作用:
- 避免消息丢失/重复:确保业务处理完成后提交位移(如数据库事务后提交)
- 适用场景:对数据一致性要求高的业务(如支付订单、核心交易)。
@KafkaListener(topics = "test-topic", containerFactory = "manualAckFactory")
public void listen(ConsumerRecord<String, String> record, Acknowledgment ack) { try { // 业务处理逻辑 processMessage(record); ack.acknowledge(); // 手动提交 } catch (Exception e) { // 异常处理(如重试或记录日志) }
}
4.2.5. auto-offset-reset=latest
- 含义:当无有效位移或位移过期时,从最新消息开始消费(默认
latest
,另一选项earliest
)。 - 作用:
- 专注实时数据:跳过历史数据,仅处理新消息
- 适用场景:实时流处理(如实时告警、动态风控)。
4.2.6. listener.concurrency=6
(重点
)
- 含义:Spring Kafka 监听器的并发线程数(即消费者实例数),设置为 6 线程。
- 作用:
- 提升消费并行度:每个线程分配独立分区,加速消息处理
- 适用场景:Topic 分区数 ≥ 6 的高并发场景(如电商大促流量分发)。
- 注意事项:
- 分区数限制:若分区数 < 6,部分线程闲置(如 3 分区时仅 3 线程有效)
- 资源消耗:线程数过多可能增加 CPU 和内存压力,需结合机器性能调整。
通过消费者监控命令,效果如下图:
kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:23310 --describe --group xxx-xxx-msgV3
可以看到有6个消费者,对应的6个分区,进行消费。
4.3 代码层面(重点
)
消费者如何处理保证消息不积压?
上面说了那么多配置参数,其实最重要的还是消费者处理的效率;只要效率高,一切都不是问题,反之,如果消费的慢,不论怎么调整参数配置,消费还是会积压。
遵循这个原则
- 整个消费链路上不要有耗时的操作。
4.3.1 缓存(本地缓存)
所有查询数据库,第三方,其他微服务的操作,全部修改为本地缓存
- 比如 kafka 返回消息里面有一个部门 ID,我们需要查询部门的名称
- 我们可以将整个部门存储到 Map 缓存中
简单代码实现如下:
@Component
@Slf4j
public class DeptCache {// 部门缓存private final Map<String, Dept> DeptMap = new ConcurrentHashMap<>();// 具体实现可以是 数据库,第三方,其他微服务feignClientprivate final DeptClient DeptClient;public DeptCache(DeptClient DeptClient) {this.DeptClient = DeptClient;}@PostConstructpublic void init() {refreshCache();}// 每分钟刷新缓存@Scheduled(fixedRate = 60000)public void refreshCache() {Response<List<Dept>> response = DeptClient.all();log.info("Dept search all res is success:{}", response.isSuccess());if (response.isSuccess()) {List<Dept> depts = response.getData();if (Depts != null) {depts.forEach(dept -> {if (dept.getId() != null) {deptMap.put(dept.getId(), dept);}});}log.info("dept cache refreshed, cache size:{}", deptMap.size());}}
4.3.2 禁止查询数据库
查询数据库即使只需要1秒返回,但是数据量大了之后,耗时会成指数级增长
- 1*1亿=27,778 小时,大约3年才能消费完
4.3.3 远程调用
-
远程接口调用
- Restful 接口调用
- FeignClient 接口调用
-
不要使用 redis 缓存
- 这也属于远程调用,也会使用到网络传输
4.3.4 分布式设计
单机虽然可以多线程来消费,但是瓶颈依然明显。
-
可以将整个消费的处理链条,抽取为一个单独的微服务。
-
配合分区数来进行部署。
- 假设分区数20个
- 每个实例
listener.concurrency=5
- 消费5个分区
-
那么可以部署4个微服务实例
- 如下图所示:实际情况分区的分配是随机的,但每个消费者对应数量是一定5个分区(如使用 RoundRobin 策略且无动态 Rebalance 情况下)
5.监控
5.1 kafka-ui(相对简单)
可以使用之前文章中提到的 kafka-ui 来监控
- 夯实 kafka 系列|第三章:kafka 常用监控工具
5.2 Kafka-exporter+Prometheus+Grafana
5.2.1 Kafka-Exporter:数据采集与指标暴露
核心作用:
- 数据采集:通过 Kafka Admin API 实时采集 Kafka 集群的 Broker、Topic、消费者组、分区等核心指标(如
kafka_consumergroup_lag
消费延迟、kafka_topic_messages_in_per_sec
生产速率)。 - 指标转换:将原始 Kafka 数据(如日志偏移量、分区状态)转换为 Prometheus 兼容的格式(基于 HTTP 的
/metrics
接口)。 - 过滤与聚合:支持按 Topic、消费者组等维度过滤冗余数据,降低监控系统负载。
典型场景:
- 监控消费者组积压量(
kafka_consum ergroup_lag_sum
)以优化消费性能。 - 采集 Broker 存活状态(
kafka_broker_info
)实现集群健康检查。
5.2.2 Prometheus:监控数据存储与告警
核心作用:
- 数据抓取:定时从 Kafka-Exporter 的
/metrics
接口拉取指标数据(基于 Pull 模型)。 - 时序存储:将数据存储为时间序列(Time Series),支持高效查询(如
rate(kafka_topic_messages_in_per_sec[5m])
计算吞吐量趋势)。 - 告警规则:配置阈值触发告警(如消费者延迟超过 5000 条时触发
HighConsumerLag
警报)。
关键能力:
- 动态服务发现:自动识别新增的 Kafka Broker 或 Topic。
- 多维度查询:通过 PromQL 实现跨指标关联分析(如结合 CPU 使用率与消息积压量定位性能瓶颈)。
5.2.3 Grafana:数据可视化与告警通知
核心作用:
- 可视化仪表盘:将 Prometheus 中的时序数据渲染为折线图、柱状图等,直观展示 Kafka 集群状态(如 Topic 分区分布、消费延迟热力图)。
- 动态交互:支持模板变量动态切换监控维度(如按消费者组或 Topic 筛选数据)。
- 告警通知:对接邮件、Slack 等渠道推送告警,并支持在仪表盘中标记异常时间点。
典型应用:
- 导入预置的 Kafka 监控模板(如 Dashboard ID 7589)快速构建监控视图。
- 通过热力图分析分区 Leader 分布不均问题。
5.2.4 三者协作流程
- 数据流:
Kafka-Exporter 采集数据 → Prometheus 定时抓取存储 → Grafana 查询并可视化。 - 告警流:
Prometheus 检测阈值 → Alertmanager 路由告警 → Grafana 展示告警历史并触发通知。
5.2.5 总结对比
组件 | 核心功能 | 关键特性 |
---|---|---|
Kafka-Exporter | 指标采集与格式转换 | 支持过滤、聚合,降低数据冗余 |
Prometheus | 时序存储与告警触发 | 动态服务发现、PromQL 灵活查询 |
Grafana | 可视化与交互分析 | 多数据源支持、模板化仪表盘 |
通过三者协同,可实现对 Kafka 集群的 实时监控→异常告警→根因分析→性能优化 全链路闭环管理。
Grafana 大概效果如下: