欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 文旅 > 明星 > 夯实 kafka 系列|第四章:实战-亿级消息处理

夯实 kafka 系列|第四章:实战-亿级消息处理

2025/3/30 23:48:14 来源:https://blog.csdn.net/xiewenfeng520/article/details/146477323  浏览:    关键词:夯实 kafka 系列|第四章:实战-亿级消息处理

夯实 kafka 系列|第四章:实战-亿级消息处理

文章目录

  • 夯实 kafka 系列|第四章:实战-亿级消息处理
    • 1.前言
    • 2.服务端
      • 2.1 Zookeeper 集群配置
      • 2.2 Brokers 集群配置
    • 3.生产者
      • 3.1 Topic 设计
        • 3.1.1 分区数量
        • 3.1.2 副本数量
        • 3.1.3 消息留存策略
        • 3.1.4 cleanup.policy
          • 3.1.4.1 delete(默认)
          • 3.1.4.2. compact
          • 3.1.4.3 delete,compact(组合策略)
        • 3.1.5 compression.type=producer
          • 3.1.5.1 工作原理
          • 3.1.5.2 优点
          • 3.1.5.3 缺点
    • 4.消费者
      • 4.1 集群消费者数量
      • 4.2 参数配置
        • 4.2.1. `fetch-min-size=1024000`(对应 `fetch.min.bytes`)
        • 4.2.2. `fetch-max-wait=300`(对应 `fetch.max.wait.ms`)
        • 4.2.3. `max-poll-records=1000`
        • 4.2.4. `enable-auto-commit=false` 与 `ack-mode=manual`
        • 4.2.5. `auto-offset-reset=latest`
        • 4.2.6. `listener.concurrency=6`(`重点`)
      • 4.3 代码层面(`重点`)
        • 4.3.1 缓存(本地缓存)
        • 4.3.2 禁止查询数据库
        • 4.3.3 远程调用
        • 4.3.4 分布式设计
    • 5.监控
      • 5.1 kafka-ui(相对简单)
      • 5.2 Kafka-exporter+Prometheus+Grafana
        • 5.2.1 Kafka-Exporter:数据采集与指标暴露
        • 5.2.2 Prometheus:监控数据存储与告警
        • 5.2.3 Grafana:数据可视化与告警通知
        • 5.2.4 三者协作流程
        • 5.2.5 总结对比

1.前言

亿级 kafka 消息 该如何处理,需要注意的点有哪些,本文将和大家一起讨论。

分为以下四部分进行讨论讲解

  • kafka 服务端
    • 集群规模、参数调优与资源隔离
  • 生产者
    • Topic 设计、压缩策略与流量控制
  • 消费者
    • 并行度控制、防积压策略与容错机制
  • 监控
    • 指标采集、可视化与告警联动

2.服务端

2.1 Zookeeper 集群配置

  • 集群规模:至少 3 节点(奇数节点),避免单点故障,建议与 Kafka Broker 物理隔离,部署在不同的服务器。
  • 磁盘隔离:单独挂载 NVMe SSD(如 1TB),dataLogDirdataDir分离,避免 ZK 事务日志与快照 IO 竞争

2.2 Brokers 集群配置

  • 节点规模:至少 3 Broker,根据吞吐量动态扩展(例如每台 Broker 承载 50MB/s 写入)。

  • 相关参数-具体参数含义参考之前的文章,这里就不赘述了

    • 夯实 kafka 系列|第二章:kafka 常用参数配置
    • 实际项目中运行的 kafka docker 镜像中配置的环境变量参数,其中实际服务器的 HostName 换成了 127.0.0.1
    • 以下这些环境变量(大写),在 docker 启动的过程中,会配置到 kafka 对应的参数中进行启动
ALLOW_PLAINTEXT_LISTENER='yes'
KAFKA_CFG_LISTENERS=PLAINTEXT://:23310
KAFKA_CFG_ADVERTISED_HOST_NAME=127.0.0.1
KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:16712
KAFKA_CFG_ZOOKEEPER_CONNECT=127.0.0.1:23010,127.0.0.2:23010,127.0.0.3:23010/kafka
KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR='1'
KAFKA_BROKER_ID=1
KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE='false'
KAFKA_CFG_LOG_CLEANUP_POLICY='compact'
KAFKA_CFG_LOG_CLEANER_MIN_COMPACTION_LAG_MS='604800000'
KAFKA_CFG_NUM_IO_THREADS='6'
KAFKA_CFG_NUM_NETWORK_THREADS='20'
KAFKA_CFG_NUM_RECOVERY_THREADS_PER_DATA_DIR='3'
KAFKA_HEAP_OPTS='-Xms6144m -Xmx6144m -XX:MetaspaceSize=96m -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M -XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80'
JMX_PORT='23311'
KAFKA_JMX_OPTS='-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=127.0.0.1 -Dcom.sun.management.jmxremote.rmi.port=23311'
PORT_MAPPING=16712:23310

