前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Redis网络连接层的过去、现状和展望

Redis网络连接层的过去、现状和展望

作者头像
皮振伟
发布2022-12-03 12:29:20
1.1K0
发布2022-12-03 12:29:20
举报
文章被收录于专栏:皮振伟的专栏

Redis网络连接层

Redis取自Remote Dictionary Server,顾名思义,Redis是运行在网络环境之上的。Redis目前支持3种网络连接类型:

  • TCP:默认监听TCP 6379端口,接收网络请求,提供服务。
  • Unix Socket:可以用作测试,以及使用Unix Socket做配置变更等。
  • TLS:使用TLS加密的网络连接,可以防止网络链路上被监听、劫持,更加安全。不过代价就是性能有损失。

Redis通过这3种网络连接类型的支持,满足了绝大多数的用户需求,成为了目前最流行的KV存储数据库。

过去

截至2022-Q3,Redis最新的版本是7.0.5。在当前版本中,使用了“传统”的网络连接管理方式。在redis/src/server.c中监听端口:

代码语言:javascript
复制
    /* Open the TCP listening socket for the user commands. */
    if (server.port != 0 &&
        listenToPort(server.port,&server.ipfd) == C_ERR) {
        /* Note: the following log text is matched by the test suite. */
        serverLog(LL_WARNING, "Failed listening on port %u (TCP), aborting.", server.port);
        exit(1);
    }
    if (server.tls_port != 0 &&
        listenToPort(server.tls_port,&server.tlsfd) == C_ERR) {
        /* Note: the following log text is matched by the test suite. */
        serverLog(LL_WARNING, "Failed listening on port %u (TLS), aborting.", server.tls_port);
        exit(1);
    }

    /* Open the listening Unix domain socket. */
    if (server.unixsocket != NULL) {
        unlink(server.unixsocket); /* don't care if this fails */
        server.sofd = anetUnixServer(server.neterr,server.unixsocket,
            (mode_t)server.unixsocketperm, server.tcp_backlog);
        if (server.sofd == ANET_ERR) {
            serverLog(LL_WARNING, "Failed opening Unix socket: %s", server.neterr);
            exit(1);
        }
        anetNonBlock(NULL,server.sofd);
        anetCloexec(server.sofd);
    }

以及设置监听文件描述符的处理函数,开始提供网络服务:

代码语言:javascript
复制
    /* Create an event handler for accepting new connections in TCP and Unix
     * domain sockets. */
    if (createSocketAcceptHandler(&server.ipfd, acceptTcpHandler) != C_OK) {
        serverPanic("Unrecoverable error creating TCP socket accept handler.");
    }
    if (createSocketAcceptHandler(&server.tlsfd, acceptTLSHandler) != C_OK) {
        serverPanic("Unrecoverable error creating TLS socket accept handler.");
    }
    if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE,
        acceptUnixHandler,NULL) == AE_ERR) serverPanic("Unrecoverable error creating server.sofd file event.");

从代码中,我们可以清晰地看到这几种连接类型的初始化过程和配置参数等。但是它的代价是:

  • 不能扩展:如果想要支持一个新的连接类型,那么势必要修改代码。事实上,需要修改的代码还包含另外的几处。
  • 宏的引用:因为TLS不是操作系统默认支持,需要依赖编译选项控制,那么存在TLS是否支持两种情况,就需要使用宏控制。在Redis的代码中存在多处#ifdef USE_OPENSSL的使用。
  • 代码的可维护性降低:在server.c中,不得不引用、调用TCP、TLS和Unix Socket相关的代码逻辑。

现状

连接层框架

截至2022-Q3,在Redis最新的开发分支上,支持了连接层框架(connection layer framework),它长成这样:

代码语言:javascript
复制
                           uplayer
                              |
                       connection layer
                         /    |     \
                       TCP   Unix   TLS

connection layer负责抽象连接类型,它要求每种连接类型具有如下的方法:

