目录
第一节:代码实现
1-1.MsgQueue类
1-2.MsgQueueMapper类
1-3.MsgQueueManager类
第二节:单元测试
下期预告:
队列管理模块在mqserver目录下实现。
第一节:代码实现
交换机管理模块和队列管理模块十分相似,因为它们的管理逻辑是一样的,都是使用sqlite数据库文件进行持久化管理,名字:信息 的映射进行内存管理。
很多代码都可以直接拷贝,稍加修改即可。
创建一个名为mq_msgqueue.hpp的文件,打开并防止重复包含、添加所需头文件、声明命名空间,这些内容可以在mq_exchange.hpp中拷贝:
#ifndef __M_MSGQUEUE_H__ #define __M_MSGQUEUE_H__ #include "../mqcommon/mq_logger.hpp" #include "../mqcommon/mq_helper.hpp" #include "../mqcommon/mq_msg.pb.h"#include <google/protobuf/map.h> #include <memory> #include <iostream> #include <unordered_map>namespace zd {};#endif
1-1.MsgQueue类
同样先定义一个交换机类,它保存交换机的各种信息:
(1)队列名称
(2)持久化标志
(3)独占标志:如果设置为true,那么队列最多只能有一个订阅者
(4)自动删除标志:当队列没有绑定,没有订阅者时删除队列
(5)其他参数:保存一些客户端可自定义的信息
因为交换机也有 args 成员,所以它也需要 args 的两个处理接口,直接从 class Exchange 拷贝即可。
// 1.定义消息队列类class MsgQueue{ public:using ptr = std::shared_ptr<MsgQueue>;std::string name; // 队列名称bool durable; // 持久化标志bool exclusive; // 独占标志bool autodelete; // 自动删除标志google::protobuf::Map<std::string,std::string> args; // 其他参数MsgQueue(){}MsgQueue(const std::string& qname,bool qdurable,bool qexclusive,bool qautodelete,const google::protobuf::Map<std::string,std::string>& qargs):name(qname),durable(qdurable),exclusive(qexclusive),autodelete(qautodelete),args(qargs){}// args在文件中会以 k1=v1&k2=v2&k3=v3 的形式存储// 读取上述内容后setArgs解析这个字符串,再存储到this->args中void setArgs(const std::string& str_args){std::vector<std::string> sub_args;size_t ret = StrHelper::split(str_args,"&",sub_args);for(auto& str:sub_args){size_t pos = str.find("=");std::string key = str.substr(0,pos);std::string val = str.substr(pos+1);args[key] = val;}}// 将this->args序列化成 k1=v1&k2=v2&k3=v3 的形式std::string getArgs(){std::string result;for(auto& it:args){result+=it.first+"="+it.second+"&";}if(!result.empty())result.pop_back();return result;}};
1-2.MsgQueueMapper类
class MsgQueueMapper用于队列的持久化管理,在实现它之前,也需要先重定义一个unordered_map:
using MsgQueueMap = std::unordered_map<std::string,MsgQueue::ptr>;
它提供的接口与class ExchageMapper基本一致,只是操作的表由 exchange_table 变成了 msgqueue_table,持久化数据也变化了一些:
// 定义消息队列数据持久化管理类class MsgQueueMapper{public:MsgQueueMapper(const std::string& dbfile):_sql_helper(dbfile){std::string path = FileHelper::parentDirectory(dbfile);FileHelper::createDirectory(path);assert(_sql_helper.open());}// 创建表void createTable(){#define CREATE_TABLE_ "create table if not exists msgqueue_table(\name varchar(32) primary key,\durable int,\exclusive int,\autodelete int,\args varchar(128));"bool ret = _sql_helper.exec(CREATE_TABLE_,nullptr,nullptr);if(ret == false){LOG("创建消息队列数据库表失败!");abort(); // 异常退出}} // 删除表void removeTable(){#define DROP_TABLE_ "drop table if exists msgqueue_table;"bool ret = _sql_helper.exec(DROP_TABLE_,nullptr,nullptr);if(ret == false){LOG("删除消息队列数据库表失败!");abort(); // 异常退出}}// 插入一个消息队列bool insert(MsgQueue::ptr& exp){#define INSERT_SQL_ "insert into msgqueue_table values('%s','%d','%d','%d','%s');"char sql_str[4096] = {0};sprintf(sql_str,INSERT_SQL_,exp->name.c_str(),exp->durable,exp->exclusive,exp->autodelete,exp->getArgs().c_str());bool ret = _sql_helper.exec(sql_str,nullptr,nullptr);if(ret == false){LOG("数据库新增消息队列信息失败!");return false;}return true;}// 删除一个消息队列void remove(const std::string& qcname){std::string DELE_SQL = "delete from msgqueue_table where name='"+qcname+"';";bool ret = _sql_helper.exec(DELE_SQL,nullptr,nullptr);if(ret == false){LOG("数据库删除消息队列信息失败!");abort(); // 异常退出}}// 从数据库中恢复消息队列数据std::unordered_map<std::string,MsgQueue::ptr> recovery(){createTable();MsgQueueMap result;std::string sql = "select name,durable,exclusive,autodelete,args from msgqueue_table;";// 它会根据数据库的数据量调整回调函数的调用次数,每次取出一套数据_sql_helper.exec(sql.c_str(),MsgQueueMapper::selectCallback,&result);return result;}private:static int selectCallback(void* arg,int numcol,char** row,char** fields){MsgQueueMap* result = (MsgQueueMap*)arg;auto qup = std::make_shared<MsgQueue>();qup->name = row[0];qup->durable = (bool)std::stoi(row[1]);qup->exclusive = (bool)std::stoi(row[2]);qup->autodelete = (bool)std::stoi(row[3]);if(row[4])qup->setArgs(row[4]);result->insert(std::make_pair(qup->name,qup));return 0;}private:zd::SqliteHelper _sql_helper;};
1-3.MsgQueueManager类
它的实现与 class ExchangeManager 基本一致:
class MsgQueueManager{public:using ptr = std::shared_ptr<MsgQueueManager>;MsgQueueManager(const std::string& dbfile):_mapper(dbfile){_msgqueues = _mapper.recovery();}// 声明消息队列bool declareMsgQueue(const std::string& name,bool durable,bool exclusive,bool autodelete,const google::protobuf::Map<std::string,std::string>& args){std::unique_lock<std::mutex> lock(_mtx);if(_msgqueues.find(name) != _msgqueues.end()) {// 消息队列已经存在就不需要添加了return true;}auto qup = std::make_shared<MsgQueue>(name,durable,exclusive,autodelete,args);if(durable == true){bool ret = _mapper.insert(qup);if(ret == false){LOG("消息队列 %s 持久化失败",name.c_str());return false;}}_msgqueues.insert(std::make_pair(name,qup));return true;}// 删除消息队列void deleteMsgQueue(const std::string& name){std::unique_lock<std::mutex> lock(_mtx);if(_msgqueues.find(name) == _msgqueues.end()) {// 交换机不存在就不需要删除了return;}_mapper.remove(name); // 数据库中删除消息队列信息_msgqueues.erase(name); // 内存中删除消息队列}// 获得消息队列MsgQueue::ptr selectMsgQueue(const std::string& name){std::unique_lock<std::mutex> lock(_mtx);auto it = _msgqueues.find(name);if(it == _msgqueues.end())return nullptr;return it->second;}// 获得所有消息队列const MsgQueueMap& selectAll(){return _msgqueues;}// 判断消息队列是否存在bool exists(const std::string& name){std::unique_lock<std::mutex> lock(_mtx);if(_msgqueues.find(name) == _msgqueues.end())return false;return true;}// 清理所有消息队列的数据void clear(){std::unique_lock<std::mutex> lock(_mtx);_mapper.removeTable();_msgqueues.clear();}size_t size(){return _msgqueues.size();}private:std::mutex _mtx;MsgQueueMap _msgqueues;MsgQueueMapper _mapper; // 持久化管理句柄};
实际上 绑定管理模块 的实现也和交换机管理模块、队列管理模块十分相似,它们的管理思路是一样的,只是对外提供的接口不同。
第二节:单元测试
打开mqtest目录。
创建mq_msgqueue_test.cc文件,并做好前置工作:
#include "../mqserver/mq_msgqueue.hpp" #include <gtest/gtest.h>
然后把mq_exchange_test.cc的测试套件拷贝过来,然后进行修改:
zd::MsgQueueManager::ptr qmp; // 全局测试套件------------------------------------------------ // 自己初始化自己的环境,使不同单元测试之间解耦 class MsgQueueTest :public testing::Environment { public:// 全部单元测试之前调用一次virtual void SetUp() override{// std::cout << "单元测试执行前的环境初始化" << std::endl;qmp = std::make_shared<zd::MsgQueueManager>("./data/meta.bd");} // 全部单元测试之后调用一次virtual void TearDown() override{// std::cout << "单元测试执行后的环境清理" << std::endl;// qmp->clear();} };// 单元测试 // 测试名称与类名称相同,则会先调用SetUp TEST(MsgQueueTest,MsgQueueTest_test1_Test) {std::cout << "单元测试-1" << std::endl;// 声明队列google::protobuf::Map<std::string,std::string> args;args.insert({"k1","v1"});args.insert({"k2","v2"});args.insert({"k3","v3"});//1. 3个持久化qmp->declareMsgQueue("q-1",true,false,false,args);qmp->declareMsgQueue("q-2",true,false,false,args);qmp->declareMsgQueue("q-3",true,false,false,args);//2. 1个非持久化qmp->declareMsgQueue("q-4",false,false,false,args);// 此时队列应该有4个ASSERT_EQ(qmp->size(),4);// 移除队列qmp->deleteMsgQueue("q-5"); // 移除不存在的队列ASSERT_EQ(qmp->size(),4);qmp->deleteMsgQueue("q-1");// 此时q-1不存在,而q-2\q-3\q-4存在ASSERT_EQ(qmp->exists("q-1"),false);ASSERT_EQ(qmp->exists("q-2"),true);ASSERT_EQ(qmp->exists("q-3"),true);ASSERT_EQ(qmp->exists("q-4"),true);// 获取q-2交换机zd::MsgQueue::ptr q2 = qmp->selectMsgQueue("q-2");ASSERT_EQ(q2->name,"q-2");ASSERT_EQ(q2->autodelete,false);ASSERT_EQ(q2->durable,true);ASSERT_EQ(q2->exclusive,false); }TEST(MsgQueueTest,MsgQueueTest_test2_Test) {std::cout << "单元测试-2" << std::endl;// 获取所有队列数据,进行打印其名字zd::MsgQueueMap m = qmp->selectAll();for(auto& qit:m){LOG("%s",qit.second->name.c_str());} }TEST(MsgQueueTest,MsgQueueTest_test3_Test) {std::cout << "单元测试-3" << std::endl;} // 单元测试全部结束后调用TearDown// ---------------------------------------------------------- int main(int argc,char** argv) {testing::InitGoogleTest(&argc,argv);testing::AddGlobalTestEnvironment(new MsgQueueTest); // 注册Test的所有单元测试if(RUN_ALL_TESTS() != 0) // 运行所有单元测试{printf("单元测试失败!\n");}return 0; }
编译后的执行结果:
结果符合预期。
然后查看meta.bd文件中的 msgqueue_table 的数据:
持久化的q-1被移除了,所以只剩q-2和q-3了,这也是符合预期的。
最后注释掉单元测试-1和单元测试-2,使用单元测试-3进行拉取历史队列数据的测试:
TEST(MsgQueueTest,MsgQueueTest_test3_Test) {std::cout << "单元测试-3" << std::endl;ASSERT_EQ(qmp->size(),2);ASSERT_EQ(qmp->exists("q-2"),true);ASSERT_EQ(qmp->exists("q-3"),true);for(auto& kv:qmp->selectMsgQueue("q-2")->args){LOG("%s:%s",kv.first.c_str(),kv.second.c_str());} }
执行结果:
说明历史数据的拉取和 args 的处理都是正确的。
至此,队列管理模块也完成了。
下期预告:
之后是 绑定管理模块 的实现,它与 交换机管理模块 也是十分相似的,只是提供的接口不同。