欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 文旅 > 艺术 > egg.js使用消息队列rabbitMQ

egg.js使用消息队列rabbitMQ

2024/10/23 23:33:44 来源:https://blog.csdn.net/deng_zhihao692817/article/details/140355171  浏览:    关键词:egg.js使用消息队列rabbitMQ

1. egg-amqplib: 基于 rabbitmq 消息队列封装的库

安装:

npm i egg-amqplib --save

引入

// {app_root}/config/plugin.js
exports.amqplib = {enable: true,package: 'egg-amqplib',
};

设置

// {app_root}/config/config.default.js
exports.amqplib = {client: {// url: 'amqp://localhost',connectOptions: {protocol: 'amqp',hostname: 'localhost',port: 5672,username: 'guest',password: 'guest',locale: 'en_US',frameMax: 0,heartbeat: 0,vhost: '/',},// socketOptions: {//   cert: certificateAsBuffer, // client cert//   key: privateKeyAsBuffer, // client key//   passphrase: 'MySecretPassword', // passphrase for key//   ca: [caCertAsBuffer], // array of trusted CA certs// },},
};

查看github: https://github.com/zubinzhang/egg-amqplib

控制层:

'use strict';const Controller = require('egg').Controller;
const queueName = 'test';class HomeController extends Controller {async publish() {const { msg } = this.ctx.query;const ch = await this.app.amqplib.createChannel();await ch.assertQueue(queueName, { durable: false });const ok = await ch.sendToQueue(queueName, Buffer.from(msg));await ch.close();this.ctx.body = ok;this.ctx.status = 200;}async consume() {const ch = await this.app.amqplib.createChannel();await ch.assertQueue(queueName, { durable: false });const msg = await new Promise(resolve => ch.consume(queueName, msg => resolve(msg)));if (msg !== null) {ch.ack(msg);await ch.close();this.ctx.status = 200;this.ctx.body = { msg: msg.content.toString() };} else {this.ctx.status = 500;}}
}module.exports = HomeController;

 路由:

'use strict';module.exports = app => {const { router, controller } = app;router.get('/publish', controller.home.publish);router.get('/consume', controller.home.consume);
};

参考:egg-amqplib/test/fixtures/apps/amqplib-test/app/controller/home.js at master · zubinzhang/egg-amqplib · GitHub

2. 安装rabbitmq, 可使用docker安装rabbitmq

docker run --name rabbitmq -p 5672:567. -p 15672:15672 rabbitmq:3-management

访问地址: http://localhost:15672

默认的账号密码是: guest  :  guest

创建管理员:

3. 队列: 一对一

 

P 是我们的生产者  >  中间的框是一个队列,代表消费者保留的消息缓冲区  >  C 是我们的消费者 

'use strict';
const Controller = require('egg').Controller;
/*** 一对一队列演示*/// 频道名称
const queueName = 'hasone'class UserController extends Controller {// 生成者async send() {// 1. 获取要发送的消息const { msg } = this.ctx.query// 2. 创建频道const ch = await this.app.amqplib.createChannel();// 3. 创建队列 durable 关闭持久化存储await ch.assertQueue(queueName, { durable: false }  );// 4. 发送消息const ok = await ch.sendToQueue(queueName, Buffer.from(msg));// 5. 关闭连接await ch.close();this.ctx.body = ok;this.ctx.status = 200;}// 消费者async work() {// 1. 创建频道const ch = await this.app.amqplib.createChannel();// 2. 选择队列await ch.assertQueue(queueName, { durable: false });//3. 接收队列的消息const resultMsg = await new Promise(resolve => ch.consume(queueName, msg => resolve(msg), { noAck: true }));// 4. 显示消息内容if (resultMsg !== null) {ch.ack(resultMsg);await ch.close();const { content } = resultMsg;this.status = 200;this.ctx.body = { msg: content.toString() }} else {this.ctx.body = '队列消费失败'this.ctx.status = 500;}}
}module.exports = UserController;

 4. 队列: 一对多

'use strict';
const Controller = require('egg').Controller;
/*** 队列一对多演示* 生产者 ---->  队列 ----> 消费者
*                    ----> 消费者----> 消费者*/// 频道名称
const queueName = 'hasMany'class UserController extends Controller {// 生成者async send() {const { msg } = this.ctx.query;//1. 创建频道const ch = await this.app.amqplib.createChannel();// 2. 创建队列 开启持久化存储await ch.assertQueue(queueName, { durable: true });// 3. 发送消息let ok = null;for(let i=0; i<50; i++) {// 此时我们确信即使RabbitMQ重新启动,task_queue队列也不会丢失。现在我们需要将消息标记为持久性 - 通过使用持久性选项Channel.sendToQueue。ok = await ch.sendToQueue(queueName, Buffer.from(msg+i), { persistent: true });}//4. 关闭连接await ch.close();this.ctx.body = ok;this.ctx.status = 200;}// 消费者async work1() {// 1. 创建频道const ch = await this.app.amqplib.createChannel();//2. 选择队列await ch.assertQueue(queueName, { durable: true });// 3. 接收消息 noAck 关闭消息自动确认模式,需要手动 ackconst resultMsg = await new Promise(resolve => ch.consume(queueName, msg => {setTimeout(() => {resolve(msg)}, 500)}, { noAck: false }) );if (resultMsg !== null) {const { content } = resultMsg;//消费者发回ack(nowledgement)告诉RabbitMQ已收到,处理了特定消息,RabbitMQ可以自由删除它ch.ack(resultMsg);await ch.close();this.ctx.body = { work1: content.toString() };this.ctx.status = 200;} else {this.ctx.body = '消费者1号失败'this.ctx.status = 500}}async work2() {// 1. 创建频道const ch = await this.app.amqplib.createChannel();//2. 选择队列 RabbitMQ永远不会丢失我们的队列。为此,我们需要声明它是持久的await ch.assertQueue(queueName, { durable: true });// 3. 接收消息 noAck 开启自动确认模式const resultMsg = await new Promise(resolve => ch.consume(queueName, msg => {setTimeout(() => {resolve(msg)}, 1000)}, { noAck: false }) );if (resultMsg !== null) {const { content } = resultMsg;ch.ack(resultMsg);await ch.close();this.ctx.body = { work2: content.toString() };this.ctx.status = 200;} else {this.ctx.body = '消费者2号失败'this.ctx.status = 500}}async work3() {// 1. 创建频道const ch = await this.app.amqplib.createChannel();//2. 选择队列await ch.assertQueue(queueName, { durable: true });// 3. 接收消息 noAck 开启自动确认模式const resultMsg = await new Promise(resolve => ch.consume(queueName, msg => {setTimeout(() => {resolve(msg)}, 1500)}, { noAck: false }) );if (resultMsg !== null) {const { content } = resultMsg;//消费者发回ack(nowledgement)告诉RabbitMQ已收到,处理了特定消息,RabbitMQ可以自由删除它ch.ack(resultMsg);await ch.close();this.ctx.body = { work3: content.toString() };this.ctx.status = 200;} else {this.ctx.body = '消费者3号失败'this.ctx.status = 500}}
}module.exports = UserController;

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com