目录
工作模式
Simple(简单模式)
Work Queue(工作队列)
Publish/Subscribe(发布/订阅)
Exchange(交换机)
Routing(路由模式)
Topics(通配符模式)
RPC(RPC通信)
Publisher Confirms(发布确认)
代码实现
Simple(简单模式)
生产者代码
消费者代码
Work Queues(工作队列)
生产者代码
消费者代码
Publish/Subscribe(发布/订阅)
生产者代码
消费者代码
Routing(路由模式)
生产者代码
消费者代码
Topics(通配符模式)
生产者代码
消费者代码
RPC(RPC通信)
客户端代码
服务端代码
Publisher Confirms(发布确认)
Publishing Messages Individually(单独确认)
Publishing Messages in Batches(批量确认)
Handling Publisher Confirms Asynchronously(异步确认)
RabbitMQ 共提供了 7 种工作模式进行消息传递:
在本篇文章中,我们就来学习 RabbitMQ 的工作模式,我们首先来了解这 7 种工作模式分别是怎样的
工作模式
Simple(简单模式)
P 表示 生产者,是消息的发送方
C 表示 消费者,是消息的接收者
Queue:表示 消息队列,用于缓存消息,生产者生产的消息发送到队列中,消费者从队列中取出消息
简单模式下,只有一个生产者和一个消费者,生产者生产的消息存储到队列中后,都由这个消费者消费
特点:一个生产者 P,一个消费者 C,消息只能被消费一次,也称为 点对点(Point-to-Point)模式
适用场景:消息只能被单个消费者处理
在 RabbitMQ 入门中的入门代码的工作模式就是简单模式
Work Queue(工作队列)
此时有 一个生产者和多个消费者
当生产者向队列中发送多条消息后,Work Queue 会将消息分配给不同的消费者,每个消费者接收到的消息不同,由多个消费者共同消费生产者生产的消息
例如:
由 A (生产者)发送不同消息,消息存储到 RabbitMQ 中,接着,由 B(消费者1) 和 C(消费者2) 共同消息 A 发送的消息,此时,RabbitMQ 选择将第一条消息分配给 B,B 消费第一条消息,RabbitMQ 将第二条消息分配给 C,C 消费第二条消息......
B 和 C 接收到的消息是不同的,这两个消费者共同消费 A 发送的所有消息
特点:消息不会重复,分配给不同的消费者
适用场景:集群环境中实现异步处理
Publish/Subscribe(发布/订阅)
其中,X 表示的是 交换机,在 发布/订阅 模式中,多了 Exchange 角色,因此,我们先来学习交换机相关知识
Exchange(交换机)
Exchange(交换机)的作用是 接收生产者发送的消息,并将消息按照一定的规则路由到一个或多个队列中
生产者的消息都会先发送到交换机,然后再由交换机将消息路由到队列中
在前面 简单模式 和 工作队列模式 下,图中都没有出现交换机,但实际上,生产者生产的消息都是先发送到交换机,然后再路由到队列中的。在前两种模式下,直接使用 RabbitMQ 提供的内置交换机就可以实现,因此,并没有突出交换机的存在,但实际上生产者生产的消息不会直接投递到队列中
在 RabbitMQ 中,交换机有 4 种类型:Fanout、Direct、Topic 和 Headers,不同的类型有着不同的路由策略
而 AMQP 协议中还有两种类型,System 和 自定义,在这里,我们并不重点关注
Fanout:广播,将消息交给所有绑定到交换机的队列(Publish / Subscribe 模式)
Direct:定向,将消息交给符合指定 routing key 的队列(Routing 模式)
Topic:通配符,将消息交给符合 routing patterm(路由模式)的队列(Topics 模式)
Headers:Headers 类型的交换机通过消息头部的属性来路由消息,而不依赖路由键的匹配规则来路由消息。根据发送的消息内容中的 headers 属性进行匹配,headers 类型的交换机性能会较差,因此也不太实用,基本上也不会进行使用
Exchange(交换机)只负责转发消息,并不具备存储消息的能力,因此,若是没有任何队列与 Exchange 绑定,或是没有符合路由规则的队列,消息就会丢失
接下来,我们来看 RoutingKey 和 BindingKey
RoutingKey:路由键,当生产者将消息发送给交换机时,会指定一个字符串,用于告诉交换机如何处理这个消息
BindingKey:绑定,RabbitMQ 中通过 Binding(绑定)将交换机与队列关联起来,在绑定时会指定一个 Binding Key,这样 RabbitMQ 就知道如何正确地将消息路由到队列了
即,绑定时,需要的路由键是 BindingKey;发送消息时,需要的路由键是 RoutingKey
例如:
使用 BindingKey1 将交换机与 队列1 进行绑定,使用 BindingKey2 将交换机与 队列2 进行绑定
若在发送消息时,若设置 Routing Key 设置为 BindingKey1,交换机就会将消息路由到 队列1
即,当消息的 RoutingKey 与队列绑定的 BindingKey 相匹配时,消息才会被路由到这个队列中
其实,BindingKey 也属于路由键的一种,即,在绑定时使用的路由键,有时,也会使用 RoutingKey 表示 BindingKey,即使用 RoutingKey 表示 BindingKey 和 RoutingKey,因此,我们需要根据其使用场景进行区分
在了解了相关概念之后,我们继续看 Publish/ Subscribe 模式
上述有一个生产者 P,多个消费者 C1、C2,X 表示交换机,交换机将消息复制多份,每个消费者接收到相同的消息
也就是说,生产者发送一条消息,经过交换机转发到不同的队列,不同的消费者从不同的队列中取出消息进行消费
特点:不同的消费者接收到的消息是相同的
适用场景:消息需要被多个消费者同时接收,如:实时通信或广播消息
Publish/Subscribe(发布/订阅)模式 与 Work Queue(工作队列)模式 最大的区别就是:发布/订阅 模式下,不同消费者接收到的消息是相同的;而 工作队列 模式下,不同消费者接收到的消息是不同的
Routing(路由模式)
路由模式可以看做是 发布订阅模式 的变种,其在发布订阅模式的基础上,增加了路由 key
发布订阅模式会无条件的将所有消息发送给所有消费者,而路由模式下,交换机会根据 RoutingKey 的规则,将数据筛选后发送给对应的消费者队列
也就是说,只有满足条件的队列才会收到消息
如上图所示,Q1 通过 a 与交换机进行绑定,Q2 通过 a、b 和 c 与交换机进行绑定
当 P (生产者)在发送消息时,若设置 Routing Key 设置为 a,则此时 Q1 和 Q2 的 BindingKey 都与其相匹配,消息就会被路由到 Q1 和 Q2 中
而当 P 发送消息时,设置 Routing Key 设置为 b,此时,只有 Q2 的 BindingKey 与其相匹配,消息也就只会被路由到 Q2 中
适用场景:需要根据特定规则分发消息
Topics(通配符模式)
通配符模式,则是 路由模式 的变种,在 RoutingKey 的基础上,增加了 通配符 的功能,使得匹配更加灵活
Topics 和 Routing 的基本原理相同,即:生产者将消息发送给交换机,交换机根据 RoutingKey 将消息转发给与 RoutingKey 匹配的队列
而不同的是,Routing 模式下,需要 RoutingKey 和 BingingKey 完全匹配;而 Topics 模式下,则是通配符匹配
在 BindingKey 中,存在两种特殊的字符串,用于模糊匹配
* :表示能够匹配任意一个单词
#:表示能够匹配任意多个单词(可以为 0 个)
Q1 通过 *.a.* 与交换机进行绑定,Q2 通过 *.*.b 和 c.# 与交换机进行绑定
当 P (生产者)在发送消息时,若设置 Routing Key 设置为 work.a.b,则此时 Q1 和 Q2 的 BindingKey 都能够与其相匹配,消息就会被路由到 Q1 和 Q2 中
而当 P 发送消息时,设置 Routing Key 设置为 a.a.a,此时,只有 Q1 的 BindingKey 与其相匹配,消息也就只会被路由到 Q1 中
适用场景:需要灵活匹配和过滤消息的场景
RPC(RPC通信)
在 RPC 通信过程中,没有生产者和消费者,而是通过两个队列实现了一个可回调的过程
例如:
客户端(Client)发送请求消息到指定队列(rpc_queue),并在消息属性中设置 reply_to 字段,这个字段指定了一个回调队列(amq.gen-Xa2...),这个回调队列用于接收服务端的响应消息
服务器(Server)从队列 rpc_queue 中取出请求消息,处理请求后,将响应消息发送到 reply_to 指定的回调队列(amq.gen-Xa2...)
客户端(Client)在回调队列上等待响应消息,一旦接收到响应,客户端就会检查消息的 correlation_id 属性,确保其是所期望的响应
简而言之,客户端将请求消息发送到 队列Q1 中,服务器从 Q1 中取出请求消息进行处理,然后将响应消息发送到 队列Q2 中,客户端从 Q2 中读取响应消息
从而实现了 客户端向服务器发送请求,服务器返回对应的响应 的功能
Publisher Confirms(发布确认)
Publisher Confirms 模式是 RabbitMQ 提供的一种确保消息可靠发送到 RabbitMQ 服务器的机制,在这种模式下,生产者可以等待 RabbitMQ 服务器的确认,以确保消息已经被服务器接收并处理
其过程为:
(1)生产者将 Channel 设置为 confirm 模式(通过调用 channel.confirmSelect() 完成),发布的每一条消息都会获得一个唯一的 ID,生产者可以将这些 序列号与消息关联 起来,以便追踪消息的状态
(2)当消息被 RabbitMQ 服务器接收并处理后,服务器会异步地向生产者发送一个确认(ACK),其中包含消息的唯一 ID,表示消息已经送达
通过 Publisher Confirms 模式,生产者可以确保消息被 RabbitMQ 服务器成功接收,从而避免消息丢失
适用场景:对数据安全性要求较高,如金融交易,订单处理等
在基本了解了 RabbitMQ 的 7 种工作模式后,我们就来通过代码简单实现一下这 7 种工作模式
代码实现
Simple(简单模式)
在简单模式下,只有一个生产者和一个消费者,生产者生产的消息存储到队列后,都由这个消费者消费
在 RabbitMQ入门-CSDN博客 中的入门代码的工作模式就是简单模式,因此,在这里就不再进行过多解释了
首先引入依赖:
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client --><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.20.0</version></dependency>
生产者代码
public class Producer {public static void main(String[] args) throws IOException, TimeoutException {// 1. 创建连接工厂ConnectionFactory factory = new ConnectionFactory();// 2. 设置参数factory.setHost("49.232.238.62"); // ip 的默认值为 localhostfactory.setPort(5672); // 默认值为 5672factory.setVirtualHost("test01"); // 虚拟主机,默认值为 /// 账号factory.setUsername("admin"); // 用户名,默认为 guestfactory.setPassword("123456"); // 密码,默认为 guest// 3. 创建连接 ConnectionConnection connection = factory.newConnection(); // 需要处理异常,在此处直接抛出,并不进行处理// 4. 创建 ChannelChannel channel = connection.createChannel();// 5. 声明队列channel.queueDeclare("simple.test", true, false, false, null);// 6. 通过 channel 发送消息到队列中String message = "test...";channel.basicPublish("", "simple.test", null, message.getBytes());System.out.println("消息:" + message + " 发送成功");// 7. 释放资源channel.close();connection.close();}
}
消费者代码
public class Consumer {public static void main(String[] args) throws IOException, TimeoutException {// 1. 创建连接工厂ConnectionFactory factory = new ConnectionFactory();// 2. 设置参数factory.setHost("49.232.238.62"); // ip 的默认值为 localhostfactory.setPort(5672); // 默认值为 5672factory.setVirtualHost("test01"); // 虚拟主机,默认值为 /// 账号factory.setUsername("admin"); // 用户名,默认为 guestfactory.setPassword("123456"); // 密码,默认为 guest// 3. 创建连接 ConnectionConnection connection = factory.newConnection(); // 需要处理异常,在此处直接抛出,并不进行处理// 4. 创建 ChannelChannel channel = connection.createChannel();// 5. 声明队列channel.queueDeclare("simple.test", true, false, false, null);// 6. 消费消息DefaultConsumer consumer = new DefaultConsumer(channel) {// 回调方法,当接收到消息后,自动执行该方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("成功接收到消息: " + new String(body));}};channel.basicConsume("simple.test", true, consumer);// 7. 释放资源channel.close();connection.close();}
}
Work Queues(工作队列)
生产者代码
工作队列模式下,由一个生产者生产消息,多个消费者共同接收消息,消费者之间是竞争关系,每个消息只能被一个消费者接收
由于我们每次连接时都要使用 IP、端口号、虚拟主机名等,因此,我们可以将它们提取出来,放到 Constants 类中:
public class Constants {public static final String HOST = "49.232.238.62";public static final int PORT = 5672;public static final String VIRTUAL_HOST = "test01";public static final String USER_NAME = "admin";public static final String USER_PASSWORD = "123456";
}
声明 工作队列 模式下使用的队列:
// 工作模式public static final String WORK_QUEUE = "work.queue";
接下来,我们就来实现生产者的代码:
工作队列模式与简单模式的区别在于工作模式下有多个消费者,因此生产者的消费代码与简单模式下差别不大,但在发送消息时,我们一次发送 20 条消息:
public class Producer {public static void main(String[] args) throws IOException, TimeoutException {// 1. 创建连接工厂ConnectionFactory factory = new ConnectionFactory();// 2. 设置参数factory.setHost(Constants.HOST); // ip 的默认值为 localhostfactory.setPort(Constants.PORT); // 默认值为 5672factory.setVirtualHost(Constants.VIRTUAL_HOST); // 虚拟主机,默认值为 /// 账号factory.setUsername(Constants.USER_NAME); // 用户名,默认为 guestfactory.setPassword(Constants.USER_PASSWORD); // 密码,默认为 guest// 3. 创建连接 ConnectionConnection connection = factory.newConnection(); // 需要处理异常,在此处直接抛出,并不进行处理// 4. 创建 ChannelChannel channel = connection.createChannel();// 5. 声明队列channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);// 6. 通过 channel 发送消息到队列中for (int i = 0; i < 20; i++) {String message = "work test... " + i;channel.basicPublish("", Constants.WORK_QUEUE, null, message.getBytes());}System.out.println("消息发送成功!");// 7. 释放资源channel.close();connection.close();}
}
运行代码,可以看到 work.queue 队列被创建,且存储了 20 条消息
接下来,我们继续编写消费者代码
消费者代码
消费者的代码也与简单模式下的代码差别不大,但在最后,我们并不进行资源的释放:
Consumer1:
public class Consumer1 {public static void main(String[] args) throws IOException, TimeoutException {// 1. 创建连接工厂ConnectionFactory factory = new ConnectionFactory();// 2. 设置参数factory.setHost(Constants.HOST); // ip 的默认值为 localhostfactory.setPort(Constants.PORT); // 默认值为 5672factory.setVirtualHost(Constants.VIRTUAL_HOST); // 虚拟主机,默认值为 /// 账号factory.setUsername(Constants.USER_NAME); // 用户名,默认为 guestfactory.setPassword(Constants.USER_PASSWORD); // 密码,默认为 guest// 3. 创建连接 ConnectionConnection connection = factory.newConnection(); // 需要处理异常,在此处直接抛出,并不进行处理// 4. 创建 ChannelChannel channel = connection.createChannel();// 5. 声明队列channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);// 6. 消费消息DefaultConsumer consumer = new DefaultConsumer(channel) {// 回调方法,当接收到消息后,自动执行该方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("成功接收到消息: " + new String(body));}};channel.basicConsume(Constants.WORK_QUEUE, true, consumer);}
}
Consumer2:
public class Consumer2 {public static void main(String[] args) throws IOException, TimeoutException {// 1. 创建连接工厂ConnectionFactory factory = new ConnectionFactory();// 2. 设置参数factory.setHost(Constants.HOST); // ip 的默认值为 localhostfactory.setPort(Constants.PORT); // 默认值为 5672factory.setVirtualHost(Constants.VIRTUAL_HOST); // 虚拟主机,默认值为 /// 账号factory.setUsername(Constants.USER_NAME); // 用户名,默认为 guestfactory.setPassword(Constants.USER_PASSWORD); // 密码,默认为 guest// 3. 创建连接 ConnectionConnection connection = factory.newConnection(); // 需要处理异常,在此处直接抛出,并不进行处理// 4. 创建 ChannelChannel channel = connection.createChannel();// 5. 声明队列channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);// 6. 消费消息DefaultConsumer consumer = new DefaultConsumer(channel) {// 回调方法,当接收到消息后,自动执行该方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("成功接收到消息: " + new String(body));}};channel.basicConsume(Constants.WORK_QUEUE, true, consumer);}
}
启动 Consumer1 和 Consumer2:
由于我们之前先启动了生产者,此时再启动消费者,由于消息较少,因此,先启动的 Consumer1 会瞬间将 20 条消息消费掉
因此,再次启动 Producer,观察结果:
可以看到两个消费者分别消费了 10 条消息
Publish/Subscribe(发布/订阅)
生产者代码
在 发布/订阅 模式中,多了 Exchange 角色
Exchange 常见有三种类型,分别代表不同的路由规则:
Fanout:广播,将消息交给所有绑定到交换机的队列(Publish/Subscribe 模式)
Direct:定向,将消息交给符合指定 RoutingKey 的队列(Routing 模式)
Topics:通配符,将消息交给符合 Routing Pattern(路由模式)的队列(Topics 模式)
此时,在 发布/订阅 模式下,我们就需要 声明交换机,并绑定队列和交换机
我们首先来看声明交换机:
使用 channel.exchangeDeclare 方法来创建交换机,我们来看 exchangeDeclare 方法:
Exchange.DeclareOk exchangeDeclare(String exchange,
BuiltinExchangeType type,
boolean durable,
boolean autoDelete,
boolean internal,
Map<String, Object> arguments) throws IOException;
参数:
exchange:交换机名称
type:交换机类型
durable:是否持久化,当为 true 时,会将交换机存盘,在服务器重启时不会丢失相关信息
autoDelete:是否自动删除,自动删除的前提是至少有一个队列或交换机与这个交换机绑定,之后与这个交换机绑定的队列或交换机都会与此解绑
internal:是否为内部使用,若设置为 true,则表示内部使用,客户端无法直接发送消息到这个交换机中,只能通过交换机路由到交换机这种方式
arguments:相关参数
其中,type 表示交换机类型,其类型为 BuiltinExchangeType,也可以为 String:
我们来看 BuiltinExchangeType,它是一个枚举类型:
DIRECT("direct"):定向,直连,将消息交给符合指定 RoutingKey 的队列(Routing 模式)
FANOUT("fanout"):扇形,广播,将消息交给所有绑定到交换机的队列(Publish/Subscribe 模式)
TOPIC("topic"):通配符,将消息交给符合 Routing Pattern(路由模式)的队列(Topics 模式)
HEADERS("headers"):参数模式(较少使用)
返回值:
Exchange.DeclareOk:声明确认方法,用于指示已成功声明交换
在 Constants 类中定义 发布/订阅 模式下使用的交换机和两个队列:
// 广播模式public static final String PUBLISH_CHANGE = "fanout";public static final String PUBLISH_QUEUE_1 = "publish.queue.1";public static final String PUBLISH_QUEUE_2 = "publish.queue.2";
建立连接,并声明交换机和两个队列:
public class Producer {public static void main(String[] args) throws IOException, TimeoutException {// 1. 创建连接工厂ConnectionFactory factory = new ConnectionFactory();// 2. 设置参数factory.setHost(Constants.HOST); // ip 的默认值为 localhostfactory.setPort(Constants.PORT); // 默认值为 5672factory.setVirtualHost(Constants.VIRTUAL_HOST); // 虚拟主机,默认值为 /// 账号factory.setUsername(Constants.USER_NAME); // 用户名,默认为 guestfactory.setPassword(Constants.USER_PASSWORD); // 密码,默认为 guest// 3. 创建连接 ConnectionConnection connection = factory.newConnection(); // 需要处理异常,在此处直接抛出,并不进行处理// 4. 创建 ChannelChannel channel = connection.createChannel();// 5. 声明交换机channel.exchangeDeclare(Constants.PUBLISH_CHANGE, BuiltinExchangeType.FANOUT, true, false, false, null);// 6. 声明队列channel.queueDeclare(Constants.PUBLISH_QUEUE_1, true, false, false, null);channel.queueDeclare(Constants.PUBLISH_QUEUE_2, true, false, false, null);}
}
交换机的类型为 BuiltinExchangeType.FANOUT 广播模式
接着,我们使用 channel.queueBind 方法将队列和交换机进行绑定:
Queue.BindOk queueBind(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException;
queue:要绑定的队列名称
exchange:要绑定的交换机名称
routingKey:路由 key,路由规则
arguments:相关参数
在这里的 routingKey,其实就是 BindingKey,将交换机与队列关联起来,从而让 RabbitMQ 知道如何正确地将消息路由到队列
在发布/订阅模式下,交换机类型为 fanout,routingKey 设置为 "",表示每个消费者都可以收到全部信息
// 7. 绑定交换机和队列channel.queueBind(Constants.PUBLISH_QUEUE_1, Constants.PUBLISH_CHANGE, "", null);channel.queueBind(Constants.PUBLISH_QUEUE_2, Constants.PUBLISH_CHANGE, "", null);
接下来,就可以发送消息了:
// 8. 发送消息for (int i = 0; i < 20; i++) {String message = "work test... " + i;channel.basicPublish(Constants.PUBLISH_CHANGE, "", null, message.getBytes());}System.out.println("消息发送成功!");// 9. 释放资源channel.close();connection.close();
完整代码:
public class Producer {public static void main(String[] args) throws IOException, TimeoutException {// 1. 创建连接工厂ConnectionFactory factory = new ConnectionFactory();// 2. 设置参数factory.setHost(Constants.HOST); // ip 的默认值为 localhostfactory.setPort(Constants.PORT); // 默认值为 5672factory.setVirtualHost(Constants.VIRTUAL_HOST); // 虚拟主机,默认值为 /// 账号factory.setUsername(Constants.USER_NAME); // 用户名,默认为 guestfactory.setPassword(Constants.USER_PASSWORD); // 密码,默认为 guest// 3. 创建连接 ConnectionConnection connection = factory.newConnection(); // 需要处理异常,在此处直接抛出,并不进行处理// 4. 创建 ChannelChannel channel = connection.createChannel();// 5. 声明交换机channel.exchangeDeclare(Constants.PUBLISH_CHANGE, BuiltinExchangeType.FANOUT, true, false, false, null);// 6. 声明队列channel.queueDeclare(Constants.PUBLISH_QUEUE_1, true, false, false, null);channel.queueDeclare(Constants.PUBLISH_QUEUE_2, true, false, false, null);// 7. 绑定交换机和队列channel.queueBind(Constants.PUBLISH_QUEUE_1, Constants.PUBLISH_CHANGE, "", null);channel.queueBind(Constants.PUBLISH_QUEUE_2, Constants.PUBLISH_CHANGE, "", null);// 8. 发送消息for (int i = 0; i < 20; i++) {String message = "work test... " + i;channel.basicPublish(Constants.PUBLISH_CHANGE, "", null, message.getBytes());}System.out.println("消息发送成功!");// 9. 释放资源channel.close();connection.close();}
}
运行代码,并观察结果:
可以看到,publish.queue.1 和 publish.queue.2 中都已经存储了 20 条消息
查看 fanout 的绑定关系:
成功绑定 publish.queue.1 和 publish.queue.2:
接下来,我们继续编写消费者代码
消费者代码
交换机和队列的绑定关系已经在生产者中实现了,因此,消费者代码中可以不必再写
其实现与 工作队列模式 下是基本相同的,只需要修改读取的队列即可
Consumer1:
public class Consumer1 {public static void main(String[] args) throws IOException, TimeoutException {// 1. 创建连接工厂ConnectionFactory factory = new ConnectionFactory();// 2. 设置参数factory.setHost(Constants.HOST); // ip 的默认值为 localhostfactory.setPort(Constants.PORT); // 默认值为 5672factory.setVirtualHost(Constants.VIRTUAL_HOST); // 虚拟主机,默认值为 /// 账号factory.setUsername(Constants.USER_NAME); // 用户名,默认为 guestfactory.setPassword(Constants.USER_PASSWORD); // 密码,默认为 guest// 3. 创建连接 ConnectionConnection connection = factory.newConnection(); // 需要处理异常,在此处直接抛出,并不进行处理// 4. 创建 ChannelChannel channel = connection.createChannel();// 5. 声明队列channel.queueDeclare(Constants.PUBLISH_QUEUE_1, true, false, false, null);// 6. 消费消息DefaultConsumer consumer = new DefaultConsumer(channel) {// 回调方法,当接收到消息后,自动执行该方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("成功接收到消息: " + new String(body));}};channel.basicConsume(Constants.PUBLISH_QUEUE_1, true, consumer);}
}
Consumer2:
public class Consumer2 {public static void main(String[] args) throws IOException, TimeoutException {// 1. 创建连接工厂ConnectionFactory factory = new ConnectionFactory();// 2. 设置参数factory.setHost(Constants.HOST); // ip 的默认值为 localhostfactory.setPort(Constants.PORT); // 默认值为 5672factory.setVirtualHost(Constants.VIRTUAL_HOST); // 虚拟主机,默认值为 /// 账号factory.setUsername(Constants.USER_NAME); // 用户名,默认为 guestfactory.setPassword(Constants.USER_PASSWORD); // 密码,默认为 guest// 3. 创建连接 ConnectionConnection connection = factory.newConnection(); // 需要处理异常,在此处直接抛出,并不进行处理// 4. 创建 ChannelChannel channel = connection.createChannel();// 5. 声明队列channel.queueDeclare(Constants.PUBLISH_QUEUE_2, true, false, false, null);// 6. 消费消息DefaultConsumer consumer = new DefaultConsumer(channel) {// 回调方法,当接收到消息后,自动执行该方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("成功接收到消息: " + new String(body));}};channel.basicConsume(Constants.PUBLISH_QUEUE_2, true, consumer);}
}
运行 Consumer1 和 Consumer2:
Consumer1 和 Consumer2 都接收到了这 20 条消息
Routing(路由模式)
生产者代码
Routing 模式下,队列与交换机之间的绑定,不再是任意的绑定了,而是需要指定一个 BindingKey
生产者在向 交换机 发送消息时,也需要指定消息的 RoutingKey
交换机不会将消息发送给每一个绑定的 key,而是会根据消息的 RoutingKey 进行判断,只有队列绑定时的 BindingKey 和发送消息的 RoutingKey 完全一致时,才会接收消息
先在 Constants 类中定义 路由模式下使用的交换机和队列:
// 路由模式public static final String ROUTING_CHANGE = "routing";public static final String ROUTINT_QUEUE_1 = "routing.queue.1";public static final String ROUTINT_QUEUE_2 = "routing.queue.2";
路由模式下,生产者的代码与 发布/订阅模式 下的区别在于:交换机的类型不同 以及 绑定队列的 BindKey 不同
(1)交换机类型不同
在声明交换机时,交换机的类型为 BuiltinExchangeType.DIRECT
// 5. 声明交换机channel.exchangeDeclare(Constants.ROUTING_CHANGE, BuiltinExchangeType.DIRECT, true, false, false, null);
(2)声明队列
// 6. 声明队列channel.queueDeclare(Constants.ROUTINT_QUEUE_1, true, false, false, null);channel.queueDeclare(Constants.ROUTINT_QUEUE_2, true, false, false, null);
(3)交换机与队列的绑定方式不同
// 7. 绑定交换机和队列channel.queueBind(Constants.ROUTINT_QUEUE_1, Constants.ROUTING_CHANGE, "a", null);channel.queueBind(Constants.ROUTINT_QUEUE_2, Constants.ROUTING_CHANGE, "a", null);channel.queueBind(Constants.ROUTINT_QUEUE_2, Constants.ROUTING_CHANGE, "b", null);channel.queueBind(Constants.ROUTINT_QUEUE_2, Constants.ROUTING_CHANGE, "c", null);
此时,我们就可以发送消息了
在发送消息时,需要指定 RoutingKey:
// 8. 发送消息String messageA = "test a...";channel.basicPublish(Constants.ROUTING_CHANGE, "a", null, messageA.getBytes());String messageB = "test b...";channel.basicPublish(Constants.ROUTING_CHANGE, "b", null, messageB.getBytes());String messageC = "test c... ";channel.basicPublish(Constants.ROUTING_CHANGE, "c", null, messageB.getBytes());System.out.println("消息发送成功!");
完整代码:
public class Producer {public static void main(String[] args) throws IOException, TimeoutException {// 1. 创建连接工厂ConnectionFactory factory = new ConnectionFactory();// 2. 设置参数factory.setHost(Constants.HOST); // ip 的默认值为 localhostfactory.setPort(Constants.PORT); // 默认值为 5672factory.setVirtualHost(Constants.VIRTUAL_HOST); // 虚拟主机,默认值为 /// 账号factory.setUsername(Constants.USER_NAME); // 用户名,默认为 guestfactory.setPassword(Constants.USER_PASSWORD); // 密码,默认为 guest// 3. 创建连接 ConnectionConnection connection = factory.newConnection(); // 需要处理异常,在此处直接抛出,并不进行处理// 4. 创建 ChannelChannel channel = connection.createChannel();// 5. 声明交换机channel.exchangeDeclare(Constants.ROUTING_CHANGE, BuiltinExchangeType.DIRECT, true, false, false, null);// 6. 声明队列channel.queueDeclare(Constants.ROUTINT_QUEUE_1, true, false, false, null);channel.queueDeclare(Constants.ROUTINT_QUEUE_2, true, false, false, null);// 7. 绑定交换机和队列channel.queueBind(Constants.ROUTINT_QUEUE_1, Constants.ROUTING_CHANGE, "a", null);channel.queueBind(Constants.ROUTINT_QUEUE_2, Constants.ROUTING_CHANGE, "a", null);channel.queueBind(Constants.ROUTINT_QUEUE_2, Constants.ROUTING_CHANGE, "b", null);channel.queueBind(Constants.ROUTINT_QUEUE_2, Constants.ROUTING_CHANGE, "c", null);// 8. 发送消息String messageA = "test a...";channel.basicPublish(Constants.ROUTING_CHANGE, "a", null, messageA.getBytes());String messageB = "test b...";channel.basicPublish(Constants.ROUTING_CHANGE, "b", null, messageB.getBytes());String messageC = "test c... ";channel.basicPublish(Constants.ROUTING_CHANGE, "c", null, messageB.getBytes());System.out.println("消息发送成功!");// 9. 释放资源channel.close();connection.close();}
}
运行代码,并观察结果:
routing.queue.1 中有 1 条消息,routing.queue.2 中有两条消息
查看 routing 交换机与 队列的绑定关系:
接下来,我们继续编写消费者的代码
消费者代码
消费者代码与 发布/订阅 模式下基本相同,只需要修改队列名称即可:
Consumer1:
public class Consumer1 {public static void main(String[] args) throws IOException, TimeoutException {// 1. 创建连接工厂ConnectionFactory factory = new ConnectionFactory();// 2. 设置参数factory.setHost(Constants.HOST); // ip 的默认值为 localhostfactory.setPort(Constants.PORT); // 默认值为 5672factory.setVirtualHost(Constants.VIRTUAL_HOST); // 虚拟主机,默认值为 /// 账号factory.setUsername(Constants.USER_NAME); // 用户名,默认为 guestfactory.setPassword(Constants.USER_PASSWORD); // 密码,默认为 guest// 3. 创建连接 ConnectionConnection connection = factory.newConnection(); // 需要处理异常,在此处直接抛出,并不进行处理// 4. 创建 ChannelChannel channel = connection.createChannel();// 5. 声明队列channel.queueDeclare(Constants.ROUTINT_QUEUE_1, true, false, false, null);// 6. 消费消息DefaultConsumer consumer = new DefaultConsumer(channel) {// 回调方法,当接收到消息后,自动执行该方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("Consumer1 成功接收到消息: " + new String(body));}};channel.basicConsume(Constants.ROUTINT_QUEUE_1, true, consumer);}
}
Consumer2:
public class Consumer2 {public static void main(String[] args) throws IOException, TimeoutException {// 1. 创建连接工厂ConnectionFactory factory = new ConnectionFactory();// 2. 设置参数factory.setHost(Constants.HOST); // ip 的默认值为 localhostfactory.setPort(Constants.PORT); // 默认值为 5672factory.setVirtualHost(Constants.VIRTUAL_HOST); // 虚拟主机,默认值为 /// 账号factory.setUsername(Constants.USER_NAME); // 用户名,默认为 guestfactory.setPassword(Constants.USER_PASSWORD); // 密码,默认为 guest// 3. 创建连接 ConnectionConnection connection = factory.newConnection(); // 需要处理异常,在此处直接抛出,并不进行处理// 4. 创建 ChannelChannel channel = connection.createChannel();// 5. 声明队列channel.queueDeclare(Constants.ROUTINT_QUEUE_2, true, false, false, null);// 6. 消费消息DefaultConsumer consumer = new DefaultConsumer(channel) {// 回调方法,当接收到消息后,自动执行该方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("Consumer2 成功接收到消息: " + new String(body));}};channel.basicConsume(Constants.ROUTINT_QUEUE_2, true, consumer);}
}
运行结果:
Topics(通配符模式)
生产者代码
相比于 routing 模式,topics 类型的交换机在匹配规则上进行了扩展,BindingKey 支持通配符匹配
其中,RoutingKey 是一系列由 . 分割的单词,如 user.name、work.abc等
BindingKey 也和 RoutingKey 一样,由 . 分割的字符串
在 BindingKey 中可以存在两种特殊的字符串,用于模糊匹配:
* :表示能够匹配任意一个单词
#:表示能够匹配任意多个单词(可以为 0 个)
例如:
交换机 与 队列1(Q1)的 BindingKey 为 *.a.*
交换机 与 队列2(Q2)的 BindingKey 为 *.*.b
交换机 与 队列2(Q2)的 BindingKey 为 c.#
则:
若生产者的 RoutingKey 为 work.a.b,则消息会被路由到 Q1 和 Q2
若生产者的 RoutingKey 为 a.a.a,则消息会被路由到 Q1
若生产者的 RoutingKey 为 c.work.a,则消息会被路由到 Q2
若生产者的 RoutingKey 为 b.c.g,则消息会被丢弃,或是返回给生产者(需要设置 mandatory 参数)
接下来,我们就来实现 通配符模式下 的生产者:
先在 Constants 类中定义通配符模式下使用的交换机和队列:
// 通配符模式public static final String TOPICS_CHANGE = "topics";public static final String TOPICS_QUEUE_1 = "topics.queue.1";public static final String TOPICS_QUEUE_2 = "topics.queue.2";
与 路由模式相比,发布订阅模式与其区别为:交换机类型不同 以及 绑定队列的 RoutingKey 不同
(1)交换机类型不同
交换机的类型为 BuiltinExchangeType.TOPIC
// 5. 声明交换机channel.exchangeDeclare(Constants.TOPICS_CHANGE, BuiltinExchangeType.TOPIC, true, false, false, null);
(2)声明队列
// 6. 声明队列channel.queueDeclare(Constants.TOPICS_QUEUE_1, true, false, false, null);channel.queueDeclare(Constants.TOPICS_QUEUE_2, true, false, false, null);
(3)交换机与队列的绑定方式不同
// 7. 绑定交换机和队列channel.queueBind(Constants.TOPICS_QUEUE_1, Constants.TOPICS_CHANGE, "*.a.*", null);channel.queueBind(Constants.TOPICS_QUEUE_2, Constants.TOPICS_CHANGE, "*.*.b", null);channel.queueBind(Constants.TOPICS_QUEUE_2, Constants.TOPICS_CHANGE, "c.#", null);
此时,我们就可以发送消息了
在发送消息时,需要指定 RoutingKey:
// 8. 发送消息String message1 = "test work.a.b";channel.basicPublish(Constants.TOPICS_CHANGE, "work.a.b", null, message1.getBytes());String message2 = "test a.a.a";channel.basicPublish(Constants.TOPICS_CHANGE, "a.a.a", null, message2.getBytes());String message3 = "test c.work.a";channel.basicPublish(Constants.TOPICS_CHANGE, "c.work.a", null, message3.getBytes());System.out.println("消息发送成功!");
完整代码:
public class Producer {public static void main(String[] args) throws IOException, TimeoutException {// 1. 创建连接工厂ConnectionFactory factory = new ConnectionFactory();// 2. 设置参数factory.setHost(Constants.HOST); // ip 的默认值为 localhostfactory.setPort(Constants.PORT); // 默认值为 5672factory.setVirtualHost(Constants.VIRTUAL_HOST); // 虚拟主机,默认值为 /// 账号factory.setUsername(Constants.USER_NAME); // 用户名,默认为 guestfactory.setPassword(Constants.USER_PASSWORD); // 密码,默认为 guest// 3. 创建连接 ConnectionConnection connection = factory.newConnection(); // 需要处理异常,在此处直接抛出,并不进行处理// 4. 创建 ChannelChannel channel = connection.createChannel();// 5. 声明交换机channel.exchangeDeclare(Constants.TOPICS_CHANGE, BuiltinExchangeType.TOPIC, true, false, false, null);// 6. 声明队列channel.queueDeclare(Constants.TOPICS_QUEUE_1, true, false, false, null);channel.queueDeclare(Constants.TOPICS_QUEUE_2, true, false, false, null);// 7. 绑定交换机和队列channel.queueBind(Constants.TOPICS_QUEUE_1, Constants.TOPICS_CHANGE, "*.a.*", null);channel.queueBind(Constants.TOPICS_QUEUE_2, Constants.TOPICS_CHANGE, "*.*.b", null);channel.queueBind(Constants.TOPICS_QUEUE_2, Constants.TOPICS_CHANGE, "c.#", null);// 8. 发送消息String message1 = "test work.a.b";channel.basicPublish(Constants.TOPICS_CHANGE, "work.a.b", null, message1.getBytes());String message2 = "test a.a.a";channel.basicPublish(Constants.TOPICS_CHANGE, "a.a.a", null, message2.getBytes());String message3 = "test c.work.a";channel.basicPublish(Constants.TOPICS_CHANGE, "c.work.a", null, message3.getBytes());System.out.println("消息发送成功!");// 9. 释放资源channel.close();connection.close();}
}
运行并观察结果:
topics.queue.1 和 topics.queue.2 中都已经存储了两条消息
我们来看 topics.queue.1 中的消息:
我们继续实现消费者代码
消费者代码
Topics 模式的消费者代码与 Routing 模式下相同,只需要修改消费的队列名称即可:
Consumer1:
public class Consumer1 {public static void main(String[] args) throws IOException, TimeoutException {// 1. 创建连接工厂ConnectionFactory factory = new ConnectionFactory();// 2. 设置参数factory.setHost(Constants.HOST); // ip 的默认值为 localhostfactory.setPort(Constants.PORT); // 默认值为 5672factory.setVirtualHost(Constants.VIRTUAL_HOST); // 虚拟主机,默认值为 /// 账号factory.setUsername(Constants.USER_NAME); // 用户名,默认为 guestfactory.setPassword(Constants.USER_PASSWORD); // 密码,默认为 guest// 3. 创建连接 ConnectionConnection connection = factory.newConnection(); // 需要处理异常,在此处直接抛出,并不进行处理// 4. 创建 ChannelChannel channel = connection.createChannel();// 5. 声明队列channel.queueDeclare(Constants.TOPICS_QUEUE_1, true, false, false, null);// 6. 消费消息DefaultConsumer consumer = new DefaultConsumer(channel) {// 回调方法,当接收到消息后,自动执行该方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("Consumer1 成功接收到消息: " + new String(body));}};channel.basicConsume(Constants.TOPICS_QUEUE_1, true, consumer);}
}
Consumer2:
public class Consumer2 {public static void main(String[] args) throws IOException, TimeoutException {// 1. 创建连接工厂ConnectionFactory factory = new ConnectionFactory();// 2. 设置参数factory.setHost(Constants.HOST); // ip 的默认值为 localhostfactory.setPort(Constants.PORT); // 默认值为 5672factory.setVirtualHost(Constants.VIRTUAL_HOST); // 虚拟主机,默认值为 /// 账号factory.setUsername(Constants.USER_NAME); // 用户名,默认为 guestfactory.setPassword(Constants.USER_PASSWORD); // 密码,默认为 guest// 3. 创建连接 ConnectionConnection connection = factory.newConnection(); // 需要处理异常,在此处直接抛出,并不进行处理// 4. 创建 ChannelChannel channel = connection.createChannel();// 5. 声明队列channel.queueDeclare(Constants.TOPICS_QUEUE_2, true, false, false, null);// 6. 消费消息DefaultConsumer consumer = new DefaultConsumer(channel) {// 回调方法,当接收到消息后,自动执行该方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("Consumer1 成功接收到消息: " + new String(body));}};channel.basicConsume(Constants.TOPICS_QUEUE_2, true, consumer);}
}
运行 Consumer1 和 Consumer2,并观察结果:
RPC(RPC通信)
RPC(Remote Procedure Call),远程过程调用,是一种通过网络从远程计算机上请求服务,不需要了解底层网络的技术,类似于 Http 远程调用
RabbitMQ 实现 RPC 通信,是通过两个队列实现一个可回调的过程:
其过程为:
客户端(Client)发送请求消息到指定队列(rpc_queue),并在消息属性中设置 reply_to 字段,这个字段指定了一个回调队列(amq.gen-Xa2...),这个回调队列用于接收服务端的响应消息
服务器(Server)从队列 rpc_queue 中取出请求消息,处理请求后,将响应消息发送到 reply_to 指定的回调队列(amq.gen-Xa2...)
客户端(Client)在回调队列上等待响应消息,一旦接收到响应,客户端就会检查消息的 correlation_id 属性,确保其是否是所期望的响应
接下来,我们就来实现 RPC 的客户端:
客户端代码
客户端主要实现的功能有:
1. 发送请求消息到队列中
2. 从回调队列中读取响应消息
我们先来看发送请求消息到队列的过程:
(1)声明两个队列:消息发送到的队列(Queue) 和 回调队列(replayQueue),并声明本次请求的唯一标志 corrId
(2)将 replayQueue 和 corrId 配置到 Queue 中
接下来,需要从回调队列中读取响应消息,若我们直接从回调队列中读取响应消息,此时,可能服务端还没有处理完请求,也就未将响应消息发送到回调队列中,就读取不到响应
因此,我们可以使用阻塞队列来监听回调队列中的消息
1. 使用阻塞队列阻塞当前进程,监听回调队列中的消息,回调队列中有消息时,将响应消息放到阻塞队列中
2. 阻塞队列中有消息后,主线程被唤醒,处理返回内容
先在 Constants 类中声明 RPC 模式下使用的两个队列:
// RPC 模式public static final String RPC_QUEUE_1 = "rpc.queue1";public static final String RPC_QUEUE_2 = "rpc.queue2";
在这里,我们就不再声明交换机了,直接使用默认的交换机
声明 消息发送的队列 和 回调队列:
// 声明队列channel.queueDeclare(Constants.RPC_QUEUE_1, true, false, false, null);channel.queueDeclare(Constants.RPC_QUEUE_2, true, false, false, null);
使用 UUID 生成本次请求的唯一标志,并配置消息属性:
// 本次请求的唯一标识String corrId = UUID.randomUUID().toString();
消息相关配置的类型为 BasicProperties,位于 com.rabbitmq.client.AMQP 下:
AMQP.BasicProperties 提供了一个构造器,可以通过 builder() 来设置一些属性:
使用 correlationId 方法设置唯一标识,replyTo 方法设置回调队列:
// 本次请求的唯一标识String corrId = UUID.randomUUID().toString();// 消息属性AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder().correlationId(corrId) // 唯一ID.replyTo(Constants.RPC_QUEUE_2) // 回调队列.build();
最后调用 build 方法创建实例
使用内置交换机发送消息:
// 7. 发送消息String message = "test rpc...";channel.basicPublish("", Constants.RPC_QUEUE_1, basicProperties, message.getBytes());
接着,使用阻塞队列存储回调结果:
// 阻塞队列,存放回调结果,一次获取一条消息BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
从回调队列中接收响应消息:
// 8. 接收服务器的响应DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到回调消息: " + new String(body));// 判断标识是否正确if(corrId.equals(properties.getCorrelationId())) {queue.offer(new String(body, "UTF-8"));}}};channel.basicConsume(Constants.RPC_QUEUE_2, true, consumer);
最后,从阻塞队列中获取响应消息:
// 9. 获取响应消息String result = queue.take();System.out.println("result: " + result);
完整代码:
public class Client {public static void main(String[] args) throws IOException, TimeoutException, InterruptedException, IOException, TimeoutException {// 1. 创建连接工厂ConnectionFactory factory = new ConnectionFactory();// 2. 设置参数factory.setHost(Constants.HOST); // ip 的默认值为 localhostfactory.setPort(Constants.PORT); // 默认值为 5672factory.setVirtualHost(Constants.VIRTUAL_HOST); // 虚拟主机,默认值为 /// 账号factory.setUsername(Constants.USER_NAME); // 用户名,默认为 guestfactory.setPassword(Constants.USER_PASSWORD); // 密码,默认为 guest// 3. 创建连接 ConnectionConnection connection = factory.newConnection(); // 需要处理异常,在此处直接抛出,并不进行处理// 4. 创建 ChannelChannel channel = connection.createChannel();// 使用默认的交换机// 5. 声明队列channel.queueDeclare(Constants.RPC_QUEUE_1, true, false, false, null);channel.queueDeclare(Constants.RPC_QUEUE_2, true, false, false, null);// 6. 设置消息属性// 本次请求的唯一标识String corrId = UUID.randomUUID().toString();// 消息属性AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder().correlationId(corrId) // 唯一ID.replyTo(Constants.RPC_QUEUE_2) // 回调队列.build();// 7. 发送消息String message = "test rpc...";channel.basicPublish("", Constants.RPC_QUEUE_1, basicProperties, message.getBytes());// 阻塞队列,存放回调结果,一次获取一条消息BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);// 8. 接收服务器的响应DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到回调消息: " + new String(body));// 判断标识是否正确if(corrId.equals(properties.getCorrelationId())) {queue.offer(new String(body, "UTF-8"));}}};channel.basicConsume(Constants.RPC_QUEUE_2, true, consumer);// 9. 获取响应消息String result = queue.take();System.out.println("result: " + result);}
}
我们继续编写服务端代码
服务端代码
服务端要实现的功能为:
1. 从队列中接收请求消息
2. 根据消息内容处理请求消息,并将响应消息返回到回调队列中
我们先来实现接收消息:
建立连接、声明队列等过程都与 客户端代码相同
但需要注意的是,我们需要设置服务端同时最多只能获取一条消息:
// 6. 设置同时最多只能获取一条消息channel.basicQos(1);
若不设置 basicQos,RabbitMQ 会使用默认的 Qos 设置,其 prefetchCount 默认值为 0,当 prefetchCount 为 0 时,RabbitMQ 会根据内部实现和当前网络状况等因素,可能同时发送多条消息给消费者。这也就意味着,在默认情况下,消费者可能会同时接收到多条消息,但具体数量不是严格保证的,可能会有波动
而在 RPC 模式下,通常希望是一对一的消息处理,即,一个请求对应一个响应。服务端在处理完一个消息并确认后,才会接收到下一条消息
接收消息后,就可以对请求消息进行处理并返回响应结果了:
// 7. 接收消息DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {// 将消息发到队列2中AMQP.BasicProperties replayProperties = new AMQP.BasicProperties().builder().correlationId(properties.getCorrelationId()).build();// 返回String message = new String(body);System.out.println("接收到消息: " + message);// 响应消息String response = "request: " + message + " 接收成功";channel.basicPublish("", properties.getReplyTo(), replayProperties, response.getBytes());// 对消息进行应答channel.basicAck(envelope.getDeliveryTag(), false);}};channel.basicConsume(Constants.RPC_QUEUE_1, false, consumer);
需要注意的是,在这里我们需要手动对消息进行应答,而不是自动确认:
在 RabbitMQ 中,basicConsume 方法的 autoAck 参数用于指定消费者是否会自动向消息队列确认消息
当设置为 true 时,消息队列会在将消息发送给消费者后,认为消息已经被成功消费,立即删除该条消息,这也就意味着,若消费者处理消息失败,消息就会丢失
当设置为 false 时,消息队列在将消息发送给消费者后,需要消费者显示地调用 basicAck 方式来确认消息,手动确认提供了更高的可靠性,保证消息不会被意外丢失,适用于消息处理重要且需要确保每个消息被正确处理的场景
完整代码:
public class Service {public static void main(String[] args) throws IOException, TimeoutException {// 1. 创建连接工厂ConnectionFactory factory = new ConnectionFactory();// 2. 设置参数factory.setHost(Constants.HOST); // ip 的默认值为 localhostfactory.setPort(Constants.PORT); // 默认值为 5672factory.setVirtualHost(Constants.VIRTUAL_HOST); // 虚拟主机,默认值为 /// 账号factory.setUsername(Constants.USER_NAME); // 用户名,默认为 guestfactory.setPassword(Constants.USER_PASSWORD); // 密码,默认为 guest// 3. 创建连接 ConnectionConnection connection = factory.newConnection(); // 需要处理异常,在此处直接抛出,并不进行处理// 4. 创建 ChannelChannel channel = connection.createChannel();// 使用默认的交换机// 5. 声明队列channel.queueDeclare(Constants.RPC_QUEUE_1, true, false, false, null);channel.queueDeclare(Constants.RPC_QUEUE_2, true, false, false, null);// 6. 设置同时最多只能获取一条消息channel.basicQos(1);// 7. 接收消息DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {// 将消息发到队列2中AMQP.BasicProperties replayProperties = new AMQP.BasicProperties().builder().correlationId(properties.getCorrelationId()).build();// 返回String message = new String(body);System.out.println("接收到消息: " + message);// 响应消息String response = "request: " + message + " 接收成功";channel.basicPublish("", properties.getReplyTo(), replayProperties, response.getBytes());// 对消息进行应答channel.basicAck(envelope.getDeliveryTag(), false);}};channel.basicConsume(Constants.RPC_QUEUE_1, false, consumer);}
}
运行代码,观察结果:
Publisher Confirms(发布确认)
消息中间件,都会面临消息丢失的问题
消息丢失大概分为三种情况:
1. 生产者的问题:由于应用程序故障、网络抖动等各种原因,生产者没有成功向 broker 发送消息
2. 消息中间件的问题:生产者成功将消息发送给了 broker,但 broker 未能将消息保存好,导致消息丢失
3. 消费者的问题:broker 将消息发送给了消费者,消费者在消费消息时,未处理好,导致 broker 将消费失败的消息从队列中删除了
Rabbit 针对上述问题给出了相应的解决方案:
针对问题1,可以采用发布确认(Publisher Confirms)机制实现
针对问题2,可以通过持久化机制
针对问题3,可以采用消息应答机制
接下来,我们就来进一步学习 发布确认机制
发布确认的过程:
(1)生产者将 Channel 设置为 confirm 模式(通过调用 channel.confirmSelect() 完成),发布的每一条消息都会获得一个唯一的 ID,生产者可以将这些序列号与消息关联起来,以便追踪消息的状态
(2)当消息被 RabbitMQ 服务器接收并处理后,服务器会向生产者发送一个确认(ACK),其中包含消息的唯一 ID,表示消息已经送达
其中,deliveryTag 包含了确认消息的序号,此外,broker 也可以设置 channel.basicAck 方法中的 multiple 参数,表示这个序号之前的所有消息都已经被处理
发送确认机制最大的好处在于它是 异步 的,生产者可以同时发布消息和等待信道返回确认消息
当消息最终得到确认之后,生产者可以通过 回调方法 来处理该确认消息
若 RabbitMQ 因为自身内部错误导致消息丢失,就会发送一条 nack(Basic.Nack)命令,生产者同样可以在回调方法中处理该 nack 命令
使用发布确认机制,需要将信道设置为 confirm(确认)模式:
// 开启信道Channel channel = connection.createChannel();// 开启信道确认模式channel.confirmSelect();
发布模式有 3 种确认策略,我们分别来进行学习
由于使用每种策略时都需要建立连接,因此,我们将建立连接抽取出来:
public static Connection createConnection() throws IOException, TimeoutException {// 1. 创建连接工厂ConnectionFactory factory = new ConnectionFactory();// 2. 设置参数factory.setHost(Constants.HOST); // ip 的默认值为 localhostfactory.setPort(Constants.PORT); // 默认值为 5672factory.setVirtualHost(Constants.VIRTUAL_HOST); // 虚拟主机,默认值为 /// 账号factory.setUsername(Constants.USER_NAME); // 用户名,默认为 guestfactory.setPassword(Constants.USER_PASSWORD); // 密码,默认为 guest// 3. 创建连接 ConnectionConnection connection = factory.newConnection(); // 需要处理异常,在此处直接抛出,并不进行处理return connection;}
Publishing Messages Individually(单独确认)
单独确认模式下,每发送一条消息,RabbitMQ 会在消息被成功持久化到队列或者被消费者成功接收后,发回一个确认(acknowledgment)。生产者可以收到关于每条消息的确认信息
也就是说,生产者发送消息后会等待每条消息的确认信号。如果消息发送成功,RabbitMQ 会返回一个确认信号;如果消息失败,RabbitMQ 会返回一个负确认信号(nack)
我们先在 Constans 类中声明会使用的队列:
// 发布确认模式public static final String PUBLISH_CONFIRMS_QUEUE_1 = "publish.confims.queue1";public static final String PUBLISH_CONFIRMS_QUEUE_2 = "publish.confims.queue2";public static final String PUBLISH_CONFIRMS_QUEUE_3 = "publish.confims.queue3";
我们仍使用默认的交换机进行路由
每次都发送 200 条消息:
public class Producer {public static int MESSAGE_COUNT = 200;
}
每当发送一条消息,就使用 channel.waitForConfirms() 方法等待确认消息
void waitForConfirmsOrDie(long timeout) throws IOException, InterruptedException, TimeoutException;
等待确认消息,当消息被确认,方法就会返回,若消息超时,就会抛出 TimeoutException 异常,若消息丢失,就会抛出 IOException
此外,我们记录 单独确认策略 发送消息的耗时:
public class Producer {public static int MESSAGE_COUNT = 200;public static int WAIT_TIME = 5000;public static void publishMessageIndividually() {try (Connection connection = createConnection()){// 开启信道Channel channel = connection.createChannel();// 开启信道确认模式channel.confirmSelect();// 声明队列channel.queueDeclare(Constants.PUBLISH_CONFIRMS_QUEUE_1, true, false, false, null);// 记录开始时间long startTime = System.currentTimeMillis();// 发送消息for (int i = 0; i < MESSAGE_COUNT; i++) {String message = "message " + i;channel.basicPublish("", Constants.PUBLISH_CONFIRMS_QUEUE_1, null, message.getBytes());// 等待确认channel.waitForConfirms(WAIT_TIME);}// 记录结束时间long endTime = System.currentTimeMillis();System.out.printf("publish %d messages individually in %d ms\n", MESSAGE_COUNT, endTime - startTime);} catch (IOException e) {throw new RuntimeException(e);} catch (InterruptedException e) {throw new RuntimeException(e);} catch (TimeoutException e) {throw new RuntimeException(e);}}
}
完整代码:
public class Producer {public static int MESSAGE_COUNT = 200;public static int WAIT_TIME = 5000;// 建立连接public static Connection createConnection() throws IOException, TimeoutException {// 1. 创建连接工厂ConnectionFactory factory = new ConnectionFactory();// 2. 设置参数factory.setHost(Constants.HOST); // ip 的默认值为 localhostfactory.setPort(Constants.PORT); // 默认值为 5672factory.setVirtualHost(Constants.VIRTUAL_HOST); // 虚拟主机,默认值为 /// 账号factory.setUsername(Constants.USER_NAME); // 用户名,默认为 guestfactory.setPassword(Constants.USER_PASSWORD); // 密码,默认为 guest// 3. 创建连接 ConnectionConnection connection = factory.newConnection(); // 需要处理异常,在此处直接抛出,并不进行处理return connection;}// 单独确认模式public static void publishMessageIndividually() {try (Connection connection = createConnection()){// 开启信道Channel channel = connection.createChannel();// 开启信道确认模式channel.confirmSelect();// 声明队列channel.queueDeclare(Constants.PUBLISH_CONFIRMS_QUEUE_1, true, false, false, null);// 记录开始时间long startTime = System.currentTimeMillis();// 发送消息for (int i = 0; i < MESSAGE_COUNT; i++) {String message = "message " + i;channel.basicPublish("", Constants.PUBLISH_CONFIRMS_QUEUE_1, null, message.getBytes());// 等待确认channel.waitForConfirmsOrDie(WAIT_TIME);}// 记录结束时间long endTime = System.currentTimeMillis();System.out.printf("publish %d messages individually in %d ms\n", MESSAGE_COUNT, endTime - startTime);} catch (IOException e) {throw new RuntimeException(e);} catch (InterruptedException e) {throw new RuntimeException(e);} catch (TimeoutException e) {throw new RuntimeException(e);}}public static void main(String[] args) {// 单独确认模式publishMessageIndividually();}
}
运行结果:
可以看到,发送 200 条消息的耗时较长
且,单独确认策略是每发送一条消息后,就调用 channel.waitForConfirmsOrDie 方法,等待服务端的确认,也就是一种串行同步等待的方式,尤其是对于持久化的消息而言,需要等待消息确认存储在磁盘之后才会返回
但发布确认机制支持异步确认,即,可以一边发送消息,一边等待消息确认
我们接着看另外两种策略
Publishing Messages in Batches(批量确认)
批量确认会在每发送一批消息后,调用 waitForConfirms 方法,等待服务器的确认返回
我们每发送 50 条消息,就调用 waitForConfirms 方法进行确认:
public static int BATCH_SIZE = 50;// 批量确认模式public static void publishMessageInBatches() {try (Connection connection = createConnection()){// 开启信道Channel channel = connection.createChannel();// 设置为 confirm 模式channel.confirmSelect();// 声明队列channel.queueDeclare(Constants.PUBLISH_CONFIRMS_QUEUE_2, true, false, false, null);// 发送消息int messageCount = 0;long startTime = System.currentTimeMillis();for (int i = 0; i < MESSAGE_COUNT; i++) {String message = "message " + i;channel.basicPublish("", Constants.PUBLISH_CONFIRMS_QUEUE_2, null, message.getBytes());messageCount++;// 批量确认if(messageCount == BATCH_SIZE) {channel.waitForConfirms(WAIT_TIME);messageCount = 0;}}// 消息发送完,若还有未确认消息,则进行最后的确认if (messageCount > 0) {channel.waitForConfirms(WAIT_TIME);}long endTime = System.currentTimeMillis();System.out.printf("publish %d messages in batch in %d ms\n", MESSAGE_COUNT, endTime - startTime);} catch (IOException e) {throw new RuntimeException(e);} catch (InterruptedException e) {throw new RuntimeException(e);} catch (TimeoutException e) {throw new RuntimeException(e);}}
需要注意的是,若我们发送的消息为 210 条,此时最后的十条消息未被确认,因此,我们在消息发送完成后,进行最后的确认
调用 publishMessageInBatches 方法,并观察结果:
我们可以看到,相比于单独确认策略,批量确认极大的提高了 confirm 的效率,但当出现了 Basic.Nack 或超时时,我们无法确定是哪一条消息出现了问题,客户端需要将这一批消息都进行重发,这也就重复发送了很多消息,当消息经常丢失时,批量确认的性能会不升反降
最后,我们来看 异步确认
Handling Publisher Confirms Asynchronously(异步确认)
异步确认提供了一个回调方法,服务端确认了一条或多条消息后,客户端会调用这个方法进行处理
Channel 接口提供了 addConfirmListener 方法,可以添加 ConfirmListener 回调接口
在 ConfirmListener 接口中包含两个重要方法:
handleAck 和 handleNack,分别对应处理 RabbitMQ 发送给生产者的 ack 和 nack
deliveryTag 表示发送消息的序号
multiple 表示是否批量确认,开启批量确认后,若 RabbitMQ 返回的消息序号为 20,则表明前 20 条消息都已经接收成功;当不开启批量确认时,若 RabbitMQ 返回的消息序号为 20 ,则表明第 20 条消息被成功接收
在使用异步确认策略时,我们需要为每个 Channel 维护一个已发送消息的序号集合,当收到 RabbitMQ 的 confirm 回调时,从集合中删除掉对应消息
当 Channel 开启 confirm 模式后,channel 上发送消息都会附带一个从 1 开始递增的 deliveryTag 序号。我们可以使用 SortedSet 的有序性来维护这个已发送消息的集合
实现步骤:
1. 使用有序集合存储未确认的消息序号
// 使用有序集合来存储未确认的消息SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<>());
2. 当收到 ack 时,从集合中删除消息序号,若为批量确认,则删除小于等于当前消息序号的所有序号
// 进行确认channel.addConfirmListener(new ConfirmListener() {@Overridepublic void handleAck(long deliveryTag, boolean multiple) throws IOException {// 若为批量确认,则删除确认序号前所有元素if (multiple) {confirmSet.headSet(deliveryTag + 1).clear();} else {confirmSet.remove(deliveryTag);}}});
3. 当接收到 nack 时,需要根据具体情况进行消息重发等操作
在这里,我们就不对其进行处理了,直接将消息清除:
@Overridepublic void handleNack(long deliveryTag, boolean multiple) throws IOException {// 处理失败,消息重发...// 若为批量确认,则删除确认序号前所有元素if (multiple) {confirmSet.headSet(deliveryTag + 1).clear();} else {confirmSet.remove(deliveryTag);}}
接着,我们发送消息,每当发送一条消息,就将其序号存储到有序集合中:
// 发送消息long startTime = System.currentTimeMillis();for (int i = 0; i < MESSAGE_COUNT; i++) {String message = "message " + i;// 获取序号long nextPublishSeqNo = channel.getNextPublishSeqNo();channel.basicPublish("", Constants.PUBLISH_CONFIRMS_QUEUE_3, null, message.getBytes());// 存储序号confirmSet.add(nextPublishSeqNo);}
当有序集合为空时,消息确认完,因此,我们使用 while 循环等待消息确认完毕:
// 消息确认完毕while (!confirmSet.isEmpty()) {Thread.sleep(10);}
若循环体中什么也不写,while 循环执行的速度会非常快,因此,每当判断一次,我们让其等待 10 ms
完整代码:
// 异步确认模式public static void handlePublishConfirmAsynchronously() {try (Connection connection = createConnection()){// 开启信道Channel channel = connection.createChannel();// 设置 confirm 模式channel.confirmSelect();// 声明队列channel.queueDeclare(Constants.PUBLISH_CONFIRMS_QUEUE_3, false, false, true, null);// 使用有序集合来存储未确认的消息SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<>());// 进行确认channel.addConfirmListener(new ConfirmListener() {@Overridepublic void handleAck(long deliveryTag, boolean multiple) throws IOException {// 若为批量确认,则删除确认序号前所有元素if (multiple) {confirmSet.headSet(deliveryTag + 1).clear();} else {confirmSet.remove(deliveryTag);}}@Overridepublic void handleNack(long deliveryTag, boolean multiple) throws IOException {// 处理失败,消息重发...// 若为批量确认,则删除确认序号前所有元素if (multiple) {confirmSet.headSet(deliveryTag + 1).clear();} else {confirmSet.remove(deliveryTag);}}});// 记录开始时间long startTime = System.currentTimeMillis();// 发送消息for (int i = 0; i < MESSAGE_COUNT; i++) {String message = "message " + i;// 获取序号long nextPublishSeqNo = channel.getNextPublishSeqNo();channel.basicPublish("", Constants.PUBLISH_CONFIRMS_QUEUE_3, null, message.getBytes());// 存储序号confirmSet.add(nextPublishSeqNo);}// 消息确认完毕while (!confirmSet.isEmpty()) {Thread.sleep(10);}// 记录结束时间long endTime = System.currentTimeMillis();System.out.printf("publish %d messages and handled confirms asynchronously in %d ms\n", MESSAGE_COUNT, endTime - startTime);} catch (IOException e) {throw new RuntimeException(e);} catch (InterruptedException e) {throw new RuntimeException(e);} catch (TimeoutException e) {throw new RuntimeException(e);}}
运行结果:
可以看到,三种策略中,异步确认的表现更好
完整代码:
public class Producer {public static int MESSAGE_COUNT = 200;public static int WAIT_TIME = 5000;public static int BATCH_SIZE = 50;// 建立连接public static Connection createConnection() throws IOException, TimeoutException {// 1. 创建连接工厂ConnectionFactory factory = new ConnectionFactory();// 2. 设置参数factory.setHost(Constants.HOST); // ip 的默认值为 localhostfactory.setPort(Constants.PORT); // 默认值为 5672factory.setVirtualHost(Constants.VIRTUAL_HOST); // 虚拟主机,默认值为 /// 账号factory.setUsername(Constants.USER_NAME); // 用户名,默认为 guestfactory.setPassword(Constants.USER_PASSWORD); // 密码,默认为 guest// 3. 创建连接 ConnectionConnection connection = factory.newConnection(); // 需要处理异常,在此处直接抛出,并不进行处理return connection;}// 单独确认模式public static void publishMessageIndividually() {try (Connection connection = createConnection()){// 开启信道Channel channel = connection.createChannel();// 开启信道确认模式channel.confirmSelect();// 声明队列channel.queueDeclare(Constants.PUBLISH_CONFIRMS_QUEUE_1, true, false, false, null);// 记录开始时间long startTime = System.currentTimeMillis();// 发送消息for (int i = 0; i < MESSAGE_COUNT; i++) {String message = "message " + i;channel.basicPublish("", Constants.PUBLISH_CONFIRMS_QUEUE_1, null, message.getBytes());// 等待确认channel.waitForConfirmsOrDie(WAIT_TIME);}// 记录结束时间long endTime = System.currentTimeMillis();System.out.printf("publish %d messages individually in %d ms\n", MESSAGE_COUNT, endTime - startTime);} catch (IOException e) {throw new RuntimeException(e);} catch (InterruptedException e) {throw new RuntimeException(e);} catch (TimeoutException e) {throw new RuntimeException(e);}}// 批量确认模式public static void publishMessageInBatches() {try (Connection connection = createConnection()){// 开启信道Channel channel = connection.createChannel();// 设置为 confirm 模式channel.confirmSelect();// 声明队列channel.queueDeclare(Constants.PUBLISH_CONFIRMS_QUEUE_2, true, false, false, null);// 发送消息int messageCount = 0;long startTime = System.currentTimeMillis();for (int i = 0; i < MESSAGE_COUNT; i++) {String message = "message " + i;channel.basicPublish("", Constants.PUBLISH_CONFIRMS_QUEUE_2, null, message.getBytes());messageCount++;// 批量确认if(messageCount == BATCH_SIZE) {channel.waitForConfirms(WAIT_TIME);messageCount = 0;}}// 消息发送完,若还有未确认消息,则进行最后的确认if (messageCount > 0) {channel.waitForConfirms(WAIT_TIME);}long endTime = System.currentTimeMillis();System.out.printf("publish %d messages in batch in %d ms\n", MESSAGE_COUNT, endTime - startTime);} catch (IOException e) {throw new RuntimeException(e);} catch (InterruptedException e) {throw new RuntimeException(e);} catch (TimeoutException e) {throw new RuntimeException(e);}}// 异步确认模式public static void handlePublishConfirmAsynchronously() {try (Connection connection = createConnection()){// 开启信道Channel channel = connection.createChannel();// 设置 confirm 模式channel.confirmSelect();// 声明队列channel.queueDeclare(Constants.PUBLISH_CONFIRMS_QUEUE_3, false, false, true, null);// 使用有序集合来存储未确认的消息SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<>());// 进行确认channel.addConfirmListener(new ConfirmListener() {@Overridepublic void handleAck(long deliveryTag, boolean multiple) throws IOException {// 若为批量确认,则删除确认序号前所有元素if (multiple) {confirmSet.headSet(deliveryTag + 1).clear();} else {confirmSet.remove(deliveryTag);}}@Overridepublic void handleNack(long deliveryTag, boolean multiple) throws IOException {// 处理失败,消息重发...// 若为批量确认,则删除确认序号前所有元素if (multiple) {confirmSet.headSet(deliveryTag + 1).clear();} else {confirmSet.remove(deliveryTag);}}});// 记录开始时间long startTime = System.currentTimeMillis();// 发送消息for (int i = 0; i < MESSAGE_COUNT; i++) {String message = "message " + i;// 获取序号long nextPublishSeqNo = channel.getNextPublishSeqNo();channel.basicPublish("", Constants.PUBLISH_CONFIRMS_QUEUE_3, null, message.getBytes());// 存储序号confirmSet.add(nextPublishSeqNo);}// 消息确认完毕while (!confirmSet.isEmpty()) {Thread.sleep(10);}// 记录结束时间long endTime = System.currentTimeMillis();System.out.printf("publish %d messages and handled confirms asynchronously in %d ms\n", MESSAGE_COUNT, endTime - startTime);} catch (IOException e) {throw new RuntimeException(e);} catch (InterruptedException e) {throw new RuntimeException(e);} catch (TimeoutException e) {throw new RuntimeException(e);}}public static void main(String[] args) {// 单独确认模式publishMessageIndividually();// 批量确认模式publishMessageInBatches();// 异步确认handlePublishConfirmAsynchronously();}
}