浅析Redis②:命令处理之epoll实现(中)

发布时间:2024年01月24日

写在前面

Redis作为我们日常工作中最常使用的缓存数据库,其重要性不言而喻,作为普通开发者,我们在日常开发中使用Redis,主要聚焦于Redis的基层数据结构的命令使用,很少会有人对Redis的内部实现机制进行了解,对于我而言,也是如此,但一直以来,我对于Redis的内部实现都很好奇,它为什么会如此高效,本系列文章是旨在对Redis源代码分析拆解,通过阅读Redis源代码,了解Redis基础数据结构的实现机制。

关于Redis的源码分析,已经有非常多的大佬写过相关的内容,最为著名的是《Redis设计与实现》,对于Redis源码的分析已经非常出色,本系列文章对于源码拆解时,并不会那么详细,相信大部分读者应该不是从事Redis的二次开发工作,对于源码细节过于深入,会陷入细节的泥潭,这是我在阅读源码时尽量避免的,我尽量做到对大体的脉络进行梳理,讲清楚主干逻辑,细节部分,如果读者有兴趣,可以自行参阅源码或相关资料。

本系列源代码,基于Redis 3.2.6

前言

在上一篇中浅析Redis①:命令处理核心源码分析(上),我们大致了解了Redis客户端命令请求的处理流程,在整个流程中,我们还有两个问题没有解释:

1、非阻塞的核心epoll是如何实现的?

2、Redis是如何将数据写回Client端的?

本篇我们就围绕第一个问题,寻找答案,继续看Redis客户端命令请求的处理流程。

Redis的epoll实现

Redis的非阻塞I/O是指Redis在处理客户端请求时,不会一直等待I/O操作完成,而是会尽快返回,并在I/O操作完成后通知Redis进行后续处理。

epoll作为非阻塞I/O的实现,是Linux内核提供的一种多路I/O复用机制。epoll可以监视多个文件描述符,一旦某个文件描述符就绪,epoll就会通知Redis进行后续处理。

Redis的非阻塞I/O模型可以提高并发处理能力,在阻塞I/O模型中,Redis在处理一个客户端请求时,如果遇到I/O操作,会一直等待I/O操作完成,这意味着Redis无法处理其他客户端的请求。

而在非阻塞I/O模型中,Redis在遇到I/O操作时,会尽快返回,并在I/O操作完成后通知Redis进行后续处理。这样,Redis就可以同时处理多个客户端的请求,提高了并发处理能力。

同时非阻塞I/O模型还可以减少Redis的CPU占用率。在阻塞I/O模型中,Redis在遇到I/O操作时,会一直等待I/O操作完成,这意味着Redis的CPU会一直处于占用状态。

在非阻塞I/O模型中,Redis在遇到I/O操作时,会尽快返回,并在I/O操作完成后通知Redis进行后续处理。这样CPU就不会一直处于占用状态,可以减少CPU占用率,提升CPU使用效率。

核心实现

Redis非阻塞IO的实现是基于OS的内核函数支持,源码逻辑如下:

redis.c中main方法启动,执行initServer()初始化redis配置,同时创建非阻塞事件监听器:

server.el = aeCreateEventLoop(server.maxclients+REDIS_EVENTLOOP_FDSET_INCR);


for (j = 0; j < server.ipfd_count; j++) {
    if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
                          acceptTcpHandler,NULL) == AE_ERR)
    {
        redisPanic(
            "Unrecoverable error creating server.ipfd file event.");
    }
}

其中,ipfd_count默认参数为1024,该参数表示Redis可以同时处理的最大TCP连接数。

aeCreateEventLoop与aeCreateFileEvent的实现逻辑在ae.c文件中:

aeEventLoop *aeCreateEventLoop(int setsize) {
    aeEventLoop *eventLoop;
    int i;

    if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;
    eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
    eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
    if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err;
    eventLoop->setsize = setsize;
    eventLoop->lastTime = time(NULL);
    eventLoop->timeEventHead = NULL;
    eventLoop->timeEventNextId = 0;
    eventLoop->stop = 0;
    eventLoop->maxfd = -1;
    eventLoop->beforesleep = NULL;
    if (aeApiCreate(eventLoop) == -1) goto err;
    /* Events with mask == AE_NONE are not set. So let's initialize the
     * vector with it. */
    for (i = 0; i < setsize; i++)
        eventLoop->events[i].mask = AE_NONE;
    return eventLoop;

    err:
    if (eventLoop) {
        zfree(eventLoop->events);
        zfree(eventLoop->fired);
        zfree(eventLoop);
    }
    return NULL;
}

