欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 新闻 > 社会 > springboot 整合 rabbitMQ(2)

springboot 整合 rabbitMQ(2)

2025/1/7 6:23:53 来源:https://blog.csdn.net/WYH1233211234567/article/details/142780265  浏览:    关键词:springboot 整合 rabbitMQ(2)

springboot 整合 rabbitMQ(1)-CSDN博客

上期说了rabbitMQ的基础用法(普通队列模式)

这期学习一下如何防止消息重复消费和进阶用法(订阅者模式)

目录

重复消费问题

导致 RabbitMQ 重复消费问题的原因:

解决思路

代码实现(这里在上一期的代码上进行修改)

生产者:

消费者:

订阅者模式:

是什么:

代码实现:

 生产者:

消费者:       ​

效果:


重复消费问题

导致 RabbitMQ 重复消费问题的原因:

  1. 网络问题: 在分布式系统中,网络通信是不稳定的因素之一。如果生产者发送一条消息到 RabbitMQ 但尚未收到确认(acknowledgment),可能会导致 RabbitMQ 认为消息未被正确处理并重新发送。

  2. 消费者故障: 消费者在处理消息时可能会发生故障,例如应用程序崩溃或因某种原因终止。如果 RabbitMQ 未收到消费者的确认消息,它可能会认为消息未被消费并重新发送。

  3. 网络分区: 当分布式系统中的网络发生分区(网络隔离)时,可能会导致消息在不同部分之间重复传递。这是因为每个分区可能都会独立处理消息。

  4. 消息重复传递策略: RabbitMQ 提供了不同的消息传递策略,例如“至少一次传递”和“最多一次传递”。这些策略可能会导致消息的重复传递,尤其在异常情况下。

  5. 消费者超时设置不当: 如果消费者设置了较长的超时时间,在消费者未确认消息的情况下,RabbitMQ 可能会认为消息未被处理并重新发送。

解决思路

  • 生产者发送消息时携带一个唯一的id
  • 消费者每次消费前先判断一下在redis中是否在id,不存在就消费,消费完之后就把id存储到redis中

代码实现(这里在上一期的代码上进行修改)

生产者:

消费者:
    @Autowiredprivate RedisTemplate redisTemplate;private static final String QUEUE_NAME="login_queue";@RabbitListener(queuesToDeclare = @Queue(QUEUE_NAME))public void test01(User getUser, Message message, Channel channel) throws IOException {//获取消息唯一idString messageId = message.getMessageProperties().getMessageId();long deliveryTag = message.getMessageProperties().getDeliveryTag();try {//判断消息是否存在if(redisTemplate.opsForHash().hasKey(QUEUE_NAME,messageId)){//存在表示该消息已被消费,returnreturn;}//将该消息id存入redis 第三个参数随便字符串,不要填对象,没法序列化redisTemplate.opsForHash().put(QUEUE_NAME,messageId,"a");//发送消息到邮箱sendMail(getUser);System.out.println("消费成功");//手动确认消息  multiple 是一个布尔值,它决定了是否确认多个消息//  false:只确认指定的单个消息  true:确认所有未确认的消息,这些消息的deliveryTag 小于或等于指定的 deliveryTagchannel.basicAck(deliveryTag,false);} catch (Exception e) {//消息回滚  deliveryTag:指定要拒绝的消息 false:表示只拒绝指定的这条消息 true:表示将消息重新入队,让其他消费者有机会再次处理这条消息。channel.basicNack(deliveryTag,false,true);System.out.println("消息消费异常");}}

订阅者模式:

是什么:

  简单解释就是,可以将消息发送给不同类型的消费者。做到发布一次,消费多个。下图取自于官方网站(RabbitMQ)的发布/订阅模式的图例

      那么要使交换机接受到消息后转发给队列,就需要将队列绑定给交换机,可以写一个配置类,项目启动,自动将队列绑定给交换机

代码实现:

/*** @ClassName QueueConfig* @Description 声明队列和交换机 并将队列和交换机绑定* @Author* @Date 2024/10/9 16:28*/
@Configuration
public class QueueConfig {//订阅模式交换机名称public static String EXCHANGENAME="fanout_exchange";public static String TESTQUEUE01="test_01";public static String TESTQUEUE02="test_02";@Beanpublic Queue queue01(){return new Queue(TESTQUEUE01);}@Beanpublic Queue queue02(){return new Queue(TESTQUEUE02);}@Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange(EXCHANGENAME);}@Beanpublic Binding bingingQueue01(Queue queue01,FanoutExchange fanoutExchange){return BindingBuilder.bind(queue01).to(fanoutExchange);}@Beanpublic Binding bingingQueue02(Queue queue02,FanoutExchange fanoutExchange){return BindingBuilder.bind(queue02).to(fanoutExchange);}
}
 生产者:

        只需要交换机的名字和传入的数据,队列名不需要填,因为交换机已经绑定队列

消费者:       
效果:

        发送一条数据,两个消费者消费

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com