欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 健康 > 养生 > RaabitMQ 快速入门

RaabitMQ 快速入门

2025/4/19 8:38:41 来源:https://blog.csdn.net/AUGENSTERN_dc/article/details/147258103  浏览:    关键词:RaabitMQ 快速入门

🎉欢迎大家观看AUGENSTERN_dc的文章(o゜▽゜)o☆✨✨

🎉感谢各位读者在百忙之中抽出时间来垂阅我的文章,我会尽我所能向的大家分享我的知识和经验📖

🎉希望我们在一篇篇的文章中能够共同进步!!!

🌈个人主页:AUGENSTERN_dc

🔥个人专栏:C语言 |Java | 数据结构 | 算法 | MySQL | RabbitMQ | Redis

⭐个人格言:

一重山有一重山的错落,我有我的平仄

一笔锋有一笔锋的着墨,我有我的舍得


接下来,我会向大家介绍如何快速入门RabbitMQ,以及如何编写一个简单的RabbitMQ代码

1. 引入依赖

在编写我们的代码之前,我们需要引入RabbitMQ的依赖:

如果你使用的是Maven, 你可以使用以下依赖:

<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.7.3</version>
</dependency>

 2. 生产者消费者模型

在编写代码之前,大家需要了解生产者消费者模型:

生产者-消费者模型(Producer-Consumer Model): 是一种经典的多线程同步问题,用于解决生产者线程和消费者线程之间的数据共享和同步问题。它在多线程编程、并发编程以及分布式系统中被广泛应用。

2.1 主要角色

生产者-消费者模型涉及两个主要角色:

1. 生产者(producer)

  • 负责生成数据并将其放入缓冲区(Buffer)。
  • 如果缓冲区已满,生产者需要等待,直到缓冲区有空间可以存放数据。

2. 消费者(consumer)

  • 负责从缓冲区中取出数据并消费。
  • 如果缓冲区为空,消费者需要等待,直到缓冲区中有数据可以消费。

缓冲区(Buffer)是一个共享资源,用于存储生产者生成的数据,供消费者消费。

2.2 关键问题

生产者-消费者模型需要解决以下两个关键问题:

1. 互斥访问

  • 多个线程(生产者和消费者)需要访问共享的缓冲区,因此需要确保对缓冲区的访问是互斥的,避免数据竞争和不一致。

2. 同步问题:

  • 生产者需要在缓冲区有空间时才能生产数据。
  • 消费者需要在缓冲区有数据时才能消费数据。
  • 需要一种机制来协调生产者和消费者之间的同步。

3. 编写生产者代码

3.1 创建连接

要想使用创建一个生产者,首先需要将生产者和RabbitMQ的服务器进行连接

// 1. 创建连接⼯⼚
ConnectionFactory factory = new ConnectionFactory();
//2. 设置参数
factory.setHost("你的RabbitMQ服务器IP");//ip 默认值localhost
factory.setPort(5672); //默认值5672
factory.setVirtualHost("test");//虚拟机名称, 默认 /
factory.setUsername("guest");//⽤⼾名,默认guest
factory.setPassword("guest");//密码, 默认guest
//3. 创建连接Connection
Connection connection = factory.newConnection();

RabbitMQ 默认的⽤于客⼾端连接的TCP 端⼝号是5672, 需要提前进⾏开放

3.2 创建Channel

//4. 创建channel通道
Channel channel = connection.createChannel();

⽣产者和消费者创建的channel并不是同⼀个

3.3 声明一个队列Queue

/*queueDeclare(String queue, boolean durable, boolean exclusive, booleanautoDelete, Map<String, Object> arguments)1.queue: 队列名称2.durable: 是否持久化.true-设置队列为持久化, 待久化的队列会存盘,服务器重启之后, 消息不丢失。3.exclusive:* 是否独占, 只能有⼀个消费者监听队列* 当Connection关闭时, 是否删除队列4.autoDelete: 是否⾃动删除, 当没有Consumer时, ⾃动删除掉5.arguments: ⼀些参数
*/
//如果没有⼀个hello_world 这样的⼀个队列, 会⾃动创建, 如果有, 则不创建
channel.queueDeclare("hello",true,false,false,null);

3.4 发送消息

当一个新的RabbitMQ节点启动时,他会预声明(declare)几个内置的交换机, 内置交换机名称是空字符串(""), 生产者发送的消息会根据队列名称直接路由到对应的队列.

例如: 如果有⼀个名为 "hello" 的队列, ⽣产者可以直接发送消息到 "hello" 队列, ⽽消费者可以从 "hello" 队列中接收消息, ⽽不需要关⼼交换机的存在. 这种模式⾮常适合简单的应⽤场景,其中⽣产者和消费者之间的通信是⼀对⼀的.
 

