欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 文旅 > 八卦 > 【Kafka 实战】Kafka 如何保证消息的顺序性?

【Kafka 实战】Kafka 如何保证消息的顺序性?

2024/11/30 14:48:59 来源:https://blog.csdn.net/qq_37967783/article/details/143931949  浏览:    关键词:【Kafka 实战】Kafka 如何保证消息的顺序性?

👉博主介绍: 博主从事应用安全和大数据领域,有8年研发经验,5年面试官经验,Java技术专家,WEB架构师,阿里云专家博主,华为云云享专家,51CTO 专家博主

⛪️ 个人社区:个人社区
💞 个人主页:个人主页
🙉 专栏地址: ✅ Java 中级
🙉八股文专题:剑指大厂,手撕 Java 八股文

在这里插入图片描述

文章目录

      • 1. Kafka 如何保证消息的顺序性?
        • 1.1 分区(Partition)
        • 1.2 生产者
        • 1.3 消费者
        • 1.4 配置参数

1. Kafka 如何保证消息的顺序性?

Apache Kafka 是一个高吞吐量的分布式消息系统,广泛用于构建实时数据流处理平台。Kafka 在设计上考虑了消息的顺序性,通过多种机制确保消息在特定条件下按顺序处理。以下是 Kafka 保证消息顺序性的主要机制:

1.1 分区(Partition)

Kafka 将主题(Topic)划分为多个分区(Partition),每个分区是一个有序的、不可变的消息序列。分区是 Kafka 中消息顺序性的基本单位。

  • 单个分区:在一个分区内部,消息是严格有序的。生产者发送的消息会按照发送顺序追加到分区的末尾,消费者从分区中读取消息时也是按顺序读取的。
  • 多个分区:如果一个主题有多个分区,那么消息的全局顺序性无法保证。但是,可以确保每个分区内部的消息是有序的。
1.2 生产者

Kafka 生产者(Producer)通过以下方式确保消息的顺序性:

  • 分区键(Partition Key):生产者可以为每条消息指定一个分区键。Kafka 会根据分区键将消息路由到特定的分区。如果多条消息具有相同的分区键,它们会被路由到同一个分区,从而保证这些消息在该分区内的顺序性。
  • 幂等生产者:Kafka 2.0 引入了幂等生产者(Idempotent Producer),确保每条消息在分区中最多只出现一次,避免重复消息的问题。
  • 事务性生产者:Kafka 2.0 还引入了事务性生产者(Transactional Producer),允许生产者在事务中发送多条消息,确保这些消息要么全部成功写入,要么全部失败。
1.3 消费者

Kafka 消费者(Consumer)通过以下方式确保消息的顺序性:

  • 单个分区:如果一个消费者组中的消费者只消费一个分区的消息,那么消息的顺序性是可以保证的。
  • 多个分区:如果一个消费者组中的消费者消费多个分区的消息,那么全局的顺序性无法保证。但是,每个分区内部的消息仍然是有序的。
1.4 配置参数

Kafka 提供了一些配置参数,可以帮助确保消息的顺序性:

  • max.in.flight.requests.per.connection:控制生产者在收到确认之前可以发送的最大请求数。设置为 1 可以确保消息的顺序性,但会降低吞吐量。
  • enable.idempotence:启用幂等生产者,确保每条消息在分区中最多只出现一次。
  • transactional.id:启用事务性生产者,确保多条消息的原子性。

以下是一个简单的 Java 示例,展示了如何使用分区键和幂等生产者来确保消息的顺序性。

生产者配置

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;public class OrderlyProducer {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("max.in.flight.requests.per.connection", 1); // 确保消息顺序性props.put("enable.idempotence", true); // 启用幂等生产者Producer<String, String> producer = new KafkaProducer<>(props);for (int i = 0; i < 10; i++) {String key = "key-" + (i % 3); // 使用分区键String value = "message-" + i;ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", key, value);producer.send(record);}producer.close();}
}

消费者配置

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;import java.time.Duration;
import java.util.Collections;
import java.util.Properties;public class OrderlyConsumer {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("group.id", "my-group");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Collections.singletonList("my-topic"));while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());}}}
}

Kafka 通过分区、生产者配置和消费者配置等多种机制确保消息的顺序性。通过合理使用分区键、幂等生产者和事务性生产者,可以确保在特定条件下消息的顺序性。

精彩专栏推荐订阅:在下方专栏👇🏻
✅ 2023年华为OD机试真题(A卷&B卷)+ 面试指导
✅ 精选100套 Java 项目案例
✅ 面试需要避开的坑(活动)
✅ 你找不到的核心代码
✅ 带你手撕 Spring
✅ Java 初阶

在这里插入图片描述

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com