首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

Redis单线程不行了,快来割VM/ BIO/ IO多线程的韭菜!(附源码)

背景

Redis 在早期,曾因单线程“闻名”。在 Redis 的 FAQ 里有一个提问

《Redis is single threaded. How can I exploit multiple CPU/cores?》

https://redis.io/topics/faq,说明了 redis 使用单线程的原因:

CPU 通常并不是 Redis 的瓶颈,因为 Redis 通常要么受内存限制,要么受网络限制。比如说,一般在 Linux 系统上运行的流水线 Redis,每秒可以交付一百万个请求,如果你的应用程序主要使用 O(N)或 O(log(N))命令,几乎不会使用过多的 CPU 。

......

不过从 Redis 4.0 开始,Redis 就开始使用更多的线程了。目前使用多线程的场景(Redis 4.0),仅限于在后台删除对象,以及通过 Redis modules 实现的阻塞命令。在未来的版本中,计划是让 Redis 越来越线程化。

这不禁让我好奇,Redis 一开始是单线程的吗?又是怎么朝多线程演化的呢,又是为什么让 Redis 越来越线程化呢。在阅读了几篇文章后,我决定自己读一遍相关源代码,了解 Redis 的多线程演化历史。

Redis 多线程源码分析系列指南:

  • Redis VM 线程(Redis 1.3.x - Redis 2.4)
  • Redis BIO 线程(Redis 2.4+ 和 Redis 4.0+)
  • Redis 网络 IO 线程(Redis 6.0+)

Redis VM 线程(Redis 1.3.x - Redis 2.4)

实际上 Redis 很早就用到多线程,我们在 Redis 的 1.3.x (2010 年)的源代码中,能看到 Redis VM 相关的多线程代码,这部分代码主要是在 Redis 中实现线程化 VM 的能力。Redis VM 可以将 Redis 中很少访问的 value 存到磁盘中,也可以将占用内存大的 value 存到磁盘。

Redis VM 的底层是读写磁盘,所以在从磁盘读写 value 时,阻塞 VM 会产生阻塞主线程,影响所有的客户端,导致所有客户端耗时增加。所以 Redis VM 又提供了线程化 VM,可以将读写文件数据的操作,放在 IO 线程中执行,这样就只影响一个客户端(需要从文件中读出数据的客户端),从而避免像阻塞 VM 那样,提升所有客户端的耗时。

我们从《Virtual Memory technical specification》https://redis.io/topics/internals-vm 能看到线程化 VM 的优势。

列举线程化 VM 设计目标的重要性: 简单的实现,很少条件竞争,简单的锁,VM 系统多少与其余 Redis 代码解耦。 良好的性能,客户端访问内存中的 value 没有锁了。 能够在 I / O 线程中,对对象进行解码/编码。

但其实,Redis VM 是一个被弃用的短寿特性。在 Redis 1.3.x 出现 Redis VM 之后,Redis 2.4 是最后支持它的版本。Redis 1.3.x 在 2010 年发布,Redis 2.6 在 2012 年发布,Redis VM 的生命在 Redis 项目中,只持续了两年。我们现在从《Virtual Memory》https://redis.io/topics/virtual-memory能看到弃用 Redis VM 的原因:

……我们发现使用 VM 有许多缺点和问题。在未来,我们只想提供有史以来最好的内存数据库(但仍像往常一样在磁盘上持久化),而至少现在,不考虑对大于 RAM 的数据库的支持。我们未来的工作重点是提供脚本,群集和更好的持久性。

我个人以为,去掉 Redis VM 的根本原因,可能是定位问题。Redis 的准确定位了磁盘备份的内存数据库,去掉 VM 后的 Redis 更纯粹,更简单,更容易让用户理解和使用。

下面简单介绍下 Redis VM 的多线程代码。

Redis 主线程和 IO 线程使用任务队列和单个互斥锁进行通信。队列定义和互斥锁定义如下:

代码语言:javascript
复制
/* Global server state structure */struct redisServer {...    list *io_newjobs; /* List of VM I/O jobs yet to be processed */    list *io_processing; /* List of VM I/O jobs being processed */    list *io_processed; /* List of VM I/O jobs already processed */    list *io_ready_clients; /* Clients ready to be unblocked. All keys loaded */    pthread_mutex_t io_mutex; /* lock to access io_jobs/io_done/io_thread_job */    pthread_mutex_t io_swapfile_mutex; /* So we can lseek + write */    pthread_attr_t io_threads_attr; /* attributes for threads creation */...}

复制代码

Redis 在需要处理 IO 任务时(比如使用的内存超过最大内存等情况),Redis 通过 queueIOJob 函数,将一个 IO 任务(iojob)入队到任务队列(io_newjobs),在 queueIOJob 中,会根据 VM 的最大线程数,判断是否需要创建新的 IO 线程。

代码语言:javascript
复制
void queueIOJob(iojob *j) {    redisLog(REDIS_DEBUG,"Queued IO Job %p type %d about key '%s'\n",        (void*)j, j->type, (char*)j->key->ptr);    listAddNodeTail(server.io_newjobs,j);    if (server.io_active_threads < server.vm_max_threads)        spawnIOThread();}

复制代码

创建出的 IO 线程,主逻辑是 IOThreadEntryPoint。IO 线程会先从 io_newjobs 队列中取出一个 iojob,然后推入 io_processing 队列,然后根据 iojob 中的 type 来执行对应的任务:

  • 从磁盘读数据到内存
  • 计算需要的 page 数
  • 将内存 swap 到磁盘

执行完成后,将 iojob 推入 io_processed 队列。最后,IO 线程通过 UINX 管道,向主线程发送一个字节,告诉主线程,有一个新的任务处理完成,需要主线程处理结果。

代码语言:javascript
复制
typedef struct iojob {    int type;   /* Request type, REDIS_IOJOB_* */    redisDb *db;/* Redis database */    robj *key;  /* This I/O request is about swapping this key */    robj *id;   /* Unique identifier of this job:                   this is the object to swap for REDIS_IOREQ_*_SWAP, or the                   vmpointer objct for REDIS_IOREQ_LOAD. */    robj *val;  /* the value to swap for REDIS_IOREQ_*_SWAP, otherwise this                 * field is populated by the I/O thread for REDIS_IOREQ_LOAD. */    off_t page; /* Swap page where to read/write the object */    off_t pages; /* Swap pages needed to save object. PREPARE_SWAP return val */    int canceled; /* True if this command was canceled by blocking side of VM */    pthread_t thread; /* ID of the thread processing this entry */} iojob;#define REDIS_IOJOB_LOAD 0          /* Load from disk to memory */#define REDIS_IOJOB_PREPARE_SWAP 1  /* Compute needed pages */#define REDIS_IOJOB_DO_SWAP 2       /* Swap from memory to disk */void *IOThreadEntryPoint(void *arg) {    iojob *j;    listNode *ln;    REDIS_NOTUSED(arg);    pthread_detach(pthread_self());    while(1) {        /* Get a new job to process */        lockThreadedIO();        if (listLength(server.io_newjobs) == 0) {            /* No new jobs in queue, exit. */            ...                        unlockThreadedIO();            return NULL;        }                ln = listFirst(server.io_newjobs);        j = ln->value;        listDelNode(server.io_newjobs,ln);        /* Add the job in the processing queue */                j->thread = pthread_self();        listAddNodeTail(server.io_processing,j);        ln = listLast(server.io_processing); /* We use ln later to remove it */        unlockThreadedIO();                ...        /* Process the Job */        if (j->type == REDIS_IOJOB_LOAD) {            vmpointer *vp = (vmpointer*)j->id;            j->val = vmReadObjectFromSwap(j->page,vp->vtype);        } else if (j->type == REDIS_IOJOB_PREPARE_SWAP) {            j->pages = rdbSavedObjectPages(j->val);        } else if (j->type == REDIS_IOJOB_DO_SWAP) {            if (vmWriteObjectOnSwap(j->val,j->page) == REDIS_ERR)                j->canceled = 1;        }        /* Done: insert the job into the processed queue */        ...                lockThreadedIO();        listDelNode(server.io_processing,ln);        listAddNodeTail(server.io_processed,j);        unlockThreadedIO();        /* Signal the main thread there is new stuff to process */        redisAssert(write(server.io_ready_pipe_write,"x",1) == 1);    }    return NULL; /* never reached */}