3.生产者

3.1 Topic 设计

3.1.1 分区数量
  • 计算公式:分区数 = 峰值吞吐量(MB/s) / 单分区吞吐(约 10MB/s)
  • 示例:1 亿/天 ≈ 1157 条/秒,若每条 1KB,需 1157 * 1KB/10MB ≈ 12 分区
  • 如图:设置的10个分区,根据实际情况进行调整
    • 分区后期可以扩容
  • 同时分区数量也决定了,消费者的数量,后面会提到

请添加图片描述

3.1.2 副本数量
  • 生产环境建议 replication.factor=3,保障数据高可用。
3.1.3 消息留存策略
  • 时间策略retention.ms=604800000(保留 7 天)。
  • 大小策略retention.bytes=1073741824(单个分区最大 1GB)。
  • 压缩策略:对日志类数据启用 compression.type=snappy,减少磁盘占用。
  • 清理策略cleanup.policy=compact(压缩日志)
    • 这里也可以选择 delete 策略,表示7天之后删除旧数据

请添加图片描述

3.1.4 cleanup.policy
3.1.4.1 delete(默认)
  • 作用:根据时间或大小删除旧数据(默认策略)。
  • 适用场景:
    • 日志类数据(如应用日志、审计日志),无需长期保留。
    • 实时流处理场景(如订单流水),只需保留最近一段时间的数据。
    • 需要严格控制磁盘占用的 Topic。
3.1.4.2. compact
  • 作用:压缩日志,保留每个 Key 的最新 Value(需结合消息 Key 使用)。
  • 适用场景:
    • 状态更新:如用户信息变更、订单状态更新(每个 Key 的最新状态即完整状态)。
    • 事件溯源:通过 Key 回溯最新事件(如设备配置更新)。
    • 去重场景:避免重复消费相同 Key 的旧数据(如唯一 ID 生成)。
3.1.4.3 delete,compact(组合策略)
  • 作用先压缩日志,再根据时间或大小删除旧数据(需 Kafka ≥ 2.0)。
  • 适用场景:
    • 需要保留 Key 的最新状态,但同时对历史数据有保留时间限制。
    • 例如:用户行为日志需按 Key 压缩,但超过 30 天的数据仍需删除。
3.1.5 compression.type=producer
  • compression.type=producer 是一种特殊的压缩配置,表示 Kafka Broker 会直接使用生产者发送的压缩格式,而不会对消息进行重新压缩。这种配置在性能和资源利用方面具有显著优势。
3.1.5.1 工作原理
  • 生产者发送的消息已经压缩(如 gzipsnappylz4zstd)。
  • Broker 接收到消息后,直接存储压缩后的数据,不会解压或重新压缩
  • 消费者拉取消息时,Broker 直接将压缩数据发送给消费者,由消费者进行解压。
3.1.5.2 优点
  • 减少 CPU 开销:Broker 避免了重复压缩和解压操作,节省了计算资源。
  • 降低延迟:Broker 直接转发压缩数据,减少了处理时间。
  • 节省磁盘和网络带宽:压缩后的消息占用更少的存储和传输资源。
3.1.5.3 缺点
  • 依赖生产者的压缩配置:如果生产者未启用压缩,Broker 也不会压缩消息。
  • 消费者需支持压缩格式:消费者必须能够解压生产者使用的压缩格式。

4.消费者

4.1 集群消费者数量

  • 消费者数 ≤ 分区数(避免资源浪费)。
    • 示例:12 分区最多部署 12 消费者实例。
    • 因为一个消费者只能消费一个分区

4.2 参数配置

消费者部分关键参数如下:

