前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Redis 源码简洁剖析 09 - Reactor 模型

Redis 源码简洁剖析 09 - Reactor 模型

作者头像
Yano_nankai
发布2022-03-24 08:56:16
6040
发布2022-03-24 08:56:16
举报
文章被收录于专栏:二进制文集

Reactor 模型

网络服务器端,用了处理高并发网络 IO请求的一种编程模型

处理 3 类事件:

  • 连接事件:客户端→服务器的连接请求,对应服务端的连接事件
  • 写事件:客户端→服务器的读请求,服务端处理后要写回客户端,对应服务端的写事件
  • 读事件:服务端要从客户端读取请求内容,对应服务端的读事件

image

3 个关键角色:

  • acceptor:处理连接事件,接收连接、创建 handler
  • handler:处理读写事件
  • reactor:专门监听和分配事件,连接请求 → acceptor、读写请求 → handler

image

事件驱动框架

事件驱动框架就是 Reactor 的具体实现。包括:

  • 事件初始化:创建要监听的事件类型,及该类事件对应的 handler
  • 事件捕获、分发和处理主循环
    • 捕获发生的事件
    • 判断事件类型
    • 根据事件类型,调用对应 handler 处理事件

image

Redis 如何实现 Reactor 模型

实现代码:

  • 头文件:ae.h
  • 实现:ae.c

事件的数据结构:aeFileEvent

Redis 的事件驱动框架定义了 2 类事件:

  • IO 事件
  • 时间事件

下面介绍 IO 事件 aeFileEvent 的数据结构:

代码语言:javascript
复制
/* File event structure */
typedef struct aeFileEvent {
    // 事件类型的掩码,AE_(READABLE|WRITABLE|BARRIER)
    int mask;
    // AE_READABLE 事件的处理函数
    aeFileProc *rfileProc;
    // AE_WRITABLE 事件的处理函数
    aeFileProc *wfileProc;
    // 指向客户端私有数据
    void *clientData;
} aeFileEvent;

主循环:aeMain 函数

是在 Redis 初始化时调用的,详见 Redis 源码简洁剖析 07 - main 函数启动

代码语言:javascript
复制
void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    // 循环调用
    while (!eventLoop->stop) {
        // 核心函数,处理事件的逻辑
        aeProcessEvents(eventLoop, AE_ALL_EVENTS|
                                   AE_CALL_BEFORE_SLEEP|
                                   AE_CALL_AFTER_SLEEP);
    }
}

代码非常简单,就是循环调用 aeProcessEvents 函数。aeMain 是在 main 函数中被调用的:

代码语言:javascript
复制
// 事件驱动框架,循环处理各种触发的事件
aeMain(server.el);
// 循环结束,删除 eventLoop
aeDeleteEventLoop(server.el);

事件捕获与分发:aeProcessEvents 函数

主体有 3 个 if 分支:

代码语言:javascript
复制
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
    int processed = 0, numevents;
 
    /* 若没有事件处理,则立刻返回*/
    if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;
    /*如果有 IO 事件发生,或者紧急的时间事件发生,则开始处理*/
    if (eventLoop->maxfd != -1 || ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
       …
    }
    /* 检查是否有时间事件,若有,则调用 processTimeEvents 函数处理 */
    if (flags & AE_TIME_EVENTS)
        processed += processTimeEvents(eventLoop);
    /* 返回已经处理的文件或时间*/
    return processed; 
}

核心是第 2 个 if 语句:

代码语言:javascript
复制
// 有 IO 事件发生 || 紧急时间事件发生
if (eventLoop->maxfd != -1 ||
    ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
        ……
        // 调用 aeApiPoll 捕获事件
        numevents = aeApiPoll(eventLoop, tvp);
        ……
}

aeApiPoll 函数如何捕获事件?依赖于操作系统底层提供的 IO 多路复用机制,实现事件捕获,检查是否有新的连接、读写事件的发生。为了适配不同的操作系统,Redis 对不同操作系统实现网络 IO 多路复用函数,进行统一封装,封装后的代码在 4 个文件中实现:

  • ae_epoll.c,对应 Linux 上的 IO 复用函数 epoll
  • ae_evport.c,对应 Solaris 上的 IO 复用函数 evport
  • ae_kqueue.c,对应 macOS 或 FreeBSD 上的 IO 复用函数 kqueue
  • ae_select.c,对应 Linux(或 Windows)的 IO 复用函数 select