复制代码

总结

因为 Redis VM 特性已经从 Redis 中删除,相关代码也比较古早,就不展开阐述了。

除了学习到多线程下,Redis 对数据读写的优化,我们在学习源码和 Redis 的官方博客时,能够明显感受到:

“去掉 Redis VM 的根本原因,可能是定位问题。Redis 的准确定位了磁盘备份的内存数据库,去掉 VM 后的 Redis 更纯粹,更简单,更容易让用户理解和使用。”

有时候,砍掉性能不好、意义不明的特性代码,就是最好的性能优化吧。

Redis BIO 线程(Redis 2.4+ 和 Redis 4.0+)

Redis BIO 线程(Redis 2.4+)

从系列上一篇我们知道,从一开始,除了“短寿”的 VM 特性和 VM 线程,Redis 主要还是单线程的。不过,我们在 Redis 的官方文章里能看到,从 Redis 2.4 (2011 年)开始,Redis 会使用线程在后台执行一些主要跟磁盘 I/O 有关的慢速的 I/O 操作。我们把代码分支切到 Redis 2.4 的分支上,能发现有两个 BIO 线程,协助 Redis 进行 AOF 文件同步刷盘和文件删除的工作。

  • 怎么找到多线程相关的代码?

根据 Redis 的配置 appendfsync,我们在代码里面找到配置对应的定义。

代码语言:javascript
复制
// config.c...    else if (!strcasecmp(c->argv[2]->ptr,"appendfsync")) {        if (!strcasecmp(o->ptr,"no")) {            server.appendfsync = APPENDFSYNC_NO;        } else if (!strcasecmp(o->ptr,"everysec")) {            server.appendfsync = APPENDFSYNC_EVERYSEC;        } else if (!strcasecmp(o->ptr,"always")) {            server.appendfsync = APPENDFSYNC_ALWAYS;        } else {            goto badfmt;        }    }...

复制代码

通过搜索 APPENDFSYNC_EVERYSEC ,我们找到了 backgroundRewriteDoneHandler: 

代码语言:javascript
复制
// aof.cvoid backgroundRewriteDoneHandler(int statloc) {......    else if (server.appendfsync == APPENDFSYNC_EVERYSEC)        aof_background_fsync(newfd);......}在 aof_background_fsync 函数中,发现了后台任务相关函数:// aof.cvoid aof_background_fsync(int fd) {    bioCreateBackgroundJob(REDIS_BIO_AOF_FSYNC,(void*)(long)fd,NULL,NULL);}

复制代码

搜索关键词 REDIS_BIO_AOF_FSYNC,最后我们找到了 BIO 模块的头文件(bio.h),包含了 BIO 相关的接口和常量定义:

代码语言:javascript
复制
// bio.h/* Exported API */void bioInit(void);void bioCreateBackgroundJob(int type, void *arg1, void *arg2, void *arg3);unsigned long long bioPendingJobsOfType(int type);void bioWaitPendingJobsLE(int type, unsigned long long num);time_t bioOlderJobOfType(int type);/* Background job opcodes */#define REDIS_BIO_CLOSE_FILE    0 /* Deferred close(2) syscall. */#define REDIS_BIO_AOF_FSYNC     1 /* Deferred AOF fsync. */#define REDIS_BIO_NUM_OPS       2

复制代码

最后,我们找到了 bioInit,发现 Redis 创建了 2 个 BIO 线程来执行 bioProcessBackgroundJobs 函数,而 bioInit 又是在 server.c 的 main 方法中,通过 initServer 函数来调用:

代码语言:javascript
复制
// bio.c/* Initialize the background system, spawning the thread. */void bioInit(void) {    pthread_attr_t attr;    pthread_t thread;    size_t stacksize;    int j;    /* Initialization of state vars and objects */    for (j = 0; j < REDIS_BIO_NUM_OPS; j++) {        pthread_mutex_init(&bio_mutex[j],NULL);        pthread_cond_init(&bio_condvar[j],NULL);        bio_jobs[j] = listCreate();        bio_pending[j] = 0;    }    /* Set the stack size as by default it may be small in some system */    pthread_attr_init(&attr);    pthread_attr_getstacksize(&attr,&stacksize);    if (!stacksize) stacksize = 1; /* The world is full of Solaris Fixes */    while (stacksize < REDIS_THREAD_STACK_SIZE) stacksize *= 2;    pthread_attr_setstacksize(&attr, stacksize);    /* Ready to spawn our threads. We use the single argument the thread     * function accepts in order to pass the job ID the thread is     * responsible of. */    for (j = 0; j < REDIS_BIO_NUM_OPS; j++) {        void *arg = (void*)(unsigned long) j;        if (pthread_create(&thread,&attr,bioProcessBackgroundJobs,arg) != 0) {            redisLog(REDIS_WARNING,"Fatal: Can't initialize Background Jobs.");            exit(1);        }    }}

复制代码

  • BIO 多线程的意义

在 backgroundRewriteDoneHandler 函数中,我们会给 BIO 线程增加后台任务,然后让 BIO 线程在后台处理一些工作,为了搞清楚 Redis 使用 BIO 多线程的意义,我们可以先弄清楚这个函数是做什么的。

看注释的描述,这个函数是在后台 AOF 重写(BGREWRITEAOF)结束时调用,然后我们继续往下看代码,主要是一些写文件的操作,直到我们看到 aof.c 中有一段很详细的注释:

剩下要做的唯一事情就是将临时文件重命名为配置的文件,并切换用于执行 AOF 写入的文件描述符。我们不希望 close(2)或 rename(2)调用在删除旧文件时阻塞服务器。有两种可能的方案: AOF 已禁用,这是一次重写。临时文件将重命名为配置的文件。当该文件已经存在时,它将被取消链接(unlink),这可能会阻塞 server。 AOF 已启用,重写的 AOF 将立即开始接收写操作。将临时文件重命名为配置文件后,原始 AOF 文件描述符将关闭。由于这将是对该文件的最后一个引用,因此关闭该文件将导致底层文件被取消链接(unlink),这可能会阻塞 server。

