👉这是一个或许对你有用的开源项目 国产 Star 破 10w+ 的开源项目,前端包括管理后台 + 微信小程序,后端支持单体和微服务架构。 功能涵盖 RBAC 权限、SaaS 多租户、数据权限、商城、支付、工作流、大屏报表、微信公众号等等功能:
来源:juejin.cn/post/ 7257882929718788157
基于 Spring Boot + MyBatis Plus + Vue & Element 实现的后台管理系统 + 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能
我们可以借助于redis中的命令setnx(key, value),key不存在就新增,存在就什么都不做。同时有多个客户端发送setnx命令,只有一个客户端可以成功,返回1(true);其他的客户端返回0(false),流程图如下图所示:
public void testLock() {
// 1. 从redis中获取锁,setnx
Boolean lock = this.redisTemplate.opsForValue().setIfAbsent("lock", "111");
if (lock) {
// 查询redis中的num值
String value = this.redisTemplate.opsForValue().get("num");
// 没有该值return
if (StringUtils.isBlank(value)){
return ;
}
// 有值就转成成int
int num = Integer.parseInt(value);
// 把redis中的num值+1
this.redisTemplate.opsForValue().set("num", String.valueOf(++num));
// 2. 释放锁 del
this.redisTemplate.delete("lock");
} else {
// 3. 每隔1秒钟回调一次,再次尝试获取锁
try {
Thread.sleep(1000);
testLock();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
那么以上代码是否可以解决全部问题呢? 显示是不能的,我们假设setnx刚好获取到锁,业务逻辑出现异常,导致锁无法释放,怎么办呢?
设置过期有俩种方式可以选择:
代码实现优化就是在设置锁的时候设置过期时间:
public void testLock() {
// 1. 从redis中获取锁,setnx
Boolean lock = this.redisTemplate.opsForValue().setIfAbsent("lock", "111",3, TimeUnit.MINUTES);
if (lock) {
//与之前相同代码略过
...
}
}
那么还会不会存在问题呢?
场景:如果业务逻辑的执行时间是7s。执行流程如下:
解决:setnx获取锁时,设置一个指定的唯一值(例如:uuid);释放前获取这个值,判断是否自己的锁
public void testLock() {
// 1. 从redis中获取锁,setnx
String uuid = UUID.randomUUID().toString();
Boolean lock = this.redisTemplate.opsForValue().setIfAbsent("lock", uuid,3, TimeUnit.MINUTES);
if (lock) {
//与之前相同代码略过
...
// 2. 释放锁 del
if (StringUtils.equals(redisTemplate.opsForValue().get("lock"),uuid)){
this.redisTemplate.delete("lock");
}
}
}
场景:
首先我们先简单介绍一下lua脚本的基本知识(lua脚本是c语言)定义变量:
流程控制:
if(exp) then
业务逻辑
elseif(exp) then
业务逻辑
else
业务逻辑
end
redis中执行lua脚本:
eval script numkeys keys[] args[]
: eval指令的输出不是lua脚本的打印而是lua脚本的返回值script
:lua脚本字符串,定义动态变量:KEYS[1] ARGV[1]numkeys
:key数组的元素个数keys
:keys数组args
:argv数组redis集群执行lua脚本可能会报错:如果所有keys不在同一个分片上,lua脚本就会报错:解决方案是:
删除LUA脚本:
if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end
public void testLock() {
// 1. 从redis中获取锁,setnx
String uuid = UUID.randomUUID().toString();
Boolean lock = this.redisTemplate.opsForValue().setIfAbsent("lock", uuid, 3, TimeUnit.SECONDS);
if (lock) {
//与之前相同代码略过
...
// 2. 释放锁 del
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
this.redisTemplate.execute(new DefaultRedisScript<>(script), Arrays.asList("lock"), uuid);
} else {
// 3. 每隔1秒钟回调一次,再次尝试获取锁
try {
Thread.sleep(1000);
testLock();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
上述加锁命令使用了 SETNX ,一旦键存在就无法再设置成功,这就导致后续同一线程内继续加锁,将会加锁失败。当一个线程执行一段代码成功获取锁之后,继续执行时,又遇到加锁的子任务代码,可重入性就保证线程能继续执行,而不可重入就是需要等待锁释放之后,再次获取锁成功,才能继续往下执行。
可重入锁最大特性就是计数,计算加锁的次数。所以当可重入锁需要在分布式环境实现时,我们也就需要统计加锁次数。
我们基于Redis Hash 实现方案 :
Redis 提供了 Hash (哈希表)这种可以存储键值对数据结构。所以我们可以使用 Redis Hash 存储的锁的重入次数,然后利用 lua 脚本判断逻辑。
if (redis.call('exists', KEYS[1]) == 0 or redis.call('hexists', KEYS[1], ARGV[1]) == 1)
then
redis.call('hincrby', KEYS[1], ARGV[1], 1);
redis.call('expire', KEYS[1], ARGV[2]);
return 1;
else
return 0;
end
假设值为:KEYS:[lock]
, ARGV[uuid, expire]
如果锁不存在或者这是自己的锁,就通过hincrby(不存在新增,存在就加1)获取锁或者锁次数加1。 代码实例如下:
private Boolean tryLock(String lockName, String uuid, Long expire){
String script = "if (redis.call('exists', KEYS[1]) == 0 or redis.call('hexists', KEYS[1], ARGV[1]) == 1) " +
"then" +
" redis.call('hincrby', KEYS[1], ARGV[1], 1);" +
" redis.call('expire', KEYS[1], ARGV[2]);" +
" return 1;" +
"else" +
" return 0;" +
"end";
if (!this.redisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class), Arrays.asList(lockName), uuid, expire.toString())){
try {
// 没有获取到锁,重试
Thread.sleep(200);
tryLock(lockName, uuid, expire);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 获取到锁,返回true
return true;
}
-- 判断 hash set 可重入 key 的值是否等于 0
-- 如果为 nil 代表 自己的锁已不存在,在尝试解其他线程的锁,解锁失败
-- 如果为 0 代表 可重入次数被减 1
-- 如果为 1 代表 该可重入 key 解锁成功
if (redis.call('hexists', KEYS[1], ARGV[1]) == 0) then
return nil;
end;
-- 小于等于 0 代表可以解锁
if (redis.call('hincrby', KEYS[1], ARGV[1], -1) > 0) then
return 0;
else
redis.call('del', KEYS[1]);
return 1;
end;
这里之所以没有跟加锁一样使用 Boolean ,这是因为解锁 lua 脚本中,三个返回值含义如下:
如果返回值使用 Boolean,Spring-data-redis 进行类型转换时将会把 null 转为 false,这就会影响我们逻辑判断,所以返回类型只好使用 Long。
private void unlock(String lockName, String uuid){
String script = "if (redis.call('hexists', KEYS[1], ARGV[1]) == 0) then" +
" return nil;" +
"end;" +
"if (redis.call('hincrby', KEYS[1], ARGV[1], -1) > 0) then" +
" return 0;" +
"else" +
" redis.call('del', KEYS[1]);" +
" return 1;" +
"end;";
// 这里之所以没有跟加锁一样使用 Boolean ,这是因为解锁 lua 脚本中,三个返回值含义如下:
// 1 代表解锁成功,锁被释放
// 0 代表可重入次数被减 1
// null 代表其他线程尝试解锁,解锁失败
Long result = this.redisTemplate.execute(new DefaultRedisScript<>(script, Long.class), Lists.newArrayList(lockName), uuid);
// 如果未返回值,代表尝试解其他线程的锁
if (result == null) {
throw new IllegalMonitorStateException("attempt to unlock lock, not locked by lockName: "
+ lockName + " with request: " + uuid);
}
}
public void testLock() {
// 加锁
String uuid = UUID.randomUUID().toString();
Boolean lock = this.tryLock("lock", uuid, 300l);
if (lock) {
// 读取redis中的num值
String numString = this.redisTemplate.opsForValue().get("num");
if (StringUtils.isBlank(numString)) {
return;
}
// ++操作
Integer num = Integer.parseInt(numString);
num++;
// 放入redis
this.redisTemplate.opsForValue().set("num", String.valueOf(num));
// 测试可重入性
this.testSubLock(uuid);
// 释放锁
this.unlock("lock", uuid);
}
}
// 测试可重入性
private void testSubLock(String uuid){
// 加锁
Boolean lock = this.tryLock("lock", uuid, 300l);
if (lock) {
System.out.println("分布式可重入锁。。。");
this.unlock("lock", uuid);
}
}
A线程超时时间设为10s(为了解决死锁问题),但代码执行时间可能需要30s,然后redis服务端10s后将锁删除。 此时,B线程恰好申请锁,redis服务端不存在该锁,可以申请,也执行了代码。
那么问题来了, A、B线程都同时获取到锁并执行业务逻辑,这与分布式锁最基本的性质相违背:在任意一个时刻,只有一个客户端持有锁(即独享排他)。
锁延期方法:开启子线程执行延期
/**
* 锁延期
* 线程等待超时时间的2/3时间后,执行锁延时代码,直到业务逻辑执行完毕,因此在此过程中,其他线程无法获取到锁,保证了线程安全性
* @param lockName
* @param expire 单位:毫秒
*/
private void renewTime(String lockName, String uuid, Long expire){
String script = "if(redis.call('hexists', KEYS[1], ARGV[1]) == 1) then redis.call('expire', KEYS[1], ARGV[2]); return 1; else return 0; end";
new Thread(() -> {
while (this.redisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class), Lists.newArrayList(lockName), uuid, expire.toString())){
try {
// 到达过期时间的2/3时间,自动续期
Thread.sleep(expire / 3);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
获取锁成功后,调用延期方法给锁 定时延期:
private Boolean tryLock(String lockName, String uuid, Long expire){
String script = "if (redis.call('exists', KEYS[1]) == 0 or redis.call('hexists', KEYS[1], ARGV[1]) == 1) " +
"then" +
" redis.call('hincrby', KEYS[1], ARGV[1], 1);" +
" redis.call('expire', KEYS[1], ARGV[2]);" +
" return 1;" +
"else" +
" return 0;" +
"end";
if (!this.redisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class), Arrays.asList(lockName), uuid, expire.toString())){
try {
// 没有获取到锁,重试
Thread.sleep(200);
tryLock(lockName, uuid, expire);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 锁续期
this.renewTime(lockName, uuid, expire * 1000);
// 获取到锁,返回true
return true;
}
redis集群状态下的问题:
我们知道java中有synchronized、lock锁、读写锁ReadWriteLock,众所周知这些锁都是本地锁。
提到锁就不得不提JUC:java.util.concurrent包,又称concurrent包。jdk1.5提供,为多线程高并发编程而提供的包,但此文章的场景是分布式场景,后续会出JUC的文章。
redis、ab工具(压测)
@GetMapping("/test")
public void testNoLock(){
String count = (String) this.redisTemplate.opsForValue().get("count");
if (count == null){
//没有值直接返回
return;
}
// 有值就转成成int
int number = Integer.parseInt(count);
// 把redis中的num值+1
this.redisTemplate.opsForValue().set("count", String.valueOf(++number));
}
测试之前的查看值为1
@GetMapping("/getCount")
public String getCount(){
String count = String.valueOf(this.redisTemplate.opsForValue().get("count"));
return count; //1
}
// ab -n(一次发送的请求数) -c(请求的并发数) 访问路径
ab -n100 -c50 http://127.0.0.1:8080/test/test
再次查询结果为6,说明问题很大
public synchronized void testNoLock(){
String count = String.valueOf(this.redisTemplate.opsForValue().get("count"));
if ("null".equals(count)){
//没有值直接返回
return;
}
// 有值就转成成int
int number = Integer.parseInt(count);
// 把redis中的num值+1
this.redisTemplate.opsForValue().set("count", String.valueOf(++number));
}
ab -n100 -c50 http://127.0.0.1:8080/test/test
此次结果为106,说明结果是正确的,看样子结果是非常完美的,但是真的很完美吗?
ab -n100 -c50 http://127.0.0.1:8080/test/test
此次的结果为58!!!
到此我们可以知道,本地锁是有局限性的。
[1]redlock文档: https://redis.io/topics/distlock