欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 科技 > 名人名企 > 第六章 RabbitMQ之Work模式

第六章 RabbitMQ之Work模式

2024/10/25 21:18:31 来源:https://blog.csdn.net/qushaming/article/details/142815897  浏览:    关键词:第六章 RabbitMQ之Work模式

目录

一、介绍

二、Work模式 

三、案例演示 

3.1. 案例需求

3.2. 案例代码实现

3.2.1. 创建SpringBoot工程

3.2.2. 父工程pom依赖

3.2.3. 生产者pom依赖 

 3.2.3. 生产者配置文件

3.2.4. 生产者核心代码

3.2.5. 消费者RabbitMQConfig

 3.2.6. 消费者pom依赖

3.2.7. 消费者配置文件

3.2.8.  消费者核心代码

3.2.9. 运行效果

3.3. 消费者消息推送机制

3.3.1. 调整代码

 3.3.2. 运行效果

四、总结


一、介绍

在上一章的讲解中,我们通过一个simple模式的简单示例代码给大家做了快速入门演示,本章节我们就来讲讲RabbitMQ的Work模式 。讲解之前,先简单介绍下RabbitMQ支持六种主要的工作模式‌,分别是简单模式、工作队列模式、发布订阅模式、路由模式、通配符模式和远程调用模式(RPC)。其中,远程调用模式不常用,因此主要介绍前五种模式‌。

简单模式

简单模式是最基本的消息传递模式,涉及一个生产者和一个消费者。生产者将消息发送到队列,消费者从队列中接收并处理消息。这种模式适用于简单的点对点通信场景,例如手机短信或邮件单发‌。

工作队列模式

工作队列模式允许多个消费者从同一个队列中接收消息,通过在消费者之间分配任务来提高消息处理的效率。RabbitMQ默认采用轮询的方式分发消息,确保每个消费者接收到的消息数量大致相同‌。

发布订阅模式

发布订阅模式是一种特殊的消息传递模式,生产者发送的消息会被发送到所有订阅了该主题的消费者。这种模式适用于需要广播消息的场景,例如邮件群发或群聊天‌。

路由模式

路由模式使用路由交换机,根据消息属性将消息发送到特定的队列。这种模式适用于需要根据消息内容精确匹配队列的场景,例如短信或聊天工具‌。

通配符模式

通配符模式使用通配符来匹配路由键,允许更灵活的消息路由。这种模式适用于需要根据多级路径匹配队列的场景,例如中国.四川.成都.武侯区‌。

二、Work模式 

​​​​​​​​​​​​​​RabbitMQ的Work模式是一种简单的消息队列模式,也称为“竞争消费者模式”或“任务分发模式”。‌ 在这种模式下,多个消费者同时监听同一个队列,当队列中有消息时,只有一个消费者能够获得这个消息并进行处理,其他消费者需要等待下一个消息的到来。这种模式广泛应用于分布式系统中的任务调度或并行处理场景中‌。

在Work模式下,生产者将消息发送到队列中,多个消费者监听这个队列。当队列中有消息时,RabbitMQ会将消息分发给其中一个消费者。每个消费者独立处理分配到的任务,处理完成后确认消息,然后等待下一个任务。这种模式确保了消息在执行过程中可以分布到多个消费者中,并且每个消费者可以执行自己的任务‌。

Work模式的特点:

  • 多个消费者监听同一个队列‌:当队列中有消息时,只有一个消费者能获得并处理这个消息。
  • 消息按顺序分配‌:消费者按顺序接收消息,处理完一条后再接收下一条。
  • 公平分发‌:可以根据消费者的处理能力进行公平分发,处理快的消费者会处理更多的消息‌。

在实际应用中,Work模式常用于需要并行处理大量任务的场景,例如批量数据处理、后台任务处理等。通过合理配置消费者数量和处理能力,可以有效地利用资源并提高系统整体的处理效率‌。

三、案例演示 

3.1. 案例需求

模拟WorkQueue,实现一个队列绑定多个消费者

基本思路如下:

1. 在RabbitMQ的控制台创建一个队列名为work.queue

2. 在publisher服务中定义测试方法,发送50条消息到work.queue

3. 在consumer服务中定义两个消息监听者,都监听work.queue队列

5. 消费者1每秒处理40条消息,消费者2每秒处理5条消息

3.2. 案例代码实现

3.2.1. 创建SpringBoot工程

完整的工程目录结构及代码文件如下:

3.2.2. 父工程pom依赖

引入核心依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.3.4</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.example</groupId><artifactId>mq-demo</artifactId><version>0.0.1-SNAPSHOT</version><name>mq-demo</name><description>mq-demo</description><url/><licenses><license/></licenses><developers><developer/></developers><scm><connection/><developerConnection/><tag/><url/></scm><properties><java.version>17</java.version></properties><dependencies><!-- spring amqp依赖 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit-test</artifactId><scope>test</scope></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>

