目录
1. 项目实现;
1. 项目实现:
1.1 通信抽象实现:
(1) BaseMessage: 主要实现对消息处理;
主要包含设置和获取ID, 设置类型和获取类型, 消息检查, 以及序列化和反序列化操作.
class BaseMessage{public://大家需要的功能先实现;using ptr = std::shared_ptr<BaseMessage>;//定义一个指针;virtual ~BaseMessage(){}virtual void setId(const std::string& id){_rid = id;}virtual std::string rid(){return _rid;}virtual void setMType(MType mtype){_mtype = mtype;}virtual MType mtype(){return _mtype;}//序列化;virtual std::string serialize() = 0;//反序列化;virtual bool unserialize(const std::string& msg) = 0;//对消息进行检验;virtual bool check() = 0;private:MType _mtype;//消息类型std::string _rid;//id号};
(2) BaseBuffer缓冲区抽象:
主要是查看缓冲区剩余大小, 读取数据, 删除数据. 读取全部字符串的操作.
//缓冲区的抽象, 缓冲区大小, 读取数据, 删除数据, 读取并删除数据;class BaseBuffer{public:using ptr = std::shared_ptr<BaseBuffer>;//定义一个指针;virtual size_t readableSize() = 0;//缓冲区可使用大小virtual int32_t peekInt32() = 0;//读取前4字节数据;virtual void retriveInt32() = 0;//删除前4字节数据;virtual int32_t readInt32() = 0;//读取并且删除前4字节数据.virtual std::string retrieveAsString(size_t len) = 0;};
(3) BaseProtocol: 主要实现判断缓冲区是否可以参数具体数据大小, 对接收数据处理, 以及序列化接收到的数据.
class BaseProtocal{public:using ptr = std::shared_ptr<BaseProtocal>;virtual bool canProcessed(const BaseBuffer::ptr& buf) = 0;//可以进行传输数据的大小s;virtual bool onMessage(const BaseBuffer::ptr& buf, BaseMessage::ptr& msg) = 0;//接收数据进行处理.virtual std::string serialize(const BaseMessage::ptr& msg) = 0;//发送数据前进行序列化};
(4) BaseConnection: 连接状态, 连接终止, 连接发送数据操作.
//连接, 发送数据, 关闭, 连接状态.class BaseConnection{public:using ptr = std::shared_ptr<BaseConnection>;virtual void send(const BaseMessage::ptr& msg) = 0;virtual void shutdown() = 0; virtual bool connected() = 0;};
(5) Server: 服务器端, 使用到包装器将Connenction和Close以及Message这三个回调进行封装. 服务器直接调用这些包装器即可.
using ConnectionCallback = std::function<void(const BaseConnection::ptr&)>;using CloseCallback = std::function<void(const BaseConnection::ptr&)>;using MessageCallback = std::function<void(const BaseConnection::ptr&, BaseMessage::ptr&)>;//服务器; 连接回调, 关闭回调, 信息回调.class BaseServer{public:using ptr = std::shared_ptr<BaseServer>;virtual void setConnectionCallback(const ConnectionCallback& cb){_cb_connection = cb;}virtual void setCloseCallback(const CloseCallback& cb){_cb_close = cb;}virtual void setMessageCallback(const MessageCallback& cb){_cb_message = cb;}virtual void start() = 0;protected:ConnectionCallback _cb_connection;CloseCallback _cb_close;MessageCallback _cb_message;};
(6) client: 客户端也是封装使用包装器将Connenction和Close以及Message这三个回调, 类就是连接处理, 连接关闭, 发送消息, 连接状态的实现.(这部分在继承类中纯虚函数里面实现.
//客户端, 连接回调, 关闭回调, 信息回调, 连接, 关闭, 发送, 连接状态.class BaseClient{public:using ptr = std::shared_ptr<BaseClient>;virtual void setConnectionCallback(const ConnectionCallback& cb){_cb_connection = cb;}virtual void setCloseCallback(const CloseCallback& cb){_cb_close = cb;}virtual void setMessageCallback(const MessageCallback& cb){_cb_message = cb;}virtual void connect() = 0;virtual void shutdown() = 0;virtual bool send(const BaseMessage::ptr& msg) = 0;virtual BaseConnection::ptr connection() = 0;virtual bool connected() = 0;protected:ConnectionCallback _cb_connection;CloseCallback _cb_close;MessageCallback _cb_message;};
1.2 消息抽象实现:
(1) JsonMessage: 进行序列化和反序列化操作. 继承上面BaseMessage进行多态对序列化和反序列化操作..
//序列化信息, 继承BaseMessage再进行重写;class JsonMessage : public BaseMessage{public:using ptr = std::shared_ptr<JsonMessage>;//序列化;virtual std::string serialize() override{std::string body;bool ret = JSON::serialize(_body, body);if(ret == false)return std::string();return body;}//反序列化;virtual bool unserialize(const std::string& msg) override{return JSON::unserialize(msg, _body);}protected:Json::Value _body;};
(2) JsonRequest: 对上面进行处理Json数据进行请求,
//Json请求, 继承JsonMessage;class JsonRequest : public JsonMessage{public:using ptr = std::shared_ptr<JsonRequest>;};
(3) JsonResponse: 处理的数据检查一下响应码以及响应码类型是否正确, 然后就是设置响应码或者返回响应码的操作.
//Json响应, 返回响应码, 设置响应码;class JsonResponse : public JsonMessage{public:using ptr = std::shared_ptr<JsonResponse>;//对消息进行检验;virtual bool check() override{//大部分响应只有响应状态码; 只需要判断响应状态码存在和正确即可.if(_body[KEY_RCODE].isNull() == true){ELOG("响应中没有响应状态码! ");return false;}if(_body[KEY_RCODE].isIntegral() == false){ELOG("响应状态码类型错误! ");return false;}return true;}virtual RCode rcode(){return (RCode)_body[KEY_RCODE].asInt();}virtual void setRCode(RCode rcode){_body[KEY_RCODE] = (int)rcode;}};
(4) RpcRequest: 继承JsonRequest, 主要是检查请求方法名称是否存在以及是否类型正确,
其次就是请求参数进行检查, 接着就是对请求方法以及请求参数的返回或者处理.
//rpc请求, 返回和设置请求方法, 返回和设置请求参数,class RpcRequest : public JsonRequest{public:using ptr = std::shared_ptr<RpcRequest>;virtual bool check() override{//rpc请求中, 包含请求方法名称, 请求参数 对象;if(_body[KEY_METHOD].isNull() == true ||_body[KEY_METHOD].isString() == false){ELOG("RPC请求中没有请求方法名称或请求类型错误 ");return false;}if(_body[KEY_PARAMS].isNull() == true ||_body[KEY_PARAMS].isObject() == false){ELOG("RPC中没有请求参数或请求参数类型错误! ");return false;}return true;}std::string method(){return _body[KEY_METHOD].asString();}void setMethod(const std::string& method_name){_body[KEY_METHOD] = method_name;}Json::Value params(){return _body[KEY_PARAMS];}void setparams(const Json::Value& params){_body[KEY_PARAMS] = params;}};
(5) TopicRequest: 主题的请求处理, 继承JsonRequest, 里面check主要对主题名称, 主题操作类型进行处理, 以及在主题操作类型是TOPIC_PUBLISH时候但是主题信息不存在或错误的处理, 其次就是对主题名称, 主题操作类型以及主题消息进行返回或者设置.
//主题请求, 返回和设置主题请求名称, 返回和设置主题操作类型;// 返回主题信息;class TopicRequest : public JsonRequest{public:using ptr = std::shared_ptr<TopicRequest>;virtual bool check() override{//rpc请求中, 包含请求方法名称, 请求参数 对象;if(_body[KEY_TOPIC_KEY].isNull() == true ||_body[KEY_TOPIC_KEY].isString() == false){ELOG("主题请求中没有主题名称或主题名称类型错误! ");return false;}if(_body[KEY_OPTYPE].isNull() == true ||_body[KEY_OPTYPE].isIntegral() == false){ELOG("主题请求中没有操作类型或操作类型错误! ");return false;}//判断主题为发布主题.if(_body[KEY_OPTYPE].asInt() == (int)TopicOpType::TOPIC_PUBLISH&& (_body[KEY_TOPIC_MSG].isNull() == true ||_body[KEY_TOPIC_MSG].isString() == false)){ELOG("主题消息发布请求中没有消息内容字段或者消息内容字段错误");return false;}return true;}std::string topicKey(){return _body[KEY_TOPIC_KEY].asString();}void setTopicKey(const std::string& key){_body[KEY_TOPIC_KEY] = key;}TopicOpType Optype(){return (TopicOpType)_body[KEY_OPTYPE].asInt();}void setOptype(TopicOpType optype){_body[KEY_OPTYPE] = (int)optype;}std::string topicMsg(){return _body[KEY_TOPIC_MSG].asString();}void setTopicMsg(const std::string& msg){_body[KEY_TOPIC_MSG] = msg;}};
(6) ServiceRequest: 服务器请求处理, 检查请求方法和请求类型是否正确, 确定不是请求发现的前提下进行判断服务的主机信息. 对请求方法以及请求类型以及主机信息进行返回以及设置.
//服务器请求, 请求方法, 请求类型, typedef std::pair<std::string, int> Address;class ServiceRequest : public JsonRequest{public:using ptr = std::shared_ptr<ServiceRequest>;virtual bool check() override{if(_body[KEY_METHOD].isNull() == true ||_body[KEY_METHOD].isString() == false){ELOG("服务器请求中没有方法名称或方法名称类型错误!! ");return false;}if(_body[KEY_OPTYPE].isNull() == true ||_body[KEY_OPTYPE].isIntegral() == false){ELOG("服务器请求中没有操作类型或操作类型错误! ");return false;}//请求类型不是服务发现, 服务主机信息.if(_body[KEY_OPTYPE].asInt() != (int)(ServiceOpType::SERVICE_DISCOVERY) &&(_body[KEY_HOST].isNull() == true ||_body[KEY_HOST].isObject() == false || (_body[KEY_HOST][KEY_HOST_IP].isNull() == true ||_body[KEY_HOST][KEY_HOST_IP].isString() == false) ||(_body[KEY_HOST][KEY_HOST_PORT].isNull() == true ||_body[KEY_HOST][KEY_HOST_PORT].isIntegral()) == false)){ELOG("服务请求中主机地址错误! ");return false;}return true;}std::string method(){return _body[KEY_METHOD].asString();}void setMethod(const std::string& name){_body[KEY_METHOD] = name;}ServiceOpType Optype(){return (ServiceOpType)_body[KEY_OPTYPE].asInt();}void setOptype(ServiceOpType optype){_body[KEY_OPTYPE] = (int)optype;}//返回主机信息;Address host(){Address addr;addr.first = _body[KEY_HOST][KEY_HOST_IP].asString();addr.second = _body[KEY_HOST][KEY_HOST_PORT].asInt();return addr;}//设置主机信息;void setHost(const Address& host){Json::Value val;val[KEY_HOST_IP] = host.first;val[KEY_HOST_PORT] = host.second;_body[KEY_HOST] = val;}};
(7) Rpc响应: 继承JosnResponse; 检查响应码和响应结果, 以及返回响应结果.
//RPC响应: class RpcResponse : public JsonResponse{public:using ptr = std::shared_ptr<RpcResponse>;virtual bool check() override{//大部分响应只有响应状态码; 只需要判断响应状态码存在和正确即可.if(_body[KEY_RCODE].isNull() == true ||_body[KEY_RCODE].isIntegral() == false){ELOG("响应中没有响应状态码或响应状态码类型错误! ");return false;}if(_body[KEY_RESULT].isNull() == true){ELOG("响应中没有响应结果或者结果类型错误! ");return false;}return true;}Json::Value result(){return _body[KEY_RESULT];}void setResult(const Json::Value& result){_body[KEY_RESULT] = result;}};
(8) TopicResponse主题响应:
//主题响应,class TopicResponse : public JsonResponse{public:using ptr = std::shared_ptr<TopicResponse>;};
(9) ServiceResponse服务器响应: 检查响应码以及操作类型, 如果操作类型是服务查询还要检查请求方法以及主机信息是否正确. 以及对方法还有操作类型以及主机信息进行返回以及设置.
//服务器响应, class ServiceResponse : public JsonResponse{public:using ptr = std::shared_ptr<ServiceResponse>;virtual bool check() override{//大部分响应只有响应状态码; 只需要判断响应状态码存在和正确即可.if(_body[KEY_RCODE].isNull() == true ||_body[KEY_RCODE].isIntegral() == false){ELOG("响应中没有响应状态码或响应状态码类型错误! ");return false;}if(_body[KEY_OPTYPE].isNull() == true ||_body[KEY_OPTYPE].isIntegral() == false){ELOG("响应中没有操作类型或者操作类型错误! ");return false;}if(_body[KEY_OPTYPE].asInt() == (int)(ServiceOpType::SERVICE_DISCOVERY) &&(_body[KEY_METHOD].isNull() == true ||_body[KEY_METHOD].isString() == false ||_body[KEY_HOST].isNull() == true ||_body[KEY_HOST].isArray() == false)){ELOG("服务响应中响应消息字段错误! ");return false;}return true;}std::string method(){return _body[KEY_METHOD].asString();}void setMethod(const std::string& method){_body[KEY_METHOD] = method;}ServiceOpType Optype(){return (ServiceOpType)_body[KEY_OPTYPE].asInt();}void setOptype(ServiceOpType optype){_body[KEY_OPTYPE] = (int)optype;}void setHost(std::vector<Address> addrs){for(auto& addr : addrs){Json::Value val;val[KEY_HOST_IP] = addr.first;val[KEY_HOST_PORT] = addr.second;_body[KEY_HOST].append(val);}}std::vector<Address> Hosts(){std::vector<Address> addrs;int sz = _body[KEY_HOST].size();for(int i = 0; i < sz; i++){Address addr;addr.first = _body[KEY_HOST][i][KEY_HOST_IP].asString();addr.second = _body[KEY_HOST][i][KEY_HOST_PORT].asInt();addrs.push_back(addr);}return addrs;}};
(10) MessageFactory: 对消息对象进行封装, 最上面的RpcRequest/RpcResponse以及TopicRequest/TopicResponse以及ServiceRequest以及ServiceResponse进行封装.
//实现消息对象的生产工厂;class MessageFactory {public:static BaseMessage::ptr create(MType mtype) {switch(mtype) {case MType::REQ_RPC : return std::make_shared<RpcRequest>();case MType::RSP_RPC : return std::make_shared<RpcResponse>();case MType::REQ_TOPIC : return std::make_shared<TopicRequest>();case MType::RSP_TOPIC : return std::make_shared<TopicResponse>();case MType::REQ_SERVICE : return std::make_shared<ServiceRequest>();case MType::RSP_SERVICE : return std::make_shared<ServiceResponse>();}return BaseMessage::ptr();}//不定参数模板, forward是进行万能引用, 保持对象的性质(左值或右值).template<typename T, typename ...Args>static std::shared_ptr<T> create(Args&& ...args) {return std::make_shared<T>(std::forward(args)...);}};
1.3 通信-Muduo封装实现:
(1) MuduoBuffer: 继承BaseBuffer, 纯虚函数实现父类的接口, 直接调用Muduo库里面的接口即可完成.
#pragma once
#include <functional>
#include "detail.hpp"
#include "fields.hpp"
#include "abstract.hpp"#include <muduo/net/TcpServer.h>
#include <muduo/net/EventLoop.h>
#include <muduo/net/TcpConnection.h>
#include <muduo/net/Buffer.h>
#include <iostream>
#include <string>
#include <unordered_map>#include <muduo/net/TcpServer.h>
#include <muduo/net/TcpClient.h>
#include <muduo/net/EventLoop.h>
#include <muduo/net/TcpConnection.h>
#include <muduo/net/Buffer.h>
#include <iostream>
#include <string>
#include <unordered_map>
#include <muduo/base/CountDownLatch.h>
#include <muduo/net/EventLoopThread.h>
#include <utility>
#include <mutex>
#include <unordered_map>
namespace RPC
{class MuduoBuffer : public BaseBuffer{public:using ptr = std::shared_ptr<MuduoBuffer>; // 定义一个指针;MuduoBuffer(muduo::net::Buffer *buf): _buf(buf){}virtual size_t readableSize() override{return _buf->readableBytes();}virtual int32_t peekInt32() override{// muduo库是一个网络库, 从缓冲区取出4字节整形,// 会进行网络字节序转换.return _buf->peekInt32();}virtual void retriveInt32() override{return _buf->retrieveInt32();}virtual int32_t readInt32() override{return _buf->readInt32();}virtual std::string retrieveAsString(size_t len) override{return _buf->retrieveAsString(len);}private:muduo::net::Buffer *_buf;};
}
(2) BufferFactory: 将MuduoBuffer进行工厂类的封装.
class BufferFactory{public:template <typename ...Args>static BaseBuffer::ptr create(Args&& ...args){return std::make_shared<MuduoBuffer>(std::forward<Args>(args)...);}};
(3) LVProtocol: 继承实现BaseProtocall里面的接口, lenFieldsLength是数据长度, mtypeFieldsLength是类型长度, idlenFieldsLength是ID字段长度.
对数据是否可以处理, 足不足够进行处理, 算出总长度, 类型, id长度, 正文长度, 找到类型以及id, 接着就是对消息进行处理, 调用对应的消息函数, 进行反序列化, 以及设置id和类型.
其中也提供序列化操作:
根据LV规则, 读取数据以及长度, 包装成LV格式.
// 判断缓存区数据是否可以进行一条数据的处理;class LVProtocol : public BaseProtocal{public://|--len--|--VALUE--|//|--len--|--mtype--|--idLen--|--id--|--body--|using ptr = std::shared_ptr<LVProtocol>;virtual bool canProcessed(const BaseBuffer::ptr &buf) override{if(buf->readableSize() < lenFieldsLength)return false;int32_t total_len = buf->peekInt32();DLOG("total_len:%d", total_len);if (buf->readableSize() < (total_len + lenFieldsLength)){return false;}return true;}virtual bool onMessage(const BaseBuffer::ptr &buf, BaseMessage::ptr &msg) override{// 调用onmessage时候, 默认可以进行处理一条数据;int32_t total_len = buf->readInt32();MType mtype = (MType)buf->readInt32();int32_t idlen = buf->readInt32();int32_t body_len = total_len - idlenFieldsLength - idlen - mtypeFieldsLength;std::string id = buf->retrieveAsString(idlen);std::string body = buf->retrieveAsString(body_len);msg = MessageFactory::create(mtype);if (msg.get() == nullptr){ELOG("消息类型错误, 构造消息对象失败!");return false;}bool ret = msg->unserialize(body);if (ret == false){ELOG("消息正文反序列失败!");return false;}msg->setId(id);msg->setMType(mtype);return true;}virtual std::string serialize(const BaseMessage::ptr& msg) override{//|--len--|--mtype--|--idLen--|--id--|--body--|std::string body = msg->serialize();std::string id = msg->rid();int32_t idlen = htonl(id.size());auto mtype = htonl((int32_t)msg->mtype());int32_t h_total_len = mtypeFieldsLength + mtypeFieldsLength + id.size() + body.size();int32_t n_total_len = htonl(h_total_len);DLOG("h_total_len:%d", h_total_len);std::string result;result.reserve(h_total_len);result.append((char *)&n_total_len, lenFieldsLength);result.append((char *)&mtype, mtypeFieldsLength);result.append((char *)&idlen, idlenFieldsLength);result.append(id);result.append(body);return result;}private:const size_t lenFieldsLength = 4;const size_t mtypeFieldsLength = 4;const size_t idlenFieldsLength = 4;};
(4) ProtocolFactory, 进行封装BaseProtocol工厂类模式.
class ProtocolFactory{public:template <typename ...Args>static BaseProtocal::ptr create(Args&& ...args){return std::make_shared<LVProtocol>(std::forward<Args>(args)...);}};
(5) MuduoConnection: 继承BaseConnection, 直接调用Muduo库里里面的接口即可实现构造函数, 发送数据, 关闭连接, 连接状态的实现.
class MuduoConnection : public BaseConnection{public:using ptr = std::shared_ptr<MuduoConnection>;MuduoConnection(const muduo::net::TcpConnectionPtr &conn,BaseProtocal::ptr &protocol): _protocol(protocol), _conn(conn){}virtual void send(const BaseMessage::ptr &msg) override{std::string body = _protocol->serialize(msg);_conn->send(body);}virtual void shutdown() override{_conn->shutdown();}virtual bool connected() override{_conn->connected();}private:BaseProtocal::ptr _protocol;muduo::net::TcpConnectionPtr _conn;};class ConnectionFactory{public:template <typename ...Args>static BaseConnection::ptr create(Args&& ...args){return std::make_shared<MuduoConnection>(std::forward<Args>(args)...);}};
(6) MuduoServer: 继承BaseServer, onConnection进行连接管理, 简历连接成功需要标记一下连接的消息, 使用unordered_map解决的, 连接断开是需要从unordered_map里面移除的.
onMessage: 判断数据是否足够处理, 可能数据过大也要判断, 判断完发送到缓冲区, 接着进行连接管理.
class MuduoServer : public BaseServer{public:using ptr = std::shared_ptr<MuduoServer>;MuduoServer(int port): _server(&_baseloop, muduo::net::InetAddress("0.0.0.0", port),"MuduoServer", muduo::net::TcpServer::kReusePort),_protocol(ProtocolFactory::create()){}virtual void start() override{_server.setConnectionCallback(std::bind(&MuduoServer::onConnection, this, std::placeholders::_1));_server.setMessageCallback(std::bind(&MuduoServer::onMessage, this,std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_server.start(); // 打开服务器机械能监听;_baseloop.loop(); // 开始死循环事务监听;}void onConnection(const muduo::net::TcpConnectionPtr &conn){if (conn->connected()){std::cout << "链接建立成功!" << std::endl;auto muduo_conn = ConnectionFactory::create(conn, _protocol);{std::unique_lock<std::mutex> lock(_mutex);_conns.insert(std::make_pair(conn, muduo_conn));}if (_cb_connection)_cb_connection(muduo_conn);}else{std::cout << "链接断开" << std::endl;BaseConnection::ptr muduo_conn;{std::unique_lock<std::mutex> lock(_mutex);auto it = _conns.find(conn);if (it == _conns.end()){return;}muduo_conn = it->second;_conns.erase(conn);}if (_cb_close)_cb_close(muduo_conn);}}void onMessage(const muduo::net::TcpConnectionPtr &conn, muduo::net::Buffer *buf, muduo::Timestamp){DLOG("连接有数据量到来, 开始处理! ")auto base_buf = BufferFactory::create(buf);while (1){if (_protocol->canProcessed(base_buf) == false){// 数据不足处理;if (base_buf->readableSize() > maxDatasize){conn->shutdown();ELOG("缓冲区中数据过大! ");return;}DLOG("数据量不足! ");break;}DLOG("缓冲区中数据可处理! ")BaseMessage::ptr msg;bool ret = _protocol->onMessage(base_buf, msg);if (ret == false){conn->shutdown();ELOG("缓冲区数据错误! ");return;}DLOG("消息反序列化成功! ")BaseConnection::ptr base_conn;{std::unique_lock<std::mutex> lock(_mutex);auto it = _conns.find(conn);if (it == _conns.end()){conn->shutdown();return;}base_conn = it->second;}DLOG("调用回调函数进行消息处理! ");if (_cb_message)_cb_message(base_conn, msg);}}private:const size_t maxDatasize = (1 << 16);BaseProtocal::ptr _protocol;muduo::net::EventLoop _baseloop;muduo::net::TcpServer _server;std::mutex _mutex;std::unordered_map<muduo::net::TcpConnectionPtr, BaseConnection::ptr> _conns;};class ServerFactory{public:template <typename ...Args>static BaseServer::ptr create(Args&& ...args){return std::make_shared<MuduoServer>(std::forward<Args>(args)...);}};
(7) MuduoClient: 继承BaseClient, onConnection连接管理调用上面实现的ConnectionFactory, onMessage和服务器一样.
class MuduoClient : public BaseClient{public:using ptr = std::shared_ptr<MuduoClient>;MuduoClient(const std::string &sip, int sport): _downlatch(1), // 初始化计数为1._protocol(ProtocolFactory::create()),_baseloop(_loopthread.startLoop()),_client(_baseloop, muduo::net::InetAddress(sip, sport), "MuduoClient"){}virtual void connect() override{DLOG("设置回调函数, 连接服务器! ");_client.setConnectionCallback(std::bind(&MuduoClient::onConnection, this, std::placeholders::_1));_client.setMessageCallback(std::bind(&MuduoClient::onMessage, this,std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_client.connect();_downlatch.wait();DLOG("连接服务器成功! ");}virtual void shutdown() override{return _client.disconnect();}virtual bool send(const BaseMessage::ptr & msg) override{if(connected() == false){ELOG("连接已断开! ");return false;}_conn->send(msg);return true;}virtual BaseConnection::ptr connection() override{return _conn;}virtual bool connected() override{return (_conn && _conn->connected());}private:void onConnection(const muduo::net::TcpConnectionPtr &conn){if (conn->connected()){std::cout << "链接建立成功!" << std::endl;_downlatch.countDown(); // 计数为0被唤醒._conn = ConnectionFactory::create(conn, _protocol);}else{std::cout << "链接断开" << std::endl;_conn.reset();}}void onMessage(const muduo::net::TcpConnectionPtr &conn, muduo::net::Buffer *buf, muduo::Timestamp){DLOG("连接有数据到来, 开始处理! ");auto base_buf = BufferFactory::create(buf);while (1){if (_protocol->canProcessed(base_buf) == false){// 数据不足处理;if (base_buf->readableSize() > maxDatasize){conn->shutdown();ELOG("缓冲区中数据过大! ");return;}DLOG("数据量不足! ");break;}DLOG("缓冲区中数据可处理! ")BaseMessage::ptr msg;bool ret = _protocol->onMessage(base_buf, msg);if (ret == false){conn->shutdown();ELOG("缓冲区数据错误! ");return;}DLOG("缓冲区中数据解析完毕, 调用回调函数进行处理! ")if (_cb_message)_cb_message(_conn, msg);}}protected:const size_t maxDatasize = (1 << 16);BaseProtocal::ptr _protocol;BaseConnection::ptr _conn;muduo::CountDownLatch _downlatch;muduo::net::EventLoopThread _loopthread;muduo::net::EventLoop *_baseloop;muduo::net::TcpClient _client;};class ClientFactory{public:template <typename ...Args>static BaseClient::ptr create(Args&& ...args){return std::make_shared<MuduoClient>(std::forward<Args>(args)...);}};