kafka.config.consumer.fetch-min-size=1024000
kafka.config.consumer.fetch-max-wait=300
kafka.config.consumer.max-poll-records=1000
kafka.config.consumer.enable-auto-commit=false
kafka.config.consumer.auto-offset-reset=latest
kafka.config.listener.concurrency=6
kafka.config.listener.ack-mode=manual
4.2.1. fetch-min-size=1024000(对应 fetch.min.bytes
  • 含义:消费者单次从 Broker 拉取数据的最小字节数(默认 1 字节),设置为 1MB 表示 Broker 需累积足够数据才会返回。
  • 作用
    • 优化吞吐量:减少网络请求次数,提升整体消费效率
    • 适用场景:高吞吐需求(如日志处理、批量数据同步),牺牲实时性换取吞吐提升。
  • 注意:若 Broker 数据量长期不足,可能因 fetch-max-wait 超时导致频繁空轮询。

4.2.2. fetch-max-wait=300(对应 fetch.max.wait.ms
  • 含义:当 fetch-min-size 未满足时,Broker 等待数据的最大时间(默认 500ms),设置为 300ms
  • 作用
    • 平衡延迟与吞吐:避免长时间等待导致消费延迟过高
    • 适用场景:需兼顾实时性和吞吐量的场景(如实时监控、流式计算)。
  • 联动效果:与 fetch-min-size 共同控制批量拉取策略,优先满足时间或字节数条件。

4.2.3. max-poll-records=1000
  • 含义:单次 poll() 调用返回的最大消息数(默认 500),设置为 1000 条
  • 作用
    • 减少处理开销:单次拉取更多消息,降低 poll() 调用频率。
    • 适用场景:批量处理任务(如数据清洗、ETL),需注意内存占用和消息处理耗时。
  • 风险:若消费者处理能力不足,可能导致 max.poll.interval.ms 超时触发 Rebalance

4.2.4. enable-auto-commit=falseack-mode=manual
  • 含义:禁用自动提交消费位移(默认 true),需手动提交。
  • 作用
    • 避免消息丢失/重复:确保业务处理完成后提交位移(如数据库事务后提交)
    • 适用场景:对数据一致性要求高的业务(如支付订单、核心交易)。
@KafkaListener(topics = "test-topic", containerFactory = "manualAckFactory")  
public void listen(ConsumerRecord<String, String> record, Acknowledgment ack) {  try {  // 业务处理逻辑  processMessage(record);  ack.acknowledge(); // 手动提交  } catch (Exception e) {  // 异常处理(如重试或记录日志)  }  
} 

4.2.5. auto-offset-reset=latest
  • 含义:当无有效位移或位移过期时,从最新消息开始消费(默认 latest,另一选项 earliest)。
  • 作用
    • 专注实时数据:跳过历史数据,仅处理新消息
    • 适用场景:实时流处理(如实时告警、动态风控)。

4.2.6. listener.concurrency=6重点
  • 含义:Spring Kafka 监听器的并发线程数(即消费者实例数),设置为 6 线程
  • 作用
    • 提升消费并行度:每个线程分配独立分区,加速消息处理
    • 适用场景:Topic 分区数 ≥ 6 的高并发场景(如电商大促流量分发)。
  • 注意事项
    • 分区数限制:若分区数 < 6,部分线程闲置(如 3 分区时仅 3 线程有效)
    • 资源消耗:线程数过多可能增加 CPU 和内存压力,需结合机器性能调整。

通过消费者监控命令,效果如下图:

kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:23310 --describe --group xxx-xxx-msgV3

请添加图片描述

可以看到有6个消费者,对应的6个分区,进行消费。

4.3 代码层面(重点

消费者如何处理保证消息不积压?

上面说了那么多配置参数,其实最重要的还是消费者处理的效率;只要效率高,一切都不是问题,反之,如果消费的慢,不论怎么调整参数配置,消费还是会积压。

遵循这个原则

  • 整个消费链路上不要有耗时的操作。
4.3.1 缓存(本地缓存)

所有查询数据库,第三方,其他微服务的操作,全部修改为本地缓存

  • 比如 kafka 返回消息里面有一个部门 ID,我们需要查询部门的名称
    • 我们可以将整个部门存储到 Map 缓存中

简单代码实现如下:

@Component
@Slf4j
public class DeptCache {// 部门缓存private final Map<String, Dept> DeptMap = new ConcurrentHashMap<>();// 具体实现可以是 数据库,第三方,其他微服务feignClientprivate final DeptClient DeptClient;public DeptCache(DeptClient DeptClient) {this.DeptClient = DeptClient;}@PostConstructpublic void init() {refreshCache();}// 每分钟刷新缓存@Scheduled(fixedRate = 60000)public void refreshCache() {Response<List<Dept>> response = DeptClient.all();log.info("Dept search all res is success:{}", response.isSuccess());if (response.isSuccess()) {List<Dept> depts = response.getData();if (Depts != null) {depts.forEach(dept -> {if (dept.getId() != null) {deptMap.put(dept.getId(), dept);}});}log.info("dept cache refreshed, cache size:{}",  deptMap.size());}}
4.3.2 禁止查询数据库

查询数据库即使只需要1秒返回,但是数据量大了之后,耗时会成指数级增长

  • 1*1亿=27,778 小时,大约3年才能消费完
4.3.3 远程调用
  • 远程接口调用

    • Restful 接口调用
    • FeignClient 接口调用
  • 不要使用 redis 缓存

    • 这也属于远程调用,也会使用到网络传输
4.3.4 分布式设计

单机虽然可以多线程来消费,但是瓶颈依然明显。

  • 可以将整个消费的处理链条,抽取为一个单独的微服务。

  • 配合分区数来进行部署。

    • 假设分区数20个
    • 每个实例 listener.concurrency=5
      • 消费5个分区
  • 那么可以部署4个微服务实例

    • 如下图所示:实际情况分区的分配是随机的,但每个消费者对应数量是一定5个分区(如使用 RoundRobin 策略且无动态 Rebalance 情况下)

在这里插入图片描述

5.监控

5.1 kafka-ui(相对简单)

可以使用之前文章中提到的 kafka-ui 来监控

  • 夯实 kafka 系列|第三章:kafka 常用监控工具

5.2 Kafka-exporter+Prometheus+Grafana

5.2.1 Kafka-Exporter:数据采集与指标暴露

核心作用

  1. 数据采集:通过 Kafka Admin API 实时采集 Kafka 集群的 Broker、Topic、消费者组、分区等核心指标(如 kafka_consumergroup_lag 消费延迟、kafka_topic_messages_in_per_sec 生产速率)。
  2. 指标转换:将原始 Kafka 数据(如日志偏移量、分区状态)转换为 Prometheus 兼容的格式(基于 HTTP 的 /metrics 接口)。
  3. 过滤与聚合:支持按 Topic、消费者组等维度过滤冗余数据,降低监控系统负载。

典型场景

  • 监控消费者组积压量(kafka_consum ergroup_lag_sum)以优化消费性能。
  • 采集 Broker 存活状态(kafka_broker_info)实现集群健康检查。

5.2.2 Prometheus:监控数据存储与告警

核心作用

  1. 数据抓取:定时从 Kafka-Exporter 的 /metrics 接口拉取指标数据(基于 Pull 模型)。
  2. 时序存储:将数据存储为时间序列(Time Series),支持高效查询(如 rate(kafka_topic_messages_in_per_sec[5m]) 计算吞吐量趋势)。
  3. 告警规则:配置阈值触发告警(如消费者延迟超过 5000 条时触发 HighConsumerLag 警报)。

关键能力

  • 动态服务发现:自动识别新增的 Kafka Broker 或 Topic。
  • 多维度查询:通过 PromQL 实现跨指标关联分析(如结合 CPU 使用率与消息积压量定位性能瓶颈)。

5.2.3 Grafana:数据可视化与告警通知

核心作用

  1. 可视化仪表盘:将 Prometheus 中的时序数据渲染为折线图、柱状图等,直观展示 Kafka 集群状态(如 Topic 分区分布、消费延迟热力图)。
  2. 动态交互:支持模板变量动态切换监控维度(如按消费者组或 Topic 筛选数据)。
  3. 告警通知:对接邮件、Slack 等渠道推送告警,并支持在仪表盘中标记异常时间点。

典型应用

  • 导入预置的 Kafka 监控模板(如 Dashboard ID 7589)快速构建监控视图。
  • 通过热力图分析分区 Leader 分布不均问题。

5.2.4 三者协作流程
  1. 数据流
    Kafka-Exporter 采集数据 → Prometheus 定时抓取存储 → Grafana 查询并可视化。
  2. 告警流
    Prometheus 检测阈值 → Alertmanager 路由告警 → Grafana 展示告警历史并触发通知。

5.2.5 总结对比
组件核心功能关键特性
Kafka-Exporter指标采集与格式转换支持过滤、聚合,降低数据冗余
Prometheus时序存储与告警触发动态服务发现、PromQL 灵活查询
Grafana可视化与交互分析多数据源支持、模板化仪表盘

通过三者协同,可实现对 Kafka 集群的 实时监控→异常告警→根因分析→性能优化 全链路闭环管理。

Grafana 大概效果如下:

请添加图片描述

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词