欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 房产 > 建筑 > 第五章:队列管理模块

第五章:队列管理模块

2025/2/26 10:51:13 来源:https://blog.csdn.net/2303_78095330/article/details/145850100  浏览:    关键词:第五章:队列管理模块

目录

第一节:代码实现

        1-1.MsgQueue类

        1-2.MsgQueueMapper类

        1-3.MsgQueueManager类

第二节:单元测试

下期预告:


        队列管理模块在mqserver目录下实现。

第一节:代码实现

        交换机管理模块和队列管理模块十分相似,因为它们的管理逻辑是一样的,都是使用sqlite数据库文件进行持久化管理,名字:信息 的映射进行内存管理。

        很多代码都可以直接拷贝,稍加修改即可。

        创建一个名为mq_msgqueue.hpp的文件,打开并防止重复包含、添加所需头文件、声明命名空间,这些内容可以在mq_exchange.hpp中拷贝:

#ifndef __M_MSGQUEUE_H__
#define __M_MSGQUEUE_H__
#include "../mqcommon/mq_logger.hpp"
#include "../mqcommon/mq_helper.hpp"
#include "../mqcommon/mq_msg.pb.h"#include <google/protobuf/map.h>
#include <memory>
#include <iostream>
#include <unordered_map>namespace zd
{};#endif

        1-1.MsgQueue类

        同样先定义一个交换机类,它保存交换机的各种信息:

        (1)队列名称

        (2)持久化标志

        (3)独占标志:如果设置为true,那么队列最多只能有一个订阅者

        (4)自动删除标志:当队列没有绑定,没有订阅者时删除队列

        (5)其他参数:保存一些客户端可自定义的信息

        因为交换机也有 args 成员,所以它也需要 args 的两个处理接口,直接从 class Exchange 拷贝即可。

    // 1.定义消息队列类class MsgQueue{ public:using ptr = std::shared_ptr<MsgQueue>;std::string name; // 队列名称bool durable;     // 持久化标志bool exclusive;   // 独占标志bool autodelete;  // 自动删除标志google::protobuf::Map<std::string,std::string> args; // 其他参数MsgQueue(){}MsgQueue(const std::string& qname,bool qdurable,bool qexclusive,bool qautodelete,const google::protobuf::Map<std::string,std::string>& qargs):name(qname),durable(qdurable),exclusive(qexclusive),autodelete(qautodelete),args(qargs){}// args在文件中会以 k1=v1&k2=v2&k3=v3 的形式存储// 读取上述内容后setArgs解析这个字符串,再存储到this->args中void setArgs(const std::string& str_args){std::vector<std::string> sub_args;size_t ret = StrHelper::split(str_args,"&",sub_args);for(auto& str:sub_args){size_t pos = str.find("=");std::string key = str.substr(0,pos);std::string val = str.substr(pos+1);args[key] = val;}}// 将this->args序列化成 k1=v1&k2=v2&k3=v3 的形式std::string getArgs(){std::string result;for(auto& it:args){result+=it.first+"="+it.second+"&";}if(!result.empty())result.pop_back();return result;}};

        1-2.MsgQueueMapper类

        class MsgQueueMapper用于队列的持久化管理,在实现它之前,也需要先重定义一个unordered_map:

