欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 财经 > 产业 > Spring+ActiveMQ

Spring+ActiveMQ

2024/10/26 5:35:57 来源:https://blog.csdn.net/weixin_44929475/article/details/143117529  浏览:    关键词:Spring+ActiveMQ

1. 环境搭建

1.1 env-version

JDK

1.8

Spring

2.7.13

Maven

3.6

ActiveMQ

5.15.2

1.2 docker-compose.yml

version: '3.8'services:activemq:image: rmohr/activemq:5.16.3container_name: activemqports:- "61616:61616"- "8161:8161"environment:- ACTIVEMQ_ADMIN_LOGIN=admin- ACTIVEMQ_ADMIN_PASSWORD=admin- ACTIVEMQ_CONFIG_MINMEMORY=512- ACTIVEMQ_CONFIG_MAXMEMORY=2048
#    volumes:
#     - ./data/activemq:/var/activemq/data
#     - ./conf/activemq.xml:/var/activemq/conf/activemq.xmlnetworks:- activemq-network    networks:activemq-network:driver: bridge

在这个docker-compose.yml文件中:

  • activemq服务使用了rmohr/activemq Docker镜像,这是一个社区维护的ActiveMQ镜像。请确保选择一个与你的Spring Boot版本兼容的ActiveMQ版本。

  • container_name设置了容器的名称。

  • ports映射了ActiveMQ的JMS端口(61616)和管理控制台端口(8161)到宿主机的相同端口。

  • environment部分设置了管理员账号和密码,以及JVM的最小和最大内存配置。这些可以根据需要进行调整。

  • volumes部分映射了宿主机的目录到容器内部,用于持久化ActiveMQ的数据和配置文件。你需要创建相应的目录并放置你的activemq.xml配置文件。

  • networks定义了一个自定义网络,以便ActiveMQ服务可以连接到其他可能需要的Docker服务。

在使用这个docker-compose.yml文件之前,请确保你已经创建了dataconf目录,并且在conf目录中放置了自定义的activemq.xml配置文件。如果不需要持久化存储,可以移除volumes部分。

1.3 添加依赖

<!-- ActiveMQ 依赖 -->
<!-- https://mvnrepository.com/artifact/org.apache.activemq/activemq-spring -->
<dependency><groupId>org.apache.activemq</groupId><artifactId>activemq-spring</artifactId><version>5.15.4</version><!-- 排除依赖 --><exclusions><exclusion><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId></exclusion>
</exclusions>
</dependency><!-- Spring Boot 与 JMS 集成的 starter -->
<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-activemq -->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-activemq</artifactId><version>2.7.12</version>
</dependency>

2. 工程结构

activemq/
├── src/
│   ├── main/
│   │   ├── java/
│   │   │   ├── com/
│   │   │   │   ├── xiaokai/
│   │   │   │   │   ├── ActiveMQApplication.java   // 应用程序的主类,通常包含main方法
│   │   │   │   │   ├── config/                   // 配置包
│   │   │   │   │   │   └── JmsConfig.java        // ActiveMQ的配置类
│   │   │   │   │   ├── event/                   // 事件包
│   │   │   │   │   │   ├── Eventinfo.java        // 事件信息类:构建消息、send topic
│   │   │   │   │   ├── listener/                // 监听器包
│   │   │   │   │   │   └── MessageListener.java  // 消息监听器类
│   │   │   │   │   ├── service/                 // 服务包
│   │   │   │   │   │   └── ActiveMQService.java   // ActiveMQ服务类
│   │   │   │   │   
│   │   │   ├── resources/                      // 资源文件
│   │   │   │   └── application.yml             // Spring配置文件
│   │   │   
│   │   ├── test/                                // 测试代码
│   │   │   ├── java/                           // 测试Java代码
│   │   │   │   ├── com/
│   │   │   │   │   ├── xiaokai/
│   │   │   │   │   └── ActiveMQTest.java        // ActiveMQ的测试类
└── pom.xml                                     // Maven构建配置文件(未在文件内容中列出)

ActiveMQApplication.java:项目的主类,通常包含启动Spring应用程序的main方法。

JmsConfig.java:配置ActiveMQ的Java配置类。

Eventinfo.java:可能用于表示事件信息的类。

MessageListener.java:消息监听器,用于监听并处理ActiveMQ消息。

ActiveMQService.java:服务类,可能包含与ActiveMQ交互的业务逻辑。

application.yml:Spring Boot的配置文件,用于配置应用程序的各种参数。

ActiveMQTest.java:用于测试ActiveMQ功能的测试类。

3. 示例代码

JmsConfig.java

package com.xiaokai.config;import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;import javax.jms.ConnectionFactory;/*** Author:yang* Date:2024-10-19 15:57*/
@Configuration
@EnableJms
public class JmsConfig {@Beanpublic ActiveMQConnectionFactory connectionFactory() {ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();connectionFactory.setBrokerURL("tcp://116.198.242.56:61616");connectionFactory.setUserName("admin");connectionFactory.setPassword("admin");return connectionFactory;}@Beanpublic DefaultJmsListenerContainerFactory jmsListenerContainerFactory(ConnectionFactory connectionFactory) {DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();factory.setConnectionFactory(connectionFactory);factory.setSessionTransacted(true);return factory;}}

MessageListener.java

