欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 财经 > 产业 > 网络编程(16)——asio多线程模型IOServicePool

网络编程(16)——asio多线程模型IOServicePool

2024/10/25 18:27:57 来源:https://blog.csdn.net/m0_63086198/article/details/142798269  浏览:    关键词:网络编程(16)——asio多线程模型IOServicePool

目录

十六、day16

1. 什么是多线程?

2. IOServicePool实现

3. 服务器修改

4. 客户端修改

5. 总结

1. boost::asio::io_context::work的作用?


十六、day16

在之前的设计中,我们对 ASIO 的使用都是采用单线程模式。为了提升网络 I/O 并发处理的效率,这次我们设计了在多线程模式下使用 ASIO 的方法。总体而言,ASIO 有两种多线程模型

  • 启动多个线程,每个线程管理一个独立的 io_context。
  • 启动一个 io_context,由多个线程共享。

在后续文章中,我们会对比这两种模式的区别。这里我们先介绍第一种模式,即多个线程,每个线程管理一个独立的 io_context 服务。

1. 什么是多线程?

之前在完善消息节点的章节学习过asio服务器底层通信的流程,它是基于单线程运行的,可参考

知乎用户​www.zhihu.com/people/zhi-chi-tian-ya-10-23/posts

单线程模式

而今天将设计IOServicePool类型的多线程模型,如下图所示

多线程模式

IOServicePool 服务池中,IOServicePool 类会根据系统的 CPU 核数创建相应数量的 io_context 实例,并将每个 io_context 运行在一个独立的线程中。例如,如果系统有两个 CPU 核,就会有两个独立的线程分别运行各自的 io_context。io_context 是一个调度器,用于管理异步事件。例如,对于 Session1 会话,如果想在线程 1 上注册一个读事件,可以通过 async_read 将读事件注册到 io_context1 中,这样它的回调函数就会在线程 1 中执行。同样,线程 2 也是独立运行的,并处理它对应的 io_context 的事件。

IOServicePool多线程模式特点:

  • 每个 io_context 都在独立的线程中运行,因此同一个 socket 会被注册在同一个 io_context 上,它的回调函数也会在同一个线程中执行。这样,对于同一个 socket 来说,每次回调函数触发都会在同一个线程中执行,从而避免了线程安全问题,确保网络 I/O 层面的并发是线程安全的。
  • 但是,对于不同的 socket,回调函数的触发可能会在同一个线程中(如果两个 socket 被分配到同一个 io_context),也可能在不同的线程中(如果两个 socket 分配到不同的 io_context)。多个socket由同一个ioc调度的话,不会发生逻辑安全或线程问题,但如果不同的socket由不同的ioc调度,那么可能会发生安全问题。比如,两个 socket 对应的上层逻辑有交互或共享数据,就可能存在线程安全问题。如果 socket1 代表玩家1,socket2 代表玩家2,而这两个玩家在逻辑层面上有交互(如同属一个工会并且共同完成任务),则涉及的工会积分是共享的数据区域,需要保证线程安全。可以通过加锁或使用逻辑队列来解决这个问题,目前我们采用的是逻辑队列的方法。
  • 与单线程相比,多线程显著提高了并发能力。在单线程模式下,只有一个 io_context 来监听读写事件,事件就绪后回调函数在同一个线程中串行执行,如果一个回调函数执行时间较长,会影响后续的回调函数。而在多线程模式下,可以在一定程度上减少一个逻辑调用对下一个调用的影响。例如,如果两个 socket 被分配到不同的 io_context 上,它们的回调就不会互相影响。但如果两个 socket 分配到同一个 io_context,仍然可能有调用时间的影响。不过,我们已经通过逻辑队列的方式将网络线程和逻辑线程解耦,从而避免了前一个调用时间影响下一个回调触发的问题。

2. IOServicePool实现

IOServicePool本质上是一个线程池,基本功能就是根据构造函数传入的数量创建n个线程和iocontext,然后每个线程跑一个iocontext,这样就可以并发处理不同iocontext读写事件了