using MsgQueueMap = std::unordered_map<std::string,MsgQueue::ptr>;

        它提供的接口与class ExchageMapper基本一致,只是操作的表由 exchange_table 变成了 msgqueue_table,持久化数据也变化了一些:

    // 定义消息队列数据持久化管理类class MsgQueueMapper{public:MsgQueueMapper(const std::string& dbfile):_sql_helper(dbfile){std::string path = FileHelper::parentDirectory(dbfile);FileHelper::createDirectory(path);assert(_sql_helper.open());}// 创建表void createTable(){#define CREATE_TABLE_ "create table if not exists msgqueue_table(\name varchar(32) primary key,\durable int,\exclusive int,\autodelete int,\args varchar(128));"bool ret = _sql_helper.exec(CREATE_TABLE_,nullptr,nullptr);if(ret == false){LOG("创建消息队列数据库表失败!");abort(); // 异常退出}} // 删除表void removeTable(){#define DROP_TABLE_ "drop table if exists msgqueue_table;"bool ret = _sql_helper.exec(DROP_TABLE_,nullptr,nullptr);if(ret == false){LOG("删除消息队列数据库表失败!");abort(); // 异常退出}}// 插入一个消息队列bool insert(MsgQueue::ptr& exp){#define INSERT_SQL_ "insert into msgqueue_table values('%s','%d','%d','%d','%s');"char sql_str[4096] = {0};sprintf(sql_str,INSERT_SQL_,exp->name.c_str(),exp->durable,exp->exclusive,exp->autodelete,exp->getArgs().c_str());bool ret = _sql_helper.exec(sql_str,nullptr,nullptr);if(ret == false){LOG("数据库新增消息队列信息失败!");return false;}return true;}// 删除一个消息队列void remove(const std::string& qcname){std::string DELE_SQL = "delete from msgqueue_table where name='"+qcname+"';";bool ret = _sql_helper.exec(DELE_SQL,nullptr,nullptr);if(ret == false){LOG("数据库删除消息队列信息失败!");abort(); // 异常退出}}// 从数据库中恢复消息队列数据std::unordered_map<std::string,MsgQueue::ptr> recovery(){createTable();MsgQueueMap result;std::string sql = "select name,durable,exclusive,autodelete,args from msgqueue_table;";// 它会根据数据库的数据量调整回调函数的调用次数,每次取出一套数据_sql_helper.exec(sql.c_str(),MsgQueueMapper::selectCallback,&result);return result;}private:static int selectCallback(void* arg,int numcol,char** row,char** fields){MsgQueueMap* result = (MsgQueueMap*)arg;auto qup = std::make_shared<MsgQueue>();qup->name = row[0];qup->durable = (bool)std::stoi(row[1]);qup->exclusive = (bool)std::stoi(row[2]);qup->autodelete = (bool)std::stoi(row[3]);if(row[4])qup->setArgs(row[4]);result->insert(std::make_pair(qup->name,qup));return 0;}private:zd::SqliteHelper _sql_helper;};

        1-3.MsgQueueManager类

        它的实现与 class ExchangeManager 基本一致:

    class MsgQueueManager{public:using ptr = std::shared_ptr<MsgQueueManager>;MsgQueueManager(const std::string& dbfile):_mapper(dbfile){_msgqueues = _mapper.recovery();}// 声明消息队列bool declareMsgQueue(const std::string& name,bool durable,bool exclusive,bool autodelete,const google::protobuf::Map<std::string,std::string>& args){std::unique_lock<std::mutex> lock(_mtx);if(_msgqueues.find(name) != _msgqueues.end()) {// 消息队列已经存在就不需要添加了return true;}auto qup = std::make_shared<MsgQueue>(name,durable,exclusive,autodelete,args);if(durable == true){bool ret = _mapper.insert(qup);if(ret == false){LOG("消息队列 %s 持久化失败",name.c_str());return false;}}_msgqueues.insert(std::make_pair(name,qup));return true;}// 删除消息队列void deleteMsgQueue(const std::string& name){std::unique_lock<std::mutex> lock(_mtx);if(_msgqueues.find(name) == _msgqueues.end()) {// 交换机不存在就不需要删除了return;}_mapper.remove(name);   // 数据库中删除消息队列信息_msgqueues.erase(name); // 内存中删除消息队列}// 获得消息队列MsgQueue::ptr selectMsgQueue(const std::string& name){std::unique_lock<std::mutex> lock(_mtx);auto it = _msgqueues.find(name);if(it == _msgqueues.end())return nullptr;return it->second;}// 获得所有消息队列const MsgQueueMap& selectAll(){return  _msgqueues;}// 判断消息队列是否存在bool exists(const std::string& name){std::unique_lock<std::mutex> lock(_mtx);if(_msgqueues.find(name) == _msgqueues.end())return false;return true;}// 清理所有消息队列的数据void clear(){std::unique_lock<std::mutex> lock(_mtx);_mapper.removeTable();_msgqueues.clear();}size_t size(){return _msgqueues.size();}private:std::mutex _mtx;MsgQueueMap  _msgqueues;MsgQueueMapper _mapper; // 持久化管理句柄};

        实际上 绑定管理模块 的实现也和交换机管理模块、队列管理模块十分相似,它们的管理思路是一样的,只是对外提供的接口不同。

        

第二节:单元测试

        打开mqtest目录。

        创建mq_msgqueue_test.cc文件,并做好前置工作:

#include "../mqserver/mq_msgqueue.hpp"
#include <gtest/gtest.h>

        然后把mq_exchange_test.cc的测试套件拷贝过来,然后进行修改:

zd::MsgQueueManager::ptr qmp;
// 全局测试套件------------------------------------------------
// 自己初始化自己的环境,使不同单元测试之间解耦
class MsgQueueTest :public testing::Environment
{
public:// 全部单元测试之前调用一次virtual void SetUp() override{// std::cout << "单元测试执行前的环境初始化" << std::endl;qmp = std::make_shared<zd::MsgQueueManager>("./data/meta.bd");}   // 全部单元测试之后调用一次virtual void TearDown() override{// std::cout << "单元测试执行后的环境清理" << std::endl;// qmp->clear();}
};// 单元测试
// 测试名称与类名称相同,则会先调用SetUp
TEST(MsgQueueTest,MsgQueueTest_test1_Test)
{std::cout << "单元测试-1" << std::endl;// 声明队列google::protobuf::Map<std::string,std::string> args;args.insert({"k1","v1"});args.insert({"k2","v2"});args.insert({"k3","v3"});//1.  3个持久化qmp->declareMsgQueue("q-1",true,false,false,args);qmp->declareMsgQueue("q-2",true,false,false,args);qmp->declareMsgQueue("q-3",true,false,false,args);//2.  1个非持久化qmp->declareMsgQueue("q-4",false,false,false,args);// 此时队列应该有4个ASSERT_EQ(qmp->size(),4);// 移除队列qmp->deleteMsgQueue("q-5"); // 移除不存在的队列ASSERT_EQ(qmp->size(),4);qmp->deleteMsgQueue("q-1");// 此时q-1不存在,而q-2\q-3\q-4存在ASSERT_EQ(qmp->exists("q-1"),false);ASSERT_EQ(qmp->exists("q-2"),true);ASSERT_EQ(qmp->exists("q-3"),true);ASSERT_EQ(qmp->exists("q-4"),true);// 获取q-2交换机zd::MsgQueue::ptr q2 = qmp->selectMsgQueue("q-2");ASSERT_EQ(q2->name,"q-2");ASSERT_EQ(q2->autodelete,false);ASSERT_EQ(q2->durable,true);ASSERT_EQ(q2->exclusive,false);
}TEST(MsgQueueTest,MsgQueueTest_test2_Test)
{std::cout << "单元测试-2" << std::endl;// 获取所有队列数据,进行打印其名字zd::MsgQueueMap m = qmp->selectAll();for(auto& qit:m){LOG("%s",qit.second->name.c_str());}
}TEST(MsgQueueTest,MsgQueueTest_test3_Test)
{std::cout << "单元测试-3" << std::endl;}
// 单元测试全部结束后调用TearDown// ----------------------------------------------------------
int main(int argc,char** argv)
{testing::InitGoogleTest(&argc,argv);testing::AddGlobalTestEnvironment(new MsgQueueTest); // 注册Test的所有单元测试if(RUN_ALL_TESTS() != 0) // 运行所有单元测试{printf("单元测试失败!\n");}return 0;
}

        编译后的执行结果: 

                ​​​​​​​        ​​​​​​​        

        结果符合预期。

        然后查看meta.bd文件中的 msgqueue_table 的数据:

        ​​​​​​​        ​​​​​​​        

        持久化的q-1被移除了,所以只剩q-2和q-3了,这也是符合预期的。

        最后注释掉单元测试-1和单元测试-2,使用单元测试-3进行拉取历史队列数据的测试:

TEST(MsgQueueTest,MsgQueueTest_test3_Test)
{std::cout << "单元测试-3" << std::endl;ASSERT_EQ(qmp->size(),2);ASSERT_EQ(qmp->exists("q-2"),true);ASSERT_EQ(qmp->exists("q-3"),true);for(auto& kv:qmp->selectMsgQueue("q-2")->args){LOG("%s:%s",kv.first.c_str(),kv.second.c_str());}
}

        执行结果:

        ​​​​​​​        ​​​​​​​        ​​​​​​​        

        说明历史数据的拉取和 args 的处理都是正确的。

        至此,队列管理模块也完成了。

下期预告:

        之后是 绑定管理模块 的实现,它与 交换机管理模块 也是十分相似的,只是提供的接口不同。

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词