目录
第一节:工作目录创建
第二节:属性定义
第三节:日志宏
第四节: SqliteHelper
第五节: FileHelper
下期预告:
第一节:工作目录创建
在家目录创建一个名为mq的目录,mq里又创建5个目录:mqserver、mqclient、mqtest、mqthird、mqcommon
mqserver:存放服务器相关代码
mqclient:存放客户端相关代码
mqtest:进行单元测试的目录
mqthird:存放项目要用到的库
mqcommon:存放服务器和客户端都要使用的代码,各种辅助代码就存放在此
第二节:属性定义
打开mqcommon目录,创建一个名为mq_msg.proto的文件并打开,先添加以下内容:
syntax = "proto3"; // 声明版本package zd; // 声明命名空间
声明版本就是使用最新的proto3语法,它的语法是做过优化的;
声明命名空间的意思就是下面的内容都在名字为"zd"的命名空间中定义。
添加的第一个属性是交换机的发布模式:
// 交换机的发布模式 enum ExchangeType {UNKONW_1 = 0;// 因为只能从0开始,使用它占据0DIRECT = 1; // 直接模式FANOUT = 2; // 广播模式TOPIC = 3 ; // 主题模式 };
交换机的发布模式与交换机的消息审核有关:
(1)直接模式:每个消息都有一个"钥匙"数据,每个绑定也有一个"钥匙"数据,钥匙内容完全相同才成功
(2)广播模式:只要与该交换机绑定的队列都成功,与钥匙无关
(3)主题模式:钥匙满足一定匹配规则才成功,例如消息的钥匙:"news.music.pop"与队列绑定的钥匙:"news.music"就可以成功
添加的第二个属性是消息的投递模式:
// 消息的投递模式 enum DeliveryMode {UNKONW_2 = 0;// 因为只能从0开始,使用它占据0UNDURABLE = 1;// 非持久化投递DURABLE = 2; // 持久化投递 };
(1)非持久化投递:消息不保存在文件中,只在内存中保存
(2)持久化投递:消息既保存在文件中,也保存在内存中
持久化的作用:如果服务器因为某些原因崩溃了,可以将持久化的消息从文件中重新读取出来。
消息想要持久化保存还有一个前置条件——消息所属队列也是持久化的,否则即使消息持久化了,也没有队列获取它。
添加的第三个属性是消息的属性:
// 消息属性 message BasicProperties {string id = 1; // 唯一识别码DeliveryMode delivery_mode = 2; // 投递模式string routing_key = 3; // 钥匙 };
(1)唯一识别码:消息的"身份证"
(2)投递模式:同上
(3)钥匙:与绑定钥匙进行匹配,成功将被发布给队列
最后将添加一个消息类,它保存消息的各种信息:
// 消息 message Message {// 消息载荷// 在文件中保存的数据message Payload{// 消息属性BasicProperties properties = 1;// 消息内容string body = 2;// 消息的有效性string valid = 3; };Payload payload = 1;// 消息载荷// 在内存中保存的数据,辅助消息的写入和读取uint32 offset = 2; // 消息存储位置uint32 length = 3; // 消息长度 };
(1)消息属性:同上
(2)消息内容:真正要发布的内容
(3)消息的有效性:标识文件中持久化的消息是否有效,如果不用标识,而是将无效消息删除的话,还需要把有效消息向前填充,效率就太低了。
(4)消息的存储位置:保存这条消息在文件中相对于开头的偏移量,用于消息的无效化和读取
(5)消息的长度:存储消息时,验证它和存储的内容长度是否一致
保存文件并退出后,就可以使用以下指令生成C++代码了:
protoc --cpp_out=./ mq_msg.proto
然后就会生成两个文件:
上述定义的各种内容就被声明在mq_msg.pb.h中了。
那么为什么要使用proto生成而不自己定义呢?因为proto生成的内容符合protobuf协议,使用该协议的send函数会自动将其序列化并添加应用层报头,解决了"粘包"的问题。
第三节:日志宏
在mqcommon创建名为mq_logger.hpp的文件并打开,先添加一个防止文件重复包含的预编译指令:
#ifndef __M_LOG_H__ #define __M_LOG_H__#endif
之后的每个头文件我们都需要添加类似的预处理指令。
然后包含需要的头文件:
#include <iostream> #include <time.h>
我们需要的日志要能够打印时间、文件名、行号和打印内容,所以使用struct tm将时间转化成各种格式,再使用__FILE__, __LINE__, ##__VA_ARGS__获得文件名、行号和其他参数:
// 宏不允许换行\n,所以每行结束加\将\n转义 #define LOG(format,...){\/* 获得当前时间戳 */\time_t t = time(nullptr);\/* 时间戳转化成类,类保存了它的各种格式 */\struct tm* ptm = localtime(&t);\/* 将时间的时分秒提取出来,保存为字符串 */\char time_str[32] = {0};\strftime(time_str,31,"%H:%M:%S",ptm);\printf("[%s][%s:%d]\t" format "\n", time_str, __FILE__, __LINE__, ##__VA_ARGS__);\/* __VA_ARGS__是不定参数...,##__VA_ARGS__表示...默认为空 */\/* 这样做LOG宏的...形参就可以不传参数了 */\ }
完成上述内容后就可以在mqtest中创建文件,进行测试了。
第四节: SqliteHelper
在mqcommon创建一个名为mq_helper.hpp的文件并打开,首先设置防止文件重复包含的预编译指令:
#ifndef __M_HELPER_H__ #define __M_HELPER_H__#endif
然后包含所需头文件,日志宏也包含在内:
#include "mq_logger.hpp"#include <iostream> #include <string> #include <vector> #include <sqlite3.h>
声明一个命名空间,名字自定,之后所有的代码基本上都包含在这个命名空间中:
namespace zd {};
在命名空间中定义一个名为SqliteHelper的类,它的构造函数需要传入一个不带路径的文件名,这个文件名就是sqlite存放数据的地方:
class SqliteHelper { public:SqliteHelper(const std::string& dbfile):_dbfile(dbfile){} private:std::string _dbfile;sqlite3* _handler; // 数据库管理句柄,用它才能对sqlite文件进行操作 }
然后我们设计它向外提供的接口。
首先是打开/创建文件的接口:
bool open(int save_level = SQLITE_OPEN_FULLMUTEX){int ret = sqlite3_open_v2(_dbfile.c_str(),&_handler,SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE | save_level,nullptr);if(ret != SQLITE_OK){LOG("sqlite数据库打开/创建失败:%s",sqlite3_errmsg(_handler));return false;}return true;}
SQLITE_OPEN_FULLMUTEX是sqlite文件打开的多线程模式,这种模式可以保证线程安全,所以将其设置为缺省值
其次是执行命令的接口:
bool exec(const std::string& sql,SqliteCallback cb,void* arg){int ret = sqlite3_exec(_handler,sql.c_str(),cb,arg,nullptr);if(ret != SQLITE_OK){LOG("%s\n语句执行失败:%s",sql.c_str(),sqlite3_errmsg(_handler));return false;}return true;}
命名一共有五种:创建表、删除表、向指定表增加数据、向指定表删除数据、向指定表修改数据。
不同命令有自己的格式内容,但是它都能执行。
SqliteCallback cb是指令执行完毕后调用的回调函数,它的类型在SqliteHelper中重定义:
typedef int(*SqliteCallback)(void*,int,char**,char**);
而 void* arg 就是这个回调函数的各种参数。
最后是关闭数据库文件的接口:
void close(){if(_handler)sqlite3_close_v2(_handler);}
就在该文件中,定义FileHelper。
第五节: FileHelper
FileHelper的功能是提供一些文件操作接口,便于对文件的使用:
首先包含它所需的头文件:
#include <stdio.h> #include <string> #include <fstream> #include <unistd.h> #include <sys/stat.h>
构造函数的参数是带路径的文件名:
class FileHelper { public:FileHelper(const std::string filename):_filename(filename){} private:std::string _filename; }
所需的第一个接口是判断文件是否存在的接口:
// 判断文件是否存在bool exists(){struct stat st;return stat(_filename.c_str(),&st) == 0;}
其次是获取文件大小的接口:
// 获得文件大小size_t size(){struct stat st;int ret = stat(_filename.c_str(),&st);if(ret < 0) // 文件不存在返回0return 0;return st.st_size;}
读取文件数据的接口有两个,一个是从文件的起始读取,一个是根据某个偏移量从文件的中间开始读取:
// 读取数据bool read(std::string& body){// 根据文件大小调整body的大小size_t fsize = this->size();body.resize(fsize);return read(&body[0],0,fsize);}bool read(char* body,size_t offset,size_t len){// 1.二进制读方式打开文件std::ifstream ifs(_filename,std::ios::binary | std::ios::in);if(ifs.is_open() == false){LOG("%s 文件打开失败!",_filename.c_str());return false;}// 2.读文件指针 跳转到指定位置ifs.seekg(offset,std::ios::beg);// 3.读取文件内容ifs.read(body,len);if(ifs.good() == false) // 操作成功会返回true{LOG("%s 文件读取失败!",_filename.c_str());return false;}// 4.关闭文件ifs.close();return true;}
向文件写数据的接口也有两个,一个是从开头读取文件数据,一个是根据偏移量从中间写入数据:
// 写入数据bool write(const std::string& body){return write(body.c_str(),0,body.size());}bool write(const char* body,size_t offset,size_t len){// 1.二进制写方式打开文件std::fstream ofs(_filename,std::ios::binary | std::ios::out | std::ios::in);if(ofs.is_open() == false){LOG("%s 文件打开失败!",_filename.c_str());return false;}// 2.写文件指针 跳转到指定位置// 该操作需要文件的读权限ofs.seekp(offset,std::ios::beg);// 3.写入内容ofs.write(body,len);if(ofs.good() == false){LOG("%s 文件写入失败!",_filename.c_str());return false;}// 4.关闭文件ofs.close();return true;}
创建文件的接口也提供两个,一个是根据this的_filename创建文件,一个设置成静态函数,供外部传入文件名创建文件:
// 创建文件bool createFile(){// 文件不存在就会创建std::fstream ofs(_filename.c_str(),std::ios::binary | std::ios::out);if(ofs.is_open() == false){LOG("%s 文件创建失败!",_filename.c_str());return false;}ofs.close();return true;}static bool createFile(const std::string& filename){// 文件不存在就会创建std::fstream ofs(filename.c_str(),std::ios::binary | std::ios::out);if(ofs.is_open() == false){LOG("%s 文件创建失败!",filename.c_str());return false;}ofs.close();return true;}
需要注意的是创建文件时文件会被截断,即内容被清空,所以文件已经存在就不要调用它了。
删除文件的接口也对应创建文件接口有两个:
bool removeFile(){return (::remove(_filename.c_str()) == 0);}static bool removeFile(const std::string& filename){return (::remove(filename.c_str()) == 0);}
还需要对文件进行重命名的接口:
// 重命名bool rename(const std::string new_filename){return ::rename(_filename.c_str(),new_filename.c_str()) == 0;}static bool rename(const std::string filename,const std::string new_filename){return ::rename(filename.c_str(),new_filename.c_str()) == 0;}
获取文件父级路径的对外接口:
// 获取一个文件的父级路径static std::string parentDirectory(const std::string& filename){size_t pos = filename.find_last_of("/");if(pos == std::string::npos) // 找不到就是当前目录,例如"test"{return "./";}return filename.substr(0,pos);}
创建/删除父级路径的对外接口:
// 创建/删除目录static bool createDirectory(const std::string& path){std::string PATH = path;if(PATH.size() > 1){if(PATH.substr(0,2) == "./")PATH = PATH.substr(2);}size_t pos = 0;size_t idx = 0;while(idx < PATH.size()){pos = PATH.find("/",idx);if(pos == std::string::npos) // 已经是最后一层了{return mkdir(PATH.c_str(),0775) == 0; // 0775:其他人只允许查看}std::string subpath = PATH.substr(0,pos);int ret = mkdir(subpath.c_str(),0775);if(ret != 0 && errno != EEXIST){LOG("%s 目录创建失败!",subpath.c_str());return false;}idx = pos+1;}return true;}static bool removeDirectory(const std::string path){std::string cmd = "rm -rf "+path;return system(cmd.c_str()) != -1;}
这样文件管理类也完成了。
下期预告:
完成了日志宏、SqliteHelper、FileHelper之后,之后将完成:随机id生成类、字符串切割类并对上述5个小功能进行单元测试,没有问题后再完成工作线程池的代码。