欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 新闻 > 资讯 > boost asio 高性能服务器

boost asio 高性能服务器

2025/3/17 20:23:18 来源:https://blog.csdn.net/weixin_50873490/article/details/144648146  浏览:    关键词:boost asio 高性能服务器

       

       介绍

Boost.Asio 是一个跨平台的 C++ 网络编程库,它采用的是 Reactor 模型来实现异步 I/O 操作。

Boost.Asio 的核心是 io_context 对象,它充当了 Reactor 模式中的 Reactor 组件。io_context 负责监听和分发各种 I/O 事件,同时也提供了定时器事件的支持。应用程序需要将关注的 I/O 操作注册到 io_context 上,比如 TCP 连接、UDP 发送/接收等。当这些 I/O 操作就绪时,io_context 会通知应用程序进行相应的事件处理。

Boost.Asio 提供了很多其他的便利功能,如:

  1. 跨平台抽象:Boost.Asio 屏蔽了不同操作系统的 I/O 模型差异,为应用程序提供了统一的编程接口。

  2. 缓冲区管理:Boost.Asio 提供了灵活的缓冲区管理机制,简化了应用程序的内存管理。

  3. 线程安全:Boost.Asio 的设计是线程安全的,可以方便地在多线程环境下使用。

  4. 高性能:Boost.Asio 利用 Reactor 模型的事件驱动机制,可以高效地处理大量的并发 I/O 操作。

 具体来说:

  • io_context:

    • 这是 Boost.Asio 中最核心的组件,相当于 Reactor 模型中的 Reactor 组件。
    • io_context 负责监听和分发 I/O 事件,为应用程序提供异步 I/O 操作的支持。
  • Executor:

    • Executor 是 Boost.Asio 中用于执行异步任务的组件。
    • 它提供了一种抽象的任务执行模型,可以集成不同类型的任务执行器,如线程池、协程等。
  • Socket:

    • Socket 是 Boost.Asio 提供的网络连接抽象,包括 TCP 套接字、UDP 套接字等。
    • 应用程序可以使用 Socket 进行网络通信。
  • Acceptor:

    • Acceptor 用于监听网络连接请求,并创建新的 Socket 实例。
    • 通常用于实现 TCP 服务器。
  • Buffer:

    • Buffer 是 Boost.Asio 用于管理内存缓冲区的组件。
    • 它提供了灵活的缓冲区管理机制,简化了应用程序的内存管理。
  • Timer:

    • Timer 组件提供了定时器功能,可以用于实现定时任务。
  • Signal:

    • Signal 组件用于处理操作系统信号,如 SIGINT、SIGTERM 等。
  • Error Handling:

    • Boost.Asio 提供了一套完整的错误处理机制,包括错误码、异常等。
    • 应用程序可以使用这些工具来处理各种 I/O 操作中可能出现的错误。

 这些核心组件共同构成了 Boost.Asio 的功能框架。应用程序可以基于这些组件编写各种网络应用程序,如 HTTP 服务器、WebSocket 服务器、RPC 框架等。

boost网络框架使用方法

boost绑定

        首先介绍io_context,可以理解为这是操作系统和应用层数据交互的桥梁。有了它不必关注内核态的缓冲区,只需要关注自己定义在用户态的缓冲区,因为它会通过桥梁运输到用户态的缓冲区。

boost::asio::io_context io_context;

endpoint 就是我们的服务器ip+port,boost已经封装好了。

tcp::endpoint(tcp::v4(), port)
tcp::endpoint(tcp::v6(), port)

resolver 就是dns解析,这个步骤一般是由客户端进行,解析www.baidu.com -> ip 8.xxx.1.25

