欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 文旅 > 艺术 > Java操作RabbitMQ

Java操作RabbitMQ

2025/3/17 9:47:44 来源:https://blog.csdn.net/weixin_53946852/article/details/146301197  浏览:    关键词:Java操作RabbitMQ

文章目录

  • Spring集成RabbitMQ
    • 1. AMQP&SpringAMQP
    • 2. SpringBoot集成RabbitMQ
    • 3. 模型
      • work模型
    • 4.交换机
      • Fanout交换机
      • Direct交换机
      • Topic交换机
    • 5.声明式队列和交换机
      • 基于API声明
      • 基于注解声明
    • 6.消息转换器


Spring集成RabbitMQ

1. AMQP&SpringAMQP

  • AMQP(高级消息队列协议):Advanced Message Queuing Protocol,是用于在应用程序之间传递业务消息的开放标准。该协议与语言和平台无关,更符合微服务中独立性的要求。是一种面向消息通信的协议,就像HTTP协议是一种浏览器向服务器发消息的协议。
  • SpringAMQP:Spring AMOP是基于AMQP协议定义的一套API规范,提供了模板来发送和接收消息。包含两部分,其中spring-amqp是基础抽象spring-rabbit是底层的默认实现。也就是说SpringAMQP只是一种思想,而spring-rabbit是其具体实现

2. SpringBoot集成RabbitMQ

在Maven依赖中引入amqp的起步依赖即可

<!--AMQP依赖,包含RabbitMQ-->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

在Spring配置文件中配置

spring:rabbitmq:host: 127.0.0.1port: 5672# 虚拟主机virtual-host: /hhyusername: hhypassword: hhy

RabbitTemplate是Spring封装好的操作RabbitMQ的工具类

生产者

@Autowiredprivate RabbitTemplate rabbitTemplate;@Testvoid testSendMessage2Queue() {String queueName = "hhy.q1";String msg = "hello, mq!666";rabbitTemplate.convertAndSend(queueName, msg);}

消费者

@Component
public class MqListener {@RabbitListener(queues = "hhy.q1")public void listenSimpleQueue(String msg){System.out.println("hhy.q1的消息:【" + msg +"】");}}

3. 模型

work模型

假设消息生产者生产消息的速度非常的快,消息消费者消费消息的速度赶不上生产的速度,就会导致MQ队列中的消息越来越多,从而导致消息堆积问题,如何处理消息堆积问题?

  1. 让多个消费者绑定一个队列,加快消息处理速度
  2. 还可以在代码层面使用异步操作,比说线程池

绑定多个消费者,每个消费者的处理能力也可能不一致,而Spring默认将消息以轮询的方式发送给多个消费者,处理能力慢的消费者还是会影响处理速度,此时就可以通过添加配置prefetch让消费者只获取一条消息处理完成后再获取,进一步避免消息堆积问题

spring:rabbitmq:listener:simple:prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息

在这里插入图片描述

work模型就是多个消费者绑定一个队列

@Component
public class MqListener {@RabbitListener(queues = "work.q")public void workListen1(String msg){System.out.println("消费者1:work.q的消息:【" + msg +"】");}@RabbitListener(queues = "work.q")public void workListen2(String msg){System.err.println("消费者2:work.q的消息:【" + msg +"】");}
}

4.交换机

上诉实例代码中并没有使用交换机,生产者是直接将消息发送到队列中,实际这种方式是不合理的,假设多个服务都需要订阅同一条消息这种方式就无法满足需求了,那么就要引入交换机。

Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!

交换机的类型有四种:

  • Fanout:广播,将消息交给所有绑定到交换机的队列。我们最早在控制台使用的正是Fanout交换机
  • Direct:订阅,基于RoutingKey(路由key)发送给订阅了消息的队列
  • Topic:通配符订阅,与Direct类似,只不过RoutingKey可以使用通配符
  • Headers:头匹配,基于MQ的消息头匹配,用的较少。

Fanout交换机

Fanout交换机其实就是广播,将生产者发布的消息广播给绑定的自身的所有消息队列。发送消息流程:

  • 可以有多个队列
  • 每个队列都要绑定到Exchange(交换机)
  • 生产者发送的消息,只能发送到交换机
  • 交换机把消息发送给绑定过的所有队列
  • 订阅队列的消费者都能拿到消息

在这里插入图片描述

根据上诉图编写代码

