欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 健康 > 养生 > RabbitMQ深度探索:创建消息队列

RabbitMQ深度探索:创建消息队列

2025/2/6 7:36:05 来源:https://blog.csdn.net/SOS_suyan/article/details/145438298  浏览:    关键词:RabbitMQ深度探索:创建消息队列

快速入门:实现 RabbitMQ 简单队列:

  1. 在 RabbitMQ 平台创建 Virtual Hosts 和一个队列
  2. /boyaVirtualHosts
    1. 订单队列
    2. 支付队列
  3. 导入依赖:
    <dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>3.6.5 </version>
    </dependency>
  4. 编写连接类:
    public class RabbitMQConnection {/*** 获取连接*/public static Connection getConnection() throws IOException, TimeoutException {// 1.创建连接ConnectionFactory connectionFactory = new ConnectionFactory();// 2.设置连接地址connectionFactory.setHost("127.0.0.1");// 3.设置端口号connectionFactory.setPort(5672);// 4.设置账号和密码connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");// 5.设置 VirtualHostconnectionFactory.setVirtualHost("/boyaVirtualHostsR");return connectionFactory.newConnection();}
    }
  5. 编写生产者代码:
    public class Producer {private static final String QUEUE_NAME = "BoyatopMamber";/*** 获取连接*/public static void main(String[] args) throws IOException, TimeoutException {while (true){// 1.创建连接Connection connection = RabbitMQConnection.getConnection();// 2.设置通道Channel channel = connection.createChannel();// 3.设置消息String msg = "Hello World";System.out.println("msg:" + msg);channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());channel.close();connection.close();}}
    }
  6. 编写消费者代码:
    public class Comsumer {private static final String QUEUE_NAME = "BoyatopMamber";public static void main(String[] args) throws IOException, TimeoutException {// 1.创建链接Connection connection = RabbitMQConnection.getConnection();// 2.设置通道Channel channel = connection.createChannel();DefaultConsumer defaultConsumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String msg = new String(body,"UTF-8");System.out.println("消费者获取消息:" + msg);}};// 3.监听队列channel.basicConsume(QUEUE_NAME,true,defaultConsumer);}
    }
  7. RabbitMQ 如何保证消息不丢失:
    1. 生产者角色:
      1. 确保生产者角色投递到 MQ 服务器端成功
      2. Ack 消息确认机制
      3. 同步或异步的形式:
        1. Confirms
        2. 事务消息
    2. 消费者角色:
      1. 在 RabbitMQ 情况下:
        1. 必须要将消息消费成功之后,才会将消息从 MQ 服务器端移除
      2. 在 kafka 中的情况下:
        1. 不管是消费成功还是消费失败,该消息都不会立即从 MQ 服务器移除
    3. MQ 服务器端:
      1. 在默认的情况下,都会对队列中的消息持久化,持久化硬盘
  8. 使用消息确认机制 + 持久化技术实现:
    1. A 消费者确认收到消息机制
      channel.basicConsume(QUEUE_NAME,false,defaultConsumer);
      1. 第二个参数值为 false,代表关闭 RabbitMQ 的自动应答机制,改为手动应答
      2. 在处理完消息时,返回应答状态,true 表示为自动应答模式
        channel.basicAck(envelope.getDeliveryTag(),false);
    2. B 生产者确认投递消息成功,使用 Confirm 机制或者事务消息
    3. Confirm 机制,同步或异步的形式
  9. RabbitMQ 默认创建是持久化的形式:
    1. 将代码中的 durable 设为 true
    2. 参数详解:
      1. Durability:是否持久化
        1. durable:持久化
        2. Transient:不持久化
      2. Auto delete:是否自动删除
        1. 当最后一个消费者断开连接之后队列是否自动被删除
        2. 可以通过 RabbitMQ Management 查看某个队列的消费者数量
        3. 当 consumers = 0 时,队列就会自动删除
  10. 使用 RabbitMQ 事务:
    //设置事务
    channel.txSelect();
    channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
    channel.txCommit();
    1. 生产者:
      
      public class producer {private static final String QUEUE_NAME = "BoyatopMamber";public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {// 1.创建新的连接Connection connection = RabbitMQConnection.getConnection();// 2.设置 channelChannel channel = connection.createChannel();// 3.发送消息String msg = "Hello my Bro";channel.confirmSelect();channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());boolean result = channel.waitForConfirms();if(result){System.out.println("消息投递成功");}else {System.out.println("消息投递失败");}// 4.关闭资源channel.close();connection.close();}
      }
    2. 消费者:
      public class Consumer {private static final String QUEUE_ANME = "BoyatopMamber";public static void main(String[] args) throws IOException, TimeoutException {// 1.创建连接Connection connection = RabbitMQConnection.getConnection();//2.设置通道Channel channel = connection.createChannel();DefaultConsumer defaultConsumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String msg = new String(body,"UTF-8");System.out.println("消费者获取消息:" + msg);//消费者完成 消费该消息channel.basicAck(envelope.getDeliveryTag(),false);}};// 3.监听队列channel.basicConsume(QUEUE_ANME,false,defaultConsumer);}
      }

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com