欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 新闻 > 焦点 > RabbitMQ 之 死信队列

RabbitMQ 之 死信队列

2025/1/8 5:11:07 来源:https://blog.csdn.net/weixin_45863786/article/details/144006354  浏览:    关键词:RabbitMQ 之 死信队列

一、死信的概念

先从概念解释上搞清楚这个定义,死信,顾名思义就是无法被消费的消息,字面意思可以这样理
解,一般来说,producer 将消息投递到 broker 或者直接到 queue 里了,consumer 从 queue 取出消息进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。应用场景:为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息
消费发生异常时,将消息投入死信队列中.还有比如说: 用户在商城下单成功并点击去支付后在指定时间未支付时自动失效

二、死信的来源

1、消息 TTL 过期
2、队列达到最大长度(队列满了,无法再添加数据到 mq 中)
3、消息被拒绝(basic.reject 或 basic.nack)并且 requeue=false.

三、死信实战

代码架构图

生产者正常情况下走的是普通的交换机,这个交换机的类型是 direct ,它和普通队列之间的关系是一个叫 "zhangsan" 的路由 key, 正常情况下会被 C1 消费。但是发生了上面所说的三种情况中的一种,成为了死信,然后被转换到死信交换机中,这个死信交换机也是 direct 类型,它们之间的 routingKey 是 "lisi",然后就进入了死信队列,死信队列由  C2 消费。

消息 TTL 过期

1.application.yml 配置

spring:rabbitmq:host: 127.0.0.1port: 5672username: guestpassword: guestvirtual-host: metieVirtualHosts
#    listener:
#      simple:
#        retry:
#          ### 开启消费者(程序出现异常的情况下会进行重试)
#          enabled: true
#          ### 最大重试次数
#          max-attempts: 5
#          ### 重试间隔次数
#          initial-interval: 3000
#        acknowledge-mode: manualserver:port: 8080###死信队列
mayikt:dlx:exchange: mayikt_dlx_exchangequeue: mayikt_order_dlx_queueroutingkey: dlx### 订单order:exchange: mayikt_order_exchangequeue: mayikt_order_queueroutingkey: mayikt.order

2MQConfig配置

package com.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;import java.util.HashMap;
import java.util.Map;/*** @description:* @author: huangan* @date: 2024/11/23 20:48*/
@Component
public class DeadLetterMQConfig {/*** 订单交换机*/@Value("${mayikt.order.exchange}")private String orderExchange;/*** 订单队列*/@Value("${mayikt.order.queue}")private String orderQueue;/*** 订单路由key*/@Value("${mayikt.order.routingkey}")private String orderRoutingkey;/*** 死信交换机*/@Value("${mayikt.dlx.exchange}")private String dlxExchange;/*** 死信队列*/@Value("${mayikt.dlx.queue}")private String dlxQueue;/*** 死信路由*/@Value("${mayikt.dlx.routingkey}")private String dlxRoutingkey;/*** 声明死信交换机* @return*/@Beanpublic DirectExchange dlxExchange(){return new DirectExchange(dlxExchange);}/*** 声明死信队列* @return*/@Beanpublic Queue dlxQueue(){return new Queue(dlxQueue);}/*** 声明订单业务交换机* @return*/@Beanpublic DirectExchange orderExchange(){return new DirectExchange(orderExchange);}/*** 声明订单队列* @return*/@Beanpublic Queue orderQueue(){//订单队列要绑定死信交换机Map<String,Object> arguments = new HashMap<>(2);arguments.put("x-dead-letter-exchange",dlxExchange);arguments.put("x-dead-letter-routing-key",dlxRoutingkey);return new Queue(orderQueue,true,false,false,arguments);}/*** 绑定死信队列到死信交换机* @return*/@Beanpublic Binding binding(){return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with(dlxRoutingkey);}/*** 绑定订单队列到订单交换机* @return*/@Beanpublic Binding orderBinding(){return BindingBuilder.bind(orderQueue()).to(orderExchange()).with(orderRoutingkey);}
}

3.生产者OrderProducer

package com.producer;import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;/*** @description:* @author: huangan* @date: 2024/11/23 21:58*/
@RestController
public class OrderProducer {@Autowiredprivate RabbitTemplate rabbitTemplate;/*** 订单交换机*/@Value("${mayikt.order.exchange}")private String orderExchange;/*** 订单路由key*/@Value("${mayikt.order.routingkey}")private String orderRoutingkey;@RequestMapping("/sendOrder")public String sendOrder(){String msg ="测试数据";rabbitTemplate.convertAndSend(orderExchange,orderRoutingkey,msg,message -> {//设置消息过期时间10秒过期message.getMessageProperties().setExpiration("10000");return  message;});return "success";}}

4.订单消费者

package com.consumer;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** @description: 订单队列* @author: huangan* @date: 2024/11/23 22:13*/
@Component
@Slf4j
public class OrderConsumer {/*** 监听队列回调方法* @param msg*/@RabbitListener(queues = "mayikt_order_queue")public void orderConsumer(String msg){log.info(">>正常订单消费者消费MSG:{}<<",msg);}
}

5.死信消费者

package com.consumer;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** @description: 死信队列* @author: huangan* @date: 2024/11/23 22:17*/
@Slf4j
@Component
public class OrderDlxConsumer {/*** 监听队列回调方法* @param msg*/@RabbitListener(queues = "mayikt_order_dlx_queue")public void orderConsumer(String msg){log.info(">>死信队列消费订单消费者消费MSG:{}<<",msg);}
}

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com