int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
                      aeFileProc *proc, void *clientData)
{
    if (fd >= eventLoop->setsize) {
        errno = ERANGE;
        return AE_ERR;
    }
    aeFileEvent *fe = &eventLoop->events[fd];

    if (aeApiAddEvent(eventLoop, fd, mask) == -1)
        return AE_ERR;
    fe->mask |= mask;
    if (mask & AE_READABLE) fe->rfileProc = proc;
    if (mask & AE_WRITABLE) fe->wfileProc = proc;
    fe->clientData = clientData;
    if (fd > eventLoop->maxfd)
        eventLoop->maxfd = fd;
    return AE_OK;
}

其中aeApiCreate()是核心创建逻辑,aeApiCreate()方法采用了类似Java中多态的实现方式,由于C本身并不支持多态,因此需要使用C中的技巧实现:

/* Include the best multiplexing layer supported by this system.
 * The following should be ordered by performances, descending. */
#ifdef HAVE_EVPORT
#include "ae_evport.c"
#else
    #ifdef HAVE_EPOLL
    #include "ae_epoll.c"
    #else
        #ifdef HAVE_KQUEUE
        #include "ae_kqueue.c"
        #else
        #include "ae_select.c"
        #endif
    #endif
#endif

这段代码是Redis中的一个条件编译语句,用于根据不同的操作系统和编译器选择不同的事件驱动库。

事件驱动库是Redis的一个核心组件,用于处理各种事件,包括网络IO事件、定时器事件等。Redis支持多种事件驱动库,比如epoll、kqueue、select等。在编译Redis时,需要根据操作系统和编译器选择合适的事件驱动库进行编译。

这段代码中,首先判断是否定义了HAVE_EVPORT宏。如果定义了该宏,则使用ae_evport.c文件中的事件驱动库,否则继续判断是否定义了HAVE_EPOLL宏。如果定义了该宏,则使用ae_epoll.c文件中的事件驱动库,否则继续判断是否定义了HAVE_KQUEUE宏。如果定义了该宏,则使用ae_kqueue.c文件中的事件驱动库,否则使用ae_select.c文件中的事件驱动库。

这种条件编译技术可以使Redis在不同操作系统和编译器下具有更好的兼容性和可移植性,使得Redis可以在不同的平台上运行,并且可以充分发挥不同平台的优势。

简言之,就是根据不同的操作系统,决定选择不同的内核IO模型,优先级: evport > epoll > kqueue > select

关于系统内核实现,参考:

#ifdef HAVE_EVPORT: 如果定义了宏 HAVE_EVPORT,则包含文件 ae_evport.c。ae_evport.c 可能包含了 Solaris 10 系统使用的事件驱动库。

#else: 如果没有定义宏 HAVE_EVPORT,则继续处理后续代码。

#ifdef HAVE_EPOLL: 如果定义了宏 HAVE_EPOLL,则包含文件 ae_epoll.c。ae_epoll.c 可能包含了 Linux 系统使用的事件驱动库 epoll。

#else: 如果没有定义宏 HAVE_EPOLL,则继续处理后续代码。

#ifdef HAVE_KQUEUE: 如果定义了宏 HAVE_KQUEUE,则包含文件 ae_kqueue.c。ae_kqueue.c 可能包含了 FreeBSD 或 macOS 系统使用的事件驱动库 kqueue。

#else: 如果没有定义宏 HAVE_KQUEUE,则包含文件 ae_select.c。ae_select.c 可能包含了所有系统都支持的 select 事件驱动库,但效率较低。

#endif: 结束条件编译语句块。

我们常用的CentOS使用的是epoll的实现,在Linux系统中,epoll机制是一种高效的事件触发机制,可以监听大量的文件描述符,并在文件描述符上发生事件时,立即通知应用程序。使用epoll机制时,需要使用epoll_create函数创建一个epoll对象,然后使用epoll_ctl函数向epoll对象添加或删除文件描述符,最后使用epoll_wait函数等待事件的发生。

epoll的实现在ae_epoll.c文件中:

static int aeApiCreate(aeEventLoop *eventLoop) {
    aeApiState *state = zmalloc(sizeof(aeApiState));

    if (!state) return -1;
    state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
    if (!state->events) {
        zfree(state);
        return -1;
    }
    state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */
    if (state->epfd == -1) {
        zfree(state->events);
        zfree(state);
        return -1;
    }
    eventLoop->apidata = state;
    return 0;
}