package com.xiaokai.listener;import lombok.extern.slf4j.Slf4j;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;/*** Author:yang* Date:2024-10-19 15:55*/
@Component
@Slf4j
public class MessageListener {// 监听队列test.queue@JmsListener(destination = "test.queue")public void onMessage(String message) {log.info("Received message: " + message);}}

ActiveMQService .java

package com.xiaokai.service;import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Service;/*** Author:yang* Date:2024-10-19 16:01*/
@Service
@Slf4j
public class ActiveMQService {@Autowiredprivate JmsMessagingTemplate jmsMessagingTemplate;public void send(String message) {log.info("Sending message: {}", message);jmsMessagingTemplate.convertAndSend("test.queue", message);}
}

注:JmsMessagingTemplate作为Spring相关bean,封装了JmsTemplate 。总的来说JmsTemplate更底层,但是在使用过程中不需要过多关注底层实现。

@Autowired private JmsMessagingTemplate jmsMessagingTemplate; 
@Autowired private JmsTemplate jmsTemplate;

application.yml

spring:activemq:broker-url: tcp://116.198.242.56:61616user: adminpassword: admin

注:在JmsConfig.java配置文件中配置后,可以不需要配置文件,二者选其一。

测试:

package com.xiaokai;import com.xiaokai.service.ActiveMQService;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;/*** Author:yang* Date:2024-10-21 10:28*/
@SpringBootTest
@RunWith(SpringRunner.class)
@Slf4j
public class ActiveMQTest {@Autowiredprivate ActiveMQService activeMQService;@Testpublic void testSend(){activeMQService.send("test");}
}

结果:

 Sending message: testStarted ActiveMQApplication in 1.368 seconds (JVM running for 1.799)Received message: test

注:在ActiveMQ提供的可视化控制台可以查看相关信息。

访问:http://116.198.242.56:8161/admin/index.jsp

4. 消息模型

4.1 P2P模型

bean

@Bean
public Destination queue() {return new ActiveMQQueue("test.queue");
}

消息监听器

@Component
@Slf4j
public class MessageListener {// 监听队列test.queue@JmsListener(destination = "test.queue")public void onMessage1(String message) {log.info("Received queue message1: " + message);}// 监听队列test.queue@JmsListener(destination = "test.queue")public void onMessage2(String message) {log.info("Received queue message2: " + message);}}

消息服务

@Service
@Slf4j
public class ActiveMQService {@Autowiredprivate JmsMessagingTemplate jmsMessagingTemplate;@Autowiredprivate Destination queue;// 发送点对点消息public void sendP2P(String message) {log.info("Sending queue message: {}", message);jmsMessagingTemplate.convertAndSend(queue, message);}}

application.yml

spring:activemq:broker-url: tcp://116.198.242.56:61616user: adminpassword: admin#    true表示使用发布/订阅模式,false表示使用点对点模式jms:pub-sub-domain: false

结论:点对点消息模式是将消息推送到queue中,消费者通过轮训的方式消费消息

4.2 发布/订阅模型

Bean

@Bean
public Destination topic() {return new ActiveMQTopic("test.topic");
}

消息监听器

@Component
@Slf4j
public class PubMessageListener {// 监听主题test.topic@JmsListener(destination = "test.topic")public void onMessage3(String message) {log.info("Received topic message1: " + message);}// 监听主题test.topic@JmsListener(destination = "test.topic")public void onMessage4(String message) {log.info("Received topic message2: " + message);}}

消息服务

@Service
@Slf4j
public class ActiveMQService {@Autowiredprivate JmsMessagingTemplate jmsMessagingTemplate;@Autowiredprivate Destination topic;// 发送发布订阅消息public void sendPubSub(String message) {log.info("Sending topic message: {}", message);jmsMessagingTemplate.convertAndSend(topic, message);}}

application.yml

spring:activemq:broker-url: tcp://116.198.242.56:61616user: adminpassword: admin#    true表示使用发布/订阅模式,false表示使用点对点模式jms:pub-sub-domain: true

注:需要将发布/订阅开关打开

结论:发送消息后,订阅主题的消费者都能收到同一条消息去消费。

5. 消息类型

5.1 普通消息

        普通消息如上述案例,生产者生产消息后,由消费者消费消息,中间不需要做额外的事情。

5.2 延迟消息

延迟消息指在生产者生产带有延迟时间的消息后,broker接收到消息后,并不立即投送到队列或者主题,而是到达延迟时间后,再将消息投送到队列、主题。

配置ActiveMQ支持延迟消息: 修改ActiveMQ的配置文件activemq.xml,确保<broker>标签包含schedulerSupport="true"属性。这允许ActiveMQ的计划任务功能,从而支持延迟消息。

<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}" schedulerSupport="true">

修改配置后,需要重启ActiveMQ服务器以使更改生效。

发送延迟消息

// 发送延迟消息
public void sendDelay(String message) {HashMap<String, Object> properties = new HashMap<>();properties.put(ScheduledMessage.AMQ_SCHEDULED_DELAY, 10000);log.info("Sending delay queue message: {}", message);jmsMessagingTemplate.convertAndSend(queue, message, properties);
}

消息监听器

...

5.3 事务消息

        没啥用,用ActiveMQ实现事务消息还不如不用,辣鸡(狗头保命)

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com