空间宠物业务需要实现一个定时消息触发组件,如在特定时刻给用户推送收集糖果通知、biubiu球功能定时回收用户丢弃的球等。可见,消息只有在特定时间到达才能被处理。同时,消息的产生是无序的,即后产生的消息被处理的时间可能早于先产生的消息。
一些著名的消息队列组件,如ActiveMQ ,本身支持消息延迟投递,为何本文选择Redis呢?一方面是引入新组建有学习、运维、接入成本,而组内已积累一定Redis开发运维经验;另一方面则是基于Redis实现这样一个组件难度也不大。所以决定采用Redis。
键空间通知可以在消息到达时插入一个key,并给key设置过期时间,键过期后会通过特定频道发布键过期通知,订阅方可收到通知并处理事件。但问题在于:
ZSET可在消息插入时根据score排序,从而使最早的消息排在最前面。但ZSET没有提供POP方法,取得第一个元素和删除需要执行两个命令。为保证原子性,可以采用事务,如:
127.0.0.1:6379> MULTI
OK
127.0.0.1:6379> ZRANGE myzset 0 0 WITHSCORES
QUEUED
127.0.0.1:6379> ZREMRANGEBYRANK myzset 0 0
QUEUED
127.0.0.1:6379> EXEC
1) 1) "b"
2) "2"
2) (integer) 1
或者使用pipelining,如:
$ (printf "ZRANGE myzset 0 0 WITHSCORES\r\nZREMRANGEBYRANK myzset 0 0\r\n"; sleep 1) | nc localhost 6379
*2
$1
c
$1
3
:1
但问题在于,虽然可顺序取出消息,但无法只在时间到达后取出消息。因此需要client端实现逻辑等待时间到达再推送。同时,消息产生是无序的,如果取得了一个10分钟后处理的消息,在此期间又产生了一个需要在5分钟后处理的消息,逻辑将变得复杂。
由于使用原生Redis无法满足需求,我们决定扩展Redis命令。
LUA脚本是利用3.X版官方特性实现命令扩展的途径。以下脚本将读出首元素,并与当前时间戳(以参数传入)比较,如果消息处理时间到达则删除消息并返回;所有操作将是原子的。目前我们线上服务使用该方案。
LUA脚本:
local rs = redis.call('ZRANGE', KEYS[1], '0', '0', 'WITHSCORES');
if table.getn(rs)<2 then return rs end;
if tonumber(rs[2]) < tonumber(ARGV[1]) then
redis.call('ZREMRANGEBYRANK', KEYS[1], 0, 0);
return rs
end;
return {}
client生成命令:
redisFormatCommand(&pCmd, "eval %s 1 %s %lld", szScript, szKey, (int64_t)time(NULL)));
缺点是:
改源码,加一个命令。我们较早上线的一个服务使用了该方案。
/* 需要在server.c中加入实现的命令:
struct redisCommand redisCommandTable[] = {
//......
{"zlpopif",zlpopifCommand,3,"w",0,NULL,1,1,1,0,0},
{"zrpopif",zrpopifCommand,3,"w",0,NULL,1,1,1,0,0},
};
*/
/* 实现在t_zset.c: */
void zpopGenericCommand(client *c, int reverse, int condition) {
robj *key = c->argv[1];
robj *zobj;
int keyremoved = 0;
unsigned long deleted = 0;
long start = 0, end = 0, llen = 0;
/* for deletion */
unsigned char *zleptr, *zlsptr;
/* for addReply */
unsigned char zlvstr[128];
unsigned int zlvlen = 0;
long long zlvlong = 0;
robj *slele;
double node_score;
/* Step 1: Lookup & range sanity checks if needed. */
if ((zobj = lookupKeyWriteOrReply(c,key,shared.czero)) == NULL ||
checkType(c,zobj,OBJ_ZSET)) return;
llen = zsetLength(zobj);
if (end >= llen) end = llen-1;
if (start > end || start >= llen) {
return;
}
/* Step 2: Get value of the node will be remove */
if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
unsigned char *zl = zobj->ptr;
unsigned char *vstr;
if (reverse)
zleptr = ziplistIndex(zl,-2-(2*start));
else
zleptr = ziplistIndex(zl,2*start);
serverAssertWithInfo(c,zobj,zleptr != NULL);
zlsptr = ziplistNext(zl,zleptr);
serverAssertWithInfo(c,zobj,zleptr != NULL && zlsptr != NULL);
serverAssertWithInfo(c,zobj,ziplistGet(zleptr,&vstr,&zlvlen,&zlvlong));
/* copy the result, sice the node will be delete before addReply */
node_score = zzlGetScore(zlsptr);
if (vstr)
strncpy((char *)zlvstr, (char *)vstr, zlvlen);
} else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
zset *zs = zobj->ptr;
zskiplist *zsl = zs->zsl;
zskiplistNode *ln;
/* Check if starting point is trivial, before doing log(N) lookup. */
if (reverse) {
ln = zsl->tail;
} else {
ln = zsl->header->level[0].forward;
}
serverAssertWithInfo(c,zobj,ln != NULL);
slele = ln->obj;
incrRefCount(slele); /* MUST call decrRefCount to free mem */
node_score = ln->score;
} else {
serverPanic("Unknown sorted set encoding");
}
/* Step 3: Check if condition satisfied. */
if (condition) {
double condscore = 0;
if (getDoubleFromObjectOrReply(c,c->argv[2],&condscore,NULL)
!= C_OK) goto cleanup;
if (!reverse && condscore < node_score) {
addReply(c,shared.emptymultibulk);
goto cleanup;
}
if (reverse && condscore > node_score) {
addReply(c,shared.emptymultibulk);
goto cleanup;
}
}
/* Step 4: Perform the deletion operation. */
if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
/* delete by ptr */
serverAssertWithInfo(c,zobj,zleptr != NULL);
zobj->ptr = zzlDelete(zobj->ptr,zleptr);
deleted = 1;
/* delete by range */
/*zobj->ptr = zzlDeleteRangeByRank(zobj->ptr,start+1,end+1,&deleted);*/
if (zzlLength(zobj->ptr) == 0) {
dbDelete(c->db,key);
keyremoved = 1;
}
} else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
zset *zs = zobj->ptr;
/* delete by ptr */
/*serverAssertWithInfo(c,zobj,slele != NULL);
serverAssertWithInfo(c,zobj,zslDelete(zs->zsl,node_score,slele));
dictDelete(zs->dict,slele);
deleted = 1;*/
/* delete by range */
deleted = zslDeleteRangeByRank(zs->zsl,start+1,end+1,zs->dict);
if (htNeedsResize(zs->dict)) dictResize(zs->dict);
if (dictSize(zs->dict) == 0) {
dbDelete(c->db,key);
keyremoved = 1;
}
} else {
serverPanic("Unknown sorted set encoding");
}
/* Step 5: Notifications and reply. */
if (deleted) {
signalModifiedKey(c->db,key);
notifyKeyspaceEvent(NOTIFY_ZSET,"zrem",key,c->db->id); /* we reuse the built in "zrem" keyspace event for pop operation! */
if (keyremoved)
notifyKeyspaceEvent(NOTIFY_GENERIC,"del",key,c->db->id);
server.dirty += deleted;
}
/* Step 6: Return the result in form of a multi-bulk reply */
if (deleted) {
addReplyMultiBulkLen(c, 2); /* at most one element with score */
if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
if (zlvlen == 0)
addReplyBulkLongLong(c,zlvlong);
else
addReplyBulkCBuffer(c,zlvstr,zlvlen);
addReplyDouble(c,node_score);
} else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
addReplyBulk(c,slele);
addReplyDouble(c,node_score);
}
} else {
addReply(c,shared.emptymultibulk);
}
cleanup:;
if (zobj->encoding == OBJ_ENCODING_SKIPLIST)
decrRefCount(slele);
}
void zlpopifCommand(client *c) {
zpopGenericCommand(c, 0, 1);
}
void zrpopifCommand(client *c) {
zpopGenericCommand(c, 1, 1);
}
缺点是:后续官方更新都需要改代码。
使用[Redis 4.0模块实现。此处是GitHub传送门。
相比前两种方法,此方法逻辑收归在服务端,且不需要修改Redis源码便于升级。但需要注意资源释放、复制机制等细节,谨防踩坑。
1 . 兼容性:要求所有从机、或加载AOF/RDB的实例均实现了新的命令,即均为修改版Redis或均加载了扩展模块。
2 . 命令写入AOF和从机的时机:
3 . 消息处理失败处理:ZSET中消息被pop后才被client取得处理,若client处理失败则需要client在保证幂等的前提下自行重试。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。