APUE学习之I/O多路复用

发布时间:2024年01月23日

目录

一、基础概念

1、同步(Synchronous)和异步(Asynchronous)

2、阻塞(Blocking)和非阻塞(Non-blocking)

二、五种IO模型

1、同步阻塞IO(Blocking IO)

2、同步非阻塞IO(Non-blocking IO)

3、IO多路复用(IO Multiplexing)

4、信号驱动IO(signal driven IO)

5、异步IO(Asynchronous IO)

?三、select多路复用

1、select()函数

2、select工作流程

3、select编写服务器

4、select的优缺点

四、poll多路复用

1、poll()函数

2、poll编写服务器

3、poll的优缺点

五、epoll多路复用

1、认识epoll

2、创建epoll实例:epoll_create()

3、修改epoll的兴趣列表:epoll_ctl()

4、事件等待:epoll_wait()

5、epoll编写服务器

6、epoll的优点

7、epoll工作方式


一、基础概念

1、同步(Synchronous)和异步(Asynchronous)

同步:用户线程发起IO请求后需要等待或者轮询内核IO操作完成后才能继续执行;

异步:用户线程发起IO请求后仍继续执行,当内核IO操作完成后会通知用户线程,或者调用用户线程注册的回调函数。

同步和异步的概念描述的是用户线程与内核的交互方式

2、阻塞(Blocking)和非阻塞(Non-blocking)

阻塞:指IO操作在没有接收完数据或者没有得到结果之前不会返回,需要彻底完成后才返回用户空间;

非阻塞:指 I/O 操作被调用后立即返回给用户一个状态值,无需等到 I/O 操作彻底完成。

阻塞和非阻塞的概念描述的是用户线程调用内核 I/O 操作的方式。

二、五种IO模型

????????在Linux下进行网络编程时,服务器端编程经常需要构造高性能的IO模型,常见的IO模型有:同步阻塞IO、同步非阻塞IO、IO多路复用、信号驱动IO、异步IO,详解请看如下:

1、同步阻塞IO(Blocking IO)

????????最传统的IO模型,在Linux中默认情况下所有的socket都是阻塞模式。以read()为例,当用户调用read()这个函数,内核就开始了IO的第一个阶段:准备数据。对于网络IO来说,很多时候数据在一开始还没有到达,这个时候内核就要等待足够的数据到来。而在用户进程这边,整个进程会被阻塞。当内核一直等到数据准备好了,它会将数据从内核中拷贝到用户内存,然后内核返回结果,用户进程才解除阻塞的状态,重新运行起来。

2、同步非阻塞IO(Non-blocking IO)

????????默认创建的socket都是阻塞的,同步非阻塞IO是在同步阻塞IO的基础上,将socket设置为NONBLOCK,这个可以使用ioctl()系统调用设置。这样做用户线程可以在发起IO请求后可以立即返回,如果该次读操作未读取到任何数据,用户线程需要不断地发起IO请求,直到数据到达后,才能真正读取到数据,继续执行。整个IO请求的过程中,虽然用户线程每次发起IO请求后可以立即返回,但是为了等到数据,仍需要不断地轮询、重复请求,消耗了大量的CPU的资源。一般很少直接使用这种模型,而是在其他IO模型中使用非阻塞这一特性。

3、IO多路复用(IO Multiplexing)

????????IO多路复用模型是建立在内核提供的多路分离函数select基础之上的,使用select函数可以避免同步非阻塞IO模型中轮询等待的问题,此外poll、额poll都是这种模型。在该种模式下,用户首先将需要进行IO操作的socket添加到select中,然后阻塞等待select系统调用返回。当数据到达时,socket被激活,select函数返回。用户线程正式发起read请求,读取数据并继续执行。

????????从流程上看,使用select函数进行IO请求和同步阻塞模型没有太大的区别,甚至还多了添加监视socket,以及调用select函数的额外操作,效率更差。但是,使用select以后最大的优势是用户可以在一个线程内同时处理多个socket的IO请求。用户可以注册多个socket,然后不断地调用select读取被激活的socket,即可达到在同一个线程内同时处理多个IO请求的目的。而在同步阻塞模型中,必须通过多线程的方式才能达到这个目的。

????????那么多路复用 IO 为何比非阻塞 IO 模型的效率高呢?是因为在非阻塞 IO 中,不断地询问 socket 状态是通过用户线程去进行的,而在多路复用 IO 中,轮询每个 socket 状态是内核在进行的,这个效率要比用户线程要高的多。

4、信号驱动IO(signal driven IO)

????????在信号驱动IO模型中,当用户线程发起一个IO请求操作,会注册一个信号处理的回调函数,然后用户线程会继续执行,当内核数据就绪时,会发送一个SIGIO信号给用户线程,并回调我们注册的信号回调函数,用户线程接收到信号之后,便在信号函数中调用IO读写操作来进行实际的IO请求操作。?

5、异步IO(Asynchronous IO)

