本篇从源码的角度介绍下Redis的代码初始化流程和事件循环的结构。
【server.c】main函数入口:
Redis实现了一个简单的事件驱动程序库,即 ae.c 的代码,它屏蔽了系统底层在事件处理上的差异,并实现了事件循环机制。
Redis将事件处理分为两大类:文件事件与时间事件。文件事件即客户端和服务器在交互过程中socket的可读可写事件,时间事件即需要周期性执行的一些定时任务(如定时清除超时客户端连接,定时删除过期键等)。Redis采用比较成熟的I/O多路复用模型(select/epoll等)处理文件事件,并对这些I/O多路复用模型进行简单封装。
系统底层的I/O多路复用机制:能够同时等待I/O和timer这两种事件的发生。在不同的系统上,存在多种不同的I/O多路复用机制。
事件处理框架非常简单,从初始化、服务到结束,分别对应的函数:aeCreateEventLoop、aeMain、aeDeleteEventLoop。 其中,aeMain是事件循环的主体函数,它又会调用 aeProcessEvents函数,三个主体函数会调用aeApiCreate、aeApiPool、aeApiFree三个接口函数进行处理。
事件机制处理流程:
Redis 服务器基本数据结构和各种参数的初始化。
initServerConfig
函数初始化 redisServer ==> 保证Redis的内部数据结构及参数都有缺省值
struct redisServer {
//...
char *configfile; // 配置文件绝对路径
redisDb *db; // 数据库数组
dict *commands; // 命令字典,Redis支持的所有命令都存储在这个字典中
aeEventLoop *el; // 事件循环结构
int port; // 服务器监听的端口号
char *bindaddr[CONFIG_BINDADDR_MAX]; // 绑定的ip地址
int ipfd[CONFIG_BINDADDR_MAX]; // 针对ip地址创建的文件描述符
list *clients; // 当前连接到Redis服务器的所有客户端
int maxidletime; // 最大空闲时间
// 持久化/主从/集群等相关的参数
}
然后,从 redis.conf 中加载并解析配置文件 ==> 自定义配置,对某些参数进行覆盖
void loadServerConfig(char *filename, char *options)
Redis服务器是典型的事件驱动程序,它将事件处理分为两大类:文件事件与时间事件,它们都封装在结构体aeEventLoop中:
typedef struct aeEventLoop {
int stop; // 标识事件循环是否结束
aeFileEvent *events; // 文件事件数组,存储已经注册的文件事件
aeFiredEvent *fired; // 储被触发的文件事件
aeTimeEvent *timeEventHead; // 多个时间事件形成链表,为时间事件链表头节点
void *apidata; // 对4种I/O多路复用模型的进一步封装
aeBeforeSleepProc *beforesleep; // 阻塞等待文件事件发生之前会调用beforesleep函数
aeBeforeSleepProc *aftersleep; // 进程被唤醒之后会调用aftersleep函数
} aeEventLoop;
因此,我们需要创建 aeEventLoop,分配结构体所需内存,并初始化结构体各字段 ==> 依赖系统底层的I/O多路复用机制。
aeEventLoop *aeCreateEventLoop(int setsize) {
if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;
eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
if (aeApiCreate(eventLoop) == -1) goto err;
}
三种监听:TCP连接、Unix domain socket连接、TLS连接
Unix domain socket:一种高效的进程间通信机制,省去了协议栈的开销,比使用TCP协议性能更好。 从 Redis 6 开始支持 SSL / TLS,这是一项可选功能,需要在编译时启用;但 TLS当前不支持I / O多线程。
监听socket主要是为了获取文件描述符,后面需要根据文件描述符去注册I/O事件回调。
int listenToPort(int port, int *fds, int *count) {
for (j = 0; j < server.bindaddr_count || j == 0; j++) {
//创建socket并启动监听,文件描述符存储在fds数组作为返回参数
fds[*count] = anetTcpServer(server.neterr,port,server.bindaddr[j],
server.tcp_backlog);
//设置socket非阻塞
anetNonBlock(NULL,fds[*count]);
(*count)++;
}
}
注意:所有创建的socket都会设置为非阻塞模式,原因在于Redis使用了IO多路复用模式,其要求socket读写必须是非阻塞的,函数anetNonBlock通过系统调用fcntl设置socket非阻塞模式。
Redis服务器只维护了一个时间事件,该时间事件处理函数为serverCron,执行了所有需要周期性执行的一些定时任务,初次创建时1毫秒后就会被触发
if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
exit(1);
}
serverCron函数:周期性地执行过期key的回收,主从重连、Cluster节点间的重连、BGSAVE、AOF rewrite的触发执行等 ==> 通过事件循环调度一些异步执行的任务
int serverCron(struct aeEventLoop *eventLoop, long long id, void
*clientData) {
run_with_period(100) {
//100毫秒周期执行
}
run_with_period(5000) {
//5000毫秒周期执行
}
//清除超时客户端连接
clientsCron();
//处理数据库
databasesCron();
server.cronloops++;
return 1000/server.hz;
}
serverCron由事件来驱动,执行还是在Redis主线程上,相当于和主线程上执行的其他操作(主要是对命令请求的执行)按时间进行分片
疑问:服务器内部不是有很多定时任务吗,为什么只有一个时间事件呢?
原因:Redis创建时间事件节点的函数为aeCreateTimeEvent,会创建时间事件并添加到时间事件链表。
文件事件:
Redis客户端通过TCP socket与服务端交互,文件事件指的就是socket的可读可写事件。
socket的读写事件被抽象为文件事件,因此,对于监听的socket还需要创建对应的文件事件
for (j = 0; j < server.ipfd_count; j++) {
if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
acceptTcpHandler,NULL) == AE_ERR){
}
}
TCP 连接的I/O事件回调:acceptTcpHandler
TLS 连接的I/O事件回调:acceptTLSHandler
Unix domain socket连接的I/O事件回调:acceptUnixHandler
其他:通过pipe机制与module进行双向通信
通过bioInit
函数,在后台执行的一些额外的线程,用于处理一些比较耗时且可以被延迟执行的任务,如可以延迟执行的文件关闭操作(unlink)、AOF的持久化写库操作(fsync)、大key的清除操作
Redis的事件驱动也是通过while循环等待事件发生并处理:
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
// 开始事件循环
while (!eventLoop->stop) {
if (eventLoop->beforesleep != NULL)
eventLoop->beforesleep(eventLoop);
// 事件处理主函数
aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
}
}
Redis作为一个服务端程序,需要对客户端不停发送的请求做响应的处理,因此需要进入一个无线循环中。在每一次的循环中,如果有I/O事件发生,就会去处理这些事件。如果没有事件发生,则等待,把整个循环阻塞住一段时间,阻塞时间根据时间事件间隔所决定。
等待的事件发生的时候,程序会被重新唤醒,循环继续。
等待和唤醒的操作需要依赖底层系统实现。
需要能够同时等待timer和I/O两种事件的发生。要做到这一点,我们依赖系统底层的I/O多路复用机制(I/O multiplexing)。这种机制一般是这样设计的:它允许我们针对多个文件描述符来等待对应的I/O事件发生,并同时可以指定一个最长的阻塞超时时间。如果在这段阻塞时间内,有I/O事件发生,那么程序会被唤醒继续执行;如果一直没有I/O事件发生,而是指定的时间先超时了,那么程序也会被唤醒。对于timer事件的等待,就是依靠这里的超时机制。
IO多路复用:多个网络 I/O 复用一个或少量的线程来处理 Socket
socket读写操作有阻塞与非阻塞之分。采用阻塞模式时,一个进程只能处理一条网络连接的读写事件,为了同时处理多条网络连接,通常会采用多线程或者多进程,效率低下;非阻塞模式下,可以使用目前比较成熟的I/O多路复用模型,如select/epoll/kqueue/event ports,视不同操作系统而定。
Redis同时支持4种I/O多路复用模型,并将这些模型的API进一步统一封装,由文件ae_evport.c、ae_epoll.c、ae_kqueue.c和ae_select.c实现。
而Redis在编译阶段,会检查操作系统支持的I/O多路复用模型,并按照一定规则决定使用哪种模型。
【例如,在macOS上编译Redis,那么它底层会选用kqueue
;而如果在Linux上编译则会选择epoll】
以epoll为例,在Redis中对应的源文件为:ae_epoll.c
epoll是linux中IO多路复用的一种机制,通过一个进程监视多个描述符,一旦某个描述符就绪(一般是读就绪或者写就绪),通过 callbak 回调通知机制,能够通知程序进行相应的读写操作。Redis并没有直接使用epoll提供的API,而是将其API进一步统一封装
主要有三个函数:
Redis封装函数 | linux函数 | 备注 |
---|---|---|
aeApiCreate | int epoll_create(int size) | 创建事件 |
aeApiAddEvent | int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event) | 添加事件 |
aeApiDelEvent | int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event) | 删除事件 |
aeApiPoll | int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout); | 等待事件 |
a.int epoll_create(int size);
创建一个epoll的句柄,当创建好epoll句柄后,它就是会占用一个fd值;size用来告诉内核这个监听的数目一共有多大。
b.int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
epoll的事件注册函数,它不同与select()是在监听事件时告诉内核要监听什么类型的事件,而是在这里先注册要监听的事件类型。
struct epoll_event {
__uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
};
c.int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
等待事件的产生,类似于select()调用。返回需要处理的事件数目,如返回0表示已超时
epoll相比select/poll的优势:
整个 I/O 多路复用模块在事件循环看来就是一个输入事件、输出 aeFiredEvent
数组的一个黑箱
在这个黑箱中,使用 aeCreateFileEvent
、 aeDeleteFileEvent
来添加删除需要监听的文件描述符以及事件。
在对应事件发生时,当前单元格会“变色”表示发生了可读(黄色)或可写(绿色)事件,调用 aeApiPoll
时会把对应的文件描述符和事件放入 aeFiredEvent
数组,并在 processEvents
方法中执行事件对应的回调。
无论是文件事件还是时间事件都封装在结构体aeEventLoop中:
typedef struct aeEventLoop {
int stop; // 标识事件循环是否结束
aeFileEvent *events; // 文件事件数组,存储已经注册的文件事件
aeFiredEvent *fired; // 存储被触发的文件事件
aeTimeEvent *timeEventHead; // 多个时间事件形成链表,timeEventHead 为时间事件链表头节点
void *apidata; // 对4种I/O多路复用模型(kqueue、epoll等)的进一步封装
aeBeforeSleepProc *beforesleep; // 阻塞等待文件事件的生之前会调用beforesleep函数
aeBeforeSleepProc *aftersleep; // 阻塞进程被唤醒之后调用aftersleep函数
} aeEventLoop;
事件驱动程序通常存在while/for循环,循环等待事件发生并处理,Redis也不例外,其事件循环如下:
while (!eventLoop->stop) {
if (eventLoop->beforesleep != NULL)
eventLoop->beforesleep(eventLoop);
aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
}
函数aeProcessEvents
为事件处理主函数,其第2个参数是一个标志位,AE_ALL_EVENTS表示函数需要处理文件事件与时间事件,AE_CALL_AFTER_SLEEP表示阻塞等待文件事件之后需要执行aftersleep函数。
事件循环执行函数aeProcessEvents的主要逻辑:①查找最早会发生的时间事件,计算超时时间;②阻塞等待文件事件的产生;③处理文件事件;④处理时间事件。
补充说明:
Redis对于timer事件回调的处理设计了一个小机制:timer事件的回调函数可以返回一个需要下次执行的毫秒数。如果返回值是正常的正值,那么Redis就不会把这个timer事件从事件循环的队列中删除,这样它后面还有机会再次执行。例如,按照默认的设置,serverCron
返回值是100,因此它每隔100毫秒会执行一次(当然这个执行频率可以在redis.conf中通过hz
变量来调整)。
首先,向事件循环中注册I/O事件回调的时候,需要指定哪个回调函数注册到哪个事件上(事件用文件描述符来表示)。事件和回调函数的对应关系,由Redis上层封装的事件驱动程序库来维护。
// 客户端连接的事件处理器
aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, acceptTcpHandler,NULL);
// 命令请求的事件处理器
aeCreateFileEvent(server.el,fd,AE_READABLE, readQueryFromClient, c);
// 命令回复的事件处理器
aeCreateFileEvent(server.el, c->fd, ae_flags, sendReplyToClient, c);
类似地,向事件循环中注册timer事件回调的时候,需要指定多长时间之后执行哪个回调函数。这里需要记录哪个回调函数预期在哪个时刻被调用,这也是由Redis上层封装的事件驱动程序库来维护的。
Redis创建时间事件节点的函数为aeCreateTimeEvent,内部实现只是创建时间事件并添加到时间事件链表。
long long aeCreateTimeEvent(aeEventLoop *eventLoop, // 输入参数指向事件循环结构体
long long milliseconds, // 此时间事件触发时间,单位毫秒
aeTimeProc *proc, // 时间事件的处理函数
void *clientData, // 指向对应的结构体对象
aeEventFinalizerProc *finalizerProc); // 函数指针
底层的各种事件机制都会提供一个等待事件的操作,比如epoll提供的epoll_wait API。这个等待操作一般可以指定预期等待的事件列表(事件用文件描述符来表示),并同时可以指定一个超时时间(即最大等待多长时间)。在事件循环中需要等待事件发生的时候,就调用这个等待操作,传入之前注册过的所有I/O事件,并把最近的timer事件所对应的时刻转换成这里需要的超时时间。主要在aeProcessEvents函数进行处理。
aeProcessEvents函数在调用aeApiPoll之前会遍历Redis的时间事件链表,查找最早会发生的时间事件,以此作为aeApiPoll需要传入的超时时间:
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
// 最早发生的时间事件
shortest = aeSearchNearestTimer(eventLoop);
long long ms =
shortest->when_sec - now_sec)*1000 +
shortest->when_ms - now_ms;
…………
// 阻塞等待文件事件发生
numevents = aeApiPoll(eventLoop, tvp);
for (j = 0; j < numevents; j++) {
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
// 处理文件事件,即根据类型执行rfileProc或wfileProc
}
// 处理时间事件
processed += processTimeEvents(eventLoop);
}
aeProcessEvents
都会先计算最近的时间事件发生所需要等待的时间,然后调用 aeApiPoll
方法在这段时间中等待事件的发生,在这段时间中如果发生了文件事件,就会优先处理文件事件,否则就会一直等待,直到最近的时间事件需要触发
d. 从上一步的等待操作中唤醒,有两种情况:如果是I/O事件发生了,那么就根据触发的事件查到I/O回调函数,进行调用;如果是超时了,那么检查所有注册过的timer事件,对于预期调用时刻超过当前时间的回调函数都进行调用。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。