欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 健康 > 美食 > 在nodejs中使用RabbitMQ(六)sharding消息分片

在nodejs中使用RabbitMQ(六)sharding消息分片

2025/2/22 2:14:39 来源:https://blog.csdn.net/qq_35496811/article/details/145635359  浏览:    关键词:在nodejs中使用RabbitMQ(六)sharding消息分片

RabbitMQ 的分片插件(rabbitmq_sharding)允许将消息分布到多个队列中,这在消息量很大或处理速度要求高的情况下非常有用。分片功能通过将消息拆分到多个队列中来平衡负载,从而提升消息处理的吞吐量和可靠性。它能够在多个队列之间分配负载,避免单个队列过载。(注:不能单独消费分片消息。消息分片不利于消息顺序区分)

启用消息分片插件。 

rabbitmq-plugins enable rabbitmq_sharding 

示例

通过rabbitmq management添加策略,用于分片消息匹配转发。

或者通过命令添加策略 

CTL set_policy images-shard "queue10" '{"shards-per-node": 3, "routing-key": "sharding"}'

producer.ts

import RabbitMQ from 'amqplib';async function start() {try {const conn = await RabbitMQ.connect("amqp://admin:admin1234@localhost:5672//mirror?heartbeat=60");conn.on("error", function (err1) {if (err1.message !== "Connection closing") {console.error("[AMQP] conn error", err1.message);}});conn.on("close", function () {console.error("[AMQP] reconnecting");return setTimeout(start, 1000);});console.log("[AMQP] connected");let channel = null;try {channel = await conn.createChannel();} catch (err) {console.error("[AMQP]", err);return setTimeout(start, 1000);}const exchangeName = 'exchange_queue10';await channel.assertExchange(exchangeName,'x-modulus-hash',{durable: true,arguments: {'x-modulus': 3 // 分片数量(需与队列分片数匹配)}},);let routeKey = '';for (let i = 0; i < 1000; ++i) {// console.log('message send!', channel.sendToQueue(//   queueName,//   Buffer.from(`发送消息,${i}${Math.ceil(Math.random() * 100000)}`),//   { persistent: true, correlationId: 'ooooooooooooooo' },// 消息持久化,重启后存在//   // (err: any, ok: Replies.Empty)=>{}// ));let num = Math.ceil(Math.random() * 100000);console.log('消息发送是否成功', num, routeKey, channel.publish(exchangeName,`${routeKey}${i}`,Buffer.from(`"发送消息, index:${i}, number:${num}, routeKey:${JSON.stringify(routeKey)}"`),{persistent: true,},));}setTimeout(() => {conn.close();process.exit(0);}, 1000);} catch (err) {console.error("[AMQP]", err);return setTimeout(start, 1000);}
}start();

consumer.ts

import RabbitMQ, { type Replies } from 'amqplib/callback_api';RabbitMQ.connect('amqp://admin:admin1234@localhost:5672//mirror', (err0, conn) => {if (err0) {console.error(err0);return;}conn.createChannel(function (err1, channel) {console.log('[*] waiting...');const exchangeName = 'exchange_queue10';channel.prefetch(32);// for(let i=0;i<3;++i){//   channel.assertQueue(queueName, { durable: true }, () => {//     channel.bindQueue(queueName, exchangeName, `shard_${shardId}`);//   });// }channel.consume(exchangeName, function (msg) {if(msg){console.log(`队列'${exchangeName}'接收到的消息`, msg?.content.toString());// 第二个参数,false拒绝当前消息// 第二个参数,true拒绝小于等于当前消息// 第三个参数,3false从队列中清除// 第三个参数,4true从新在队列中排队channel.nack(msg, false, false);}}, {// noAck: true, // 是否自动确认消息,为true不需要调用channel.ack(msg);noAck: false,arguments: {}}, (err: any, ok: Replies.Empty) => {console.log(err, ok);});});conn.on("error", function (err1) {if (err1.message !== "Connection closing") {console.error("[AMQP] conn error", err1.message);}});conn.on("close", function () {console.error("[AMQP] reconnecting");});
});

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词