欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 教育 > 培训 > 高性能服务器模型之Reactor(单线程版本)

高性能服务器模型之Reactor(单线程版本)

2024/11/30 4:10:56 来源:https://blog.csdn.net/weixin_73809064/article/details/143972768  浏览:    关键词:高性能服务器模型之Reactor(单线程版本)

一、概念

Reactor模型是一种事件驱动的设计模式,用于处理并发I/O操作。其核心思想是将I/O事件的监听和实际的I/O操作分离开来,由事件循环(Event Loop)负责监听I/O事件,当事件发生时,将事件分发给相应的事件处理器(Event Handler)进行处理。

单线程版本如下图:

其中各个组件的任务如下:

  1. Reactor:这是事件循环的核心,负责监听和分发事件。它等待I/O事件(如连接请求、数据到达等)的发生,并将这些事件分发给相应的处理器。

  2. Acceptor:Acceptor是Reactor模式中的一个特殊处理器,专门用于处理新的连接请求。当Reactor监听到新的连接请求时,它会将这个事件交给Acceptor处理。Acceptor负责接受新的连接,并将新的连接分配给适当的资源或处理器。

  3. Clients:在图中,客户端(client)代表与服务器建立连接的实体。在Reactor模式中,客户端可以是任何发起连接请求的实体。

  4. Dispatch:Dispatch是Reactor中的一个组件,它负责将事件分发给相应的处理器。在图中,Dispatch连接了Acceptor和一系列的处理步骤。

  5. Read:读取数据的步骤。一旦连接建立,服务器需要从客户端读取数据。

  6. Decode:解码数据的步骤。读取的数据可能需要解码,以转换为服务器可以理解的格式。

  7. Compute:计算或处理数据的步骤。服务器对解码后的数据进行业务逻辑处理。

  8. Encode:编码数据的步骤。处理完的数据可能需要编码,以转换为客户端可以理解的格式。

  9. Send:发送数据的步骤。服务器将编码后的数据发送回客户端。

二、代码实现 

代码模块:

Socket:负责管理sockfd的创建和传参。

InetAddress:负责处理ip地址,端口号等的大小端转换处理等。

Acceptor:由Socket和InetAddress组合而成,ready函数负责完成服务器前期的基本操作,达到监听状态,accept负责获取连接的netfd。

TcpConnect:负责和客户端和读写操作,使用Acceptor的accept获得的文件描述符初始化,进行信息传递。

EevenLoop:负责事件循环中的操作,封装了epoll的创建,增加,监听和删除操作,TCP连接三个事件的处理,接受用户输入的回调函数,并处理但由于EvenLoop本身没有发送消息的能力,所以EvenLoop将回调函数传入TcpConnect中注册,再使用TcpConnect的回调函数。

Server:封装Acceptor和EvenLoop的一些操作,简化使用。

1、Socket

Socket.h

#pragma onceclass Socket
{
public:Socket();~Socket();int getSockfd();
private:int _fd;
};

Socket.cc

#include"Socket.h"
#include<kk.h>
Socket::Socket(){_fd=socket(AF_INET,SOCK_STREAM,0);
}Socket::~Socket(){close(_fd);
}int Socket::getSockfd(){return _fd;
}

 2、InetAddress

InetAddress.h

#pragma once
#include<kk.h>
#include<iostream>
using std::string;class InetAddress
{
public:InetAddress(const string &ip,const string &port);InetAddress(struct sockaddr_in clientAddr);~InetAddress();struct sockaddr_in *getInetAddr();string getIp();string getPort();
private:struct sockaddr_in _serverAddr;
};

 InetAddress.cc

#include"InetAddress.h"
InetAddress::InetAddress(const string & ip,const string & port){_serverAddr.sin_family=AF_INET;_serverAddr.sin_port=htons(atoi(port.c_str()));_serverAddr.sin_addr.s_addr=inet_addr(ip.c_str());
}
InetAddress::InetAddress(struct sockaddr_in clientAddr){}
InetAddress::~InetAddress(){}
struct sockaddr_in *InetAddress::getInetAddr(){return &_serverAddr;
}string getIp();string getPort();

3、Acceptor

Acceptor.h 

#pragma once
#include"InetAddress.h"
#include"Socket.h"class Acceptor
{
public:Acceptor(const string &ip,const string &port);~Acceptor();void bind();void listen();int accept();void setReuse();void ready();int getSockfd();
private:Socket _sock;InetAddress _addr;
};

 Acceptor.cc

#include"Acceptor.h"
Acceptor::Acceptor(const string &ip,const string &port):_sock(),_addr(ip,port){}Acceptor::~Acceptor(){}void Acceptor::bind(){int ret=::bind(_sock.getSockfd(),(struct sockaddr *)_addr.getInetAddr(),sizeof(struct sockaddr_in));if(ret==-1){perror("bind");return;}
}void Acceptor::listen(){int ret=::listen(_sock.getSockfd(),50);if(ret==-1){perror("listen");return;}
}int Acceptor::accept(){int netfd=::accept(_sock.getSockfd(),NULL,NULL);if(netfd==-1){perror("accept");return -1;}return netfd;
}void Acceptor::setReuse(){int reuse=1;setsockopt(_sock.getSockfd(),SOL_SOCKET,SO_REUSEADDR,&reuse,sizeof(reuse));int reuse2=1;setsockopt(_sock.getSockfd(),SOL_SOCKET,SO_REUSEPORT,&reuse2,sizeof(reuse2));
}void Acceptor::ready(){setReuse();bind();listen();
}
int Acceptor::getSockfd(){return _sock.getSockfd();
}

