欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 科技 > 名人名企 > 【RabbitMQ 项目】服务端:数据管理模块之绑定管理

【RabbitMQ 项目】服务端:数据管理模块之绑定管理

2024/10/25 1:24:15 来源:https://blog.csdn.net/weixin_74113106/article/details/142286242  浏览:    关键词:【RabbitMQ 项目】服务端:数据管理模块之绑定管理

文章目录

  • 一.编写思路
  • 二.代码实践

一.编写思路

  1. 定义绑定信息类
    1. 交换机名称
    2. 队列名称
    3. 绑定关键字:交换机的路由交换算法中会用到
      没有是否持久化的标志,因为绑定是否持久化取决于交换机和队列是否持久化,只有它们都持久化时绑定才需要持久化。绑定就好像一根绳子,两端连接着交换机和队列,当一方不存在,它就没有存在的必要了
  2. 定义绑定持久化类
    1. 构造函数:如果数据库文件不存在则创建,打开数据库,创建 binding_table
    2. 插入交换机和队列的绑定
    3. 移除交换机和队列的绑定
    4. 移除与指定交换机相关的所有绑定
    5. 移除与指定队列相关的所有绑定
    6. 从数据库中恢复绑定到内存
    7. 删除绑定数据库表(仅调试)
  3. 定义绑定管理类
    1. 构造函数:从数据库中恢复绑定
    2. 绑定交换机和队列
    3. 解绑交换机和队列
    4. 解绑与指定交换机相关的绑定:移除交换机时要用到
    5. 解绑与指定队列相关的绑定:移除队列时要用到
    6. 获取交换机相关的所有绑定信息:交换路由时要用到
    7. 销毁所有绑定(仅调试)

二.代码实践

Binding.hpp

