1. 下载和安装 RabbitMQ
RabbitMQ 依赖 Erlang 运行时,所以得先装 Erlang,再装 RabbitMQ。下面以 Ubuntu 为例,Windows 和 macOS 也顺便提一下。
- 1.1 安装 Erlang
RabbitMQ 需要 Erlang 支持,先装它。
Windows:
- 去 Erlang 官网 下载最新版(比如 OTP 26.x 的 .exe)。
- 双击安装,记得用管理员权限,全程点“下一步”就行。
macOS:
- 用 Homebrew:
bashbrew install erlang
- 1.2 安装 RabbitMQ
装好 Erlang 后,装 RabbitMQ。
Windows:
- 去 RabbitMQ 官网 下载最新版(比如 rabbitmq-server-3.13.x.exe)。
- 双击安装,选默认设置,装完自动跑成 Windows 服务。
macOS:
- 用 Homebrew:
bash
brew install rabbitmq
2. 本地启动 RabbitMQ
装好后,启动 RabbitMQ 服务,确认能用。
Windows:
- 安装后自动启动,检查服务:
- 打开“服务”面板(Win + R,输入 services.msc)。
- 找 RabbitMQ,状态要是“正在运行”。
- 或者用命令行: cmd
rabbitmqctl status
macOS:
- 启动:
bashbrew services start rabbitmq
- 检查:
bashrabbitmqctl status
2.1 启用管理界面(可选)
RabbitMQ 有个网页管理界面,方便看队列状态。
- 启用插件:
sudo rabbitmq-plugins enable rabbitmq_management
- 浏览器访问:http://localhost:15672,默认账号密码是 guest/guest。
- 为了案例跑通,我们用代码里的账号 admin/admin,先加用户:
sudo rabbitmqctl add_user admin admin sudo rabbitmqctl set_user_tags admin administrator sudo rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
3. 项目依赖导入
我们用 Java 和 Maven 写代码,需导入 RabbitMQ 客户端依赖。
在 pom.xml 加:
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.21.0</version>
</dependency>
<dependency><groupId>org.testng</groupId><artifactId>testng</artifactId><version>7.10.2</version><scope>test</scope>
</dependency>
- amqp-client:RabbitMQ 的 Java 客户端,处理消息发送和接收。
- testng:测试框架,跑我们的 @Test 方法。
4. 案例代码:发送和消费消息
下面是完整的案例代码,模拟发送 10 条消息到队列,再消费它们。代码简单,带中文注释,跑通就能看到效果。
javapackage org.example.suanfa.project.codeVerse.judege0.mq;import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;import org.testng.annotations.Test;import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;public class RabbitMqTest {// 消息队列名称private final static String QUEUE_NAME = "hello";@Testpublic void sendMessages() throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");factory.setUsername("admin");factory.setPassword("admin");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {// 声明一个持久化的队列channel.queueDeclare(QUEUE_NAME, true, false, false, null);for (int i = 0; i < 10; i++) {String message = "消息-" + i + " 时间戳: " + System.currentTimeMillis();// 设置消息为持久化存储AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().deliveryMode(2) // 2表示持久化.build();channel.basicPublish("", QUEUE_NAME, props, message.getBytes());System.out.println(" [x] 已发送: " + message);Thread.sleep(500); // 每条消息间隔500毫秒}} catch (InterruptedException e) {Thread.currentThread().interrupt();}}@Testpublic void consumeMessages() throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");factory.setUsername("admin");factory.setPassword("admin");Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 确认队列存在channel.queueDeclare(QUEUE_NAME, true, false, false, null);System.out.println(" [*] 等待消息中...");// 用计数器记录消费的消息数AtomicInteger messageCount = new AtomicInteger(0);Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, StandardCharsets.UTF_8);System.out.printf(" [x] 收到消息: %s (投递标签: %d)%n", message, envelope.getDeliveryTag());// 模拟处理消息的业务逻辑try {processMessage(message);} finally {// 手动确认消息已处理channel.basicAck(envelope.getDeliveryTag(), false);if (messageCount.incrementAndGet() >= 10) {synchronized (messageCount) {messageCount.notify(); // 通知主线程消费完成}}}}};// 禁用自动确认,改用手动确认channel.basicConsume(QUEUE_NAME, false, consumer);// 等收到10条消息才结束synchronized (messageCount) {while (messageCount.get() < 10) {try {messageCount.wait();} catch (InterruptedException e) {Thread.currentThread().interrupt();}}}channel.close();connection.close();}private void processMessage(String message) {// 模拟处理消息的逻辑System.out.println(" [√] 正在处理: " + message);}
}
代码说明
- 发送消息:
- sendMessages 方法连到本地 RabbitMQ(localhost,用户 admin)。
- 声明一个持久化队列 hello(durable=true)。
- 循环发 10 条消息,每条带时间戳,消息也设成持久化(deliveryMode=2)。
- 每次发完等 500 毫秒,模拟真实场景。
- 消费消息:
- consumeMessages 方法连到同一个队列。
- 用 DefaultConsumer 监听消息,收到后打印并处理(processMessage 模拟业务)。
- 手动确认消息(basicAck),确保不丢消息。
- 用 AtomicInteger 计数,收到 10 条后通知主线程结束。 an