Spring Boot 3 集成 RabbitMQ 实践指南
1. RabbitMQ 核心原理
1.1 什么是RabbitMQ
RabbitMQ是一个开源的消息代理和队列服务器,使用Erlang语言开发,基于AMQP(Advanced Message Queuing Protocol)协议实现。它支持多种消息传递模式,具有高可用性、可扩展性和可靠性等特点。
1.2 核心概念
1.2.1 基础组件
-
Producer(生产者)
- 消息的发送者
- 负责创建消息并发布到RabbitMQ中
-
Consumer(消费者)
- 消息的接收者
- 连接到RabbitMQ服务器并订阅队列
-
Exchange(交换机)
- 接收生产者发送的消息并根据路由规则转发到队列
- 类型:
- Direct Exchange:根据routing key精确匹配
- Topic Exchange:根据routing key模式匹配
- Fanout Exchange:广播到所有绑定队列
- Headers Exchange:根据消息属性匹配
-
Queue(队列)
- 消息存储的地方
- 支持持久化、临时、自动删除等特性
-
Binding(绑定)
- 交换机和队列之间的虚拟连接
- 定义消息路由规则
1.2.2 高级特性
-
消息持久化
- 交换机持久化:创建时设置durable=true
- 队列持久化:创建时设置durable=true
- 消息持久化:设置delivery-mode=2
-
消息确认机制
- 生产者确认:Publisher Confirm和Return机制
- 消费者确认:自动确认、手动确认、批量确认
-
死信队列(DLX)
- 消息被拒绝且不重新入队
- 消息过期(TTL)
- 队列达到最大长度
1.3 应用场景
-
异步处理
- 发送邮件、短信通知
- 日志处理、报表生成
- 文件处理、图片处理
-
应用解耦
- 系统间通信
- 服务解耦
- 流程分离
-
流量控制
- 削峰填谷
- 请求缓冲
- 流量整形
-
定时任务
- 延迟队列
- 定时处理
- 任务调度
2. 环境搭建
2.1 基础环境
- Spring Boot: 3.x
- Java: 17+
- RabbitMQ: 3.12+
- Maven/Gradle
2.2 依赖配置
<dependencies><!-- Spring Boot Starter AMQP --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><!-- Spring Boot Starter Web --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- Lombok --><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><!-- Jackson --><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId></dependency>
</dependencies>
2.3 基础配置
spring:rabbitmq:host: localhostport: 5672username: guestpassword: guestvirtual-host: /# 消息确认配置publisher-confirm-type: correlated # 开启发布确认publisher-returns: true # 开启发布返回template:mandatory: true # 消息路由失败返回# 消费者配置listener:simple:acknowledge-mode: manual # 手动确认prefetch: 1 # 每次获取消息数量retry:enabled: true # 开启重试initial-interval: 1000 # 重试间隔时间max-attempts: 3 # 最大重试次数multiplier: 1.0 # 重试时间乘数# SSL配置(可选)ssl:enabled: falsekey-store: classpath:keystore.p12key-store-password: passwordtrust-store: classpath:truststore.p12trust-store-password: password
3. 核心配置类
3.1 RabbitMQ配置类
@Configuration
@EnableRabbit
public class RabbitMQConfig {// 交换机名称public static final String BUSINESS_EXCHANGE = "business.exchange";public static final String DEAD_LETTER_EXCHANGE = "dead.letter.exchange";// 队列名称public static final String BUSINESS_QUEUE = "business.queue";public static final String DEAD_LETTER_QUEUE = "dead.letter.queue";// 路由键public static final String BUSINESS_KEY = "business.key";public static final String DEAD_LETTER_KEY = "dead.letter.key";// 业务交换机@Beanpublic DirectExchange businessExchange() {return ExchangeBuilder.directExchange(BUSINESS_EXCHANGE).durable(true).build();}// 死信交换机@Beanpublic DirectExchange deadLetterExchange() {return ExchangeBuilder.directExchange(DEAD_LETTER_EXCHANGE).durable(true).build();}// 业务队列@Beanpublic Queue businessQueue() {Map<String, Object> args = new HashMap<>(3);// 消息过期时间args.put("x-message-ttl", 60000);// 队列最大长度args.put("x-max-length", 1000);// 死信交换机args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);args.put("x-dead-letter-routing-key", DEAD_LETTER_KEY);return QueueBuilder.durable(BUSINESS_QUEUE).withArguments(args).build();}// 死信队列@Beanpublic Queue deadLetterQueue() {return QueueBuilder.durable(DEAD_LETTER_QUEUE).build();}// 业务绑定@Beanpublic Binding businessBinding() {return BindingBuilder.bind(businessQueue()).to(businessExchange()).with(BUSINESS_KEY);}// 死信绑定@Beanpublic Binding deadLetterBinding() {return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()).with(DEAD_LETTER_KEY);}// 消息转换器@Beanpublic MessageConverter messageConverter() {return new Jackson2JsonMessageConverter();}// RabbitTemplate配置@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);rabbitTemplate.setMessageConverter(messageConverter());return rabbitTemplate;}
}
3.2 消息确认配置
@Configuration
@Slf4j
public class RabbitConfirmConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {@Autowiredprivate RabbitTemplate rabbitTemplate;@PostConstructpublic void init() {rabbitTemplate.setConfirmCallback(this);rabbitTemplate.setReturnsCallback(this);}@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if (ack) {log.info("消息发送到交换机成功: correlationData={}", correlationData);} else {log.error("消息发送到交换机失败: correlationData={}, cause={}", correlationData, cause);// 处理失败逻辑,如重试、告警等}}@Overridepublic void returnedMessage(ReturnedMessage returned) {log.error("消息路由到队列失败: exchange={}, routingKey={}, replyCode={}, replyText={}, message={}",returned.getExchange(),returned.getRoutingKey(),returned.getReplyCode(),returned.getReplyText(),new String(returned.getMessage().getBody()));// 处理失败逻辑,如重试、告警等}
}
4. 消息生产者
4.1 消息发送服务
@Service
@Slf4j
public class MessageProducer {@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendMessage(Object message, String exchange, String routingKey) {CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());try {rabbitTemplate.convertAndSend(exchange, routingKey, message, correlationData);log.info("消息发送成功: message={}, exchange={}, routingKey={}, correlationData={}",message, exchange, routingKey, correlationData);} catch (Exception e) {log.error("消息发送异常: message={}, exchange={}, routingKey={}, correlationData={}, error={}",message, exchange, routingKey, correlationData, e.getMessage());throw new RuntimeException("消息发送失败", e);}}public void sendDelayMessage(Object message, String exchange, String routingKey, long delayMillis) {CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());MessagePostProcessor messagePostProcessor = msg -> {msg.getMessageProperties().setDelay((int) delayMillis);return msg;};try {rabbitTemplate.convertAndSend(exchange, routingKey, message, messagePostProcessor, correlationData);log.info("延迟消息发送成功: message={}, exchange={}, routingKey={}, delay={}, correlationData={}",message, exchange, routingKey, delayMillis, correlationData);} catch (Exception e) {log.error("延迟消息发送异常: message={}, exchange={}, routingKey={}, delay={}, correlationData={}, error={}",message, exchange, routingKey, delayMillis, correlationData, e.getMessage());throw new RuntimeException("延迟消息发送失败", e);}}
}
5. 消息消费者
5.1 消息处理服务
@Service
@Slf4j
public class MessageConsumer {@RabbitListener(queues = RabbitMQConfig.BUSINESS_QUEUE)public void handleMessage(Message message, Channel channel) throws IOException {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {// 获取消息内容String messageBody = new String(message.getBody());log.info("收到消息: message={}, deliveryTag={}", messageBody, deliveryTag);// 业务处理processMessage(messageBody);// 手动确认消息channel.basicAck(deliveryTag, false);log.info("消息处理成功: deliveryTag={}", deliveryTag);} catch (Exception e) {log.error("消息处理异常: deliveryTag={}, error={}", deliveryTag, e.getMessage());// 判断是否重新投递if (message.getMessageProperties().getRedelivered()) {log.error("消息已重试,拒绝消息: deliveryTag={}", deliveryTag);channel.basicReject(deliveryTag, false);} else {log.info("消息首次处理失败,重新投递: deliveryTag={}", deliveryTag);channel.basicNack(deliveryTag, false, true);}}}private void processMessage(String message) {// 实现具体的业务逻辑log.info("处理消息: {}", message);}
}
5.2 死信消息处理
@Service
@Slf4j
public class DeadLetterConsumer {@RabbitListener(queues = RabbitMQConfig.DEAD_LETTER_QUEUE)public void handleDeadLetter(Message message, Channel channel) throws IOException {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {String messageBody = new String(message.getBody());log.info("收到死信消息: message={}, deliveryTag={}", messageBody, deliveryTag);// 死信消息处理逻辑processDeadLetter(messageBody);channel.basicAck(deliveryTag, false);log.info("死信消息处理成功: deliveryTag={}", deliveryTag);} catch (Exception e) {log.error("死信消息处理异常: deliveryTag={}, error={}", deliveryTag, e.getMessage());channel.basicReject(deliveryTag, false);}}private void processDeadLetter(String message) {// 实现死信消息处理逻辑log.info("处理死信消息: {}", message);}
}
6. 接口控制器
@RestController
@RequestMapping("/api/mq")
@Slf4j
public class MessageController {@Autowiredprivate MessageProducer messageProducer;@PostMapping("/send")public ResponseEntity<String> sendMessage(@RequestBody MessageDTO message) {try {messageProducer.sendMessage(message.getContent(),RabbitMQConfig.BUSINESS_EXCHANGE,RabbitMQConfig.BUSINESS_KEY);return ResponseEntity.ok("消息发送成功");} catch (Exception e) {log.error("消息发送失败: {}", e.getMessage());return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("消息发送失败: " + e.getMessage());}}@PostMapping("/send/delay")public ResponseEntity<String> sendDelayMessage(@RequestBody MessageDTO message,@RequestParam long delayMillis) {try {messageProducer.sendDelayMessage(message.getContent(),RabbitMQConfig.BUSINESS_EXCHANGE,RabbitMQConfig.BUSINESS_KEY,delayMillis);return ResponseEntity.ok("延迟消息发送成功");} catch (Exception e) {log.error("延迟消息发送失败: {}", e.getMessage());return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("延迟消息发送失败: " + e.getMessage());}}
}
7. 监控与运维
7.1 RabbitMQ管理界面
- 访问地址:http://localhost:15672
- 默认账号:guest/guest
- 主要功能:
- 队列监控
- 交换机管理
- 连接状态
- 消息追踪
7.2 Prometheus + Grafana监控
# prometheus.yml
scrape_configs:- job_name: 'rabbitmq'static_configs:- targets: ['localhost:15692']
7.3 日志配置
logging:level:org.springframework.amqp: INFOcom.your.package: DEBUGpattern:console: "%d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger{36} - %msg%n"
7.4 告警配置
@Configuration
public class RabbitMQAlertConfig {@Value("${alert.dingtalk.webhook}")private String webhookUrl;@Beanpublic AlertService alertService() {return new DingTalkAlertService(webhookUrl);}
}
8. 最佳实践
8.1 消息幂等性处理
@Service
public class MessageIdempotentHandler {@Autowiredprivate RedisTemplate<String, String> redisTemplate;public boolean isProcessed(String messageId) {String key = "mq:processed:" + messageId;return Boolean.TRUE.equals(redisTemplate.opsForValue().setIfAbsent(key, "1", 24, TimeUnit.HOURS));}
}
8.2 消息重试策略
@Configuration
public class RetryConfig {@Beanpublic RetryTemplate retryTemplate() {RetryTemplate retryTemplate = new RetryTemplate();FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();backOffPolicy.setBackOffPeriod(1000);retryTemplate.setBackOffPolicy(backOffPolicy);SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();retryPolicy.setMaxAttempts(3);retryTemplate.setRetryPolicy(retryPolicy);return retryTemplate;}
}
8.3 消息序列化
@Configuration
public class MessageConverterConfig {@Beanpublic MessageConverter jsonMessageConverter() {Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();converter.setCreateMessageIds(true);return converter;}
}
8.4 消息追踪
@Aspect
@Component
@Slf4j
public class MessageTraceAspect {@Around("@annotation(org.springframework.amqp.rabbit.annotation.RabbitListener)")public Object traceMessage(ProceedingJoinPoint joinPoint) throws Throwable {String messageId = MDC.get("messageId");log.info("开始处理消息: messageId={}", messageId);try {Object result = joinPoint.proceed();log.info("消息处理完成: messageId={}", messageId);return result;} catch (Exception e) {log.error("消息处理异常: messageId={}, error={}", messageId, e.getMessage());throw e;}}
}
9. 常见问题与解决方案
9.1 消息丢失问题
- 生产者确认机制
- 消息持久化
- 手动确认模式
- 集群高可用
9.2 消息重复消费
- 幂等性处理
- 消息去重
- 业务检查
9.3 消息堆积问题
- 增加消费者数量
- 提高处理效率
- 队列分片
- 死信队列处理
9.4 性能优化
- 合理设置预取数量
- 批量确认消息
- 消息压缩
- 连接池优化
10. 高可用部署
10.1 集群配置
spring:rabbitmq:addresses: rabbit1:5672,rabbit2:5672,rabbit3:5672username: adminpassword: passwordvirtual-host: /
10.2 镜像队列
# 设置镜像策略
rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all"}'
10.3 负载均衡
# nginx.conf
upstream rabbitmq_cluster {server rabbit1:15672;server rabbit2:15672;server rabbit3:15672;
}
11. 参考资源
- Spring AMQP官方文档
- RabbitMQ官方文档
- Spring Boot官方文档