4、TcpConnect

TcpConnect.h

#pragma once
#include<iostream>
#include<functional>
#include<memory>
#include<kk.h>
using std::string;
class TcpConnect;
using ConnectPtr=std::shared_ptr<TcpConnect>;
using TcpConnectCallback=std::function<void(const ConnectPtr &)>;
class TcpConnect
:public std::enable_shared_from_this<TcpConnect>
{
public:TcpConnect(int netfd);~TcpConnect();string recv();int send(const string &msg);void setNewConnectCb(const TcpConnectCallback &cb);void setCloseCb(const TcpConnectCallback &cb);void setMessageCb(const TcpConnectCallback &cb);void handleNewConnectCb();void handleMessageCb();void handCloseCb();bool isClose();
private:int readn(char *buf,int len);int writen(char *buf,int len);int readline(char *buf,int len);TcpConnectCallback _onNewConnectCb;TcpConnectCallback _onCloseCb;TcpConnectCallback _onMessageCb;int _netfd;
};

TcpConnect.cc

#include"TcpConnect.h"TcpConnect::TcpConnect(int netfd):_netfd(netfd)
{}TcpConnect::~TcpConnect(){close(_netfd);
}string TcpConnect::recv(){char buf[65535]={0}; readline(buf,sizeof(buf)); return string(buf); 
}int TcpConnect::send(const string &msg){::send(_netfd,msg.c_str(),msg.length(),MSG_NOSIGNAL);
}int TcpConnect::readn(char *buf,int len){int left=len;char *pstr=buf;int ret=0;while(left>0){ret=read(_netfd,pstr,left);if(ret==-1&&errno==EINTR){continue;}else if(ret==-1){perror("read error -1");return -1;}else if(ret==0){break;}else{pstr+=ret;left-=ret;}}return len-left;
}int TcpConnect::readline(char *buf,int len){int left=len-1;char *pstr=buf;int ret=0,total=0;while(left>0){ret=::recv(_netfd,pstr,left,MSG_PEEK);if(-1==ret&&errno==EINTR){continue;}else if(-1==ret){perror("readLine error -1");return -1;}else if(0==ret){break;}else{for(int i=0;i<ret;i++){if(pstr[i]=='\n'){int sz=i+1;readn(pstr,sz);pstr+=sz;*pstr='\0';return total+sz;}}readn(pstr,ret);total+=ret;pstr+=ret;left-=ret;}}return total;
}
void TcpConnect::setNewConnectCb(const TcpConnectCallback &cb){_onNewConnectCb=std::move(cb);
}
void TcpConnect::setCloseCb(const TcpConnectCallback &cb){_onCloseCb=std::move(cb);
}
void TcpConnect::setMessageCb(const TcpConnectCallback &cb){_onMessageCb=std::move(cb);
}
void TcpConnect::handleNewConnectCb(){if(_onNewConnectCb){_onNewConnectCb(shared_from_this());}else{std::cout<<"_onNewConnectCb==nullptr"<<std::endl;}
}
void TcpConnect::handleMessageCb(){if(_onMessageCb){_onMessageCb(shared_from_this());}else{std::cout<<"_onMessageCb==nullptr"<<std::endl;}
}
void TcpConnect::handCloseCb(){if(_onMessageCb){_onCloseCb(shared_from_this());}else{std::cout<<"_onCloseCb"<<std::endl;}
}bool TcpConnect::isClose(){char buf[10]={0};int ret=::recv(_netfd,buf,sizeof(buf),MSG_PEEK);return (0==ret);
}

 5、EvenLoop

EvenLoop.h

#pragma once
#include<vector>
#include<memory>
#include<map>
#include<utility>
#include<kk.h>
#include<functional>
#include"Acceptor.h"
#include"TcpConnect.h"
using std::vector;
using std::shared_ptr;
using std::map;using ConnectPtr=shared_ptr<TcpConnect>;
using TcpConnectCallback=std::function<void(const ConnectPtr &)>;class EvenLoop
{
public:EvenLoop(Acceptor &acceptor);~EvenLoop();int createEpoll();void addEpoll(int fd);void delEpoll(int fd);void loop();void unloop();void waitEpoll();void handleNewConnect();void handleMessage(int netfd);void setNewConnectCb(const TcpConnectCallback &cb);void setCloseCb(const TcpConnectCallback &cb);void setMessageCb(const TcpConnectCallback &cb);private:int _epfd;vector<struct epoll_event> _readyArr;bool _isLooping;Acceptor &_acceptor;map<int,ConnectPtr> _conns;TcpConnectCallback _onNewConnectCb;TcpConnectCallback _onCloseCb;TcpConnectCallback _onMessageCb;
};