// 消费者1消费队列1
@RabbitListener(queues = "fanout.q1")
public void fanoutListen1(String msg){System.out.println("消费者1:fanout.q1的消息:【" + msg +"】");
}
// 消费者2消费队列2
@RabbitListener(queues = "fanout.q2")
public void fanoutListen2(String msg){System.out.println("消费者1:fanout.q2的消息:【" + msg +"】");
}

生产者向Fanout类型交换机发送消息,前提需要创建Fanout类型的交换机

@Test
void testSendFanout() {// 交换机名称String exchangeName = "amq.fanout";String msg = "hello, fanout!";rabbitTemplate.convertAndSend(exchangeName, null, msg);
}

Direct交换机

在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。

在这里插入图片描述

在Direct模型下:

  • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
  • 消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey
  • Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息

在这里插入图片描述

通过key进行绑定,如下图也就是说生产者发送消息时指定key为test两个消费者内的队列都能收到,key为java时只有dirct.q1队列能收到,key为cpp时只有dirct.q2队列能收到

在这里插入图片描述

消费者代码

@RabbitListener(queues = "direct.q1")
public void fanoutDirect1(String msg){System.out.println("消费者1:direct.q1的消息:【" + msg +"】");
}
@RabbitListener(queues = "direct.q2")
public void fanoutDirect2(String msg){System.out.println("消费者2:direct.q2的消息:【" + msg +"】");
}

生产者代码

生产者在指定消息时指定不同的key来发送消息

@Test
void testSendDirect() {String exchangeName = "hhy.direct";String msg = "所有队列都能收到该消息";rabbitTemplate.convertAndSend(exchangeName, "test", msg);
}
@Test
void testSendDirect() {String exchangeName = "hhy.direct";String msg = "只有队列direct.q1能收到消息";rabbitTemplate.convertAndSend(exchangeName, "java", msg);
}
@Test
void testSendDirect() {String exchangeName = "hhy.direct";String msg = "只有队列direct.q2能收到消息";rabbitTemplate.convertAndSend(exchangeName, "cpp", msg);
}

Topic交换机

Topic类型的ExchangeDirect相比,都是可以根据RoutingKey把消息路由到不同的队列。
只不过Topic类型Exchange可以让队列在绑定BindingKey 的时候使用通配符!也就是说Topic交换机是非常灵活的,Bindingkey支持模糊匹配。

BindingKey 一般都是有一个或多个单词组成,多个单词之间以.分割,例如: china.hunan

通配符规则:

  • #:匹配一个或多个词
  • *:匹配不多不少恰好1个词

假设有多个队列绑定的Bindingkey分别为:

  • china.hunan.chenzhou.weather:湖南郴州的天气
  • china.hunan.chenzhou.news:湖南郴州的新闻
  • china.zhejiang.hangzhou.weather:浙江杭州的天气
  • japan.tokyo.news:日本东京的新闻

那么使用通配符:

  • china.hunan.#:表示接受湖南的所有新闻和天气消息
  • #.news:表示接受所有新闻消息
  • china.hunan.*.news:表示接受湖南省各个市区的新闻

建立绑定关系:

在这里插入图片描述

在这里插入图片描述

代码实例

// 消费者
@RabbitListener(queues = "topic.q1")
public void topicListen1(String msg){System.out.println("消费者1:topic.q1的消息:【" + msg +"】");
}@RabbitListener(queues = "topic.q2")
public void topicListen2(String msg){System.out.println("消费者2:topic.q2的消息:【" + msg +"】");
}

生产者代码

这一条消息topic.q1topic.q2两个队列都能收到消息,因为它们和交换机绑定的关系的时候指定的KEY:

  • #.news:接受所有地方的新闻
  • china.hunan.#:接受湖南的新闻和天气

@Test
void testSendTopic() {// 交换机名称String exchangeName = "hhy.topic";String key = "china.hunan.chenzhou.news";String msg = "这是一条湖南郴州的新闻!";rabbitTemplate.convertAndSend(exchangeName, key, msg);
}

下面这条消息只有topic.q2能收到,因为topic.q2和交换机绑定时指定的KEY为china.hunan.#,接受湖南的所有天气和新闻消息

@Test
void testSendTopic() {// 交换机名称String exchangeName = "hhy.topic";String key = "china.hunan.chenzhou.weather";String msg = "郴州今天多云转晴";rabbitTemplate.convertAndSend(exchangeName, key, msg);
}