static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
    aeApiState *state = eventLoop->apidata;
    struct epoll_event ee;
    /* If the fd was already monitored for some event, we need a MOD
     * operation. Otherwise we need an ADD operation. */
    int op = eventLoop->events[fd].mask == AE_NONE ?
        EPOLL_CTL_ADD : EPOLL_CTL_MOD;

    ee.events = 0;
    mask |= eventLoop->events[fd].mask; /* Merge old events */
    if (mask & AE_READABLE) ee.events |= EPOLLIN;
    if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
    ee.data.u64 = 0; /* avoid valgrind warning */
    ee.data.fd = fd;
    if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;
    return 0;
}

epoll_create是Linux系统中的一个系统调用,用于创建一个epoll对象,以便对文件描述符进行事件监听。

在Linux系统中,如果需要对多个文件描述符进行事件监听,常用的方式是使用select或poll函数。但是随着文件描述符数量的增加,select和poll函数的效率会逐渐降低,因为它们需要遍历所有的文件描述符,而无法实现快速的事件通知。为了解决这个问题,Linux引入了epoll机制,通过epoll_create系统调用创建一个epoll对象,然后使用epoll_ctl函数向epoll对象添加或删除文件描述符,最后使用epoll_wait函数等待事件的发生。

epoll_create函数的原型如下:

#include <sys/epoll.h>
int epoll_create(int size);

其中,size参数表示epoll对象中能够监听的最大文件描述符数量,这个参数在Linux 2.6.8之后已经无效,可以忽略。epoll_create函数返回一个整数类型的文件描述符,表示创建的epoll对象的标识符。如果创建失败,返回-1。

需要注意的是,使用epoll_create函数创建的epoll对象是在内核中创建的,而不是在用户空间中创建的。因此,在使用epoll机制时,需要将文件描述符设置为非阻塞模式,并且需要使用epoll_ctl函数向内核注册文件描述符,从而实现文件描述符的事件监听。

epoll_ctl是Linux系统中的一个系统调用,用于向epoll对象中添加或删除文件描述符,并设置对应的事件类型。

在Linux系统中,epoll机制是一种高效的事件触发机制,可以监听大量的文件描述符,并在文件描述符上发生事件时,立即通知应用程序。使用epoll机制时,需要使用epoll_create函数创建一个epoll对象,然后使用epoll_ctl函数向epoll对象添加或删除文件描述符,最后使用epoll_wait函数等待事件的发生。

epoll_ctl函数的原型如下:

#include <sys/epoll.h>
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

其中,epfd参数表示epoll对象的文件描述符,op参数表示操作类型,可以是EPOLL_CTL_ADD、EPOLL_CTL_MOD或EPOLL_CTL_DEL,分别表示添加、修改或删除文件描述符。fd参数表示要添加、修改或删除的文件描述符,event参数表示要监听的事件类型,包括读事件、写事件等。

需要注意的是,使用epoll_ctl函数添加、修改或删除文件描述符时,需要将文件描述符设置为非阻塞模式。在调用epoll_wait函数等待事件时,如果有事件发生,epoll_wait函数会返回一组事件列表,然后可以处理这些事件。处理完毕后,可以使用epoll_ctl函数修改或删除已经处理过的文件描述符,然后再次调用epoll_wait函数等待事件的发生。

epoll_wait是一个Linux内核提供的系统调用,用于等待文件描述符上的事件。epoll是Linux内核提供的一种多路I/O复用机制,可以监视多个文件描述符,一旦某个文件描述符就绪,epoll就会通知用户进程进行后续处理。

epoll_wait的函数原型如下:

int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);

参数说明如下:

  • epfd:epoll实例的文件描述符。
  • events:用于存放就绪文件描述符的数组。
  • maxevents:events数组的大小。
  • timeout:等待事件的超时时间,单位为毫秒。

epoll_wait的返回值如下:

  • 成功时,返回就绪文件描述符的数目。
  • 出错时,返回-1。

epoll_wait的使用步骤如下:

  1. 创建一个epoll实例,并获取其文件描述符。
  2. 将需要监视的文件描述符注册到epoll实例中。
  3. 调用epoll_wait函数,等待事件。
  4. 处理就绪文件描述符上的事件。

以下是epoll_wait的使用示例:

