前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Redis源码阅读(三)初始化与事件循环

Redis源码阅读(三)初始化与事件循环

原创
作者头像
星沉
发布2022-01-28 16:48:25
8320
发布2022-01-28 16:48:25
举报
文章被收录于专栏:Redis存储

本篇从源码的角度介绍下Redis的代码初始化流程和事件循环的结构。

【server.c】main函数入口:

main主函数
main主函数

Redis实现了一个简单的事件驱动程序库,即 ae.c 的代码,它屏蔽了系统底层在事件处理上的差异,并实现了事件循环机制。

Redis将事件处理分为两大类:文件事件与时间事件。文件事件即客户端和服务器在交互过程中socket的可读可写事件,时间事件即需要周期性执行的一些定时任务(如定时清除超时客户端连接,定时删除过期键等)。Redis采用比较成熟的I/O多路复用模型(select/epoll等)处理文件事件,并对这些I/O多路复用模型进行简单封装。

系统底层的I/O多路复用机制:能够同时等待I/O和timer这两种事件的发生。在不同的系统上,存在多种不同的I/O多路复用机制。

事件处理框架非常简单,从初始化、服务到结束,分别对应的函数:aeCreateEventLoop、aeMain、aeDeleteEventLoop。 其中,aeMain是事件循环的主体函数,它又会调用 aeProcessEvents函数,三个主体函数会调用aeApiCreate、aeApiPool、aeApiFree三个接口函数进行处理。

事件机制处理流程:

一、阶段一:初始化阶段

(1) 配置加载和初始化

Redis 服务器基本数据结构和各种参数的初始化。

initServerConfig 函数初始化 redisServer ==> 保证Redis的内部数据结构及参数都有缺省值

代码语言:javascript
复制
struct redisServer {
    //...
    char *configfile;    // 配置文件绝对路径
    redisDb *db;         // 数据库数组
    dict *commands;      // 命令字典,Redis支持的所有命令都存储在这个字典中
    aeEventLoop *el;     // 事件循环结构
​
    int port;            // 服务器监听的端口号
    char *bindaddr[CONFIG_BINDADDR_MAX];      // 绑定的ip地址
    int ipfd[CONFIG_BINDADDR_MAX];            // 针对ip地址创建的文件描述符
    
    list *clients;       // 当前连接到Redis服务器的所有客户端
    int maxidletime;     // 最大空闲时间
    
  // 持久化/主从/集群等相关的参数
}

然后,从 redis.conf 中加载并解析配置文件 ==> 自定义配置,对某些参数进行覆盖

代码语言:javascript
复制
void loadServerConfig(char *filename, char *options)

(2) 创建事件循环

Redis服务器是典型的事件驱动程序,它将事件处理分为两大类:文件事件与时间事件,它们都封装在结构体aeEventLoop中:

代码语言:javascript
复制
typedef struct aeEventLoop {
    int stop;        // 标识事件循环是否结束
    
    aeFileEvent *events;           // 文件事件数组,存储已经注册的文件事件
    aeFiredEvent *fired;           // 储被触发的文件事件
    aeTimeEvent *timeEventHead;    // 多个时间事件形成链表,为时间事件链表头节点
    
    void *apidata;                 // 对4种I/O多路复用模型的进一步封装
    aeBeforeSleepProc *beforesleep;    // 阻塞等待文件事件发生之前会调用beforesleep函数
    aeBeforeSleepProc *aftersleep;     // 进程被唤醒之后会调用aftersleep函数
} aeEventLoop;

因此,我们需要创建 aeEventLoop,分配结构体所需内存,并初始化结构体各字段 ==> 依赖系统底层的I/O多路复用机制。

代码语言:javascript
复制
aeEventLoop *aeCreateEventLoop(int setsize) {
    if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;
    eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
    eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
​
    if (aeApiCreate(eventLoop) == -1) goto err;
}

(3) 开始socket监听

三种监听:TCP连接、Unix domain socket连接、TLS连接

Unix domain socket:一种高效的进程间通信机制,省去了协议栈的开销,比使用TCP协议性能更好。 从 Redis 6 开始支持 SSL / TLS,这是一项可选功能,需要在编译时启用;但 TLS当前不支持I / O多线程。

监听socket主要是为了获取文件描述符,后面需要根据文件描述符去注册I/O事件回调。