EvenLoop.cc

#include"EvenLoop.h"
#include<iostream>
using std::cerr;
using std::endl;
using std::cout;
EvenLoop::EvenLoop(Acceptor &acceptor):_epfd(createEpoll()),_readyArr(1024),_isLooping(false),_acceptor(acceptor),_conns()
{addEpoll(_acceptor.getSockfd());
}EvenLoop::~EvenLoop(){close(_epfd);
}int EvenLoop::createEpoll(){int ret=epoll_create(1);if(ret<0){perror("epoll_create");return ret;}return ret;
}void EvenLoop::addEpoll(int fd){struct epoll_event event;event.events=EPOLLIN;event.data.fd=fd;int ret=epoll_ctl(_epfd,EPOLL_CTL_ADD,fd,&event);if(ret<0){perror("epoll_ctr_add");return;}
}void EvenLoop::delEpoll(int fd){epoll_ctl(_epfd,EPOLL_CTL_DEL,fd,nullptr);
}void EvenLoop::loop(){_isLooping=true;while(_isLooping==true){waitEpoll();}
}void EvenLoop::unloop(){_isLooping=false;
}void EvenLoop::waitEpoll(){int readyNum=0;do{readyNum=epoll_wait(_epfd,&*_readyArr.begin(),_readyArr.size(),3000);}while(readyNum==-1&&errno==EINTR);if(readyNum==-1){cerr<<"readyNum==-1"<<endl;}else if(readyNum==0){cout<<"----timeout!!!----"<<endl;}else{if(readyNum==(int)_readyArr.size()){_readyArr.reserve(2*readyNum);}for(int i=0;i<readyNum;i++){int fd=_readyArr[i].data.fd;if(fd==_acceptor.getSockfd()){handleNewConnect();}else{handleMessage(fd);}}}
}
void EvenLoop::handleNewConnect(){int netfd=_acceptor.accept();if(netfd<0){perror("handleNewConnect");return ;}addEpoll(netfd);ConnectPtr conn(new TcpConnect(netfd));conn->setNewConnectCb(_onNewConnectCb);conn->setMessageCb(_onMessageCb);conn->setCloseCb(_onCloseCb);_conns.insert(std::make_pair(netfd,conn));conn->handleNewConnectCb();
}void EvenLoop::handleMessage(int netfd){auto it=_conns.find(netfd);//存在该此连接if(it!=_conns.end()){bool flag=it->second->isClose(); if(flag==true){//已断开为trueit->second->handCloseCb();_conns.erase(it);delEpoll(netfd);return ;}else{it->second->handleMessageCb();}}else{std::cout<<"连接不存在"<<std::endl;return;}
}void EvenLoop::setNewConnectCb(const TcpConnectCallback &cb){_onNewConnectCb=cb;
}
void EvenLoop::setCloseCb(const TcpConnectCallback &cb){_onCloseCb=cb;
}
void EvenLoop::setMessageCb(const TcpConnectCallback &cb){_onMessageCb=cb;
}

 6、TcpServer

 TcpServer.h

#pragma once
#include"EvenLoop.h"
class TcpServer
{
public:TcpServer(const string &ip,const string &port);~TcpServer();void start();void stop();void setAllFunction(const TcpConnectCallback &newCallBack,const TcpConnectCallback &msgCallBack,const TcpConnectCallback &closeCallBack);private:Acceptor _acceptor;EvenLoop _loop;
};

 TcpServer.cc

#include"TcpServer.h"TcpServer::TcpServer(const string &ip,const string &port):_acceptor(ip,port),_loop(_acceptor){}
TcpServer::~TcpServer(){}void TcpServer::start(){_acceptor.ready();_loop.loop();
}
void TcpServer::stop(){_loop.unloop();
}
void TcpServer::setAllFunction(const TcpConnectCallback &newCallBack,const TcpConnectCallback &msgCallBack,const TcpConnectCallback &closeCallBack){_loop.setNewConnectCb(newCallBack);_loop.setMessageCb(msgCallBack);_loop.setCloseCb(closeCallBack);
}

三、测试

test.c 

#include <iostream>
#include"Socket.h"
#include"Acceptor.h"
#include"InetAddress.h"
#include"TcpConnect.h"
#include"EvenLoop.h"
#include"TcpServer.h"
#include"ThreadPool.h"using std::cout;
using std::endl;void onNew(const ConnectPtr& con){cout<<"---a client has connected!!---"<<endl;
}
void onMes(const ConnectPtr& con){string ret=con->recv();cout<<"msg>>"<<ret<<endl;con->send(ret);
}
void onClose(const ConnectPtr&con){cout<<"---a client has close---"<<endl;}int main()
{TcpServer server("127.0.0.1","1234");   server.setAllFunction(onNew,onMes,onClose);server.start();return 0;
}

server

 

client

实现了一个执行回声业务的reactor单线程服务器模型。 

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com