Redis网络连接层
Redis取自Remote Dictionary Server(远程字典服务),顾名思义,Redis运行在网络环境之上。Redis目前支持3种网络连接类型:TCP、TLS以及Unix Domain Socket。
Redis通过这3种网络连接类型的支持,满足了绝大多数的用户需求,成为了目前最流行的KV存储数据库。
过去
截至2022-Q3,Redis最新的版本是7.0.5。在当前版本中,使用了“传统”的网络连接管理方式。在redis/src/server.c中监听端口:
/* Excerpt (Redis 7.0, server.c): every listening socket is opened up front,
 * one hard-coded branch per connection type. Any failure is fatal and the
 * process exits with status 1. */

/* Open the TCP listening socket for the user commands. */
/* A port of 0 skips this listener entirely. */
if (server.port != 0 &&
    listenToPort(server.port,&server.ipfd) == C_ERR) {
    /* Note: the following log text is matched by the test suite. */
    serverLog(LL_WARNING, "Failed listening on port %u (TCP), aborting.", server.port);
    exit(1);
}
/* TLS has its own port and its own fd set (server.tlsfd); 0 disables it. */
if (server.tls_port != 0 &&
    listenToPort(server.tls_port,&server.tlsfd) == C_ERR) {
    /* Note: the following log text is matched by the test suite. */
    serverLog(LL_WARNING, "Failed listening on port %u (TLS), aborting.", server.tls_port);
    exit(1);
}
/* Open the listening Unix domain socket. */
/* Only when a socket path is configured. */
if (server.unixsocket != NULL) {
    /* Presumably removes a leftover socket file so the new one can be
     * created at the same path. */
    unlink(server.unixsocket); /* don't care if this fails */
    server.sofd = anetUnixServer(server.neterr,server.unixsocket,
        (mode_t)server.unixsocketperm, server.tcp_backlog);
    if (server.sofd == ANET_ERR) {
        serverLog(LL_WARNING, "Failed opening Unix socket: %s", server.neterr);
        exit(1);
    }
    /* The Unix listening fd is passed through anetNonBlock()/anetCloexec()
     * here, inline, rather than inside a shared helper. */
    anetNonBlock(NULL,server.sofd);
    anetCloexec(server.sofd);
}
以及设置监听文件描述符的处理函数,开始提供网络服务:
/* Excerpt (Redis 7.0, server.c): bind an accept callback to each listening
 * socket so the event loop starts serving clients. A failure here ends in
 * serverPanic(). */

/* Create an event handler for accepting new connections in TCP and Unix
 * domain sockets. */
if (createSocketAcceptHandler(&server.ipfd, acceptTcpHandler) != C_OK) {
    serverPanic("Unrecoverable error creating TCP socket accept handler.");
}
/* TLS uses the same helper but with its own fd set and callback. */
if (createSocketAcceptHandler(&server.tlsfd, acceptTLSHandler) != C_OK) {
    serverPanic("Unrecoverable error creating TLS socket accept handler.");
}
/* The Unix socket is a single fd, so it is registered with the event loop
 * directly as an AE_READABLE file event. The `sofd > 0` test presumably
 * means "a Unix socket was opened" — confirm how sofd is initialized. */
if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE,
    acceptUnixHandler,NULL) == AE_ERR) serverPanic("Unrecoverable error creating server.sofd file event.");
从代码中,我们可以清晰地看到这几种连接类型的初始化过程和配置参数。但这种方式也有代价:每种连接类型的端口、监听文件描述符(ipfd、tlsfd、sofd)以及接入处理函数都以硬编码的方式写在server.c中,每新增一种连接类型,都需要修改Redis的核心代码。
现状
连接层框架
截至2022-Q3,Redis最新的开发分支已经支持连接层框架(connection layer framework),其结构如下:
uplayer
|
connection layer
/ | \
TCP Unix TLS
connection layer负责抽象连接类型,它要求每种连接类型具有如下的方法:
/* Method table that every connection type (TCP, Unix, TLS, ...) hands to the
 * connection layer. Upper layers call through these pointers and so do not
 * depend on the concrete transport. A type becomes usable once its table is
 * passed to connTypeRegister(). Some entries may be NULL; for example, the
 * Unix type leaves init/cleanup/configure/connect/blocking_connect and the
 * pending-data hooks NULL. */
typedef struct ConnectionType {
    /* connection type */
    /* Returns the type name. Must tolerate conn == NULL, because the registry
     * calls get_type(NULL) to identify the type itself. */
    const char *(*get_type)(struct connection *conn);

    /* connection type initialize & finalize & configure */
    /* init is optional: connTypeRegister() only calls it when non-NULL. */
    void (*init)(void); /* auto-call during register */
    void (*cleanup)(void);
    /* priv is type-specific configuration data; reconfigure presumably flags
     * a reload rather than a first-time setup — confirm with callers. */
    int (*configure)(void *priv, int reconfigure);

    /* ae & accept & listen & error & address handler */
    /* Event-loop callback for established connections of this type. */
    void (*ae_handler)(struct aeEventLoop *el, int fd, void *clientData, int mask);
    /* Event-loop callback installed on this type's listening fds. */
    aeFileProc *accept_handler;
    /* Writes an address into ip/ip_len and *port; `remote` presumably selects
     * the peer vs. the local end — confirm against the TCP implementation. */
    int (*addr)(connection *conn, char *ip, size_t ip_len, int *port, int remote);
    /* Opens the listening sockets described by `listener`; C_OK or C_ERR. */
    int (*listen)(connListener *listener);

    /* create/close connection */
    /* Allocates a new, not-yet-connected connection object. */
    connection* (*conn_create)(void);
    /* Wraps an already-accepted fd; priv is type-specific (Unix ignores it). */
    connection* (*conn_create_accepted)(int fd, void *priv);
    void (*close)(struct connection *conn);

    /* connect & accept */
    /* Outbound connect; connect_handler is called back when it completes. */
    int (*connect)(struct connection *conn, const char *addr, int port, const char *source_addr, ConnectionCallbackFunc connect_handler);
    /* Outbound connect that takes a timeout (units not shown here). */
    int (*blocking_connect)(struct connection *conn, const char *addr, int port, long long timeout);
    /* Completes acceptance of a connection created by conn_create_accepted. */
    int (*accept)(struct connection *conn, ConnectionCallbackFunc accept_handler);

    /* IO */
    /* Asynchronous IO primitives used by the event-loop driven code paths. */
    int (*write)(struct connection *conn, const void *data, size_t data_len);
    int (*writev)(struct connection *conn, const struct iovec *iov, int iovcnt);
    int (*read)(struct connection *conn, void *buf, size_t buf_len);
    /* barrier is passed through to the implementation; see the TCP type for
     * its exact ordering semantics. */
    int (*set_write_handler)(struct connection *conn, ConnectionCallbackFunc handler, int barrier);
    int (*set_read_handler)(struct connection *conn, ConnectionCallbackFunc handler);
    /* Human-readable description of the connection's last error. */
    const char *(*get_last_error)(struct connection *conn);
    /* Synchronous IO helpers that take a timeout argument. */
    ssize_t (*sync_write)(struct connection *conn, char *ptr, ssize_t size, long long timeout);
    ssize_t (*sync_read)(struct connection *conn, char *ptr, ssize_t size, long long timeout);
    ssize_t (*sync_readline)(struct connection *conn, char *ptr, ssize_t size, long long timeout);

    /* pending data */
    /* Hooks for data the type itself holds back from the event loop (TLS
     * provides them; Unix leaves them NULL). */
    int (*has_pending_data)(void);
    int (*process_pending_data)(void);

    /* TLS specified methods */
    /* Returns the peer certificate as an sds string; only TLS sets this. */
    sds (*get_peer_cert)(struct connection *conn);
} ConnectionType;
上层(uplayer)通过connection layer访问Redis的各个连接类型,则可以忽略连接类型的具体实现,仅仅需要调用各个方法即可。
同时,connection layer还负责管理各个连接类型,例如一个新连接类型在使用之前,需要向Redis进行注册,参考redis/src/connection.c:
/* Register a connection type in the first free slot of connTypes[].
 *
 * The type name is obtained with ct->get_type(NULL), so get_type must accept
 * a NULL connection. Names are compared case-insensitively ("tls" == "TLS").
 *
 * On success, the type's optional init() hook is invoked and C_OK is
 * returned. Returns C_ERR, without calling init(), when a type with the same
 * name is already registered or when all CONN_TYPE_MAX slots are in use. */
int connTypeRegister(ConnectionType *ct) {
    const char *typename = ct->get_type(NULL);
    int type;

    /* find an empty slot to store the new connection type */
    for (type = 0; type < CONN_TYPE_MAX; type++) {
        ConnectionType *tmpct = connTypes[type];
        if (!tmpct)
            break;

        /* ignore case, we really don't care "tls"/"TLS" */
        if (!strcasecmp(typename, tmpct->get_type(NULL))) {
            serverLog(LL_WARNING, "Connection types %s already registered", typename);
            return C_ERR;
        }
    }

    /* The loop fell through without finding a free slot. The loop reads only
     * connTypes[0..CONN_TYPE_MAX-1], so storing at connTypes[CONN_TYPE_MAX]
     * would write past the end of the table. */
    if (type == CONN_TYPE_MAX) {
        serverLog(LL_WARNING, "Missing slot to register connection type %s", typename);
        return C_ERR;
    }

    serverLog(LL_VERBOSE, "Connection type %s registered", typename);
    connTypes[type] = ct;

    if (ct->init) {
        ct->init();
    }

    return C_OK;
}
基于此,在redis/src/server.c监听各个连接类型则变成:
/* Excerpt (server.c, development branch): one generic loop replaces the
 * per-type listening code. Each server.listeners[] slot with a connection
 * type set is opened through connListen() and then bound to that type's
 * accept handler. `listener` is declared outside this excerpt. */
/* create all the configured listener, and add handler to start to accept */
int listen_fds = 0; /* total number of listening fds opened across all types */
for (int j = 0; j < CONN_TYPE_MAX; j++) {
    listener = &server.listeners[j];
    /* Slot has no connection type configured: nothing to listen on. */
    if (listener->ct == NULL)
        continue;
    /* Listening failures are fatal: log with the type name and exit. */
    if (connListen(listener) == C_ERR) {
        serverLog(LL_WARNING, "Failed listening on port %u (%s), aborting.", listener->port, listener->ct->get_type(NULL));
        exit(1);
    }
    /* The accept callback comes from the listener's connection type. */
    if (createSocketAcceptHandler(listener, connAcceptHandler(listener->ct)) != C_OK)
        serverPanic("Unrecoverable error creating %s listener accept handler.", listener->ct->get_type(NULL));
    /* listener->count is the number of fds this listener opened. */
    listen_fds += listener->count;
}
动态加载连接类型
在过去的版本中,需要在Redis编译时决定是否支持TLS。得益于新的连接层框架,Redis支持将TLS编译为独立的Redis Module,并在Redis启动时动态加载。
同时,代码结构也因此受益:#ifdef USE_OPENSSL几乎被全部移除,仅保留在redis/src/tls.c中;tls.c通过实现ConnectionType中的各个方法来提供TLS连接类型:
/* The TLS connection type. Every hook points at a TLS-specific implementation
 * in tls.c. Entries are listed in ConnectionType's declaration order. */
static ConnectionType CT_TLS = {
    /* identity and lifecycle */
    .get_type = connTLSGetType,
    .init = tlsInit,
    .cleanup = tlsCleanup,
    .configure = tlsConfigure,

    /* event loop, listening and addressing */
    .ae_handler = tlsEventHandler,
    .accept_handler = tlsAcceptHandler,
    .addr = connTLSAddr,
    .listen = connTLSListen,

    /* connection objects */
    .conn_create = connCreateTLS,
    .conn_create_accepted = connCreateAcceptedTLS,
    .close = connTLSClose,

    /* establishing connections */
    .connect = connTLSConnect,
    .blocking_connect = connTLSBlockingConnect,
    .accept = connTLSAccept,

    /* event-driven IO */
    .write = connTLSWrite,
    .writev = connTLSWritev,
    .read = connTLSRead,
    .set_write_handler = connTLSSetWriteHandler,
    .set_read_handler = connTLSSetReadHandler,
    .get_last_error = connTLSGetLastError,

    /* IO helpers taking a timeout */
    .sync_write = connTLSSyncWrite,
    .sync_read = connTLSSyncRead,
    .sync_readline = connTLSSyncReadLine,

    /* data held back by the TLS layer */
    .has_pending_data = tlsHasPendingData,
    .process_pending_data = tlsProcessPendingData,

    /* TLS-only */
    .get_peer_cert = connTLSGetPeerCert,
};
以及在Redis Module的入口函数中,执行connTypeRegister向Redis注册新的连接类型:
/* Module entry point for the TLS connection module.
 *
 * Returns REDISMODULE_ERR if the module was not built together with the
 * running redis-server, if module initialization fails, if the load happens
 * after server startup, or if the TLS type cannot be registered. Otherwise it
 * registers CT_TLS with the connection layer and returns REDISMODULE_OK. */
int RedisModule_OnLoad(void *ctx, RedisModuleString **argv, int argc) {
    UNUSED(argv);
    UNUSED(argc);

    /* Connection modules must be part of the same build as redis. */
    if (strcmp(REDIS_BUILD_ID_RAW, redisBuildIdRaw()) != 0) {
        serverLog(LL_NOTICE, "Connection type %s was not built together with the redis-server used.", CONN_TYPE_TLS);
        return REDISMODULE_ERR;
    }

    if (RedisModule_Init(ctx, "tls", 1, REDISMODULE_APIVER_1) == REDISMODULE_ERR) {
        return REDISMODULE_ERR;
    }

    /* Connection types can only be added while the server is booting. */
    int flags = RedisModule_GetContextFlags(ctx);
    if (!(flags & REDISMODULE_CTX_FLAGS_SERVER_STARTUP)) {
        serverLog(LL_NOTICE, "Connection type %s can be loaded only during bootup", CONN_TYPE_TLS);
        return REDISMODULE_ERR;
    }

    RedisModule_SetModuleOptions(ctx, REDISMODULE_OPTIONS_HANDLE_REPL_ASYNC_LOAD);

    return connTypeRegister(&CT_TLS) == C_OK ? REDISMODULE_OK : REDISMODULE_ERR;
}
通过Redis Module机制,以及连接层的抽象和框架扩展能力,让Redis的连接类型支持更加易用、可扩展。
重写的Unix Socket连接类型
尽管Unix Socket和TCP是完全不同的连接类型,但二者有很大的相似性:都基于一个FD(文件描述符)进行操作,都支持read、write、writev等IO操作。因此在过去的实现中,Redis在代码中小心地区分TCP与Unix Socket,以最大程度地复用TCP的函数。
基于新的连接层框架,Redis把Unix Socket的支持从TCP中剥离出来,让代码具有更好的可维护性,参考redis/src/unix.c:
/* ==========================================================================
* unix.c - unix socket connection implementation
* --------------------------------------------------------------------------
* Copyright (C) 2022 zhenwei pi
*
* Permission is hereby granted, free of charge, to any person obtaining a
* copy of this software and associated documentation files (the
* "Software"), to deal in the Software without restriction, including
* without limitation the rights to use, copy, modify, merge, publish,
* distribute, sublicense, and/or sell copies of the Software, and to permit
* persons to whom the Software is furnished to do so, subject to the
* following conditions:
*
* The above copyright notice and this permission notice shall be included
* in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
* OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
* NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
* DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
* OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
* USE OR OTHER DEALINGS IN THE SOFTWARE.
* ==========================================================================
*/
#include "server.h"
#include "connection.h"
/* Tentative definition: connCreateUnix() needs &CT_Unix before the full
 * initializer of CT_Unix appears further down in this file. */
static ConnectionType CT_Unix;
/* Type name of the Unix connection type. The connection is not inspected, so
 * NULL is accepted (the registry calls get_type(NULL)). */
static const char *connUnixGetType(connection *conn) {
    (void)conn;
    return CONN_TYPE_UNIX;
}
/* Event-loop callback: Unix connections reuse TCP's dispatch logic, so all
 * arguments are forwarded unchanged to the TCP type's ae_handler. */
static void connUnixEventHandler(struct aeEventLoop *el, int fd, void *clientData, int mask) {
    (*connectionTypeTcp()->ae_handler)(el, fd, clientData, mask);
}
/* Address query: forwarded as-is to the TCP type's addr hook. */
static int connUnixAddr(connection *conn, char *ip, size_t ip_len, int *port, int remote) {
    return (*connectionTypeTcp()->addr)(conn, ip, ip_len, port, remote);
}
/* Open one listening Unix domain socket per configured path.
 *
 * listener->priv points at the mode_t permissions passed to anetUnixServer().
 * Any existing file at each path is unlinked first, and unlink errors are
 * ignored. Failing to open a socket is fatal: it logs a warning and calls
 * exit(1). Each opened fd goes through anetNonBlock()/anetCloexec() and is
 * appended to listener->fd. Always returns C_OK when it returns. */
static int connUnixListen(connListener *listener) {
    /* Nothing configured: succeed without touching listener->priv. */
    if (listener->bindaddr_count == 0)
        return C_OK;

    mode_t *perm = (mode_t *)listener->priv;

    /* bindaddr_count is currently always 1; iterate anyway so several Unix
     * sockets could be supported later without changing this code. */
    for (int idx = 0; idx < listener->bindaddr_count; idx++) {
        char *path = listener->bindaddr[idx];

        unlink(path); /* don't care if this fails */
        int sfd = anetUnixServer(server.neterr, path, *perm, server.tcp_backlog);
        if (sfd == ANET_ERR) {
            serverLog(LL_WARNING, "Failed opening Unix socket: %s", server.neterr);
            exit(1);
        }

        anetNonBlock(NULL, sfd);
        anetCloexec(sfd);
        listener->fd[listener->count++] = sfd;
    }

    return C_OK;
}
/* Allocate a new Unix connection object with zcalloc(), bound to CT_Unix and
 * not yet attached to any descriptor (fd == -1). */
static connection *connCreateUnix(void) {
    connection *c = zcalloc(sizeof(*c));
    c->fd = -1;
    c->type = &CT_Unix;
    return c;
}
/* Wrap an accepted client fd in a Unix connection in CONN_STATE_ACCEPTING.
 * priv is not used by this connection type. */
static connection *connCreateAcceptedUnix(int fd, void *priv) {
    (void)priv;
    connection *accepted = connCreateUnix();
    accepted->state = CONN_STATE_ACCEPTING;
    accepted->fd = fd;
    return accepted;
}
/* Event-loop callback for a listening Unix socket.
 *
 * Accepts at most MAX_ACCEPTS_PER_CALL clients per invocation and hands each
 * one to acceptCommonHandler() with CLIENT_UNIX_SOCKET. Stops at the first
 * failed accept: EWOULDBLOCK returns silently, and any other error is logged
 * as a warning. */
static void connUnixAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    UNUSED(el);
    UNUSED(mask);
    UNUSED(privdata);

    for (int remaining = MAX_ACCEPTS_PER_CALL; remaining != 0; remaining--) {
        int cfd = anetUnixAccept(server.neterr, fd);
        if (cfd == ANET_ERR) {
            /* On a non-blocking listener, EWOULDBLOCK means no client is
             * waiting right now, which is not worth a warning. */
            if (errno != EWOULDBLOCK)
                serverLog(LL_WARNING,
                    "Accepting client connection: %s", server.neterr);
            return;
        }
        serverLog(LL_VERBOSE,"Accepted connection to %s", server.unixsocket);
        acceptCommonHandler(connCreateAcceptedUnix(cfd, NULL), CLIENT_UNIX_SOCKET, NULL);
    }
}
/* Closing a Unix connection is identical to closing a TCP one. */
static void connUnixClose(connection *conn) {
    (*connectionTypeTcp()->close)(conn);
}
/* Finish accepting a connection by delegating to the TCP implementation. */
static int connUnixAccept(connection *conn, ConnectionCallbackFunc accept_handler) {
    return (*connectionTypeTcp()->accept)(conn, accept_handler);
}
/* Write via the TCP type's write hook. */
static int connUnixWrite(connection *conn, const void *data, size_t data_len) {
    return (*connectionTypeTcp()->write)(conn, data, data_len);
}
/* Vectored write via the TCP type's writev hook. */
static int connUnixWritev(connection *conn, const struct iovec *iov, int iovcnt) {
    return (*connectionTypeTcp()->writev)(conn, iov, iovcnt);
}
/* Read via the TCP type's read hook. */
static int connUnixRead(connection *conn, void *buf, size_t buf_len) {
    return (*connectionTypeTcp()->read)(conn, buf, buf_len);
}
/* Install (or replace) the writable callback through the TCP implementation;
 * barrier is passed through untouched. */
static int connUnixSetWriteHandler(connection *conn, ConnectionCallbackFunc func, int barrier) {
    return (*connectionTypeTcp()->set_write_handler)(conn, func, barrier);
}
/* Install (or replace) the readable callback through the TCP implementation. */
static int connUnixSetReadHandler(connection *conn, ConnectionCallbackFunc func) {
    return (*connectionTypeTcp()->set_read_handler)(conn, func);
}
/* strerror() text for the errno value last recorded on this connection. */
static const char *connUnixGetLastError(connection *conn) {
    int err = conn->last_errno;
    return strerror(err);
}
/* Write with a timeout, operating directly on the connection's fd. */
static ssize_t connUnixSyncWrite(connection *conn, char *ptr, ssize_t size, long long timeout) {
    int fd = conn->fd;
    return syncWrite(fd, ptr, size, timeout);
}
/* Read with a timeout, operating directly on the connection's fd. */
static ssize_t connUnixSyncRead(connection *conn, char *ptr, ssize_t size, long long timeout) {
    int fd = conn->fd;
    return syncRead(fd, ptr, size, timeout);
}
/* Line-oriented read with a timeout, operating directly on the connection's fd. */
static ssize_t connUnixSyncReadLine(connection *conn, char *ptr, ssize_t size, long long timeout) {
    int fd = conn->fd;
    return syncReadLine(fd, ptr, size, timeout);
}
static ConnectionType CT_Unix = {
/* connection type */
.get_type = connUnixGetType,
/* connection type initialize & finalize & configure */
.init = NULL,
.cleanup = NULL,
.configure = NULL,
/* ae & accept & listen & error & address handler */
.ae_handler = connUnixEventHandler,
.accept_handler = connUnixAcceptHandler,
.addr = connUnixAddr,
.listen = connUnixListen,
/* create/close connection */
.conn_create = connCreateUnix,
.conn_create_accepted = connCreateAcceptedUnix,
.close = connUnixClose,
/* connect & accept */
.connect = NULL,
.blocking_connect = NULL,
.accept = connUnixAccept,
/* IO */
.write = connUnixWrite,
.writev = connUnixWritev,
.read = connUnixRead,
.set_write_handler = connUnixSetWriteHandler,
.set_read_handler = connUnixSetReadHandler,
.get_last_error = connUnixGetLastError,
.sync_write = connUnixSyncWrite,
.sync_read = connUnixSyncRead,
.sync_readline = connUnixSyncReadLine,
/* pending data */
.has_pending_data = NULL,
.process_pending_data = NULL,
};
/* Register the Unix domain socket connection type with the connection layer.
 * Returns the result of connTypeRegister(): C_OK on success, C_ERR otherwise.
 *
 * Declared with (void): before C23, an empty parameter list "()" declares a
 * function without a prototype, so calls with stray arguments would not be
 * diagnosed. */
int RedisRegisterConnectionTypeUnix(void)
{
    return connTypeRegister(&CT_Unix);
}
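由于Unix Socket的实现较为简单,且复用了大量TCP连接代码,unix.c只用了少量代码就实现了一个连接类型,但从中依然可以看出一个连接类型需要具备的基本要素:类型名称(get_type)、监听(listen)与接收新连接(accept_handler)、连接的创建与关闭(conn_create/close),以及read、write等IO方法。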
展望
得益于Redis连接层框架和Module机制,向Redis中增加一个新的连接类型变得更加容易。RDMA是一种高性能的网络技术,iWARP和RoCE v2使数据中心的以太网也能支持RDMA,RDMA近年来也因此越来越流行。那么,是否可以让Redis跑在RDMA上呢?测试表明,在1KB大小的KV场景下,Redis Over RDMA让Redis单核性能达到约450K QPS,大约是相同环境下TCP性能(约180K QPS)的2.5倍。目前Redis Over RDMA的Pull Request正在社区接受Review,欢迎提建议、捉BUG。