欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 健康 > 养生 > 十、消息头完善和使用json序列化

十、消息头完善和使用json序列化

2024/10/25 0:28:30 来源:https://blog.csdn.net/m0_58311374/article/details/141184497  浏览:    关键词:十、消息头完善和使用json序列化

        系列文章目录:C++ asio网络编程-CSDN博客 

        本教程使用的是jsoncpp,安装方式网上有很多,可以自己下载源码编译,也可以使用vcpkg一键安装,我比较推荐使用vcpkg,感觉这个就像 visual studio 的 maven,使用这种方式安装后就不需要自己配置项目的包含目录和库目录了,直接使用。

1、消息头完善

        前面我们的消息头其实是一种简化的方式去构造的,从本小节开始将使用完整的 tlv 格式

2、json数据格式

        json中将id和data分开存放:

{"data" : "hello","id" : 1001
}

3、MsgNode修改       

        现在,我们修改一下MsgNode

#pragma once
#include <string>
#include <iostream>
#include <boost/asio.hpp>
#include "const.h"class MsgNode
{
public:MsgNode(short max_len);~MsgNode();void clearData();  // 清空数据public:short _cur_len;  // 当前收发数据的长度short _total_len; // 数据总长度char* _data;  // 数据起始地址
};class RecvNode : public MsgNode
{
public:RecvNode(short max_len, short msg_id);private:short _msg_id;
};class SendNode : public MsgNode
{
public:SendNode(const char* msg, short max_len, short msg_id);private:short _msg_id;
};
#include "MsgNode.h"MsgNode::MsgNode(short max_len) : _total_len(max_len), _cur_len(0)
{_data = new char[_total_len + 1]();_data[_total_len] = '\0';
}MsgNode::~MsgNode() {std::cout << "destruct MsgNode" << std::endl;delete[] _data;
}void MsgNode::clearData()
{memset(_data, 0, _total_len);_cur_len = 0;
}RecvNode::RecvNode(short max_len, short msg_id): MsgNode(max_len), _msg_id(msg_id)
{
}SendNode::SendNode(const char* msg, short max_len, short msg_id): MsgNode(max_len + HEAD_TOTAL_LEN), _msg_id(msg_id)
{// 先拼接idshort msg_id_host = boost::asio::detail::socket_ops::host_to_network_short(msg_id);memcpy(_data, &msg_id_host, HEAD_ID_LEN);// 再拼接长度short msg_len_host = boost::asio::detail::socket_ops::host_to_network_short(max_len);memcpy(_data + HEAD_ID_LEN, &msg_len_host, HEAD_DATA_LEN);// 最后拼接消息memcpy(_data + HEAD_TOTAL_LEN, msg, max_len);
}

        const.h的内容如下

#pragma once#define MAX_LENGTH  1024*2
#define MAX_SENDQUE 1000
#define MAX_RECVQUE 10000
#define HEAD_TOTAL_LEN 4
#define HEAD_ID_LEN 2
#define HEAD_DATA_LEN 2

4、Session类修改      