3.2.3. 生产者pom依赖 

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.3.4</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.example</groupId><artifactId>publisher</artifactId><version>0.0.1-SNAPSHOT</version><name>publisher</name><description>publisher</description><url/><licenses><license/></licenses><developers><developer/></developers><scm><connection/><developerConnection/><tag/><url/></scm><properties><java.version>17</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>

 3.2.3. 生产者配置文件

spring:rabbitmq:# 主机host: 127.0.0.1# 端口port: 5672# 默认用户密码是guest guest 如果需要自己创建新用户,参看我的第四章节内容username: Wangzhexiaopassword: Wangzhexiao# 默认的虚拟主机是/ 如果需要自己创建虚拟主机,参看我的第四章节内容virtual-host: /hangzhou

3.2.4. 生产者核心代码

SpringAMQP提供了RabbitTemplate工具类,方便我们发送消息。发送消息代码如下:

package com.example.publisher;import jakarta.annotation.Resource;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.test.context.SpringBootTest;@SpringBootTest
class PublisherApplicationTests {@Resourceprivate RabbitTemplate rabbitTemplate;@Testvoid sendMessage() throws InterruptedException {String queueName = "work.queue";String message = "人生苦短,持续不断地努力拼搏,迎难而上!";for (int i = 0; i < 50; i++) {rabbitTemplate.convertAndSend(queueName, message);Thread.sleep(20);}}
}

3.2.5. 消费者RabbitMQConfig

package com.example.consumer;import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig {@BeanQueue myQueue() {// 使用 QueueBuilder 创建一个持久化队列return QueueBuilder.durable("work.queue").build();}
}

 3.2.6. 消费者pom依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.3.4</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.example</groupId><artifactId>consumer</artifactId><version>0.0.1-SNAPSHOT</version><name>consumer</name><description>consumer</description><url/><licenses><license/></licenses><developers><developer/></developers><scm><connection/><developerConnection/><tag/><url/></scm><properties><java.version>17</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.34</version></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>

3.2.7. 消费者配置文件

spring:rabbitmq:# 主机host: 127.0.0.1# 端口port: 5672# 默认用户密码是guest guest 如果需要自己创建新用户,参看我的第四章节内容username: Wangzhexiaopassword: Wangzhexiao# 默认的虚拟主机是/ 如果需要自己创建虚拟主机,参看我的第四章节内容virtual-host: /hangzhou

3.2.8.  消费者核心代码

SpringAMQP提供声明式的消息监听,我们只需要通过注解在方法上声明要监听的队列名称,将来SpringAMQP就会把消息传递给当前方法:

package com.example.consumer;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Slf4j
@Component
public class SimpleListener {@RabbitListener(queues = "work.queue")public void listener1(String message) {System.out.println("消费者1:人生是个不断攀登的过程【" + message + "】");}@RabbitListener(queues = "work.queue")public void listener2(String message) {System.err.println("消费者2:人生是个不断攀登的过程【" + message + "】");}
}

3.2.9. 运行效果

 

我们可以看到,生产者的消息,平均的分配给了两个消费者,类似于轮询机制。 

3.3. 消费者消息推送机制

3.3.1. 调整代码

默认情况下,RabbitMQ的会将消息依次轮询投递给绑定在队列上的每一个消费者。但这并没有考虑到消费者是否已经处理完消息,可能出现消息堆积。 因此我们需要修改application.yml,设置preFetch值为1,确保同一时刻最多投递给消费者1条消息:

spring:rabbitmq:host: 127.0.0.1port: 5672username: Wangzhexiaopassword: Wangzhexiaovirtual-host: /hangzhoulistener:simple:prefetch: 1

我们对消费者的代码也做个调整,通过增加线程等待时间来模拟两个消费者的消费能力:

package com.example.consumer;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Slf4j
@Component
public class SimpleListener {@RabbitListener(queues = "work.queue")public void listener1(String message) throws InterruptedException {System.out.println("消费者1:人生是个不断攀登的过程【" + message + "】");Thread.sleep(20);}@RabbitListener(queues = "work.queue")public void listener2(String message) throws InterruptedException {System.err.println("消费者2:人生是个不断攀登的过程【" + message + "】");Thread.sleep(200);}
}

 3.3.2. 运行效果

我们可以看到,消费者1因为消费速度比较快,所以能者多劳。当我们生产者的消费者特别多的情况下, 我们就需要多个消费者同时监听消费一个队列。实际项目中我们会通过集群模式,多台服务监听同一队列,来达到这种效果,解决消息堆积的问题。

四、总结

Work模型的使用:

多个消费者绑定到一个队列,可以加快消息处理速度

同一条消息只会被一个消费者处理

通过设置prefetch来控制消费者预取的消息数量,处理完一条再处理下一条,实现能者多劳

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com