代码语言:javascript
复制
typedef struct ConnectionType {
    /* connection type */
    const char *(*get_type)(struct connection *conn);
    /* connection type initialize & finalize & configure */
    void (*init)(void); /* auto-call during register */
    void (*cleanup)(void);
    int (*configure)(void *priv, int reconfigure);

    /* ae & accept & listen & error & address handler */
    void (*ae_handler)(struct aeEventLoop *el, int fd, void *clientData, int mask);
    aeFileProc *accept_handler;
    int (*addr)(connection *conn, char *ip, size_t ip_len, int *port, int remote);
    int (*listen)(connListener *listener);

    /* create/close connection */
    connection* (*conn_create)(void);
    connection* (*conn_create_accepted)(int fd, void *priv);
    void (*close)(struct connection *conn);
    /* connect & accept */
    int (*connect)(struct connection *conn, const char *addr, int port, const char *source_addr, ConnectionCallbackFunc connect_handler);
    int (*blocking_connect)(struct connection *conn, const char *addr, int port, long long timeout);
    int (*accept)(struct connection *conn, ConnectionCallbackFunc accept_handler);
    /* IO */
    int (*write)(struct connection *conn, const void *data, size_t data_len);
    int (*writev)(struct connection *conn, const struct iovec *iov, int iovcnt);
    int (*read)(struct connection *conn, void *buf, size_t buf_len);
    int (*set_write_handler)(struct connection *conn, ConnectionCallbackFunc handler, int barrier);
    int (*set_read_handler)(struct connection *conn, ConnectionCallbackFunc handler);
    const char *(*get_last_error)(struct connection *conn);
    ssize_t (*sync_write)(struct connection *conn, char *ptr, ssize_t size, long long timeout);
    ssize_t (*sync_read)(struct connection *conn, char *ptr, ssize_t size, long long timeout);
    ssize_t (*sync_readline)(struct connection *conn, char *ptr, ssize_t size, long long timeout);

    /* pending data */
    int (*has_pending_data)(void);
    int (*process_pending_data)(void);
    /* TLS specified methods */
    sds (*get_peer_cert)(struct connection *conn);
} ConnectionType;

上层(uplayer)通过connection layer访问Redis的各个连接类型,则可以忽略连接类型的具体实现,仅仅需要调用各个方法即可。

同时,connection layer还负责管理各个连接类型,例如一个新连接类型在使用之前,需要向Redis进行注册,参考redis/src/connection.c:

代码语言:javascript
复制
int connTypeRegister(ConnectionType *ct) {
    const char *typename = ct->get_type(NULL);
    ConnectionType *tmpct;
    int type;

    /* find an empty slot to store the new connection type */
    for (type = 0; type < CONN_TYPE_MAX; type++) {
        tmpct = connTypes[type];
        if (!tmpct)
            break;
        /* ignore case, we really don't care "tls"/"TLS" */
        if (!strcasecmp(typename, tmpct->get_type(NULL))) {
            serverLog(LL_WARNING, "Connection types %s already registered", typename);
            return C_ERR;
        }
    }

    serverLog(LL_VERBOSE, "Connection type %s registered", typename);
    connTypes[type] = ct;

    if (ct->init) {
        ct->init();
    }

    return C_OK;
}

基于此,在redis/src/server.c监听各个连接类型则变成:

代码语言:javascript
复制
    /* create all the configured listener, and add handler to start to accept */
    int listen_fds = 0; 
    for (int j = 0; j < CONN_TYPE_MAX; j++) {
        listener = &server.listeners[j];
        if (listener->ct == NULL)
            continue;

        if (connListen(listener) == C_ERR) {
            serverLog(LL_WARNING, "Failed listening on port %u (%s), aborting.", listener->port, listener->ct->get_type(NULL));
            exit(1);
        }    

        if (createSocketAcceptHandler(listener, connAcceptHandler(listener->ct)) != C_OK)
            serverPanic("Unrecoverable error creating %s listener accept handler.", listener->ct->get_type(NULL));

       listen_fds += listener->count;
    }       

动态加载连接类型