小结:

描述下Direct交换机与Topic交换机的差异?

  • Topic交换机接收的消息RoutingKey必须是多个单词,以 **.** 分割
  • Topic交换机与队列绑定时的bindingKey可以指定通配符
  • #:代表0个或多个词
  • *:代表1个词

5.声明式队列和交换机

通过RabbitMQ提供的管理页面创建队列和交换机比较麻烦,SpringAMQP提供了对应API方便开发者来创建队列和交换机。

基于API声明

通过Spring提供的API创建fanout交换机和队列并建立绑定关系

@Configuration
public class FanoutConfiguration {/*** 声明式创建fanout交换机* @return*/@Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange("hhy.fanout");}/*** 声明式创建队列* @return*/@Beanpublic Queue fanoutQueue1(){return new Queue("fanout.queue1");}/*** 声明式创建绑定关系* @param fanoutQueue1* @param fanoutExchange* @return*/@Beanpublic Binding fanoutBinding3(Queue fanoutQueue1, FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}}

但如果使用这种方式创建Direct交换机就会非常麻烦,因为如果要绑定时要指定多个Key就会出现很多冗余代码,每绑定一个不同的Key就需要多写一份代码

@Configuration
public class DirectConfiguration {@Beanpublic DirectExchange directExchange(){return new DirectExchange("test.direct");}@Beanpublic Queue directQueue1(){return new Queue("direct.queue1");}@Beanpublic Binding directQueue1BindingRed(Queue directQueue1, DirectExchange directExchange){return BindingBuilder.bind(directQueue1).to(directExchange).with("red");}@Beanpublic Binding directQueue1BindingBlue(Queue directQueue1, DirectExchange directExchange){return BindingBuilder.bind(directQueue1).to(directExchange).with("blue");}@Beanpublic Queue directQueue2(){return new Queue("direct.queue2");}@Beanpublic Binding directQueue2BindingRed(Queue directQueue2, DirectExchange directExchange){return BindingBuilder.bind(directQueue2).to(directExchange).with("red");}@Beanpublic Binding directQueue2BindingBlue(Queue directQueue2, DirectExchange directExchange){return BindingBuilder.bind(directQueue2).to(directExchange).with("yellow");}}

基于注解声明

基于@Bean的方式声明队列和交换机的方式比价麻烦,代码有点冗余,Spring还为我们提供基于注解的方式来声明。

使用注解的方式声明Direct模式的交换机和队列,通过注解声明这种创建方式更简单清爽,一个注解直接创建交换机并且绑定队列。并且对应消费者直接就可以监听队列接收消息

@Component
public class MqListener {@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue1"),exchange = @Exchange(name = "test.direct", type = ExchangeTypes.DIRECT),key = {"red", "blue"}))public void listenSimpleQueue1(String msg){System.out.println("消费者1:收到了simple.queue的消息:【" + msg +"】");}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue2"),exchange = @Exchange(name = "test.direct", type = ExchangeTypes.DIRECT),key = {"red", "yellow"}))public void listenSimpleQueue2(String msg){System.out.println("消费者2:收到了simple.queue的消息:【" + msg +"】");}
}

6.消息转换器

前面我们生产者发送的消息都是一些字符串,当我们发送的消息是一个对象的时候就会出现问题。

@Test
void testSendObject() {String exchangeName = "test.direct";Map<String, Object> msg = new HashMap<>(2);msg.put("name", "jack");msg.put("age", 21);rabbitTemplate.convertAndSend(exchangeName, "red", msg);
}

如下图RabbitMQ中的消息队列中存储的消息,数据类型是通过JDK自带的序列化后的数据

在这里插入图片描述

而JDK自带的序列化,存在以下问题:

  • 消息体积大
  • 毫无可读性
  • 有安全漏洞,利用Java字节码反序列化能被替换恶意代码

所以使用JDK自带的序列化方式并不合适,那么我可以使用JSON的序列化方式来解决这个问题。

使用jackson就行,引入jackson依赖

<dependency><groupId>com.fasterxml.jackson.dataformat</groupId><artifactId>jackson-dataformat-xml</artifactId>
</dependency>

将消息转换器交给Spring管理

@Bean
public MessageConverter jacksonMessageConvertor(){return new Jackson2JsonMessageConverter();
}

在这里插入图片描述


版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词