Redis单线程是指处理的事件循环的线程是单一的,命令执行主要是依靠单一线程执行的。Redis使用单线程是因为基于内存速度快,而且多路复用也能确保redis能同时处理多个请求,在Redis 6.0引入多线程是因为在某些操作要优化,比如删除操作。
在客户端与Redis服务器建立连接之后,所有的请求都会执行到readQueryFromClient()
方法中,readQueryFromClient()
方法会从socket
中读取数据放到输入缓冲区querybuf
中,接着会调用processInputBufferAndReplicate
中的 processInputBuffer()
方法按照RESP协议来解析参数。解析完参数之后会调用processCommand()
方法执行具体的命令。在processCommand()
中根据命令名称找到对应的命令并调用命令的call()
完成具体的操作,命令在执行完成之后都会调用addReply()
方法返回执行结果。
但是这里需要注意的是addReply()
方法只是把返回的数据写入到输出缓冲区client->buf
或者client->reply
中,并不执行实际的网络发送操作。
Redis在每次进入事件循环之前,都会先调用beforeSleep()
方法,实际的网络发送数据操作时在beforeSleep()
方法中完成的。在beforeSleep()
中,会调用handleClientsWithPendingWrites()
返回数据给客户端:handleClientsWithPendingWrites()
中会调用writeToClient()
方法把输出缓冲区client->buf
和client->reply
中的数据通过socket发送给客户端。
引入多线程说明Redis在有些方面,单线程已经不具有优势了。
因为读写网络的read/write系统调用在Redis执行期间占用了大部分CPU时间,如果把网络读写做成多线程的方式对性能会有很大提升。Redis 的多线程部分只是用来处理网络数据的读写和协议解析,执行命令仍然是单线程。
Redis 引入多线程操作也是出于性能上的考虑,对于一些大键值对的删除操作,通过多线程非阻塞地释放内存空间也能减少对 Redis 主线程阻塞的时间,提高执行的效率。
网上有同学对Redis多线程和单线程版本进行了性能测试,对比显示,Redis的多线程版本性能至少比单线程版本提高了一倍。
接下来,我们Redis 6.0的多线程做个流程介绍:
详细流程:
InitServerLast()
初始化 IO 线程(用户设置了线程数量,且允许多线程读),但是 IO 线程一开始处于阻塞状态。 readQueryFromClient()
,在 readQueryFromClient()
中主线程会把 client 对象添加到 server.clients_pending_read
列表中。afterSleep()
中,Redis 主线程会调用 handleClientsWithPendingReadsUsingThreads()
方法,在方法中主线程会把server.clients_pending_read
列表中的 client 对象按照 RoundRobin 算法依次分配到 io_threads_list
队列数组中,并空循环等待所有的IO线程完成读数据操作。io_threads_list
队列中获取client对象,依次调用readQueryFromClient()
方法读取数据并按照RESP协议解析参数。processCommandAndResetClient()
方法,该方法会调用processCommand()
执行具体的命令,并把执行结果写入到client对象的输出缓冲区中。beforeSleep()
中,Redis主线程会调用handleClientsWithPendingWritesUsingThreads()
方法,在该方法中,主线程会把所有需要返回数据的client 对象按照 RoundRobin 算法分配到 io_threads_list
队列数组中,并空循环等待所有的IO线程完成写数据的操作。io_threads_list
队列中获取client对象,依次调用 writeToClient()
方法把client对象输出缓冲区中的数据通过socket返回给客户端。 首先,在main()
方法中会调用InitServerLast()
方法,InitServerLast()
方法中会调用initThreadedIO()
方法,这个方法的主要作用是初始化IO线程。
/* 初始化IO线程 */
void initThreadedIO(void) {
// 设置标志位,0表示IO线程还没有被激活,1:已激活
io_threads_active = 0;
// 如果设置的IO线程数量为1,则不启动多余的线程,只使用主线程
if (server.io_threads_num == 1) return;
// 超过最大线程数128,报错
if (server.io_threads_num > IO_THREADS_MAX_NUM) {
serverLog(LL_WARNING,"Fatal: too many I/O threads configured. "
"The maximum number is %d.", IO_THREADS_MAX_NUM);
exit(1);
}
// 依次初始化各个IO线程
for (int i = 0; i < server.io_threads_num; i++) {
io_threads_list[i] = listCreate();
// 如果io_threads_num=0,表示用户不需要开启多余的IO线程,直接使用主线程进行IO
if (i == 0) continue;
pthread_t tid;
pthread_mutex_init(&io_threads_mutex[i],NULL);
io_threads_pending[i] = 0;
// 当前线程(主线程)会先锁定所有的互斥锁
pthread_mutex_lock(&io_threads_mutex[i]);
// 生成新的IO线程,每个IO线程都是执行IOThreadMain()方法,方法参数是当前索引
if (pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) != 0) {
serverLog(LL_WARNING,"Fatal: Can't initialize IO thread.");
exit(1);
}
io_threads[i] = tid;
}
}
这里有一点需要注意的是:主线程和IO线程通过共享变量数组io_threads_pending
来进行通信。
主线程修改io_threads_pending
,IO线程读取io_threads_pending
,那么就有可能存在线程安全问题。
那么Redis是怎么避免线程安全问题的呢?答案是通过_Atomic
限定符。
io_threads_pending
变量在声明的时候加上了_Atomic
限定符:
_Atomic unsigned long io_threads_pending[IO_THREADS_MAX_NUM];
_Atomic
是C11标准中引入的原子操作。被_Atomic
修饰的变量被认为是原子变量,对原子变量的操作是不可分割的(Atomicity),且操作结果对其他线程可见,执行的顺序也不能被重排。
所以,io_threads_pending
是属于线程安全的变量。
initThreadedIO()
方法执行完成之后,io_threads_num
个的IO线程已经启动了,且执行的是IOThreadMain()
方法:
void *IOThreadMain(void *myid) {
// 首先获取当前线程在io_threads数组中的下标,在io_threads_pending和io_threads_list中的下标是一致的
long id = (unsigned long)myid;
while(1) {
// 先自旋一会,如果自旋期间当前线程被分配了任务的话就可以不用抢夺互斥锁
// 可以提高性能
for (int j = 0; j < 1000000; j++) {
if (io_threads_pending[id] != 0) break;
}
// 如果自旋之后还没有任务分配,IO线程则会调用pthread_mutex_lock()方法来抢夺对应的互斥锁
// 但主线程在生成具体的IO线程前已经把所有的互斥锁给锁上,所以IO线程此时会因为抢锁失败处于阻塞状态
// 主线程可以借此停止该线程,因为任务的分配由主线程配置
if (io_threads_pending[id] == 0) {
pthread_mutex_lock(&io_threads_mutex[id]);
pthread_mutex_unlock(&io_threads_mutex[id]);
continue;
}
serverAssert(io_threads_pending[id] != 0);
if (tio_debug) printf("[%ld] %d to handle\n", id, (int)listLength(io_threads_list[id]));
// 在pending计数降至0之前,主线程将永远不会触及io_threads_list
listIter li;
listNode *ln;
listRewind(io_threads_list[id],&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
// 从io_threads_list列表中获取任务
// 如果是写任务,则进行写操作
// 如果是读任务,则进行读操作
if (io_threads_op == IO_THREADS_OP_WRITE) {
writeToClient(c,0);
} else if (io_threads_op == IO_THREADS_OP_READ) {
readQueryFromClient(c->conn);
} else {
serverPanic("io_threads_op value is unknown");
}
}
// 清空列表
listEmpty(io_threads_list[id]);
io_threads_pending[id] = 0;
if (tio_debug) printf("[%ld] Done\n", id);
}
}
IOThreadMain()
在一个死循环中完成下面几件事:
io_threads_pending
数组保存的是每个线程被分配的任务client对象的个数(由主线程来进行分配)。
如果io_threads_pending[id]>0
,则表示有新的任务需要处理。
在判断io_threads_pending[id]
(id
是当前线程在数组中的索引)是否大于0的时候,IO线程会先自旋一会。如果在自旋期间主线程就给当前IO线程分配了任务的话,IO线程就不会去抢夺互斥锁(可以节省了抢夺互斥锁的开销)。
如果自旋之后还没有任务分配,IO线程则会调用pthread_mutex_lock()
方法来抢夺对应的互斥锁。
之前提到过在initThreadedIO()
方法中主线程在生成具体的IO线程之前会先调用pthread_mutex_lock()
把所有的互斥锁给锁上。所以IO线程此时会因为抢锁失败处于阻塞状态。
io_threads_list[id]
任务队列,对队列中的每一个client对象执行具体的读写操作。
变量io_threads_op
标识当前线程需要进行的操作:
如果是IO_THREADS_OP_READ
,表示读操作,则所有的IO线程都会调用readQueryFromClient()
方法读取客户端的请求;
如果是IO_THREADS_OP_WRITE
,表示写操作,则所有的IO线程都会调用writeToClient()
方法把各个client对象的输出缓冲区数据通过socket返回给客户端。
注意:所有的IO线程,只会同时进行读操作或者进行写操作。
每次有新客户端请求的时候,主线程会执行到readQueryFromClient()
,用以读取客户端发送的请求。
void readQueryFromClient(connection *conn) {
client *c = connGetPrivateData(conn);
int nread, readlen;
size_t qblen;
// 判断是否需要把读数据请求放到IO线程中去执行
if (postponeClientRead(c)) return;
//... 省略代码
// 处理输入buffer的主流程
// client输入缓冲区中还有更多数据,需要继续对其进行解析,以检查是否有完整的命令要执行
processInputBuffer(c);
}
在readQueryFromClient()
中,会调用postponeClientRead()
方法来判断是否需要把读数据请求放到IO线程中去执行:
int postponeClientRead(client *c) {
if (io_threads_active &&
server.io_threads_do_reads &&
!(c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_PENDING_READ)))
{
// 给client对象的标志位增加CLIENT_PENDING_READ,这很重要
c->flags |= CLIENT_PENDING_READ;
// 把client对象添加到server.clients_pending_read列表中
listAddNodeHead(server.clients_pending_read,c);
return 1;
} else {
return 0;
}
}
如果IO线程已激活,并且当前client的标志位不包含 CLIENT_MASTER、CLIENT_SLAVE、CLIENT_PENDING_READ,则先给当前client对象增加 CLIENT_PENDING_READ 标志位,然后把当前client对象添加到server.clients_pending_read
列表末尾并返回1。
postponeClientRead()
返回1之后,readQueryFromClient()
方法随即返回,结束执行。
Redis在每次事件之后都会调用afterSleep()
方法,在afterSleep()
方法中会调用handleClientsWithPendingReadsUsingThreads()
方法。
int handleClientsWithPendingReadsUsingThreads(void) {
// 判断是否使用多线程进行读
if (!io_threads_active || !server.io_threads_do_reads) return 0;
// 需要读取数据的client对象保存在server.clients_pending_read中
int processed = listLength(server.clients_pending_read);
if (processed == 0) return 0;
if (tio_debug) printf("%d TOTAL READ pending clients\n", processed);
listIter li;
listNode *ln;
listRewind(server.clients_pending_read,&li);
int item_id = 0;
// 1. 按照RoundRobin算法分配读任务
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
int target_id = item_id % server.io_threads_num;
listAddNodeTail(io_threads_list[target_id],c);
item_id++;
}
// 2. 设置读操作标志位并统计各个IO线程任务数
io_threads_op = IO_THREADS_OP_READ;
for (int j = 0; j < server.io_threads_num; j++) {
int count = listLength(io_threads_list[j]);
io_threads_pending[j] = count;
}
// 3. 等待所有的线程处理完了所有的client的读数据操作
while(1) {
unsigned long pending = 0; // pending表示所有的线程加起来需要处理的client的数量
for (int j = 0; j < server.io_threads_num; j++)
pending += io_threads_pending[j];
if (pending == 0) break;
}
if (tio_debug) printf("I/O READ All threads finshed\n");
// 再次运行客户端列表以处理新缓冲区
listRewind(server.clients_pending_read,&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
c->flags &= ~CLIENT_PENDING_READ;
if (c->flags & CLIENT_PENDING_COMMAND) {
c->flags &= ~ CLIENT_PENDING_COMMAND;
// 4. 执行命令
processCommandAndResetClient(c);
}
// 5. 如果还有数据需要读取的话读取数据
processInputBufferAndReplicate(c);
}
listEmpty(server.clients_pending_read);
return processed;
}
handleClientsWithPendingReadsUsingThreads()
方法主要完成下面几个任务:
1. 主线程按照RoundRobin算法给IO线程分配任务。
2. 主线程设置读操作标志位并统计各个IO线程任务数。
3. 主线程空循环等待所有的IO线程处理完了所有的client的读数据操作。
此时io_threads_op = IO_THREADS_OP_READ
,IO线程会执行readQueryFromClient()
方法进行读数据操作。
【之前看到这里时,产生了一个疑问在前面的readQueryFromClient()
方法中会把client对象添加到server.clients_pending_read
列表中。现在IO线程再次调用readQueryFromClient()
方法,会不会又把当前client添加到server.clients_pending_read
列表中然后形成死循环呢?】答案是不会的。
重新来看一下postponeClientRead()
方法:
int postponeClientRead(client *c) {
if (io_threads_active &&
server.io_threads_do_reads &&
!(c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_PENDING_READ)))
{
// 给client对象的标志位增加CLIENT_PENDING_READ,这很重要
c->flags |= CLIENT_PENDING_READ;
// 把client对象添加到server.clients_pending_read列表中
listAddNodeHead(server.clients_pending_read,c);
return 1;
} else {
return 0;
}
}
在主线程把client对象添加到server.clients_pending_read
列表之前,会先设置对应的client的CLIENT_PENDING_READ
标志位,所以在IO线程调用readQueryFromClient()
方法的时候不会重复进行添加,会继续往下执行。
在Redis5的版本中,主线程调用readQueryFromClient()
读取数据,readQueryFromClient()
又会调用processInputBuffer()
方法解析参数,解析完参数之后processInputBuffer()
会立即调用processCommand()
方法执行命令,并把执行结果写入到输出缓冲区中。也就是说,在Redis6之前的版本中只要调用了readQueryFromClient()
方法就会执行具体的命令。但是,在Redis6中不能这样做,如果这样做的话,那IO线程就不只是读数据了,还会执行命令,这样的话多个IO线程同时执行命令的话,可能会出现线程安全问题。在Redis6中,readQueryFromClient()
最终是调用processInputBuffer()
来解析请求参数:
void processInputBuffer(client *c) {
/* Keep processing while there is something in the input buffer */
while(c->qb_pos < sdslen(c->querybuf)) {
//...其他省略解析参数的代码
if (c->argc == 0) {
resetClient(c);
} else {
// 判断当前client是否处于多线程环境
// 如果是的话,只是给client新增CLIENT_PENDING_COMMAND标志位,不会继续执行命令
if (c->flags & CLIENT_PENDING_READ) {
c->flags |= CLIENT_PENDING_COMMAND;
break;
}
// 执行命令
if (processCommandAndResetClient(c) == C_ERR) {
return;
}
}
}
// 省略代码
}
从代码里面可以看到,processInputBuffer()
方法在调用processCommandAndResetClient()
执行命令之前,会先判断当前的clien是否包含CLIENT_PENDING_READ
标志位,如果是的话,则只是给当前的client添加CLIENT_PENDING_COMMAND
标志位然后直接返回,并不会继续执行命令。【因此,在IO线程调用readQueryFromClient()
方法读取数据之后,会继续调用processInputBuffer()
完成参数的解析,但是不会继续执行命令。所以,IO线程只做读数据的操作。】
4. 等所有IO线程读取数据之后由主线程执行具体的命令。
主线程遍历server.clients_pending_read
列表,对列表中的每一个client,会判断当前的client是否有CLIENT_PENDING_COMMAND
标志位,如果有的话,则会继续调用processCommandAndResetClient()
,而processCommandAndResetClient()
会调用processCommand()
执行具体的命令。
在上一步中分析过,IO线程在调用processInputBuffer()
时如果发现client对象包含CLIENT_PENDING_READ
标志位后会继续给当前client对象增加CLIENT_PENDING_COMMAND
标志位。所以在这一步中,主线程会对server.clients_pending_read
列中的所有的client调用processCommandAndResetClient()
方法执行具体的命令。
5. 如果还有数据没有读取完的话主线程则继续读取数据。
Redis在每次事件开始前都会先调用beforeSleep()
方法,在beforeSleep()
方法中会调用handleClientsWithPendingWritesUsingThreads()
方法:
int handleClientsWithPendingWritesUsingThreads(void) {
// 1. 判断是否还有client对象需要写数据给客户端
int processed = listLength(server.clients_pending_write);
if (processed == 0) return 0;
// 2. 判断是否的确需要使用多IO线程进行数据读写 (如果只有少量client,则不需要使用多线程)
if (stopThreadedIOIfNeeded()) {
return handleClientsWithPendingWrites();
}
// 3. 如果IO线程没有激活的话则开启IO线程
if (!io_threads_active) startThreadedIO();
if (tio_debug) printf("%d TOTAL WRITE pending clients\n", processed);
listIter li;
listNode *ln;
listRewind(server.clients_pending_write,&li);
int item_id = 0;
// 4. 按照RoundRobin算法把需要返回数据的client对象分配给IO线程
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
c->flags &= ~CLIENT_PENDING_WRITE;
int target_id = item_id % server.io_threads_num;
listAddNodeTail(io_threads_list[target_id],c);
item_id++;
}
// 5. 设置标志位为写操作,统计各个io线程需要处理的client的个数
io_threads_op = IO_THREADS_OP_WRITE;
for (int j = 0; j < server.io_threads_num; j++) {
int count = listLength(io_threads_list[j]);
io_threads_pending[j] = count;
}
// 6. 空循环,监听、等待所有的IO线程完成IO读写
while(1) {
unsigned long pending = 0;
for (int j = 0; j < server.io_threads_num; j++)
pending += io_threads_pending[j];
if (pending == 0) break;
}
if (tio_debug) printf("I/O WRITE All threads finshed\n");
// 7. 如果还有数据没有写完的话则继续处理
listRewind(server.clients_pending_write,&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
if (clientHasPendingReplies(c) &&
connSetWriteHandler(c->conn, sendReplyToClient) == AE_ERR)
{
freeClientAsync(c);
}
}
// 8. 清空需要写数据的client对象列表
listEmpty(server.clients_pending_write);
return processed;
}
handleClientsWithPendingWritesUsingThreads()
主要完成下面几个操作:
1. 判断当前需要返回数据给客户端的client对象的个数。
Redis把需要返回数据的client对象保存在server.clients_pending_write
列表中;如果没有需要处理的client对象则直接返回。
2. 判断是否有必要使用多IO线程进行数据处理。
Redis会调用stopThreadedIOIfNeeded()
方法来判断是否的确需要使用多IO线程(依据:当前需要处理的Client对象的数量 > 两倍的IO线程数量)
int stopThreadedIOIfNeeded(void) {
int pending = listLength(server.clients_pending_write);
if (server.io_threads_num == 1) return 1;
// 只要当前需要处理的client对象的数量超过两倍的IO线程的数量的情况下才会使用多线程
if (pending < (server.io_threads_num*2)) {
if (io_threads_active) stopThreadedIO();
return 1;
} else {
return 0;
}
}
如果不需要使用多IO线程,则把对应的互斥锁给锁上了,以及设置激活标志位io_threads_active=0
,然后依然是由主线程调用handleClientsWithPendingWrites()
方法完成数据的返回操作。
3. 如果需要使用多IO线程且IO线程还没激活的情况下则调用startThreadedIO()
激活IO线程。
void startThreadedIO(void) {
if (tio_debug) { printf("S"); fflush(stdout); }
if (tio_debug) printf("--- STARTING THREADED IO ---\n");
serverAssert(io_threads_active == 0);
for (int j = 0; j < server.io_threads_num; j++)
// 把所有的互斥锁给释放掉
pthread_mutex_unlock(&io_threads_mutex[j]);
// 设置激活标志位为1
io_threads_active = 1;
}
在主线程释放锁之后,被阻塞的IO线程会抢到锁从而继续判断有没有被分配任务。
4. 主线程按照Round Robin算法把需要返回数据给客户端的client分配到io_threads_list
数组中。
5. 设置io_threads_op
为写操作,同时统计各个IO线程需要处理的client对象的个数,并写入对应的io_threads_pending
数组中。
6. 主线程空循环等待所有的IO线程执行完成。
【从这里可以看到,当IO线程在执行具体的读写操作的时候,主线程是属于空循环等待状态的。】
7. 如果还有数据没有写完的话则由主线程继续处理。
8. 主线程清空clients_pending_write
。
从这整个过程可以看下来,当主线程执行的时候,IO线程基本上处于阻塞或者自旋空循环的状态,而IO线程执行读写操作的时候,主线程处于自旋空循环状态。两个之间通过_Atomic
类型的变量来通信,所以从根本上保证了线程安全问题。
流程简述:
1、主线程负责接收建立连接请求,获取 socket 放入全局等待读处理队列
2、主线程处理完读事件之后,通过 RR(Round Robin) 将这些连接分配给这些 IO 线程
3、主线程阻塞等待 IO 线程读取 socket 完毕
4、主线程通过单线程的方式执行请求命令,请求数据读取并解析完成,但并不执行
5、主线程阻塞等待 IO 线程将数据回写 socket 完毕
6、解除绑定,清空等待队列
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。