欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 房产 > 家装 > 【高并发】消息队列(MQ)全解析:原理、主流产品及 Java 实现

【高并发】消息队列(MQ)全解析:原理、主流产品及 Java 实现

2025/3/18 23:04:33 来源:https://blog.csdn.net/Prince140678/article/details/146057663  浏览:    关键词:【高并发】消息队列(MQ)全解析:原理、主流产品及 Java 实现

🚀 消息队列(MQ)全解析:原理、主流产品及 Java 实现

“消息队列(MQ)如何解决高并发、解耦、异步处理等问题?”

在分布式系统中,消息队列(Message Queue, MQ) 作为一种高效的异步通信机制,广泛用于 削峰填谷、解耦、提高系统吞吐量 等场景。

本篇文章带你全面解析 MQ 的 原理、主流产品、快速入门指南,并用 Java 实现一个简单的 MQ,助你深入理解消息队列!🚀


🎯 1. MQ 消息队列的核心概念

🛠️ MQ 解决的问题

1️⃣ 解耦:让系统的不同模块通过 MQ 进行通信,而不是直接调用。
2️⃣ 异步处理:提高系统响应速度,例如订单系统下单后,异步通知物流系统。
3️⃣ 削峰填谷:应对高并发流量,避免系统瞬间过载,例如秒杀、抢购场景。

🏗️ MQ 的核心组件

  • 生产者(Producer):发送消息的应用。
  • 消息队列(Queue/Topic):存储和传递消息。
  • 消费者(Consumer):接收并处理消息的应用。

📌 消息模型

  • 点对点(P2P):一个消息只能被一个消费者消费(如 RabbitMQ Queue)。
  • 发布/订阅(Pub/Sub):多个消费者都能收到消息(如 Kafka Topic)。

🌍 2. 主流 MQ 产品对比

MQ 产品模型适用场景主要特点
RabbitMQ队列(P2P)企业级应用,事务、可靠性要求高基于 AMQP 协议,功能丰富,消息确认机制强大
Kafka发布订阅(Pub/Sub)日志、数据流处理,吞吐量大高吞吐,支持分区,持久化能力强
RocketMQ发布订阅分布式系统、大数据阿里开源,支持事务消息
ActiveMQ队列+发布订阅传统企业系统兼容 JMS,适用于 Java 项目
Pulsar发布订阅云原生、大数据Apache 开源,分层存储,高扩展性

⚡ 3. 快速入门:RabbitMQ 和 Kafka

✅ RabbitMQ 快速入门

1️⃣ 安装 RabbitMQ(Docker 启动)

docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:management

2️⃣ Java 代码示例:发送消息

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {channel.queueDeclare("hello", false, false, false, null);String message = "Hello, MQ!";channel.basicPublish("", "hello", null, message.getBytes());
}

3️⃣ Java 代码示例:接收消息

Channel channel = connection.createChannel();
channel.queueDeclare("hello", false, false, false, null);
DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println("Received: " + message);
};
channel.basicConsume("hello", true, deliverCallback, consumerTag -> {});

✅ Kafka 快速入门

1️⃣ 安装 Kafka(Docker 启动)

docker-compose up -d

2️⃣ 创建 Topic

kafka-topics.sh --create --topic test --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1

3️⃣ Java 代码示例:生产者

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<>("test", "key", "Hello Kafka!"));

4️⃣ Java 代码示例:消费者

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("test"));
while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {System.out.println("Received: " + record.value());}
}

🏗️ 4. 自己用 Java 实现一个简单的 MQ

📌 目标: 实现一个 基于内存队列的简单 MQ,支持 生产者发送消息,消费者消费消息

📝 Step 1:定义消息队列

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;public class SimpleQueue {private BlockingQueue<String> queue = new LinkedBlockingQueue<>();public void send(String message) {queue.offer(message);}public String receive() {try {return queue.take(); // 阻塞等待消息} catch (InterruptedException e) {Thread.currentThread().interrupt();return null;}}
}

📝 Step 2:生产者

public class Producer implements Runnable {private SimpleQueue queue;public Producer(SimpleQueue queue) {this.queue = queue;}@Overridepublic void run() {for (int i = 0; i < 5; i++) {String message = "Message " + i;queue.send(message);System.out.println("Produced: " + message);try { Thread.sleep(1000); } catch (InterruptedException e) { }}}
}

📝 Step 3:消费者

public class Consumer implements Runnable {private SimpleQueue queue;public Consumer(SimpleQueue queue) {this.queue = queue;}@Overridepublic void run() {while (true) {String message = queue.receive();System.out.println("Consumed: " + message);}}
}

📝 Step 4:运行测试

public class Main {public static void main(String[] args) {SimpleQueue queue = new SimpleQueue();new Thread(new Producer(queue)).start();new Thread(new Consumer(queue)).start();}
}

运行效果

Produced: Message 0
Consumed: Message 0
Produced: Message 1
Consumed: Message 1
...

🎉 总结

📌 MQ 核心要点:
✔️ 适用于 解耦、异步处理、削峰填谷
✔️ RabbitMQ 适合事务、可靠性要求高的业务
✔️ Kafka 适合高吞吐、大数据流处理
✔️ 自己实现 MQ 需掌握 多线程、队列、分布式存储

🚀 如果你觉得有帮助,欢迎点赞 + 关注,持续更新更多分布式架构干货! 💡

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词