????????在异步IO模型中,当一个异步过程调用发出后,调用者不能立刻得到结果。实际处理这个调用的部件在完成后,通过状态,通知和回调通知调用者输入输出操作。IO操作都不会阻塞用户线程,都是由内核自动完成,然后发送一个信号告知用户线程操作已完成,用户线程中不需要再次调用IO函数进行具体的读写。这点是和信号驱动模型有所不同的,在信号驱动模型中,当用户线程接收到信号表示数据已经就绪,然后需要用户线程调用IO函数进行实际的读写操作;而在异步IO模型中,收到信号表示IO操作已经完成,不需要再在用户线程中调用IO函数进行实际的读写操作。

?三、select多路复用

1、select()函数

????????select()函数允许进程指示内核等待多个事件(文件描述符),并只有在一个或多个事件发生或经历一段时间后才唤醒它,然后接下来判断究竟是哪个文件描述符发生了事件并进行相应的处理。

函数原型如下:

#include? <sys/select.h>

#include? <sys/time.h>

int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);?

参数说明:

(1) 第一个参数nfds:指待测试的fd总个数,它的值是待测试的最大文件描述符加1。Linux内核从0开始到 nfds-1 扫描文件描述符,如果有数据出现事件(读、写、异常)将会返回;假设需要监测的文件描述符是6,7,8,那么Linux内核实际也要监测0-5,此时真正待测试的文件描述符是0-8总共9个,即max(6,7,8)+1,所以第一个参数是所有监听的文件描述符中最大的+1.

(2)中间三个参数readfds、writefds、exceptfds:指让内核测试读、写和异常条件的fd集合,如果不需要测试的可以设置为NULL。

(3)最后一个参数timeval:是设置select的超时时间,如果设置为NULL则永不过时

????????简单说明一下,select监视并等待多个文件描述符的属性发生变化,它监视的属性分3类,分别是readfds(文件描述符有数据到来可读)、writefds(文件描述符可写)和exceptfds(文件描述符异常)。调用后select函数会阻塞,直到有描述符就绪(有数据可读、可写、或者有错误异常),或者超时(timeout指定等待时间)发生函数才返回。select函数的返回值就是就绪描述符的数目,超时返回0,出错返回-1 。当select()函数返回后,可以通过遍历fdset,来找到究竟是哪些文件描述符就绪。

关于select用法,还需要知道俩条知识点,补充如下:

timeval结构体:

struct? ? ? ? timeval

{

? ? ? ? long? ? ? ? tv_sec;? ? ? ? //seconds

? ? ? ? long? ? ? ? tv_usec;? ? ? ? //microseconds

};

FD_ZERO(fd_set*? ? fds)? ? ? ? ? ? ? ? //清空集合

FD_SET(int? ? fd,fd_set*? ? fds)? ? ? ?//将给定的描述符加入到集合

FD_ISSET?(int? ? fd,fd_set*? ? fds)? ?//判断指定的描述符是否在集合中

FD_CLR(int? ? fd,fd_set*? ? fds)? ? ? ?//将给定的描述符从文件中删除

2、select工作流程

(1)先初始化服务器,完成套接字的创建、绑定和监听。
(2)定义一个fd_array数组用于保存监听套接字和已经与客户端建立连接的套接字,刚开始时就将监听套接字添加到fd_array数组当中。
(3)然后服务器开始循环调用select函数,检测读事件是否就绪,如果就绪则执行对应的操作。
(4)每次调用select函数之前,都需要定义一个读文件描述符集rdset,并将fd_array当中的文件描述符依次设置进rdset当中,表示让select帮我们监视这些文件描述符的读事件是否就绪。
(5)当select检测到数据就绪时会将读事件就绪的文件描述符设置进rdset当中,此时我们就能够得知哪些文件描述符的读事件就绪了,并对这些文件描述符进行对应的操作。
(6)如果读事件就绪的是监听套接字,则调用accept函数从底层全连接队列获取已经建立好的连接,并将该连接对应的套接字添加到fd_array数组当中。
(7)如果读事件就绪的是与客户端建立连接的套接字,则调用read函数读取客户端发来的数据并进行打印输出。

可以看一下流程图,更加清晰明了:

3、select编写服务器

#include <stdio.h>
#include <sys/select.h>
#include <libgen.h>
#include <unistd.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <string.h>
#include <errno.h>
#include <arpa/inet.h>

#define PORT                    8888
#define ARRAY_SIZE(x)           (sizeof(x)/sizeof(x[0]))

int socket_server_init(char *listen_ip,int listen_port);        /*定义一个函数,完成socket、bind、listen操作*/