ae_epoll.c 中 aeApiPoll 函数的实现,核心是调用了 epoll_wait 函数,并将 epoll 返回的事件信息保存起来。

代码语言:javascript
复制
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
    aeApiState *state = eventLoop->apidata;
    int retval, numevents = 0;

    // 调用 epoll_wait 获取监听到的事件
    retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
            tvp ? (tvp->tv_sec*1000 + (tvp->tv_usec + 999)/1000) : -1);
    if (retval > 0) {
        int j;

        // 获取监听到的事件数量
        numevents = retval;
        // 处理每个事件
        for (j = 0; j < numevents; j++) {
            int mask = 0;
            struct epoll_event *e = state->events + j;

            if (e->events & EPOLLIN) mask |= AE_READABLE;
            if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
            if (e->events & EPOLLERR) mask |= AE_WRITABLE | AE_READABLE;
            if (e->events & EPOLLHUP) mask |= AE_WRITABLE | AE_READABLE;

            // 保存事件信息
            eventLoop->fired[j].fd = e->data.fd;
            eventLoop->fired[j].mask = mask;
        }
    }
    return numevents;
}

在 Mac 上查看源码,aeApiPoll 方法会进入 ae_kqueue.c 中:

代码语言:javascript
复制
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
    aeApiState *state = eventLoop->apidata;
    int retval, numevents = 0;

    if (tvp != NULL) {
        struct timespec timeout;
        timeout.tv_sec = tvp->tv_sec;
        timeout.tv_nsec = tvp->tv_usec * 1000;
        retval = kevent(state->kqfd, NULL, 0, state->events, eventLoop->setsize,
                        &timeout);
    } else {
        retval = kevent(state->kqfd, NULL, 0, state->events, eventLoop->setsize,
                        NULL);
    }

    if (retval > 0) {
        int j;

        /* Normally we execute the read event first and then the write event.
         * When the barrier is set, we will do it reverse.
         * 
         * However, under kqueue, read and write events would be separate
         * events, which would make it impossible to control the order of
         * reads and writes. So we store the event's mask we've got and merge
         * the same fd events later. */
        for (j = 0; j < retval; j++) {
            struct kevent *e = state->events+j;
            int fd = e->ident;
            int mask = 0; 

            if (e->filter == EVFILT_READ) mask = AE_READABLE;
            else if (e->filter == EVFILT_WRITE) mask = AE_WRITABLE;
            addEventMask(state->eventsMask, fd, mask);
        }

        /* Re-traversal to merge read and write events, and set the fd's mask to
         * 0 so that events are not added again when the fd is encountered again. */
        numevents = 0;
        for (j = 0; j < retval; j++) {
            struct kevent *e = state->events+j;
            int fd = e->ident;
            int mask = getEventMask(state->eventsMask, fd);

            if (mask) {
                eventLoop->fired[numevents].fd = fd;
                eventLoop->fired[numevents].mask = mask;
                resetEventMask(state->eventsMask, fd);
                numevents++;
            }
        }
    }
    return numevents;
}

image

事件注册:aeCreateFileEvent 函数

main 函数中调用了 createSocketAcceptHandler

代码语言:javascript
复制
if (createSocketAcceptHandler(&server.ipfd, acceptTcpHandler) != C_OK) {
    serverPanic("Unrecoverable error creating TCP socket accept handler.");
}

createSocketAcceptHandler 创建接收连接的 handler:

代码语言:javascript
复制
int createSocketAcceptHandler(socketFds *sfd, aeFileProc *accept_handler) {
    int j;

    for (j = 0; j < sfd->count; j++) {
        if (aeCreateFileEvent(server.el, sfd->fd[j], AE_READABLE, accept_handler,NULL) == AE_ERR) {
            /* Rollback */
            for (j = j-1; j >= 0; j--) aeDeleteFileEvent(server.el, sfd->fd[j], AE_READABLE);
            return C_ERR;
        }
    }
    return C_OK;
}