a. IOServicePool.h

#pragma once
#include "Singleton.h"
#include <boost/asio.hpp>
#include <vector>class AsioIOServicePool : public Singleton<AsioIOServicePool>
{friend Singleton<AsioIOServicePool>;
public:using IOService = boost::asio::io_context;using Work = boost::asio::io_context::work; // work的作用?using WorkPtr = std::unique_ptr<Work>; // 希望该work不会被拷贝,只能移动或者从头用到尾不被改变~AsioIOServicePool();AsioIOServicePool(const AsioIOServicePool&) = delete;AsioIOServicePool& operator = (const AsioIOServicePool&) = delete;// 使用round-robin 的方式返回一个io_contextboost::asio::io_context& GetIOService();void Stop();
private:AsioIOServicePool(std::size_t size = std::thread::hardware_concurrency()); // hardware_concurrency获取CPu核数std::vector<IOService> _ioServices;std::vector<WorkPtr> _works;std::vector<std::thread> _threads;// 通过轮询返回ioc时,需要记录当前ioc的下标,累加,当超过vector的size时就归零,然后继续按轮询的方式返回// 记录ioc在vector的下标std::size_t _nextIOService;
};
  • IOServicePool也是单例模式,有且仅有唯一实例
  • IOService :io_context
  • Work :用于绑定ioc,避免ioc.run()提前返回, work的详细作用请看文章末的总结部分
  • WorkPtr :使用unique_ptr管理work,希望该work不会被拷贝,只能移动或者从头用到尾不被改变
  • _ioServices:存储指定数量的ioc
  • _works:存储与ioc数量对应的work
  • _threads:存储指定数量的线程
  • _nextIOService:记录ioc在vector的下标,通过轮询返回ioc时,需要记录当前ioc的下标,累加,当超过vector的size时就归零,然后继续按轮询的方式返回

b. IOServicePool构造函数

AsioIOServicePool::AsioIOServicePool(std::size_t size) : _ioServices(size), _works(size), _nextIOService(0) {for (std::size_t i = 0; i < size; i++) {_works[i] = std::unique_ptr<Work>(new Work(_ioServices[i]));}// 遍历多个ioservice,创建多个线程,每个线程内部启动ioservicefor (std::size_t i = 0; i < _ioServices.size(); i++) {_threads.emplace_back([this, i]() {_ioServices[i].run();});}
}

size的默认值是std::thread::hardware_concurrency(),该函数用于获取CPU核数。如果不主动更改size,那么IOServicePool会构造数量等于CPU核数的上下文服务、work和线程。

因为work通过std::unique_ptr进行管理,所以下面这段代码是错的,因为std::unique_ptr 不允许将一个普通指针直接赋值给另一个 std::unique_ptr, std::unique_ptr是独占有权的。

auto unptr = std::unique_ptr<Work>(new Work(_ioServices[i]));
_works[i] = unptr;

但是,可以通过移动语义将自动将创建的 unique_ptr 的所有权转移到 _works[i] ,实际上是在 _works[i] 中创建或替换一个 unique_ptr。

std::unique_ptr 不允许复制(即同一对象不能被多个 unique_ptr 同时拥有),但支持移动操作。使用 std::unique_ptr 时,_works[i] 直接接收新创建的 unique_ptr,所有权被有效地转移。

可以将unique_ptr作为右值赋值给另一个unique_ptr

_works[i] = std::unique_ptr<Work>(new Work(_ioServices[i]));

注意,Work(ioc_context)是asio库的函数,用于将work与ioc进行绑定,避免ioc.run()返回。原型为

boost::asio::io_context::work::work(boost::asio::io_context& io_context)

最后,遍历多个ioservice,创建多个线程,每个线程内部启动ioservice。

c. GetIOService()

boost::asio::io_context& AsioIOServicePool::GetIOService() {auto& service = _ioServices[_nextIOService++];if (_nextIOService == _ioServices.size())_nextIOService = 0;return service;
}

