poll和select一样,也是一种linux中的多路转接的方案。而poll解决了select的两个问题:
1.select的文件描述符有上限的问题。
2.每次调用都要重新设置需要关心的文件描述符。
首先我们认识一下poll的接口:
?第一个参数是一个结构体,我们可以将这个参数想象为一个new/malloc出来的动态数组,这个数组中每个元素的类型是一个结构体,结构体中有文件描述符,监听的事件集合,返回的事件集合三部分。
第二个参数是刚刚那个fds数组的长度。
第三个参数和我们学select中的timeval结构体差不多,时间单位是ms,当等于0时表示非阻塞等待,小于0表示阻塞式等待,大于0表示前面阻塞式等待,时间到了非阻塞返回一次。
这个函数的返回值和select一模一样,大于0表示有几个文件描述符的事件就绪了,等于0表示超时返回,小于0表示poll函数出现错误。
所以poll函数中,输入的时候看fd和events,输出的时候看fd和revents,通过将事件分为输出事件和返回事件来解决select中每次需要对文件描述符集重新设定的操作。
下面我们看看events和revents的取值都有哪些:
?虽然很多实际上我们能用到的很少,这些宏值实际上对应的在events和revents中的位图,只要我们设置在底层位图中就会将某个事件标记为1,我们讲几个常用的:POLLIN事件就是看哪些的文件描述符可以读了或者用户告诉操作系统帮我们关心某个文件描述符的读事件。POLLOUT事件就是看哪些的文件描述符可以写了或者用户告诉操作系统帮我们关心某个文件描述符的写事件。
POLLPRI与TCP的紧急指针相对应。
下面我们实现poll服务器:
首先我们创建poll所需要的struct?pollfd类型数组,然后对数组初始化,这里我们可以写成扩容版数组,但是为了演示我们就用2018这个定值。
void initServer()
{
_listensock = Sock::createSock();
if (_listensock == -1)
{
logMessage(NORMAL,"createSock error");
return;
}
Sock::Bind(_listensock,_port);
Sock::Listen(_listensock);
_rfds = new struct pollfd[max_num];
for (int i = 0;i<max_num;i++)
{
_rfds[i].fd = defaultfd;
_rfds[i].events = 0;
_rfds[i].revents = 0;
}
_rfds[0].fd = _listensock;
_rfds[0].events = POLLIN;
}
初始化服务器还是需要遍历数组将数组内的文件描述符设置为非法状态,并且事件初始化。然后我们将listensock放到数组第一个位置并且让系统帮我们监视listen文件描述符的读事件。
void start()
{
int timeout = -1;
for (;;)
{
int n = poll(_rfds,max_num,timeout);
switch (n)
{
case 0:
logMessage(NORMAL,"time out.....");
break;
case -1:
logMessage(WARNING,"select error,code: %d,err string: %s",errno,strerror(errno));
break;
default:
//说明有事件就绪了
//logMessage(NORMAL,"get a new link");
HanderEvent();
break;
}
}
}
启动的时候我们不需要再像select那样每次先重新设置文件描述符集并且把数组中合法fd读到文件描述符集中并且select还需要知道最大文件描述符,这些在poll中统统不要,只需要将数组传进去系统会帮我们管理,我们设置的时间为-1表示阻塞式监视。
void HanderEvent()
{
for (int i = 0;i<max_num;i++)
{
//过滤掉非法的文件描述符
if (_rfds[i].fd == defaultfd)
continue;
//过滤掉没有设置读事件的文件描述符
if (!(_rfds[i].events & POLLIN))
continue;
//如果是listensock事件就绪,就去监听新连接获取文件描述符,如果不是listensock事件,那么就是普通的IO事件就绪了
if (_rfds[i].fd == _listensock && (_rfds[i].revents & POLLIN))
{
Accepter(_listensock);
}
else if ((_rfds[i].revents & POLLIN))
{
Recver(i);
}
else
{
}
}
}
在hander函数中,因为我们目前演示的服务器只涉及读数据,所以我们先过滤数组中非法的文件描述符,然后再过滤不关心读事件的文件描述符,然后判断listensock文件描述符的读事件是否就绪,如果就绪了就去执行监听新连接的函数,如果是其他文件描述符的读事件就绪,那么就进行数据读取和发送响应的函数。
void Accepter(int listensock)
{
// listensock必然就绪
std::string clientip;
uint16_t clientport = 0;
int sock = Sock::Accept(listensock, &clientip, clientport);
if (sock < 0)
{
return;
}
logMessage(NORMAL, "accept success [%s:%d]", clientip.c_str(), clientport);
// 开始进行服务器的处理逻辑
// 将accept返回的文件描述符放到自己管理的数组中,本质就是放到了select管理的位图中
int i = 0;
for (i = 0; i < max_num; i++)
{
if (_rfds[i].fd != defaultfd)
{
continue;
}
else
{
break;
}
}
if (i == max_num)
{
logMessage(WARNING, "server is full ,please wait");
close(sock);
}
else
{
_rfds[i].fd = sock;
_rfds[i].events = POLLIN;
_rfds[i].revents = 0;
}
print();
}
void Recver(int pos)
{
//注意:这样的读取有问题,由于没有定协议所以我们不能确定是否能读取一个完整的报文,并且还有序列化反序列化操作...
//由于我们只做演示所以不再定协议,在TCP服务器定制的协议大家可以看看
char buffer[1024];
ssize_t s = recv(_rfds[pos].fd,buffer,sizeof(buffer)-1,0);
if (s>0)
{
buffer[s] = 0;
logMessage(NORMAL,"client# %s",buffer);
}
else if (s == 0)
{
//对方关闭文件描述符,我们也要关闭并且下次不让select关心这个文件描述符了
close(_rfds[pos].fd);
_rfds[pos].fd = defaultfd;
_rfds[pos].events = 0;
_rfds[pos].revents = 0;
logMessage(NORMAL,"client quit");
}
else
{
//读取失败,关闭文件描述符
close(_rfds[pos].fd);
_rfds[pos].fd = defaultfd;
_rfds[pos].events = 0;
_rfds[pos].revents = 0;
logMessage(ERROR,"client quit: %s",strerror(errno));
}
//2.处理 request
std::string response = func(buffer);
//3.返回response
write(_rfds[pos].fd,response.c_str(),response.size());
}
这两个函数只需要记得当读取失败的时候需要重置数组中文件描述符结构体。
下面我们运行起来:
可以看到也是没问题的。?
epoll可以理解为是增强版的poll,按照man手册的说法:epoll是为了处理大批量句柄而作了改进的poll.
epoll有三个系统调用:
int epoll_create(int size);
如果创建成功了会给我们返回一个文件描述符,如果失败返回-1。实际上epoll_create就是创建一个epoll模型。
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
第二个参数的取值:
EPOLL_CTL_ADD :注册新的fd到epfd中;
EPOLL_CTL_MOD :修改已经注册的fd的监听事件;
?第三个参数是需要监听的fd.
?
?如上图:uint32_t events就是宏的集合,如果文件描述符是读事件那么就会设置为EPOLLIN,data由用户定义,可以是指针可以是文件描述符等。
如果函数成功则返回0,否则返回-1.
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
首先作为一款服务器,端口号肯定是必须要有的,并且我们还需要监听套接字和epoll对象模型,这个epoll对象模型我们讲过,实际上就是一个int变量。
void initServer()
{
//1.创建socket
_listensock = Sock::createSock();
Sock::Bind(_listensock,_port);
Sock::Listen(_listensock);
//2.创建epoll模型
_epfd = epoll_create(size);
if (_epfd < 0)
{
logMessage(FATAL,"epoll create error: %s",strerror(errno));
exit(EPOLL_CREATE_ERR);
}
//3.添加listensock到epoll中
struct epoll_event ev;
ev.events = EPOLLIN;
ev.data.fd = _listensock; //当事件就绪被重新捞取上来的时候我们要知道是哪一个fd就绪了
epoll_ctl(_epfd,EPOLL_CTL_ADD,_listensock,&ev);
//4.申请就绪事件的空间
_revs = new struct epoll_event[_num];
logMessage(NORMAL,"init server success");
}
我们在初始化服务器的时候首先创建套接字然后创建epoll模型(创建epoll模型的参数只要大于0即可,我们添加一个变量size用来使用),然后我们需要将监听套接字添加到epoll中,epoll_ctl的参数我们已经讲过了,第一个就是epoll创建成功返回的句柄,第二个参数是选项可以是添加也可以是删除或者修改,第三个参数就是要添加的文件描述符,第四个参数是对此文件描述符事件的设置,要设置监听套接字的读事件,首先创建一个struct epoll_event对象,然后把事件设置为读事件,把监听套接字放入data中的fd,这一步操作非常重要,因为我们后续事件就绪捞出事件时需要知道这个事件是哪个文件描述符。设置成功后我们还需要创建一个struct epoll_event的数组(这个数组用来存放所有的struct epoll_event),有了这个数组后我们就可以开辟空间了,这里的空间大家可以改为扩容版,我们为了演示就用了固定大小:
void start()
{
int timeout = -1;
for (;;)
{
int n = epoll_wait(_epfd,_revs,_num,timeout);
switch (n)
{
case 0:
logMessage(NORMAL,"timeout......");
break;
case -1:
logMessage(WARNING,"epoll_wait error: %s",strerror(errno));
break;
default:
logMessage(NORMAL,"have event ready");
HanderEvent(n);
break;
}
}
}
?我们启动服务器的时候就进行epoll_wait,这个函数与select和epoll的返回值一模一样,第一个参数是epoll创建的句柄,第二个参数是struct epoll_events数组,这个数组epoll会帮我们管理,数组大小就是我们前面定义的num,时间设置为阻塞式。当返回值大于0说明有几个事件就绪了,我们就去调用事件处理函数:
void HanderEvent(int readynum)
{
for (int i = 0;i<readynum;i++)
{
int sock = _revs[i].data.fd;
uint32_t events = _revs[i].events;
if (sock == _listensock && (events & EPOLLIN))
{
//listen读事件就绪,获取新连接
std::string clientip;
uint16_t clientport;
int fd = Sock::Accept(_listensock,&clientip,clientport);
if (fd<0)
{
logMessage(WARNING,"accept error");
continue;
}
//获取fd成功不可以直接读取,要放入epoll
struct epoll_event ev;
ev.events = EPOLLIN;
ev.data.fd = fd;
epoll_ctl(_epfd,EPOLL_CTL_ADD,fd,&ev);
}
else if ((events & EPOLLIN))
{
char buffer[1024];
//普通读事件就绪
int n = recv(sock,buffer,sizeof(buffer)-1,0);
if (n>0)
{
buffer[n] = 0;
logMessage(NORMAL,"client# %s",buffer);
std::string response = _func(buffer);
send(sock,response.c_str(),response.size(),0);
}
else if (n==0)
{
epoll_ctl(_epfd, EPOLL_CTL_DEL, sock, nullptr);
close(sock);
logMessage(NORMAL,"client quit");
}
else
{
epoll_ctl(_epfd, EPOLL_CTL_DEL, sock, nullptr);
close(sock);
logMessage(ERROR,"recv error: %s",strerror(errno));
}
}
else
{
}
}
}
首先遍历事件数组,然后记录每个结构体的fd和事件,当是监听套接字并且设置了读事件,那么我们就监听新连接,监听成功后要将用于通信的套接字设置进epoll中。如果不是监听套接字并且设置了读事件那么就是普通事件了,我们需要读数据,注意:我们在select,poll,epoll中演示的读取和发送都是有问题的,不仅仅要保证数据是一个完整的报文,而且还得考虑一次读完数据或者一次读不完数据的情况,并且还有序列化和反序列化等。我们只是演示多路转接对于事件的推送机制。
数据读成功后我们就发送,如果客户端关闭了文件描述符或者recv函数失败,则先将此文件描述符在epoll中删除,然后再关闭文件描述符。
下面我们运行起来:
?可以看到是没问题的,这就是我们epoll服务器的测试代码。
首先epoll的底层有红黑树和双向链表,当某一进程调用epoll_create方法时,Linux内核会创建一个eventpoll结构体,这个结构体中有两个成员与epoll的使用方式密切相关:
struct eventpoll{
....
/*红黑树的根节点,这颗树中存储着所有添加到epoll中的需要监控的事件*/
struct rb_root rbr;
/*双链表中则存放着将要通过epoll_wait返回给用户的满足条件的事件*/
struct list_head rdlist;
....
};
?红黑树用来存储所有添加到epoll中的需要监控的事件,而双向链表的作用是当用户调用epoll_wait时给用户返回事件就绪的节点。
struct epitem{
struct rb_node rbn;//红黑树节点
struct list_head rdllink;//双向链表节点
struct epoll_filefd ffd; //事件句柄信息
struct eventpoll *ep; //指向其所属的eventpoll对象
struct epoll_event event; //期待发生的事件类型
}
注意:有种说法是:epoll中使用了内存映射机制
例子:
1.LT(水平触发)
2.ET(边缘触发)