int main(int argc,char *argv[])
{

        int                     listenfd,connfd;
        int                     rv;
        fd_set                  rdset;          /*定义一个让内核测试读文件的集合*/
        int                     found;
        int                     maxfd = 0;
        int                     i;
        char                    buf[1024];
        int                     fds_array[1024];        /*用于保存监听套接字和已经与客户端建立连接的套接字*/

        if( (listenfd = socket_server_init(NULL,PORT)) < 0)
        {
                
                printf("ERROR:listen on port 8888\n");
                return -1;
        
        }
        printf("Start to listen on port\n");

        for(i=0 ; i<ARRAY_SIZE(fds_array) ; i++)        /*将fds_array初始化,所有值设为-1*/
        {

                fds_array[i] = -1;

        }
        fds_array[0] = listenfd;        /*保存监听套接字*/

        while(1)
        {

                FD_ZERO(&rdset);        /*将rdset进行初始化*/
                for(i=0 ; i<ARRAY_SIZE(fds_array) ; i++)        /*通过轮询,确定需要监听的最大文件数*/
                {

                        if( fds_array[i] < 0 )
                                continue;

                        maxfd = fds_array[i]>maxfd ? fds_array[i] : maxfd;
                        FD_SET(fds_array[i],&rdset);

                }

                rv = select(maxfd+1,&rdset,NULL,NULL,NULL);     /*调用select多路复用,此处会阻塞*/

if(rv < 0)
                {

                        printf("select failure :%s\n",strerror(errno));
                        break;

                }
                else if(rv == 0)
                {

                        printf("select get timeout\n");
                        continue;

                }

                /*new client start connect now*/
                if( FD_ISSET(listenfd,&rdset) )
                {

                        if( (connfd = accept(listenfd,(struct sockaddr *)NULL,NULL)) < 0)
                        {

                                printf("accept new client failure :%s\n",strerror(errno));
                                continue;

                        }

                        found = 0;              /*found用于判断fds_array是否已满*/
 for(i=0 ; i<ARRAY_SIZE(fds_array) ; i++)
                        {

                                if( fds_array[i] < 0 )
                                {

                                        printf("accept new client[%d] and add it into array\n",connfd);
                                        fds_array[i] = connfd;
                                        found = 1;
                                        break;
                                }
                        }

                        if( !found )
                        {

                                printf("accept new client [%d] but full,so refuse it\n",connfd);
                                close(connfd);
                        }
                }

                /*已连上的客户端有数据到来*/
                else
                {

                        for(i=0 ; i<ARRAY_SIZE(fds_array) ; i++)
                        {

                                if( fds_array[i]<0 || !FD_ISSET(fds_array[i],&rdset) )
                                        continue;

                                memset(buf,0,sizeof(buf));
                                if( (rv = read(fds_array[i],buf,sizeof(buf))) <= 0 )
                                {

                                        printf("socket[%d] read failure or get disconnect\n",fds_array[i]);
                                        close(fds_array[i]);
                                        fds_array[i] = -1;
                                        return -1;
                                }

 else
                                {

                                        printf("socket[%d] read get %d bytes data:%s\n",fds_array[i],rv,buf);

                                        if( write(fds_array[i],buf,rv) < 0)
                                        {

                                                printf("socket[%d] write failure:%s\n",fds_array[i],strerror(errno));
                                                close(fds_array[i]);
                                                fds_array[i] = -1;

                                        }
                                }
                        }
                }
        }

        close(listenfd);
        return 0;

}

int     socket_server_init(char *listen_ip,int listen_port)
{

        struct sockaddr_in              servaddr;
        int                             rv = 0;
        int                             listenfd;

        if( (listenfd = socket(AF_INET,SOCK_STREAM,0)) < 0)
        {

                printf("Use socket() to create a TCP socket failure:%s\n",strerror(errno));
                return -1;

        }

        memset(&servaddr,0,sizeof(servaddr));
        servaddr.sin_family = AF_INET;
        servaddr.sin_port   = htons(listen_port);

        if( !listen_ip )
        {

                servaddr.sin_addr.s_addr = htonl(INADDR_ANY);

        }
        else
        {

                if(inet_pton(AF_INET,listen_ip,&servaddr.sin_addr) <= 0)
                {

                        printf("inet_pton() set listen ip address failure.\n");
                        rv = -2;
                        goto cleanup;
                }
        }

        if(bind(listenfd,(struct sockaddr *)&servaddr,sizeof(servaddr)) < 0)
        {

                printf("Use bind() to bind the TCP socket failure:%s\n",strerror(errno));
                rv = -3;
                goto cleanup;
        }

        if(listen(listenfd,13) < 0)
        {

                printf("Use bind() to bind the TcP socket failure: %s\n",strerror(errno));
                rv = -4;
                goto cleanup;

        }
cleanup:
        if(rv < 0)
                close(listenfd);
        else
                rv = listenfd;

        return rv;

}

4、select的优缺点

优点:

(1)基于select的I/O复用模型的是单进程执行可以为多个客户端服务,这样可以减少创建线程或进程所需要的CPU时间片或内存资源的开销;

(2)几乎所有的平台都支持select(),有良好的跨平台支持。

缺点:

(1)每次调用select()都需要把fd集合从用户拷贝到内核态,之后内核需要遍历所有传递进来的fd,这时如果客户端fd很多时会导致系统开销很大;

(2)单个进程能够监视的文件描述符的数量存在最大限制,在Linux上一般为1024,可以通过setrlimit()、修改宏定义甚至重新编译内核等方式来提升这一限制,但是这样也会造成效率的降低。

(3)select的触发方式是水平触发,应用程序如果没有完成对一个已经就绪的文件描述符进行IO操作,那么之后每次select调用还是会将这些文件描述符通知进程。

四、poll多路复用

1、poll()函数

????????poll()的机制与select()类似,管理多个描述符也是进行轮询,根据描述符的状态进行处理,但是poll()没有最大文件描述符数量的限制(但是数量过大后性能也是会下降)。poll()与select()一样,包含大量文件描述符的数组被整体复制于用户态和内核的地址空间之间,而不论这些文件描述符是否就绪,它的开销随着文件描述符数量的增加而线性增大。

