-
RabbitMQ 工作队列:
- 默认的传统队列是为均摊消费,存在不公平性;如果每个消费者速度不一样的情况下,均摊消费是不公平的,应该是能者多劳
- 采用工作队列模式:
- 在通道中只需要设置 baseicQos 的值即可
- 表示 MQ 服务器每次只会给消费者推送 n 条消息
- 必须手动应答之后才会继续发送
- 在通道中只需要设置 baseicQos 的值即可
- 生产者:
public class ProducerFanout {private static final String QUEUE_NAME = "BoyatopMamber";public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {// 1.创建新的连接Connection connection = RabbitMQConnection.getConnection();// 2.设置 channelChannel channel = connection.createChannel();for (int i = 0; i < 10; i++) {// 3.发送消息String msg = "Hello my Bro" + i;channel.confirmSelect();channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());boolean result = channel.waitForConfirms();}// 4.关闭资源channel.close();connection.close();} }
- 消费者1:
public class Consumer1 {//定义队列private static final String QUEUE_NAME = "BoyatopMamber";public static void main(String[] args) throws IOException, TimeoutException {//创建连接Connection connection = RabbitMQConnection.getConnection();//创建通道final Channel channel = connection.createChannel();channel.basicQos(1);DefaultConsumer defaultConsumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String msg = new String(body,"UTF-8");System.out.println("用户1:" + msg);channel.basicAck(envelope.getDeliveryTag(),false);}};//监听消息channel.basicConsume(QUEUE_NAME,false,defaultConsumer);} }
- 消费者2:
public class Consumer2 {//定义队列private static final String QUEUE_NAME = "BoyatopMamber";public static void main(String[] args) throws IOException, TimeoutException {//创建连接Connection connection = RabbitMQConnection.getConnection();//创建通道final Channel channel = connection.createChannel();channel.basicQos(3);DefaultConsumer defaultConsumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String msg = new String(body,"UTF-8");System.out.println("消费者2:" + msg);channel.basicAck(envelope.getDeliveryTag(),false);}};//监听消息channel.basicConsume(QUEUE_NAME,false,defaultConsumer);} }
- 默认的传统队列是为均摊消费,存在不公平性;如果每个消费者速度不一样的情况下,均摊消费是不公平的,应该是能者多劳
-
RabbitMQ 交换机类型:
- Direct exchange:直连交换机
- Fanout exchange:扇形交换机
- Topic exchange:主体交换机
- Headers exchange:头交换机
- Virtual Hostos:区分不同的团队
- 队列:存放消息
- 交换机:路由消息存放在那个队列中,类似于 Nginx
- 路由:key 分发规则
-
RabbitMQ Fanout 发布订阅:
- 生产者发送一条消息,经过交换机转发到多个不同的队列,多个不同的队列就多个不同的消费者
- 步骤:
- 需要创建两个队列,每个队列都对应一个消费者
- 队列需要绑定我们交换机
- 生产者投递消息到交换机中,交换机再将消息分配给两个队列中都存放起来
- 消费者从队列中获取消息
- 生产者:
public class OrderConsumer {//定义队列private static final String QUEUE_NAME = "BoyatopOrder";//定义交换机private static final String EXCHANGE_NAME = "fanout_exchange";public static void main(String[] args) throws IOException, TimeoutException {///创建连接Connection connection = RabbitMQConnection.getConnection();//创建通道Channel channel = connection.createChannel();//关联队列channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");DefaultConsumer defaultConsumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String msg = new String(body,"UTF-8");System.out.println("订单接收:" + msg);}};//监听消息channel.basicConsume(QUEUE_NAME,true,defaultConsumer);} }
- 数据消费者:
public class MamConsumer {//定义队列private static final String QUEUE_NAME = "BoyatopMamber";//定义交换机的名称private static final String EXCHANGE_NAME = "fanout_exchange";public static void main(String[] args) throws IOException, TimeoutException {//创建连接Connection connection = RabbitMQConnection.getConnection();//创建通道final Channel channel = connection.createChannel();//关联队列channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");DefaultConsumer defaultConsumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String msg = new String(body,"UTF-8");System.out.println("消费者接收:" + msg);}};//开启监听channel.basicConsume(QUEUE_NAME,true,defaultConsumer);} }
- 订单消费者:
public class OrderConsumer {//定义队列private static final String QUEUE_NAME = "BoyatopOrder";//定义交换机private static final String EXCHANGE_NAME = "fanout_exchange";public static void main(String[] args) throws IOException, TimeoutException {///创建连接Connection connection = RabbitMQConnection.getConnection();//创建通道Channel channel = connection.createChannel();//关联队列channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");DefaultConsumer defaultConsumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String msg = new String(body,"UTF-8");System.out.println("订单接收:" + msg);}};//监听消息channel.basicConsume(QUEUE_NAME,true,defaultConsumer);} }
- 生产者发送一条消息,经过交换机转发到多个不同的队列,多个不同的队列就多个不同的消费者
-
Direct 路由模式:
- 当交换机类型为 direct 类型时,根据队列绑定的路由转发到具体的队列中存放消息
- 生产者:
public class Producer {//定义交换机private static final String EXCHANGE_NAME = "newDirect_exchange";public static void main(String[] args) throws IOException, TimeoutException {//创建连接Connection connection = RabbitMQConnection.getConnection();//创建 channelChannel channel = connection.createChannel();//通道关联交换机channel.exchangeDeclare(EXCHANGE_NAME,"direct",true);//发送消息for (int i = 0; i < 10; i++) {String msg = "生产消息 --- 路由模式";System.out.println(msg + i);channel.basicPublish(EXCHANGE_NAME,"mail",null,msg.getBytes());channel.basicPublish(EXCHANGE_NAME,"sms",null,msg.getBytes());}channel.close();connection.close();} }
- 邮件消费者:
public class MailConsumer {//定义交换机private static final String EXCHANGE_NAME = "newDirect_exchange";//定义队列private static final String QUEUE_NAME = "newDirectqueueOne";public static void main(String[] args) throws IOException, TimeoutException {//创建连接Connection connection = RabbitMQConnection.getConnection();//创建 channelChannel channel = connection.createChannel();//关联队列channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"mail");DefaultConsumer defaultConsumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String msg = new String(body,"UTF-8");System.out.println("邮件消费者接收信息:" + msg);}};//监听队列channel.basicConsume(QUEUE_NAME,true,defaultConsumer);} }
- 短信消费者:
public class SmsConsumer {//定义交换机private static final String EXCHANGE_NAME = "newDirect_exchange";//定义队列private static final String QUEUE_NAME = "newDirectqueueTwo";public static void main(String[] args) throws IOException, TimeoutException {//创建连接Connection connection = RabbitMQConnection.getConnection();//创建 channelChannel channel = connection.createChannel();//关联队列channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"sms");DefaultConsumer defaultConsumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String msg = new String(body,"UTF-8");System.out.println("短信消费者接收消息:" + msg);}};//监听队列channel.basicConsume(QUEUE_NAME,true,defaultConsumer);} }
- 当交换机类型为 direct 类型时,根据队列绑定的路由转发到具体的队列中存放消息
-
Topic 主体模式:
- 当交换机类型为 topic 类型时,根据队列绑定的路由键模糊转发到具体队列中存放
- #:表示支持匹配多个词
- *:表示只能匹配一个词
RabbitMQ深度探索:五种消息模式
2025/2/8 9:38:55
来源:https://blog.csdn.net/SOS_suyan/article/details/145438504
浏览:
次
关键词:RabbitMQ深度探索:五种消息模式
版权声明:
本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。
我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com
热文排行
- 华为 海思22AP10(SS524)H.265 编解码处理器用户指南
- 数据库物理结构设计
- npm install puppeteer 报错 npm ERR! PUPPETEER_DOWNLOAD_HOST is deprecated解决办法
- 如何在 Mac 上清空硬盘后恢复丢失的数据?
- 基于重要抽样的主动学习不平衡分类方法ALIS
- 《缺失MRI模态下的脑肿瘤分割的潜在相关表示学习》| 文献速递-深度学习肿瘤自动分割
- (2)Django生产环境数据库的切换以及环境配置python-dotenv方案
- CCF GESP Python编程 一级认证真题 2024年6月
- 大模型分离架构学习记录
- 【微信小程序】自定义组件 - 组件的生命周期