写在前面
Redis作为我们日常工作中最常使用的缓存数据库,其重要性不言而喻,作为普调开发者,我们在日常开发中使用Redis,主要聚焦于Redis的基层数据结构的命令使用,很少会有人对Redis的内部实现机制进行了解,对于我而言,也是如此,但一直以来,我对于Redis的内部实现都很好奇,它为什么会如此高效,本系列文章是旨在对Redis源代码分析拆解,通过阅读Redis源代码,了解Redis基础数据结构的实现机制。
关于Redis的源码分析,已经有非常多的大佬写过相关的内容,最为著名的是《Redis设计与实现》,对于Redis源码的分析已经非常出色,本系列文章对于源码拆解时,并不会那么详细,相信大部分读者应该不是从事Redis的二次开发工作,对于源码细节过于深入,会陷入细节的泥潭,这是我在阅读源码时尽量避免的,我尽量做到对大体的脉络进行梳理,讲清楚主干逻辑,细节部分,如果读者有兴趣,可以自行参阅源码或相关资料。
本系列源代码,基于Redis 3.2.6
毫无疑问,Redis已经成为我们日常开发中最长使用的缓存数据库,Redis如此高效的原因,是因为采用了非阻塞I/O模型来处理命令请求,这是我们耳熟能详的事情了,那么Redis具体是如何实现非阻塞I/O的呢?Redis是如何接收命令请求,并执行命令,再返回给客户端的呢?我们来一起探究。
本篇是Redis源码分析系列的第一篇,我们来一起看一下Redis处理命令的核心实现机制。
我们可以思考一下,如果使用Java实现Redis,应该会怎么样?
我们需要编写main函数,在main中初始化Redis的配置,然后实现一些Servlet的接口,处理命令请求,然后使用一个NIO框架,处理请求命令,最后将结果返回给客户端。事实上,Redis的整体结构上,的确是这样实现的,首先第一步,Redis需要一个main函数,作为Redis的启动入口,在main中,需要做一系列的事情。
那由此为引,我们来看一下Redis的main函数实现。
阅读Redis的源码,一切的起点在server.c中,在该文件中,定义了main函数,作为整个工程的入口:
int main(int argc, char **argv) {
struct timeval tv;
int j;
// 省略,各种初始化操作检查
......
// 核心1:初始化Server配置
initServerConfig();
// 从配置文件中加载配置信息
loadServerConfig(configfile, options);
// 省略,各种初始化操作检查
......
// 核心2:初始化Server
// 重点如: 绑定监听端口号,设置 acceptTcpHandler 回调函数
initServer();
// 省略,各种初始化操作检查
......
// 从硬盘恢复数据,RBD/AOF
loadDataFromDisk();
// 核心3:设置核心函数beforeSleep,用于Redis进入事件驱动库的主循环之前被调用
// 后面再讲
aeSetBeforeSleepProc(server.el,beforeSleep);
// 核心4:核中核,主函数循环,处理命令请求的核心函数
aeMain(server.el);
// 核心5:关闭服务,收尾工作
aeDeleteEventLoop(server.el);
return 0;
}
上面就是server.c中的main函数实现,这里我删除了很多非核心的检查方法,可以更清晰的聚焦mian函数的核心步骤,简单归纳一下main函数中都做了哪些事情:
1、Redis 会设置一些回调函数,当前时间,随机数的种子。回调函数实际上什么?举个例子,比如要给 Redis 发送一个关闭的命令,让它去做一些优雅的关闭,做一些扫尾清楚的工作,这个工作如果不设计回调函数,它其实什么都不会干。其实 C 语言的程序跑在操作系统之上,Linux 操作系统本身就是提供给我们事件机制的回调注册功能,所以它会设计这个回调函数,让你注册上,关闭的时候优雅的关闭,然后它在后面可以做一些业务逻辑。
2、不管任何软件,肯定有一份配置文件需要配置。首先在服务器端会把它默认的一份配置做一个初始化。
3、解析启动的参数。其实不管什么软件,它在初始化的过程当中,配置都是由两部分组成的。
第一部分,静态的配置文件;第二部分,动态启动的时候,main,就是参数给它的时候进去配置。
4、把服务端的东西拿过来,装载 Config 配置文件,loadServerConfig。
5、初始化服务器,initServer。
6、从磁盘装载数据。
7、有一个主循环程序开始干活,用来处理客户端的请求,并且把这个请求转到后端的业务逻辑,帮你完成命令执行,然后吐数据。
就这么一个过程。
OK,继续主题,我们希望了解是如何处理命令请求的,如果是Java语言,我们需要定义Servlet接口,并监听特定的端口,比如8080端口,以此来接收来自客户端的请求,但是对于C,是没有Servlet的,如果希望接收网络请求调用,需要通过socket进行网络通信,下面我们看一下Redis如何注册socket:
server.c initServer()
void initServer(void) {
// 省略,各种初始化操作检查
......
// 核心1:创建epoll
server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);
// 省略,各种初始化操作检查
......
// 核心2:创建socket监听
if (server.unixsocket != NULL) {
unlink(server.unixsocket); /* don't care if this fails */
server.sofd = anetUnixServer(server.neterr,server.unixsocket,
server.unixsocketperm, server.tcp_backlog);
if (server.sofd == ANET_ERR) {
serverLog(LL_WARNING, "Opening Unix socket: %s", server.neterr);
exit(1);
}
anetNonBlock(NULL,server.sofd);
}
// 省略,各种初始化操作检查
......
// 核心3:创建定时任务
if(aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
serverPanic("Can't create the serverCron time event.");
exit(1);
}
// 核心4:重点,核中核,通过aeCreateFileEvent创建epoll监听socket,设置行为为READABLE,
// 并注册回调函数,当socket接收到套接字时,会触发执行回调函数
for (j = 0; j < server.ipfd_count; j++) {
if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
acceptTcpHandler,NULL) == AE_ERR)
{
serverPanic(
"Unrecoverable error creating server.ipfd file event.");
}
}
if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE,
acceptUnixHandler,NULL) == AE_ERR) serverPanic("Unrecoverable error creating server.sofd file event.");
// 省略,各种初始化操作检查
......
}
上面就是initServer()函数的实现,老规矩,我删除了非核心部分的实现,简单归纳一下initServer()函数中都做了哪些事情:
1、创建非阻塞I/O,这里就是我们最常说的Redis的非阻塞I/O,这里Redis具体使用哪一种非阻塞I/O框架,取决于操作系统的具体实现,大部分场景下,我们的服务器使用Linux CentOS,其默认实现即为epoll
由于C中并没有Java语言中的多态,这里作者采用了一种精妙的方式实现了“多态”:
#ifdef HAVE_EVPORT
#include "ae_evport.c"
#else
#ifdef HAVE_EPOLL
#include "ae_epoll.c"
#else
#ifdef HAVE_KQUEUE
#include "ae_kqueue.c"
#else
#include "ae_select.c"
#endif
#endif
#endif
2、创建socket监听
3、创建系统定时任务,后台线程执行,例如过期Key扫描淘汰
4、通过aeCreateFileEvent创建epoll监听socket,设置行为为READABLE,并注册回调函数,当socket接收到套接字时,会触发执行回调函数
networking.c acceptTcpHandler()
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
// 省略部分代码
......
while(max--) {
// 获取socket数据
cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
if (cfd == ANET_ERR) {
if (errno != EWOULDBLOCK)
serverLog(LL_WARNING,
"Accepting client connection: %s", server.neterr);
return;
}
serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);
// 处理命令请求
acceptCommonHandler(cfd,0,cip);
}
}
acceptTcpHandler函数中,处理socket连接,并进行命令请求处理,进入命令处理函数acceptCommonHandler
static void acceptCommonHandler(int fd, int flags, char *ip) {
client *c;
// 创建redis连接
if ((c = createClient(fd)) == NULL) {
serverLog(LL_WARNING,
"Error registering fd event for the new client: %s (fd=%d)",
strerror(errno),fd);
close(fd); /* May be already closed, just ignore errors */
return;
}
// 省略部分代码
......
server.stat_numconnections++;
c->flags |= flags;
}
networking.c createClient()
client *createClient(int fd) {
client *c = zmalloc(sizeof(client));
/* passing -1 as fd it is possible to create a non connected client.
* This is useful since all the commands needs to be executed
* in the context of a client. When commands are executed in other
* contexts (for instance a Lua script) we need a non connected client. */
if (fd != -1) {
anetNonBlock(NULL,fd);
anetEnableTcpNoDelay(NULL,fd);
if (server.tcpkeepalive)
anetKeepAlive(NULL,fd,server.tcpkeepalive);
// 核心,注册命令处理事件,设置回调函数readQueryFromClient
if (aeCreateFileEvent(server.el,fd,AE_READABLE,
readQueryFromClient, c) == AE_ERR)
{
close(fd);
zfree(c);
return NULL;
}
}
// 省略部分代码
......
return c;
}
createClient函数中,注册命令处理事件到epoll,并设置回调函数readQueryFromClient,当socket数据读取完成时,将执行回调函数,真正的进行命令执行,我们继续看回调函数readQueryFromClient的实现
networking.c readQueryFromClient()
void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
// 省略部分代码
......
// 从socket fd中读取数据
nread = read(fd, c->querybuf+qblen, readlen);
if (nread == -1) {
if (errno == EAGAIN) {
return;
} else {
serverLog(LL_VERBOSE, "Reading from client: %s",strerror(errno));
freeClient(c);
return;
}
} else if (nread == 0) {
serverLog(LL_VERBOSE, "Client closed connection");
freeClient(c);
return;
}
// 省略部分代码
......
// 处理命令
processInputBuffer(c);
}
void processInputBuffer(client *c) {
server.current_client = c;
/* Keep processing while there is something in the input buffer */
while(sdslen(c->querybuf)) {
// 省略部分代码
......
/* Multibulk processing could see a <= 0 length. */
if (c->argc == 0) {
resetClient(c);
} else {
// 执行命令
if (processCommand(c) == C_OK)
resetClient(c);
/* freeMemoryIfNeeded may flush slave output buffers. This may result
* into a slave, that may be the active client, to be freed. */
if (server.current_client == NULL) break;
}
}
server.current_client = NULL;
}
server.c processCommand()
int processCommand(client *c) {
// 省略部分代码
......
/* Now lookup the command and check ASAP about trivial error conditions
* such as wrong arity, bad command name and so forth. */
c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
if (!c->cmd) {
flagTransaction(c);
addReplyErrorFormat(c,"unknown command '%s'",
(char*)c->argv[0]->ptr);
return C_OK;
} else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) ||
(c->argc < -c->cmd->arity)) {
flagTransaction(c);
addReplyErrorFormat(c,"wrong number of arguments for '%s' command",
c->cmd->name);
return C_OK;
}
// 省略部分代码
......
/* Exec the command */
if (c->flags & CLIENT_MULTI &&
c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
{
queueMultiCommand(c);
addReply(c,shared.queued);
} else {
// 2528行,真正执行命令
call(c,CMD_CALL_FULL);
c->woff = server.master_repl_offset;
if (listLength(server.ready_keys))
handleClientsBlockedOnLists();
}
return C_OK;
}
server.c call()
void call(client *c, int flags) {
// 省略部分代码
......
// 核心,执行命令对应的函数调用
c->cmd->proc(c);
duration = ustime()-start;
dirty = server.dirty-dirty;
if (dirty < 0) dirty = 0;
// 省略部分代码
......
}
上述是命令处理的几个核心函数,这里我省略了部分非核心逻辑,聚焦命令处理的主流程,在回调函数readQueryFromClient中主要做了几个事情:
1、从socket fd中读取数据
2、解析socket数据,解析命令,查找命令是否存在
3、执行命令对应的函数调用
4、重置Client,已备下一次请求处理
server.c redisCommandTable
struct redisCommand redisCommandTable[] = {
{"get",getCommand,2,"rF",0,NULL,1,1,1,0,0},
{"set",setCommand,-3,"wm",0,NULL,1,1,1,0,0},
{"setnx",setnxCommand,3,"wmF",0,NULL,1,1,1,0,0},
{"setex",setexCommand,4,"wm",0,NULL,1,1,1,0,0},
{"psetex",psetexCommand,4,"wm",0,NULL,1,1,1,0,0},
{"append",appendCommand,3,"wm",0,NULL,1,1,1,0,0},
{"strlen",strlenCommand,2,"rF",0,NULL,1,1,1,0,0},
{"del",delCommand,-2,"w",0,NULL,1,-1,1,0,0},
{"exists",existsCommand,-2,"rF",0,NULL,1,-1,1,0,0},
{"setbit",setbitCommand,4,"wm",0,NULL,1,1,1,0,0},
{"getbit",getbitCommand,3,"rF",0,NULL,1,1,1,0,0},
{"bitfield",bitfieldCommand,-2,"wm",0,NULL,1,1,1,0,0},
{"setrange",setrangeCommand,4,"wm",0,NULL,1,1,1,0,0},
{"getrange",getrangeCommand,4,"r",0,NULL,1,1,1,0,0},
{"substr",getrangeCommand,4,"r",0,NULL,1,1,1,0,0},
{"incr",incrCommand,2,"wmF",0,NULL,1,1,1,0,0},
{"decr",decrCommand,2,"wmF",0,NULL,1,1,1,0,0},
{"mget",mgetCommand,-2,"r",0,NULL,1,-1,1,0,0},
{"rpush",rpushCommand,-3,"wmF",0,NULL,1,1,1,0,0},
{"lpush",lpushCommand,-3,"wmF",0,NULL,1,1,1,0,0},
{"rpushx",rpushxCommand,3,"wmF",0,NULL,1,1,1,0,0},
{"lpushx",lpushxCommand,3,"wmF",0,NULL,1,1,1,0,0},
{"linsert",linsertCommand,5,"wm",0,NULL,1,1,1,0,0},
{"rpop",rpopCommand,2,"wF",0,NULL,1,1,1,0,0},
{"lpop",lpopCommand,2,"wF",0,NULL,1,1,1,0,0},
// 省略部分代码
..........
};
上述代码,就是命令请求对应的函数列表,也就是我们最熟悉的Redis命令,就此我们可以串起一条命令请求的执行过程,为了方便理解,我们用一张流程图说明一条命令的执行过程:
本篇,我们通过Redis源代码,了解了一部分Redis处理命令请求的核心流程,之所以说是一部分,是因为我是按照Redis命令处理的逻辑流程进行的拆解,而并非真正的执行过程,Redis的如此高性能的根本,是基于epoll的非阻塞机制实现,在下一篇中,我们将重点介绍Redis的epoll实现机制,敬请期待。