在过去的版本中,需要在Redis编译时决定是否支持TLS。得益于新的连接层框架,Redis支持:

  • 不支持TLS。
  • 静态支持TLS:make BUILD_TLS=yes,代价是redis-server始终需要链接libssl和libcrypto,尽管可能不运行。
  • 动态支持TLS:make BUILD_TLS=module即可把TLS支持编译成为redis-tls.so。如果希望使用TLS,通过redis-server --loadmodule src/redis-tls.so即可动态加载TLS,达到了“运行时加载”的效果。

同时,在代码结构上,也带来了一定的收益:几乎移除掉#ifdef USE_OPENSSL,仅在redis/src/tls.c中使用,同时重载ConnectionType:

代码语言:javascript
复制
static ConnectionType CT_TLS = {
    /* connection type */
    .get_type = connTLSGetType,
    /* connection type initialize & finalize & configure */
    .init = tlsInit,
    .cleanup = tlsCleanup,
    .configure = tlsConfigure,
    /* ae & accept & listen & error & address handler */
    .ae_handler = tlsEventHandler,
    .accept_handler = tlsAcceptHandler,
    .addr = connTLSAddr,
    .listen = connTLSListen,
    /* create/close connection */
    .conn_create = connCreateTLS,
    .conn_create_accepted = connCreateAcceptedTLS,
    .close = connTLSClose,
    /* connect & accept */
    .connect = connTLSConnect,
    .blocking_connect = connTLSBlockingConnect,
    .accept = connTLSAccept,
    /* IO */
    .read = connTLSRead,
    .write = connTLSWrite,
    .writev = connTLSWritev,
    .set_write_handler = connTLSSetWriteHandler,
    .set_read_handler = connTLSSetReadHandler,
    .get_last_error = connTLSGetLastError,
    .sync_write = connTLSSyncWrite,
    .sync_read = connTLSSyncRead,
    .sync_readline = connTLSSyncReadLine,

    /* pending data */
    .has_pending_data = tlsHasPendingData,
    .process_pending_data = tlsProcessPendingData,

    /* TLS specified methods */
    .get_peer_cert = connTLSGetPeerCert,
};

以及在Redis Module的入口函数中,执行connTypeRegister向Redis注册新的连接类型:

代码语言:javascript
复制
int RedisModule_OnLoad(void *ctx, RedisModuleString **argv, int argc) {
    UNUSED(argv);
    UNUSED(argc);

    /* Connection modules must be part of the same build as redis. */
    if (strcmp(REDIS_BUILD_ID_RAW, redisBuildIdRaw())) {
        serverLog(LL_NOTICE, "Connection type %s was not built together with the redis-server used.", CONN_TYPE_TLS);
        return REDISMODULE_ERR;
    }

    if (RedisModule_Init(ctx,"tls",1,REDISMODULE_APIVER_1) == REDISMODULE_ERR)
        return REDISMODULE_ERR;
    /* Connection modules is available only bootup. */
    if ((RedisModule_GetContextFlags(ctx) & REDISMODULE_CTX_FLAGS_SERVER_STARTUP) == 0) {
        serverLog(LL_NOTICE, "Connection type %s can be loaded only during bootup", CONN_TYPE_TLS);
        return REDISMODULE_ERR;
    }

    RedisModule_SetModuleOptions(ctx, REDISMODULE_OPTIONS_HANDLE_REPL_ASYNC_LOAD);

    if(connTypeRegister(&CT_TLS) != C_OK)
        return REDISMODULE_ERR;

    return REDISMODULE_OK;
}

通过Redis Module机制,以及连接层的抽象和框架扩展能力,让Redis的连接类型支持更加易用、可扩展。

重写的Unix Socket连接类型

尽管Unix Socket和TCP是完全不同的连接类型,但是二者具有很大的相似性:基于一个FD(文件描述符)即可操作;支持read、write、writev等IO操作。于是Redis在代码中谨慎地判断TCP/Unix Socket,最大程度上复用了TCP的函数。

基于新的连接类型框框架,把Unix Socket支持从TCP中剥离出来,让代码拥有更好的维护性,参考redis/src/unix.c:

代码语言:javascript
复制
/* ==========================================================================
 * unix.c - unix socket connection implementation
 * --------------------------------------------------------------------------
 * Copyright (C) 2022  zhenwei pi
 *
 * Permission is hereby granted, free of charge, to any person obtaining a
 * copy of this software and associated documentation files (the
 * "Software"), to deal in the Software without restriction, including
 * without limitation the rights to use, copy, modify, merge, publish,
 * distribute, sublicense, and/or sell copies of the Software, and to permit
 * persons to whom the Software is furnished to do so, subject to the
 * following conditions:
 *
 * The above copyright notice and this permission notice shall be included
 * in all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
 * OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
 * NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
 * DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
 * OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
 * USE OR OTHER DEALINGS IN THE SOFTWARE.
 * ==========================================================================
 */
#include "server.h"
#include "connection.h"

static ConnectionType CT_Unix;

static const char *connUnixGetType(connection *conn) {
    UNUSED(conn);


    return CONN_TYPE_UNIX;
}

static void connUnixEventHandler(struct aeEventLoop *el, int fd, void *clientData, int mask) {
    connectionTypeTcp()->ae_handler(el, fd, clientData, mask);
}

static int connUnixAddr(connection *conn, char *ip, size_t ip_len, int *port, int remote) {
    return connectionTypeTcp()->addr(conn, ip, ip_len, port, remote);
}

static int connUnixListen(connListener *listener) {
    int fd;
    mode_t *perm = (mode_t *)listener->priv;

    if (listener->bindaddr_count == 0)
        return C_OK;

    /* currently listener->bindaddr_count is always 1, we still use a loop here in case Redis supports multi Unix socket in the future */
    for (int j = 0; j < listener->bindaddr_count; j++) {
        char *addr = listener->bindaddr[j];

        unlink(addr); /* don't care if this fails */
        fd = anetUnixServer(server.neterr, addr, *perm, server.tcp_backlog);
        if (fd == ANET_ERR) {
            serverLog(LL_WARNING, "Failed opening Unix socket: %s", server.neterr);
            exit(1);
        }
        anetNonBlock(NULL, fd);
        anetCloexec(fd);
        listener->fd[listener->count++] = fd;
    }

    return C_OK;
}

static connection *connCreateUnix(void) {
    connection *conn = zcalloc(sizeof(connection));
    conn->type = &CT_Unix;
    conn->fd = -1;

    return conn;
}

static connection *connCreateAcceptedUnix(int fd, void *priv) {
    UNUSED(priv);
    connection *conn = connCreateUnix();
    conn->fd = fd;
    conn->state = CONN_STATE_ACCEPTING;
    return conn;
}

static void connUnixAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    int cfd, max = MAX_ACCEPTS_PER_CALL;
    UNUSED(el);
    UNUSED(mask);
    UNUSED(privdata);

    while(max--) {
        cfd = anetUnixAccept(server.neterr, fd);
        if (cfd == ANET_ERR) {
            if (errno != EWOULDBLOCK)
                serverLog(LL_WARNING,
                    "Accepting client connection: %s", server.neterr);
            return;
        }
        serverLog(LL_VERBOSE,"Accepted connection to %s", server.unixsocket);
        acceptCommonHandler(connCreateAcceptedUnix(cfd, NULL),CLIENT_UNIX_SOCKET,NULL);
    }
}

static void connUnixClose(connection *conn) {
    connectionTypeTcp()->close(conn);
}

static int connUnixAccept(connection *conn, ConnectionCallbackFunc accept_handler) {
    return connectionTypeTcp()->accept(conn, accept_handler);
}

static int connUnixWrite(connection *conn, const void *data, size_t data_len) {
    return connectionTypeTcp()->write(conn, data, data_len);
}

static int connUnixWritev(connection *conn, const struct iovec *iov, int iovcnt) {
    return connectionTypeTcp()->writev(conn, iov, iovcnt);
}

