欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 新闻 > 国际 > moduo之主从reactor

moduo之主从reactor

2024/12/22 14:10:20 来源:https://blog.csdn.net/wuli2496/article/details/141832323  浏览:    关键词:moduo之主从reactor

简介

TcpServer实现了主从Reactor,主Reactor处理listen事件,从Reactor处理建立连接后的读写

结构

1
n
1
n
TcpServer
EventLoopThreadPool
EventLoop
EventLoopThread

EventLoop列表中包含一个BaseLoop,n个从EventLoop

启动

TcpServer 启动时,会先启动线程池

void TcpServer::start()
{if (started_.getAndSet(1) == 0){threadPool_->start(threadInitCallback_);assert(!acceptor_->listening());loop_->runInLoop(std::bind(&Acceptor::listen, get_pointer(acceptor_)));}
}

线程池启动时,会启动n个EventLoopThread,其会开启EventLoop

void EventLoopThreadPool::start(const ThreadInitCallback& cb)
{assert(!started_);baseLoop_->assertInLoopThread();started_ = true;for (int i = 0; i < numThreads_; ++i){char buf[name_.size() + 32];snprintf(buf, sizeof buf, "%s%d", name_.c_str(), i);EventLoopThread* t = new EventLoopThread(cb, buf);threads_.push_back(std::unique_ptr<EventLoopThread>(t));loops_.push_back(t->startLoop());}if (numThreads_ == 0 && cb){cb(baseLoop_);}
}

EventLoopThread启动loop时会先启动线程,等线程启动完后,将EventLoop记录到线程池的EventLoop组中

void EventLoopThread::threadFunc()
{EventLoop loop;if (callback_){callback_(&loop);}{MutexLockGuard lock(mutex_);loop_ = &loop;cond_.notify();}loop.loop();//assert(exiting_);MutexLockGuard lock(mutex_);loop_ = NULL;
}

Acceptor的读回调为Acceptor::handleRead,当有连接请求到来时会触发

void Acceptor::handleRead()
{loop_->assertInLoopThread();InetAddress peerAddr;//FIXME loop until no moreint connfd = acceptSocket_.accept(&peerAddr);if (connfd >= 0){// string hostport = peerAddr.toIpPort();// LOG_TRACE << "Accepts of " << hostport;if (newConnectionCallback_){newConnectionCallback_(connfd, peerAddr);}else{sockets::close(connfd);}}else{LOG_SYSERR << "in Acceptor::handleRead";// Read the section named "The special problem of// accept()ing when you can't" in libev's doc.// By Marc Lehmann, author of libev.if (errno == EMFILE){::close(idleFd_);idleFd_ = ::accept(acceptSocket_.fd(), NULL, NULL);::close(idleFd_);idleFd_ = ::open("/dev/null", O_RDONLY | O_CLOEXEC);}}
}

当有连接到来时,会触发连接的回调newConnectionCallback_
TcpServer初始化时有设置

acceptor_->setNewConnectionCallback(std::bind(&TcpServer::newConnection, this, _1, _2));void TcpServer::newConnection(int sockfd, const InetAddress& peerAddr)
{loop_->assertInLoopThread();EventLoop* ioLoop = threadPool_->getNextLoop();char buf[64];snprintf(buf, sizeof buf, "-%s#%d", ipPort_.c_str(), nextConnId_);++nextConnId_;string connName = name_ + buf;LOG_INFO << "TcpServer::newConnection [" << name_<< "] - new connection [" << connName<< "] from " << peerAddr.toIpPort();InetAddress localAddr(sockets::getLocalAddr(sockfd));// FIXME poll with zero timeout to double confirm the new connection// FIXME use make_shared if necessaryTcpConnectionPtr conn(new TcpConnection(ioLoop,connName,sockfd,localAddr,peerAddr));connections_[connName] = conn;conn->setConnectionCallback(connectionCallback_);conn->setMessageCallback(messageCallback_);conn->setWriteCompleteCallback(writeCompleteCallback_);conn->setCloseCallback(std::bind(&TcpServer::removeConnection, this, _1)); // FIXME: unsafeioLoop->runInLoop(std::bind(&TcpConnection::connectEstablished, conn));
}

其使用轮询方式将新连接套接字添加到其中的一个线程中

EventLoop* EventLoopThreadPool::getNextLoop()
{baseLoop_->assertInLoopThread();assert(started_);EventLoop* loop = baseLoop_;if (!loops_.empty()){// round-robinloop = loops_[next_];++next_;if (implicit_cast<size_t>(next_) >= loops_.size()){next_ = 0;}}return loop;
}

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com