该段代码用于从ioc存储容器_ioServices中获取io_context&,其中_nextIOService为索引,轮询获取io_context&

d. Stop()

void AsioIOServicePool::Stop(){for (auto& work : _works) {work.reset();}for (auto& t : _threads) {t.join();}
}

同样我们要实现Stop函数,控制AsioIOServicePool停止所有ioc的工作,并等待所有线程结束。因为我们要保证每个线程安全退出后再让AsioIOServicePool停止。

3. 服务器修改

a. void CServer::start_accept()

void CServer::start_accept() {auto& ioc = AsioIOServicePool::GetInstance()->GetIOService();// make_shared分配并构造一个 std::shared_ptr,_ioc, this是传给Session的参数std::shared_ptr<CSession> new_session = std::make_shared<CSession>(ioc, this);// 开始一个异步接受操作,当new_session的socket与客户端连接成功时,调用回调函数handle_accept_acceptor.async_accept(new_session->Socket(), std::bind(&CServer::handle_accept, this, new_session,std::placeholders::_1));
}

该段函数在CServer实例化的时候被CServer构造函数调用,使服务器启动异步接收(相当于异步读,之前的代码不需要work的原因是因为ioc.run()是在CServer实例化后运行的,start_accept()函数会执行异步接收操作,相当于异步读注册给ioc,ioc.run不会返回),等待客户端连接。

之前的代码中,new_session使用的ioc是acceptor绑定的ioc,该ioc负责异步接收、异步读和写。但是在多线程模式中,该ioc只需要执行异步接收操作,而异步读写通过从AsioIOServicePool池中获取的ioc运行。

std::shared_ptr<CSession> new_session = std::make_shared<CSession>(_ioc, this); // 修改前
std::shared_ptr<CSession> new_session = std::make_shared<CSession>(ioc, this); // 修改后

b. AsyncServer_MsgNode.cpp

主函数也需要修改,因为现在的ioc不止用于执行异步接受,还有线程池中的ioc,所以需要将二者均stop