poll()函数原型如下:

#include? ? <poll.h>

int poll(struct pollfd *fds, nfds_t nfds, int timeout);
?

struct? ? pollfd

{

? ? ? ? int? ? ? ? fd;? ? ? ? ? ? ? ? /*文件描述符*/

? ? ? ? short? ? ? ? events;? ? /*等待的事件*/

? ? ? ? short? ? ? ? revents;? ?/*实际发生了的事件*/

};

参数说明:

(1)第一个参数fds:用来指向一个struct pollfd类型的数组,每一个pollfd结构体指定了一个被监视的文件描述符,指示poll()监视多个文件描述符。每个结构体的events域是监视该文件描述符的事件掩码,由用户来设置这个域,revents域是文件描述符的操作结果事件掩码,内核在调用返回时设置这个域,events域中请求的任何事件都可能在revents域中返回。下表列出指定events标志以及测试revents标志的一些常值:

常量说明是否能作为events的输入是否能作为revents的返回结果
POLLIN普通或者优先级带数据可读
POLLRDNORM普通数据可读
POLLRDBAND优先级带数据可读
POLLPRI高优先级数据可读
POLLOUT普通数据可写
POLLWRNORM普通数据可写
POLLWRBAND优先级带数据可写
POLLERR发生错误
POLLHUP发生挂起
POLLNVAL描述字不是一个打开的文件

要同时监视一个文件描述符是否可读和可写,我们可以设置events为POLLIN|POLLOUT。在poll返回时,我们可以检查revents中的标志,对应于文件描述符请求的events结构体。如果POLLIN事件被设置,则文件描述符可以被读取而不阻塞。如果POLLOUT被设置,则文件描述符可以写入而不导致阻塞。这些标志并不是互斥的:它们可能被同时设置,表示这个文件描述符的读取和写入操作都会正常返回而不阻塞。

(2)第二个参数nfds:指定数组中监听的个数;

(3)第三个参数timeout:指定等待的毫秒数,无论I/O是否准备好,poll都会返回。timeout指定为负数值表示无限超时,使poll()一直挂起直到一个指定事件发生;timeout为0指示poll调用立即返回并列出准备好I/O的文件描述符,但并不等待其它的事件。

补充:

该函数成功调用时,poll()返回结构体中revents域中不为0的文件描述个数;如果在超时前没有任何事件发生,poll()返回0;失败时,poll()返回-1,并设置errno为下列值之一:

EBADF一个或多个结构体中指定的文件描述符无效
EFAULTfds指针指向的地址超出进程的地址空间
EINTR请求的事件之前产生一个信号,调用可以重新发起
EINVAlnfds参数超出PLIMIT_NOFILE值
ENOMEM可用内存不足,无法完成请求

2、poll编写服务器

poll与select的工作流程基本类似,这里就直接用poll开始编写服务器了:

#include <stdio.h>
#include <sys/select.h>
#include <libgen.h>
#include <unistd.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <string.h>
#include <errno.h>
#include <arpa/inet.h>
#include <poll.h>

#define ARRAY_SIZE(x)           (sizeof(x)/sizeof(x[0]))
#define PORT                    8888

int socket_server_init(char *listen_ip,int listen_port);        /*定义一个函数,完成socket、bind、listen操作*/

