欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 科技 > 能源 > java微服务中消息队列处理中间件基础语法学习,零基础学习

java微服务中消息队列处理中间件基础语法学习,零基础学习

2025/2/21 3:19:14 来源:https://blog.csdn.net/xiaozukun/article/details/145251814  浏览:    关键词:java微服务中消息队列处理中间件基础语法学习,零基础学习

在 Java 微服务中,消息队列处理中间件可以帮助实现服务之间的异步通信、解耦和负载均衡。常用的 Java 消息队列工具包括 RabbitMQ、Apache Kafka 和 ActiveMQ。下面我将详细介绍这些消息队列工具在 Java 中的基础语法和使用方法。

1. RabbitMQ

RabbitMQ 是一个广泛使用的开源消息代理软件,支持多种协议(AMQP、MQTT、STOMP 等)。我们可以使用 Spring AMQP 来简化 RabbitMQ 的集成。

1.1 安装 RabbitMQ
  • 在 Ubuntu 上安装 RabbitMQ

bash

sudo apt-get update
sudo apt-get install rabbitmq-server
  • 启动 RabbitMQ 服务

bash

sudo systemctl start rabbitmq-server
sudo systemctl enable rabbitmq-server
  • 启用管理插件

bash

sudo rabbitmq-plugins enable rabbitmq_management
  • 访问管理界面:打开浏览器,访问 http://<your_server_ip>:15672,默认用户名和密码为 guest

1.2 基本概念
  • Exchange: 消息的路由中心。
  • Queue: 存储消息的地方。
  • Binding: 将 Exchange 和 Queue 关联起来。
  • Routing Key: 路由键,用于匹配 Binding。
  • Producer: 发送消息的应用程序。
  • Consumer: 接收消息的应用程序。
1.3 示例代码
1.3.1 创建 Spring Boot 项目

使用 Spring Initializr 创建一个新的 Spring Boot 项目,并添加以下依赖:

  • Spring Web
  • Spring AMQP
  • Lombok (可选)
1.3.2 配置 RabbitMQ

application.yml 中配置 RabbitMQ 连接信息:

yaml

spring:rabbitmq:host: localhostport: 5672username: guestpassword: guest
1.3.3 生产者

创建一个生产者类来发送消息:

RabbitMQSender.java

package com.example.demo.rabbitmq;import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;@Service
public class RabbitMQSender {@Autowiredprivate AmqpTemplate amqpTemplate;public void send(String message) {amqpTemplate.convertAndSend("my_queue", message);System.out.println("Sent message = " + message);}
}
1.3.4 消费者

创建一个消费者类来接收消息:

RabbitMQReceiver.java

package com.example.demo.rabbitmq;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Slf4j
@Component
public class RabbitMQReceiver {@RabbitListener(queues = "my_queue")public void receiveMessage(String message) {log.info("Received message = {}", message);}
}
1.3.5 控制器

创建一个控制器来触发消息发送:

RabbitMQController.java

package com.example.demo.controller;import com.example.demo.rabbitmq.RabbitMQSender;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;@RestController
public class RabbitMQController {@Autowiredprivate RabbitMQSender rabbitMQSender;@GetMapping("/send-rabbitmq-message")public String sendMessage(@RequestParam String message) {rabbitMQSender.send(message);return "Message sent successfully";}
}
2. Apache Kafka

Apache Kafka 是一个分布式流处理平台,常用于实时数据管道和流应用程序。我们可以使用 Spring Kafka 来简化 Kafka 的集成。

2.1 安装 Kafka
  • 下载并解压 Kafka

bash

wget https://downloads.apache.org/kafka/3.0.0/kafka_2.13-3.0.0.tgz
tar -xzf kafka_2.13-3.0.0.tgz
cd kafka_2.13-3.0.0
  • 启动 Zookeeper:

bash

bin/zookeeper-server-start.sh config/zookeeper.properties
  • 启动 Kafka 服务器

bash

