欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 房产 > 建筑 > RabbitMQ实践——使用WebFlux响应式方式实时返回队列中消息

RabbitMQ实践——使用WebFlux响应式方式实时返回队列中消息

2024/10/24 5:22:40 来源:https://blog.csdn.net/breaksoftware/article/details/139751603  浏览:    关键词:RabbitMQ实践——使用WebFlux响应式方式实时返回队列中消息

大纲

  • Pom.xml
  • 监听队列
  • 实时返回消息
  • 测试
  • 完整代码
  • 工程代码

在之前的案例中,我们在管理后台收发消息都是通过短连接的形式。本文我们将探索对队列中消息的实时读取,并通过流式数据返回给客户端。
webflux是反应式Web框架,客户端可以通过一个长连接和服务端相连,后续服务端可以通过该连接持续给客户端发送消息。可以达到:发送一次,多次接收的效果。

Pom.xml

由于我们要使用Rabbitmq,所以要新增如下依赖

		<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit-stream</artifactId></dependency>

webflux的依赖如下:

		<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-webflux</artifactId></dependency><dependency><groupId>io.projectreactor</groupId><artifactId>reactor-core</artifactId><version>3.6.7</version></dependency>

监听队列

下面代码会返回一个监听队列的Container

    private SimpleMessageListenerContainer getListener(String queueName, MessageListener messageListener) {lock.lock();try {SimpleMessageListenerContainer listener = listeners.get(queueName);if (listener == null && messageListener != null) {listener = new SimpleMessageListenerContainer();listener.setConnectionFactory(connectionFactory);listener.setQueueNames(queueName);listener.setMessageListener(messageListener);listeners.put(queueName, listener);}return listener;} finally {lock.unlock();}}

实时返回消息

一旦消费者读取到消息,onMessage方法会被调用。然后Flux的消费者会将消息投递到流上。

    public Flux<String> listen(String queueName) {return Flux.create(emitter -> {SimpleMessageListenerContainer container = getListener(queueName, new MessageListener() {@Overridepublic void onMessage(Message message) {String msg = new String(message.getBody());System.out.println("listen function Received message: " + msg);emitter.next(msg);}});container.start();});}

测试

由于OpenApi不能支持实时展现流式数据,所以我们采用Postman来测试。
发送请求后,该页面一直处于滚动状态。
在这里插入图片描述
在管理后台发送一条消息
在这里插入图片描述
可以看到Postman收到了该消息
在这里插入图片描述
然后在发一条,Postman又会收到一条
在这里插入图片描述
这样我们就完成了“请求一次,多次返回”的效果。

完整代码

需要注意的是,返回的格式需要标记为produces = “text/event-stream”。

// controller
package com.rabbitmq.consumer.controller;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;import com.rabbitmq.consumer.service.ConsumerService;import reactor.core.publisher.Flux;@RestController
@RequestMapping("/consumer")
public class ConsumerController {@Autowiredprivate ConsumerService comsumerService;@GetMapping(value = "/listen", produces = "text/event-stream")public Flux<String> listen(@RequestParam String queueName) {return comsumerService.listen(queueName);}
}
// service
package com.rabbitmq.consumer.service;import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import jakarta.annotation.PostConstruct;
import reactor.core.publisher.Flux;@Service
public class ConsumerService {@Autowiredprivate RabbitTemplate rabbitTemplate;private ConnectionFactory connectionFactory;private final ReentrantLock lock = new ReentrantLock();private Map<String, SimpleMessageListenerContainer> listeners = new java.util.HashMap<>();@PostConstructpublic void init() {connectionFactory = rabbitTemplate.getConnectionFactory();}public Flux<String> listen(String queueName) {return Flux.create(emitter -> {SimpleMessageListenerContainer container = getListener(queueName, new MessageListener() {@Overridepublic void onMessage(Message message) {String msg = new String(message.getBody());System.out.println("listen function Received message: " + msg);emitter.next(msg);}});container.start();});}private SimpleMessageListenerContainer getListener(String queueName, MessageListener messageListener) {lock.lock();try {SimpleMessageListenerContainer listener = listeners.get(queueName);if (listener == null && messageListener != null) {listener = new SimpleMessageListenerContainer();listener.setConnectionFactory(connectionFactory);listener.setQueueNames(queueName);listener.setMessageListener(messageListener);listeners.put(queueName, listener);}return listener;} finally {lock.unlock();}}
}

工程代码

https://github.com/f304646673/RabbitMQDemo/tree/main/consumer

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com