欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 教育 > 高考 > RabbitMq手动ack的超简单案例+Confirm和Return机制的配置和使用

RabbitMq手动ack的超简单案例+Confirm和Return机制的配置和使用

2024/10/24 10:25:40 来源:https://blog.csdn.net/LUCIAZZZ/article/details/140751225  浏览:    关键词:RabbitMq手动ack的超简单案例+Confirm和Return机制的配置和使用

最简单的例子

先简单介绍一下这三个方法

basicAck

表示确认成功,使用此方法后,消息会被rabbitmq broker删除

 

basicNack

表示失败确认,一般在消费消息业务异常时用到此方法,可以将消息重新投递入队列

basicReject

拒绝消息,与basickNack区别在于不能进行批量操作,其他用法很相似

形参

multiple表示是否批量处理

requeue表示是否重复入队


deliverTag:表示消息投递序号,每次消费消息或者消息重新投递后,deliverTag都会增加。手动消息确认模式下,我们可以对指定deliverTag的消息进行ack、nack、reject等操作。

mutiple:是否批量确认,值为 true 则会一次性 ack所有小于当前消息 delivertag 的消息


依赖

  <dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-amqp</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><dependency><groupId>com.alibaba.fastjson2</groupId><artifactId>fastjson2</artifactId><version>2.0.51</version></dependency><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit</artifactId></dependency><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit</artifactId></dependency></dependencies>

springboot配置

spring:rabbitmq:host: 192.168.88.130port: 5672username: itcastpassword: 123321virtual-host: /   #虚拟主机,默认是/,RabbitMQ 使用虚拟主机来隔离不同的消息环境,虚拟主机用于将消息、交换器、队列和绑定隔离开来publisher-confirm-type: correlated #发布者消息确认功能(异步)listener:simple:retry:enabled: true  #开启消费者失败重试max-attempts: 5 #最大重试次数initial-interval: 1000ms #初始失败等待时长为1秒multiplier: 1 #下次失败的等待时长倍数,下次等待时长=multipiler*last-intervalacknowledge-mode: manual #开启手动ack机制 auto是自动 none是发送后直接ack(这个不会用上的)publisher-returns: true #发布者返回消息功能logging:level:com.atguigu.mq.config.MQProducerAckConfig: info

交换机和队列配置

package com.example.rabbitmq.Configuration;import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.ImmediateRequeueMessageRecoverer;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class QueueConfig {@Bean(name = "confirmTestQueue")public Queue confirmTestQueue() {return new Queue("confirm_test_queue", true, false, false);}@Bean(name = "confirmTestExchange")public FanoutExchange confirmTestExchange() {return new FanoutExchange("confirmTestExchange");}@Beanpublic Binding confirmTestFanoutExchangeAndQueue(@Qualifier("confirmTestExchange") FanoutExchange confirmTestExchange,@Qualifier("confirmTestQueue") Queue confirmTestQueue) {return BindingBuilder.bind(confirmTestQueue).to(confirmTestExchange);}}

消费者

我设置了最多重试三次

有一个小细节

就是他重试的时候是在队头重试的

所以他重试的时候会阻塞一段时间,此时后面的消息是不能消费的

package com.example.rabbitmq.Listener;import com.alibaba.fastjson2.JSON;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;
import com.rabbitmq.tools.json.JSONUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;@Slf4j
@Component
public class ReceiverMessage1 {//用来存放消息唯一标识的map,设置一定的重试次数public static final ConcurrentMap<String, Integer> map = new ConcurrentHashMap<>();@RabbitListener(queues = "confirm_test_queue")public void getMessage3(String msg, Channel channel, Message message) throws IOException {//得到当前信息的唯一标识String messageId=message.getMessageProperties().getMessageId();try {System.out.println("成功接收到消息:" + msg);int i=1/0;channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
log.info("确认成功");} catch (Exception e) {map.put(messageId,map.getOrDefault(messageId, 0)+1);log.error("接收消息失败");log.info("开始重试");log.info(messageId);//重复处理失败if(map.get(messageId)<=3) {log.info("确认失败,重新入队");channel.basicNack(message.getMessageProperties().getDeliveryTag(),false, true);map.put(messageId, map.getOrDefault(messageId, 0)+1);}
else {log.info("重试仍然失败,所以我们决定丢弃这个消息");channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);}}}}

生产者(测试类)

package com.example.rabbitmq;import org.apache.catalina.Executor;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.doThrow;@SpringBootTest
class RabbitmqApplicationTests {@Testvoid contextLoads() {}@Autowiredprivate RabbitTemplate rabbitTemplate;@Testvoid sendMessage() {rabbitTemplate.convertAndSend("confirmTestExchange","confirm_test_queue", "这是一个测试消息",message -> {message.getMessageProperties().setMessageId(UUID.randomUUID().toString());  //把消息的唯一标识设置为UUIDreturn message;});}}

修改了messageId

因为messageId是基于交换机,内容,队列来生成的

相同的消息可能messageId是一样的

所以我发送消息的时候把底层改成了UUID


Return和Confirm机制

Confirm机制是消息发送到交换机成功或失败时的处理机制

Retrun机制是消息发送到队列失败时的处理机制

配置(加了日志输出)

package com.example.rabbitmq.Configuration;import jakarta.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;@Configuration
@Slf4j
public class RabbitConfig implements RabbitTemplate.ReturnsCallback,RabbitTemplate.ConfirmCallback {@Autowiredprivate RabbitTemplate rabbitTemplate;//在Bean注入前就要执行的方法@PostConstructprivate  void initRabbitTemplate(){rabbitTemplate.setReturnsCallback(this);rabbitTemplate.setConfirmCallback(this);}//消息发送到交换机成功或失败时调用这个方法@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {
log.info("confirm()回调函数大打印correlationData:"+correlationData);
log.info("confirm()回调函数大打印ack:"+ack);
log.info("confirm()回调函数大打印cause:"+cause);}//发送到队列失败的时候调用这个方法@Overridepublic void returnedMessage(ReturnedMessage returned) {log.info("消息主体: " + new String(returned.getMessage().getBody()));log.info("应答码: " + returned.getReplyCode());log.info("描述:" + returned.getReplyText());log.info("消息使用的交换器 exchange : " + returned.getExchange());log.info("消息使用的路由键 routing : " + returned.getRoutingKey());}
}

遇到的小问题

它会有个报错

2024-07-28T14:00:10.866+08:00 ERROR 45104 --- [168.88.130:5672] o.s.a.r.c.CachingConnectionFactory : Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 1, class-id=60, method-id=80)

这个是因为我们的多次ack,导致的错误

例如我们可能多次ac或者reject了,就会发生这种错误

这是因为我们的配置默认是自动ack

如果我们不开启手动ack,我们的自己写的手动ack代码就算是再次ack了

所以会出现这个通道错误

这个会让我们的mq通道断连,然后再重连,这样子会导致部分消息丢失,所以记得配置开启手动ack


发送消息回队尾

这个我处理失败了,我也不知道为什么,可能我自己修改了messageId吧

反正这个重新发送消息,我得到的和之前的不同,有部分属性缺失了

所以我就没弄这个

如果想详细了解建议看参考文章


参考文章

Springboot + RabbitMQ 用了消息确认机制,感觉掉坑里了!-腾讯云开发者社区-腾讯云 (tencent.com)

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com