代码语言:javascript
复制
int listenToPort(int port, int *fds, int *count) {
    for (j = 0; j < server.bindaddr_count || j == 0; j++) {
        //创建socket并启动监听,文件描述符存储在fds数组作为返回参数
        fds[*count] = anetTcpServer(server.neterr,port,server.bindaddr[j],
            server.tcp_backlog);
        //设置socket非阻塞
        anetNonBlock(NULL,fds[*count]);
        (*count)++;
    }
}

注意:所有创建的socket都会设置为非阻塞模式,原因在于Redis使用了IO多路复用模式,其要求socket读写必须是非阻塞的,函数anetNonBlock通过系统调用fcntl设置socket非阻塞模式。

(4) 注册timer事件(时间事件) 回调

Redis服务器只维护了一个时间事件,该时间事件处理函数为serverCron,执行了所有需要周期性执行的一些定时任务,初次创建时1毫秒后就会被触发

代码语言:javascript
复制
if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
    exit(1);
}

serverCron函数:周期性地执行过期key的回收,主从重连、Cluster节点间的重连、BGSAVE、AOF rewrite的触发执行等 ==> 通过事件循环调度一些异步执行的任务

代码语言:javascript
复制
int serverCron(struct aeEventLoop *eventLoop, long long id, void 
               *clientData) {
    run_with_period(100) {
        //100毫秒周期执行
    }
    run_with_period(5000) {
        //5000毫秒周期执行
    }
    //清除超时客户端连接
    clientsCron();
    //处理数据库
    databasesCron();
​
    server.cronloops++;
    return 1000/server.hz;
}

serverCron由事件来驱动,执行还是在Redis主线程上,相当于和主线程上执行的其他操作(主要是对命令请求的执行)按时间进行分片

疑问:服务器内部不是有很多定时任务吗,为什么只有一个时间事件呢?

原因:Redis创建时间事件节点的函数为aeCreateTimeEvent,会创建时间事件并添加到时间事件链表。

(5) 注册I/O事件(文件事件) 回调

文件事件:

Redis客户端通过TCP socket与服务端交互,文件事件指的就是socket的可读可写事件。

socket的读写事件被抽象为文件事件,因此,对于监听的socket还需要创建对应的文件事件

代码语言:javascript
复制
for (j = 0; j < server.ipfd_count; j++) {
    if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
        acceptTcpHandler,NULL) == AE_ERR){
        }
}

TCP 连接的I/O事件回调:acceptTcpHandler

TLS 连接的I/O事件回调:acceptTLSHandler

Unix domain socket连接的I/O事件回调:acceptUnixHandler

其他:通过pipe机制与module进行双向通信

(6) 初始化后台线程

通过bioInit函数,在后台执行的一些额外的线程,用于处理一些比较耗时且可以被延迟执行的任务,如可以延迟执行的文件关闭操作(unlink)、AOF的持久化写库操作(fsync)、大key的清除操作

(7) 启动事件循环

Redis的事件驱动也是通过while循环等待事件发生并处理:

代码语言:javascript
复制
void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    // 开始事件循环
    while (!eventLoop->stop) {
        if (eventLoop->beforesleep != NULL)
            eventLoop->beforesleep(eventLoop);
        // 事件处理主函数
        aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
    }
}

二、阶段二:事件循环阶段

(1) 为什么要循环?

Redis作为一个服务端程序,需要对客户端不停发送的请求做响应的处理,因此需要进入一个无线循环中。在每一次的循环中,如果有I/O事件发生,就会去处理这些事件。如果没有事件发生,则等待,把整个循环阻塞住一段时间,阻塞时间根据时间事件间隔所决定。

(2) 什么时候恢复执行呢?

等待的事件发生的时候,程序会被重新唤醒,循环继续。

等待和唤醒的操作需要依赖底层系统实现。

(3) 如何统一调度timer事件和I/O事件?

需要能够同时等待timer和I/O两种事件的发生。要做到这一点,我们依赖系统底层的I/O多路复用机制(I/O multiplexing)。这种机制一般是这样设计的:它允许我们针对多个文件描述符来等待对应的I/O事件发生,并同时可以指定一个最长的阻塞超时时间。如果在这段阻塞时间内,有I/O事件发生,那么程序会被唤醒继续执行;如果一直没有I/O事件发生,而是指定的时间先超时了,那么程序也会被唤醒。对于timer事件的等待,就是依靠这里的超时机制。

(4) Redis中的IO多路复用是怎样的?

