欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 健康 > 美食 > Kafka 如何保证消息不丢失

Kafka 如何保证消息不丢失

2025/3/20 2:36:03 来源:https://blog.csdn.net/qq_24396737/article/details/146256675  浏览:    关键词:Kafka 如何保证消息不丢失

Apache Kafka 是一个分布式流处理平台,以其高吞吐量、可扩展性和持久性而闻名。为了确保消息在传输过程中的安全性,Kafka 从生产者(Producer)、Broker(服务器)和消费者(Consumer)三个层面采取了一系列措施来防止消息丢失。本文将详细介绍这些关键配置及其背后的原理。

生产者端(Producer)

关键配置

  • acks=all:确保消息被所有 In-Sync Replicas (ISR) 副本写入。
  • retries=Integer.MAX_VALUE:遇到临时故障时重试,避免消息丢失。
  • enable.idempotence=true:开启幂等性,防止因重试导致的消息重复。
  • max.in.flight.requests.per.connection=1:确保消息按顺序发送,避免乱序。

配置文件 (producer.properties)

acks=all
retries=2147483647
enable.idempotence=true
max.in.flight.requests.per.connection=1
request.timeout.ms=60000
delivery.timeout.ms=120000

示例代码:可靠的Kafka生产者实现

import org.apache.kafka.clients.producer.*;import java.util.Properties;public class ReliableProducer {public static void main(String[] args) {// 配置生产者属性Properties props = new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");props.put(ProducerConfig.ACKS_CONFIG, "all"); // 等待所有 ISR 副本确认props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE); // 无限重试props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true"); // 开启幂等性props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1"); // 避免乱序props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");KafkaProducer<String, String> producer = new KafkaProducer<>(props);// 发送消息for (int i = 0; i < 10; i++) {ProducerRecord<String, String> record = new ProducerRecord<>("reliable_topic", "key" + i, "value" + i);producer.send(record, (metadata, exception) -> {if (exception == null) {System.out.println("Message sent to partition " + metadata.partition() + " at offset " + metadata.offset());} else {System.err.println("Message send failed: " + exception.getMessage());}});}// 关闭生产者producer.close();}
}

Broker 端(服务器)

关键配置

  • replication.factor=3:设置副本数量为3,确保数据不会因为单点故障丢失。
  • min.insync.replicas=2:至少2个副本同步,保证消息写入安全。
  • log.flush.interval.messages=1log.flush.interval.ms=1000:控制日志刷盘频率,避免崩溃导致的数据丢失。
  • unclean.leader.election.enable=false:禁止非ISR副本选举,避免数据丢失。

配置文件 (server.properties)

default.replication.factor=3
min.insync.replicas=2
log.flush.interval.messages=1
log.flush.interval.ms=1000
unclean.leader.election.enable=false

消费者端(Consumer)

关键配置

  • enable.auto.commit=false:关闭自动提交Offset,改为手动提交,确保消费成功后才提交,防止消息丢失。
  • auto.offset.reset=earliest:确保消费者从头读取未消费的消息。

配置文件 (consumer.properties)

enable.auto.commit=false
auto.offset.reset=earliest

示例代码:可靠的Kafka消费者实现

import org.apache.kafka.clients.consumer.*;import java.time.Duration;
import java.util.Collections;
import java.util.Properties;public class ReliableConsumer {public static void main(String[] args) {// 配置消费者属性Properties props = new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");props.put(ConsumerConfig.GROUP_ID_CONFIG, "reliable_group");props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // 关闭自动提交 Offsetprops.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // 重新消费未处理消息props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Collections.singletonList("reliable_topic"));// 轮询消息并手动提交 Offsettry {while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {System.out.println("Received message: " + record.value() + ", offset: " + record.offset());// 处理成功后手动提交 Offsetconsumer.commitSync();}}} finally {consumer.close();}}
}

事务支持(Exactly-Once 语义)

Kafka 2.0+ 版本引入了事务API,确保生产和消费操作的原子性,即 Exactly-Once 语义。

生产者端(带事务)

props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "tx-id-123"); // 事务 ID
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions();try {producer.beginTransaction();producer.send(new ProducerRecord<>("reliable_topic", "key", "value"));producer.commitTransaction(); // 提交事务
} catch (Exception e) {producer.abortTransaction(); // 回滚事务
}

消费者端(带事务)

props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"); // 只消费已提交的事务数据

通过上述配置和代码示例,您可以有效地利用 Kafka 的特性来保证消息传输的安全性和可靠性。希望这篇文章能帮助您更好地理解和应用 Kafka 中的消息持久化机制。

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词