为了减轻取消链接(unlink)操作的阻塞效果(由方案 1 中的 rename(2)或方案 2 中的 close(2)引起),我们使用后台线程来解决此问题。首先,通过打开目标文件,使方案 1 与方案 2 相同。rename(2)之后的取消链接(unlink)操作将在为其描述符调用 close(2)时执行。到那时,保证这条分支原子性的一切都已发生,因此,只要文件描述符再次被释放,我们就不在乎该关闭操作的影响或持续时间。

我们发现了 Redis 使用 BIO 线程(REDIS_BIO_CLOSE_FILE)的目的——后台线程删除文件,避免因为删除大文件耗时过长导致主线程阻塞:在 AOF 重写时,rename(2)或者 close(2)文件,可能会导致系统调用执行删除文件的操作,而删除文件的操作是在当前进程执行(内核态),所以如果文件较大,当前进程删除文件的耗时就会比较长。而如果在主线程删除比较大的文件,就会导致主线程被磁盘 IO 阻塞。

代码语言:javascript
复制
/*
* 提示:该行代码过长,系统自动注释不进行高亮。一键复制会移除系统注释 
* //aof.c/* A background append only file rewriting (BGREWRITEAOF) terminated its work. * Handle this. */void backgroundRewriteDoneHandler(int statloc) {    int exitcode = WEXITSTATUS(statloc);    int bysignal = WIFSIGNALED(statloc);    if (!bysignal && exitcode == 0) {        int newfd, oldfd;        int nwritten;        char tmpfile[256];        long long now = ustime();                ...        /* Flush the differences accumulated by the parent to the         * rewritten AOF. */        snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof",            (int)server.bgrewritechildpid);        newfd = open(tmpfile,O_WRONLY|O_APPEND);        if (newfd == -1) {            redisLog(REDIS_WARNING,                "Unable to open the temporary AOF produced by the child: %s", strerror(errno));            goto cleanup;        }        nwritten = write(newfd,server.bgrewritebuf,sdslen(server.bgrewritebuf));        if (nwritten != (signed)sdslen(server.bgrewritebuf)) {            if (nwritten == -1) {                redisLog(REDIS_WARNING,                    "Error trying to flush the parent diff to the rewritten AOF: %s", strerror(errno));            } else {                redisLog(REDIS_WARNING,                    "Short write trying to flush the parent diff to the rewritten AOF: %s", strerror(errno));            }            close(newfd);            goto cleanup;        }        redisLog(REDIS_NOTICE,            "Parent diff successfully flushed to the rewritten AOF (%lu bytes)", nwritten);        /* The only remaining thing to do is to rename the temporary file to         * the configured file and switch the file descriptor used to do AOF         * writes. We don't want close(2) or rename(2) calls to block the         * server on old file deletion.         *         * There are two possible scenarios:         *         * 1) AOF is DISABLED and this was a one time rewrite. The temporary         * file will be renamed to the configured file. When this file already         * exists, it will be unlinked, which may block the server.         *         * 2) AOF is ENABLED and the rewritten AOF will immediately start         * receiving writes. After the temporary file is renamed to the         * configured file, the original AOF file descriptor will be closed.         * Since this will be the last reference to that file, closing it         * causes the underlying file to be unlinked, which may block the         * server.         *         * To mitigate the blocking effect of the unlink operation (either         * caused by rename(2) in scenario 1, or by close(2) in scenario 2), we         * use a background thread to take care of this. First, we         * make scenario 1 identical to scenario 2 by opening the target file         * when it exists. The unlink operation after the rename(2) will then         * be executed upon calling close(2) for its descriptor. Everything to         * guarantee atomicity for this switch has already happened by then, so         * we don't care what the outcome or duration of that close operation         * is, as long as the file descriptor is released again. */        if (server.appendfd == -1) {            /* AOF disabled */             /* Don't care if this fails: oldfd will be -1 and we handle that.              * One notable case of -1 return is if the old file does              * not exist. */             oldfd = open(server.appendfilename,O_RDONLY|O_NONBLOCK);        } else {            /* AOF enabled */            oldfd = -1; /* We'll set this to the current AOF filedes later. */        }        /* Rename the temporary file. This will not unlink the target file if         * it exists, because we reference it with "oldfd". */        if (rename(tmpfile,server.appendfilename) == -1) {            redisLog(REDIS_WARNING,                "Error trying to rename the temporary AOF: %s", strerror(errno));            close(newfd);            if (oldfd != -1) close(oldfd);            goto cleanup;        }        if (server.appendfd == -1) {            /* AOF disabled, we don't need to set the AOF file descriptor             * to this new file, so we can close it. */            close(newfd);        } else {            /* AOF enabled, replace the old fd with the new one. */            oldfd = server.appendfd;            server.appendfd = newfd;            if (server.appendfsync == APPENDFSYNC_ALWAYS)                aof_fsync(newfd);            else if (server.appendfsync == APPENDFSYNC_EVERYSEC)                aof_background_fsync(newfd);            server.appendseldb = -1; /* Make sure SELECT is re-issued */            aofUpdateCurrentSize();            server.auto_aofrewrite_base_size = server.appendonly_current_size;            /* Clear regular AOF buffer since its contents was just written to             * the new AOF from the background rewrite buffer. */            sdsfree(server.aofbuf);            server.aofbuf = sdsempty();        }        redisLog(REDIS_NOTICE, "Background AOF rewrite successful");        /* Asynchronously close the overwritten AOF. */        if (oldfd != -1) bioCreateBackgroundJob(REDIS_BIO_CLOSE_FILE,(void*)(long)oldfd,NULL,NULL);        redisLog(REDIS_VERBOSE,            "Background AOF rewrite signal handler took %lldus", ustime()-now);    } else if (!bysignal && exitcode != 0) {        redisLog(REDIS_WARNING,            "Background AOF rewrite terminated with error");    } else {        redisLog(REDIS_WARNING,            "Background AOF rewrite terminated by signal %d",            WTERMSIG(statloc));    }cleanup:    sdsfree(server.bgrewritebuf);    server.bgrewritebuf = sdsempty();    aofRemoveTempFile(server.bgrewritechildpid);    server.bgrewritechildpid = -1;}
*/

复制代码

我们回到 backgroundRewriteDoneHandler 函数中调用的 aof_background_fsync 函数,在这个函数里,我们发现了另一个 BIO 线程(REDIS_BIO_AOF_FSYNC)的任务创建代码:

代码语言:javascript
复制
void aof_background_fsync(int fd) {    bioCreateBackgroundJob(REDIS_BIO_AOF_FSYNC,(void*)(long)fd,NULL,NULL);}

复制代码

阅读 bioCreateBackgroundJob 函数的代码,我们发现 Redis 在写对应 Job 类型的任务队列时加了互斥锁(mutex),写完队列后通过释放条件变量和互斥锁,用来激活等待条件变量的 BIO 线程,让 BIO 线程继续执行任务队列的任务,这样保证队列在多线程下的数据一致性(还增加了对应 BIO 类型的 IO 等待计数,暂时我们用不上),而 Redis BIO 线程就是从 BIO 的任务队列不断取任务的:

代码语言:javascript
复制
// bio.cvoid bioCreateBackgroundJob(int type, void *arg1, void *arg2, void *arg3) {    struct bio_job *job = zmalloc(sizeof(*job));    job->time = time(NULL);    job->arg1 = arg1;    job->arg2 = arg2;    job->arg3 = arg3;    pthread_mutex_lock(&bio_mutex[type]);    listAddNodeTail(bio_jobs[type],job);    bio_pending[type]++;    pthread_cond_signal(&bio_condvar[type]);    pthread_mutex_unlock(&bio_mutex[type]);}

复制代码

接着我们回到 BIO 线程的主函数 bioProcessBackgroundJobs,我们验证了 BIO 线程执行逻辑,BIO 线程通过等待互斥锁和条件变量来判断是否继续读取队列。如前面的注释所说,在执行 REDIS_BIO_CLOSE_FILE 类型的任务时,调用的是 close(fd) 函数。继续阅读代码,发现在执行 REDIS_BIO_AOF_FSYNC 类型的任务时,调用的是函数 aof_fsync:

代码语言:javascript
复制
// bio.cvoid *bioProcessBackgroundJobs(void *arg) {    struct bio_job *job;    unsigned long type = (unsigned long) arg;    pthread_detach(pthread_self());    pthread_mutex_lock(&bio_mutex[type]);    while(1) {        listNode *ln;        /* The loop always starts with the lock hold. */        if (listLength(bio_jobs[type]) == 0) {            pthread_cond_wait(&bio_condvar[type],&bio_mutex[type]);            continue;        }        /* Pop the job from the queue. */        ln = listFirst(bio_jobs[type]);        job = ln->value;        /* It is now possible to unlock the background system as we know have         * a stand alone job structure to process.*/        pthread_mutex_unlock(&bio_mutex[type]);        /* Process the job accordingly to its type. */        if (type == REDIS_BIO_CLOSE_FILE) {            close((long)job->arg1);        } else if (type == REDIS_BIO_AOF_FSYNC) {            aof_fsync((long)job->arg1);        } else {            redisPanic("Wrong job type in bioProcessBackgroundJobs().");        }        zfree(job);        /* Lock again before reiterating the loop, if there are no longer         * jobs to process we'll block again in pthread_cond_wait(). */        pthread_mutex_lock(&bio_mutex[type]);        listDelNode(bio_jobs[type],ln);        bio_pending[type]--;    }}

我们继续看 aof_fsync 的函数定义,发现 aof_fsync 其实就是 fdatasync 和 fsync :

/* Define aof_fsync to fdatasync() in Linux and fsync() for all the rest */#ifdef __linux__#define aof_fsync fdatasync#else#define aof_fsync fsync#endif

复制代码

熟悉 Redis 的朋友知道,这是 Redis 2.4 中 BIO 线程关于 Redis AOF 持久性的设计:

使用 AOF Redis 更加持久; 你有不同的 fsync 策略:完全不 fsync,每秒 fsync,每个查询 fsync。使用 fsync 的默认策略,每秒的写入性能当然很好(fsync 是使用后台线程执行的,并且当没有 fsync 执行时,主线程将尽力执行写入操作),但是你会损失一秒钟的写入数据。——《Redis Persistence》https://redis.io/topics/persistence AOF advantages

而为什么 fsync 需要使用 BIO 线程在后台执行,其实就很简单了。因为 Redis 需要保证数据的持久化,数据写入文件时,其实只是写到缓冲区,只有数据刷入磁盘,才能保证数据不会丢失,而 fsync 将缓冲区刷入磁盘是一个同步 IO 操作。所以,在主线程执行缓冲区刷盘的操作,虽然能更好的保证数据的持久化,但是却会阻塞主线程。

最后,为了减少阻塞,Redis 使用 BIO 线程处理 fsync。但其实这并不意味着 Redis 不再受 fsync 的影响,实际上如果 fsync 过于缓慢(数据 2S 以上未刷盘),Redis 主线程会不计代价的阻塞执行文件写入(Redis persistence demystified http://oldblog.antirez.com/m/p.php?i=251  #appendfsync everysec)。

Redis BIO 线程(Redis 4.0+)

从 Redis 4.0 (2017 年)开始,又增加了一个新的 BIO 线程,我们在 bio.h 中发现了新的定义——BIO_LAZY_FREE,这个线程主要用来协助 Redis 异步释放内存。在 antirez 的《Lazy Redis is better Redis》http://antirez.com/news/93中,我们能了解到为什么要将释放内存放在异步线程中:

(渐进式回收内存)这是一个很好的技巧,效果很好。但是,我们还是必须在一个线程中执行此操作,这仍然让我感到很难过。当有很多逻辑需要处理,并且 lazy free 也非常频繁时,ops(每秒的操作数)会减少到正常值的 65%左右。 释放不同线程中的对象会更简单:如果有一个线程正忙于仅执行释放操作,则释放应该总是比在数据集中添加新值快。 当然,主线程和 lazy free 线程之间在调用内存分配器上也存在一些竞争,但是 Redis 只会花一小部分时间在内存分配上,而将更多的时间花在 I/O,命令分派,缓存未命中等等。

对这个特性背景感兴趣的朋友还可以看看这个 issue: Lazy free of keys and databases #1748  github.com/redis/re...ues/1748

代码语言:javascript
复制
// bio.h/* Background job opcodes */#define BIO_CLOSE_FILE    0 /* Deferred close(2) syscall. */#define BIO_AOF_FSYNC     1 /* Deferred AOF fsync. */#define BIO_LAZY_FREE     2 /* Deferred objects freeing. */#define BIO_NUM_OPS       3

复制代码

我们回头看,发现在原来的基础上,增加了 BIO_LAZY_FREE 的部分。lazy free 的任务有三种:

  • 释放对象
  • 释放 Redis Database
  • 释放 跳表(skip list)
代码语言:javascript
复制
// bio.cvoid *bioProcessBackgroundJobs(void *arg) {    struct bio_job *job;    unsigned long type = (unsigned long) arg;    sigset_t sigset;    /* Check that the type is within the right interval. */    if (type >= BIO_NUM_OPS) {        serverLog(LL_WARNING,            "Warning: bio thread started with wrong type %lu",type);        return NULL;    }    /* Make the thread killable at any time, so that bioKillThreads()     * can work reliably. */    pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);    pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);    pthread_mutex_lock(&bio_mutex[type]);    /* Block SIGALRM so we are sure that only the main thread will     * receive the watchdog signal. */    sigemptyset(&sigset);    sigaddset(&sigset, SIGALRM);    if (pthread_sigmask(SIG_BLOCK, &sigset, NULL))        serverLog(LL_WARNING,            "Warning: can't mask SIGALRM in bio.c thread: %s", strerror(errno));    while(1) {        listNode *ln;        /* The loop always starts with the lock hold. */        if (listLength(bio_jobs[type]) == 0) {            pthread_cond_wait(&bio_newjob_cond[type],&bio_mutex[type]);            continue;        }        /* Pop the job from the queue. */        ln = listFirst(bio_jobs[type]);        job = ln->value;        /* It is now possible to unlock the background system as we know have         * a stand alone job structure to process.*/        pthread_mutex_unlock(&bio_mutex[type]);        /* Process the job accordingly to its type. */        if (type == BIO_CLOSE_FILE) {            close((long)job->arg1);        } else if (type == BIO_AOF_FSYNC) {            aof_fsync((long)job->arg1);        } else if (type == BIO_LAZY_FREE) {            /* What we free changes depending on what arguments are set:             * arg1 -> free the object at pointer.             * arg2 & arg3 -> free two dictionaries (a Redis DB).             * only arg3 -> free the skiplist. */            if (job->arg1)                lazyfreeFreeObjectFromBioThread(job->arg1);            else if (job->arg2 && job->arg3)                lazyfreeFreeDatabaseFromBioThread(job->arg2,job->arg3);            else if (job->arg3)                lazyfreeFreeSlotsMapFromBioThread(job->arg3);        } else {            serverPanic("Wrong job type in bioProcessBackgroundJobs().");        }        zfree(job);        /* Unblock threads blocked on bioWaitStepOfType() if any. */        pthread_cond_broadcast(&bio_step_cond[type]);        /* Lock again before reiterating the loop, if there are no longer         * jobs to process we'll block again in pthread_cond_wait(). */        pthread_mutex_lock(&bio_mutex[type]);        listDelNode(bio_jobs[type],ln);        bio_pending[type]--;    }}

复制代码

其中释放对象的主要逻辑在 decrRefCount 中:

代码语言:javascript
复制
// lazyfree.c/* Release objects from the lazyfree thread. It's just decrRefCount() * updating the count of objects to release. */void lazyfreeFreeObjectFromBioThread(robj *o) {    decrRefCount(o);    atomicDecr(lazyfree_objects,1);}

复制代码

按照不同的数据类型,执行不同的内存释放逻辑:

代码语言:javascript
复制
// object.cvoid decrRefCount(robj *o) {    if (o->refcount == 1) {        switch(o->type) {        case OBJ_STRING: freeStringObject(o); break;        case OBJ_LIST: freeListObject(o); break;        case OBJ_SET: freeSetObject(o); break;        case OBJ_ZSET: freeZsetObject(o); break;        case OBJ_HASH: freeHashObject(o); break;        case OBJ_MODULE: freeModuleObject(o); break;        default: serverPanic("Unknown object type"); break;        }        zfree(o);    } else {        if (o->refcount <= 0) serverPanic("decrRefCount against refcount <= 0");        if (o->refcount != OBJ_SHARED_REFCOUNT) o->refcount--;    }}

复制代码

扩展

其他的相关内容就不一一说明了,这里有一个扩展内容,算是 Redis 开发背后的故事。

我参考学习了文章《Lazy Redis is better Redis》http://antirez.com/news/93,发现其实 antirez 在设计 lazy free 时还是比较纠结的。因为 lazy free 的特性涉及到了 Redis 本身的内部特性 —— 共享对象 (sharing objects),lazy free 特性的推进受到了共享对象的影响。这里只说说结论,最后为了实现 lazy free 的特性,antirez 去掉了共享对象的特性。直到现在 (Redis 6.0),共享对象仅在少部分地方出现,我们追踪代码的话,可以发现 robj 结构体的 refcount 目前大部分情况下等于 1。当然还有少部分情况,比如 server.c 中初始化创建整型数字的共享字符串,又或者手动增加计数来降低内存对象的回收速度等等。这就是为什么 Redis 明明去掉了共享对象的设计,但是我们还能看到 refcount 相关的代码,这大概就是历史遗留原因吧(手动狗头)。

代码语言:javascript
复制
// server.c#define OBJ_SHARED_REFCOUNT INT_MAXtypedef struct redisObject {    unsigned type:4;    unsigned encoding:4;    unsigned lru:LRU_BITS; /* LRU time (relative to global lru_clock) or                            * LFU data (least significant 8 bits frequency                            * and most significant 16 bits access time). */    int refcount;    void *ptr;} robj;// server.cvoid createSharedObjects(void) {......    for (j = 0; j < OBJ_SHARED_INTEGERS; j++) {        shared.integers[j] =            makeObjectShared(createObject(OBJ_STRING,(void*)(long)j));        shared.integers[j]->encoding = OBJ_ENCODING_INT;    }......}

复制代码

Redis 网络 IO 线程(Redis 6.0+)

从 2020 年正式发布的 Redis 6.0 开始开始,Redis 增加了与客户端 IO 读写线程,减轻主线程与客户端的网络 IO 负担。而实际上,这个设想在 2015 年开发 lazy free 特性的时候就已经出现了。《Lazy Redis is better Redis》http://antirez.com/news/93 #Not just lazy freeing :

既然聚合数据类型的值是完全不共享的,并且客户端输出缓冲区也不包含共享对象,有很多地方可以利用这一点。例如,最终有可能在 Redis 中实现线程化 I/O,以便由不同的线程为不同的客户端提供服务。这意味着我们仅在访问数据库时才具有全局锁定,但是客户端读取/写入系统调用,甚至解析客户端发送的指令数据,都可以在不同的线程中进行。这是一种类似 memcached 的设计,我期待去实现和测试。 而且,有可能实现对某一线程中的聚合数据类型执行某些慢速操作,只会导致“几个”键被“阻塞”,而所有其他客户端都可以继续工作。这可以通过与我们当前使用阻塞操作(请参阅 blocking.c)非常相似的方式来实现,此外还可以使用哈希表来存储当前正在使用哪些键以及它使用的客户端。因此,如果客户要求使用 SMEMBERS 之类的东西,就能够仅锁定键,处理创建输出缓冲区的请求,然后再次释放键。如果某个键被阻塞了,则尝试访问同一键的客户端都将被阻塞。 所有这些都需要进行更大幅度的内部修改,但是最重要的是,我们的禁忌要少一些。我们可以用更少的缓存丢失和更少内存占用的聚合数据类型,来弥补对象复制的时间,我们现在可以畅想无共享设计的线程化 Redis ,这是唯一可以轻松战胜我们单线程架构的设计。过去,如果为了实现并发访问,在数据结构和对象中增加一系列互斥锁,始终会被视为一个坏主意。但现在幸运的是,有方法可以两全其美。我们可现在以仍然像过去那样,从主线程继续执行所有快速的操作。而要在性能方面有所收获,需要增加一些复杂性作为代价。

上述是 antirez 在《Lazy Redis is better Redis》的 Not just lazy freeing 部分所分享的内容,理解这个,我们就能知道为何 Redis 要实现 IO 线程化了:

  • IO 单线程时,某些键的阻塞操作会阻塞整个线程,而使用多线程,可以实现只有访问相同键的客户端被阻塞。
  • 去掉了共享对象,让 IO 线程化更加简单,不再需要向数据结构和对象中增加一系列的互斥锁来实现多线程,从而保留了 Redis 单线程的“传统艺能”。(PS:去掉共享对象,会增加内存的复制,但是也可以带来内存上更紧凑的数据类型,也因为内存上更加连续带来更少的缓存丢失。)

接下来,我们从 redis server.c 中的 main()函数开始,看看 IO 线程是怎么运行的。

IO 线程的创建

通过 pthread_create 搜索到 initThreadedIO() 函数,然后整理下 IO 线程的创建过程:

无论是否哨兵模式,Redis 都会执行 InitServerLast:

代码语言:javascript
复制
int main(int argc, char **argv) {    struct timeval tv;    int j;    server.supervised = redisIsSupervised(server.supervised_mode);    int background = server.daemonize && !server.supervised;    if (background) daemonize();    ......some log......    readOOMScoreAdj();    initServer();    if (background || server.pidfile) createPidFile();    redisSetProcTitle(argv[0]);    redisAsciiArt();    checkTcpBacklogSettings();    if (!server.sentinel_mode) {        moduleLoadFromQueue();        ACLLoadUsersAtStartup();        InitServerLast();        loadDataFromDisk();        ......    } else {        InitServerLast();        sentinelIsRunning();        ......    }    ......    redisSetCpuAffinity(server.server_cpulist);    setOOMScoreAdj(-1);    aeMain(server.el);    aeDeleteEventLoop(server.el);    return 0;}

复制代码

initServer()中,Redis 会初始化相关的任务队列,而在 InitServerLast 中,才会初始化网络 IO 相关的线程资源,因为 Redis 的网络 IO 多线程是可以配置的。Redis 实现了网络 IO 多线程,但是网络 IO 的逻辑,既可以在 ThreadedIO 线程执行,也可以在主线程执行,给用户提供了选择:

代码语言:javascript
复制
void initServer(void) {    ......    /* Initialization after setting defaults from the config system. */    server.aof_state = server.aof_enabled ? AOF_ON : AOF_OFF;    server.hz = server.config_hz;    server.pid = getpid();    server.in_fork_child = CHILD_TYPE_NONE;    server.main_thread_id = pthread_self();    server.current_client = NULL; // 当前正在执行命令的客户端    server.errors = raxNew();    server.fixed_time_expire = 0;    server.clients = listCreate(); // 活跃的客户端列表    server.clients_index = raxNew(); // 按照 client_id 索引的活跃的客户端字典    server.clients_to_close = listCreate(); // 需要异步关闭的客户端列表    server.slaves = listCreate();    server.monitors = listCreate();    server.clients_pending_write = listCreate(); // 等待写或者安装handler的客户端列表    server.clients_pending_read = listCreate(); // 等待读socket缓冲区的客户端列表    server.clients_timeout_table = raxNew();    server.replication_allowed = 1;    server.slaveseldb = -1; /* Force to emit the first SELECT command. */    server.unblocked_clients = listCreate(); // 下一个循环之前,要取消阻塞的客户端列表    server.ready_keys = listCreate();    server.clients_waiting_acks = listCreate();    server.get_ack_from_slaves = 0;    server.client_pause_type = 0;    server.paused_clients = listCreate();    server.events_processed_while_blocked = 0;    server.system_memory_size = zmalloc_get_memory_size();    server.blocked_last_cron = 0;    server.blocking_op_nesting = 0;    ......}在 InitServerLast()中 ,除了 initThreadedIO (Redis网络IO线程),我们还能看到bioInit(background I/O 初始化),两个模块使用了不同的资源:/* Some steps in server initialization need to be done last (after modules * are loaded). * Specifically, creation of threads due to a race bug in ld.so, in which * Thread Local Storage initialization collides with dlopen call. * see: https://sourceware.org/bugzilla/show_bug.cgi?id=19329 */void InitServerLast() {    bioInit();    initThreadedIO();    set_jemalloc_bg_thread(server.jemalloc_bg_thread);    server.initial_memory_usage = zmalloc_used_memory();}

复制代码

接下来我们来看看 Redis 源码的 networking.c 文件:io_threads 线程池,io_threads_mutex 互斥锁,io_threads_pending IO 线程客户端等待数,io_threads_list 每个 IO 线程的客户端列表。

代码语言:javascript
复制
/* ========================================================================== * Threaded I/O * ========================================================================== */#define IO_THREADS_MAX_NUM 128#define IO_THREADS_OP_READ 0#define IO_THREADS_OP_WRITE 1pthread_t io_threads[IO_THREADS_MAX_NUM];pthread_mutex_t io_threads_mutex[IO_THREADS_MAX_NUM];redisAtomic unsigned long io_threads_pending[IO_THREADS_MAX_NUM];int io_threads_op;      /* IO_THREADS_OP_WRITE or IO_THREADS_OP_READ. *//* This is the list of clients each thread will serve when threaded I/O is * used. We spawn io_threads_num-1 threads, since one is the main thread * itself. */list *io_threads_list[IO_THREADS_MAX_NUM];

复制代码

然后就是创建线程的 initThreadedIO 函数。初始化的时候 IO 线程处于未激活状态,等待后续激活,如果 Redis 配置的 io_threads_num 为 1,代表 IO 使用主线程单线程处理,如果线程数配置超过最大值 IO_THREADS_MAX_NUM (128) 则异常退出,最后,创建的线程都将被锁上直到被唤醒:

代码语言:javascript
复制
/* Initialize the data structures needed for threaded I/O. */void initThreadedIO(void) {    server.io_threads_active = 0; /* We start with threads not active. */    /* Don't spawn any thread if the user selected a single thread:     * we'll handle I/O directly from the main thread. */    if (server.io_threads_num == 1) return;    if (server.io_threads_num > IO_THREADS_MAX_NUM) {        serverLog(LL_WARNING,"Fatal: too many I/O threads configured. "                             "The maximum number is %d.", IO_THREADS_MAX_NUM);        exit(1);    }    /* Spawn and initialize the I/O threads. */    for (int i = 0; i < server.io_threads_num; i++) {        /* Things we do for all the threads including the main thread. */        io_threads_list[i] = listCreate();        if (i == 0) continue; /* Thread 0 is the main thread. */        /* Things we do only for the additional threads. */        pthread_t tid;        pthread_mutex_init(&io_threads_mutex[i],NULL);        io_threads_pending[i] = 0;        pthread_mutex_lock(&io_threads_mutex[i]); /* Thread will be stopped. */        if (pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) != 0) {            serverLog(LL_WARNING,"Fatal: Can't initialize IO thread.");            exit(1);        }        io_threads[i] = tid;    }}

复制代码

IO 线程的工作流程

Redis 在启动时,初始化函数 initServer 将 beforeSleep 和 afterSleep 注册为事件循环休眠前和休眠后的 handler :

代码语言:javascript
复制
void initServer(void) {......    server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);......    /* Register before and after sleep handlers (note this needs to be done     * before loading persistence since it is used by processEventsWhileBlocked. */    aeSetBeforeSleepProc(server.el,beforeSleep);    aeSetAfterSleepProc(server.el,afterSleep);......}

复制代码

事件循环执行 beforeSleep 时,会调用 handleClientsWithPendingReadsUsingThreads 和 handleClientsWithPendingWritesUsingThreads,分别是 IO 读写任务的分配逻辑。特殊情况下,在 AOF 和 RDB 数据恢复(从文件读取数据到内存)的时候,Redis 会通过 processEventsWhileBlocked 调用 beforeSleep,这个时候,只会执行 handleClientsWithPendingReadsUsingThreads ,这个时候 IO 写是同步的:

代码语言:javascript
复制
/* This function gets called every time Redis is entering the * main loop of the event driven library, that is, before to sleep * for ready file descriptors. * * Note: This function is (currently) called from two functions: * 1. aeMain - The main server loop * 2. processEventsWhileBlocked - Process clients during RDB/AOF load * * If it was called from processEventsWhileBlocked we don't want * to perform all actions (For example, we don't want to expire * keys), but we do need to perform some actions. * * The most important is freeClientsInAsyncFreeQueue but we also * call some other low-risk functions. */void beforeSleep(struct aeEventLoop *eventLoop) {......    /* Just call a subset of vital functions in case we are re-entering     * the event loop from processEventsWhileBlocked(). Note that in this     * case we keep track of the number of events we are processing, since     * processEventsWhileBlocked() wants to stop ASAP if there are no longer     * events to handle. */    if (ProcessingEventsWhileBlocked) {        uint64_t processed = 0;        processed += handleClientsWithPendingReadsUsingThreads();        processed += tlsProcessPendingData();        processed += handleClientsWithPendingWrites();        processed += freeClientsInAsyncFreeQueue();        server.events_processed_while_blocked += processed;        return;    }......    /* We should handle pending reads clients ASAP after event loop. */    handleClientsWithPendingReadsUsingThreads();......    /* Handle writes with pending output buffers. */    handleClientsWithPendingWritesUsingThreads();    /* Close clients that need to be closed asynchronous */    freeClientsInAsyncFreeQueue();......    /* Before we are going to sleep, let the threads access the dataset by     * releasing the GIL. Redis main thread will not touch anything at this     * time. */    if (moduleCount()) moduleReleaseGIL();    /* Do NOT add anything below moduleReleaseGIL !!! */}

复制代码

在 handleClientsWithPendingReadsUsingThreads 函数中,Redis 会执行 IO 读的任务分配逻辑,当 Redis 配置了 IO 线程的读取和解析(io_threads_do_reads),可读的 handler 会将普通的客户端放到客户端队列中处理,而不是同步处理。这个函数将队列分配给 IO 线程处理,累积读取 buffer 中的数据:

  • IO 线程在初始化时未激活,Redis 配置了用 IO 线程读取和解析数据(io_threads_do_reads),才会继续执行;
  • 读取待处理的客户端列表 clients_pending_read,将任务按照取模平均分配到不同线程的任务队列 io_threads_list[target_id];
  • 通过 setIOPendingCount 给对应的 IO 线程设置条件变量,激活 IO 线程;
  • 依然在主线程处理一些客户端请求;
  • 如果客户端等待写入,并且响应的 buffer 还有待写数据,或有待发送给客户端的响应对象,则给客户端的连接安装写 handler;
代码语言:javascript
复制
/* When threaded I/O is also enabled for the reading + parsing side, the * readable handler will just put normal clients into a queue of clients to * process (instead of serving them synchronously). This function runs * the queue using the I/O threads, and process them in order to accumulate * the reads in the buffers, and also parse the first command available * rendering it in the client structures. */int handleClientsWithPendingReadsUsingThreads(void) {    // IO线程在初始化时未激活,Redis配置了用IO线程读取和解析数据(io_threads_do_reads),才会继续执行    if (!server.io_threads_active || !server.io_threads_do_reads) return 0;    int processed = listLength(server.clients_pending_read);    if (processed == 0) return 0;    /* Distribute the clients across N different lists. */    // 读取待处理的客户端列表 clients_pending_read,    // 将任务按照取模平均分配到不同线程的任务队列io_threads_list[target_id]    listIter li;    listNode *ln;    listRewind(server.clients_pending_read,&li);    int item_id = 0;    while((ln = listNext(&li))) {        client *c = listNodeValue(ln);        int target_id = item_id % server.io_threads_num;        listAddNodeTail(io_threads_list[target_id],c);        item_id++;    }    /* Give the start condition to the waiting threads, by setting the     * start condition atomic var. */    // 通过setIOPendingCount给对应的IO线程设置条件变量,激活IO线程    io_threads_op = IO_THREADS_OP_READ;    for (int j = 1; j < server.io_threads_num; j++) {        int count = listLength(io_threads_list[j]);        setIOPendingCount(j, count);    }    /* Also use the main thread to process a slice of clients. */    // 依然在主线程处理一些客户端请求    listRewind(io_threads_list[0],&li);    while((ln = listNext(&li))) {        client *c = listNodeValue(ln);        readQueryFromClient(c->conn);    }    listEmpty(io_threads_list[0]);    /* Wait for all the other threads to end their work. */    while(1) {        unsigned long pending = 0;        for (int j = 1; j < server.io_threads_num; j++)            pending += getIOPendingCount(j);        if (pending == 0) break;    }    /* Run the list of clients again to process the new buffers. */    while(listLength(server.clients_pending_read)) {        ln = listFirst(server.clients_pending_read);        client *c = listNodeValue(ln);        c->flags &= ~CLIENT_PENDING_READ;        listDelNode(server.clients_pending_read,ln);        if (processPendingCommandsAndResetClient(c) == C_ERR) {            /* If the client is no longer valid, we avoid             * processing the client later. So we just go             * to the next. */            continue;        }        processInputBuffer(c);        /* We may have pending replies if a thread readQueryFromClient() produced         * replies and did not install a write handler (it can't).         */        // 如果客户端等待写入,        // 并且响应的buffer还有待写数据,或有待发送给客户端的响应对象,        // 则给客户端的连接安装写handler        if (!(c->flags & CLIENT_PENDING_WRITE) && clientHasPendingReplies(c))            clientInstallWriteHandler(c);    }    /* Update processed count on server */    server.stat_io_reads_processed += processed;    return processed;}

复制代码

在 handleClientsWithPendingWritesUsingThreads 中,Redis 会执行 IO 线程的启动,IO 线程写任务的分配等逻辑:

  • 如果没有开启多线程,或者等待的客户端数量小于线程数的两倍,则执行同步代码;
  • 如果 IO 线程没有激活,则激活(在 initThreadedIO 函数创建线程时处于未激活状态);
  • 如果遇到需要关闭的客户端(CLIENT_CLOSE_ASAP),则将其从待处理的客户端列表里删除;
  • 读取待处理的客户端列表 clients_pending_write ,将任务按照取模平均分配到不同线程的任务队列 io_threads_list[target_id];
  • 通过 setIOPendingCount 给对应的 IO 线程设置条件变量,激活 IO 线程;
  • 依然在主线程处理一些客户端请求;
  • 如果响应的 buffer 还有待写数据,或者还有待发送给客户端的响应对象,则给客户端的连接安装写 handler;
  • 最后调用 freeClientAsync 将待释放的客户端放入 clients_to_close 队列,等待 beforeSleep 执行 freeClientsInAsyncFreeQueue 时实现异步释放客户端;
代码语言:javascript
复制
int handleClientsWithPendingWritesUsingThreads(void) {    int processed = listLength(server.clients_pending_write);    if (processed == 0) return 0; /* Return ASAP if there are no clients. */    /* If I/O threads are disabled or we have few clients to serve, don't     * use I/O threads, but the boring synchronous code. */    // 如果没有开启多线程,或者等待的客户端数量小于线程数的两倍,则执行同步代码    if (server.io_threads_num == 1 || stopThreadedIOIfNeeded()) {        return handleClientsWithPendingWrites();    }    /* Start threads if needed. */    // 如果 IO 线程没有激活,则激活(在initThreadedIO函数创建线程时处于未激活状态)    if (!server.io_threads_active) startThreadedIO();    /* Distribute the clients across N different lists. */    listIter li;    listNode *ln;    listRewind(server.clients_pending_write,&li);    int item_id = 0;    while((ln = listNext(&li))) {        client *c = listNodeValue(ln);        c->flags &= ~CLIENT_PENDING_WRITE;        /* Remove clients from the list of pending writes since         * they are going to be closed ASAP. */        // 如果遇到需要关闭的客户端(CLIENT_CLOSE_ASAP),则将其从待处理的客户端列表里删除        if (c->flags & CLIENT_CLOSE_ASAP) {            listDelNode(server.clients_pending_write, ln);            continue;        }        int target_id = item_id % server.io_threads_num;        listAddNodeTail(io_threads_list[target_id],c);        item_id++;    }    /* Give the start condition to the waiting threads, by setting the     * start condition atomic var. */    // 通过setIOPendingCount给对应的IO线程设置条件变量,激活IO线程    io_threads_op = IO_THREADS_OP_WRITE;    for (int j = 1; j < server.io_threads_num; j++) {        int count = listLength(io_threads_list[j]);        setIOPendingCount(j, count);    }        /* Also use the main thread to process a slice of clients. */    // 依然在主线程处理一些客户端请求    listRewind(io_threads_list[0],&li);    while((ln = listNext(&li))) {        client *c = listNodeValue(ln);        writeToClient(c,0);    }    listEmpty(io_threads_list[0]);    /* Wait for all the other threads to end their work. */    while(1) {        unsigned long pending = 0;        for (int j = 1; j < server.io_threads_num; j++)            pending += getIOPendingCount(j);        if (pending == 0) break;    }    /* Run the list of clients again to install the write handler where     * needed. */    listRewind(server.clients_pending_write,&li);    while((ln = listNext(&li))) {        client *c = listNodeValue(ln);        /* Install the write handler if there are pending writes in some         * of the clients. */        // 如果响应的buffer还有待写数据,或者还有待发送给客户端的响应对象,        // 则给客户端的连接安装写handler        if (clientHasPendingReplies(c) &&                connSetWriteHandler(c->conn, sendReplyToClient) == AE_ERR)        {            // 将待释放的客户端放入clients_to_close队列,            // 等待beforeSleep执行freeClientsInAsyncFreeQueue时实现异步释放客户端            freeClientAsync(c);        }    }    listEmpty(server.clients_pending_write);    /* Update processed count on server */    server.stat_io_writes_processed += processed;    return processed;}

复制代码

IO 线程的主逻辑

在 IOThreadMain 函数中,是 Redis IO 线程的主逻辑。

我们发现 IO 线程在创建后,会通过 redisSetCpuAffinity 函数和 server_cpulist 参数,来设置线程的 CPU 的亲和性,合理配置线程的 CPU 亲和性,能够一定程度上提升性能。

之后,IO 线程会根据条件变量 io_threads_pending[id] 判断是否有等待的 IO 需要处理,然后从 io_threads_list[myid] 中获取分给自己的 client,再根据 io_thread_op 来判断,这个时候需要执行读写 IO 中的哪一个, readQueryFromClient 还是 writeToClient :

代码语言:javascript
复制
void *IOThreadMain(void *myid) {    /* The ID is the thread number (from 0 to server.iothreads_num-1), and is     * used by the thread to just manipulate a single sub-array of clients. */    long id = (unsigned long)myid;    char thdname[16];    snprintf(thdname, sizeof(thdname), "io_thd_%ld", id);    redis_set_thread_title(thdname);    redisSetCpuAffinity(server.server_cpulist);    makeThreadKillable();    while(1) {        /* Wait for start */        for (int j = 0; j < 1000000; j++) {            if (io_threads_pending[id] != 0) break;        }        /* Give the main thread a chance to stop this thread. */        if (io_threads_pending[id] == 0) {            pthread_mutex_lock(&io_threads_mutex[id]);            pthread_mutex_unlock(&io_threads_mutex[id]);            continue;        }        serverAssert(io_threads_pending[id] != 0);        if (tio_debug) printf("[%ld] %d to handle\n", id, (int)listLength(io_threads_list[id]));        /* Process: note that the main thread will never touch our list         * before we drop the pending count to 0. */        listIter li;        listNode *ln;        listRewind(io_threads_list[id],&li);        while((ln = listNext(&li))) {            client *c = listNodeValue(ln);            if (io_threads_op == IO_THREADS_OP_WRITE) {                writeToClient(c,0);            } else if (io_threads_op == IO_THREADS_OP_READ) {                readQueryFromClient(c->conn);            } else {                serverPanic("io_threads_op value is unknown");            }        }        listEmpty(io_threads_list[id]);        io_threads_pending[id] = 0;        if (tio_debug) printf("[%ld] Done\n", id);    }}

复制代码

总结

从 Redis VM 开始,到 Redis BIO,再到最后的 IO 多线程,我们能看到 Redis 正在逐渐的向线程化的方向发展。特别是在实现 Lazy Free 之后(Redis BIO),antirez 似乎尝到了多线程的好处,在保证 db 操作单线程的情况下,让 Redis 发挥 CPU 一部分多核多线程的实力。我们不难发现,Redis 的多线程不过是顺势而为罢了,如果单线程没有瓶颈,就不会产生使用多线程的 Redis。再结合现状来看,毕竟时代变了,从多年前的单核服务器,到后来的双核,四核服务器,再到现在动辄八核,十六核的服务器:单线程模型固然简单,代码清晰,但是在摩尔定律失效,多核多线程的时代洪流下,有谁能够拒绝多线程的好处呢?

作者介绍:

Insutanto,一个普通的编程手艺人。

本文转载自:dbaplus 社群(ID:dbaplus)

原文链接:Redis单线程不行了,快来割VM/ BIO/ IO多线程的韭菜!(附源码)

  • 发表于:
  • 本文为 InfoQ 中文站特供稿件
  • 首发地址https://www.infoq.cn/article/bYc7yxVW074ySgiwa1c5
  • 如有侵权,请联系 cloudcommunity@tencent.com 删除。

扫码

添加站长 进交流群

领取专属 10元无门槛券

私享最新 技术干货

扫码加入开发者社群
领券