目录
1.重试配置
2.配置交换机&队列
3.发送消息
4.消费消息
5. 运行程序观察结果
6. 手动确认
注意:
在消息传递过程中, 可能会遇到各种问题, 如网络故障, 服务不可用, 资源不足等, 这些问题可能导致消息处理失败. 为了解决这些问题, RabbitMQ 提供了重试机制, 允许消息在处理失败后重新发送.
但如果是程序逻辑引起的错误, 那么多次重试也是没有用的, 可以设置重试次数
1.重试配置
spring:rabbitmq:addresses: amqp://study:study@你的服务器IP:15673/你的虚拟机名listener:simple:acknowledge-mode: auto #消息接收确认retry:enabled: true # 开启消费者失败重试initial-interval: 5000ms # 初始失败等待时⻓为5秒max-attempts: 5 # 最⼤重试次数(包括自身消费的一次)
2.配置交换机&队列
//重试机制
public static final String RETRY_QUEUE = "retry_queue";
public static final String RETRY_EXCHANGE_NAME = "retry_exchange";/重试机制 发布订阅模式
//1. 交换机
@Bean("retryExchange")
public Exchange retryExchange() {return
ExchangeBuilder.fanoutExchange(Constant.RETRY_EXCHANGE_NAME).durable(true).build();
}
//2. 队列
@Bean("retryQueue")
public Queue retryQueue() {return QueueBuilder.durable(Constant.RETRY_QUEUE).build();
}
//3. 队列和交换机绑定 Binding
@Bean("retryBinding")
public Binding retryBinding(@Qualifier("retryExchange") FanoutExchange
exchange, @Qualifier("retryQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange);
}
3.发送消息
@RequestMapping("/retry")
public String retry(){rabbitTemplate.convertAndSend(Constant.RETRY_EXCHANGE_NAME, "", "retry
test...");return "发送成功!";
}
4.消费消息
@Component
public class RetryQueueListener {//指定监听队列的名称@RabbitListener(queues = Constant.RETRY_QUEUE)public void ListenerQueue(Message message) throws Exception {System.out.printf("接收到消息: %s, deliveryTag: %d%n", new
String(message.getBody(),"UTF-8"),
message.getMessageProperties().getDeliveryTag());//模拟处理失败try {int num = 3/0;System.out.println("处理完成");
}catch (Exception e){System.out.println("处理失败");
}}
}
5. 运行程序观察结果
接收到消息: consumer ack test..., deliveryTag: 1
处理失败
6. 手动确认
@RabbitListener(queues = Constant.RETRY_QUEUE)
public void ListenerQueue(Message message, Channel channel) throws Exception {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {System.out.printf("接收到消息: %s, deliveryTag: %d%n", new
String(message.getBody(),"UTF-8"),
message.getMessageProperties().getDeliveryTag());//模拟处理失败int num = 3/0;System.out.println("处理完成");//3. ⼿动签收channel.basicAck(deliveryTag, true);}catch (Exception e){//4. 异常了就拒绝签收Thread.sleep(1000);//第三个参数requeue, 是否重新发送, 如果为true, 则会重新发送,,若为false, 则直接
丢弃channel.basicNack(deliveryTag, true,true);}
}
运行结果:
接收到消息: retry test..., deliveryTag: 1
接收到消息: retry test..., deliveryTag: 2
接收到消息: retry test..., deliveryTag: 3
接收到消息: retry test..., deliveryTag: 4
接收到消息: retry test..., deliveryTag: 5
接收到消息: retry test..., deliveryTag: 6
接收到消息: retry test..., deliveryTag: 7
接收到消息: retry test..., deliveryTag: 8
接收到消息: retry test..., deliveryTag: 9
接收到消息: retry test..., deliveryTag: 10
接收到消息: retry test..., deliveryTag: 11
可以看到, 手动确认模式时, 重试次数的限制不会像在自动确认模式下那样直接生效, 因为是否重试以及何时重试更多地取决于应用程序的逻辑和消费者的实现.
自动确认模式下, RabbitMQ 会在消息被投递给消费者后自动确认消息. 如果消费者处理消息时抛出异常, RabbitMQ 根据配置的重试参数自动将消息重新⼊队, 从而实现重试. 重试次数和重试间隔等参数可以直接在RabbitMQ的配置中设定,并且RabbitMQ会负责执行这些重试策略.
手动确认模式下, 消费者需要显式地对消息进行确认. 如果消费者在处理消息时遇到异常, 可以选择不确认消息使消息可以重新⼊队. 重试的控制权在于应用程序本身, 而不是RabbitMQ的内部机制. 应用程序可以通过自己的逻辑和利用RabbitMQ的高级特性来实现有效的重试策略
注意:
1. 自 动确认模式下: 程序逻辑异常, 多次重试还是失败, 消息就会被自动确认, 那么消息就丢失
了
2. 手 动确认模式下: 程序逻辑异常, 多次重试消息依然处理失败, 无法被确认, 就⼀直是
unacked的状态, 导致消息积压