主题模式
Topic 类型与 Direct 相比,都是可以根据 RoutingKey 把消息路由到不同的队列。只不过 Topic 类型 Exchange 可以让队列在绑定 Routing key 的时候使用通配符!
Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert
通配符规则:# 匹配零个或多个词,* 匹配不多不少恰好1个词,例如:item.# 能够匹配 item.insert.abc 或者 item.insert,item.* 只能匹配 item.insert
生产者代码
public class TopicProducer {public static void main(String[] args) throws Exception {//1.创建连接ConnectionFactory cf = new ConnectionFactory();cf.setHost("192.168.64.140");cf.setUsername("guest");cf.setPassword("guest");Connection nc = cf.newConnection();Channel cc = nc.createChannel();//2.定义交换机cc.exchangeDeclare("topic_logs", BuiltinExchangeType.TOPIC);//3.发送数据while(true) {Scanner scanner = new Scanner(System.in);System.out.println("消息:");String s = scanner.nextLine();System.out.println("路由键:");String key=scanner.nextLine();cc.basicPublish("topic_logs", key, null, s.getBytes());System.out.println("=======================================");}}
}
消费者代码
public class TopicConsumer {public static void main(String[] args) throws Exception {//1.创建连接ConnectionFactory cf = new ConnectionFactory();cf.setHost("192.168.64.140");cf.setUsername("guest");cf.setPassword("guest");Connection nc = cf.newConnection();Channel cc = nc.createChannel();//2.定义交换机、队列、绑定cc.exchangeDeclare("topic_logs", BuiltinExchangeType.TOPIC);String queue = cc.queueDeclare().getQueue();System.out.print("输入绑定键:");String s = new Scanner(System.in).nextLine();String[] a = s.split("\\s+");for (String key : a) {cc.queueBind(queue, "topic_logs", key);}//3.处理消息DeliverCallback deliverCallback = new DeliverCallback() {@Overridepublic void handle(String consumerTag, Delivery message) throws IOException {byte[] a = message.getBody();String s = new String(a);String key = message.getEnvelope().getRoutingKey();System.out.println(s+"--"+key);System.out.println("========================================");}};CancelCallback cancelCallback = new CancelCallback() {@Overridepublic void handle(String consumerTag) throws IOException {}};cc.basicConsume(queue, true,deliverCallback,cancelCallback);}
}
RPC模式
如果我们需要在远程电脑上运行一个方法,并且还要等待一个返回结果该怎么办?这和前面的例子不太一样, 这种模式我们通常称为远程过程调用,即RPC。
在本节中,我们将会学习使用RabbitMQ去搭建一个RPC系统:一个客户端和一个可以升级(扩展)的RPC服务器。为了模拟一个耗时任务,我们将创建一个返回斐波那契数列的虚拟的RPC服务。
客户端
在客户端定义一个RPCClient类,并定义一个call()方法,这个方法发送一个RPC请求,并等待接收响应结果
RPCClient client = new RPCClient();
String result = client.call("4");
System.out.println( "第四个斐波那契数是: " + result);
回调队列
使用RabbitMQ去实现RPC很容易。一个客户端发送请求信息,并得到一个服务器端回复的响应信息。为了得到响应信息,我们需要在请求的时候发送一个“回调”队列地址。我们可以使用默认队列。下面是示例代码:
//定义回调队列,
//自动生成对列名,非持久,独占,自动删除
callbackQueueName = ch.queueDeclare().getQueue();//用来设置回调队列的参数对象
BasicProperties props = new BasicProperties.Builder().replyTo(callbackQueueName).build();
//发送调用消息
ch.basicPublish("", "rpc_queue", props, message.getBytes());
消息属性 Message Properties
AMQP 0-9-1协议定义了消息的14个属性。大部分属性很少使用,下面是比较常用的4个:
deliveryMode:将消息标记为持久化(值为2)或非持久化(任何其他值)。
contentType:用于描述mime类型。例如,对于经常使用的JSON格式,将此属性设置为:application/json。
replyTo:通常用于指定回调队列。
correlationId:将RPC响应与请求关联起来非常有用。
关联id(correlationId)
在上面的代码中,我们会为每个RPC请求创建一个回调队列。 这是非常低效的,这里还有一个更好的方法:让我们为每个客户端创建一个回调队列。
这就提出了一个新的问题,在队列中得到一个响应时,我们不清楚这个响应所对应的是哪一条请求。这时候就需要使用关联id(correlationId)。我们将为每一条请求设置唯一的的id值。稍后,当我们在回调队列里收到一条消息的时候,我们将查看它的id属性,这样我们就可以匹配对应的请求和响应。如果我们发现了一个未知的id值,我们可以安全的丢弃这条消息,因为它不属于我们的请求。
服务端代码
public class RPCServer {public static void main(String[] args) throws Exception {//1.接受客户端发送的调用信息(正整数n)//2.执行算法求第n个斐波那契数的结果//3.向客户端发送计算结果ConnectionFactory cf = new ConnectionFactory();cf.setHost("192.168.64.140");cf.setPort(5672);cf.setUsername("guest");cf.setPassword("guest");Connection nc = cf.newConnection();Channel cc = nc.createChannel();cc.queueDeclare("rpc_queue", false, false, false, null);DeliverCallback deliverCallback = new DeliverCallback() {@Overridepublic void handle(String consumerTag, Delivery message) throws IOException {//从message中取出消息:n,返回队列的队列名,关联idString s = new String(message.getBody());String replyTo = message.getProperties().getReplyTo();//返回队列的队列名String cid = message.getProperties().getCorrelationId();long fbnq = fbnq(Integer.parseInt(s));BasicProperties props = new BasicProperties.Builder().correlationId(cid).build();cc.basicPublish("", replyTo, props, (""+fbnq).getBytes());}};CancelCallback cancelCallback = new CancelCallback() {@Overridepublic void handle(String consumerTag) throws IOException {}};cc.basicConsume("rpc_queue", true,deliverCallback,cancelCallback);}//服务:接受一个整数值n,求第n个斐波那契数//1 1 2 3 5 8 13 21 34 55 89 144 ......//递归求斐波那契数,递归的效率是非常低的//递归效率低,可以用来模拟服务器端的耗时运算static long fbnq(int n) {if(n==1 || n==2) {return 1;}return fbnq(n-1)+fbnq(n-2);}
}
客户端代码
public class RPCClient {static BlockingQueue<Long> q=new ArrayBlockingQueue<Long>(10);public static void main(String[] args) throws Exception {System.out.print("输入求第几个斐波那契数:");int n = new Scanner(System.in).nextInt();long fbnq=fbnq(n);System.out.println(fbnq);}//异步调用服务器,从服务器获取结果private static long fbnq(int n) throws Exception {ConnectionFactory cf = new ConnectionFactory();cf.setHost("192.168.64.140");cf.setPort(5672);cf.setUsername("admin");cf.setPassword("admin");Connection nc = cf.newConnection();Channel cc = nc.createChannel();//定义发送调用消息的队列cc.queueDeclare("rpc_queue", false, false, false, null);cc.queuePurge("rpc_queue");//返回队列String replyTo = cc.queueDeclare().getQueue();//关联idString cid = UUID.randomUUID().toString();BasicProperties props = new BasicProperties.Builder().replyTo(replyTo).correlationId(cid).build();cc.basicPublish("", "rpc_queue", props, (""+n).getBytes());//模拟执行其他运算,不等待计算结果System.out.println("调用消息已发送");System.out.println("模拟执行其他运算,不立即等待计算结果");//获取计算结果DeliverCallback deliverCallback = new DeliverCallback() {@Overridepublic void handle(String consumerTag, Delivery message) throws IOException {//处理数据之前,先比对关联idif(cid.equals(message.getProperties().getCorrelationId())) {String s=new String(message.getBody());long fbnq = Integer.parseInt(s);q.offer(fbnq);ch.getConnection().close();}}};CancelCallback cancelCallback = new CancelCallback() {@Overridepublic void handle(String consumerTag) throws IOException {}};cc.basicConsume(replyTo, true,deliverCallback,cancelCallback);return q.take();}
}
上一篇文章:https://blog.csdn.net/Z0412_J0103/article/details/143355002https://blog.csdn.net/Z0412_J0103/article/details/143355002下一篇文章: