前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Redis事务的灵活应用与异步连接的优化策略

Redis事务的灵活应用与异步连接的优化策略

原创
作者头像
Lion Long
发布2024-11-07 23:33:28
850
发布2024-11-07 23:33:28
举报
文章被收录于专栏:后端开发技术

一、redis 事务命令

事务是指用户定义一系列数据库操作,这些操作视为一个完整的逻辑处理工作单元,要么全部执行,要么全部不执行,是不可分割的工作单元。

(1)MULTI, 开启事务。 事务执行过程中,单个命令是入队列操作,直到调用 EXEC 才会一起执行。 begin / start transaction。 (2)EXEC,提交事务。 commit。 (3)DISCARD,取消事务。 rollback。 (4)WATCH。 检测 key 的变动,若在事务执行中,key 变动则取消事务;在事务开启前调用,乐观锁实现(cas); 若被取消则事务返回 nil 。

二、lua 脚本实现原子性

redis 中加载了一个 lua 虚拟机;用来执行 redis lua 脚本;redis lua 脚本的执行是原子性的;当某个脚本正在执行的时候,不会 有其他命令或者脚本被执行。 lua 脚本当中的命令会直接修改数据状态。

图片
图片

lua 脚本 mysql 存储区别:MySQL存储过程不具备事务性,所以也不具备原子性。

注意:如果项目中使用了 lua 脚本,不需要使用上面的事务命令。

2.1、EVAL

代码语言:javascript
复制
# 测试使用
EVAL script numkeys key [key ...] arg [arg ...]

示例:

代码语言:javascript
复制
127.0.0.1:6379> get mark
"100"
127.0.0.1:6379> eval "local val=redis.call('get',KEYS[1]);if val then redis.call('set',KEYS[1],2*val); return 2*val; end;return 0;" 1 mark
(integer) 200
127.0.0.1:6379> eval "local val=redis.call('get',KEYS[1]);if val then redis.call('set',KEYS[1],2*val); return 2*val; end;return 0;" 1 not_exist
(integer) 0

2.2、EVALSHA

代码语言:javascript
复制
# 线上使用
EVALSHA sha1 numkeys key [key ...] arg [arg ...]

2.3、script load

代码语言:javascript
复制
# 从文件中读取 lua脚本内容
cat test1.lua | redis-cli script load --pipe
# 加载 lua脚本字符串 生成 sha1
> script load 'local val = KEYS[1]; return val'
"b8059ba43af6ffe8bed3db65bac35d452f8115d8"
# 检查脚本缓存中,是否有该 sha1 散列值的lua脚本
> script exists
"b8059ba43af6ffe8bed3db65bac35d452f8115d8"
1) (integer) 1
# 清除所有脚本缓存
> script flush
OK
# 如果当前脚本运行时间过长,可以通过 script kill 杀死当前运行的脚本
> script kill
(error) NOTBUSY No scripts in execution right now.

使用 script load可以将脚本放入到redis中,redis会返回一个sha1值,redis内部是通过字典方式存放sha1-value,然后使用evalsha执行sha1对应的lua脚本。 示例:

代码语言:javascript
复制
127.0.0.1:6379> script load  "local val=redis.call('get',KEYS[1]);if val then redis.call('set',KEYS[1],2*val); return 2*val; end;return 0;"
"e221b6cd9c6e2a664ed7f4da89f21ca8223e4c7c"
127.0.0.1:6379> evalsha e221b6cd9c6e2a664ed7f4da89f21ca8223e4c7c 1 mark
(integer) 400
127.0.0.1:6379> evalsha e221b6cd9c6e2a664ed7f4da89f21ca8223e4c7c 1 not_exist
(integer) 0

script load带来的好处之一:只需要执行一次script load,就可以多次调用evalsha复用执行。

使用这种方式的原因是真正使用过程中的lua脚本很长,是远远超过40位的字符串,使用eval不合理,而使用evalsha使数据包小一些,从而使网络带宽小一些,服务器处理压力也小一些。

2.4、应用

  1. 项目启动时,建立redis连接并验证后,先加载所有项目中使用的lua脚本(script load)。
  2. 项目中若需要热更新,通过redis-cli script flush;然后可以通过订阅发布功能通知所有服务器重新加载lua脚本。
  3. 若项目中lua脚本发生阻塞,可通过script kill暂停当前阻塞脚本的执行。

2.5、lua脚本的ACID分析

(1)原则性。lua脚本具备原则性。lua脚本是通过一个完整的数据包,一个命令发送过去的;作为一个完整的数据包执行,因为redis是单线程,只有这个数据包执行完才会执行其他的数据包,所以不会被其他连接干扰。 (2)一致性。lua脚本不具备一致性。lua脚本中存在多条语句,如果有一部分执行成功,有一条语句执行失败时,成功执行的语句是不会回滚的。此时,它不满足全部都不执行或全部都执行,也就不满足不可分割的工作单元。 (3)隔离性。lua脚本满足隔离性,因为redis是单线程的,而lua脚本又是一个完整的数据包。天然具备隔离性。 (4)持久性。redis只有在aof持久化策略的时候,并且每写入一个数据都要进行写盘操作,才满足持久性。

三、redis 事务实现方式

(1)乐观锁实现,watch+multi+exec,所以失败需要重试,增加业务逻辑的复杂度。 缺点就是写代码的时候不方便,需要通过watch来保证事务的正确性;watch过程中可能会取消事务,失败需要进行重试,所有业务逻辑比较麻烦。 (2)lua脚本实现。script load+evalsha。网络带宽小一些,服务器处理压力也小一些,并且可以多次调用evalsha复用执行。

四、redis 发布订阅

为了支持消息的多播机制,redis 引入了发布订阅模块。 消息不一定可达;分布式消息队列;stream 的方式确保一定可达。

代码语言:javascript
复制
# 订阅频道
subscribe 频道
# 订阅模式频道
psubscribe 频道
# 取消订阅频道
unsubscribe 频道
# 取消订阅模式频道
punsubscribe 频道
# 发布具体频道或模式频道的内容
publish 频道 内容
# 客户端收到具体频道内容
message 具体频道 内容
# 客户端收到模式频道内容
pmessage 模式频道 具体频道 内容

示例:

代码语言:javascript
复制
subscribe news.it news.showbiz news.car
psubscribe news.*
publish new.showbiz 'hello redis'

五、redis 驱动异步连接

后端通常都是采用的reactor网络模型,redis驱动是指server端的驱动,在server程序构建一个模块,可以和redis交互数据(即server发送的协议redis能识别并处理,redis返回的数据驱动模块能够识别并开展业务逻辑)。

redis 驱动就是把redis连接融合reactor进行管理。

图片
图片

异步连接处理逻辑需要提供函数来接收返回。 redis协议图:

图片
图片

协议实现的第一步需要知道如何界定数据包:

  1. 长度 + 二进制流
  2. 二进制流 + 特殊分隔符

5.1、hiredis库安装

代码语言:javascript
复制
git clone https://gitee.com/mirrors/redis.git -b 6.2
cd redis/deps/hiredis
mkdir build
cd build
cmake ..
make
sudo make install

程序代码编译的时候加上 -lhiredis

5.2、redis 异步连接

同步连接方案采用阻塞 io 来实现;优点是代码书写是同步的,业务逻辑没有割裂。缺点是阻塞当前线程,直至 redis 返回结 果;通常用多个线程来实现线程池来解决效率问题。

异步连接方案采用非阻塞 io 来实现。优点是没有阻塞当前线程,redis 没有返回,依然可以往 redis 发送命令。缺点是代码 书写是异步的(回调函数),业务逻辑割裂,可以通过协程解决(openresty,skynet);配合 redis6.0 以后的 io 多线程(前 提是有大量并发请求),异步连接池,能更好解决应用层的数据访问性能。

5.3、hridis+libevent实现

libevent.h

代码语言:javascript
复制
#ifndef __HIREDIS_LIBEVENT_H__
#define __HIREDIS_LIBEVENT_H__
#include <event2/event.h>
#include "hiredis.h"
#include "async.h"

#define REDIS_LIBEVENT_DELETED 0x01
#define REDIS_LIBEVENT_ENTERED 0x02

typedef struct redisLibeventEvents {
    redisAsyncContext *context;
    struct event *ev;
    struct event_base *base;
    struct timeval tv;
    short flags;
    short state;
} redisLibeventEvents;

static void redisLibeventDestroy(redisLibeventEvents *e) {
    hi_free(e);
}

static void redisLibeventHandler(int fd, short event, void *arg) {
    ((void)fd);
    redisLibeventEvents *e = (redisLibeventEvents*)arg;
    e->state |= REDIS_LIBEVENT_ENTERED;

    #define CHECK_DELETED() if (e->state & REDIS_LIBEVENT_DELETED) {\
        redisLibeventDestroy(e);\
        return; \
    }

    if ((event & EV_TIMEOUT) && (e->state & REDIS_LIBEVENT_DELETED) == 0) {
        redisAsyncHandleTimeout(e->context);
        CHECK_DELETED();
    }

    if ((event & EV_READ) && e->context && (e->state & REDIS_LIBEVENT_DELETED) == 0) {
        redisAsyncHandleRead(e->context);
        CHECK_DELETED();
    }

    if ((event & EV_WRITE) && e->context && (e->state & REDIS_LIBEVENT_DELETED) == 0) {
        redisAsyncHandleWrite(e->context);
        CHECK_DELETED();
    }

    e->state &= ~REDIS_LIBEVENT_ENTERED;
    #undef CHECK_DELETED
}

static void redisLibeventUpdate(void *privdata, short flag, int isRemove) {
    redisLibeventEvents *e = (redisLibeventEvents *)privdata;
    const struct timeval *tv = e->tv.tv_sec || e->tv.tv_usec ? &e->tv : NULL;

    if (isRemove) {
        if ((e->flags & flag) == 0) {
            return;
        } else {
            e->flags &= ~flag;
        }
    } else {
        if (e->flags & flag) {
            return;
        } else {
            e->flags |= flag;
        }
    }

    event_del(e->ev);
    event_assign(e->ev, e->base, e->context->c.fd, e->flags | EV_PERSIST,
                 redisLibeventHandler, privdata);
    event_add(e->ev, tv);
}

static void redisLibeventAddRead(void *privdata) {
    redisLibeventUpdate(privdata, EV_READ, 0);
}

static void redisLibeventDelRead(void *privdata) {
    redisLibeventUpdate(privdata, EV_READ, 1);
}

static void redisLibeventAddWrite(void *privdata) {
    redisLibeventUpdate(privdata, EV_WRITE, 0);
}

static void redisLibeventDelWrite(void *privdata) {
    redisLibeventUpdate(privdata, EV_WRITE, 1);
}

static void redisLibeventCleanup(void *privdata) {
    redisLibeventEvents *e = (redisLibeventEvents*)privdata;
    if (!e) {
        return;
    }
    event_del(e->ev);
    event_free(e->ev);
    e->ev = NULL;

    if (e->state & REDIS_LIBEVENT_ENTERED) {
        e->state |= REDIS_LIBEVENT_DELETED;
    } else {
        redisLibeventDestroy(e);
    }
}

static void redisLibeventSetTimeout(void *privdata, struct timeval tv) {
    redisLibeventEvents *e = (redisLibeventEvents *)privdata;
    short flags = e->flags;
    e->flags = 0;
    e->tv = tv;
    redisLibeventUpdate(e, flags, 0);
}

static int redisLibeventAttach(redisAsyncContext *ac, struct event_base *base) {
    redisContext *c = &(ac->c);
    redisLibeventEvents *e;

    /* Nothing should be attached when something is already attached */
    if (ac->ev.data != NULL)
        return REDIS_ERR;

    /* Create container for context and r/w events */
    e = (redisLibeventEvents*)hi_calloc(1, sizeof(*e));
    if (e == NULL)
        return REDIS_ERR;

    e->context = ac;

    /* Register functions to start/stop listening for events */
    ac->ev.addRead = redisLibeventAddRead;
    ac->ev.delRead = redisLibeventDelRead;
    ac->ev.addWrite = redisLibeventAddWrite;
    ac->ev.delWrite = redisLibeventDelWrite;
    ac->ev.cleanup = redisLibeventCleanup;
    ac->ev.scheduleTimer = redisLibeventSetTimeout;
    ac->ev.data = e;

    /* Initialize and install read/write events */
    e->ev = event_new(base, c->fd, EV_READ | EV_WRITE, redisLibeventHandler, e);
    e->base = base;
    return REDIS_OK;
}
#endif

main.c

代码语言:javascript
复制
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <signal.h>

#include <hiredis.h>
#include <async.h>
#include <adapters/libevent.h>

void getCallback(redisAsyncContext *c, void *r, void *privdata) {
    redisReply *reply = r;
    if (reply == NULL) {
        if (c->errstr) {
            printf("errstr: %s\n", c->errstr);
        }
        return;
    }
    printf("argv[%s]: %s\n", (char*)privdata, reply->str);

    /* Disconnect after receiving the reply to GET */
    redisAsyncDisconnect(c);
}

void connectCallback(const redisAsyncContext *c, int status) {
    if (status != REDIS_OK) {
        printf("Error: %s\n", c->errstr);
        return;
    }
    printf("Connected...\n");
}

void disconnectCallback(const redisAsyncContext *c, int status) {
    if (status != REDIS_OK) {
        printf("Error: %s\n", c->errstr);
        return;
    }
    printf("Disconnected...\n");
}

int main (int argc, char **argv) {
#ifndef _WIN32
    signal(SIGPIPE, SIG_IGN);
#endif

    struct event_base *base = event_base_new();
    redisOptions options = {0};
    REDIS_OPTIONS_SET_TCP(&options, "127.0.0.1", 6379);
    struct timeval tv = {0};
    tv.tv_sec = 1;
    options.connect_timeout = &tv;


    redisAsyncContext *c = redisAsyncConnectWithOptions(&options);
    if (c->err) {
        /* Let *c leak for now... */
        printf("Error: %s\n", c->errstr);
        return 1;
    }

    redisLibeventAttach(c,base);
    redisAsyncSetConnectCallback(c,connectCallback);
    redisAsyncSetDisconnectCallback(c,disconnectCallback);
    redisAsyncCommand(c, NULL, NULL, "SET key %b", argv[argc-1], strlen(argv[argc-1]));
    redisAsyncCommand(c, getCallback, (char*)"end-1", "GET key");
    event_base_dispatch(base);
    return 0;
}

六、总结

  1. 事务如果是乐观锁实现,即watch+multi+exec,失败需要重试,会增加业务逻辑的复杂度。
  2. lua脚本满足原子性和隔离性,但不满足一致性和持久性。redis只有在aof持久化策略的时候,并且每写入一个数据都要进行写盘操作,才满足持久性。
  3. redis 同步连接方案采用阻塞 io 来实现,异步连接方案采用非阻塞 io 来实现。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、redis 事务命令
  • 二、lua 脚本实现原子性
    • 2.1、EVAL
      • 2.2、EVALSHA
        • 2.3、script load
          • 2.4、应用
            • 2.5、lua脚本的ACID分析
            • 三、redis 事务实现方式
            • 四、redis 发布订阅
            • 五、redis 驱动异步连接
              • 5.1、hiredis库安装
                • 5.2、redis 异步连接
                  • 5.3、hridis+libevent实现
                  • 六、总结
                  相关产品与服务
                  云数据库 Redis®
                  腾讯云数据库 Redis®(TencentDB for Redis®)是腾讯云打造的兼容 Redis 协议的缓存和存储服务。丰富的数据结构能帮助您完成不同类型的业务场景开发。支持主从热备,提供自动容灾切换、数据备份、故障迁移、实例监控、在线扩容、数据回档等全套的数据库服务。
                  领券
                  问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档