#pragma once
#include "../common/Util.hpp"
#include <memory>
#include <unordered_map>
#include <mutex>
/**************** 定义绑定信息类* **************/
namespace ns_data
{class Binding;using BindingPtr = std::shared_ptr<Binding>;using BindingMap = std::unordered_map<std::string, std::unordered_map<std::string, BindingPtr>>;struct Binding{std::string _exchangeName;std::string _msgQueueName;std::string _bindingKey;Binding(const std::string &exchangeName, const std::string &msgQueueName, const std::string &bindingKey): _exchangeName(exchangeName),_msgQueueName(msgQueueName),_bindingKey(bindingKey){}};class BindingMapper{private:ns_util::Sqlite3Util _sqlite;public:BindingMapper(const std::string &dbName): _sqlite(dbName){if (!ns_util::FileUtil::createFile(dbName)){LOG(FATAL) << "create database " << dbName << " fail" << endl;exit(1);}if (!_sqlite.open()){LOG(FATAL) << "open database " << dbName << " fail" << endl;exit(1);}createTable();}/***************** 插入绑定* *************/bool insertBinding(BindingPtr bindingPtr){char insertSql[1024];sprintf(insertSql, "insert into binding_table values('%s', '%s', '%s');", bindingPtr->_exchangeName.c_str(),bindingPtr->_msgQueueName.c_str(), bindingPtr->_bindingKey.c_str());if (!_sqlite.exec(insertSql, nullptr, nullptr)){LOG(WARNING) << "insert Binding fail: " << bindingPtr->_exchangeName << "->" << bindingPtr->_msgQueueName;return false;}return true;}/************ 移除交换机和队列的绑定* ****************/void removeBinding(const std::string &exchangeName, const std::string &msgQueueName){char deleteSql[1024];sprintf(deleteSql, "delete from binding_table where exchange_name='%s' and msg_queue_name='%s';",exchangeName.c_str(), msgQueueName.c_str());if (!_sqlite.exec(deleteSql, nullptr, nullptr)){LOG(WARNING) << "remove Binding fail: " << exchangeName << "->" << msgQueueName << endl;}}/***************** 移除指定交换机相关的所有绑定* ************/void removeExchangeBindings(const std::string &exchangeName){char deleteSql[1024];sprintf(deleteSql, "delete from binding_table where exchange_name='%s';", exchangeName.c_str());if (!_sqlite.exec(deleteSql, nullptr, nullptr)){LOG(WARNING) << "remove exchange Bindings fail, exchange: " << exchangeName << endl;}}/***************** 移除指定队列相关的所有的绑定* ************/void removeMsgQueueBindings(const std::string &msgQueueName){char deleteSql[1024];sprintf(deleteSql, "delete from binding_table where msg_queue_name='%s';", msgQueueName.c_str());if (!_sqlite.exec(deleteSql, nullptr, nullptr)){LOG(WARNING) << "remove msg_queue Bindings fail, exchange: " << msgQueueName << endl;}}/******************* 恢复数据库中绑定到内存* BindingMap是一个二维数组,通过交换机名称和队列名称即可得到Binding的智能指针* **************/void recoverBindings(BindingMap* mapPtr){const std::string selectSql = "select * from binding_table;";if (!_sqlite.exec(selectSql.c_str(), selectCallback, mapPtr)){LOG(FATAL) << "recover Bindings from binding_table fail" << endl;exit(1);}}/********** 删除绑定数据库表(仅调试)* *************/void removeTable(){const std::string dropSql = "drop table if exists binding_table;";if (_sqlite.exec(dropSql.c_str(), nullptr, nullptr)){LOG(FATAL) << "drop binding_table fail" << endl;}}private:void createTable(){const std::string createSql = "create table if not exists binding_table(\exchange_name varchar(32),\msg_queue_name varchar(32),\binding_key varchar(32),\primary key(exchange_name, msg_queue_name)\);";if (!_sqlite.exec(createSql.c_str(), nullptr, nullptr)){LOG(FATAL) << "create table binding_table fail" << endl;exit(1);}}static int selectCallback(void* arg, int colNum, char** line, char** fields){auto mapPtr = static_cast<BindingMap*>(arg);const std::string exchangeName = line[0];const std::string msgQueueName = line[1];const std::string bindingKey = line[2];auto bindingPtr = std::make_shared<Binding>(exchangeName, msgQueueName, bindingKey);(*mapPtr)[exchangeName][msgQueueName] = bindingPtr;return 0;}};class BindingManager{private:BindingMapper _mapper;BindingMap _bindings;std::mutex _mtx;public:BindingManager(const std::string& dbName):_mapper(dbName){_mapper.recoverBindings(&_bindings);}/*********** 绑定交换机和队列* 说明:让上层判断交换机和队列的持久性来告诉我Binding是否需要持久化,如果自己判断耦合度太高了* ***********/bool bind(const std::string &ename, const std::string &qname, const std::string& bindingKey, bool durable){std::unique_lock<std::mutex> lck(_mtx);if (_bindings.count(ename) && _bindings[ename].count(qname)){return true;}auto bindingPtr = std::make_shared<Binding>(ename, qname, bindingKey);_bindings[ename][qname] = bindingPtr;if (durable){return _mapper.insertBinding(bindingPtr);}return true;}/*********** 解绑交换机和队列* 说明:让上层判断交换机和队列的持久性来告诉我Binding是否需要持久化,如果自己判断耦合度太高了* 如果用户不说明,默认它是持久化的,执行一遍delete语句也不会出错* ***********/void unbind(const std::string &ename, const std::string &qname, bool durable = true){std::unique_lock<std::mutex> lck(_mtx);if (_bindings.count(ename) == 0 || _bindings[ename].count(qname) == 0){return;}if (durable){_mapper.removeBinding(ename, qname);}_bindings[ename].erase(qname);}/*************** 解绑与指定交换机相关的绑定:删除交换机后要调用该接口* ******************/void removeExchangeBindings(const std::string &ename){std::unique_lock<std::mutex> lck(_mtx);if (_bindings.count(ename) == 0){return;}_mapper.removeExchangeBindings(ename);_bindings.erase(ename);}/*************** 解绑与指定交换机相关的绑定:删除队列后要调用该接口* ******************/void removeMsqQueueBindings(const std::string &qname){std::unique_lock<std::mutex> lck(_mtx);_mapper.removeMsgQueueBindings(qname);for (auto it = _bindings.begin(); it != _bindings.end(); it++){it->second.erase(qname);}}/**************** 获取指定的绑定* ***********/BindingPtr getBinding(const std::string& ename, const std::string& qname){std::unique_lock<std::mutex> lck(_mtx);if (_bindings.count(ename) == 0 || _bindings[ename].count(qname) == 0){return nullptr;}return _bindings[ename][qname];}/*************** 获取和指定交换机相关的绑定:路由交换时要用到该接口* ***************/bool getExchangeBindings(const std::string& ename, std::unordered_map<std::string, BindingPtr>* mapPtr){std::unique_lock<std::mutex> lck(_mtx);if (_bindings.count(ename) == 0){*mapPtr = std::unordered_map<std::string, BindingPtr>();return false;}*mapPtr = _bindings[ename];return true;}/*************** 清空绑定(仅调试)* ************/void clearBindings(){std::unique_lock<std::mutex> lck(_mtx);_bindings.clear();_mapper.removeTable();}};
}

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com