目录
poll也是一种多路转接的方案,它专门用来解决select的两个问题:
man poll:
①struct pollfd* fds:用来设置需要等待的fd以及事件
如上图所示,struct pollfd结构体中存在三个成员变量,第一个是fd,表示需要操作系统等待的文件描述符。第二个是short events,表示需要操作系统等待该fd的事件类型。第三个是short revents,操作系统告诉用户层该fd的哪个事件就绪了。
此时的文件描述符fd直接设置到struct pollfd结构中即可,需要设置哪个就设置哪个,不用再去寻找对应的位图。
告诉操作系统需要等待的事件时,只需要直接设置short events即可,不用将不同的事件类型放在不同的位图中。
当指定文件描述符fd的就绪时,操作系统会设置对应short revents,用户层直接读取fds中的这个字段便可知道是哪个事件就绪了。
struct pollfd结构体将用户和操作系统设置的字段分开了,所以就不存在相互干扰的问题。
events和revents的取值:
如上图所示便是用户层以及操作系统可以设置的事件类型,这些同样是一些宏定义,常用的就是POLLIN数据可读,以及POLLOUT数据可写。
假设fds结构体中,events的值是POLLIN,此时操作系统就关注指定文件描述符的读事件是否就绪,如果就绪,就将revents的值也设置成POLLIN,用户层读取到该值后就知道文件可读了。
②nfds_t nfds:需要poll等待的文件描述符fd的个数。
在内核中,nfds_t类型本质上是一个unsigned long int类型,也是一个整形。
第二个参数nfds就是用来设定需要poll等待文件描述符的个数的。用户层和操作系统同时维护一个元素为struct pollfd类型的数组,这个数组中有多少个元素,用户层需要让操作系统等待的文件描述符就有多少个,变量nfds就表示数组的大小。
这个数组就类似用户层和操作系统之间的“临界资源”,双方都能看到,而且都可以访问,由于访问的位置不同,所以不会出现干扰。
由于nfds的值是由用户层设定的,所以poll可同时等待的文件描述符数量并没有上限,unsigned long int的最大值非常大,远大于一个系统能打开的文件个数,所以可以理解为没有上限。
③int timeout:阻塞等待的时间
和select中的struct timeval变量的作用类似,但是这里的timeout是一个int类型的变量,它的单位是1ms。并且它不是一个输入输出型参数,只需要定义一次即可。
timeout>0表示在timout时间以内阻塞等待,超出这个时间就超时返回,如该值是1000就表示阻塞等待1s。
timeout ==0表示非阻塞等待。timeout < 0表示阻塞等待。
返回值:就绪事件的个数。和select的返回值意义一样。
这里在上一篇select代码下直接改:直接放代码:
Makefile
pollServer:main.cc
g++ -o $@ $^ -std=c++11
.PHONY:clean
clean:
rm -f pollServer
#ifndef __POLL_SVR_H__
#define __POLL_SVR_H__
#include <iostream>
#include <sys/select.h>
#include <poll.h>
#include "Log.hpp"
#include "Sock.hpp"
#define FD_NONE -1
using namespace std;
// select 我们只完成读取,写入和异常不做处理 -- epoll(写完整)
class PollServer
{
public:
static const int nfds = 100;
public:
PollServer(const uint16_t &port = 8080)
: _port(port), _nfds(nfds)
{
_listensock = Sock::Socket();
Sock::Bind(_listensock, _port);
Sock::Listen(_listensock);
logMessage(DEBUG, "%s", "create base socket success");
_fds = new struct pollfd[_nfds];
for (int i = 0; i < _nfds; i++)
{
_fds[i].fd = FD_NONE;
_fds[i].events = _fds[i].revents = 0;
}
_fds[0].fd = _listensock;
_fds[0].events = POLLIN;
_timeout = 1000;
}
void Start()
{
while (true)
{
int n = poll(_fds, _nfds, _timeout);
switch (n)
{
case 0:
logMessage(DEBUG, "%s", "time out...");
break;
case -1:
logMessage(WARNING, "select error: %d : %s", errno, strerror(errno));
break;
default:
// 成功的
HandlerEvent();
break;
}
}
}
~PollServer()
{
if (_listensock >= 0)
close(_listensock);
if (_fds)
delete[] _fds;
}
private:
void HandlerEvent() // fd_set 是一个集合,里面可能会存在多个sock
{
for (int i = 0; i < _nfds; i++)
{
if (_fds[i].fd == FD_NONE) // 1. 去掉不合法的fd
continue;
// 2. 合法的fd不一定就绪了
if (_fds[i].revents & POLLIN) // 如果fd读事件就绪
{
if(_fds[i].fd == _listensock) // 读事件就绪:连接事件到来,accept
{
Accepter();
}
else // 读事件就绪:INPUT事件到来,read / recv
{
Recver(i);
}
}
}
}
void Accepter()
{
string clientip;
uint16_t clientport = 0;
// listensock上面的读事件就绪了,表示可以读取了,获取新连接了
int sock = Sock::Accept(_listensock, &clientip, &clientport); // 这里进行accept不会阻塞
if (sock < 0)
{
logMessage(WARNING, "accept error");
return;
}
logMessage(DEBUG, "get a new line success : [%s:%d] : %d", clientip.c_str(), clientport, sock);
int pos = 1; // 规定了 _fd_array[0] = _listensock; 不用管
for (; pos < _nfds; pos++) // 将fd放入到数组中 -> 找一个合法的位置
{
if (_fds[pos].fd == FD_NONE)
break;
}
if (pos == _nfds) // 可以对struct pollfd进行自动扩容,或者直接改数组大小,这里不处理
{
logMessage(WARNING, "%s:%d", "poll server already full, close: %d", sock);
close(sock);
}
else // 找到了合法的位置
{
_fds[pos].fd = sock;
_fds[pos].events = POLLIN;
}
}
void Recver(int pos)
{
// 读事件就绪:INPUT事件到来、recv,read
logMessage(DEBUG, "message in, get IO event: %d", _fds[pos]);
// 此时poll已经帮我们进行了事件检测,fd上的数据一定是就绪的,即 本次 不会被阻塞
// 怎么保证以读到了一个完整的报文呢? -> 模拟实现epoll的时候再考虑
char buffer[1024];
int n = recv(_fds[pos].fd, buffer, sizeof(buffer) - 1, 0);
if (n > 0) // 正常读取
{
buffer[n] = 0;
logMessage(DEBUG, "client[%d]# %s", _fds[pos].fd, buffer);
}
else if (n == 0) // 对端把链接关了
{
logMessage(DEBUG, "client[%d] quit, me too...", _fds[pos]);
close(_fds[pos].fd); // 关闭不需要的fd
_fds[pos].fd = FD_NONE; // 不要让poll帮我关心当前的fd了
_fds[pos].events = 0;
}
else // 读取错误
{
logMessage(WARNING, "%d sock recv error, %d : %s", _fds[pos].fd, errno, strerror(errno));
close(_fds[pos].fd); // 关闭不需要的fd
_fds[pos].fd = FD_NONE; // 不要让poll帮我关心当前的fd了
_fds[pos].events = 0;
}
}
void DebugPrint() // 打印一下数组里合法的fd
{
cout << "_fd_array[]: ";
for (int i = 0; i < _nfds; i++)
{
if (_fds[i].fd == FD_NONE)
continue;
cout << _fds[i].fd << " ";
}
cout << endl;
}
private:
uint16_t _port;
int _listensock;
struct pollfd *_fds;
int _nfds;
int _timeout;
};
#endif
如上图所示,使用telnet连接服务端后,现象和selsect一样,也是一个服务端进程可以同时和多个客户端进行通信。
优点:
和select一样效率高,还有应用场景是:有大量的链接,只有少量是活跃的,省资源。这是所有多路转接都具备的优点。
struct pollfd结构包含了要监视的event和发生的revent,不再使用select“参数-值”传递的方式,接口使用比select更方便。
poll并没有最大等待文件描述符数量限制 (但是数量过大后性能也是会下降)。
缺点:
和select一样,poll返回后,需要轮询struct pollfd数组来获取就绪的描述符。
每次调用poll都需要把大量的struct pollfd结构从用户层拷贝到内核中。
同时连接的大量客户端在一时刻可能只有很少的处于就绪状态,因此随着监视的描述符数量的增长, 其效率也会线性下降。
代码的编写也比较复杂(比select简单)
epoll是基于poll的基础上改进的,e是expand增强/拓展的意思,增强版的poll,但实际比poll厉害得多,它不仅克服了select的缺点,而且解决了poll遍历成本,是效率最高的多路转接模式,但是它也是最复杂的一种模式。?
man epoll_create
如上图所示的epoll_create系统调用是用来创建epoll句柄的。
epoll是一个模型,这个模型包含多个数据结构,句柄,可以理解为是这个模型标志,通过句柄可以找到这个模型,并且使用它。
epoll句柄在内核中也是一个结构体,类似于struct file,而Linux下一切皆文件,所以返回的也是一个文件描述符,拿着这个文件描述符可以访问到这个epoll句柄,用完之后, 必须调用close()关闭。
man epoll_ctl
如上图所示的epoll_ctl系统调用是用来修改创建的epoll句柄属性的。四个参数:
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
如上图所示便是内核中struct epoll_event结构体的定义,它有两个成员变量。
第二个成员变量是一个联合体epoll_data_t data,可以看到有四个成员共用这个联合体,后面会讲解它每个变量的作用。
第一个成员变量是uint32_t events,用来设置需要等待的事件,其值也是有几个宏组成的集合:
值 | 意义 |
---|---|
EPOLLIN | 表示对应的文件描述符可以读 (包括对端SOCKET正常关闭) |
EPOLLOUT | 表示对应的文件描述符可以写 |
EPOLLPRI | 表示对应的文件描述符有紧急的数据可读 (这里应该表示有带外数据到来) |
EPOLLERR | 表示对应的文件描述符发生错误 |
EPOLLHUP | 表示对应的文件描述符被挂断 |
EPOLLET | 将EPOLL设为边缘触发(Edge Triggered)模式, 这是相对于水平触发(Level Triggered)来说的 |
EPOLLONESHOT | 只监听一次事件, 当监听完这次事件之后, 如果还需要继续监听这个socket的话, 需要再次把这个socket加入到EPOLL队列里 |
返回值:调用成功返回0,调用失败返回-1,并且设置相应的错误码。
man epoll_wait
如上图所示的epoll_wait系统调用是用来从操作系统中获取被等待文件描述符的状态的。
以上三个系统调用是epoll模型的核心调用,epoll_ctl是用户层用来告诉内核自己的需求的,epoll_wait是内核用来告诉用户层哪些文件描述符的什么事件就绪的。
现在知道了接口的使用,但是仍然并不清除为什么epoll模型能够解决poll和select存在的问题,所以下面了解下epoll模型的底层原理。
?
网络通信过程中,接收端将数据从网卡(硬件层)开始逐层向上交付,最后给到应用层,那么接收端是如何知道网卡上有数据到来的?也就是操作系统是怎么感知到数据来了呢?
将计算机体系结构,冯诺依曼体系结构,以及中断向量表放在了一起来看:
当网卡接收到数据后,输入外设(网卡)会自己产生一个控制信号直接给CPU中的控制器,表示此时网卡中有数据到来,可以读了。
冯诺依曼体系中,外设的数据信号不能直接和CPU传递,如上图中红色线,必须经过存储器。
外设的控制信号可以直接传递给CPU的控制器,如上图黑色线。
外设给CPU发送一个信号表示数据到来,这叫做中断事件发生。
CPU根据中断信号的编号,去操作系统维护的中断向量表中找到对应的中断服务函数并且执行。
中断服务函数中会调用网卡接收数据的驱动程序,将数据读取并且向上层交付,如上图绿色线。
在这里要重点关注中断服务函数,从网卡中接收数据是从它开始的。
epoll模型理论图:(包含计算机体系结构中的驱动层,操作系统,系统调用三层)
在调用epoll_create创建模型后,会返回一个文件描述符fd,这个fd同样放在服务器进程PCB所维护的进程描述符表中,通过fd这个句柄就可以找到对应的epoll模型。
epoll模型同样是一个大的结构体,只是这个结构体更加复杂,Linux下一切皆文件,在Linux眼中,都是struct file,所以创建模型后返回的也是一个文件描述符。
上图中操作系统中黑色框内的部分就是epoll模型,包含一个红黑树和一个就绪队列。
每一个epoll模型都有一个独立的eventpoll结构体,用于存放通过epoll_ctl方法向epoll对象中添加进来的事件。
这些事件都会挂载在红黑树中,如此,重复添加的事件就可以通过红黑树而高效的识别出来(红黑树的插入时间效率是logN),其中N为树的高度。
而所有添加到epoll中的事件都会与设备(网卡)驱动程序建立回调关系,也就是说,当响应的事件发生时会调用这个回调方法。
这个回调方法在内核中叫ep_poll_callback,它会将发生的事件添加到rdlist双链表中。
在epoll中,对于每一个事件,都会建立一个epitem结构体:
struct epitem
{
struct rb_node rbn; // 红黑树节点
struct list_head rdllink; // 双向链表节点
struct epoll_filefd ffd; // 事件句柄信息
struct eventpoll *ep; // 指向其所属的eventpoll对象
struct epoll_event event; // 期待发生的事件类型
}
epoll的使用过程就是三部曲:
- 调用epoll_create,创建一个epoll句柄
- 调用epoll_ctl,将要监控的文件描述符进行注册
- 调用epoll_wait,等待文件描述符就绪
以增加需要操作系统等待的文件描述符为例,调用epoll_ctl,将fd以及需要等待的事件构建成struct epoll_event变量插入到红黑树中,操作系统会遍历红黑树中所有节点。
红黑树节点中包含很多成员变量,如上图左下角所示,这其中必然有文件描述符fd,需要等待的事件event,左右字节的指针,还包括next和prev指针。
如果是删除或者修改等操作,同样是在修改这颗红黑树,而红黑树查找效率非常高,所以对应的操作也会很高效。
当操作系统发现红黑树中有节点的事件就绪后,就会将该节点放入到就绪队列中,就绪队列是一个双向循环链表。
将节点从红黑树放入到就绪队列中并没有发生拷贝,关键就在next和prev指针上。当网卡中有数据到来时,通过中断函数最终调用了网卡驱动程序,在驱动程序中有一个回调函数void* private_data,这是由操作系统提供的。private_data回调函数会将红黑树节点中的next和prev指针的指向关系做对应的修改,让该节点链入到就绪队列中去。
红黑树的一个节点,它不只属于红黑树,还可能属于就绪队列。红黑树中的节点和就绪队列中的节点地址可能是一样的。画的是逻辑图,所以将就绪队列和红黑树分开了。
就绪队列中必然也包括就绪文件的文件描述符,以及就绪的事件,如上图所示的struct epoll_event结构。所以,凡是处于就绪队列中的节点必然已经就绪。
用户层在调用epoll_wait后,获取的就是内核中就绪队列中的内容,所以获取到的全部都是就绪的事件,所以用户层的struct epoll_event类型数组中,全部都是就绪的事件。
epoll_wait将所有就绪的事件,按照顺序放入到用户层传入的数组中。
此时从内核到用户层虽然也需要遍历,但是此时是遍历拷贝,而不需要遍历检测,所以时间复杂度相当于从之前的O(N)变成了O(1),效率大提高。
先写个大概:(Log.hpp和Sock.hpp和前一篇一样就不放了)
封装一下epoll的三个接口:
#pragma once
#include <iostream>
#include <sys/epoll.h>
#include <unistd.h>
class Epoll
{
public:
static const int gsize = 256;
public:
static int CreateEpoll()
{
int epfd = epoll_create(gsize);
if (epfd > 0)
return epfd;
exit(5);
}
static bool CtlEpoll(int epfd, int op, int fd, uint32_t events)
{
struct epoll_event ev;
ev.events = events;
ev.data.fd = fd; // 先用第三个参数联合体的fd
int n = epoll_ctl(epfd, op, fd, &ev);
return n == 0;
}
static int WaitEpoll(int epfd, struct epoll_event revs[], int revs_num, int timeout)
{
return epoll_wait(epfd, revs, revs_num, timeout);
}
};
main.cc
#include "EpollServer.hpp"
#include <memory>
using namespace std;
using namespace ns_epoll;
void change(std::string request)
{
// 完成业务逻辑
std::cout << "change : " << request << std::endl;
}
int main()
{
unique_ptr<EpollServer> epoll_server(new EpollServer(change));
epoll_server->Start();
return 0;
}
(现在可以验证一下timeout,后面逐步把三个函数加上去就是完整的EpollServer.hpp了,_HandlerRequest和function还有main.cc的change函数在写Recver函数才用到,就是客户端告诉服务端接收到IO事件要怎么处理这个事件)
#ifndef __EPOLL_SERVER_HPP__
#define __EPOLL_SERVER_HPP__
#include <iostream>
#include <string>
#include <functional>
#include <cassert>
#include "Log.hpp"
#include "Sock.hpp"
#include "Epoll.hpp"
namespace ns_epoll
{
const static int default_port = 8080;
const static int gnum = 64;
class EpollServer // 这里先只处理读取
{
using func_t = std::function<void(std::string)>;
public:
EpollServer(func_t HandlerRequest, const int &port = default_port)
: _port(port), _revs_num(gnum)
{
_revs = new struct epoll_event[_revs_num]; // 1. 申请对应的空间
_listensock = Sock::Socket(); // 2. 创建listensock
Sock::Bind(_listensock, _port);
Sock::Listen(_listensock);
_epfd = Epoll::CreateEpoll(); // 3. 创建epoll模型
logMessage(DEBUG, "init success, listensock: %d, epfd: %d", _listensock, _epfd);
// 4. 将listensock,先添加到epoll中,让epoll管理起来
if (!Epoll::CtlEpoll(_epfd, EPOLL_CTL_ADD, _listensock, EPOLLIN))
exit(6);
logMessage(DEBUG, "add listensock to epoll success.");
}
void Accepter(int listensock)
{}
void Recver(int sock)
{]
void HandlerEvents(int n)
{}
void LoopOnce(int timeout)
{
int n = Epoll::WaitEpoll(_epfd, _revs, _revs_num, timeout); // 封装意义不大还是和上面一样封装了
// 细节1:如果底层就绪的sock非常多,revs承装不下的话 -> 一次拿不完,就下一次再拿
// 担心拿不完的话 -> if(n == _revs_num) // 可以扩容,这里不处理
// 细节2:关于epoll_wait的返回值问题:有几个fd上的事件就绪,就返回几,
// epoll返回的时候,会将所有就绪的event按照顺序放入到revs数组中, 一共有返回值个->用来遍历就绪队列
switch (n)
{
case 0:
logMessage(DEBUG, "timeout..."); // 3, 4
break;
case -1:
logMessage(WARNING, "epoll wait error: %s", strerror(errno));
break;
default:
// 等待成功
logMessage(DEBUG, "get a event");
sleep(1);
HandlerEvents(n);
break;
}
}
void Start()
{
int timeout = -1; // -1是阻塞,0是非阻塞,1000是每隔1秒...
while (true)
{
LoopOnce(timeout);
}
}
~EpollServer()
{
if (_listensock >= 0)
close(_listensock);
if (_epfd >= 0)
close(_epfd);
if (_revs)
delete[] _revs;
}
private:
int _listensock;
int _epfd;
uint16_t _port;
struct epoll_event *_revs; // 就绪的事件
int _revs_num;
func_t _HandlerRequest;
};
}
#endif
编译运行:
和预期一样得到3和4文件描述符,监听套接字的文件描述符是3,句柄的值是4,成功获取事件,没处理就一直打印了,下面写一下HandlerEvents:
void Accepter(int listensock)
{
std::string clientip;
uint16_t clientport;
int sock = Sock::Accept(listensock, &clientip, &clientport);
if (sock < 0)
{
logMessage(WARNING, "accept error");
return;
}
// 不能直接读取,因为并不清楚底层是否有数据
if (!Epoll::CtlEpoll(_epfd, EPOLL_CTL_ADD, sock, EPOLLIN)) // 将新的sock添加给epoll
return;
logMessage(DEBUG, "add new sock : %d to epoll success", sock);
}
void Recver(int sock)
{
}
void HandlerEvents(int n)
{
assert(n > 0);
for (int i = 0; i < n; i++)
{
uint32_t revents = _revs[i].events;
int sock = _revs[i].data.fd;
if (revents & EPOLLIN) // 读事件就绪
{
if (sock == _listensock)
{
Accepter(_listensock); // 1. listensock 就绪
}
else
{
Recver(sock); // 2. 一般sock 就绪 - read
}
}
if (revents & EPOLLOUT)
{
// 这里不处理
}
}
}
测验一下:
和预期一样得到5和6文件描述符,下面写Recver:
void Recver(int sock)
{
char buffer[10240]; // 1. 读取数据
ssize_t n = recv(sock, buffer, sizeof(buffer) - 1, 0);
if (n > 0) // 假设这里就是读到了一个完整的报文
{
buffer[n] = 0;
_HandlerRequest(buffer); // 2. 用服务端传进来的方法处理数据
}
else if (n == 0)
{
bool res = Epoll::CtlEpoll(_epfd, EPOLL_CTL_DEL, sock, 0); // 1. 先在epoll中去掉对sock的关心
assert(res);
(void)res;
close(sock); // 2. 再close文件
logMessage(NORMAL, "client %d quit, me too...", sock);
}
else
{
bool res = Epoll::CtlEpoll(_epfd, EPOLL_CTL_DEL, sock, 0); // 1. 先在epoll中去掉对sock的关心
assert(res);
(void)res;
close(sock); // 2. 再close文件
logMessage(NORMAL, "client recv %d error, close error sock", sock);
}
}
成功调用客户端的函数,后面还会改进这个epoll。
epoll的优点和select的缺点对应:
接口使用方便:虽然拆分成了三个函数,但是反而使用起来更方便高效,不需要每次循环都设置关注的文件描述符,也做到了输入输出参数分离开。
数据拷贝轻量:只在合适的时候调用epoll_ctl将文件描述符结构拷贝到内核中,这个操作并不频繁(而select/poll是每次循环都要进行拷贝)。
事件回调机制:避免使用遍历检测,而是使用回调函数的方式,将就绪的文件描述符结构加入到就绪队列中。epoll_wait返回直接访问就绪队列就知道哪些文件描述符就绪,这个操作时间复杂度是O(1),即使文件描述符数目很多, 效率也不会受到影响。
没有数量限制:文件描述符数目无上限。
虽然epoll的机制更复杂,但是它用起来更方便也更高效。
epoll主要解决的是多路转接中,进行IO的时候等的这一环节,当操作系统所监管的事件就绪了,就会通知用户层来处理事件,这个通知有两种方式:
来举一个生活中的例子,假设你正在打英雄联盟,正要打团的时候,你妈喊你吃饭,此时就存在两种方式:
放在多路转接中就是,事件就绪时,操作系统通知用户层后,用户层没有读取数据或者没有读取完毕,如果操作系统继续通知就是LT模式,如果没有继续通知就是ET模式。
epoll默认状态下就是LT工作模式。
LT模式下,事件未被用户层处理完毕,每调用一次epoll_wait就会返回一个大于0的值。
ET模式下,事件未被用户处理完毕,只有第一次调用epoll_wait才会返回大于0的值,之后不再返回,并且将事件设置为未就绪状态,除非该套接字中数据增加,才会再返回一次大于0的值。
在调用epoll_ctl的时候,将struct epoll_event中的uint32_t events字段设置成EPOLLET,此时该文件描述符就变成了ET模式,并没有设置LT模式的方法,因为默认就是LT模式。
使用ET模式能够减少epoll触发的次数,但是代价就是强逼着程序员一次响应就绪过程中就把所有的数据都处理完,如果不处理完,剩下的数据就有可能被覆盖,后果由程序员自己承担。
相当于一个文件描述符就绪之后,不会反复被提示就绪,所以就比 LT 更高效一些。
假如有这样一个例子:
我们已经把一个tcp socket添加到epoll描述符,
这个时候socket的另一端被写入了2KB的数据,
调用epoll_wait,并且它会返回,说明它已经准备好读取操作,
然后调用read,只读取了1KB的数据,
继续调用epoll_wait......
水平触发Level Triggered 工作模式:
当epoll检测到socket上事件就绪的时候,可以不立刻进行处理,或者只处理一部分。
如上面的例子,由于只读了1K数据,缓冲区中还剩1K数据,在第二次调用 epoll_wait 时,epoll_wait仍然会立刻返回并通知socket读事件就绪,
直到缓冲区上所有的数据都被处理完, epoll_wait 才不会立刻返回,
LT模式支持阻塞读写和非阻塞读写。
边缘触发Edge Triggered工作模式
如果我们在第1步将socket添加到epoll描述符的时候使用了EPOLLET标志,epoll进入ET工作模式。
当epoll检测到socket上事件就绪时,必须立刻处理。
如上面的例子,虽然只读了1K的数据,缓冲区还剩1K的数据,在第二次调用 epoll_wait 的时候,epoll_wait 不会再返回了。
也就是说,ET模式下,文件描述符上的事件就绪后,只有一次处理机会。
ET模式的性能比LT模式性能更高(epoll_wait 返回的次数少了很多)Nginx默认采用ET模式使用epoll。
ET模式只支持非阻塞的读写。
ET模式的高效是建立在程序员的痛苦之上的,由于它只通知用户层一次,如果不一次处理完数据就没机会再处理了,但是,用户层是怎么知道数据有没有读取完毕呢?
答案是:循环读取,直到读不到数据了,就证明读完了。
如上图所示,此时就存在一个问题,客户端发送了10K的数据给服务端,服务端收到了epoll的通知后,用户层调用recv进行读取,但是一次没有读取完毕,只读取了1K的数据。
由于此时epoll是ET模式,所以操作系统认为事件已经被处理了,就又将读事件设置成了未就绪的状态,再次读取时recv就会阻塞不动,整个进程就阻塞了,如下面伪代码:
while(1)
{
int ret = recv(sock,buffer,sizeof(buffer)-1,0);
// 第二次读取就会阻塞
}
由于epoll_wait不会再次返回,剩下的9K数据会一直在缓冲区中,直到下一次客户端再给服务器写数据,操作系统再次将读事件设置成就绪状态,才能再次recv。
服务端无法读取剩余的数据,也就不会发出响应,客户端无法收到响应,也就不会再次发送请求,服务端无法收到再次的请求,就无法再次读取缓冲区中剩余的数据。
时间一长,就会触发TCP的超时重传机制,导致数据被覆盖甚至丢失等问题。
为了解决ET模式这个问题,文件描述符对应的缓冲区必须设置成非阻塞 IO方式,使用fcntl设置。 只有非阻塞方式,才能用轮询的方式不断读取缓冲区中的数据,直到读取完毕。
如果是LT模式就不用设置成非阻塞模式,因为数据没有读取完毕,epoll_wait会持续返回,而事件也被保持就绪状态,recv就可以持续读取数据,直到将数据读取完毕。
select和poll是采用LT模式的,和epoll的默认方式一样,那么如果将文件描述符设置成非阻塞方式,仍然使用LT模式不是更方便吗?既能循环读取,又能让epoll持续返回,也能提高效率啊,为什么仍然要多此一举设计一个ET模式呢?
ET模式的高效不仅仅体现在通知机制上,减少通知次数,降低系统调用的开销。ET模式的高效还体现在增加底层网络的吞吐量上。
ET模式表面上看是在强迫程序员将本轮就绪的数据全部读走,深入网络底层TCP协议去看,服务端由于一次将数据全部读走了,从而能给客户端应答一个更大的窗口值。
客户端就能更新出一个更大的滑动窗口,增加一次发送的数据量,从而提高底层数据发送的效率,更好的利用诸如TCP延迟应答等策略,提高整个网络通信的吞吐量。
所以说,ET模式在压榨程序员的基础上,提高了整个网络通信的效率。
epoll不仅解决了poll方式的问题,而且还带来了其他优势,比如使用简单,遍历成本低等优势,以及ET模式对于通信效率的提升,虽然epoll的机制更复杂,但是它带来了更好的效果,利远大于弊。
epoll的高性能是有一定的特定场景的,如果场景选择的不适宜,epoll的性能可能适得其反。
比如对于多连接,且多连接中只有一部分连接比较活跃时,比较适合使用epoll。
还有一个需要处理上万个客户端的服务器,例如各种互联网APP的入口服务器,这样的服务器就很适合epoll。
如果只是系统内部,服务器和服务器之间进行通信,只有少数的几个连接,这种情况下用epoll就并不合适,具体要根据需求和场景特点来决定使用哪种模型。
下一篇:网络和Linux网络_15(IO多路转接)reactor编程_服务器+多路转接相关笔试题。