@RabbitListener
是 Spring AMQP 提供的核心注解,用于简化 RabbitMQ 消息监听器的创建。以下是对 @RabbitListener(queues = "balloonWords.queue")
的详细解析:
一、基础功能
- 队列监听
通过queues
属性指定监听的队列名称(如"balloonWords.queue"
)。若队列不存在,需配合@Queue
注解声明队列:
@RabbitListener(queuesToDeclare = @Queue("balloonWords.queue"))
- 消息处理入口
标注在方法上时,该方法成为消息处理入口。示例:
@RabbitListener(queues = "balloonWords.queue")
public void handleMessage(String message) {System.out.println("Received: " + message);
}
二、关键特性
1. 灵活的参数绑定
- 消息体类型
支持直接接收反序列化后的对象(如String
、User
等),由MessageConverter
自动转换。例如:
@RabbitHandler
public void process(@Payload User user, @Headers Map<String, Object> headers) { ... }
- 元数据获取
使用@Header
或@Headers
注解获取消息头信息,如路由键、投递标签等。
2. 动态绑定声明
通过 bindings
属性声明 Exchange、Queue 和 RoutingKey 的绑定关系,避免手动配置:
@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "balloonWords.queue", durable = "true"),exchange = @Exchange(value = "balloon.exchange", type = ExchangeTypes.TOPIC),key = "balloon.#"
))
3. 并发控制与确认机制
- 并发配置
通过containerFactory
属性指定自定义的RabbitListenerContainerFactory
,调整消费者并发数:
@Bean
public SimpleRabbitListenerContainerFactory customFactory() {SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setConcurrentConsumers(5); // 并发消费者数量 factory.setMaxConcurrentConsumers(10); return factory;
}
- 消息确认
支持AUTO
(自动确认)、MANUAL
(手动确认)等模式,需配合channel.basicAck()
使用。
三、消息序列化
- 默认序列化
使用SimpleMessageConverter
,将 Java 对象序列化为二进制(性能较低)。 - 推荐方案
使用Jackson2JsonMessageConverter
实现 JSON 序列化:
@Bean
public MessageConverter jsonConverter() {return new Jackson2JsonMessageConverter();
}
四、底层原理
- 注解扫描
Spring 启动时,RabbitListenerAnnotationBeanPostProcessor
扫描所有@RabbitListener
注解,生成消息监听端点(MethodRabbitListenerEndpoint
)。 - 监听容器
每个@RabbitListener
会创建一个MessageListenerContainer
,负责与 RabbitMQ 建立连接、拉取消息并触发方法调用。 - 多方法分发
当注解标注在类上时,@RabbitHandler
根据消息类型选择执行方法(需配合isDefault = true
设置默认方法)。
五、常见问题
- 队列不存在:需确保队列已声明或通过
queuesToDeclare
动态创建。 - 参数类型不匹配:检查
MessageConverter
配置是否与消息的content_type
一致。 - 重复消费:确认
AcknowledgeMode
设置正确,避免消息未确认导致重新投递。
通过合理配置 @RabbitListener
,可高效实现 RabbitMQ 消息监听与处理,同时结合序列化优化和并发控制提升系统性能。
业务实操
我们这边是进行的监听后
填充数据
释放锁
然后 controller 层返回数据
// Rabbit监听器@RabbitListener(queues = "balloonWords.queue")public void listen(String balloonWordsSentenceString) throws JsonProcessingException {ObjectMapper objectMapper = new ObjectMapper();GetRespVO getRespVO = objectMapper.readValue(balloonWordsSentenceString, GetRespVO.class);this.getRespVO = getRespVO;// 释放锁,允许 Controller 层返回数据latch.countDown();}