#include <sys/epoll.h>
int main() {
    // 创建一个epoll实例
    int epfd = epoll_create(1024);
    if (epfd == -1) {
        perror("epoll_create");
        return -1;
    }

    // 将需要监视的文件描述符注册到epoll实例中
    struct epoll_event event;
    event.events = EPOLLIN;
    event.data.fd = 0; // 标准输入
    if (epoll_ctl(epfd, EPOLL_CTL_ADD, 0, &event) == -1) {
        perror("epoll_ctl");
        return -1;
    }

    // 等待事件
    struct epoll_event events[10];
    int nfds = epoll_wait(epfd, events, 10, -1);
    if (nfds == -1) {
        perror("epoll_wait");
        return -1;
    }

    // 处理就绪文件描述符上的事件
    for (int i = 0; i < nfds; ++i) {
        if (events[i].events & EPOLLIN) {
            // 读取标准输入
            char buf[1024];
            int n = read(events[i].data.fd, buf, sizeof(buf));
            if (n == -1) {
                perror("read");
                return -1;
            }
            // ...
        }
    }
    return 0;
}

在上述示例中,我们创建了一个epoll实例,并将标准输入注册到epoll实例中。然后,我们调用epoll_wait函数,等待标准输入上的数据到达。如果标准输入上有数据到达,epoll_wait函数就会返回,并将就绪文件描述符的相关信息保存在events数组中。最后,我们遍历events数组,处理每个就绪文件描述符上的事件。

epoll_wait是Linux内核提供的一种高效的多路I/O复用机制。它可以提高程序的并发处理能力,减少CPU占用率。

ae_epoll.c文件中封装了一系列的epoll操作,包括epoll的创建、新增、删除、扩容、等待。

那这个非阻塞IO是怎么工作的?

核心关注aeApiPoll()

static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
    aeApiState *state = eventLoop->apidata;
    int retval, numevents = 0;

    retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
                        tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
    if (retval > 0) {
        int j;

        numevents = retval;
        for (j = 0; j < numevents; j++) {
            int mask = 0;
            struct epoll_event *e = state->events+j;

            if (e->events & EPOLLIN) mask |= AE_READABLE;
            if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
            if (e->events & EPOLLERR) mask |= AE_WRITABLE;
            if (e->events & EPOLLHUP) mask |= AE_WRITABLE;
            eventLoop->fired[j].fd = e->data.fd;
            eventLoop->fired[j].mask = mask;
        }
    }
    return numevents;
}

epoll_wait等待FD的就绪通知,如果FD准备完毕,则进行数据流处理,否则就阻塞等待,在Redis启动时,会在main函数中创建一个死循环,轮询监听epoll事件,当有事件就绪时,执行事件的回调函数,即我们上一篇中所讲到的,具体的命令执行函数。

ae.c aeMain()

void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        if (eventLoop->beforesleep != NULL)
            eventLoop->beforesleep(eventLoop);
        aeProcessEvents(eventLoop, AE_ALL_EVENTS);
    }
}

int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
    int processed = 0, numevents;

    //省略部分非核心代码
    .....
        
		// 等待epoll就绪事件
        numevents = aeApiPoll(eventLoop, tvp);
        for (j = 0; j < numevents; j++) {
            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
            int mask = eventLoop->fired[j].mask;
            int fd = eventLoop->fired[j].fd;
            int rfired = 0;

	    /* note the fe->mask & mask & ... code: maybe an already processed
             * event removed an element that fired and we still didn't
             * processed, so we check if the event is still valid. */
            
            // 核心:执行命令对应的回调函数
            if (fe->mask & mask & AE_READABLE) {
                rfired = 1;
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
            }
            // 核心:执行命令执行结果数据,写回客户端,回调函数
            if (fe->mask & mask & AE_WRITABLE) {
                if (!rfired || fe->wfileProc != fe->rfileProc)
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
            }
            processed++;
        }
    }
    /* Check time events */
    if (flags & AE_TIME_EVENTS)
        processed += processTimeEvents(eventLoop);

    return processed; /* return the number of processed file/time events */
}

就此,命令执行流程的epoll部分,就此完成,我们还是用一张图描述整个执行过程:
redis命令执行

结语

本篇,我们对Redis源码中非阻塞的核心epoll是如何实现进行了浅析,简单了解了Redis中epoll的工作流程,至此,我们已经大体了解了Redis如何处理执行来自客户端的命令请求,但是还有一个问题我们没有清楚,Redis是如何将命令读取到的数据返回客户端的,下一篇中,我们将围绕这个问题,进行拆解,敬请期待。

文章来源:https://blog.csdn.net/wtopps/article/details/135767929
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。