int main(int argc,char *argv[])
{

        int                     listenfd,connfd;
        int                     rv;
        fd_set                  rdset;          /*定义一个让内核测试读文件的集合*/
        int                     found;
        int                     max = 0;
        int                     i;
        char                    buf[1024];
        struct pollfd           fds_array[1024];        /*用于保存监听套接字和已经与客户端建立连接的套接字*/

        if( (listenfd = socket_server_init(NULL,PORT)) < 0)
        {

                printf("ERROR:listen on port \n");
                return -1;

        }
        printf("Start to listen on port\n");

        for(i=0 ; i<ARRAY_SIZE(fds_array) ; i++)        /*将fds_array初始化,所有值设为-1*/
        {

                fds_array[i].fd = -1;

        }
        fds_array[0].fd     = listenfd;         /*保存监听套接字*/
        fds_array[0].events = POLLIN;           /*监听该文件描述符可读*/

        while(1)
        {

                rv = poll(fds_array,max+1,-1);          /*调用poll多路复用,轮询fds_array,有新客户到来则修改max*/
                if(rv < 0)
                {

                        printf("select failure:%s\n",strerror(errno));
                        break;

                }
                else if(rv == 0)
                {

                        printf("select get timrout\n");
                        continue;

                }

                /*listen socket get event means new client start connect now*/
                if(fds_array[0].revents & POLLIN)
                {

                        if((connfd = accept(listenfd,(struct sockaddr*)NULL,NULL)) < 0)
                        {

                                printf("accept new client failure:%s\n",strerror(errno));
                                continue;

                        }

                        found = 0;              /*found用于判断fds_array是否已满*/
                        for(i=1 ; i<ARRAY_SIZE(fds_array) ; i++)
                        {

                                if(fds_array[i].fd < 0)
                                {

                                        printf("accept new client[%d] and add it into array\n",connfd);
                                        fds_array[i].fd     = connfd;
                                        fds_array[i].events = POLLIN;
                                        found = -1;
                                        break;
                                }
                        }

                        if( !found )
                        {

                                printf("accept new client[%d] but full,so refuse it\n",connfd);
                                close(connfd);
                                continue;
                        }

                        max = i>max ? i : max;

                }

                /*data arrive from already connected client*/
                else
                {

                        for(i=1 ; i<ARRAY_SIZE(fds_array) ; i++)
                        {

                                if(fds_array[i].fd < 0)
                                        continue;

                                memset(buf,0,sizeof(buf));
                                rv = read(fds_array[i].fd,buf,sizeof(buf));
                                if(rv <= 0)
                                {

                                        printf("socket[%d] read failure or get disconnect\n",fds_array[i].fd);
                                        close(fds_array[i].fd);
                                        fds_array[i].fd = -1;

                                }
 else
                                {

                                        printf("socket[%d] read get %d bytes data:%s\n",fds_array[i].fd,rv,buf);

                                        if(write(fds_array[i].fd,buf,rv) < 0)
                                        {

                                                printf("socket[%d] write failure:%s\n",fds_array[i].fd,strerror(errno));
                                                close(fds_array[i].fd);
                                                fds_array[i].fd = -1;
                                        }
                                }
                        }
                }
        }

        close(listenfd);
        return 0;
}
int     socket_server_init(char *listen_ip,int listen_port)
{

        struct sockaddr_in              servaddr;
        int                             rv = 0;
        int                             listenfd;

        if( (listenfd = socket(AF_INET,SOCK_STREAM,0)) < 0)
        {

                printf("Use socket() to create a TCP socket failure:%s\n",strerror(errno));
                return -1;

        }

        memset(&servaddr,0,sizeof(servaddr));
        servaddr.sin_family = AF_INET;
        servaddr.sin_port   = htons(listen_port);

        if( !listen_ip )
        {

                servaddr.sin_addr.s_addr = htonl(INADDR_ANY);

        }
        else
        {

                if(inet_pton(AF_INET,listen_ip,&servaddr.sin_addr) <= 0)
                {

                        printf("inet_pton() set listen ip address failure.\n");
                        rv = -2;
                        goto cleanup;

                }

        }

        if(bind(listenfd,(struct sockaddr *)&servaddr,sizeof(servaddr)) < 0)
        {

                printf("Use bind() to bind the TCP socket failure:%s\n",strerror(errno));
                rv = -3;
                goto cleanup;

        }

        if(listen(listenfd,13) < 0)
        {

                printf("Use bind() to bind the TcP socket failure: %s\n",strerror(errno));
                rv = -4;
                goto cleanup;

        }

cleanup:
        if(rv < 0)
                close(listenfd);
        else
                rv = listenfd;

        return rv;

}

3、poll的优缺点

优点:

(1)struct pollfd结构当中包含了events和revents,相当于将select的输入输出型参数进行分离,因此在每次调用poll之前,不需要像select一样重新对参数进行设置。

(2)poll可监控的文件描述符数量没有限制。

(3)poll可以同时等待多个文件描述符,能够提高IO的效率

缺点:

(1)和select函数一样,当poll返回后,需要遍历fds数组来获取就绪的文件描述符。

(2)每次调用poll,都需要把大量的struct pollfd结构从用户态拷贝到内核态,这个开销也会随着poll监视的文件描述符数目的增多而增大。

(3)每次调用poll都需要在内核遍历传递进来的所有fd,这个开销在fd很多时也很大。

五、epoll多路复用

1、认识epoll

????????再讲epoll函数前,先和大家聊一聊epoll是什么?在Linux还没有实现epoll之前,我们一般都是使用select和poll等多路复用的方法来实现并发服务程序。自Linux内核正式引入epoll以来,epoll已经成为了目前实现高性能网络服务器的必备技术。

? ? ? ? epoll是Linux内核为处理大批量文件描述符而做了改进的poll,是Linux下多路复用IO接口select/poll的增强版本,它能显著提高程序在大量并发连接中只有少量活跃的情况下的系统CPU利用率。另一点原因就是获取事件的时候,它无需遍历整个被侦听的描述符集,只要遍历那些内核IO事件异步唤醒而加入到就绪队列的描述符集合就行了。epoll除了提供select/poll那种IO事件的水平触发(Level Triggered)外,还提供了边缘触发(Edge Triggered),这就使得用户空间程序有可能缓存IO状态,减少epoll_wait/epoll_pwait的调用,提高了应用程序的效率。

? ? ? ? 设想一下如下场景:有100万个客户端同时与一个服务器进程保持着TCP连接。而每一时刻,通常只有几百上千个TCP连接是活跃的(事实上大部分场景都是这种情况)。如何实现这样的高并发?在select/poll时代,服务器进程每次都把这100万个连接告诉操作系统(从用户态复制句柄数据结构到内核态),让操作系统内核去查询这些套接字上是否有事件发生,轮询完后,再将句柄数据复制到用户态,让服务器应用程序轮询处理已发生的网络事件,这一过程资源消耗较大,因此,select/poll一般只能处理几千的并发连接。

????????epoll的设计和实现与select完全不同。epoll通过在Linux内核中申请一个简易的文件系统,把原先的select/poll调用分成了3个部分:

(1)调用epoll_create()建立一个epoll对象(在epoll文件系统中为这个句柄对象分配资源)