IO多路复用:多个网络 I/O 复用一个或少量的线程来处理 Socket

socket读写操作有阻塞与非阻塞之分。采用阻塞模式时,一个进程只能处理一条网络连接的读写事件,为了同时处理多条网络连接,通常会采用多线程或者多进程,效率低下;非阻塞模式下,可以使用目前比较成熟的I/O多路复用模型,如select/epoll/kqueue/event ports,视不同操作系统而定。

Redis同时支持4种I/O多路复用模型,并将这些模型的API进一步统一封装,由文件ae_evport.c、ae_epoll.c、ae_kqueue.c和ae_select.c实现。

而Redis在编译阶段,会检查操作系统支持的I/O多路复用模型,并按照一定规则决定使用哪种模型。

【例如,在macOS上编译Redis,那么它底层会选用kqueue;而如果在Linux上编译则会选择epoll

以epoll为例,在Redis中对应的源文件为:ae_epoll.c

epoll是linux中IO多路复用的一种机制,通过一个进程监视多个描述符,一旦某个描述符就绪(一般是读就绪或者写就绪),通过 callbak 回调通知机制,能够通知程序进行相应的读写操作。Redis并没有直接使用epoll提供的API,而是将其API进一步统一封装

主要有三个函数:

Redis封装函数

linux函数

备注

aeApiCreate

int epoll_create(int size)

创建事件

aeApiAddEvent

int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)

添加事件

aeApiDelEvent

int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)

删除事件

aeApiPoll

int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);

等待事件

a.int epoll_create(int size);

创建一个epoll的句柄,当创建好epoll句柄后,它就是会占用一个fd值;size用来告诉内核这个监听的数目一共有多大。

b.int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

epoll的事件注册函数,它不同与select()是在监听事件时告诉内核要监听什么类型的事件,而是在这里先注册要监听的事件类型。

  • 第一个参数是epoll_create()的返回值;
  • 第二个参数表示动作,用三个宏来表示: ​ EPOLL_CTL_ADD:注册新的fd到epfd中; ​ EPOLL_CTL_MOD:修改已经注册的fd的监听事件; ​ EPOLL_CTL_DEL:从epfd中删除一个fd;
  • 第三个参数是需要监听的fd;
  • 第四个参数是告诉内核需要监听什么事,struct epoll_event结构如下:
代码语言:javascript
复制
struct epoll_event {
  __uint32_t events;  /* Epoll events */
  epoll_data_t data;  /* User data variable */
};

c.int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);

等待事件的产生,类似于select()调用。返回需要处理的事件数目,如返回0表示已超时

epoll相比select/poll的优势

  • select/poll每次调用都要传递所要监控的所有fd给select/poll系统调用(这意味着每次调用都要将fd列表从用户态拷贝到内核态,当fd数目很多时,这会造成低效)。而每次调用epoll_wait时(作用相当于调用select/poll),不需要再传递fd列表给内核,因为已经在epoll_ctl中将需要监控的fd告诉了内核(epoll_ctl不需要每次都拷贝所有的fd,只需要进行增量式操作)。所以,在调用epoll_create之后,内核已经在内核态开始准备数据结构存放要监控的fd了。每次epoll_ctl只是对这个数据结构进行简单的维护。
  • select/poll一个致命弱点就是当你拥有一个很大的socket集合,不过由于网络延时,任一时间只有部分的socket是"活跃"的,但是select/poll每次调用都会线性扫描全部的集合,导致效率呈现线性下降。但是epoll不存在这个问题,它只会对"活跃"的socket进行操作---这是因为在内核实现中epoll是根据每个fd上面的callback函数实现的。
  • 当我们调用epoll_ctl往里塞入百万个fd时,epoll_wait仍然可以飞快的返回,并有效的将发生事件的fd给我们用户。这是由于我们在调用epoll_create时,内核除了帮我们在epoll文件系统里建了个file结点,在内核cache里建了个红黑树用于存储以后epoll_ctl传来的fd外,还会再建立一个list链表,用于存储准备就绪的事件,当epoll_wait调用时,仅仅观察这个list链表里有没有数据即可。有数据就返回,没有数据就sleep,等到timeout时间到后即使链表没数据也返回。所以,epoll_wait非常高效。而且,通常情况下即使我们要监控百万计的fd,大多一次也只返回很少量的准备就绪fd而已,所以,epoll_wait仅需要从内核态copy少量的fd到用户态而已。那么,这个准备就绪list链表是怎么维护的呢?当我们执行epoll_ctl时,除了把fd放到epoll文件系统里file对象对应的红黑树上之外,还会给内核中断处理程序注册一个回调函数,告诉内核,如果这个fd的中断到了,就把它放到准备就绪list链表里。所以,当一个fd(例如socket)上有数据到了,内核在把设备(例如网卡)上的数据copy到内核中后就来把fd(socket)插入到准备就绪list链表里了。

