说明:
本篇文章主要讲解 ,从redis原理的角度了解一个 set 命令从redis client发出到 redis server端接收到客户端请求的时候,到底经历了哪些过程?
同样会附带了解下面几个问题
为了了解redis请求流程,首先先了解下redis的网络模型。redis 支持 4中网络模式, select、poll、epoll、kqueue ,其中epoll 模型我个人认为是应用最广泛的模型,所以本篇文章以epoll 模型为 demo 进行讲解。
Select 和 poll 模型的缺点:
Epoll 模型为了解决 Select ,Poll的两次轮训和每次都需要传入文件描述符的问题,对整体的结构做了一个新的优化,具体架构如下:
Epoll 启动具体流程如下:
Epoll 收到消息后处理流程:
不同于 select/poll 的中断和异常处理,Epoll 采用的是内核通过调度机制,将等待事件的线程从挂起状态移动到可运行状态。
在 epoll 的等待过程中,内核会监视所有被注册的文件描述符,一旦有文件描述符上发生了注册的事件,内核会将这个事件通知到 epoll 实例。具体流程如下:
过程伪代码如下:
// 用户空间代码
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout) {
// 在内核中等待事件发生
wait_for_events(epfd, events, maxevents, timeout);
// 返回事件信息
return num_events;
}
// 内核空间代码
void wait_for_events(int epfd, struct epoll_event *events, int maxevents, int timeout) {
// 如果没有事件发生,将当前线程挂起
add_thread_to_wait_queue(current_thread, epfd->wait_queue);
// 进入调度器,切换到其他线程执行
schedule();
// 返回时,说明事件发生,处理事件
process_events(epfd, events, maxevents);
}
// 文件描述符事件发生时的处理
void handle_events(struct epoll_event *events, int num_events) {
// 遍历等待队列,唤醒等待的线程
wake_up_threads(epfd->wait_queue);
}
在了解完 epoll 模型的时候,那我们需要思考,在redis中是如何利用Epoll模型通信的。我们看下redis 核心启动的源码:
int main(int argc, char **argv) {
//...
initServer();
//...
aeMain(server.el);
}
redis在启动时,有两个主要的方法,initServer 和 aeMain,其中 initServer 会有以下和epoll相关的核心流程:
aeMain 函数循环调用 aeApiPoll (相当于 epoll_wait)等待 FD 就绪。总体流程如下:
Redis 集群模式是常用的架构模式,其结构图如下:
在集群中 master 节点同步采用的 Gossip协议进行通信,保证集群内消息通信。
在 master 和 slave 同步采用定时发送数据完成。
经过上面的讨论,把Redis 相关的背景知识进行了梳理,下面开始看命令的流转。
当redis启动时候,Redis 已经注册了链接应答管理器(tcpAccepthandler),这个作用主要是把就绪的 fd 绑定到对应的处理器上面(readQueryFromClient),这样当FD有数据就是的时候,可以调用对应的处理器方法。
void initServer(void) {
//...
createSocketAcceptHandler(&server.ipfd, acceptTcpHandler);
//...
}
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
//...
while(max--) {
cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
if (cfd == ANET_ERR) {
if (errno != EWOULDBLOCK)
serverLog(LL_WARNING,
"Accepting client connection: %s", server.neterr);
return;
}
anetCloexec(cfd);
serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);
acceptCommonHandler(connCreateAcceptedSocket(cfd),0,cip);
}
}
static void acceptCommonHandler(connection *conn, int flags, char *ip) {
//...
/* Create connection and client */
if ((c = createClient(conn)) == NULL) {
serverLog(LL_WARNING,
"Error registering fd event for the new client: %s (conn: %s)",
connGetLastError(conn),
connGetInfo(conn, conninfo, sizeof(conninfo)));
connClose(conn); /* May be already closed, just ignore errors */
return;
}
//...
}
client *createClient(connection *conn) {
client *c = zmalloc(sizeof(client));
/* passing NULL as conn it is possible to create a non connected client.
* This is useful since all the commands needs to be executed
* in the context of a client. When commands are executed in other
* contexts (for instance a Lua script) we need a non connected client. */
if (conn) {
//调用 readQueryFromClient
connSetReadHandler(conn, readQueryFromClient);
connSetPrivateData(conn, c);
}
//...
}
当注册完成后,在aeMain方法中会调用 epoll_wait() 方法,具体代码流程如下:
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
while (!eventLoop->stop) {
aeProcessEvents(eventLoop, AE_ALL_EVENTS|
AE_CALL_BEFORE_SLEEP|
AE_CALL_AFTER_SLEEP);
}
}
int aeProcessEvents(aeEventLoop *eventLoop, int flags){
//...
if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP)
eventLoop->beforesleep(eventLoop);
/* Call the multiplexing API, will return only on timeout or when
* some event fires. */
numevents = aeApiPoll(eventLoop, tvp);
/* After sleep callback. */
if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
eventLoop->aftersleep(eventLoop);
//...
}
// ae_epoll.c
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
aeApiState *state = eventLoop->apidata;
int retval, numevents = 0;
retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
tvp ? (tvp->tv_sec*1000 + (tvp->tv_usec + 999)/1000) : -1);
if (retval > 0) {
int j;
numevents = retval;
for (j = 0; j < numevents; j++) {
int mask = 0;
struct epoll_event *e = state->events+j;
if (e->events & EPOLLIN) mask |= AE_READABLE;
if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
if (e->events & EPOLLERR) mask |= AE_WRITABLE|AE_READABLE;
if (e->events & EPOLLHUP) mask |= AE_WRITABLE|AE_READABLE;
eventLoop->fired[j].fd = e->data.fd;
eventLoop->fired[j].mask = mask;
}
}
return numevents;
}
当在redis 客户端输入 set xxx aaa 这个命令后,会经历下面几个过程:
retval = epoll_wait(state->epfd,state->events,eventLoop->setsize, tvp ? (tvp->tv_sec*1000 + (tvp->tv_usec + 999)/1000) : -1);
void readQueryFromClient(connection *conn) {
//...
/* There is more data in the client input buffer, continue parsing it
* in case to check if there is a full command to execute. */
processInputBuffer(c);
}
void processInputBuffer(client *c) {
/* Keep processing while there is something in the input buffer */
while(c->qb_pos < sdslen(c->querybuf)) {
//
//...
/* Multibulk processing could see a <= 0 length. */
if (c->argc == 0) {
resetClient(c);
} else {
/* If we are in the context of an I/O thread, we can't really
* execute the command here. All we can do is to flag the client
* as one that needs to process the command. */
if (c->flags & CLIENT_PENDING_READ) {
c->flags |= CLIENT_PENDING_COMMAND;
break;
}
/* We are finally ready to execute the command. */
if (processCommandAndResetClient(c) == C_ERR) {
/* If the client is no longer valid, we avoid exiting this
* loop and trimming the client buffer later. So we return
* ASAP in that case. */
return;
}
}
}
//。。。
}
int processCommandAndResetClient(client *c) {
int deadclient = 0;
client *old_client = server.current_client;
server.current_client = c;
if (processCommand(c) == C_OK) {
commandProcessed(c);
}
//..
}
int processCommand(client *c) {
//...
/**
* lookupCommand 查询对应的命令
**/
/* Now lookup the command and check ASAP about trivial error conditions
* such as wrong arity, bad command name and so forth. */
c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
//..
/* Exec the command */
if (c->flags & CLIENT_MULTI &&
c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
c->cmd->proc != multiCommand && c->cmd->proc != watchCommand &&
c->cmd->proc != resetCommand)
{
queueMultiCommand(c);
addReply(c,shared.queued);
}
}
struct redisCommand *lookupCommand(sds name) {
return dictFetchValue(server.commands, name);
}
void *dictFetchValue(dict *d, const void *key) {
dictEntry *he;
he = dictFind(d,key);
return he ? dictGetVal(he) : NULL;
}
dictEntry *dictFind(dict *d, const void *key)
{
dictEntry *he;
uint64_t h, idx, table;
if (dictSize(d) == 0) return NULL; /* dict is empty */
if (dictIsRehashing(d)) _dictRehashStep(d);
h = dictHashKey(d, key);
for (table = 0; table <= 1; table++) {
idx = h & d->ht[table].sizemask;
he = d->ht[table].table[idx];
while(he) {
if (key==he->key || dictCompareKeys(d, key, he->key))
return he;
he = he->next;
}
if (!dictIsRehashing(d)) return NULL;
}
return NULL;
}
void populateCommandTable(void) {
int j;
int numcommands = sizeof(redisCommandTable)/sizeof(struct redisCommand);
for (j = 0; j < numcommands; j++) {
struct redisCommand *c = redisCommandTable+j;
int retval1, retval2;
/* Translate the command string flags description into an actual
* set of flags. */
if (populateCommandTableParseFlags(c,c->sflags) == C_ERR)
serverPanic("Unsupported command flag");
c->id = ACLGetCommandID(c->name); /* Assign the ID used for ACL. */
retval1 = dictAdd(server.commands, sdsnew(c->name), c);
/* Populate an additional dictionary that will be unaffected
* by rename-command statements in redis.conf. */
retval2 = dictAdd(server.orig_commands, sdsnew(c->name), c);
serverAssert(retval1 == DICT_OK && retval2 == DICT_OK);
}
}
struct redisCommand redisCommandTable[] = {
...
/* Note that we can't flag set as fast, since it may perform an
* implicit DEL of a large key. */
{"set",setCommand,-3,
"write use-memory @string",
0,NULL,1,1,1,0,0,0},
{"setnx",setnxCommand,3,
"write use-memory fast @string",
0,NULL,1,1,1,0,0,0},
{"setex",setexCommand,4,
"write use-memory @string",
0,NULL,1,1,1,0,0,0},
...
};
void setCommand(client *c) {
robj *expire = NULL;
int unit = UNIT_SECONDS;
int flags = OBJ_NO_FLAGS;
if (parseExtendedStringArgumentsOrReply(c,&flags,&unit,&expire,COMMAND_SET) != C_OK) {
return;
}
c->argv[2] = tryObjectEncoding(c->argv[2]);
setGenericCommand(c,flags,c->argv[1],c->argv[2],expire,unit,NULL,NULL);
}
执行完命令后,实现函数会生成一个响应对象,并将其添加到客户端的输出缓冲区中。这个过程通常由 addReply 系列函数完成。 对于 SET 命令,实现函数可能会生成一个 “OK” 响应并添加到输出缓冲区中。
void addReply(client *c, robj *obj) {
if (prepareClientToWrite(c) != C_OK) return;
if (sdsEncodedObject(obj)) {
if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != C_OK)
_addReplyProtoToList(c,obj->ptr,sdslen(obj->ptr));
} else if (obj->encoding == OBJ_ENCODING_INT) {
/* For integer encoded strings we just convert it into a string
* using our optimized function, and attach the resulting string
* to the output buffer. */
char buf[32];
size_t len = ll2string(buf,sizeof(buf),(long)obj->ptr);
if (_addReplyToBuffer(c,buf,len) != C_OK)
_addReplyProtoToList(c,buf,len);
} else {
serverPanic("Wrong obj->encoding in addReply()");
}
}
当事件循环检测到输出缓冲区中有数据可以发送时,它会调用 writeToClient 函数将响应发送给客户端。
通过以上步骤,Redis 能够根据客户端发送的命令找到相应的实现函数并执行它,然后将结果发送回客户端。这个过程涉及到多个源码文件和函数,但主要逻辑在 commands.c 文件中完成。
void beforeSleep(struct aeEventLoop *eventLoop) {
//...
/* Handle writes with pending output buffers. */
handleClientsWithPendingWritesUsingThreads();
//...
}
int handleClientsWithPendingWritesUsingThreads(void) {
int processed = listLength(server.clients_pending_write);
if (processed == 0) return 0; /* Return ASAP if there are no clients. */
/* If I/O threads are disabled or we have few clients to serve, don't
* use I/O threads, but the boring synchronous code. */
if (server.io_threads_num == 1 || stopThreadedIOIfNeeded()) {
return handleClientsWithPendingWrites();
}
/* Start threads if needed. */
if (!server.io_threads_active) startThreadedIO();
/* Distribute the clients across N different lists. */
listIter li;
listNode *ln;
listRewind(server.clients_pending_write,&li);
int item_id = 0;
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
c->flags &= ~CLIENT_PENDING_WRITE;
/* Remove clients from the list of pending writes since
* they are going to be closed ASAP. */
if (c->flags & CLIENT_CLOSE_ASAP) {
listDelNode(server.clients_pending_write, ln);
continue;
}
int target_id = item_id % server.io_threads_num;
listAddNodeTail(io_threads_list[target_id],c);
item_id++;
}
/* Give the start condition to the waiting threads, by setting the
* start condition atomic var. */
io_threads_op = IO_THREADS_OP_WRITE;
for (int j = 1; j < server.io_threads_num; j++) {
int count = listLength(io_threads_list[j]);
setIOPendingCount(j, count);
}
/* Also use the main thread to process a slice of clients. */
listRewind(io_threads_list[0],&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
writeToClient(c,0);
}
listEmpty(io_threads_list[0]);
/* Wait for all the other threads to end their work. */
while(1) {
unsigned long pending = 0;
for (int j = 1; j < server.io_threads_num; j++)
pending += getIOPendingCount(j);
if (pending == 0) break;
}
/* Run the list of clients again to install the write handler where
* needed. */
listRewind(server.clients_pending_write,&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
/* Install the write handler if there are pending writes in some
* of the clients. */
if (clientHasPendingReplies(c) &&
connSetWriteHandler(c->conn, sendReplyToClient) == AE_ERR)
{
freeClientAsync(c);
}
}
listEmpty(server.clients_pending_write);
/* Update processed count on server */
server.stat_io_writes_processed += processed;
return processed;
}
int handleClientsWithPendingWrites(void) {
listIter li;
listNode *ln;
int processed = listLength(server.clients_pending_write);
listRewind(server.clients_pending_write,&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
c->flags &= ~CLIENT_PENDING_WRITE;
listDelNode(server.clients_pending_write,ln);
/* If a client is protected, don't do anything,
* that may trigger write error or recreate handler. */
if (c->flags & CLIENT_PROTECTED) continue;
/* Don't write to clients that are going to be closed anyway. */
if (c->flags & CLIENT_CLOSE_ASAP) continue;
/* Try to write buffers to the client socket. */
if (writeToClient(c,0) == C_ERR) continue;
/* If after the synchronous writes above we still have data to
* output to the client, we need to install the writable handler. */
if (clientHasPendingReplies(c)) {
int ae_barrier = 0;
/* For the fsync=always policy, we want that a given FD is never
* served for reading and writing in the same event loop iteration,
* so that in the middle of receiving the query, and serving it
* to the client, we'll call beforeSleep() that will do the
* actual fsync of AOF to disk. the write barrier ensures that. */
if (server.aof_state == AOF_ON &&
server.aof_fsync == AOF_FSYNC_ALWAYS)
{
ae_barrier = 1;
}
if (connSetWriteHandlerWithBarrier(c->conn, sendReplyToClient, ae_barrier) == C_ERR) {
freeClientAsync(c);
}
}
}
return processed;
}