欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 教育 > 幼教 > 2工作队列

2工作队列

2024/10/25 15:25:07 来源:https://blog.csdn.net/Wen_J_Q/article/details/141931480  浏览:    关键词:2工作队列

工作队列

逻辑图

image-20210810220747032

<!-- SpringBoot 消息队列的起步依赖 -->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

轮询分发 Round-robin

生产者

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.TimeoutException;public class WorkQueueProducer {/*** 生产者 → 消息队列* 创建连接工厂,并设置参数* 创建连接 Connection* 创建通道 Channel* 创建队列* 发送消息**/public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();if(true){factory.setHost("127.0.0.1");factory.setPort(5672);factory.setUsername("guest");factory.setPassword("guest");factory.setVirtualHost("/");}Connection connection = factory.newConnection();Channel channel = connection.createChannel();//创建队列/*** String queue                     队列名称* boolean durable                  是否持久化,* boolean exclusive                含义一:是否独占,是否只能有一个消费者监听*                                  含义二:connection 关闭是否删除队列* boolean autoDelete               是否自动删除,当没有消费者的时候是否自动删除* Map<String, Object> arguments    参数*/channel.queueDeclare("WorkQueues",true,false,false,null);//发送消息/*** String exchange          : 交换机名称,简单模式不使用交换机* String routingKey        : 路由规则,当不使用交换机时,路由键需要和队列名称相同* BasicProperties props    : 配置参数* byte[] body              : 消息体,真实的数据*/for (int i = 0; i < 20; i++) {String str = "WorkQueues is so easy!\t" + i + "\t" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());;System.out.println("发送消息:\t" + str);channel.basicPublish("","WorkQueues",null,str.getBytes());}//释放资源channel.close();connection.close();System.out.println("消息发送成功");}
}
  • 与简单队列几乎没有什么不同