整个 I/O 多路复用模块在事件循环看来就是一个输入事件、输出 aeFiredEvent 数组的一个黑箱

在这个黑箱中,使用 aeCreateFileEventaeDeleteFileEvent 来添加删除需要监听的文件描述符以及事件。

在对应事件发生时,当前单元格会“变色”表示发生了可读(黄色)或可写(绿色)事件,调用 aeApiPoll 时会把对应的文件描述符和事件放入 aeFiredEvent 数组,并在 processEvents 方法中执行事件对应的回调。

(5) 如何进行事件循环?

无论是文件事件还是时间事件都封装在结构体aeEventLoop中:

代码语言:javascript
复制
typedef struct aeEventLoop {
    int stop;         // 标识事件循环是否结束
​
    aeFileEvent *events;    // 文件事件数组,存储已经注册的文件事件
    aeFiredEvent *fired;    // 存储被触发的文件事件
    aeTimeEvent *timeEventHead; // 多个时间事件形成链表,timeEventHead 为时间事件链表头节点
​
    void *apidata;    // 对4种I/O多路复用模型(kqueue、epoll等)的进一步封装
    aeBeforeSleepProc *beforesleep;   // 阻塞等待文件事件的生之前会调用beforesleep函数
    aeBeforeSleepProc *aftersleep;    // 阻塞进程被唤醒之后调用aftersleep函数
} aeEventLoop;

事件驱动程序通常存在while/for循环,循环等待事件发生并处理,Redis也不例外,其事件循环如下:

代码语言:javascript
复制
while (!eventLoop->stop) {
  if (eventLoop->beforesleep != NULL)
    eventLoop->beforesleep(eventLoop);
  aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
}

函数aeProcessEvents为事件处理主函数,其第2个参数是一个标志位,AE_ALL_EVENTS表示函数需要处理文件事件与时间事件,AE_CALL_AFTER_SLEEP表示阻塞等待文件事件之后需要执行aftersleep函数。

事件循环执行函数aeProcessEvents的主要逻辑:①查找最早会发生的时间事件,计算超时时间;②阻塞等待文件事件的产生;③处理文件事件;④处理时间事件。

补充说明:

Redis对于timer事件回调的处理设计了一个小机制:timer事件的回调函数可以返回一个需要下次执行的毫秒数。如果返回值是正常的正值,那么Redis就不会把这个timer事件从事件循环的队列中删除,这样它后面还有机会再次执行。例如,按照默认的设置,serverCron返回值是100,因此它每隔100毫秒会执行一次(当然这个执行频率可以在redis.conf中通过hz变量来调整)。

(6) 底层是如何支持了Redis的事件循环?(事件循环的底层实现)

a. 注册回调函数

首先,向事件循环中注册I/O事件回调的时候,需要指定哪个回调函数注册到哪个事件上(事件用文件描述符来表示)。事件和回调函数的对应关系,由Redis上层封装的事件驱动程序库来维护。

代码语言:javascript
复制
// 客户端连接的事件处理器
aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, acceptTcpHandler,NULL);
// 命令请求的事件处理器
aeCreateFileEvent(server.el,fd,AE_READABLE, readQueryFromClient, c);
// 命令回复的事件处理器
aeCreateFileEvent(server.el, c->fd, ae_flags, sendReplyToClient, c);

b. 阻塞等待事件发生

类似地,向事件循环中注册timer事件回调的时候,需要指定多长时间之后执行哪个回调函数。这里需要记录哪个回调函数预期在哪个时刻被调用,这也是由Redis上层封装的事件驱动程序库来维护的。

Redis创建时间事件节点的函数为aeCreateTimeEvent,内部实现只是创建时间事件并添加到时间事件链表。