bin/kafka-server-start.sh config/server.properties
2.2 基本概念
  • Broker: Kafka 集群中的节点。
  • Topic: 数据分类的主题。
  • Partition: Topic 的分区。
  • Producer: 发送消息的应用程序。
  • Consumer: 接收消息的应用程序。
  • Consumer Group: 消费者的组。
2.3 示例代码
2.3.1 创建 Spring Boot 项目

使用 Spring Initializr 创建一个新的 Spring Boot 项目,并添加以下依赖:

  • Spring Web
  • Spring for Apache Kafka
  • Lombok (可选)
2.3.2 配置 Kafka

application.yml 中配置 Kafka 连接信息:

yaml

spring:kafka:bootstrap-servers: localhost:9092producer:key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer:group-id: my-groupauto-offset-reset: earliestkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer
2.3.3 创建 Topic

在 Kafka 中创建一个主题 test-topic:

bash

bin/kafka-topics.sh --create --topic test-topic --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1
2.3.4 生产者

创建一个生产者类来发送消息:

KafkaProducer.java

package com.example.demo.kafka;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;@Service
public class KafkaProducer {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;public void sendMessage(String message) {kafkaTemplate.send("test-topic", message);System.out.println("Sent message = " + message);}
}
2.3.5 消费者

创建一个消费者类来接收消息:

KafkaConsumer.java

package com.example.demo.kafka;import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;@Slf4j
@Component
public class KafkaConsumer {@KafkaListener(topics = "test-topic", groupId = "my-group")public void consume(String message) {log.info("Consumed message = {}", message);}
}
2.3.6 控制器

创建一个控制器来触发消息发送:

KafkaController.java

package com.example.demo.controller;import com.example.demo.kafka.KafkaProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;@RestController
public class KafkaController {@Autowiredprivate KafkaProducer kafkaProducer;@GetMapping("/send-kafka-message")public String sendMessage(@RequestParam String message) {kafkaProducer.sendMessage(message);return "Message sent successfully";}
}

3. ActiveMQ

ActiveMQ 是另一个流行的开源消息代理,支持 JMS 协议。我们可以使用 Spring JMS 来简化 ActiveMQ 的集成。

3.1 安装 ActiveMQ
  • 下载并解压 ActiveMQ

bash

wget https://downloads.apache.org/activemq/5.16.5/apache-activemq-5.16.5-bin.tar.gz
tar -xzf apache-activemq-5.16.5-bin.tar.gz
cd apache-activemq-5.16.5
  • 启动 ActiveMQ 服务:
bin/activemq start
  • 访问管理界面:

     

    打开浏览器,访问 http://<your_server_ip>:8161/admin,默认用户名和密码为 adminadmin

3.2 基本概念
  • Broker: 消息代理。
  • Queue: 存储消息的地方。
  • Topic: 订阅模式的消息通道。
  • Producer: 发送消息的应用程序。
  • Consumer: 接收消息的应用程序。
3.3 示例代码
3.3.1 创建 Spring Boot 项目

使用 Spring Initializr 创建一个新的 Spring Boot 项目,并添加以下依赖:

  • Spring Web
  • Spring for Apache ActiveMQ
  • Lombok (可选)
3.3.2 配置 ActiveMQ

application.yml 中配置 ActiveMQ 连接信息:

yaml

spring:activemq:broker-url: tcp://localhost:61616user: adminpassword: admin
3.3.3 生产者

创建一个生产者类来发送消息:

ActiveMQProducer.java

package com.example.demo.activemq;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Service;@Service
public class ActiveMQProducer {@Autowiredprivate JmsTemplate jmsTemplate;public void sendMessage(String message) {jmsTemplate.convertAndSend("my_queue", message);System.out.println("Sent message = " + message);}
}
3.3.4 消费者

创建一个消费者类来接收消息:

ActiveMQConsumer.java