(2)调用epoll_ctl()向epoll对象中添加了这100万个连接的套接字

(3)调用epoll_wait()收集发生的事件的连接

如此一来,要实现上面所说的场景,只需要在进程启动时建立一个epoll对象,然后在需要的时候向这个epoll对象中添加或者删除连接。同时,epoll_wait的效率也非常高,因为调用epoll_wait时,并没有一股脑的向操作系统复制这100万个连接的句柄数据,内核也不需要去遍历全部的连接。

接下来,为大家详细讲解几个函数的详细用法:

2、创建epoll实例:epoll_create()

函数原型:

#include? ? <sys/epoll.h>

int epoll_create(int size);

参数说明:

参数size:自从Linux2.6.8之后,size参数是被忽略的,但size的值必须设置为大于0的值。

注意:作为函数返回值,epoll_create()返回代表了新创建的epoll实例的文件描述符。这个文件描述符在其他几个epoll系统调用中用来表示epoll实例。当这个文件描述符不再需要时,应该通过close()来关闭。当所有与epoll实例相关的文件描述符都被关闭时,实例被销毁,相关的资源都返还给系统。

3、修改epoll的兴趣列表:epoll_ctl()

函数原型如下:

#include? ? <sys/epoll.h>

int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

参数说明:

(1)第一个参数epfd:epoll_create()的返回值。

(2)第二个参数op:指定需要执行的操作,它可以是如下几种值:

EPOLL_CTL_ADD将描述符fd添加到epoll实例中的兴趣列表中去。对于fd上我们感兴趣的事件,都指定在ev所指向的结构体中。如果我们试图向兴趣列表中添加一个已经存在的文件描述符,epoll_ctl()将出现EEXIST错误;
EPOLL_CTL_MOD修改描述符上设定的事件,需要用到由ev所指向的结构体中的信息。如果我们试图修改不在兴趣列表中的文件描述符,epoll_ctl()将出现ENOENT错误;
EPOLL_CTL_DEL将文件描述符fd从epfd的兴趣列表中移除,该操作参数忽略参数ev。如果我们试图移除一个不在epfd的兴趣列表中的文件描述符,epoll_ctl()将出现ENOENT错误。关闭一个文件描述符将会自动将其从所有的epoll实例的兴趣列表移除;

(3)第三个参数fd:指明了要修改兴趣列表中的哪一个文件描述符的设定。该参数可以是代表管道、FIFO、套接字、POSIX消息队列、inotify实例、终端、设备,甚至是另一个epoll实例的文件描述符。但是,这里fd不能作为普通文件或目录的文件描述符。

(4)第四个参数ev:是一个指向结构体epoll_event的指针,指需要监视该文件描述符上的哪些事件。结构体的定义如下:

struct? ? ? ? epoll_event

{

? ? ? ? uint32_t? ? ? ? events;? ? ? ? ? ? ? ? /*epoll events*/

? ? ? ? epoll_data_t? ? ? ? data;? ? ? ? ? ? ?/*User? data*/

};

typedef? ? union? ? ? ? epoll_data

{

? ? ? ? void? ? ? ? *ptr;

? ? ? ? int? ? ? ? fd;

? ? ? ? uint32_t? ? ? ? u32;

? ? ? ? uint64_t? ? ? ? u64;

}epoll_data_t;

?struct epoll_event结构中有两个成员,第一个成员events表示的是需要监视的事件,第二个成员data是一个联合体结构,一般选择使用该结构当中的fd,表示需要监听的文件描述符。

4、事件等待:epoll_wait()

函数原型如下:

#include? ? <sys/epoll.h>

int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);

参数说明:

(1) 第一个参数epfd:epoll_create()的返回值;

(2)第二个参数events:内核会将已经就绪的事件拷贝到events数组当中(events不能是空指针,内核只负责将就绪事件拷贝到该数组中,不会帮我们在用户态中分配内存,空间由调用者负责申请);

(3)第三个参数maxevents:指定events数组里包含的元素个数;

(4)第四个参数timeout:用来确定epoll_wait()的阻塞行为,有如下几种:

timeout = -1调用将一直阻塞,直到兴趣列表中的文件描述符上有事件产生或者直到捕获到一个信号为止
timeout = 0执行一次非阻塞式地检查,看兴趣列表中的描述符上产生了哪个事件
timeout > 0调用将阻塞至多timeout毫秒,直到文件描述符上有事件发生,或者直到捕获到一个信号为止

说明:数组events中,每个元素返回的都是单个就绪态文件描述符的信息。events字段返回了在该描述符上已经发生的事件掩码。data字段返回的是我们在描述符上使用epoll_ctl()注册感兴趣的事件时在ev.data上所指定的值。注意,data字段是唯一可获知同这个事件相关的文件描述符的途径。因此,当我们调用epoll_ctl()将文件描述符添加到感兴趣列表中时,应该要么将ev.data.fd设为文件描述符号,要么将ev.data.ptr设为指向包含文件描述符号的结构体。

当我们调用epoll_ctl()时可以在ev.events中指定的位掩码以及由epoll_wait()返回的evlist[].events中的值如下所示:

常量说明作为epoll_ctl()的输入作为epoll_wait()的返回
EPOLLIN可读取非高优先级数据
EPOLLPRI可读取高优先级数据
EPOLLRDHUPsocket对端关闭
EPOLLOUT普通数据可写
EPOLLET采用边沿触发事件通知
EPOLLONESHOT在完成事件通知之后禁用检查
EPOLLERR有错误发生
POLLHUP出现挂断

默认情况下,一旦通过epoll_ctl()的EPOLL_CTL_ADD操作将文件描述符添加到epoll实例的兴趣列表中后,它会保持激活状态(即,之后对epoll_wait()的调用会在描述符处于就绪态时通知我们)直到我们显示地通过epoll_ctl()的EPOLL_CTL_DEL操作将其从列表中删除。如果我们希望在某个特定的文件描述符上只得到一次通知,那么可以在传给epoll_ctl()的ev.events中指定EPOLLONESHOT标志。如果指定了这个标志,那么在下一个epoll_wait()调用通知我们对应的文件描述符处于就绪态之后,这个描述符就会在兴趣列表中被标记为非激活态,之后的epoll_wait()调用都不会再通知我们有关这个描述符的状态了。如果需要,我们可以稍后用过调用epoll_ctl()的EPOLL_CTL_MOD操作重新激活对这个文件描述符的检查。

5、epoll编写服务器

epoll的工作原理与红黑树有关,关于红黑树的知识,我会在接下来的文章进行讲解,接下来的代码主要还是为了会用epoll函数和epoll的工作流程,代码如下:

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <sys/epoll.h>
#include <sys/resource.h>
#include <libgen.h>

#define ARRAY_SIZE(x)           (sizeof(x)/sizeof(x[0]))
#define MAX_EVENTS              512
#define PORT                    8888

int socket_server_init(char *listen_ip,int listen_port);
void set_socket_rlimit(void);

int main(int argc,char *argv[])
{

        int                     listenfd,connfd;
        int                     rv;
        int                     i;
        int                     found;
        char                    buf[1024];

        int                     epollfd;
        struct epoll_event      event;
        struct epoll_event      event_array[MAX_EVENTS];
        int                     events;

        set_socket_rlimit();    /*进程间资源限制*/

        if((listenfd = socket_server_init(NULL,PORT)) < 0)
        {

                printf("ERROR:listen on port\n");
                return -2;

        }
        printf("server start to listen on port\n");

        if((epollfd=epoll_create(MAX_EVENTS)) < 0)      /*创建epoll实例*/
        {

                printf("epoll_create() failure:%s\n",strerror(errno));
                return -3;

        }

        event.events = EPOLLIN;
        event.data.fd = listenfd;

        if(epoll_ctl(epollfd,EPOLL_CTL_ADD,listenfd,&event) < 0)        /*将监听套接字加入到epoll实例中*/
        {

                printf("epoll add listen socket failure:%s\n",strerror(errno));
                return -4;

        }

        while(1)
        {

                events = epoll_wait(epollfd,event_array,MAX_EVENTS,-1);/*调用epoll_wait()将就绪态文件描述符加入到eve
nt-array数组中*/
                if(events < 0)
                {

                        printf("epoll failure:%s\n",strerror(errno));
                        break;

                }

                else if(events == 0)
                {

                        printf("epoll get timeout\n");
                        continue;

                }

                /*rv>0 is the active events count*/
                for(i=0 ; i<events ; i++)
                {

                        if((event_array[i].events&EPOLLERR) || (event_array[i].events&EPOLLHUP))
                        {

                                printf("epoll_wait get error on fd[%d]:%s\n",event_array[i].data.fd,strerror(errno))
;
                                epoll_ctl(epollfd,EPOLL_CTL_DEL,event_array[i].data.fd,NULL);
                                close(event_array[i].data.fd);

                        }

                        /*listen socket get event means new client start connect now*/
                        if(event_array[i].data.fd == listenfd)
                        {

                                if((connfd = accept(listenfd,(struct sockaddr *)NULL,NULL)) < 0)
                                {

                                        printf("accept new client failure:%s\n",strerror(errno));
                                        continue;

                                }

                                event.data.fd = connfd;
                                event.events  = EPOLLIN;
                               if(epoll_ctl(epollfd,EPOLL_CTL_ADD,connfd,&event) < 0)
                                {

                                        printf("epoll and client socket failure:%s\n",strerror(errno));
                                        close(event_array[i].data.fd);
                                        continue;

                                }
                                printf("epoll and new client socket[%d] ok\n",connfd);
                        }


                        /*already connected client socket get data incoming*/
                        else
                        {

                                memset(buf,0,sizeof(buf));
                                if((rv = read(event_array[i].data.fd,buf,sizeof(buf))) <= 0)
                                {

                                        printf("socket[%d] read failure or get disconnected and will be removed\n",e
vent_array[i].data.fd);
                                        epoll_ctl(epollfd,EPOLL_CTL_DEL,event_array[i].data.fd,NULL);
                                        close(event_array[i].data.fd);
                                        continue;

                                }
 else
                                {

                                        printf("socket[%d] read get %d bytes data:%s\n",event_array[i].data.fd,rv,bu
f);

                                        if(write(event_array[i].data.fd,buf,rv) < 0)
                                        {

                                                printf("socket[%d] write failure :%s\n",event_array[i].data.fd,strer
ror(errno));
                                                epoll_ctl(epollfd,EPOLL_CTL_DEL,event_array[i].data.fd,NULL);

                                                close(event_array[i].data.fd);

                                        }
                                }
                        }
                }
        }
        close(listenfd);
        return 0;
}
int socket_server_init(char *listen_ip,int listen_port)
{

        struct sockaddr_in              servaddr;
        int                             rv = 0;
        int                             listenfd;

        if( (listenfd = socket(AF_INET,SOCK_STREAM,0)) < 0 )
        {

                printf("socket fd create failure:%s\n",strerror(errno));
                return -1;

        }

        memset(&servaddr,0,sizeof(servaddr));
        servaddr.sin_family = AF_INET;
        servaddr.sin_port = htons(listen_port);

        if( !listen_ip )
        {

                servaddr.sin_addr.s_addr = htonl(INADDR_ANY);

        }
        else
        {

                if(inet_pton(AF_INET,listen_ip,&servaddr.sin_addr) <= 0)
                {

                        printf("inet_pton() set listen ip address failure\n");
                        rv = -1;
                        goto cleanup;

                }

        }

        if(bind(listenfd,(struct sockaddr *)&servaddr,sizeof(servaddr)) < 0)
        {

                printf("bind the socket failure:%s\n",strerror(errno));
                rv = -3;
                goto cleanup;

        }

        if(listen(listenfd,64) <0)
        {

                printf("listen to the socket failure:%s\n",strerror(errno));
                rv =-4;
                goto cleanup;

        }

cleanup:
        if(rv < 0)
                close(listenfd);
        else
                rv = listenfd;

        return rv;

}

