欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 文旅 > 游戏 > 山东大学软件学院项目实训-创新实训-基于大模型的旅游平台(二十八)- 微服务(8)

山东大学软件学院项目实训-创新实训-基于大模型的旅游平台(二十八)- 微服务(8)

2024/10/24 9:17:27 来源:https://blog.csdn.net/qq_64663346/article/details/139507836  浏览:    关键词:山东大学软件学院项目实训-创新实训-基于大模型的旅游平台(二十八)- 微服务(8)

目录

11.4 SpringAMQP

11.4.2 Work Queue工作队列

11.4.3 发布订阅模型

11.4.4 FanoutExchange(广播交换机)

11.4.5 DirectExchange(路由模式交换机)

11.4.6 TopicExchange

11.5 消息转换器


11.4 SpringAMQP

父工程引入AMQP依赖

  <!--AMQP依赖,包含RabbitMQ--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

编写测试方法

yml配置文件中编写配置

  spring:rabbitmq:host: 192.168.142.130   # rabbitmq的ip地址port: 5672  # 端口username: xxxxxpassword: xxxxxxxvirtual-host: /

发消息测试

  @SpringBootTestpublic class AMQPTest {​@Autowiredprivate RabbitTemplate rabbitTemplate;​@Testpublic void testSendMessage2SimpleQueue(){String queueName = "simple.queue";String message = "hello,spring amqp";rabbitTemplate.convertAndSend(queueName,message);}}

在consumer中编写消费逻辑,监听simple.queue

配置文件配置 :

  spring:rabbitmq:host: 192.168.142.129   # rabbitmq的ip地址port: 5672  # 端口username: xxxxxpassword: xxxxxvirtual-host: /

编写监听类

  @Componentpublic class SpringRabbitListener {@RabbitListener(queues = "simple.queue")public void ListenSimpleQueue(String msg){System.out.println("消费者接收到simple.queue的消息 : " + msg);}}

启动主启动类,控制台可看到输出的监听到的消息

消息一旦被消费,就会从队列中删除,没有回收机制

11.4.2 Work Queue工作队列

publisher代码

  @Testpublic void testSendMessage2WorkQueue() throws InterruptedException {String queueName = "simple.queue";String message = "hello,spring amqp__";for(int i = 1 ; i <= 50 ; i ++){rabbitTemplate.convertAndSend(queueName,message + i);   Thread.sleep(20);}}

consumer接收消息

  // 消费者1@RabbitListener(queues = "simple.queue")public void ListenWork1Queue(String msg) throws InterruptedException {System.out.println("消费者1接收到simple.queue的消息 : " + msg + LocalTime.now());Thread.sleep(20);}​// 消费者2@RabbitListener(queues = "simple.queue")public void ListenWork2Queue(String msg) throws InterruptedException {System.err.println("消费者2接收到simple.queue的消息 : " + msg + LocalTime.now());Thread.sleep(200);}

消息预取机制使得两者平均分配消息 不符预期

配置文件中 :

处理预取值

  spring:rabbitmq:host: 192.168.142.129   # rabbitmq的ip地址port: 5672  # 端口username: xxxxxxpassword: xxxxxxxvirtual-host: /listener:simple:prefetch: 1    # 每次只能获取一条消息 ,处理完成才能获取下一个信息

11.4.3 发布订阅模型

11.4.4 FanoutExchange(广播交换机)

步骤一 : 声明交换机,队列 , 并绑定队列和交换机

在consumer中编写配置类

  @Configurationpublic class FanoutConfig {// 声明交换机@Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange("xinbo.fanout");}​// 声明队列1@Beanpublic Queue fanoutQueue1(){return new Queue("fanout.queue1");}​// 绑定队列1到交换机@Beanpublic Binding fanoutBindind(Queue fanoutQueue1,FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}​// 声明队列2@Beanpublic Queue fanoutQueue2(){return new Queue("fanout.queue2");}​// 绑定队列2到交换机@Beanpublic Binding fanoutBindind2(Queue fanoutQueue2,FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);}}

消息监听 :

  @Componentpublic class SpringRabbitListener {​// 消费者1@RabbitListener(queues = "fanout.queue1")public void ListenWork1Queue(String msg) throws InterruptedException {System.out.println("消费者1接收到fanout.queue1的消息 : " + msg + LocalTime.now());Thread.sleep(20);}​// 消费者2@RabbitListener(queues = "fanout.queue2")public void ListenWork2Queue(String msg) throws InterruptedException {System.err.println("消费者2接收到fanout.queue2的消息 : " + msg + LocalTime.now());Thread.sleep(200);}}

消息发送 :

  @Testpublic void testSendFanoutExchange(){String exchangeName = "xinbo.fanout";               // 交换机名称String message = "hello,everyone";rabbitTemplate.convertAndSend(exchangeName,null,message);}

11.4.5 DirectExchange(路由模式交换机)

利用@RabbitListener声明Exchange Queue RoutingKey

SpirngRabbitListener中

  @Componentpublic class SpringRabbitListener {​@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue1"),exchange = @Exchange(name = "xinbo.direct",type = ExchangeTypes.DIRECT),key = {"red","blue"}))public void ListenDirectQueue1(String msg) throws InterruptedException {System.out.println("消费者接收到direct.queue1的消息 : " + msg + LocalTime.now());Thread.sleep(20);}​@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue2"),exchange = @Exchange(name = "xinbo.direct",type = ExchangeTypes.DIRECT),key = {"red","yellow"}))public void ListenDirectQueue2(String msg) throws InterruptedException {System.out.println("消费者接收到direct.queue2的消息 : " + msg + LocalTime.now());Thread.sleep(20);}​}

发送消息测试 :

  @Testpublic void testSendDirectExchange(){// 交换机名称String exchangeName = "xinbo.direct";String message = "hello,blue";rabbitTemplate.convertAndSend(exchangeName,"blue",message);}

11.4.6 TopicExchange

绑定队列和交换机的关系 :

  @Componentpublic class SpringRabbitListener {@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue1"),exchange = @Exchange(name="xinbo.topic",type = ExchangeTypes.TOPIC),key = "china.#"))public void ListenTopicQueue1(String msg){System.out.println("消费者接收到topic.queue1的消息 : " + msg + LocalTime.now());}​@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue2"),exchange = @Exchange(name="xinbo.topic",type = ExchangeTypes.TOPIC),key = "#.news"))public void ListenTopicQueue2(String msg){System.out.println("消费者接收到topic.queue2的消息 : " + msg + LocalTime.now());}​}

发送消息 :

  @Testpublic void testSendTopicExchange(){// 交换机名称String exchangeName = "xinbo.topic";String message = "中国发生了xxxxx";rabbitTemplate.convertAndSend(exchangeName,"china.news",message);}

11.5 消息转换器

发送和接受json类型的消息

添加依赖 :

  <dependency><groupId>com.fasterxml.jackson.dataformat</groupId><artifactId>jackson-dataformat-xml</artifactId></dependency>

在配置类中

  @Beanpublic MessageConverter messageCondition(){return new Jackson2JsonMessageConverter();}

接收消息 :

引依赖 :同上

在Listener中 :

  @RabbitListener(queues = "object.queue")public void ListenObjectQueue(Map<String,Object> msg){System.out.println(msg);}

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com