代码语言:javascript
复制
long long aeCreateTimeEvent(aeEventLoop *eventLoop,      // 输入参数指向事件循环结构体
                  long long milliseconds,                // 此时间事件触发时间,单位毫秒
                  aeTimeProc *proc,                      // 时间事件的处理函数
                  void *clientData,                      // 指向对应的结构体对象
                  aeEventFinalizerProc *finalizerProc);  // 函数指针

c. 执行回调

底层的各种事件机制都会提供一个等待事件的操作,比如epoll提供的epoll_wait API。这个等待操作一般可以指定预期等待的事件列表(事件用文件描述符来表示),并同时可以指定一个超时时间(即最大等待多长时间)。在事件循环中需要等待事件发生的时候,就调用这个等待操作,传入之前注册过的所有I/O事件,并把最近的timer事件所对应的时刻转换成这里需要的超时时间。主要在aeProcessEvents函数进行处理。

aeProcessEvents函数在调用aeApiPoll之前会遍历Redis的时间事件链表,查找最早会发生的时间事件,以此作为aeApiPoll需要传入的超时时间:

代码语言:javascript
复制
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
  // 最早发生的时间事件
  shortest = aeSearchNearestTimer(eventLoop);
  long long ms =
    shortest->when_sec - now_sec)*1000 +
    shortest->when_ms - now_ms;
  …………
  // 阻塞等待文件事件发生
  numevents = aeApiPoll(eventLoop, tvp);
  
  for (j = 0; j < numevents; j++) {
    aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
    // 处理文件事件,即根据类型执行rfileProc或wfileProc
  }
  // 处理时间事件
  processed += processTimeEvents(eventLoop);
}

aeProcessEvents 都会先计算最近的时间事件发生所需要等待的时间,然后调用 aeApiPoll 方法在这段时间中等待事件的发生,在这段时间中如果发生了文件事件,就会优先处理文件事件,否则就会一直等待,直到最近的时间事件需要触发

d. 从上一步的等待操作中唤醒,有两种情况:如果是I/O事件发生了,那么就根据触发的事件查到I/O回调函数,进行调用;如果是超时了,那么检查所有注册过的timer事件,对于预期调用时刻超过当前时间的回调函数都进行调用。

三、总结

  • Redis主要的处理流程包括接收请求、执行命令,以及周期性地执行后台任务(serverCron),这些都是由这个事件循环驱动的。
  • 当请求到来时,I/O事件被触发,事件循环被唤醒,根据请求执行命令并返回响应结果;
  • 同时,后台异步任务(如回收过期的key)被拆分成若干小段,由timer事件所触发,夹杂在I/O事件处理的间隙来周期性地运行。
  • 这种执行方式允许仅仅使用一个线程来处理大量的请求,并能提供快速的响应时间。当然,这种实现方式之所以能够高效运转,除了事件循环的结构之外,还得益于系统提供的异步的I/O多路复用机制(I/O multiplexing)。
  • 事件循环利用I/O多路复用机制,对 CPU 进行时分复用 (多个事件流将 CPU 切割成多个时间片,不同事件流的时间片交替进行),使得多个事件流就可以并发进行。
  • 而且,使用单线程事件机制可以避免代码的并发执行,在访问各种数据结构的时候都无需考虑线程安全问题,从而大大降低了实现的复杂度。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、阶段一:初始化阶段
    • (1) 配置加载和初始化
      • (2) 创建事件循环
        • (3) 开始socket监听
          • (4) 注册timer事件(时间事件) 回调
            • (5) 注册I/O事件(文件事件) 回调
              • (6) 初始化后台线程
                • (7) 启动事件循环
                • 二、阶段二:事件循环阶段
                  • (1) 为什么要循环?
                    • (2) 什么时候恢复执行呢?
                      • (3) 如何统一调度timer事件和I/O事件?
                        • (4) Redis中的IO多路复用是怎样的?
                          • (5) 如何进行事件循环?
                            • (6) 底层是如何支持了Redis的事件循环?(事件循环的底层实现)
                              • a. 注册回调函数
                              • b. 阻塞等待事件发生
                              • c. 执行回调
                          • 三、总结
                          相关产品与服务
                          云数据库 Redis®
                          腾讯云数据库 Redis®(TencentDB for Redis®)是腾讯云打造的兼容 Redis 协议的缓存和存储服务。丰富的数据结构能帮助您完成不同类型的业务场景开发。支持主从热备,提供自动容灾切换、数据备份、故障迁移、实例监控、在线扩容、数据回档等全套的数据库服务。
                          领券
                          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档