一点废话,关于消峰限流网上的各种资料不少,按图索骥后发现质量良莠不齐,时常出现运行异常,消峰失败等问题,反复踩坑。本人结合网上案例,在上文环境基础上,记录一种可以完成消峰限流的方式。
基本描述
- 本文所述,消峰限流,用于描述【消费者】在从RabbitMQ中获取消息时的限流操作
- 简单来说就是,生产者瞬时推送了M条消息到RabbitMQ,
- 消费者只是一次最多只抓取N条(N小于M)进行处理
- 重复上述步骤,知道M条消息都被处理完成,从而减少消费者接收到消息的瞬时压力,防止消费者运行崩溃
问题说明
- 消峰的核心,就是指定消费者一次最多可抓取的消息条数
- RabbmitMQ中通过对消费者指定preFetchCount,修改消费者一次最多可抓取的消息条数,默认情况下,Prefetch_count为250
实现方式
消费者修改
YML配置文件
server:port: 8882
spring:application:name: RabbitMQ-Direct-Qos-Clientrabbitmq:addresses: 127.0.0.1port: 5672username: guestpassword: guest
# 以下配置在注入了自定义解析工厂时,都不生效!!!!
# listener:
# type: direct
# direct:
# #设置监听为手动答应模式
# acknowledge-mode: manual
# prefetch: 5 # 在这里配置预取值
# simple:
# acknowledge-mode: manual
# prefetch: 5
消费工厂
@Configuration
public class RabbitMQCustomerConfiguration {@Beanpublic RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);factory.setMessageConverter(new Jackson2JsonMessageConverter());//开启手工确认模式factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);//配置消峰量factory.setPrefetchCount(10);return factory;}
}
消费逻辑
@Component
@Slf4j
public class DirectReceiver {@RabbitListener(queues = {"direct-qos-queue"})public void process(Message message, Channel channel) throws IOException {log.info("Received message: {}", new String(message.getBody()));try {Thread.sleep(1000);} catch (InterruptedException e) {log.error(e.getMessage());}//注意一定要执行手工确认channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);}
}
生产者实现
@Service
@Slf4j
public class DirectProviderService {@Resourceprivate RabbitTemplate rabbitTemplate;//这里模拟高频发送100个请求public String sendMsgBatch() {String result = "OK";for (int i = 0; i < 100 ; i++) {try {DirectDto directDto = new DirectDto();directDto.setMsg(DateUtil.format(new Date(), "yyyy-MM-dd HH:mm:ss"));directDto.setMsgId(UUID.randomUUID().toString());directDto.setMsgType(""+i);rabbitTemplate.convertAndSend(DirectConfiguration.DIRECT_EXCHANGE, DirectConfiguration.DIRECT_ROUTING_KEY, directDto);} catch (Exception e) {log.error(e.getMessage(), e);result = "FAIL";}}return result;}
}
其他配置与controller接口代码省略
运行结果
未推送请求时管理界面
执行请求时管理界面
- 显然可以看到,每次只有10个请求处于Unacked状态(等待确认)
- 其他未被处理的消息,仍然属于Ready状态(等待从队列出队,被消费者解析)
- 特别注意:此时消费者永远只有一个
- 所以,因此可以认为消峰成功
消费者控制台输出
2025-03-25 15:51:47.111 INFO 12460 --- [ntContainer#0-1] com.customer.receiver.DirectReceiver : Received message: {"msgId":"82d66ee9-9585-4c78-9dea-624688a8a11d","msg":"2025-03-25 15:51:47","msgType":"0"}
2025-03-25 15:51:48.118 INFO 12460 --- [ntContainer#0-1] com.customer.receiver.DirectReceiver : Received message: {"msgId":"db7f1535-a540-44d6-9ceb-c3845b545682","msg":"2025-03-25 15:51:47","msgType":"1"}
-- ...中间值省略
2025-03-25 15:53:26.283 INFO 12460 --- [ntContainer#0-1] com.customer.receiver.DirectReceiver : Received message: {"msgId":"c3ac6b0c-2bee-41f3-a4eb-605969fc7655","msg":"2025-03-25 15:51:47","msgType":"98"}
2025-03-25 15:53:27.297 INFO 12460 --- [ntContainer#0-1] com.customer.receiver.DirectReceiver : Received message: {"msgId":"425f7b7e-5cf7-427c-b147-bf604247d61b","msg":"2025-03-25 15:51:47","msgType":"99"}
存疑示例
- 对于配置了自定义解析器的消费者,经测试yml配置不生效,实际使用过程中不建议使用此方法
- 网络上还有一种采用channel.basicQos+DefaultConsumer方式实现,但这种方式在与Spring Cloud集成时,感觉有点问题,这里只做一个记录
- 存疑方式原文 Springboot与RabbitMQ上手学习之Qos限流(四) - 简书
- 非Spring Cloud集成方式原文 RabbitMQ高级应用(三)消费端限流策略(basicQos)-CSDN博客
消费工厂
@Configuration
public class RabbitMQCustomerConfiguration {@Beanpublic RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);factory.setMessageConverter(new Jackson2JsonMessageConverter());//开启手工确认模式factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);//配置消峰量,不配置此项//factory.setPrefetchCount(10);return factory;}
}
消费逻辑
@RabbitListener(queues = "direct-qos-queue")public void receiveA(Message message, Channel channel) throws IOException {//容量为2channel.basicQos(0,2,false);DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {try {Thread.sleep(1000);} catch (InterruptedException e) {log.error(e.getMessage());}log.info("[x] Received '{}", new String(body));channel.basicAck(envelope.getDeliveryTag(), true);}};//设置 Channel 消费者绑定队列channel.basicConsume("direct-qos-queue",false, consumer);}
运行结果
未推送请求时管理界面
推送时管理界面
- 显然可以看到,每次只有100个请求处于Unacked状态(等待确认),而设置的2消费者没有效果
- 执行过程中,没有消息处于Ready状态,即是说消息已经都出队了,只是消费者没有确认
- 特别注意:此时消费者永远只有一个
- 所以,因此可以认为消峰没有成功
消费者控制台输出
- 一种情况是正常输出
- 有时也会出现消费不响应的问题