#pragma once
#include <iostream>
#include <boost/asio.hpp>
#include <map>
#include "Server.h"
#include <queue>
#include <mutex>
#include "MsgNode.h"class Server;class Session : public std::enable_shared_from_this<Session>
{
public:Session(boost::asio::io_context& ioc, Server* server);~Session();void start();void close();void send(char* msg, int max_length, short msg_id);void send(std::string msg, short msg_id);boost::asio::ip::tcp::socket& getSocket();std::string& getUuid();std::shared_ptr<Session> getSelfShared();private:void handle_read(const boost::system::error_code& ec,std::size_t bytes_transferred,std::shared_ptr<Session> self_share);void handle_write(const boost::system::error_code& ec, std::shared_ptr<Session> self_share);boost::asio::ip::tcp::socket _socket;std::string _uuid;Server* _server;bool _b_close;char _data[MAX_LENGTH];// 收到的消息头std::shared_ptr<MsgNode> _recv_head_node;// 消息头是否处理完成bool _b_head_parse;// 收到的消息std::shared_ptr<RecvNode> _recv_msg_node;// 发送的消息std::queue<std::shared_ptr<SendNode>> _send_que;std::mutex _send_lock;
};
#include "Session.h"
#include <boost/uuid/uuid_generators.hpp>
#include <boost/uuid/uuid_io.hpp>
#include <json/json.h>
#include <json/value.h>
#include <json/reader.h>Session::Session(boost::asio::io_context& ioc, Server* server): _socket(ioc), _server(server), _b_close(false), _b_head_parse(false)
{// 生成唯一id,可以了解一下雪花算法// 这里直接使用boost自带的方法boost::uuids::uuid a_uuid = boost::uuids::random_generator()();_uuid = boost::uuids::to_string(a_uuid);_recv_head_node = std::make_shared<MsgNode>(HEAD_TOTAL_LEN);}Session::~Session()
{std::cout << "~Session destruct" << std::endl;
}boost::asio::ip::tcp::socket& Session::getSocket()
{return _socket;
}void Session::start()
{memset(_data, 0, MAX_LENGTH);_socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),std::bind(&Session::handle_read, this, std::placeholders::_1, std::placeholders::_2,getSelfShared()));
}void Session::send(char* msg, int max_length, short msg_id)
{std::lock_guard<std::mutex> lock(_send_lock);if (_send_que.size() > MAX_SENDQUE) {std::cout << "session: " << _uuid << " send que fulled, size is " << MAX_SENDQUE << std::endl;return;}_send_que.push(std::make_shared<SendNode>(msg, max_length, msg_id));if (_send_que.size() > 1) {  // 因为push了一次,判断条件为大于1return;}auto& msgnode = _send_que.front();boost::asio::async_write(_socket, boost::asio::buffer(msgnode->_data, msgnode->_total_len),std::bind(&Session::handle_write, this, std::placeholders::_1, getSelfShared()));
}void Session::send(std::string msg, short msg_id)
{std::lock_guard<std::mutex> lock(_send_lock);if (_send_que.size() > MAX_SENDQUE) {std::cout << "session: " << _uuid << " send que fulled, size is " << MAX_SENDQUE << std::endl;return;}_send_que.push(std::make_shared<SendNode>(msg.c_str(), msg.length(), msg_id));if (_send_que.size() > 1) {  // 因为push了一次,判断条件为大于1return;}auto& msgnode = _send_que.front();boost::asio::async_write(_socket, boost::asio::buffer(msgnode->_data, msgnode->_total_len),std::bind(&Session::handle_write, this, std::placeholders::_1, getSelfShared()));
}void Session::close()
{_socket.close();_b_close = true;
}std::string& Session::getUuid()
{return _uuid;
}std::shared_ptr<Session> Session::getSelfShared()
{return shared_from_this();
}void Session::handle_read(const boost::system::error_code& ec, std::size_t bytes_transferred, std::shared_ptr<Session> self_share)
{try {if (!ec) {int copy_len = 0;  // 已经处理的字符数while (bytes_transferred > 0) {// 头部未处理完成if (!_b_head_parse) {// 接收到的数据长度比头部长度小if (bytes_transferred + _recv_head_node->_cur_len < HEAD_TOTAL_LEN) {// 将接收到的存起来memcpy(_recv_head_node->_data + _recv_head_node->_cur_len, _data + copy_len, bytes_transferred);_recv_head_node->_cur_len += bytes_transferred;// 继续接收数据memset(_data, 0, MAX_LENGTH);_socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),std::bind(&Session::handle_read, this, std::placeholders::_1, std::placeholders::_2, self_share));return;}// 接收到的长度大于等于头部长度// 只存头部剩余部分int head_remain = HEAD_TOTAL_LEN - _recv_head_node->_cur_len;memcpy(_recv_head_node->_data + _recv_head_node->_cur_len, _data + copy_len, head_remain);// 更新已处理的data长度和接收到但未处理的长度copy_len += head_remain;bytes_transferred -= head_remain;// 获取头部 msg_id 数据short msg_id = 0;memcpy(&msg_id, _recv_head_node->_data, HEAD_ID_LEN);// 网络字节序转本地字节序msg_id = boost::asio::detail::socket_ops::network_to_host_short(msg_id);std::cout << "msg id: " << msg_id << std::endl;// id 校验if (msg_id > MAX_LENGTH) {std::cout << "invalid msd_id: " << msg_id << std::endl;_server->clearSession(_uuid);return;}// 获取数据长度short msg_len = 0;memcpy(&msg_len, _recv_head_node->_data + HEAD_ID_LEN, HEAD_DATA_LEN);// 网络字节序转化为本地字节序msg_len = boost::asio::detail::socket_ops::network_to_host_short(msg_len);std::cout << "msg len: " << msg_len << std::endl;// 头部长度非法if (msg_len > MAX_LENGTH) {std::cout << "invalid data length" << std::endl;_server->clearSession(_uuid);return;}// 头部处理完成_b_head_parse = true;// 头部处理完成后,还有剩余接收到的数据,就是正式数据// 开始处理数据_recv_msg_node = std::make_shared<RecvNode>(msg_len, msg_id);// 有数据但不完整if (bytes_transferred < msg_len) {// 先存下已接收到的memcpy(_recv_msg_node->_data + _recv_msg_node->_cur_len, _data + copy_len, bytes_transferred);_recv_msg_node->_cur_len += bytes_transferred;// 继续接收剩下的数据memcpy(_data, 0, MAX_LENGTH);_socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),std::bind(&Session::handle_read, this, std::placeholders::_1, std::placeholders::_2, self_share));return;}// 数据齐全memcpy(_recv_msg_node->_data + _recv_msg_node->_cur_len, _data + copy_len, msg_len);_recv_msg_node->_cur_len += msg_len;copy_len += msg_len;bytes_transferred -= msg_len;_recv_msg_node->_data[_recv_msg_node->_total_len] = '\0';std::cout << "receive data: " << _recv_msg_node->_data << std::endl;// 发过来的是json,解析数据测试是否正确Json::Reader reader;Json::Value root;reader.parse(std::string(_recv_msg_node->_data, _recv_msg_node->_total_len), root);std::cout << "receive msg id is " << root["id"].asInt() << " msg data is "<< root["data"].asString() << std::endl;// 发送测试root["data"] = "server has received msg, msg data is " + root["data"].asString();std::string return_str = root.toStyledString();  // 序列化后再发送send(return_str, root["id"].asInt());// 继续轮询剩余未处理的数据_b_head_parse = false;_recv_head_node->clearData();if (bytes_transferred <= 0) {memset(_data, 0, MAX_LENGTH);_socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),std::bind(&Session::handle_read, this, std::placeholders::_1, std::placeholders::_2, self_share));return;}continue;}// 已经处理完头部,继续处理未处理完的数据int remain_msg = _recv_msg_node->_total_len - _recv_msg_node->_cur_len;// 接收的数据仍不完整if (bytes_transferred < remain_msg) {memcpy(_recv_msg_node->_data + _recv_msg_node->_cur_len, _data + copy_len, bytes_transferred);_recv_msg_node->_cur_len += bytes_transferred;// 继续接收数据memset(_data, 0, MAX_LENGTH);_socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),std::bind(&Session::handle_read, this, std::placeholders::_1, std::placeholders::_2, self_share));return;}// 接受的数据完整memcpy(_recv_msg_node->_data + _recv_msg_node->_cur_len, _data + copy_len, remain_msg);_recv_msg_node->_cur_len += remain_msg;bytes_transferred -= remain_msg;copy_len += remain_msg;_recv_msg_node->_data[_recv_msg_node->_total_len] = '\0';//std::cout << "receive data: " << _recv_msg_node->_data << std::endl;// 发过来的是json,解析数据测试是否正确Json::Reader reader;Json::Value root;reader.parse(std::string(_recv_msg_node->_data, _recv_msg_node->_total_len), root);std::cout << "receive msg id is " << root["id"].asInt() << " msg data is "<< root["data"].asString() << std::endl;// 发送测试root["data"] = "server has received msg, msg data is " + root["data"].asString();std::string return_str = root.toStyledString();  // 序列化后再发送send(return_str, root["id"].asInt());// 继续轮询剩余未处理的数据_b_head_parse = false;_recv_head_node->clearData();if (bytes_transferred <= 0) {memset(_data, 0, MAX_LENGTH);_socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),std::bind(&Session::handle_read, this, std::placeholders::_1, std::placeholders::_2, self_share));return;}continue;}}else {std::cout << "read error" << std::endl;close();_server->clearSession(_uuid);}}catch (std::exception& e) {std::cout << "Exception: " << e.what() << std::endl;}
}void Session::handle_write(const boost::system::error_code& ec, std::shared_ptr<Session> self_share)
{try {if (!ec) {std::lock_guard<std::mutex> lock(_send_lock);_send_que.pop();if (!_send_que.empty()) {auto& msgNode = _send_que.front();boost::asio::async_write(_socket, boost::asio::buffer(msgNode->_data, msgNode->_total_len),std::bind(&Session::handle_write, this, std::placeholders::_1, self_share));}}else {std::cout << "write error: " << ec.what() << std::endl;close();_server->clearSession(_uuid);}}catch (std::exception& e) {std::cerr << "Exception: " << e.what() << std::endl;}
}

5、客户端修改

#include <boost/asio.hpp>
#include <iostream>
#include <thread>
#include <json/json.h>
#include <json/value.h>
#include <json/reader.h>#define MAX_LENGTH  1024*2
#define HEAD_LENGTH 2
#define HEAD_TOTAL 4int main() {try {// 创建上下文服务boost::asio::io_context ioc;// 构造endpointboost::asio::ip::tcp::endpoint remote_ep(boost::asio::ip::address::from_string("127.0.0.1"), 10086);boost::asio::ip::tcp::socket sock(ioc);boost::system::error_code error = boost::asio::error::host_not_found;sock.connect(remote_ep, error);if (error) {std::cout << "连接失败,错误码:" << error.value()<< "错误信息:" << error.message() << std::endl;return 0;}Json::Value root;root["id"] = 1001;root["data"] = "hello";std::string request = root.toStyledString();std::size_t request_length = request.length();char send_data[MAX_LENGTH] = { 0 };int msgid = 1001;int msgid_host = boost::asio::detail::socket_ops::host_to_network_short(msgid);memcpy(send_data, &msgid_host, 2);int request_host_length = boost::asio::detail::socket_ops::host_to_network_short(request_length);memcpy(send_data + 2, &request_host_length, 2);memcpy(send_data + 4, request.c_str(), request_length);boost::asio::write(sock, boost::asio::buffer(send_data, request_length + 4));std::cout << "begin to receive ..." << std::endl;char reply_head[HEAD_TOTAL];std::size_t reply_length = boost::asio::read(sock, boost::asio::buffer(reply_head, HEAD_TOTAL));msgid = 0;memcpy(&msgid, reply_head, 2);short msglen = 0;memcpy(&msglen, reply_head + 2, 2);// 转换为本地字节序msgid = boost::asio::detail::socket_ops::network_to_host_short(msgid);msglen = boost::asio::detail::socket_ops::network_to_host_short(msglen);char msg[MAX_LENGTH] = { 0 };std::size_t msg_length = boost::asio::read(sock, boost::asio::buffer(msg, msglen));Json::Reader reader;reader.parse(std::string(msg, msg_length), root);std::cout << "msg id: " << root["id"] << ", msg: " << root["data"] << std::endl;getchar();}catch (std::exception& e) {std::cout << "异常:" << e.what() << std::endl;}return 0;
}

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com