int main()
{try {auto pool = AsioIOServicePool::GetInstance();boost::asio::io_context ioc;boost::asio::signal_set signals(ioc, SIGINT, SIGTERM);// 必须异步等待,否则建立线程进行处理signals.async_wait([&ioc, pool](const boost::system::error_code& error, int signal_number) {if (!error) {std::cout << "Signal " << signal_number << " received." << std::endl;ioc.stop();  // 停止 io_contextpool->Stop();}});CServer s(ioc, 10086);ioc.run();}catch (std::exception& e) {std::cerr << "Exception: " << e.what() << '\n';}boost::asio::io_context io_context;
}

4. 客户端修改

#include <boost/asio.hpp>
#include <iostream>
#include <json/json.h>
#include <json/value.h>
#include <json/reader.h>
#include <chrono>
#include <thread>using namespace boost::asio::ip;
using std::cout;
using std::endl;
const int MAX_LENGTH = 1024 * 2; // 发送和接收的长度为1024 * 2字节
const int HEAD_LENGTH = 2;
const int HEAD_TOTAL = 4;std::vector<std::thread> vec_threads;int main()
{auto start = std::chrono::high_resolution_clock::now();for (int i = 0; i < 50; i++) { //建立100个线程vec_threads.emplace_back([]() {try {boost::asio::io_context ioc; // 创建上下文服务// 127.0.0.1是本机的回路地址,也就是服务器和客户端在一个机器上tcp::endpoint remote_ep(address::from_string("127.0.0.1"), 10086); // 构造endpointtcp::socket sock(ioc);boost::system::error_code error = boost::asio::error::host_not_found; // 错误:主机未找到sock.connect(remote_ep, error);if (error) {cout << "connect failed, code is " << error.value() << " error msg is " << error.message();return 0;}int i = 0;while(i++ < 200) {Json::Value root;root["id"] = 1001;root["data"] = "hello world";std::string request = root.toStyledString();size_t request_length = request.length();char send_data[MAX_LENGTH] = { 0 };int msgid = 1001;int msgid_host = boost::asio::detail::socket_ops::host_to_network_short(msgid);memcpy(send_data, &msgid_host, 2);//转为网络字节序int request_host_length = boost::asio::detail::socket_ops::host_to_network_short(request_length);memcpy(send_data + 2, &request_host_length, 2);memcpy(send_data + 4, request.c_str(), request_length);boost::asio::write(sock, boost::asio::buffer(send_data, request_length + 4));char reply_head[HEAD_TOTAL]; // 首先读取对端发送消息的总长度size_t reply_length = boost::asio::read(sock, boost::asio::buffer(reply_head, HEAD_TOTAL));msgid = 0;memcpy(&msgid, reply_head, HEAD_LENGTH);short msglen = 0; // 消息总长度memcpy(&msglen, reply_head + 2, HEAD_LENGTH); // 将消息总长度赋值给msglen//转为本地字节序msglen = boost::asio::detail::socket_ops::network_to_host_short(msglen);char msg[MAX_LENGTH] = { 0 }; // 构建消息体(不含消息总长度)size_t msg_length = boost::asio::read(sock, boost::asio::buffer(msg, msglen));Json::Reader reader;reader.parse(std::string(msg, msg_length), root);std::cout << "msg id is " << root["id"] << " msg is " << root["data"] << endl;}}catch (std::exception& e) {std::cerr << "Exception: " << e.what() << endl;}});std::this_thread::sleep_for(std::chrono::seconds(1));for (auto& t : vec_threads) {t.join();}auto end = std::chrono::high_resolution_clock::now();auto duration = std::chrono::duration_cast<std::chrono::microseconds>(end - start);cout << "Time spent: " << duration.count() << " microsencods" << endl;}return 0;
}

5. 总结

1. boost::asio::io_context::work的作用?

在实际使用中,我们通常会将一些异步操作提交给io_context进行处理,然后该操作会被异步执行,而不会立即返回结果。如果没有其他任务需要执行,那么io_context就会停止工作,导致所有正在进行的异步操作都被取消。这时,我们需要使用boost::asio::io_context::work对象来防止io_context停止工作。

boost::asio::io_context::work的作用是持有一个指向io_context的引用,并通过创建一个“工作”项来保证io_context不会停止工作,直到work对象被销毁或者调用reset()方法为止。当所有异步操作完成后,程序可以使用work.reset()方法来释放io_context,从而让其正常退出。

在之前的代码中,ioc不会被阻塞是因为我们已经提前给ioc注册了一个读事件(acceptor通过async_accept注册了一个读事件监听对端连接,而acceptor又绑定了此io_context),所以此时的ioc不会退出。

        boost::asio::io_context ioc;boost::asio::signal_set signals(ioc, SIGINT, SIGTERM);// 必须异步等待,否则建立线程进行处理signals.async_wait([&ioc](const boost::system::error_code& error, int signal_number) {if (!error) {std::cout << "Signal " << signal_number << " received." << std::endl;ioc.stop();  // 停止 io_context}});CServer s(ioc, 10086);ioc.run();CServer::CServer(boost::asio::io_context& ioc, short port) : _ioc(ioc),
_acceptor(ioc, tcp::endpoint(tcp::v4(), port)) {cout << "Server start success, on port: " << port << endl;// 开始异步地接受客户端连接请求。服务器启动后就进入等待客户端连接的状态start_accept();
}void CServer::start_accept() {// make_shared分配并构造一个 std::shared_ptr,_ioc, this是传给Session的参数std::shared_ptr<CSession> new_session = std::make_shared<CSession>(_ioc, this);// 开始一个异步接受操作,当new_session的socket与客户端连接成功时,调用回调函数handle_accept_acceptor.async_accept(new_session->Socket(), std::bind(&CServer::handle_accept, this, new_session,std::placeholders::_1));
}

而我们实现的IOServicePool中,在它的构造函数中初始化了n个io_context,且ioc运行在独立的线程中调用ioc.run(),如果不写work,相当于ioc没有绑定任何事件,那么ioc就会退出

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com