在第13节,我们形成了主从Reactor模式,这样可以很好地分担了IO的压力。但是在主从Reactor模式却没有能很好地解决需要比较耗时(消耗CPU时间)的任务。
那这时我们可以再添加个线程池去解决耗时任务,这个线程池和IO线程池是不一样的,这个可以说是计算线程池,这样就形成了主从Reactor+thread pool模式。
即是用多个Reactor来处理IO,又使用线程池来处理计算。这种方案适合既有突发IO(利用IO多线程处理多个连接上的IO),又有突发计算的应用(利用线程池把一个连接上的计算任务分配给其任一线程去做)。
1.主从Reactor+thread pool模式
这一节的改动很简单的啦,来看看Server类中的变化。
class Server { // 省略其他成员和方法 ... private: // 用于指向计算线程池的智能指针 // std::unique_ptr确保计算线程池的生命周期受Server对象管理 // 当Server对象被销毁时,compute_threadpool_也会自动释放内存 std::unique_ptr<ThreadPool> compute_threadpool_; // 其他私有成员变量和方法 ...
};
添加计算线程池变量compute_threadpool_,并在构造函数中进行初始化,但确定计算线程的数量是在start()函数中的。
void Server::start(int IOThreadNum, int compute_thread_num) { // 检查 TcpServer 是否已经启动。如果 started_ 为 0,表示对象没有被启动过。 // 因为使用了自增操作,所以在第一次启动时,started_ 将由 0 增加到 1。 if (started_++ == 0) { // 防止一个 TcpServer 对象被启动多次 // 设置 I/O 线程池的线程数量 loop_threadpool_->setThreadNum(IOThreadNum); // 启动 I/O 线程池,开始处理 I/O 事件 loop_threadpool_->start(); // 启动计算线程池,指定线程数量,准备处理计算密集型任务 compute_threadpool_->start(compute_thread_num); // 添加这句,给定线程数量,开启线程池 // 开始接收客户端连接 acceptor_->listen(); }
}
之后,就可以使用第11节添加线程池中的main()函数测试代码来进行测试了。
那么这个主从Reactor+thread pool模式就完成了。
2.Connection对象的引用计数
之前是在Connection类使用了共享的智能指针std::shared_ptr。那么其引用计数就可能会困惑到大家,就是有些地方是不是需要使用引用& ,一些地方是否是使用值传递的。
1.回调中lambda表达式使用引用&或是值传递
先说说使用std::bind实现回调函数的情况。使用std::bind()就一定会把实参进行拷贝一份。举个例子:
void Server::newConnection(int sockfd, const InetAddr &peerAddr) { // 省略其他代码逻辑... // 创建一个新的连接对象,使用 std::shared_ptr 管理其生命周期 auto conn = std::make_shared<Connection>(...); // 将新的连接对象存储在 connections_ 容器中,以 sockfd 为键 connections_[sockfd] = conn; // 使用 std::bind() 注册连接建立的回调 // ioLoop->runInLoop(std::bind(&Connection::connectEstablished, conn)); // 使用 lambda 表达式的第一种情况: // 此方式捕获 conn 以值传递的方式,引用计数 +1 // 这样确保在回调执行时 conn 依然有效 ioLoop->runInLoop([conn]() { conn->connectEstablished(); }); // 使用 lambda 表达式的第二种情况: // 假如使用下面的方式 (按引用捕获),这将导致潜在的悬空引用问题 // 因为如果 newConnection 函数结束,conn 可能会被销毁, // 这将会在回调中调用一个已经被析构的对象引发未定义行为。 // ioLoop->runInLoop([&conn]() { conn->connectEstablished(); }); // 使用 lambda 表达式的第三种情况: // 捕获 this 指针和 sockfd,以便通过 Server 类的成员方法访问 connections_ // 这样可以避免原始 conn 的生命周期管理问题 // 确保当 connections_[sockfd] 的引用计数还保持有效 ioLoop->runInLoop([this, sockfd]() { connections_[sockfd]->connectEstablished(); });
}
使用std::bind()的会把conn实参进行拷贝一份,这里conn是std::shared_ptr,那其引用计数就会+1。
而使用lambda表达式的第一种情况,这种是值传递[conn],会进行拷贝,那其引用计数也会+1。
而情况2是引用[&conn],不进行拷贝,其引用计数不变。
而在这个例子中,我们是需要进行拷贝的。因为实参conn是在栈上的,其生命周期只在Server::newConnection()内部,要是Server::newConnection()结束了,那么该conn也会进行回收,那其引用计数-1。
接着来看conn->connectEstablished();这句代码。
调用connectEstablished()的是conn,该函数是在EventLoop::doPendingFunctors()中进行。是Server::newConnection()函数结束了,紧接着connectEstablished()才开始。
若是传引用的,conn跟随着newConnection()函数结束就被系统回收了,那调用connectEstablished()的对象conn就没有了,这就会引发问题。
所以需要值传递的,进行拷贝一份,那调用conn->connectEstablished()就不会有问题。
那么这里要是想不进行拷贝,那就可以按照情况3的方式使用,connections_是Server类的成员变量,只要Server类对象还在,connections_就还会存在,这样就不会出现调connectEstablished()函数的对象不存在的问题,这也解决了拷贝的问题。
还有一些只能值传递的,如下面的两个函数。
void Server::removeConnection(const ConnectionPtr& conn) { // 在事件循环中运行回调,确保移除连接的操作在 I/O 线程中执行 // 捕获 this 指针以访问 Server 对象的成员变量和方法,捕获 conn 以值传递 loop_->runInLoop([this, conn]() { removeConnectionInLoop(conn); });
} void Server::removeConnectionInLoop(const ConnectionPtr& conn) { // 从 connections_ 容器中移除与连接关联的条目 // 使用 conn->fd() 获取连接的文件描述符作为索引 connections_.erase(conn->fd()); // 调用连接对象的 getLoop() 方法获取其关联的事件循环 // 然后将 connectDestroyed() 方法的调用排入此事件循环的队列 // 这会处理连接销毁的后续工作,确保在正确的线程上下文中执行 conn->getLoop()->queueInLoop([conn]() { conn->connectDestroyed(); });
}
这需要画个图来解说下为什么需要值传递的,其实要是需要值传递的话,就说明需要延长conn其生命周期,不然那肯定是要传引用的啦。
2.std::shared_ptr的拷贝问题
std::shared_ptr的拷贝开销比拷贝原始指针是要高的,因为需要修改其引用计数(而且拷贝的时候通常要加锁)。
很多函数的参数使用了其ConnetionPtr,即是std::shared_ptr<Connetion>,那会不会有很多参数进行拷贝的问题呢。其实不会的,大多数情况下是const reference方式进行传递的,例如void Server::removeConnection(const ConnectionPtr& conn)。
3.其引用计数变化的情况
前面所说的两个只能值传递的函数都是有关Connection连接关闭的,所以需要弄清楚连接关闭的时候其引用计数的变化。
先简单说明下如何看这个图。同一水平线表示在同一时间,每个框的高度就是该函数的生命期,框4(handleRead函数)就是在框2(hanleEvent函数)生命周期内。
按照顺序从每个框的函数开始看,注意这里的只是连接关闭的情况,也简化了要在Server线程的操作,直接画在同一个线程了。
引用计数的变化:
到框2(handleEvent函数),其定义了个栈变量guard,其提升weak_ptr成为shared_ptr,那么该连接的引用计数就+1。而之前连接建立成功的时候,Server类中的成员变量connections_也保存了这该连接,所以这时引用计数为2。
到框5内部有个在栈上的std::shared_ptr<Connection>临时变量, 那么该连接的引用计数+1,这时引用计数为3。
到了框6(调用closeCallback_,即是removeConnection);接着到了框7,其内部调用runInLoop([this, conn]() { removeConnectionInLoop(conn); })。removeConnectionInLoop(conn)函数会在EventLoop::doPendingFunctors()中执行。
若框7结束,那框5的生命期也结束了,那在框5内部的栈上变量guardThis也就被回收了,那引用计数-1。
而框7内部的runInLoop()的lambda表达式是值传递的,那其引用计数就+1,那可以说框7结束,即是框5函数结束时候,其引用计数不增不减,还是为3。
那么到了框8,那框2(handleEvent函数)也就结束了,那在框2栈上的变量guard就被回收了,引用计数-1,所以在框8开始的时候,其引用计数为2。
在框8中,在connections_.erase(conn->fd())后;其引用计数就-1,其引用计数就为1。
这时剩下的这个唯一的一个引用计数是从哪处得来的呢。
这个引用计数是从框7中runInLoop()的lambda表达式,其是值传递,会拷贝一份,所以在那时增加了引用计数的。
也就是说框8函数结束,那这唯一的引用计数也会-1,就会变成0,那就不会再有框10中的conn去调用connectDestroyed函数。
所以需要在queueInLoop函数的lambda表达式中进行值传递,拷贝一份conn,就可以增加了引用计数,在框9进行了值传递拷贝后,其引用计数+1,这时引用计数为2。
在框9结束,即是框8结束时,框8内的conn也会被回收,其引用计数-1,这时其引用计数为1,还剩下唯一的一份conn去执行connectDestroyed()。
到了框10结束,那唯一的一份也就被回收,其引用计数就真的为0了,该Connection对象就会被析构了。
3.总结
这Connection的引用计数变化和前面提到的两个函数中的lambda表达式为什么使用值传递也就很好理解了。
这一节的代码改动是很小的,很小的变化就可以组合成主从Reactor+thread pool模式,也很好理解。所以可以说这节最重要的是理解其Connection的引用计数的变化,通过画出的这个图和讲解,应该可以深入地理解其引用计数的变化了。