void set_socket_rlimit(void)
{

        struct rlimit limit = {0};

        getrlimit(RLIMIT_NOFILE,&limit);
        limit.rlim_cur = limit.rlim_max;
        setrlimit(RLIMIT_NOFILE,&limit);

        printf("set socket open fd max count to %d\n",limit.rlim_max);

}

对于代码中set_socket_rlimit(有关进程间资源限制)不理解的同学可以看《APUE学习之进程资源限制》这篇文章。

6、epoll的优点

(1)接口使用方便:虽然拆分成了三个函数,但是反而使用起来更方便高效。
(2)数据拷贝轻量:只在新增监视事件的时候调用epoll_ctl将数据从用户拷贝到内核,而select和poll每次都需要重新将需要监视的事件从用户拷贝到内核。此外,调用epoll_wait获取就绪事件时,只会拷贝就绪的事件,不会进行不必要的拷贝操作。
(3)事件回调机制:避免操作系统主动轮询检测事件就绪,而是采用回调函数的方式,将就绪的文件描述符结构加入到就绪队列中。调用epoll_wait时直接访问就绪队列就知道哪些文件描述符已经就绪。
(4)没有数量限制:监视的文件描述符数目无上限,只要内存允许,就可以一直向红黑树当中新增节点。

注意:

????????有人说epoll中使用了内存映射机制,内核可以直接将底层就绪队列通过mmap的方式映射到用户态,此时用户就可以直接读取到内核中就绪队列当中的数据,避免了内存拷贝的额外性能开销。
这种说法是错误的,实际操作系统并没有做任何映射机制,因为操作系统是不相信任何人的,操作系统不会让用户进程直接访问到内核的数据的,用户只能通过系统调用来获取内核的数据。
因此用户要获取内核当中的数据,势必还是需要将内核的数据拷贝到用户空间。

7、epoll工作方式

上文已经提到过,epoll不同于selec/poll,epoll有两种工作方式,分别是水平触发和边缘触发。接下来为大家分别讲解:

水平触发(LT,Level Triggered)

?

(1)epoll默认状态下就是LT工作模式。

(2)由于在LT工作模式下,只要底层有事件就绪就会一直通知用户,因此当epoll检测到底层读事件就绪时,可以不立即进行处理,或者只处理一部分,因为只要底层数据没有处理完,下一次epoll还会通知用户事件就绪。

(3)select和poll其实就是工作是LT模式下的。

(4)支持阻塞读写和非阻塞读写。

边缘触发(ET,Edge Triggered)

(1)如果要将epoll改为ET工作模式,则需要在添加事件时设置EPOLLET选项。

(2)由于在ET工作模式下,只有底层就绪事件无到有或由有到多发生变化的时候才会通知用户,因此当epoll检测到底层读事件就绪时,必须立即进行处理,而且必须全部处理完毕,因为有可能此后底层再也没有事件就绪,那么epoll就再也不会通知用户进行事件处理,此时没有处理完的数据就相当于丢失了。

(3)ET工作模式下epoll通知用户的次数一般比LT少,因此ET的性能一般比LT性能更高,Nginx就是默认采用ET模式使用epoll的。

(4)只支持非阻塞的读写。

对比LT和ET?

(1)在ET模式下,一个文件描述符就绪之后,用户不会反复收到通知,看起来比LT更高效,但如果在LT模式下能够做到每次都将就绪的文件描述符立即全部处理,不让操作系统反复通知用户的话,其实LT和ET的性能也是一样的。

(2)此外,ET的编程难度较高。

文章来源:https://blog.csdn.net/m0_65292176/article/details/135695830
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。