//6. 通过channel发送消息到队列中
/*basicPublish(String exchange, String routingKey, AMQP.BasicProperties props, byte[] body)1.exchange: 交换机名称, 简单模式下, 交换机会使⽤默认的""2.routingKey: 路由名称, routingKey = 队列名称3.props: 配置信息4.body: 发送消息的数据
*/
String msg = "Hello World";
//使⽤的是内置交换机. 使⽤内置交换机时, routingKey要和队列名称⼀样, 才可以路由到对应的队列上去
channel.basicPublish("","hello",null,msg.getBytes());
System.out.println(msg + "消息发送成功");

3.5 释放资源

//显式地关闭Channel是个好习惯, 但这不是必须的, Connection关闭的时候,Channel也会⾃动关闭.
channel.close();
connection.close();

4. 编写消费者代码

4.1 创建连接

和生产者类似, 想要接收RabbtiMQ的消息, 首先需要和RabbitMQ建立一个连接

//1. 创建连接
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("你的RabbitMQ服务器IP");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
factory.setVirtualHost("test");

 这里要注意和生产者使用同一个虚拟机

4.2 创建Channel

//2. 创建Channel
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

4.3 声明队列

//3. 声明队列
channel.queueDeclare("hello", true, false, false, null);

 这里需要注意, 要和生产者使用同一个队列, 这样生产者发送的消息, 才能被消费者正常接收

4.4 消费资源

//4. 消费资源
/**
*  参数说明:
*  consumerTag : 消费者标签, 通常是消费者在订阅队列时指定的.
*  envelope : 包含消息的封包信息,如队列名称, 交换机等.
*  properties : ⼀些配置信息
*  body : 消息的具体内容
*/
DefaultConsumer consumer = new DefaultConsumer(channel) {//从队列中收到消息就会执行的方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("Received: " + new String(body));}
};
/**
*  参数说明:
*  queue: 队列名称
*  autoAck: 是否自动确认
*  callback: 接收到消息后, 执行的逻辑是什么
*/
channel.basicConsume("hello", true, consumer);

这里的DefaultConsumer 是 RabbitMQ提供的⼀个默认消费者, 实现了Consumer 接⼝.

Consumer ⽤于定义消息消费者的⾏为. 当我们需要从RabbitMQ接收消息时, 需要提供⼀个实现了Consumer 接⼝的对象.

4.5 释放资源

// 5. 释放资源
channel.close();
connection.close();

当我们运行生产者代码时, 就会向RabbitMQ服务器发送一条消息

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class ProducerDemo {public static void main(String[] args) throws IOException, TimeoutException {//1.建立连接ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");factory.setPort(5672);factory.setUsername("study");factory.setPassword("study");factory.setVirtualHost("test");Connection connection = factory.newConnection();//2. 开启通道Channel channel = connection.createChannel();//3. 声明交换机  使用内置的交换机//4. 声明队列/***  参数说明:*  queue: 队列名称*  durable: 可持久化*  exclusive: 是否独占*  autoDelete: 是否自动删除*  arguments: 参数*/channel.queueDeclare("hello", true, false, false, null);//5. 发送消息/***  参数说明:*  exchange: 交换机名称*  routingKey: 路由的规则, 使用内置交换机, routingKey和队列名称保持一致*  props: 属性配置*  body: 要发送的消息*/String msg = "hello rabbitmq";channel.basicPublish("", "hello", null, msg.getBytes());System.out.println("消息发送成功!!");//6. 资源释放channel.close();connection.close();}
}

当我运行消费者代码时, 就会从RabbitMQ服务器中获取一条消息

import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class ConsumerDemo {public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {//1. 创建连接ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");factory.setPort(5672);factory.setUsername("study");factory.setPassword("study");factory.setVirtualHost("test");//2. 创建ChannelConnection connection = factory.newConnection();Channel channel = connection.createChannel();//3. 声明队列channel.queueDeclare("hello", true, false, false, null);//4. 消费资源/***  参数说明:*  queue: 队列名称*  autoAck: 是否自动确认*  callback: 接收到消息后, 执行的逻辑是什么*/DefaultConsumer consumer = new DefaultConsumer(channel) {//从队列中收到消息就会执行的方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("Received: " + new String(body));}};channel.basicConsume("hello", true, consumer);//5. 关闭资源
//        Thread.sleep(1000);channel.close();connection.close();}
}

依次运行生产者消费者代码, 就能得到以下结果

以上就是本章的所有内容, 谢谢大家观看!!

热搜词