目录
I/O多路转接之select
关于select
select接口参数解释
timeval结构体和fd_set类型
select服务器
select优缺点
I/O多路转接之poll
关于poll
poll接口参数解释
pollfd结构体
poll服务器
poll优缺点
I/O多路转接之epoll
关于epoll
epoll的系列系统调用
epoll模型
红黑树原理
队列原理
回调函数原理
epoll服务器
epoll优点
epoll工作方式
I/O多路转接之select
前面我们说过,IO = 等 + 拷贝,我们现在要处理的就是等,所以select的作用就是负责“等”,它用的是我们前面五种IO模型中的第四个模型,也就是select可以一次同时等待多个fd
关于select
select:只负责等,一次可以等待多个fd
select接口参数解释
int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);
参数说明:
- nfds:需要监视的文件描述符中,最大的文件描述符值+1。
- readfds:输入输出型参数,调用时用户告知内核需要监视哪些文件描述符的读事件是否就绪,返回时内核告知用户哪些文件描述符的读事件已经就绪。
- writefds:输入输出型参数,调用时用户告知内核需要监视哪些文件描述符的写事件是否就绪,返回时内核告知用户哪些文件描述符的写事件已经就绪。
- exceptfds:输入输出型参数,调用时用户告知内核需要监视哪些文件描述符的异常事件是否就绪,返回时内核告知用户哪些文件描述符的异常事件已经就绪。
- timeout:输入输出型参数,调用时由用户设置select的等待时间,返回时表示timeout的剩余时间。
参数timeout的取值:
- NULL/nullptr:select调用后进行阻塞等待,直到被监视的某个文件描述符上的某个事件就绪。
- 0:selec调用后t进行非阻塞等待,无论被监视的文件描述符上的事件是否就绪,select检测后都会立即返回。
- 特定的时间值:select调用后在指定的时间内进行阻塞等待,如果被监视的文件描述符上一直没有事件就绪,则在该时间后select进行超时返回。
返回值说明:
- 如果函数调用成功,则返回有事件就绪的文件描述符个数。
- 如果timeout时间耗尽,则返回0。
- 如果函数调用失败,则返回-1,同时错误码会被设置。
select调用失败时,错误码可能被设置为:
- EBADF:文件描述符为无效的或该文件已关闭。
- EINTR:此调用被信号所中断。
- EINVAL:参数nfds为负值。
- ENOMEM:核心内存不足。
timeval结构体和fd_set类型
我们先来看下timeval结构体类型:
在select接口介绍中,timeval作为第四个参数的结构体指针被传参,这个timeval也是一个结构体,我们可以在man 2 gettimeofday获取时间的接口介绍看到:
- 这个就是系统提供的时间结构体,time_t就是无符号整数,以秒为单位,代表时间戳;第二个代表微秒
- select最后一个参数表示给select设置等待方式,假设struct timeval timeout = {5, 0}; 表示让select每隔5秒去timeout(超时返回)一次,如果在5秒内没有任何一个文件描述符就绪,就返回,重新开始;如果有文件描述符就绪了,就立即返回。如果设为 {0 ,0},那么就是立马返回,非阻塞的一种;也可以设为NULL,也就是阻塞等待。比如说我们一次等待了10个文件描述符,我等了100秒,在这100秒以内没有任何一个文件描述符就绪,我的selset就会卡在这,直到任何一个文件描述符就绪
- 如果这个函数设置了,就是一个输入输出型参数,比如我设置5秒timeout一次,如果过了2秒,就有文件描述符就绪了,就会返回。timeout就会变成[3, 0]返回,表示还剩 3 秒就超时了
下面我们再来看下fd_set类型:
fd_set结构与sigset_t结构类似,fd_set本质也是一个位图,用位图中对应的位来表示要监视的文件描述符
我们目前关心的fd事件:读,写,异常
- 如果我想关心读事件是否就绪,就把文件描述符设置进readfds
- 如果我想关心写事件,就设置进writefds,异常事件同理
- 如果想即关心读又关心写,就设置两个;如果我想先关心读再关心写,我们可以把描述符先设置进readfds,等读完了,再添加进writefds
- 在后面我们只学习readfds读事件,只要懂了这一个,就能举一反三,其它的就都会了
为什么是位图呢?
已readfds为例
- 在输入时,表示用户告诉内核,我给你的一个或者多个fd,你要帮我关心fd上面的读事件,如果读事件就绪了,你要告诉我!
- 在输出时(返回时),内核告诉用户,你让我关心的多个fd中有哪些已经就绪了,用户赶紧来读
- 假设这个位图是8位的0000 0000,我想添加0 1 2 3这四个fd,就需要把位图改为0000 1111,所以比特位的位置表示文件描述符编号,比特位的内容,0和1就是表示内核是否需要关心
- 假设当2号fd就绪了,所以操作系统在返回的时候,把没有就绪的比特位清零,返回的就是0000 0010,这样用户就知道2号fd就绪了,可以去读了
- 所以返回时,比特位的位置还是文件描述符编号,但是比特位的内容,0和1就是表示哪些用户关心的fd上面的读事件已经就绪了
所以fd_set是一张位图,本质是为了让用户与内核相互传递fd是否就绪的信息的,所以这也注定了,使用select的时候,一定会有大量的位图操作,所以系统也给我们提供了位图操作的接口:
set本身并不真的关心是读还是写,只用来在底层当前是否有数据/读写入的条件是否满足,满足就修改对应我文件描述符位图就行
select服务器
void Start(){int listensock = _listensock.Fd();for(;;){//能不能直接accept呢?不能直接accept//accept的本质就是在检测并获取listensock上面有没有链接事件(新连接到来相当于读事件就绪)//你自己accept阻塞上去了,说好的一次要等待多个文件描述符呢?怎么用select//说好的IO分两步 一步是等,一步是拷贝。accept大部分时间都在等,一次accept只能等一个文件描述符,你为什么不交给selcet呢?fd_set rfds;FD_ZERO(&rfds);//1.清空文件描述符FD_SET(listensock,&rfds); //2.把指定的套接字添加到集合里 这样添加有没有把listen套接字添加到内核当中呢? -并没有,rfds是一个栈上的变量,把rfds传递给set之后才是进入了内核中struct timeval timeout = {0,0};//可以实现非阻塞轮回,但是这个方法不被接受 太消耗CPU了//struct timeval timeout = {1,0};//输入输出,可能要进行周期的重复设置int n = select(listensock+1,&rfds,nullptr,nullptr,/*&timeout*/nullptr); //如果把timeout设置nullptr代表select会一直阻塞,直到有事件就绪//那么问题又来了,select要等多个文件描述符,现在服务器刚开始只有一个文件描述符呀,怎么等多个?//随着新连接的到来,新连接的文件描述符在不断的增多,文件描述符会有个增多的过程switch(n){case 0:cout << "time out,timeout:" << timeout.tv_sec << "." <<timeout.tv_usec <<endl;break;case -1:cerr << "select error" << endl;break;default:break;}}}
如果把timeout设置nullptr代表select会一直阻塞,直到有事件就绪
新事件就绪
default:
//有事件就绪了cout << "get a new link!!!" << endl;break;
- 如果事件就绪,上层不处理,select会一直通知你,你的listen套接字来连接了,快去处理。
- 如果select告诉你就绪了,接下来的一次读取,我们读取fd的时候,不会被阻塞
处理事件
HandlerEvent(rfds);void HandlerEvent(fd_set &rfds){if(FD_ISSET(_listensock.Fd().&rfd)) //判断套接字是否在rfds中,如果在就说明连接就绪了{//我们的连接事件就绪了std::string clientip;uint16_t clientport = 0;int sock = _listensock.Accept(&clientip,&clientport); //会不会阻塞在这里? 不会 因为上层已经告诉我事件已经就绪了if(sock < 0) return;lg(INfo,"accept success, %s %d",clientip.c_str(),clientport);//后面//ssize_t n = read(sock,buffer,1024); //可以直接读取数据吗?不可以 以前为什么可以呢?以前我们用的是多进程多线程 线程池 你把对应的文件描述符托管给执行流的//比如说我连接上了就不输入内容呢?,那read就会阻塞。IO事件有没有就绪我们要完完全全交给select处理}}
当我们获取到了新连接之后,能用read直接读取吗?
不可以 以前为什么可以呢?以前我们用的是多进程多线程 线程池 你把对应的文件描述符托管给执行流的。比如说我连接上了就不输入内容呢?,那read就会阻塞。IO事件有没有就绪我们要完完全全交给select处理
我们要把获取的新连接交给select 也就意味着select的读文件描述符集里文件描述符会越来越多,select的一个系统调用就能同时等待多个文件描述符。好比五种IO里的赵六,一次性抱的鱼竿越多,有鱼就绪的概率就高
select一次能等待多少个文件描述符呢?
这里我们进行测试下
std::cout << "fd_set bits num:" << sizeof(fd_set) * 8 << std::endl;
位图的比特位的个数最多也是1024个
获得的新的文件描述符sock不能直接读取,要交给select。但是sock和select在不同的函数里,怎么把sock传递给select呢?
设置辅助数组
int fd_array[fd_num_max]
-
输入参数: 在
select
调用时,fd_set
(如rfds
)作为输入参数传递给select
。你将想要监视的文件描述符放入集合中。例如,使用FD_SET(fd, &rfds)
将文件描述符fd
添加到rfds
集合中,表示你希望select
检查这个文件描述符是否可以读(即是否有数据可读)。 -
输出参数:
select
调用后,rfds
中会包含那些就绪的文件描述符。也就是说,如果某个文件描述符在select
返回时准备好进行某些操作(如可读),则select
会在rfds
中设置对应的位。 -
问题: 由于
rfds
是输入输出型参数,select
会修改它的内容。因此,原来传入的rfds
集合会被修改,变成一个新的集合,表示那些已就绪的文件描述符。如果你想保留原来的rfds
内容(即监视的文件描述符),但又想检查哪些文件描述符就绪,你需要保存rfds
的副本。
和最大的文件描述符
所以就需要设置辅助数组进行保存
int listensock = _listensock.Fd();fd_array[0] = listensock;for (;;){fd_set rfds;FD_ZERO(&rfds);fd_array[0] = listensock;int maxfd = fd_array[0];for (int i = 0; i < fd_num_max; i++) //第一次循环{if (fd_array[i] == defaultfd)continue;FD_SET(fd_array[i], &rfds); if(maxfd<fd_array[i]){maxfd = fd_array[i];lg(Info, "max fd update,max fd is: %d", maxfd);}}void HandlerEvent(fd_set &rfds){if (FD_ISSET(_listensock.Fd(), &rfds)) // 判断套接字是否在rfds中,如果在就说明连接就绪了{// 我们的连接事件就绪了std::string clientip;uint16_t clientport = 0;int sock = _listensock.Accept(&clientip, &clientport); // 会不会阻塞在这里? 不会 因为上层已经告诉我事件已经就绪了if (sock < 0)return;lg(Info, "accept success, %s %d", clientip.c_str(), clientport);int pos = 1;for (; pos < fd_num_max; pos++){if (fd_array[pos] != defaultfd)continue;elsebreak;}if (pos == fd_num_max){lg(Warning, "server is full, close %d now!", sock);close(sock);}else{fd_array[pos] = sock;PrintFd();// TODO}}
之后连接越来越多,有的连接就绪了,有的读就绪了,怎么区分是那种事件呢?
我们不知道,只能再次for循环
所有的合法文件描述符最开始在数组里,select返回之后,所有已经就绪的文件描述符就放在了rfds。
if(FD_ISSET(fd,&rfds))//判断在集合里
{if(fd == _listensock.Fd()) //判断是不是listen套接字...else //说明不是listen套接字,其他文件描述符就绪{char buffer[1024];ssize_t n = read(fd,buffer,sizeof(buffer)-1);}if (n > 0){buffer[n] = 0;cout << "get a messge: " << buffer << endl;}else if (n == 0) //在读的时候,对方把文件描述符关了{lg(Info, "client quit, me too, close fd is : %d", fd);close(fd);fd_array[pos] = defaultfd; // 这里本质是从select中移除}else{lg(Warning, "recv error: fd is : %d", fd);close(fd);fd_array[pos] = defaultfd; // 这里本质是从select中移除}
}
完整代码
Socket类
#pragma once#include <iostream>
#include <unistd.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <arpa/inet.h>
#include <cstring>
#include <cstdlib>class Socket{
public://创建套接字static int SocketCreate(){int sock = socket(AF_INET, SOCK_STREAM, 0);if (sock < 0){std::cerr << "socket error" << std::endl;exit(2);}//设置端口复用int opt = 1;setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));return sock;}//绑定static void SocketBind(int sock, int port){struct sockaddr_in local;memset(&local, 0, sizeof(local));local.sin_family = AF_INET;local.sin_port = htons(port);local.sin_addr.s_addr = INADDR_ANY;socklen_t len = sizeof(local);if (bind(sock, (struct sockaddr*)&local, len) < 0){std::cerr << "bind error" << std::endl;exit(3);}}//监听static void SocketListen(int sock, int backlog){if (listen(sock, backlog) < 0){std::cerr << "listen error" << std::endl;exit(4);}}
};
Main.cc
#include "SelectServer.hpp"
#include<memory>void Usage(std::string proc)
{std::cout << "\n\rUsage: " << proc << "port[1024+]\n"<< std::endl;
}int main(int argc, char *argv[])
{if (argc != 2){Usage(argv[0]);exit(1);}//单进程演示同时接收多个连接的消息uint16_t port = std::stoi(argv[1]);std::unique_ptr<SelectServer> svr(new SelectServer(port));svr->Init();svr->Start(); return 0;
}
SelectServer.hpp
#pragma once
#include<iostream>
#include<sys/select.h>
#include<sys/time.h>
#include "Socket.hpp"
#include "Log.hpp"static const uint16_t defaultport = 8081;
static const int fd_max_num = (sizeof(fd_set)); //表示一个位图能存多少个比特位,也就是一个fd_set能存多少个文件描述符
int defaultfd = -1;class SelectServer
{
public:SelectServer(uint16_t port = defaultport): _port(port){for(int i = 0; i < fd_max_num; i++) //初始化辅助数组{fd_array[i] = defaultfd;}}bool Init(){_listensock.Socket();_listensock.Bind(_port);_listensock.Listen();}void Accepter(){//走到这里,就是监听套接字就绪了,底层有新连接上来了std::string clientip;uint16_t clientport = 0;int sock = _listensock.Accept(&clientip, &clientport); // 不会阻塞在这里,因为走到着一步就是因为底层已经有连接了,已经可以直接拿上来if (sock < 0) return;log(Info, "accept success, %s: %d, sock fd: %d", clientip.c_str(), clientport, sock);// 在获取到新连接后也不能直接读,因为只是把连接读上来了,但是对方可能没有发数据,所以当对方没立即发数据的时候,文件里就是空的,这时候读取会出问题哦int pos = 1;for(; pos < fd_max_num; pos++){if (fd_array[pos] != defaultfd)continue; // 说明这个位置是已经被占用的位置elsebreak;}// 走到这里有两种结果if (pos == fd_max_num) // 1,说明辅助数组已经被合法文件描述符占满了{log(Warning, "server is full, close %d now ", sock);close(sock); // 满了直接关了}else // 2,说明当前pos的位置可以被使用{fd_array[pos] = sock; // 把新获取的连接搞到数组里去PrintFd();}}void Recver(int fd, int pos){//走到这里代表套接字的读事件就绪了,客户端给我发数据过来了char buffer[1024];ssize_t n = read(fd, buffer, sizeof(buffer) - 1); // 有bug,因为目前不能保证,对方通过网络发过来的报文可能不是完整的if (n > 0) //读成功{buffer[n] = 0;std::cout << "get a messge: " << buffer << std::endl;}else if (n == 0) //对方把连接关了,那么我服务器也要把套解析关了{log(Info, "client quit, me too, close fd is : %d", fd);close(fd);fd_array[pos] = defaultfd; // 这里本质是从select中移除}else //读出错{log(Warning, "recv error: fd is : %d", fd);close(fd);fd_array[pos] = defaultfd; // 这里本质是从select中移除,因为start第一个循环会根据这个数组重新设置fd_set}}void Dispatcher(fd_set &rfds){for (int i = 0; i < fd_max_num; i++){int fd = fd_array[i];if (fd == defaultfd) continue; //判断文件描述符是否就绪//所有合法的文件描述符放在辅助数组里,select一返回,所有就绪的文件描述符就会在rfds里//判断我们数组里的合法的文件描述符是否也在rfds里,如果在,代表该文件描述符已经就绪:1,读事件就绪 2,连接事件就绪if (FD_ISSET(fd, &rfds)){//读事件就绪后也有两种情况if (fd == _listensock.Fd()) //1,如果等于listen套接字,就是连接事件就绪,就获取新连接,就把这个新的文件描述符继续添加进数组里{Accepter(); // 连接管理器}else //2,如果不是linsten,那么就是别的文件描述符就绪了,也就是读事件就绪了{Recver(fd, i);}}}}bool Start(){int fd = _listensock.Fd();//struct timeval timeout = {1, 0};fd_array[0] = fd;while(true){fd_set rfds; //读文件描述符位图FD_ZERO(&rfds); //是一个宏,负责把位图清空//每次调用select前都对rfds进行重新设定int maxfd = fd_array[0];for(int i = 0; i < fd_max_num; i++) {if(fd_array[i] == defaultfd) continue;FD_SET(fd_array[i], &rfds); //把指定的文件描述符添加到rfds中,并且此时没有设置进内核里if(maxfd < fd_array[i]);{maxfd = fd_array[i];log(Info, "max fd update, max fd is: %d", maxfd);}}//开始监听后,不能"直接"accept,因为accept就是在检测并获取listensock上面的事件,是阻塞,那么就不能一次等待多个文件描述符了,那么select就没作用了//是什么事件呢?就是新连接到来的事件,就是操作系统底层把三次握手完成,把新连接放到了全连接队列里,然后通过select从底层把文件拿上来//新连接到来,就相当于读事件就绪struct timeval timeout = {0, 0}; //输入输出,可能要进行周期性的重复设置//设为0的话,就是非阻塞了,如果不设置timeout,就是永久性阻塞,直到有事件就绪int n = select(maxfd + 1, &rfds, nullptr, nullptr, /*&timeout*/ nullptr);//如果select告诉你就绪了,那么接下来的一次读取fd的时候,不会被阻塞,就是不会再“等”了,会直接读switch(n){case 0:std::cout << "timeout: " << timeout.tv_sec << "." <<timeout.tv_usec << std::endl;break;case -1:std::cerr << "select error" << std::endl;break;default://有事件就绪了,但是如果上层不处理,select就会一直通知你std::cout << "get a new link " << std::endl;Dispatcher(rfds); //处理事件break;}}}void PrintFd(){std::cout << "online fd list: ";for (int i = 0; i < fd_max_num; i++){if (fd_array[i] == defaultfd)continue;std::cout << fd_array[i] << " ";}std::cout << std::endl;}~SelectServer(){_listensock.Close();}private:Sock _listensock;uint16_t _port;int fd_array[fd_max_num]{defaultfd}; //辅助数组,用来保存监听套接字和已经和客户端建立好连接的套接字
};
select优缺点
select的优点
- 可以同时等待多个文件描述符,并且只负责等待,实际的拷贝动作由read,write等接口完成,并且最大的好处就是select之后再调用read这些接口不会再被阻塞
- select同时等待多个文件描述符,可以将“等”的时间重叠,提高IO效率
select缺点
- 等待的fd是有上限的,这个和操作系统有关系,不同的操作系统对位图的实现不同
- select的输入输出型参数比较多,所有可能会有比较频繁的用户层与内核层的数据拷贝
- 输入输出型参数较多,每次都要对关心的fd进行重置,就会有很多次的循环遍历,容易出bug或者其它问题
- 用户层,使用第三方数组管理用户的fd,用户层需要很多次遍历,内核中检测fd事件就绪也要遍历,效率比较低
所以select有好有坏,简单来说就是“还不够完美”,所以针对select的缺点,推出了poll
I/O多路转接之poll
关于poll
poll也是操作系统提供的一个多路转接接口,poll可以让我们的程序同时监视多个文件描述符上的事件是否就绪,和select的定位是一样的,poll是为了解决select的几个缺点
poll接口参数解释
参数说明:
- fds:表示一个poll函数监视的结构列表,是一个结构体,有三个元素,分别为文件描述符,监视的事件集合,就绪的事件集合
- nfds:表示fds数组的长度,也就是第一个参数的长度
- timeout:这个和select的那个是一样的,这里不再赘述
- 返回值和错误码也是和select一样的
pollfd结构体
下面是结构体的三个字段
- fd:就是一个特定的文件描述符,如果设为负值则忽略events字段并revents返回0
- events:表示需要监视该文件描述符上的哪些事件
- revents:内核通过revents告诉用户该文件描述符哪些事件已经就绪
事件宏 | 描述 | 是否可作为输入 | 是否可作为输出 |
---|---|---|---|
POLLIN | 数据可读(包括普通和优先数据) | 是 | 是 |
POLLRDNORM | 普通数据可读 | 是 | 是 |
POLLRDBAND | 优先级数据可读(Linux不支持) | 是 | 是 |
POLLPRI | 高优先级数据可读,比如TCP紧急指针 | 是 | 是 |
POLLOUT | 数据可写(包括普通和优先数据) | 是 | 是 |
POLLWRNORM | 普通数据可写 | 是 | 是 |
POLLWRBAND | 优先级数据可写 | 是 | 是 |
POLLRDHUP | 对方三次挥手,或者对方变比了写操作,由GNU引入 | 是 | 是 |
POLLERR | 错误 | 否 | 是 |
POLLHUP | 挂起,比如管道写端关闭后,读端描述符会收到POLLHUP事件 | 否 | 是 |
POLLNVAL | 文件描述符没有打开 | 否 | 否 |
- 这个结构体的最大特点是,将输入与输出进行了分离,用户通过这个结构体告诉内核“你要帮我关系fd文件描述符上的event事件”,然后内核就设置一下revent,告诉用户“fd文件描述符上的event已经就绪了”
- 如果我要同时关心多个文件描述符,就只要构建结构体数组,把数组指针传过去就OK了,然后只需要遍历这个数组即可
- select有三个类似参数,把事件类型做了区分,那poll呢?结构体是short类型,Linux很喜欢用比特位来进行传参,所以就把事件设置成位图
poll服务器
#pragma once
#include <iostream>
#include <poll.h>
#include <sys/time.h>
#include "Socket.hpp"
#include "Log.hpp"// 默认监听端口
static const uint16_t defaultport = 8081;
// 能够同时监控的最大文件描述符数
static const int fd_max_num = 64;
// 用来标识无效的文件描述符
int defaultfd = -1;
// 用来初始化 events 和 revents,使其默认为无事件
int long non_event = 0;class PollServer
{
public:// 构造函数,指定监听端口(默认为8081)PollServer(uint16_t port = defaultport): _port(port){// 初始化 pollfd 数组,将所有 fd 设置为 defaultfd(-1),表示该位置暂时无效for(int i = 0; i < fd_max_num; i++){event_fds[i].fd = defaultfd;event_fds[i].events = non_event;event_fds[i].revents = non_event;}}// 初始化服务器:创建套接字、绑定端口、开始监听bool Init(){_listensock.Socket();_listensock.Bind(_port);_listensock.Listen();return true; // 此处也可以根据绑定、监听结果返回成功/失败}// 处理新连接:从监听套接字上接受客户端连接void Accepter(){// 能走到这里说明监听套接字上有新连接(POLLIN 就绪)std::string clientip;uint16_t clientport = 0;// 不会阻塞,因为内核已就绪int sock = _listensock.Accept(&clientip, &clientport);if (sock < 0) return;log(Info, "accept success, %s: %d, sock fd: %d", clientip.c_str(), clientport, sock);// 找到一个空闲的数组下标,将新连接的 sock 放进去int pos = 1; // 从1开始,0号位留给监听套接字for(; pos < fd_max_num; pos++){if (event_fds[pos].fd != defaultfd)continue; // 该位置已被占用,继续寻找elsebreak; // 找到空位}// 判断是否数组已经被占满if (pos == fd_max_num) {// 已经满了,则无法接受更多连接,直接关闭新连接log(Warning, "server is full, close %d now ", sock);close(sock);}else {// 将新连接加入到 pollfd 数组中,并关注其读事件event_fds[pos].fd = sock;event_fds[pos].events = POLLIN; event_fds[pos].revents = non_event;PrintFd(); // 打印当前在线文件描述符列表}}// 读取客户端发来的数据void Recver(int fd, int pos){char buffer[1024];// 读取数据,这里如果报文分片,需要结合协议进行处理ssize_t n = read(fd, buffer, sizeof(buffer) - 1);if (n > 0){buffer[n] = 0;std::cout << "get a message: " << buffer << std::endl;}else if (n == 0) {// 对方关闭连接,服务器也关闭对应 fdlog(Info, "client quit, me too, close fd: %d", fd);close(fd);event_fds[pos].fd = defaultfd; }else {// 出现错误,也要关闭对应 fdlog(Warning, "recv error: fd: %d", fd);close(fd);event_fds[pos].fd = defaultfd;}}// 根据 poll 返回的就绪事件,分发处理void Dispatcher(){for (int i = 0; i < fd_max_num; i++){int fd = event_fds[i].fd;if (fd == defaultfd) continue; // 无效 fd,跳过// 检查该 fd 的 revents 中是否包含 POLLIN(读事件就绪)if (event_fds[i].revents & POLLIN){// 如果是监听套接字,就处理新连接,否则处理读事件if (fd == _listensock.Fd()){Accepter();}else{Recver(fd, i);}}}}// 主循环,使用 poll 等待事件并进行分发bool Start(){// 将监听套接字加入到数组第0个位置,并关注其读事件event_fds[0].fd = _listensock.Fd();event_fds[0].events = POLLIN;int timeout = 3000; // 超时时间 (毫秒)while(true){// 通过 poll 监控所有 fdint n = poll(event_fds, fd_max_num, timeout);switch(n){case 0:std::cout << "time out..." << std::endl;break;case -1:std::cerr << "poll error" << std::endl;break;default:// 有事件就绪时,调用 Dispatcher 进行分发std::cout << "get a new event" << std::endl;Dispatcher();break;}}return true;}// 打印当前 pollfd 数组中在线的 fdvoid PrintFd(){std::cout << "online fd list: ";for (int i = 0; i < fd_max_num; i++){if (event_fds[i].fd == defaultfd)continue;std::cout << event_fds[i].fd << " ";}std::cout << std::endl;}// 析构函数,关闭监听套接字~PollServer(){_listensock.Close();}private:Sock _listensock; // 封装的监听套接字uint16_t _port; // 监听端口struct pollfd event_fds[fd_max_num]; // pollfd 数组
};
poll优缺点
poll优点
struct pollfd结构体可以将select的输入输出型参数分离,减少编程成本,使接口使用更简单
poll可监控的文件描述符没有限制,因为数组大小是用户定的,相当于没有限制
poll的优点,就是解决了select的缺点
poll缺点
和select一样,poll也需要遍历fds数组来获取就绪的文件描述符
每次调用poll,都伴随着大量的struct pollfd在用户态和内核态之间的转换,并且当poll监视的文件描述符很多时,代价会很大
总结一下,就是poll需要“遍历”,当监视的套接字很多时,效率会下降,所以poll的缺点不再是功能上的了,而是效率上的了
I/O多路转接之epoll
关于epoll
epoll也是系统提供的一个多路转接接口:
- epoll比poll多了一个e,这个e是extend也就是加强的意思,是为了解决处理大量句柄而做了改进的poll
- epoll在2.5.44内核中被加入,几乎具备了select和poll的所有优点,所以epoll是被公认为的Linux 2.6 下性能最好的I/O就绪通知方法
- 但是高性能也是有代价的,epoll和poll虽然只差了一个字母,但是二者的实现方式大不相同,epoll的内部实现方式比较复杂
epoll的系列系统调用
epoll_create
- 该接口作用是创建epoll模型,其中参数size已被废弃,随便设置成大于0的数即可
- 创建模型成功后,打开epoll_file,返回它的文件描述符,所以可以初见端倪:epoll模型有文件描述符
- epfd:就是epoll_create返回值,表示epoll模型的文件描述符
- events: 是一个epoll_event结构体数组,内核会将已经就绪的事件拷贝到events数组中
- maxevents:表示events数组中的元素个数,该值不能大于创建epoll模型时传入的size大小
- timeout:和select和poll一样,单位是毫秒(ms)
epoll_events结构体
struct_epoll结构体有两个成员变量,events表示需要关心的事件,第二个成员data是一个联合体结构,一般使用结构中的fd,用来保存需要监视的文件描述符
events的常用取值如下:
事件宏 | 描述 |
---|---|
EPOLLIN | 文件描述符可读 |
EPOLLOUT | 文件描述符可写 |
EPOLLPRI | 文件描述符有紧急数据可读 |
EPOLLERR | 文件描述符发生错误 |
EPOLLHUP | 对方把文件描述符关闭(close )了 |
EPOLLET | 将 epoll 的工作模式改为 ET(边缘触发,Edge Triggered) |
EPOLLONESHOT | 只触发一次事件。监听完后如果还要继续监听,需要重新将该文件描述符加入 epoll 模型中。 |
epoll_ctl
该接口作用是对epoll模型进行相关的管理工作,参数:
- epfd:指定的epoll模型
- op:表示具体的动作,有三个选项
- fd:表示需要监视的文件描述符
- event:表示需要监视该文件描述符上的哪些事件
op的三个选项如下:
命令 | 描述 |
---|---|
EPOLL_CTL_ADD | 表示添加新的文件描述符到 epoll 模型中 |
EPOLL_CTL_MOD | 修改已经注册的文件描述符的监听事件 |
EPOLL_CTL_DEL | 表示从 epoll 模型中删除指定的文件描述符 |
epoll模型
- select和poll,都是用数组来管理文件描述符和事件的
- 网卡上面是网卡驱动,再上面就是OS,而OS不相信任何用户,所以就有了系统接口层system call,在上面就是用户了
- select和poll只有一个接口,在操作系统上就是只有一个进程,遍历去文件描述符表,去遍历多个数组,当一个文件描述符都没有就绪的时候,进程就会被挂起等待,放到等待队列当中,在这之后,遍历啥的操作就是操作系统做的了,那么操作系统就被绑在这里了,不管有没有就绪,操作系统都得自己去查
网卡是外设,那么操作系统在硬件层面,怎么知道网卡上有数据了呢?
- 所有的外设最大的特点,就是会以硬件中断的方式告诉我,就是外设一旦有事件就绪了,就给CPU发一个异步信号,称为硬件中断
- 每个设备都有自己的中断请求(IRQ),CPU根据IRQ将请求通过操作系统分发给相应的硬件驱动程序。硬件驱动通常是内核中的子程序,而不是独立的进程;例如,当网卡接收到数据包时,它会发出中断信号,CPU暂停当前任务,保存当前状态,并执行中断服务程序来处理这个事件。
- 硬件中断可以直接中断CPU的当前活动,并触发内核中相关代码的执行。对于需要时间处理的任务,中断代码本身也可能被其他硬件中断所中断。例如,时钟中断会导致内核调度代码挂起当前正在运行的代码,以便其他任务可以执行。
操作系统为了支持epoll,提供了三种机制。
红黑树机制
操作系统在自身内部会维护一颗红黑树,这颗红黑树有对应的节点
struct rb_node {int fd; //表示内核要关心的fduint32_t event; //表示以位图形式呈现的要等待就绪的事件//还有其它的,比如链接事件
}
在 epoll 中,红黑树主要用于存储和管理被监视的文件描述符。我们来看看它如何在实际操作中工作。
红黑树原理
1. 文件描述符作为树的节点
当通过 epoll_ctl
注册一个文件描述符时,epoll
会将该文件描述符作为节点插入到红黑树中。每个节点包含以下信息:
- 文件描述符:表示某个打开的文件或套接字。
- 事件类型:如
EPOLLIN
(可读)、EPOLLOUT
(可写)等,表示需要监视的事件。 - 其他元数据:例如事件回调函数和其他标志信息。
2. 事件类型作为排序标准
红黑树根据文件描述符的值来排序,并且文件描述符所监视的事件(如 EPOLLIN
、EPOLLOUT
等)会作为节点的额外信息。这确保了树结构能够高效地管理文件描述符和它们的事件类型。
3. 事件触发与红黑树的操作
当一个文件描述符的事件发生时(例如某个套接字的可读事件 EPOLLIN
),epoll
会遍历红黑树来检测哪些文件描述符的事件已经就绪。通过红黑树的查找操作,epoll
可以快速找到这些已就绪的文件描述符,并将它们加入到一个就绪队列中,等待应用程序通过 epoll_wait
来处理。
4. 高效的操作:O(log N) 时间复杂度
红黑树在 epoll
中的优势在于它能够保证在插入、删除和查找文件描述符时始终维持 O(log N) 的时间复杂度。这是因为红黑树的平衡特性,使得每次查找、插入或删除操作都不超过树的高度,而树的高度是对数级别的(log N)。
就绪队列机制
操作系统会为我们维护一个就绪队列ready_queue,一般是双链表实现的,一旦红黑树有一个节点里面的事件就绪了,操作系统就生成一个队列节点:
struct list_node{int fd; //表示已经就绪的fduint32_t event; //已经就绪的事件
}
在epoll中,对于每一个事件都有一个对应的epitem结构体,红黑树和就绪队列当中的节点分别是基于epitem结构体中的rbn成员和rbllink成员,ffd记录的就是指定的文件描述符,event成员记录的就是对应的事件:
struct epitem{ struct rb_node rbn;//红黑树节点 struct list_head rdllink;//双向链表节点 struct epoll_filefd ffd; //事件句柄信息 struct eventpoll *ep; //指向其所属的eventpoll对象 struct epoll_event event; //期待发生的事件类型
}
队列原理
在epoll 中,队列的作用是存储 已就绪的文件描述符。当我们通过 epoll_ctl
注册一个文件描述符并监视某些事件(如 EPOLLIN
、EPOLLOUT
等)时,这些文件描述符会被添加到 红黑树 中进行管理。但是,事件就绪的文件描述符并不会立即处理,而是先放入一个 就绪队列,等待应用程序通过调用 epoll_wait 来获取和处理。
队列的工作流程
1. 事件触发与文件描述符就绪
首先,epoll
会监视一组文件描述符,并监听它们的特定事件(如可读 EPOLLIN
,可写 EPOLLOUT
等)。
- 假设你通过
epoll_ctl
注册了多个文件描述符,它们可能会发生不同的事件。 - 事件触发:当某个文件描述符(例如套接字)上的事件发生时,内核会检测到这些事件。比如,当有数据可读时,内核会触发一个
EPOLLIN
事件。
2. 将就绪的文件描述符添加到队列
- 当内核检测到某个文件描述符的事件已经就绪(例如
EPOLLIN
)时,epoll
并不会立即处理这个事件,而是将触发的文件描述符添加到一个 就绪队列 中。 - 这个就绪队列是
epoll
内部的数据结构,用来存储所有已就绪的文件描述符。
示例:
- 假设有 3 个文件描述符 A、B、C,它们都在监视
EPOLLIN
事件。当文件描述符 A 和 C 的数据到达时,这两个文件描述符的事件就会被标记为就绪,并被添加到就绪队列中。 - 文件描述符 B 仍然没有数据到达,它不会被添加到队列中。
3. 应用程序调用 epoll_wait
获取就绪事件
- 当应用程序调用
epoll_wait
时,epoll
会从 就绪队列 中取出已就绪的文件描述符,并将它们返回给应用程序。 - 应用程序收到就绪文件描述符之后,依次处理这些文件描述符的事件(比如读取数据或写入数据)。
示例:
- 当你调用
epoll_wait
时,epoll
会检查就绪队列,返回文件描述符 A 和 C(它们的EPOLLIN
事件已经就绪)。应用程序可以分别对 A 和 C 执行读取操作。 - 文件描述符 B 不在队列中,因此应用程序无法处理它。
4. 处理就绪事件
应用程序获得了就绪的文件描述符后,可以针对每个文件描述符进行相应的操作。例如:
- 读取操作:如果文件描述符是一个套接字,并且其
EPOLLIN
事件已就绪,应用程序会执行read()
操作从该套接字中读取数据。 - 写入操作:如果文件描述符是一个套接字,并且其
EPOLLOUT
事件已就绪,应用程序会执行write()
操作。
5. 队列清空与重用
- 在应用程序完成了事件的处理后,已就绪的文件描述符将从队列中移除,
epoll
将继续等待新的事件触发。 - 如果应用程序没有处理所有就绪事件,队列中的这些事件将在下一次调用
epoll_wait
时继续返回,直到应用程序处理完所有事件。
epoll模型当中的就绪队列本质就是告诉内核,哪些文件描述符上的哪些事件已经就绪了,调用epoll_wait函数实际就是在从就绪队列当中获取已经就绪的事件。
回调函数机制
网卡允许操作系统注册一些回调机制,操作系统在底层就会提供一个回调函数callback,比如网卡通过驱动把数据搬到了数据链路层,而一旦数据链路层有数据了,就会自动调用callback:
回调函数原理
在 epoll
中,回调机制是一个非常关键的概念,它能有效地提高 I/O 事件的处理效率。回调机制的核心思想是,当事件发生时,系统会自动调用预设的回调函数,避免了不必要的轮询和等待。具体来说,回调机制的工作原理如下:
1. 回调函数的注册
首先,应用程序通过 epoll_ctl
函数注册需要监视的文件描述符及其事件类型(如 EPOLLIN
、EPOLLOUT
等)。在此过程中,应用程序可以为每个文件描述符指定一个与事件相关联的 回调函数。
- 文件描述符注册:文件描述符可能是一个网络套接字、管道或其他 I/O 资源。
epoll_ctl
负责将文件描述符和事件注册到内核的epoll
实例中。 - 回调函数注册:回调函数定义了当特定事件(如数据可读、数据可写等)发生时该如何处理。每个文件描述符对应一个回调函数,用来处理事件。
2. 事件触发
当注册的文件描述符上发生事件时(例如套接字的数据可以读取,或者套接字可以写入数据),内核会自动检测到这些事件,并标记相关的文件描述符为“就绪”。这些就绪的文件描述符会被添加到内核的 就绪队列 中。
- 事件检测:例如,当一个套接字有数据可读时,内核会检测到这个事件,并将该文件描述符的
EPOLLIN
事件设置为已触发。 - 事件通知:内核会把这些已就绪的文件描述符放入就绪队列中,并通知应用程序。
3. 回调函数的调用
当事件发生并且文件描述符就绪时,epoll
会根据注册时关联的回调函数,自动调用相应的回调函数来处理这些就绪的文件描述符。
- 自动触发回调:一旦
epoll
检测到文件描述符的事件已经就绪(比如EPOLLIN
),它会自动调用应用程序为该文件描述符注册的回调函数。 - 回调函数执行:回调函数中定义了如何处理具体事件的逻辑,例如读取数据、处理业务逻辑或更新状态等。
4. 回调函数的具体执行
回调函数在被触发时会根据事件类型执行特定的操作:
EPOLLIN
事件(可读事件):如果文件描述符是一个套接字,并且EPOLLIN
事件已就绪,回调函数会执行读取操作,如使用read()
或recv()
来读取套接字中的数据。EPOLLOUT
事件(可写事件):如果文件描述符是一个套接字,并且EPOLLOUT
事件已就绪,回调函数会执行写入操作,如使用write()
或send()
向套接字写入数据。EPOLLERR
或EPOLLHUP
事件(错误或挂起):如果发生错误或连接断开,回调函数可以处理这些特殊的错误事件。
5. 应用程序处理完事件后,回到等待状态
当回调函数执行完毕并处理完相关事件后,应用程序会继续处于等待状态。此时,如果有新的 I/O 事件发生,epoll
会再次触发相应的回调函数。
- 继续等待事件:应用程序在处理完当前事件后,可以继续调用
epoll_wait
等待新的事件发生,并将文件描述符继续加入到监视列表中。
6. 回调机制的优势
- 减少轮询:回调机制消除了轮询的需求,程序不需要每次都去主动检查文件描述符的状态。只有当事件实际发生时,回调函数才会被触发,减少了 CPU 的消耗。
- 提高响应速度:由于回调函数是事件驱动的,事件发生后立即触发相应的处理函数,从而提高了程序的响应速度和并发能力。
- 节省资源:通过回调机制,
epoll
能有效减少不必要的 I/O 检查,从而减少了 CPU 和内存的使用,优化了系统资源的分配。
说明一下
- 红黑树是一种二叉搜索树,因此必须有键值key,而这里的文件描述符就天然的可以作为红黑树的key值。
- 调用epoll_ctl向红黑树当中新增节点时,如果设置了EPOLLONESHOT选项,当监听完这次事件后,如果还需要继续监听该文件描述符则需要重新将其添加到epoll模型中,本质就是当设置了EPOLLONESHOT选项的事件就绪时,操作系统会自动将其从红黑树当中删除。
- 而如果调用epoll_ctl向红黑树当中新增节点时没有设置EPOLLONESHOT,那么该节点插入红黑树后就会一直存在,除非用户调用epoll_ctl将该节点从红黑树当中删除。
总结一下,epoll的使用过程就是三部曲:
- 调用epoll_create创建一个epoll模型。建立空红黑树,空队列,回调机制建立好
- 调用epoll_ctl,将要监控的文件描述符进行注册。修改红黑树
- 调用epoll_wait,等待文件描述符就绪。获取就绪队列
epoll服务器
Epoller.hpp
#pragma once // 确保这个头文件只会被包含一次#include <cerrno> // 错误码处理
#include <sys/epoll.h> // 提供epoll相关的API
#include <cstring> // 字符串处理
#include "Log.hpp" // 日志库,用于记录日志信息// nocopy 类用于禁止拷贝构造和赋值操作
class nocopy
{
public:nocopy() {} // 默认构造函数nocopy(const nocopy &) = delete; // 删除拷贝构造函数const nocopy& operator=(const nocopy &) = delete; // 删除赋值运算符
};// 设置epoll相关的常量
static const int size = 128; // 定义epoll模型的最大事件数// Epoller类,封装了epoll的创建、更新、等待等操作
class Epoller : public nocopy
{
public:// 构造函数Epoller(){// 创建epoll实例,传入最大事件数size_epfd = epoll_create(size); // 创建epoll模型if(_epfd == -1) // 如果创建失败{// 打印错误日志,输出错误信息log(Error, "epoll_create error: %s", strerror(errno));}else{// 创建成功,打印日志log(Info, "epollcreate success: %d", _epfd);}}// 等待事件发生int EpollerWait(struct epoll_event revents[], int num){// 阻塞等待epoll事件发生,最多返回num个就绪事件int n = epoll_wait(_epfd, revents, num, -1); // 这里的-1表示无限期等待return n; // 返回就绪事件的数量}// 更新epoll的事件int EpollUpdate(int oper, int sock, uint32_t event){int n = 0; // 返回值if(oper == EPOLL_CTL_DEL) // 如果是删除操作{// 执行删除操作n = epoll_ctl(_epfd, oper, sock, nullptr);if (n != 0) // 如果删除失败{log(Error, "epoll_ctl delete error!"); // 打印错误日志}}else // 如果是添加或修改操作{struct epoll_event ev; // 创建一个epoll_event结构体ev.events = event; // 设置感兴趣的事件类型(如EPOLLIN、EPOLLOUT等)ev.data.fd = sock; // 设置文件描述符,供后续处理事件时识别哪个文件描述符的事件就绪// 注册事件到epoll模型中n = epoll_ctl(_epfd, oper, sock, &ev);if (n != 0) // 如果添加失败{log(Error, "epoll_ctl add error!"); // 打印错误日志}}return n; // 返回epoll_ctl操作的结果}// 析构函数~Epoller(){if (_epfd >= 0) close(_epfd); // 关闭epoll文件描述符}private:int _epfd; // epoll实例的文件描述符,用于标识epoll模型int _timeout{3000}; // 默认超时时间3000毫秒(3秒)
};
EpollServer.hpp
#pragma once
#include <iostream>
#include <sys/epoll.h>
#include <memory>
#include "Log.hpp"
#include "Socket.hpp"
#include "Epoller.hpp"static const int num = 64;class EpollServer : public nocopy
{
public:EpollServer(uint16_t port): _port(port), _listensock(new Sock()), _epoller(new Epoller()){}void Init(){_listensock->Socket();_listensock->Bind(_port);_listensock->Listen();log(Info, "sock create success: %d\n", _listensock->Fd());}void Accepter(){std::string clientip;uint16_t clientport;int sock = _listensock->Accept(&clientip, &clientport); //不会再阻塞了if(sock > 0){//不能直接读取,我们再添加到红黑树里,统一处理_epoller->EpollUpdate(EPOLL_CTL_ADD, sock, EPOLLIN);log(Info, "get a new link, client info@ %s:%d", clientip.c_str(), clientport);}}void Recver(int fd){// 简单模拟事件处理char buffer[1024];// 从套接字里面读信息ssize_t n = read(fd, buffer, sizeof(buffer) - 1); // bug?if (n > 0){buffer[n] = 0;std::cout << "get a messge: " << buffer << std::endl;//再把数据返回给客户端std::string echo_str = "server echo & ";echo_str += buffer;write(fd, echo_str.c_str(), echo_str.size());}else if (n == 0){log(Info, "client quit, close fd is : %d", fd);//链接一旦断开,直接把红黑树里对应的节点去掉,节省资源//移除时,要想保证该文件描述符是合法的,所以要想移除再close关闭_epoller->EpollUpdate(EPOLL_CTL_DEL, fd, 0);close(fd);}else{log(Warning, "recv error: fd is : %d", fd);_epoller->EpollUpdate(EPOLL_CTL_DEL, fd, 0);close(fd);}}void Dispatcher(struct epoll_event revs[], int num){for(int i = 0;i < num; i++) //根据num,就可以一次性遍历获取并处理所有已经就绪的事件{uint32_t events = revs[i].events; //表示已经就绪的事件int fd = revs[i].data.fd; //获取已经就绪的文件描述符//现在就知道了,哪个文件描述符的哪个事件就绪了if (events & EPOLLIN) //读事件就绪了{if(fd == _listensock->Fd()) //说明来了一个新链接,需要accept获取上来{Accepter(); //事件派发} else //说明其它fd上面的读事件就绪{Recver(fd); //事件处理} }else if(events & EPOLLOUT) //写事件就绪了{//可以在这里扩充写的处理}else //其它事件就绪了{}}}void Start(){//将listensock添加到epoll中,这句话本质是:将listensock和它关心的事件,都给添加到内核epoll模型中的红黑树中_epoller->EpollUpdate(EPOLL_CTL_ADD, _listensock->Fd(), EPOLLIN); //我想把“读事件”,“添加”进去struct epoll_event revs[num];while(true){int n = _epoller->EpollerWait(revs, num);if(n > 0) //有事件就绪了{log(Debug, "event happened, fd is : %d", revs[0].data.fd);Dispatcher(revs, n); //处理事件逻辑 }else if(n = 0) //超时了{log(Info, "time out ...");}else //错误了{log(Info, "epoll wait error");}}}~EpollServer(){}
private:std::shared_ptr<Sock> _listensock;std::shared_ptr<Epoller> _epoller;uint16_t _port; //端口号
};
Main.cc
#include "EpollServer.hpp"void Usage(std::string proc)
{std::cout << "\n\rUsage: " << proc << "port[1024+]\n"<< std::endl;
}int main(int argc, char* argv[])
{if(argc != 2){Usage(argv[0]);return 1;}auto port = std::stoi(argv[1]);std::unique_ptr<EpollServer> svr(new EpollServer(port));svr->Init();svr->Start();return 0;
}
可以看到,我们是单进程,但是却可以同时接收多个客户端发来的请求,这就是多路转接的好处,相比多进程和多线程,多路转接所占用的资源更少
epoll优点
- 接口使用方便:虽然拆分成了三个函数,但是反而使用起来更方便高效不需要每次循环都设置关注的文件描述符,也做到了输入输出参数分离开
- 检测就绪时间复杂度为O(1),因为检测步骤只有一个,就是看就绪队列是否为空,一个if就可以解决;获取就绪就是O(n),就是把队列里的节点一个一个拷贝到用户层,一个while解决
- fd和event没有上限,因为所有文件描述符和要关心的事件都是红黑树管理的,这颗红黑树有多大和epoll没关系
- 数据拷贝轻量:只在合适的时候调用 EPOLL_CTL_ADD 将文件描述符结构拷贝到内核中,这个操作并不频繁(而select/poll都是每次循环都要进行拷贝)
- 事件回调机制:避免使用遍历,而是使用回调函数的方式,将就绪的文件描述符结构加入到就绪队列中,epoll_wait返回直接访问就绪队列就知道哪些文件描述符就绪。这个操作时间复杂度O(1),即使文件描述 符数目很多,效率也不会受到影响
- 没有数量限制:文件描述符数目无上限
- 有select,poll的所有优势,也解决了它们的缺点,epoll_wait获取就绪队列数组后,返回值n,表示有几个fd就绪了,而且会把就绪队列的节点一个一个弹出,这也意味着,返回给用户的就绪事件是连续的,意味着上层用户处理所有已经就绪的事件,不用再一个个检测fd是否非法的,或者是否就绪,只要拿着n就可以按指定次数处理了,省去了用户层很多的多余动作
epoll工作方式
epoll有两种工作方式,分别是水平触发工作模式和边缘触发工作模式。
水平触发(LT,Level Triggered)
只要底层有数据,epoll就会一直通知用户,就像数字电路的高电平触发一样,只要一直处于高电平,则会一直触发
epoll默认状态下就是LT模式
- 所以当epoll检测到事件就绪时,可以不立即进行处理,因为只要底层数据没处理完,下一次epoll还会通知用户
- select和poll就是LT模式的
- 支持阻塞读写和非阻塞读写
边缘触发(ET,Edge Triggered)
只有底层就绪事件数量发生变化时,epoll才会通知用户,就像数字电路中的上升沿触发一样,只有当电平由低到高那一瞬间才会触发
如果要将epoll改为ET工作模式,则需要在添加事件时设置EPOLLET选项。
- 由于在ET工作模式下,只有底层就绪事件无到有或由有到多发生变化的时候才会通知用户,因此当epoll检测到底层读事件就绪时,必须立即进行处理,而且必须全部处理完毕,因为有可能此后底层再也没有事件就绪,那么epoll就再也不会通知用户进行事件处理,此时没有处理完的数据就相当于丢失了。
- ET工作模式下epoll通知用户的次数一般比LT少,因此ET的性能一般比LT性能更高,Nginx就是默认采用ET模式使用epoll的。
- 只支持非阻塞的读写。
面试题:为什么ET的效率更高?
- 因为相同时间内,ET通知的数量比LT多
- ET也会让我们IO的效率也变高,因为我每次通知上层有数据来了之后,倒逼程序员,每次通知都必须把本轮数据全部取走(当实际读上来的数据比期望要的字节数小,就是全部取走了) --> 循环读取,直到读取出错 --> 因为fd默认是阻塞的(当read读取,没数据的时候,就会阻塞) --> ET下的所有fd必须是非阻塞的,因为如果是阻塞的,那么循环读取到最后没有数据时依然会阻塞,导致服务器挂起了 (也是为什么ED模式下fd是非阻塞的)
- 当把数据全取走之后,Tcp会向对方通告一个更大的窗口,从而从概率上让对方一次发送更多数据,能提升网络传输效率
ET的效率一定比LT高吗?
上面的情况是普遍的,也有特殊情况,LT不必将所有的fd设置成非阻塞然后循环读取,比如只要LT第一次通知的时候就把数据全取走,就和ET一样了,所以ET和LT谁效率高?不一定,要看具体的代码怎么写
什么叫做底层有数据了告知上层,每次通知是什么意思呢?
说到底其实就是,底层数据就绪了,是一直会回调一直会把节点放入到队列里,还是只放入一次?
如果ET只会放一次,如果是LT,只要底层有数据,每一次epoll,都把特定的文件描述符及其事件放到队列里
所谓通知,本质上就是向就绪队列里添加一次还是每次都添加。这就是LT和ET的区别