其主要是调用了 aeCreateFileEvent,aeCreateFileEvent 就是实现事件和处理函数注册的核心函数。

代码语言:javascript
复制
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
        aeFileProc *proc, void *clientData)
{
    // 错误处理
    if (fd >= eventLoop->setsize) {
        errno = ERANGE;
        return AE_ERR;
    }
    aeFileEvent *fe = &eventLoop->events[fd];

    // 核心
    if (aeApiAddEvent(eventLoop, fd, mask) == -1)
        return AE_ERR;
    fe->mask |= mask;
    if (mask & AE_READABLE) fe->rfileProc = proc;
    if (mask & AE_WRITABLE) fe->wfileProc = proc;
    fe->clientData = clientData;
    if (fd > eventLoop->maxfd)
        eventLoop->maxfd = fd;
    return AE_OK;
}

Linux 提供了 epoll_ctl API,用于增加新的观察事件。而 Redis 在此基础上,封装了 aeApiAddEvent 函数,对 epoll_ctl 进行调用,注册希望监听的事件和相应的处理函数。

ae_epoll.c 中 aeApiAddEvent 实现如下:

代码语言:javascript
复制
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
    aeApiState *state = eventLoop->apidata;
    struct epoll_event ee = {0};
    /* If the fd was already monitored for some event, we need a MOD
     * operation. Otherwise we need an ADD operation. */
    int op = eventLoop->events[fd].mask == AE_NONE ?
            EPOLL_CTL_ADD : EPOLL_CTL_MOD;

    ee.events = 0;
    mask |= eventLoop->events[fd].mask; /* Merge old events */
    if (mask & AE_READABLE) ee.events |= EPOLLIN;
    if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
    ee.data.fd = fd;
    // 增加新的观察事件
    if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;
    return 0;
}

注册的函数 acceptTcpHandler 在 network.c 中:

代码语言:javascript
复制
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
    char cip[NET_IP_STR_LEN];
    UNUSED(el);
    UNUSED(mask);
    UNUSED(privdata);

    // 每次处理 1000 个
    while(max--) {
        cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
        if (cfd == ANET_ERR) {
            if (errno != EWOULDBLOCK)
                serverLog(LL_WARNING,
                    "Accepting client connection: %s", server.neterr);
            return;
        }
        anetCloexec(cfd);
        serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);
        acceptCommonHandler(connCreateAcceptedSocket(cfd),0,cip);
    }
}

总结

Redis 处理连接、客户端请求是单线程的,但是这单个线程能够处理上千个客户端,就是因为 Redis 是基于 Reactor 模型的。通过事件驱动框架,Redis 可以使用一个循环不断捕获、分发、处理客户端产生的网络连接、数据读写事件。当然这里有一个前提,就是 Redis 几乎所有数据读取和处理都是在内存中操作的,服务端对单个客户端的读写请求处理时间极短。

参考链接

Redis 源码简洁剖析系列

最简洁的 Redis 源码剖析系列文章

Java 编程思想-最全思维导图-GitHub 下载链接,需要的小伙伴可以自取~

原创不易,希望大家转载时请先联系我,并标注原文链接。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2022.02.10 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Reactor 模型
  • 事件驱动框架
  • Redis 如何实现 Reactor 模型
    • 事件的数据结构:aeFileEvent
      • 主循环:aeMain 函数
        • 事件捕获与分发:aeProcessEvents 函数
          • 事件注册:aeCreateFileEvent 函数
          • 总结
          • 参考链接
          • Redis 源码简洁剖析系列
          相关产品与服务
          云数据库 Redis®
          腾讯云数据库 Redis®(TencentDB for Redis®)是腾讯云打造的兼容 Redis 协议的缓存和存储服务。丰富的数据结构能帮助您完成不同类型的业务场景开发。支持主从热备,提供自动容灾切换、数据备份、故障迁移、实例监控、在线扩容、数据回档等全套的数据库服务。
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档