package com.example.demo.activemq;import lombok.extern.slf4j.Slf4j;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;@Slf4j
@Component
public class ActiveMQConsumer {@JmsListener(destination = "my_queue")public void receiveMessage(String message) {log.info("Received message = {}", message);}
}
3.3.5 控制器

创建一个控制器来触发消息发送:

ActiveMQController.java

package com.example.demo.controller;import com.example.demo.activemq.ActiveMQProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;@RestController
public class ActiveMQController {@Autowiredprivate ActiveMQProducer activeMQProducer;@GetMapping("/send-activemq-message")public String sendMessage(@RequestParam String message) {activeMQProducer.sendMessage(message);return "Message sent successfully";}
}

4.RabbitMQ、Apache Kafka 和 ActiveMQ 在微服务架构中作为消息队列处理工具的优缺点对比。

1. RabbitMQ
优点
  • 成熟稳定: 已经存在很长时间,社区支持强大。
  • 丰富的协议支持: 支持多种协议(AMQP、MQTT、STOMP 等),灵活性高。
  • 可靠的消息传递: 提供持久化和确认机制,确保消息不丢失。
  • 易于管理: 提供强大的管理界面和监控工具。
  • 广泛使用: 社区活跃,文档丰富。
缺点
  • 性能限制: 相比 Kafka,在大规模数据处理方面可能稍逊一筹。
  • 复杂性: 配置和管理相对复杂,尤其是在高级场景下。
  • 许可证: 使用 Erlang VM,可能会增加一些学习成本。
2. Apache Kafka
优点
  • 高性能: 设计用于高吞吐量和低延迟,适合大规模数据处理。
  • 可扩展性强: 容易水平扩展,支持多副本和分区。
  • 持久化能力强: 默认情况下所有消息都会被持久化到磁盘。
  • 容错性好: 内置了复制机制,确保数据安全。
  • 流处理能力: 内置流处理功能,支持实时数据处理。
缺点
  • 配置复杂: 初次配置和调优较为复杂。
  • 资源消耗大: 需要较多的内存和磁盘空间。
  • 依赖 Zookeeper: 需要额外部署和维护 Zookeeper。
  • 学习曲线陡峭: 对于新手来说,理解其内部工作原理需要一定时间。
3. ActiveMQ
优点
  • 成熟的 JMS 实现: 符合 JMS 规范,适用于 Java 应用程序。
  • 支持多种传输协议: 支持 TCP、UDP、HTTP 等多种传输协议。
  • 广泛的集成: 与 Spring 等框架无缝集成。
  • 易于配置: 相对简单,适合中小型项目。
  • 灵活的消息模式: 支持点对点(Queue)和发布/订阅(Topic)两种模式。
缺点
  • 性能较低: 相比 RabbitMQ 和 Kafka,在高吞吐量场景下性能较差。
  • 缺乏现代特性: 不如 Kafka 新颖,缺少一些现代流处理特性。
  • 社区活跃度下降: 相比其他两个工具,社区活跃度有所下降。
  • 复杂性: 在大型系统中配置和管理相对复杂。
总结
特征RabbitMQApache KafkaActiveMQ
成熟稳定性
协议支持多种协议 (AMQP, MQTT, STOMP)主要支持 Kafka 协议多种协议 (TCP, UDP, HTTP)
可靠性极强
易于管理是,提供强大的管理界面较复杂,需要配置 Zookeeper是,易于配置
性能中等中等
扩展性中等非常好,支持水平扩展中等
持久化极强
容错性极强
流处理能力基本支持内置流处理功能基本支持
学习曲线中等陡峭中等
资源消耗中等中等

选择哪种消息队列工具取决于你的具体需求,例如性能要求、可靠性需求、现有技术栈以及团队的技术背景。

以上内容涵盖了如何在 Java 微服务中使用 RabbitMQ、Apache Kafka 和 ActiveMQ 进行消息队列处理。每个部分都包含了安装步骤、基本概念和示例代码。你可以根据实际需求选择合适的工具,并进一步扩展和完善这些示例。

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词