前言:reactor作为一种高性能的范式,值得我们学习
本次目标 实现一个基于的reactor 具备echo功能的服务器
核心组件
Reactor本身是靠一个事件驱动的框架,无疑引出一个类似于moduo的"EventLoop "以及boost.asio中的context而言,不断获取就绪事件,其本质无疑是io多用复路实现的。
下面就是核心功能:
void EventLoop::loop(){while(!quit){std::vector<Channel*> chs;chs = ep->poll();for(auto it = chs.begin(); it != chs.end(); ++it){(*it)->handleEvent();}}
}
当事件检测到了,无疑我们要进行分发,这个所以我们也可以借助一个核心组件
Channel来分发
class Channel
{
private:EventLoop *loop;int fd;uint32_t events;uint32_t revents;bool inEpoll;std::function<void()> callback;
public:Channel(EventLoop *_loop, int _fd);~Channel();void enableReading();//设置检测什么事件int getFd();uint32_t getEvents();uint32_t getRevents();bool getInEpoll();void setInEpoll();// void setEvents(uint32_t);void setRevents(uint32_t);void setCallback(std::function<void()>);
};
不难发现,其实这个核心函数为std::function<void()> callback和void setCallback(std::function<void()>);,设置不同事件的回调函数
void Channel::setCallback(std::function<void()> _cb){callback = _cb;
}
有过reactor的基础的人,一般都知道我们将业务分成连接和业务处理
没基础也没关系可以阅读这个Reactor解析 你也可以直接看图
这就引出了Acceptor类 专门处理连接的
- Acceptor类存在于事件驱动EventLoop类中,也就是Reactor模式的main-Reactor
- 类中的socket fd就是服务器监听的socket fd,每一个Acceptor对应一个socket fd
- 这个类也通过一个独有的Channel负责分发到epoll,该Channel的事件处理函数handleEvent()会调用Acceptor中的接受连接函数来新建一个TCP连接
class Acceptor
{
private:EventLoop *loop;Socket *sock;//每一个Acceptor对应一个socket fdChannel *acceptChannel;//这个类也通过一个独有的Channel负责分发到epoll
public:Acceptor(EventLoop *_loop);~Acceptor();void acceptConnection();std::function<void(Socket*)> newConnectionCallback;void setNewConnectionCallback(std::function<void(Socket*)>);
};
这就引出为什么 的核心组件其连接组件Acceptor(EventLoop *_loop);和 void acceptConnection();
Acceptor::Acceptor(EventLoop *_loop) :loop(_loop){sock=new Socket();InetAddress* addr=new InetAddress("127.0.0.1",8080);sock->bind(addr);sock->listen();sock->setnonblocking();acceptChannel=new Channel(_loop,sock->getFd());std::function<void()> cb = std::bind(&Acceptor::acceptConnection, this);//注册回调函数 进行连接acceptChannel->setCallback(cb);acceptChannel->enableReading();//注册事件delete addr;}void Acceptor::acceptConnection(){InetAddress*clnt_addr=new InetAddress();Socket* clnt_sock=new Socket(sock->accept(clnt_addr));printf("new client fd %d! IP: %s Port: %d\n", clnt_sock->getFd(), inet_ntoa(clnt_addr->getAddr().sin_addr), ntohs(clnt_addr->getAddr().sin_port));clnt_sock->setnonblocking();newConnectionCallback(sock);delete clnt_addr;
}
讲一下这个 newConnectionCallback(sock); 这个就是业务逻辑处理。这份代码的不完整之处就在于 他的目的核心是实现一个功能 echo回响 所以直接连接之后给了一个echo的业务逻辑。引出一个connect 即每一个连接都有一个业务处理方法
Connection类,这个类也有以下几个特点:
- 类存在于事件驱动EventLoop类中,也就是Reactor模式的main-Reactor
- 类中的socket fd就是客户端的socket fd,每一个Connection对应一个socket fd
- 每一个类的实例通过一个独有的Channel负责分发到epoll,该Channel的事件处理函数handleEvent()会调用Connection中的事件处理函数来响应客户端请求
class Connection {
private:EventLoop *loop;Socket *sock;Channel *channel;std::function<void(Socket*)> deleteConnectionCallback;//
public:Connection(EventLoop *_loop, Socket *_sock);~Connection();void echo(int sockfd);void setDeleteConnectionCallback(std::function<void(Socket*)>);};
通过这个函数也可以看出一个echo就是一个connection的回调函数。然后我们再来看一下另一个重要的资源释放.设置一个主动释放的connection的回调函数。
void Connection::setDeleteConnectionCallback(std::function<void(Socket*)> _cb){deleteConnectionCallback = _cb;
}
也是注册函数
Reactor-server
class Server
{
private:EventLoop *loop;//ADD 连接逻辑Acceptor *acceptor;std::map<int, Connection*> connections;//一一对应public:Server(EventLoop*);~Server();void handleReadEvent(int);void newConnection(Socket *serv_sock);void deleteConnection(Socket *sock);
};
这个reactor无疑包含了三个组件 事件驱动,连接逻辑和业务逻辑
Server::Server(EventLoop *_loop) : loop(_loop){acceptor = new Acceptor(loop);//完成所有的连接步骤 就没设置回调函数std::function<void(Socket*)> cb = std::bind(&Server::newConnection, this, std::placeholders::_1);//这个是每次动态acceptor->setNewConnectionCallback(cb);//}
void Server::newConnection(Socket *sock){Connection *conn = new Connection(loop, sock);std::function<void(Socket*)> cb = std::bind(&Server::deleteConnection, this, std::placeholders::_1);conn->setDeleteConnectionCallback(cb);connections[sock->getFd()] = conn;}
void Server::deleteConnection(Socket * sock){Connection *conn = connections[sock->getFd()];connections.erase(sock->getFd());delete conn;
}
- 解析
这个reactor-main是不是有三个疑问 - 为什么连接步骤完成了
- 他的业务函数也就是echo在哪
- 为什么用map来装饰Connection
首先我们来回顾一下整体流程,我们要明确一个点 一个socket的绑定IP和端口,不算连接逻辑,也就是说当你初始化的时候就应该弄好了。
真正的连接逻辑应该是设置连接回调函数的时候,例如
Acceptor::Acceptor(EventLoop *_loop) :loop(_loop){sock=new Socket();InetAddress* addr=new InetAddress("127.0.0.1",8080);sock->bind(addr);sock->listen();sock->setnonblocking();acceptChannel=new Channel(_loop,sock->getFd());std::function<void()> cb = std::bind(&Acceptor::acceptConnection, this);//注册回调函数 acceptChannel->setCallback(cb);/acceptChannel->enableReading();//注册事件delete addr;}
acceptChannel->setCallback(cb)这个就连接逻辑,当有客户端来的时候 他就是执行这个逻辑。第一个疑问解决
然后 acceptor->setNewConnectionCallback(cb);这个是不是设置了业务逻辑,因为前面讲过 这个没有线程池而且业务逻辑单一,所有将echo默认每一个连接的业务逻辑 所有直接再连接回调的时候设置了一个业务逻辑(你去看connect的构造函数 就会发现他直接绑定了echo做业务逻辑)第二疑问解决
Connection::Connection(EventLoop *_loop, Socket *_sock):loop(_loop),sock(_sock),channel(nullptr){//属性channel=new Channel(loop,sock->getFd());//std::function<void()>cb=std::bind(&Connection::echo,this,sock->getFd());//业务处理函数;channel->setCallback(cb);channel->enableReading();
}
Connection::~Connection(){delete channel;delete sock;
}
也就是说 你可以从这下手 换一个业务逻辑 实现其他的业务
第三个疑问:
对于断开TCP连接操作,也就是销毁一个Connection类的实例。由于Connection的生命周期由Server进行管理,所以也应该由Server来删除连接。如果在Connection业务中需要断开连接操作,也应该和之前一样使用回调函数来实现.使用map就是好删除管理