🎉欢迎大家观看AUGENSTERN_dc的文章(o゜▽゜)o☆✨✨
🎉感谢各位读者在百忙之中抽出时间来垂阅我的文章,我会尽我所能向的大家分享我的知识和经验📖
🎉希望我们在一篇篇的文章中能够共同进步!!!
🌈个人主页:AUGENSTERN_dc
🔥个人专栏:C语言 |Java | 数据结构 | 算法 | MySQL | RabbitMQ | Redis
⭐个人格言:
一重山有一重山的错落,我有我的平仄
一笔锋有一笔锋的着墨,我有我的舍得
接下来,我会向大家介绍如何快速入门RabbitMQ,以及如何编写一个简单的RabbitMQ代码
1. 引入依赖
在编写我们的代码之前,我们需要引入RabbitMQ的依赖:
如果你使用的是Maven, 你可以使用以下依赖:
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.7.3</version>
</dependency>
2. 生产者消费者模型
在编写代码之前,大家需要了解生产者消费者模型:
生产者-消费者模型(Producer-Consumer Model): 是一种经典的多线程同步问题,用于解决生产者线程和消费者线程之间的数据共享和同步问题。它在多线程编程、并发编程以及分布式系统中被广泛应用。
2.1 主要角色
生产者-消费者模型涉及两个主要角色:
1. 生产者(producer)
- 负责生成数据并将其放入缓冲区(Buffer)。
- 如果缓冲区已满,生产者需要等待,直到缓冲区有空间可以存放数据。
2. 消费者(consumer)
- 负责从缓冲区中取出数据并消费。
- 如果缓冲区为空,消费者需要等待,直到缓冲区中有数据可以消费。
缓冲区(Buffer)是一个共享资源,用于存储生产者生成的数据,供消费者消费。
2.2 关键问题
生产者-消费者模型需要解决以下两个关键问题:
1. 互斥访问:
- 多个线程(生产者和消费者)需要访问共享的缓冲区,因此需要确保对缓冲区的访问是互斥的,避免数据竞争和不一致。
2. 同步问题:
- 生产者需要在缓冲区有空间时才能生产数据。
- 消费者需要在缓冲区有数据时才能消费数据。
- 需要一种机制来协调生产者和消费者之间的同步。
3. 编写生产者代码
3.1 创建连接
要想使用创建一个生产者,首先需要将生产者和RabbitMQ的服务器进行连接
// 1. 创建连接⼯⼚
ConnectionFactory factory = new ConnectionFactory();
//2. 设置参数
factory.setHost("你的RabbitMQ服务器IP");//ip 默认值localhost
factory.setPort(5672); //默认值5672
factory.setVirtualHost("test");//虚拟机名称, 默认 /
factory.setUsername("guest");//⽤⼾名,默认guest
factory.setPassword("guest");//密码, 默认guest
//3. 创建连接Connection
Connection connection = factory.newConnection();
RabbitMQ 默认的⽤于客⼾端连接的TCP 端⼝号是5672, 需要提前进⾏开放
3.2 创建Channel
//4. 创建channel通道
Channel channel = connection.createChannel();
⽣产者和消费者创建的channel并不是同⼀个
3.3 声明一个队列Queue
/*queueDeclare(String queue, boolean durable, boolean exclusive, booleanautoDelete, Map<String, Object> arguments)1.queue: 队列名称2.durable: 是否持久化.true-设置队列为持久化, 待久化的队列会存盘,服务器重启之后, 消息不丢失。3.exclusive:* 是否独占, 只能有⼀个消费者监听队列* 当Connection关闭时, 是否删除队列4.autoDelete: 是否⾃动删除, 当没有Consumer时, ⾃动删除掉5.arguments: ⼀些参数
*/
//如果没有⼀个hello_world 这样的⼀个队列, 会⾃动创建, 如果有, 则不创建
channel.queueDeclare("hello",true,false,false,null);
3.4 发送消息
当一个新的RabbitMQ节点启动时,他会预声明(declare)几个内置的交换机, 内置交换机名称是空字符串(""), 生产者发送的消息会根据队列名称直接路由到对应的队列.
例如: 如果有⼀个名为 "hello" 的队列, ⽣产者可以直接发送消息到 "hello" 队列, ⽽消费者可以从 "hello" 队列中接收消息, ⽽不需要关⼼交换机的存在. 这种模式⾮常适合简单的应⽤场景,其中⽣产者和消费者之间的通信是⼀对⼀的.
//6. 通过channel发送消息到队列中
/*basicPublish(String exchange, String routingKey, AMQP.BasicProperties props, byte[] body)1.exchange: 交换机名称, 简单模式下, 交换机会使⽤默认的""2.routingKey: 路由名称, routingKey = 队列名称3.props: 配置信息4.body: 发送消息的数据
*/
String msg = "Hello World";
//使⽤的是内置交换机. 使⽤内置交换机时, routingKey要和队列名称⼀样, 才可以路由到对应的队列上去
channel.basicPublish("","hello",null,msg.getBytes());
System.out.println(msg + "消息发送成功");
3.5 释放资源
//显式地关闭Channel是个好习惯, 但这不是必须的, Connection关闭的时候,Channel也会⾃动关闭.
channel.close();
connection.close();
4. 编写消费者代码
4.1 创建连接
和生产者类似, 想要接收RabbtiMQ的消息, 首先需要和RabbitMQ建立一个连接
//1. 创建连接
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("你的RabbitMQ服务器IP");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
factory.setVirtualHost("test");
这里要注意和生产者使用同一个虚拟机
4.2 创建Channel
//2. 创建Channel
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
4.3 声明队列
//3. 声明队列
channel.queueDeclare("hello", true, false, false, null);
这里需要注意, 要和生产者使用同一个队列, 这样生产者发送的消息, 才能被消费者正常接收
4.4 消费资源
//4. 消费资源
/**
* 参数说明:
* consumerTag : 消费者标签, 通常是消费者在订阅队列时指定的.
* envelope : 包含消息的封包信息,如队列名称, 交换机等.
* properties : ⼀些配置信息
* body : 消息的具体内容
*/
DefaultConsumer consumer = new DefaultConsumer(channel) {//从队列中收到消息就会执行的方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("Received: " + new String(body));}
};
/**
* 参数说明:
* queue: 队列名称
* autoAck: 是否自动确认
* callback: 接收到消息后, 执行的逻辑是什么
*/
channel.basicConsume("hello", true, consumer);
这里的DefaultConsumer 是 RabbitMQ提供的⼀个默认消费者, 实现了Consumer 接⼝.
Consumer ⽤于定义消息消费者的⾏为. 当我们需要从RabbitMQ接收消息时, 需要提供⼀个实现了Consumer 接⼝的对象.
4.5 释放资源
// 5. 释放资源
channel.close();
connection.close();
当我们运行生产者代码时, 就会向RabbitMQ服务器发送一条消息
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class ProducerDemo {public static void main(String[] args) throws IOException, TimeoutException {//1.建立连接ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");factory.setPort(5672);factory.setUsername("study");factory.setPassword("study");factory.setVirtualHost("test");Connection connection = factory.newConnection();//2. 开启通道Channel channel = connection.createChannel();//3. 声明交换机 使用内置的交换机//4. 声明队列/*** 参数说明:* queue: 队列名称* durable: 可持久化* exclusive: 是否独占* autoDelete: 是否自动删除* arguments: 参数*/channel.queueDeclare("hello", true, false, false, null);//5. 发送消息/*** 参数说明:* exchange: 交换机名称* routingKey: 路由的规则, 使用内置交换机, routingKey和队列名称保持一致* props: 属性配置* body: 要发送的消息*/String msg = "hello rabbitmq";channel.basicPublish("", "hello", null, msg.getBytes());System.out.println("消息发送成功!!");//6. 资源释放channel.close();connection.close();}
}
当我运行消费者代码时, 就会从RabbitMQ服务器中获取一条消息
import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class ConsumerDemo {public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {//1. 创建连接ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");factory.setPort(5672);factory.setUsername("study");factory.setPassword("study");factory.setVirtualHost("test");//2. 创建ChannelConnection connection = factory.newConnection();Channel channel = connection.createChannel();//3. 声明队列channel.queueDeclare("hello", true, false, false, null);//4. 消费资源/*** 参数说明:* queue: 队列名称* autoAck: 是否自动确认* callback: 接收到消息后, 执行的逻辑是什么*/DefaultConsumer consumer = new DefaultConsumer(channel) {//从队列中收到消息就会执行的方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("Received: " + new String(body));}};channel.basicConsume("hello", true, consumer);//5. 关闭资源
// Thread.sleep(1000);channel.close();connection.close();}
}
依次运行生产者消费者代码, 就能得到以下结果
以上就是本章的所有内容, 谢谢大家观看!!