static int connUnixRead(connection *conn, void *buf, size_t buf_len) {
    return connectionTypeTcp()->read(conn, buf, buf_len);
}

static int connUnixSetWriteHandler(connection *conn, ConnectionCallbackFunc func, int barrier) {
    return connectionTypeTcp()->set_write_handler(conn, func, barrier);
}

static int connUnixSetReadHandler(connection *conn, ConnectionCallbackFunc func) {
    return connectionTypeTcp()->set_read_handler(conn, func);
}

static const char *connUnixGetLastError(connection *conn) {
    return strerror(conn->last_errno);
}

static ssize_t connUnixSyncWrite(connection *conn, char *ptr, ssize_t size, long long timeout) {
    return syncWrite(conn->fd, ptr, size, timeout);
}

static ssize_t connUnixSyncRead(connection *conn, char *ptr, ssize_t size, long long timeout) {
    return syncRead(conn->fd, ptr, size, timeout);
}

static ssize_t connUnixSyncReadLine(connection *conn, char *ptr, ssize_t size, long long timeout) {
    return syncReadLine(conn->fd, ptr, size, timeout);
}

static ConnectionType CT_Unix = {
    /* connection type */
    .get_type = connUnixGetType,

    /* connection type initialize & finalize & configure */
    .init = NULL,
    .cleanup = NULL,
    .configure = NULL,

    /* ae & accept & listen & error & address handler */
    .ae_handler = connUnixEventHandler,
    .accept_handler = connUnixAcceptHandler,
    .addr = connUnixAddr,
    .listen = connUnixListen,

    /* create/close connection */
    .conn_create = connCreateUnix,
    .conn_create_accepted = connCreateAcceptedUnix,
    .close = connUnixClose,
    /* connect & accept */
    .connect = NULL,
    .blocking_connect = NULL,
    .accept = connUnixAccept,

    /* IO */
    .write = connUnixWrite,
    .writev = connUnixWritev,
    .read = connUnixRead,
    .set_write_handler = connUnixSetWriteHandler,
    .set_read_handler = connUnixSetReadHandler,
    .get_last_error = connUnixGetLastError,
    .sync_write = connUnixSyncWrite,
    .sync_read = connUnixSyncRead,
    .sync_readline = connUnixSyncReadLine,

    /* pending data */
    .has_pending_data = NULL,
    .process_pending_data = NULL,
};

int RedisRegisterConnectionTypeUnix()
{
    return connTypeRegister(&CT_Unix);
}

由于Unix Socket实现较为简单,且复用了大量的TCP连接代码,unix.c中仅使用了少量的代码实现,从中依然可以窥探一个连接类型具有的基本属性:

  • 重载ConnectionType连接类型。
  • 连接类型变量和重载函数为static类型,对外不做任何暴露。
  • 向连接层注册。
  • 事实上,也可以把Unix Socket支持编译成为一个动态链接库,以loadmodule的方式动态加载。Redis的Maintainer Oran认为Unix Socket是一个基础的连接类型,不需要额外的链接和宏控制,因此始终使用静态编译支持。

展望

得益于Redis连接层框架和Module机制,向Redis中增加一个新的连接类型变得更加容易。RDMA是一种高性能的网络技术,iWARP和RoCE v2也让数据中心的以太网络支持了RDMA,近年来也变得更加流行。因此,是不是可以让Redis跑在RDMA上呢?在测试中,在1KB的KV情况下,Redis Over RDMA技术让Redis单核性能达到~450K QPS,大约是相同环境下的TCP性能的2.5倍(~180K QPS)。目前Redis Over RDMA的Pull Request正在社区接受Review,也欢迎提建议、捉BUG。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2022-10-27,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 AlwaysGeek 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
云数据库 Redis®
腾讯云数据库 Redis®(TencentDB for Redis®)是腾讯云打造的兼容 Redis 协议的缓存和存储服务。丰富的数据结构能帮助您完成不同类型的业务场景开发。支持主从热备,提供自动容灾切换、数据备份、故障迁移、实例监控、在线扩容、数据回档等全套的数据库服务。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档