欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 房产 > 建筑 > 【kafka系列】生产者

【kafka系列】生产者

2025/2/20 5:40:08 来源:https://blog.csdn.net/qq_31815507/article/details/145648690  浏览:    关键词:【kafka系列】生产者

目录

发送流程

1. 流程逻辑分析

阶段一:主线程处理

阶段二:Sender 线程异步发送

核心设计思想

2. 流程

关键点总结

重要参数

一、核心必填参数

二、可靠性相关参数

三、性能优化参数

四、高级配置

五、安全性配置(可选)

六、错误处理与监控

典型配置示例

关键注意事项


发送流程

  • 序列化与分区:消息通过Partitioner选择目标分区(默认轮询或哈希),序列化后加入RecordAccumulator缓冲区。
  • 批次合并Sender线程将同一分区的消息合并为ProducerBatch,减少网络请求(源码见Sender.run()方法)。
  • 发送至Broker:通过NetworkClient异步发送,Broker的LogAppendTime处理写入请求。
  • ACK机制:根据acks配置(0/1/all)等待Broker确认,通过Metadata类更新分区元数据

1. 流程逻辑分析

Kafka 生产者发送消息的核心流程分为 主线程处理Sender 线程异步发送 两个阶段,具体步骤如下:


阶段一:主线程处理
  1. 创建 ProducerRecord
    • 用户调用 producer.send(ProducerRecord),指定 Topic、Key、Value 和可选的分区或时间戳。
  1. 选择分区(Partition)
    • 若未指定分区,根据以下规则选择:
      • 有 Key:对 Key 哈希取模(hash(key) % 分区数),确保相同 Key 的消息进入同一分区。
      • 无 Key:默认使用粘性分区策略(Sticky Partitioning,Kafka 2.4+),在批次填满或超时前发送到同一分区,提升性能。
  1. 序列化(Serialize)
    • 使用配置的 key.serializervalue.serializer 对 Key 和 Value 序列化(如 StringSerializerByteArraySerializer)。
  1. 追加到缓冲区(RecordAccumulator)
    • 将消息按 Topic-Partition 分组,存入 RecordAccumulator 的批次(Batch)中。
    • 批次策略
      • batch.size:批次大小阈值(默认 16KB),达到阈值立即发送。
      • linger.ms:批次等待时间(默认 0ms),超时后发送未满批次。

阶段二:Sender 线程异步发送
  1. Sender 线程拉取批次
    • Sender 线程定期检查缓冲区,将满足条件的批次(已满或超时)封装为 ProducerRequest
  1. 构建请求并发送到 Broker
    • 根据分区的 Leader 副本所在 Broker,将请求发送到对应的节点。
    • 关键配置
      • acks:控制消息持久化确认级别:
        • 0:不等待确认(可能丢失数据)。
        • 1:等待 Leader 确认(默认)。
        • all:等待所有 ISR 副本确认(最高可靠性)。
      • max.in.flight.requests.per.connection:控制单个 Broker 的未确认请求数(默认 5)。
  1. 处理 Broker 响应
    • 成功:触发用户设置的 Callback 回调,并释放批次内存。
    • 失败
      • 可重试错误(如网络抖动、Leader 切换):根据 retries(默认 0)和 retry.backoff.ms(默认 100ms)重试。
      • 不可重试错误(如消息过大):直接触发回调并抛出异常。

核心设计思想
  • 异步批处理:通过缓冲区合并小消息,减少网络 I/O 次数。
  • 零拷贝优化:使用 sendfile 系统调用提升网络传输效率。
  • 高可靠性:通过重试机制和 acks=all 确保消息不丢失。

2. 流程


关键点总结

  1. 分区选择:优先使用 Key 哈希或粘性分区策略,保证消息顺序性和吞吐量。
  2. 批次优化:通过 batch.sizelinger.ms 平衡延迟与吞吐。
  3. 可靠性保障:通过 acksretries 配置确保消息持久化。
  4. 异步处理:主线程与 Sender 线程解耦,避免阻塞用户逻辑。

重要参数

以下是 Kafka 生产者(Producer)在日常开发中的 常见配置参数 及其作用,按功能分类整理成表格:


一、核心必填参数

参数名

默认值

说明

bootstrap.servers

Kafka 集群地址列表(逗号分隔,如 host1:9092,host2:9092

)。

key.serializer

Key 的序列化类(如 org.apache.kafka.common.serialization.StringSerializer

)。

value.serializer

Value 的序列化类(同上)。


二、可靠性相关参数

参数名

默认值

说明

acks

1

消息持久化确认机制:

0:不等待确认(可能丢失数据)。 1:等待 Leader 确认(默认)。all:等待所有 ISR 副本确认(最高可靠性)。

retries

0

发送失败后的重试次数(建议设为 Integer.MAX_VALUE

配合 delivery.timeout.ms

)。

enable.idempotence

false

是否启用幂等性(true时保证消息不重复,需配合 acks=all

retries>0)。

max.in.flight.requests.per.connection

5

单个 Broker 的未确认请求数。若启用幂等性,建议设为 1

以保证顺序。


三、性能优化参数

参数名

默认值

说明

linger.ms

0

消息在缓冲区等待时间(毫秒),增大可提升吞吐量(但增加延迟)。

batch.size

16384

(16KB)

单个批次的大小阈值,达到阈值后立即发送。

buffer.memory

33554432

(32MB)

生产者缓冲区的总内存大小。

compression.type

none

消息压缩算法(gzip

snappy

lz4

zstd

),减少网络带宽占用。


四、高级配置

参数名

默认值

说明

request.timeout.ms

30000

(30秒)

生产者等待 Broker 响应的超时时间。

max.block.ms

60000

(60秒)

生产者缓冲区满或元数据不可用时的阻塞时间(超时抛异常)。

partitioner.class

默认轮询/哈希策略

自定义分区策略(实现 Partitioner

接口)。


五、安全性配置(可选)

参数名

默认值

说明

security.protocol

PLAINTEXT

安全协议(如 SSL

SASL_SSL

)。

ssl.keystore.location

SSL 证书路径(客户端认证时需配置)。

sasl.mechanism

SASL 认证机制(如 PLAIN

SCRAM-SHA-256

)。


六、错误处理与监控

参数名

默认值

说明

interceptor.classes

生产者拦截器(实现 ProducerInterceptor

接口),用于监控或修改消息。

metrics.sample.window.ms

30000

(30秒)

性能指标采样窗口时间。


典型配置示例

Properties props = new Properties();
props.put("bootstrap.servers", "kafka1:9092,kafka2:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("acks", "all");
props.put("retries", 10);
props.put("linger.ms", 20);
props.put("batch.size", 32768);
props.put("compression.type", "snappy");
props.put("enable.idempotence", "true");

关键注意事项

  1. 可靠性 vs 性能
    • acks=allenable.idempotence=true 提高可靠性,但可能降低吞吐量。
    • 增大 batch.sizelinger.ms 可提升吞吐量,但增加延迟。
  1. 幂等性限制
    • 需 Kafka 0.11+ 版本支持,且 max.in.flight.requests=1(或 Kafka 2.0+ 允许 5)。
  1. 监控与调优
    • 通过 metrics 和拦截器监控生产者性能,动态调整参数

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词