事务是指用户定义一系列数据库操作,这些操作视为一个完整的逻辑处理工作单元,要么全部执行,要么全部不执行,是不可分割的工作单元。
(1)MULTI, 开启事务。 事务执行过程中,单个命令是入队列操作,直到调用 EXEC 才会一起执行。 begin / start transaction。 (2)EXEC,提交事务。 commit。 (3)DISCARD,取消事务。 rollback。 (4)WATCH。 检测 key 的变动,若在事务执行中,key 变动则取消事务;在事务开启前调用,乐观锁实现(cas); 若被取消则事务返回 nil 。
redis 中加载了一个 lua 虚拟机;用来执行 redis lua 脚本;redis lua 脚本的执行是原子性的;当某个脚本正在执行的时候,不会 有其他命令或者脚本被执行。 lua 脚本当中的命令会直接修改数据状态。
lua 脚本 mysql 存储区别:MySQL存储过程不具备事务性,所以也不具备原子性。
注意:如果项目中使用了 lua 脚本,不需要使用上面的事务命令。
# 测试使用
EVAL script numkeys key [key ...] arg [arg ...]
示例:
127.0.0.1:6379> get mark
"100"
127.0.0.1:6379> eval "local val=redis.call('get',KEYS[1]);if val then redis.call('set',KEYS[1],2*val); return 2*val; end;return 0;" 1 mark
(integer) 200
127.0.0.1:6379> eval "local val=redis.call('get',KEYS[1]);if val then redis.call('set',KEYS[1],2*val); return 2*val; end;return 0;" 1 not_exist
(integer) 0
# 线上使用
EVALSHA sha1 numkeys key [key ...] arg [arg ...]
# 从文件中读取 lua脚本内容
cat test1.lua | redis-cli script load --pipe
# 加载 lua脚本字符串 生成 sha1
> script load 'local val = KEYS[1]; return val'
"b8059ba43af6ffe8bed3db65bac35d452f8115d8"
# 检查脚本缓存中,是否有该 sha1 散列值的lua脚本
> script exists
"b8059ba43af6ffe8bed3db65bac35d452f8115d8"
1) (integer) 1
# 清除所有脚本缓存
> script flush
OK
# 如果当前脚本运行时间过长,可以通过 script kill 杀死当前运行的脚本
> script kill
(error) NOTBUSY No scripts in execution right now.
使用 script load可以将脚本放入到redis中,redis会返回一个sha1值,redis内部是通过字典方式存放sha1-value,然后使用evalsha执行sha1对应的lua脚本。 示例:
127.0.0.1:6379> script load "local val=redis.call('get',KEYS[1]);if val then redis.call('set',KEYS[1],2*val); return 2*val; end;return 0;"
"e221b6cd9c6e2a664ed7f4da89f21ca8223e4c7c"
127.0.0.1:6379> evalsha e221b6cd9c6e2a664ed7f4da89f21ca8223e4c7c 1 mark
(integer) 400
127.0.0.1:6379> evalsha e221b6cd9c6e2a664ed7f4da89f21ca8223e4c7c 1 not_exist
(integer) 0
script load带来的好处之一:只需要执行一次script load,就可以多次调用evalsha复用执行。
使用这种方式的原因是真正使用过程中的lua脚本很长,是远远超过40位的字符串,使用eval不合理,而使用evalsha使数据包小一些,从而使网络带宽小一些,服务器处理压力也小一些。
(1)原则性。lua脚本具备原则性。lua脚本是通过一个完整的数据包,一个命令发送过去的;作为一个完整的数据包执行,因为redis是单线程,只有这个数据包执行完才会执行其他的数据包,所以不会被其他连接干扰。 (2)一致性。lua脚本不具备一致性。lua脚本中存在多条语句,如果有一部分执行成功,有一条语句执行失败时,成功执行的语句是不会回滚的。此时,它不满足全部都不执行或全部都执行,也就不满足不可分割的工作单元。 (3)隔离性。lua脚本满足隔离性,因为redis是单线程的,而lua脚本又是一个完整的数据包。天然具备隔离性。 (4)持久性。redis只有在aof持久化策略的时候,并且每写入一个数据都要进行写盘操作,才满足持久性。
(1)乐观锁实现,watch+multi+exec,所以失败需要重试,增加业务逻辑的复杂度。 缺点就是写代码的时候不方便,需要通过watch来保证事务的正确性;watch过程中可能会取消事务,失败需要进行重试,所有业务逻辑比较麻烦。 (2)lua脚本实现。script load+evalsha。网络带宽小一些,服务器处理压力也小一些,并且可以多次调用evalsha复用执行。
为了支持消息的多播机制,redis 引入了发布订阅模块。 消息不一定可达;分布式消息队列;stream 的方式确保一定可达。
# 订阅频道
subscribe 频道
# 订阅模式频道
psubscribe 频道
# 取消订阅频道
unsubscribe 频道
# 取消订阅模式频道
punsubscribe 频道
# 发布具体频道或模式频道的内容
publish 频道 内容
# 客户端收到具体频道内容
message 具体频道 内容
# 客户端收到模式频道内容
pmessage 模式频道 具体频道 内容
示例:
subscribe news.it news.showbiz news.car
psubscribe news.*
publish new.showbiz 'hello redis'
后端通常都是采用的reactor网络模型,redis驱动是指server端的驱动,在server程序构建一个模块,可以和redis交互数据(即server发送的协议redis能识别并处理,redis返回的数据驱动模块能够识别并开展业务逻辑)。
redis 驱动就是把redis连接融合reactor进行管理。
异步连接处理逻辑需要提供函数来接收返回。 redis协议图:
协议实现的第一步需要知道如何界定数据包:
git clone https://gitee.com/mirrors/redis.git -b 6.2
cd redis/deps/hiredis
mkdir build
cd build
cmake ..
make
sudo make install
程序代码编译的时候加上 -lhiredis
同步连接方案采用阻塞 io 来实现;优点是代码书写是同步的,业务逻辑没有割裂。缺点是阻塞当前线程,直至 redis 返回结 果;通常用多个线程来实现线程池来解决效率问题。
异步连接方案采用非阻塞 io 来实现。优点是没有阻塞当前线程,redis 没有返回,依然可以往 redis 发送命令。缺点是代码 书写是异步的(回调函数),业务逻辑割裂,可以通过协程解决(openresty,skynet);配合 redis6.0 以后的 io 多线程(前 提是有大量并发请求),异步连接池,能更好解决应用层的数据访问性能。
libevent.h
#ifndef __HIREDIS_LIBEVENT_H__
#define __HIREDIS_LIBEVENT_H__
#include <event2/event.h>
#include "hiredis.h"
#include "async.h"
#define REDIS_LIBEVENT_DELETED 0x01
#define REDIS_LIBEVENT_ENTERED 0x02
typedef struct redisLibeventEvents {
redisAsyncContext *context;
struct event *ev;
struct event_base *base;
struct timeval tv;
short flags;
short state;
} redisLibeventEvents;
static void redisLibeventDestroy(redisLibeventEvents *e) {
hi_free(e);
}
static void redisLibeventHandler(int fd, short event, void *arg) {
((void)fd);
redisLibeventEvents *e = (redisLibeventEvents*)arg;
e->state |= REDIS_LIBEVENT_ENTERED;
#define CHECK_DELETED() if (e->state & REDIS_LIBEVENT_DELETED) {\
redisLibeventDestroy(e);\
return; \
}
if ((event & EV_TIMEOUT) && (e->state & REDIS_LIBEVENT_DELETED) == 0) {
redisAsyncHandleTimeout(e->context);
CHECK_DELETED();
}
if ((event & EV_READ) && e->context && (e->state & REDIS_LIBEVENT_DELETED) == 0) {
redisAsyncHandleRead(e->context);
CHECK_DELETED();
}
if ((event & EV_WRITE) && e->context && (e->state & REDIS_LIBEVENT_DELETED) == 0) {
redisAsyncHandleWrite(e->context);
CHECK_DELETED();
}
e->state &= ~REDIS_LIBEVENT_ENTERED;
#undef CHECK_DELETED
}
static void redisLibeventUpdate(void *privdata, short flag, int isRemove) {
redisLibeventEvents *e = (redisLibeventEvents *)privdata;
const struct timeval *tv = e->tv.tv_sec || e->tv.tv_usec ? &e->tv : NULL;
if (isRemove) {
if ((e->flags & flag) == 0) {
return;
} else {
e->flags &= ~flag;
}
} else {
if (e->flags & flag) {
return;
} else {
e->flags |= flag;
}
}
event_del(e->ev);
event_assign(e->ev, e->base, e->context->c.fd, e->flags | EV_PERSIST,
redisLibeventHandler, privdata);
event_add(e->ev, tv);
}
static void redisLibeventAddRead(void *privdata) {
redisLibeventUpdate(privdata, EV_READ, 0);
}
static void redisLibeventDelRead(void *privdata) {
redisLibeventUpdate(privdata, EV_READ, 1);
}
static void redisLibeventAddWrite(void *privdata) {
redisLibeventUpdate(privdata, EV_WRITE, 0);
}
static void redisLibeventDelWrite(void *privdata) {
redisLibeventUpdate(privdata, EV_WRITE, 1);
}
static void redisLibeventCleanup(void *privdata) {
redisLibeventEvents *e = (redisLibeventEvents*)privdata;
if (!e) {
return;
}
event_del(e->ev);
event_free(e->ev);
e->ev = NULL;
if (e->state & REDIS_LIBEVENT_ENTERED) {
e->state |= REDIS_LIBEVENT_DELETED;
} else {
redisLibeventDestroy(e);
}
}
static void redisLibeventSetTimeout(void *privdata, struct timeval tv) {
redisLibeventEvents *e = (redisLibeventEvents *)privdata;
short flags = e->flags;
e->flags = 0;
e->tv = tv;
redisLibeventUpdate(e, flags, 0);
}
static int redisLibeventAttach(redisAsyncContext *ac, struct event_base *base) {
redisContext *c = &(ac->c);
redisLibeventEvents *e;
/* Nothing should be attached when something is already attached */
if (ac->ev.data != NULL)
return REDIS_ERR;
/* Create container for context and r/w events */
e = (redisLibeventEvents*)hi_calloc(1, sizeof(*e));
if (e == NULL)
return REDIS_ERR;
e->context = ac;
/* Register functions to start/stop listening for events */
ac->ev.addRead = redisLibeventAddRead;
ac->ev.delRead = redisLibeventDelRead;
ac->ev.addWrite = redisLibeventAddWrite;
ac->ev.delWrite = redisLibeventDelWrite;
ac->ev.cleanup = redisLibeventCleanup;
ac->ev.scheduleTimer = redisLibeventSetTimeout;
ac->ev.data = e;
/* Initialize and install read/write events */
e->ev = event_new(base, c->fd, EV_READ | EV_WRITE, redisLibeventHandler, e);
e->base = base;
return REDIS_OK;
}
#endif
main.c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <signal.h>
#include <hiredis.h>
#include <async.h>
#include <adapters/libevent.h>
void getCallback(redisAsyncContext *c, void *r, void *privdata) {
redisReply *reply = r;
if (reply == NULL) {
if (c->errstr) {
printf("errstr: %s\n", c->errstr);
}
return;
}
printf("argv[%s]: %s\n", (char*)privdata, reply->str);
/* Disconnect after receiving the reply to GET */
redisAsyncDisconnect(c);
}
void connectCallback(const redisAsyncContext *c, int status) {
if (status != REDIS_OK) {
printf("Error: %s\n", c->errstr);
return;
}
printf("Connected...\n");
}
void disconnectCallback(const redisAsyncContext *c, int status) {
if (status != REDIS_OK) {
printf("Error: %s\n", c->errstr);
return;
}
printf("Disconnected...\n");
}
int main (int argc, char **argv) {
#ifndef _WIN32
signal(SIGPIPE, SIG_IGN);
#endif
struct event_base *base = event_base_new();
redisOptions options = {0};
REDIS_OPTIONS_SET_TCP(&options, "127.0.0.1", 6379);
struct timeval tv = {0};
tv.tv_sec = 1;
options.connect_timeout = &tv;
redisAsyncContext *c = redisAsyncConnectWithOptions(&options);
if (c->err) {
/* Let *c leak for now... */
printf("Error: %s\n", c->errstr);
return 1;
}
redisLibeventAttach(c,base);
redisAsyncSetConnectCallback(c,connectCallback);
redisAsyncSetDisconnectCallback(c,disconnectCallback);
redisAsyncCommand(c, NULL, NULL, "SET key %b", argv[argc-1], strlen(argv[argc-1]));
redisAsyncCommand(c, getCallback, (char*)"end-1", "GET key");
event_base_dispatch(base);
return 0;
}
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。