欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 文旅 > 旅游 > RabbitMQ 学习整理2 - 消峰限流

RabbitMQ 学习整理2 - 消峰限流

2025/3/26 8:41:52 来源:https://blog.csdn.net/cyoubo/article/details/146506782  浏览:    关键词:RabbitMQ 学习整理2 - 消峰限流

一点废话,关于消峰限流网上的各种资料不少,按图索骥后发现质量良莠不齐,时常出现运行异常,消峰失败等问题,反复踩坑。本人结合网上案例,在上文环境基础上,记录一种可以完成消峰限流的方式。

基本描述

  1. 本文所述,消峰限流,用于描述【消费者】在从RabbitMQ中获取消息时的限流操作
  2. 简单来说就是,生产者瞬时推送了M条消息到RabbitMQ,
  3. 消费者只是一次最多只抓取N条(N小于M)进行处理
  4. 重复上述步骤,知道M条消息都被处理完成,从而减少消费者接收到消息的瞬时压力,防止消费者运行崩溃

问题说明

  1. 消峰的核心,就是指定消费者一次最多可抓取的消息条数
  2. RabbmitMQ中通过对消费者指定preFetchCount,修改消费者一次最多可抓取的消息条数,默认情况下,Prefetch_count为250
    在这里插入图片描述

实现方式

消费者修改

YML配置文件

server:port: 8882
spring:application:name: RabbitMQ-Direct-Qos-Clientrabbitmq:addresses: 127.0.0.1port: 5672username: guestpassword: guest
#   以下配置在注入了自定义解析工厂时,都不生效!!!!
#    listener: 
#      type: direct
#      direct:
#        #设置监听为手动答应模式
#        acknowledge-mode: manual
#        prefetch: 5 # 在这里配置预取值
#      simple:
#        acknowledge-mode: manual
#        prefetch: 5

消费工厂

@Configuration
public class RabbitMQCustomerConfiguration {@Beanpublic RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);factory.setMessageConverter(new Jackson2JsonMessageConverter());//开启手工确认模式factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);//配置消峰量factory.setPrefetchCount(10);return factory;}
}

消费逻辑

@Component
@Slf4j
public class DirectReceiver {@RabbitListener(queues = {"direct-qos-queue"})public void process(Message message, Channel channel) throws IOException {log.info("Received message: {}", new String(message.getBody()));try {Thread.sleep(1000);} catch (InterruptedException e) {log.error(e.getMessage());}//注意一定要执行手工确认channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);}
}

生产者实现

@Service
@Slf4j
public class DirectProviderService {@Resourceprivate RabbitTemplate rabbitTemplate;//这里模拟高频发送100个请求public String sendMsgBatch() {String result = "OK";for (int i = 0; i < 100 ; i++) {try {DirectDto directDto = new DirectDto();directDto.setMsg(DateUtil.format(new Date(), "yyyy-MM-dd HH:mm:ss"));directDto.setMsgId(UUID.randomUUID().toString());directDto.setMsgType(""+i);rabbitTemplate.convertAndSend(DirectConfiguration.DIRECT_EXCHANGE, DirectConfiguration.DIRECT_ROUTING_KEY, directDto);} catch (Exception e) {log.error(e.getMessage(), e);result = "FAIL";}}return result;}
}

其他配置与controller接口代码省略

运行结果

未推送请求时管理界面

在这里插入图片描述

执行请求时管理界面

在这里插入图片描述
在这里插入图片描述

  1. 显然可以看到,每次只有10个请求处于Unacked状态(等待确认)
  2. 其他未被处理的消息,仍然属于Ready状态(等待从队列出队,被消费者解析)
  3. 特别注意:此时消费者永远只有一个
  4. 所以,因此可以认为消峰成功

消费者控制台输出

2025-03-25 15:51:47.111  INFO 12460 --- [ntContainer#0-1] com.customer.receiver.DirectReceiver     : Received message: {"msgId":"82d66ee9-9585-4c78-9dea-624688a8a11d","msg":"2025-03-25 15:51:47","msgType":"0"}
2025-03-25 15:51:48.118  INFO 12460 --- [ntContainer#0-1] com.customer.receiver.DirectReceiver     : Received message: {"msgId":"db7f1535-a540-44d6-9ceb-c3845b545682","msg":"2025-03-25 15:51:47","msgType":"1"}
-- ...中间值省略
2025-03-25 15:53:26.283  INFO 12460 --- [ntContainer#0-1] com.customer.receiver.DirectReceiver     : Received message: {"msgId":"c3ac6b0c-2bee-41f3-a4eb-605969fc7655","msg":"2025-03-25 15:51:47","msgType":"98"}
2025-03-25 15:53:27.297  INFO 12460 --- [ntContainer#0-1] com.customer.receiver.DirectReceiver     : Received message: {"msgId":"425f7b7e-5cf7-427c-b147-bf604247d61b","msg":"2025-03-25 15:51:47","msgType":"99"}

存疑示例

  1. 对于配置了自定义解析器的消费者,经测试yml配置不生效,实际使用过程中不建议使用此方法
  2. 网络上还有一种采用channel.basicQos+DefaultConsumer方式实现,但这种方式在与Spring Cloud集成时,感觉有点问题,这里只做一个记录
    1. 存疑方式原文 Springboot与RabbitMQ上手学习之Qos限流(四) - 简书
    2. 非Spring Cloud集成方式原文 RabbitMQ高级应用(三)消费端限流策略(basicQos)-CSDN博客

消费工厂

@Configuration
public class RabbitMQCustomerConfiguration {@Beanpublic RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);factory.setMessageConverter(new Jackson2JsonMessageConverter());//开启手工确认模式factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);//配置消峰量,不配置此项//factory.setPrefetchCount(10);return factory;}
}

消费逻辑

    @RabbitListener(queues = "direct-qos-queue")public void receiveA(Message message, Channel channel) throws IOException {//容量为2channel.basicQos(0,2,false);DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {try {Thread.sleep(1000);} catch (InterruptedException e) {log.error(e.getMessage());}log.info("[x] Received '{}", new String(body));channel.basicAck(envelope.getDeliveryTag(), true);}};//设置 Channel 消费者绑定队列channel.basicConsume("direct-qos-queue",false, consumer);}

运行结果

未推送请求时管理界面

在这里插入图片描述

推送时管理界面

在这里插入图片描述
在这里插入图片描述

  1. 显然可以看到,每次只有100个请求处于Unacked状态(等待确认),而设置的2消费者没有效果
  2. 执行过程中,没有消息处于Ready状态,即是说消息已经都出队了,只是消费者没有确认
  3. 特别注意:此时消费者永远只有一个
  4. 所以,因此可以认为消峰没有成功

消费者控制台输出

  1. 一种情况是正常输出
  2. 有时也会出现消费不响应的问题

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词