tcp::resolver resolver(io_context);
tcp::resolver::results_type endpoints_ = resolver.resolve(host, port);boost::asio::async_connect(socket_,      endpoints_,        // 连接地址connect_success_cb);

小总结

这小部分主要就是socket绑定的内容。后续我们关注重点放在发送和接收数据。

读写api

同步

首先搞明白为什么同步?为什么阻塞?

因为我们是在应用层调用send,会拷贝到内核态。最后内核态再通过网卡接口发出数据!所有当内核态空间占满,我们就需要阻塞等待,直到缓冲区有空间。这就是阻塞原因。对于同步,通常指的是多线程或多进程之间的协调,以确保在访问共享资源时不会产生冲突。描述的是编程方式,它更符合人类逻辑。

write_some 可以每次向指定的空间写入固定的字节数,如果写缓冲区满了,就只写一部分,返回写入的字节数。
write  可以一次性将所有数据发送给对端,如果发送缓冲区满了则阻塞,直到发送缓冲区可用,将数据发送完成。
send  一次性将buffer中的内容发送给对端,如果有部分字节因为发送缓冲区满无法发送,则阻塞等待,直到发送缓冲区可用,则继续发送完成。泛型编程api,就是不是通过对象调用,是传参调用。

total_bytes_written += sock.write_some(asio::buffer(buf.c_str()+total_bytes_written, buf.length()- total_bytes_written)
);sock.send(asio::buffer(buf.c_str(), buf.length()));asio::write(sock, asio::buffer(buf.c_str(), buf.length()));

 读


read_some  同步读和同步写类似
read_until    一直读取,直到读取指定字符结束
read            一次性同步读取对方发送的数据
receive      一次性同步接收对方发送的数据

total_bytes_read += sock.read_some(asio::buffer(buf + total_bytes_read,MESSAGE_SIZE - total_bytes_read)
);int receive_length =  sock.receive(asio::buffer(buffer_receive, BUFF_SIZE));
int receive_length = asio::read(sock, asio::buffer(buffer_receive, BUFF_SIZE));
asio::read_until(sock, buf, '\n');

总结

为什么要提供这些api?

其实这里涉及到粘包问题:三个解决方案

1.读固定字节   2.读特定符号停止 比如“\r\n”   3.header + body形式 头部放body长度。

write_some 可以每次向指定的空间写入固定的字节数。

read_until    一直读取,直到读取指定字符结束。

read和send recvice/writer   一次阻塞读完,能保证数据不丢。

异步非阻塞

什么是异步非阻塞?一句话就是调用api,直接返回做后面的事情(比如举例是输出std::cout xxx事情)当数据准备好了,内核态调用我们给定cb函数,处理数据。

async_readasync_write:

  • 异步读取数据到指定的缓冲区中。当读取完成时,会调用传入的回调函数
  • 回调函数参数:
    • boost::system::error_code: 表示读取操作的错误状态
    • std::size_t: 表示实际读取的字节数
template <typename AsyncReadStream, typename MutableBufferSequence, typename ReadHandler>
void async_read(
AsyncReadStream& s                      //socket
, const MutableBufferSequence& buffers  //读取数据的缓冲区,可以是单个缓冲区或多个缓冲区序列
, ReadHandler&& handler                  //读取完毕cb
);//同理
void async_write(
AsyncWriteStream& s
, const ConstBufferSequence& buffers    //写入数据的缓冲区,可以是单个缓冲区或多个缓冲区序列
, WriteHandler&& handler                //写入完毕cb处理
);

async_write_some和async_read_some:与 async_write 不同的是, async_write_some 并不保证能够写入全部的数据,而是写入尽可能多的数据。当写入的字节数小于缓冲区的总长度时, async_write_some 会在下次被调用时继续写入剩余的数据。该函数适用于无法一次性写入全部数据的情况,比如当发送的数据量很大时。async_read_some同理

void do_write(boost::asio::ip::tcp::socket& socket, const std::vector<char>& data)
{socket.async_write_some(boost::asio::buffer(data),[](boost::system::error_code ec, std::size_t bytes_transferred) {// 写操作完成后的回调处理});
}void do_read(boost::asio::ip::tcp::socket& socket, std::vector<char>& buffer)
{socket.async_read_some(boost::asio::buffer(buffer),[](boost::system::error_code ec, std::size_t bytes_transferred) {// 读操作完成后的回调处理});
}

关于boost::asio::buffer()

buffer()函数:接受原生指针和长度,创建缓冲区对象,支持的容器类型:可以直接使用 std::vectorstd::array 等作为缓冲区

  • asio::mutable_buffer 和 asio::const_buffer:分别表示可读写和只读的缓冲区,实现read write分离,防止只读被改
  • buffers_iterator类:用于遍历缓冲区内容

缓冲区使用场景:

  • 多个缓冲区组合:可以使用 buffers() 函数将多个缓冲区组合成一个
  • 动态缓冲区:可以使用 dynamic_buffer() 函数创建可增长的动态缓冲区

缓冲区生命周期管理:

  • 缓冲区对象的生命周期应与 I/O 操作保持一致,比如我们举例是session中的成员
  • 可以使用 lambda 捕获局部变量作为缓冲区,也可以使用智能指针管理动态分配的缓冲区

总结

本质来说api基本一致,只是调用回调cb频率不同,原版本读完整段调用cb,some版本每次读一点就会调用。

Client端

#include <boost/asio.hpp>
#include <chrono>
#include <iostream>
#include <string>
#include <thread>
#include <vector>
#include <atomic>using boost::asio::ip::tcp;
std::atomic<uint64_t> total_requests(0);
std::atomic<uint64_t> total_bytes(0);class Client {
public:Client(boost::asio::io_context& io_context, const std::string& host, const std::string& port,size_t msg_size,size_t num_requests): socket_(io_context),message_(msg_size, 'A'),num_requests_(num_requests) {// 解析服务器地址tcp::resolver resolver(io_context);endpoints_ = resolver.resolve(host, port);// 连接服务器do_connect();}private:void do_connect() {// 异步连接服务器boost::asio::async_connect(socket_,                          // 异步连接endpoints_,                      // 连接地址[this](boost::system::error_code ec, const tcp::endpoint&) {if (!ec) {do_write();}});}void do_write() {// 如果请求数达到最大,则停止写入if (requests_sent_ >= num_requests_) return;    boost::asio::async_write(socket_,                        // 异步写入boost::asio::buffer(message_),                      // 写入缓冲区[this](boost::system::error_code ec, std::size_t length) {// 写入数据成功后,增加请求计数和字节计数if (!ec) {total_bytes += length;do_read(); // 读取服务器响应}});}void do_read() {boost::asio::async_read(socket_,                        // 异步读取boost::asio::buffer(read_buffer_),                  // 读取缓冲区boost::asio::transfer_exactly(message_.size()),     // 读取固定大小的数据[this](boost::system::error_code ec, std::size_t /*length*/) {if (!ec) {++requests_sent_;++total_requests;do_write(); // 继续写入数据}});}tcp::socket socket_;std::string message_;std::vector<char> read_buffer_{std::vector<char>(1024)};tcp::resolver::results_type endpoints_;size_t requests_sent_ = 0;size_t num_requests_;
};
// ./client localhost 8080 10 1024 1000 
// 参数说明:
// localhost:服务器地址
// 8080:服务器端口
// 10:并发客户端数量
// 1024:消息大小(字节)
// 1000:每个客户端发送的请求数
int main(int argc, char* argv[]) {try {if (argc != 6) {std::cerr << "Usage: client <host> <port> <num_clients> <msg_size> <requests_per_client>\n";return 1;}std::string host = argv[1];std::string port = argv[2];int num_clients = std::atoi(argv[3]);size_t msg_size = std::atoi(argv[4]);size_t requests_per_client = std::atoi(argv[5]);boost::asio::io_context io_context;std::vector<std::shared_ptr<Client>> clients;// 创建多个客户端for (int i = 0; i < num_clients; ++i) {clients.push_back(std::make_shared<Client>(io_context, host, port, msg_size, requests_per_client));}// 记录开始时间auto start_time = std::chrono::high_resolution_clock::now();// 运行IO上下文std::vector<std::thread> threads;//根据cpu核心数创建线程int thread_pool_size = std::thread::hardware_concurrency();for (int i = 0; i < thread_pool_size; ++i) {threads.emplace_back([&io_context]() {io_context.run();});}// 每秒输出统计信息uint64_t last_requests = 0;while (total_requests < (num_clients * requests_per_client)) {std::this_thread::sleep_for(std::chrono::seconds(1));uint64_t current_requests = total_requests;uint64_t requests_per_second = current_requests - last_requests;double mb_per_second = (total_bytes - (last_requests * msg_size)) / (1024.0 * 1024.0);std::cout << "Requests/sec: " << requests_per_second << ", Throughput: " << mb_per_second << " MB/s\n";last_requests = current_requests;}// 计算总时间和平均性能auto end_time = std::chrono::high_resolution_clock::now();auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time);double total_time = duration.count() / 1000.0;double avg_requests_per_second = total_requests / total_time;double total_mb = total_bytes / (1024.0 * 1024.0);double avg_mb_per_second = total_mb / total_time;std::cout << "\nTest completed:\n"<< "Total time: " << total_time << " seconds\n"<< "Average requests/sec: " << avg_requests_per_second << "\n"<< "Average throughput: " << avg_mb_per_second << " MB/s\n";// 等待所有线程完成for (auto& thread : threads) {thread.join();}} catch (std::exception& e) {std::cerr << "Exception: " << e.what() << "\n";}return 0;
}

server端

#include <boost/asio.hpp>
#include <boost/bind/bind.hpp>
#include <iostream>
#include <memory>
#include <thread>
#include <vector>#include <sys/socket.h>using boost::asio::ip::tcp;// 会话类,处理单个客户端连接
class Session : public std::enable_shared_from_this<Session>
{
public:Session(tcp::socket socket) : socket_(std::move(socket)) {}void start(){do_read();// 直接返回 可以做其他事情std::cout << "start" << std::endl;}private:void do_read(){// 给回调函数续生命周期auto self(shared_from_this());socket_.async_read_some(boost::asio::buffer(data_, max_length),[this, self](boost::system::error_code ec, std::size_t length){if (!ec){// 立即回复相同的数据(echo服务器)do_write(length);}});// 直接返回 可以做其他事情std::cout << "do_read" << std::endl;}void do_write(std::size_t length){auto self(shared_from_this());boost::asio::async_write(socket_,boost::asio::buffer(data_, length),[this, self](boost::system::error_code ec, std::size_t /*length*/){if (!ec){do_read();}});// 直接返回 可以做其他事情std::cout << "do_write" << std::endl;}tcp::socket socket_;enum{max_length = 1024};char data_[max_length];
};// 服务器类
class Server
{ServerOptions options;public:Server(boost::asio::io_context &io_context, short port): acceptor_(io_context, tcp::endpoint(tcp::v4(), port)){// 开始接受连接do_accept();}private:void do_accept(){// 接受一个连接acceptor_.async_accept([this](boost::system::error_code ec, tcp::socket socket){if (!ec){std::make_shared<Session>(std::move(socket))->start();}// 继续接受连接do_accept();});// 直接返回 可以做其他事情std::cout << "do_accept" << std::endl;}tcp::acceptor acceptor_;
};
/// @brief  ./server 8080
/// @param argc
/// @param argv
/// @return
int main(int argc, char *argv[])
{try{if (argc != 2){std::cerr << "Usage: server <port>\n";return 1;}boost::asio::io_context io_context;// 创建服务器实例Server s(io_context, std::atoi(argv[1]));// 创建线程池std::vector<std::thread> threads;// 根据cpu核心数创建线程池int thread_pool_size = std::thread::hardware_concurrency();std::cout << "Server starting with " << thread_pool_size << " threads\n";// 创建多个工作线程for (int i = 0; i < thread_pool_size; ++i){threads.emplace_back([&io_context](){ io_context.run(); });}// 等待所有线程完成for (auto &thread : threads){thread.join();}}catch (std::exception &e){std::cerr << "Exception: " << e.what() << "\n";}return 0;
}

 boost提供tcp参数设置

有些参数仅限于linux平台

class ServerOptions
{
public:// 端口重用(仅在 Linux 下支持)/*问题:当服务器程序退出后,短时间内无法重新绑定到相同的端口。解决:通过设置端口重用选项,可以在服务器程序退出后立即重新绑定到相同端口,提高服务器的可用性。*/bool reuse_address = true; // 地址重用bool reuse_port = true;    // 端口重用// 设置 TCP_NODELAY(禁用 Nagle 算法)/*问题:Nagle 算法会缓存小数据包,导致延迟增大,不适用于实时性要求高的应用。解决:禁用 Nagle 算法可以立即发送数据,减少延迟,适用于实时通信场景。*/bool no_delay = true; // 禁用 Nagle 算法/*问题:立即关闭连接时,可能会有未发送完的数据,导致连接中断。解决:启用 linger 选项可以延迟关闭连接,确保数据完整发送。*/bool linger = true;      // 延迟关闭int linger_timeout = 30; // 延迟关闭超时时间// 设置阻塞/非阻塞模式/*问题:阻塞模式下,accept() 会阻塞等待连接,影响并发处理能力。解决:启用非阻塞模式可以提高并发处理能力,适用于高并发场景。*/bool non_blocking = true; // 非阻塞模式int backlog_size = 100;   // 监听队列大小// 设置监听队列大小(backlog) 需要设置ip重用/*问题:监听队列大小不足,导致连接请求被丢弃。解决:增加监听队列大小可以提高连接处理能力,避免连接请求被丢弃。 有效抵御ddos攻击在linux下需要更改系统配置才可使用# 通过 sysctl 命令临时修改sudo sysctl -w net.ipv4.tcp_max_syn_backlog=2048# 或者修改配置文件 /etc/sysctl.conf 永久生效net.ipv4.tcp_max_syn_backlog=2048*/int tcp_acceptor_queue_size = 100;// 设置紧急数据处理(Out-Of-Band Data)/*问题:某些应用需要处理紧急数据。解决:通过设置 SO_OOBINLINE 选项来内联处理紧急数据。*/int oobinline = 0;// 设置发送超时/*问题:发送操作可能会永久阻塞。解决:设置发送超时时间,避免永久阻塞。*/long send_timeout_sec = 10; // 10slong send_timeout_mil = 0;  // 0ms// 设置 keep-alive/*问题:客户端异常断开时,服务器无法及时检测到,从而占用不必要的资源。解决:启用 keep-alive 选项可以定期检测客户端连接状态,及时释放无效连接。*/bool keep_alive = true;       // 保持连接int keep_alive_idle = 60;     // 空闲时间(秒)int keep_alive_interval = 10; // 探测间隔(秒)int keep_alive_count = 3;     // 探测次数/*问题:默认的缓冲区大小可能无法满足应用程序的需求。解决:根据实际情况调整缓冲区大小,可以提高 I/O 性能。*/int send_buffer_size = 8192;    // 发送缓冲区大小int receive_buffer_size = 8192; // 接收缓冲区大小void tcp_opts(tcp::acceptor &acceptor_){// 设置地址重用
acceptor_.set_option(boost::asio::socket_base::reuse_address(reuse_address));
// 设置端口重用(仅 Linux 平台支持)
#ifdef __linux__int option = reuse_port ? 1 : 0;setsockopt(acceptor_.native_handle(), SOL_SOCKET, SO_REUSEPORT, &option, sizeof(option));
#endif// 设置紧急数据处理(Out-Of-Band Data)setsockopt(acceptor_.native_handle(), SOL_SOCKET, SO_OOBINLINE,&oobinline, sizeof(oobinline));struct timeval send_timeout;send_timeout.tv_sec = send_timeout_sec;send_timeout.tv_usec = send_timeout_mil;// 设置发送超时setsockopt(acceptor_.native_handle(), SOL_SOCKET, SO_SNDTIMEO,&send_timeout, sizeof(send_timeout));// 设置 TCP_NODELAY(禁用 Nagle 算法)acceptor_.set_option(boost::asio::ip::tcp::no_delay(no_delay));// 设置 keep-aliveacceptor_.set_option(boost::asio::socket_base::keep_alive(keep_alive));
#ifdef __linux__/*TCP Keepalive 详细配置:TCP_KEEPIDLE:空闲多久后开始探测TCP_KEEPINTVL:探测的间隔时间TCP_KEEPCNT:探测的最大次数*/setsockopt(acceptor_.native_handle(), IPPROTO_TCP, TCP_KEEPIDLE,&keep_alive_idle, sizeof(keep_alive_idle));setsockopt(acceptor_.native_handle(), IPPROTO_TCP, TCP_KEEPINTVL,&keep_alive_interval, sizeof(keep_alive_interval));setsockopt(acceptor_.native_handle(), IPPROTO_TCP, TCP_KEEPCNT,&keep_alive_count, sizeof(keep_alive_count));
#endif// 设置非阻塞模式acceptor_.non_blocking(non_blocking);// 设置延迟关闭acceptor_.set_option(boost::asio::socket_base::linger(linger, linger_timeout));// 设置发送缓冲区大小(字节)acceptor_.set_option(boost::asio::socket_base::send_buffer_size(send_buffer_size));// 设置接收缓冲区大小(字节)acceptor_.set_option(boost::asio::socket_base::receive_buffer_size(receive_buffer_size));// 这里的 100 是半连接队列的最大长度,可以根据需要调整acceptor_.listen(tcp_acceptor_queue_size);}
};

使用后

cmake

# 指定所需的最低 CMake 版本
cmake_minimum_required(VERSION 2.8)# 设置项目名称
project(BoostCommunication)
# 设置C++标准
set(CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_STANDARD_REQUIRED ON)
#debug模式
# 添加调试信息
set(CMAKE_BUILD_TYPE Debug)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -g")# 查找Boost库
find_package(Boost REQUIRED COMPONENTS system)# 设置源文件目录
set(SERVER_DIR ${CMAKE_SOURCE_DIR}/server)# server源文件
set(SERVER_SOURCES${SERVER_DIR}/main.cpp${SERVER_DIR}/Session.cpp${SERVER_DIR}/TcpServer.cpp
)
#server头文件
set(SERVER_HEADERS${SERVER_DIR}/Session.h${SERVER_DIR}/TcpServer.h
)# server include目录的设置
include_directories(${SERVER_DIR})# 创建服务器可执行文件
add_executable(server ${SERVER_SOURCES} ${SERVER_HEADERS})
# 将服务器目标与 Boost::system 和 pthread 库链接
target_link_libraries(server Boost::system pthread)set(CLIENT_DIR ${CMAKE_SOURCE_DIR}/client)
set(CLIENT_SOURCES ${CLIENT_DIR}/client.cpp)# 添加客户端可执行文件
add_executable(client ${CLIENT_SOURCES})
# 将客户端目标与 Boost::system 和 pthread 库链接
target_link_libraries(client Boost::system pthread)# 添加自定义的 clean-all 目标
add_custom_target(clean-allCOMMAND ${CMAKE_COMMAND} -E remove_directory ${CMAKE_BINARY_DIR}/CMakeFilesCOMMAND ${CMAKE_COMMAND} -E remove ${CMAKE_BINARY_DIR}/CMakeCache.txtCOMMAND ${CMAKE_COMMAND} -E remove ${CMAKE_BINARY_DIR}/MakefileCOMMAND ${CMAKE_COMMAND} -E remove ${CMAKE_BINARY_DIR}/cmake_install.cmakeCOMMAND ${CMAKE_COMMAND} -E remove_directory ${CMAKE_BINARY_DIR}/clientCOMMAND ${CMAKE_COMMAND} -E remove_directory ${CMAKE_BINARY_DIR}/server# COMMENT "Cleaning build directory..."
)
# 启用core dump 
if(UNIX)# 设置core dump大小为无限制add_custom_target(enable-core-dump# 允许生成core文件COMMAND ulimit -c unlimited# 设置core文件的存储位置和命名格式COMMAND echo "core-%e-%p-%t" | sudo tee /proc/sys/kernel/core_pattern# 设置core文件的存储位置和命名格式COMMAND sysctl -w kernel.core_pattern=/tmp/core-%e-%p-%t# 确保目录有写入权限COMMAND sudo chmod a+w /tmpCOMMENT "Enabling core dump... "COMMENT "gdb ./server /tmp/core-server-*")
endif()#输出
message(STATUS "client: ./client localhost 8080 10 1024 1000  ")
message(STATUS "server: ./server 8080")

 总结

        学习boost asio基本使用,重点关注异步读写数据和socket参数设置。

代码分享

jbj62/cpp新特新和boost

学习资料分享

0voice · GitHub

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词