消费者

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class WorkQueueConsumerA {/*** 消息队列 ← 消费者* 创建连接工厂,并设置参数* 创建连接 Connection* 创建通道 Channel* 订阅队列* 接收消息*/public static void main(String[] args) throws IOException, TimeoutException {//创建连接工厂,并设置参数ConnectionFactory factory = new ConnectionFactory();if (true) {factory.setHost("localhost");factory.setPort(5672);factory.setUsername("guest");factory.setPassword("guest");factory.setVirtualHost("/");}//创建连接 ConnectionConnection connection = factory.newConnection();//创建通道 ChannelChannel channel = connection.createChannel();/*** consumerTag  消费信息标签* delivery     回执*/DeliverCallback deliverCallback = (consumerTag, delivery) -> {byte[] body = delivery.getBody();System.out.println("消费消息:\t" + new String(body));};/*** basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback)* String queue                         :   队列名称* boolean autoAck                      :   是否自动确认,如果true,消费者接收到消息会自动发送一个回执给消息队列* DeliverCallback deliverCallback      :   回调函数* CancelCallback cancelCallback        :   消费者取消订阅时的回调函数*/channel.basicConsume("WorkQueues", true, deliverCallback, consumerTag -> {});}
}
  • 再创建一个类 WorkQueueConsumerB,代码与 WorkQueueConsumerA 一样,只是类型不同

测试

  • 先启动生产者,查看 RabbitMQ 网页控制台

  • 先将2个消费者启动

    • 第一个消费者启动的时候,会将所有的都消费掉
    • 将两个都启动之后,再启动生产者
  • 再启动生产者

image-20210810224949350

image-20210810225050947image-20210810225104104

公平分发 Fair

如果机器 A 性能很好,一下子就处理完了,其他时间一直空闲,而机器 B 性能很差,很久都不能处理完一条,但是队列还是一人一条的轮询分发,这就造成 A 性能浪费,B 处理慢

我们采用公平分发

采用 basicQos(prefetchCount=1) ,来限制 MQ 只发不超过1条的消息给同一个消费者,当消费者处理完消息,给 MQ 反馈了,MQ 才会进行第二次发送

生产者

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.TimeoutException;public class WorkQueueProducer {/*** 生产者 → 消息队列* 创建连接工厂,并设置参数* 创建连接 Connection* 创建通道 Channel* 创建队列* 发送消息**/public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();if(true){factory.setHost("127.0.0.1");factory.setPort(5672);factory.setUsername("guest");factory.setPassword("guest");factory.setVirtualHost("/");}Connection connection = factory.newConnection();Channel channel = connection.createChannel();//创建队列/*** String queue                     队列名称* boolean durable                  是否持久化,* boolean exclusive                含义一:是否独占,是否只能有一个消费者监听*                                  含义二:connection 关闭是否删除队列* boolean autoDelete               是否自动删除,当没有消费者的时候是否自动删除* Map<String, Object> arguments    参数*/channel.queueDeclare("WorkQueues",true,false,false,null);//发送消息/*** String exchange          : 交换机名称,简单模式不使用交换机* String routingKey        : 路由规则,当不使用交换机时,路由键需要和队列名称相同* BasicProperties props    : 配置参数* byte[] body              : 消息体,真实的数据*/for (int i = 0; i < 20; i++) {String str = "WorkQueues is so easy!\t" + i + "\t" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());;System.out.println("发送消息:\t" + str);channel.basicPublish("","WorkQueues",null,str.getBytes());}//释放资源channel.close();connection.close();System.out.println("消息发送成功");}
}

消费者

消费者A
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;public class WorkQueueConsumerA {/*** 消息队列 ← 消费者* 创建连接工厂,并设置参数* 创建连接 Connection* 创建通道 Channel* 订阅队列* 接收消息*/public static void main(String[] args) throws IOException, TimeoutException {//创建连接工厂,并设置参数ConnectionFactory factory = new ConnectionFactory();if (true) {factory.setHost("localhost");factory.setPort(5672);factory.setUsername("guest");factory.setPassword("guest");factory.setVirtualHost("/");}//创建连接 ConnectionConnection connection = factory.newConnection();//创建通道 ChannelChannel channel = connection.createChannel();/*** prefetchCount 设为 1* MQ 发送小于等于 1 的数据给消费者* 当消费者消费完这几条数据,就会给 MQ 一个反馈,MQ 再次发送*/channel.basicQos(1);/*** consumerTag  消费信息标签* delivery     回执*/DeliverCallback deliverCallback = (consumerTag, delivery) -> {byte[] body = delivery.getBody();System.out.println("消费消息:\t" + new String(body));try {/*** 睡眠 1 秒,模拟等待*/TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}/*** 手动回执* long deliveryTag* boolean multiple*/channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);};/*** basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback)* String queue                         :   队列名称* boolean autoAck                      :   是否自动确认,如果true,消费者接收到消息会自动发送一个回执给消息队列* DeliverCallback deliverCallback      :   回调函数* CancelCallback cancelCallback        :   消费者取消订阅时的回调函数*/channel.basicConsume("WorkQueues", false, deliverCallback, consumerTag -> {});}
}
消费者B
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;public class WorkQueueConsumerB {/*** 消息队列 ← 消费者* 创建连接工厂,并设置参数* 创建连接 Connection* 创建通道 Channel* 订阅队列* 接收消息*/public static void main(String[] args) throws IOException, TimeoutException {//创建连接工厂,并设置参数ConnectionFactory factory = new ConnectionFactory();if (true) {factory.setHost("localhost");factory.setPort(5672);factory.setUsername("guest");factory.setPassword("guest");factory.setVirtualHost("/");}//创建连接 ConnectionConnection connection = factory.newConnection();//创建通道 ChannelChannel channel = connection.createChannel();/*** prefetchCount 设为 1* MQ 发送小于等于 1 的数据给消费者* 当消费者消费完这几条数据,就会给 MQ 一个反馈,MQ 再次发送*/channel.basicQos(1);/*** consumerTag  消费信息标签* delivery     回执*/DeliverCallback deliverCallback = (consumerTag, delivery) -> {byte[] body = delivery.getBody();System.out.println("消费消息:\t" + new String(body));try {/*** 睡眠 1 秒,模拟等待*/TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}/*** 手动回执* long deliveryTag* boolean multiple*/channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);};/*** basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback)* String queue                         :   队列名称* boolean autoAck                      :   是否自动确认,如果true,消费者接收到消息会自动发送一个回执给消息队列* DeliverCallback deliverCallback      :   回调函数* CancelCallback cancelCallback        :   消费者取消订阅时的回调函数*/channel.basicConsume("WorkQueues", false, deliverCallback, consumerTag -> {});}
}
  • 与轮询的区别

    • channel.basicQos(1);
      
    • try {/*** 睡眠 1 秒,模拟等待*/TimeUnit.SECONDS.sleep(2);
      } catch (InterruptedException e) {e.printStackTrace();
      }
      
    • channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
      
    • 自动确认的 autoAck 改为 false
      channel.basicConsume("WorkQueues", false, deliverCallback, consumerTag -> {});   
      

测试

image-20210810232114774

image-20210810232129482image-20210810232142274

SpringBoot整合

小结

2 个消费者监听同一个队列,消息被平均分配到 2 个消费者,提高了处理效率,3个4个消费者效率更高

轮询分发:假设有100条消息,A消费者消费50条,B消费者消费50条,但是 A 机器是8核32G的,B机器是1核1G的,显然 B机器消费慢,A机器一直空闲

公平分发:性能好的机器多消费一点,性能差的少消费一点,负载均衡

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com