基于SpringBoot实现过一个分布式锁:SpringBoot实现Redis分布式锁,最近面试候选人,这个问题也问的比较多,在这里自己也复习一下。
需要一种支持分布式集群环境下的锁:查询 DB 时,只有一个线程能访问,其他线程都需要等待第一个线程释放锁资源后,才能继续执行。
根据上图,可以简单描述下分布式锁的工作流程:
所有请求的线程都去同一个地方获取锁
,如果有锁
,就执行业务逻辑,没有锁
,就需要等其他线程释放锁
。这个锁
是所有线程可见的,可以把这个锁
放到 Redis 缓存或者数据库。
Redis作为一个可以公共访问的地方,非常适合用来做分布式锁。
用 Redis 实现分布式锁的几种方案,都是用 SETNX 命令(设置 key 等于某 value)。只是高阶方案传的参数个数不一样,以及考虑了异常情况。
SETNX
是set If not exist
的简写。意思就是当 key 不存在时,设置 key 的值,存在时,什么都不做。
Redis命令如下:
set <key> <value> NX
返回 OK
,表示设置成功。重复执行该命令,会返回 nil
表示设置失败。
用 Redis 的 SETNX 命令来实现最简单的分布式锁
// 1.先抢占锁
Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", "123");
if (lock) {
// 2.抢占成功,执行业务
Object object = doSomeThing();
// 3.解锁
redisTemplate.delete("lock");
return object;
} else {
// 4.休眠一段时间
sleep(100);
// 5.抢占失败,等待锁释放
return doSomethingsByRedisDistributedLock();
}
递归调用,可能会导致栈空间溢出。因此休眠一段时间。
这个方案会有个问题,当setnx占锁成功之后,业务代码或服务器宕机,没有执行删除锁的逻辑,则会造成死锁
。
设置锁的自动过期时间
,过一段时间后,自动删除锁,这样其他线程就能获取到锁了。
清理redis key代码如下
// 在 10s 以后,自动清理 lock
redisTemplate.expire("lock", 10, TimeUnit.SECONDS);
完整代码
// 1.先抢占锁
Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", "123");
if(lock) {
// 2.在 10s 以后,自动清理 lock
redisTemplate.expire("lock", 10, TimeUnit.SECONDS);
// 3.抢占成功,执行业务
Object object = doSomeThing();
// 4.解锁
redisTemplate.delete("lock");
return object;
}
这个方案解决了线程异常或服务器宕机造成的锁未释放的问题,但还是存在其他问题:
因为占锁和设置过期时间是分两步执行的,所以如果在这两步之间发生了异常,则锁的过期时间根本就没有设置成功。
所以和简单方案方案有一样的问题:锁永远不能过期。
原子性:多条命令要么都成功执行,要么都不执行。
将获取锁和设置锁过期时间放到一步执行。
Redis中可以这样执行:
# 设置某个 key 的值并设置多少毫秒或秒 过期。
set <key> <value> PX <多少毫秒> NX
# 或
set <key> <value> EX <多少秒> NX
与前面两种方案相比。获取锁的时候,也需要设置锁的过期时间,这是一个原子操作,要么都成功执行,要么都不执行。如下图所示:
设置 lock
的值等于 123
,过期时间为 10 秒。如果 10
秒 以后,lock 还存在,则清理 lock。
setIfAbsent("lock", "123", 10, TimeUnit.SECONDS);
这个方案也有缺陷,比如:
A 处理任务所需要的时间大于锁自动清理(开锁)的时间,所以在自动开锁后,又有其他用户抢占到了锁。当用户 A 完成任务后,会把其他用户抢占到的锁给主动打开。
上面方案的缺陷,过程看似复杂,但其实也很好解决,给每个锁设置不同编号就行了。
// 1.生成唯一 id
String uuid = UUID.randomUUID().toString();
// 2. 抢占锁
Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", uuid, 10, TimeUnit.SECONDS);
if(lock) {
System.out.println("抢占成功:" + uuid);
// 3.抢占成功,执行业务
Object object = doSomeThing();
// 4.获取当前锁的值
String lockValue = redisTemplate.opsForValue().get("lock");
// 5.如果锁的值和设置的值相等,则清理自己的锁
if(uuid.equals(lockValue)) {
System.out.println("清理锁:" + lockValue);
redisTemplate.delete("lock");
}
return object;
} else {
System.out.println("抢占失败,等待锁释放");
// 4.休眠一段时间
sleep(100);
// 5.抢占失败,等待锁释放
return doSomethingsByRedisDistributedLock();
}
此方案还是会有点问题:
第 4 步和第 5 步并不是原子性的。
上面的线程 A 查询锁和删除锁的逻辑不是原子性
的,所以将查询锁和删除锁这两步作为原子指令操作就可以了。
用脚本进行删锁,达到原子性操作。
redis中的脚本:
if redis.call("get",KEYS[1]) == ARGV[1]
then
return redis.call("del",KEYS[1])
else
return 0
end
这段脚本和上一个方案的获取key,删除key的方式很像。先获取 KEYS1 的 value,判断 KEYS1 的 value 是否和 ARGV1 的值相等,如果相等,则删除 KEYS1。
分两步执行这段脚本:先定义脚本;用 redisTemplate.execute 方法执行脚本。
// 脚本解锁
String script = "if redis.call('get',KEYS[1]) == ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end";
redisTemplate.execute(new DefaultRedisScript<Long>(script, Long.class), Arrays.asList("lock"), uuid);
上面的代码中,KEYS1 对应lock
,ARGV1 对应 uuid
,含义就是如果 lock 的 value 等于 uuid 则删除 lock。
这段 Redis 脚本是由 Redis 内嵌的 Lua 环境执行的,所以又称作 Lua 脚本。
Redisson 提供了使用 Redis的最简单和最便捷的方法。
Redisson的宗旨是促进使用者对 Redis 的关注分离(Separation of Concern),从而让使用者能够将精力更集中地放在处理业务逻辑上。
Redisson 是一个在 Redis 的基础上实现的 Java 驻内存数据网格(In-Memory Data Grid)。
点击查看详细信息
Hash
,List
,Set
,String
,Geo
,HyperLogLog
等数据结构封装为Java里大家最熟悉的映射(Map)
,列表(List)
,集(Set)
,通用对象桶(Object Bucket)
,地理空间对象桶(Geospatial Bucket)
,基数估计算法(HyperLogLog)
等结构,Lock
这样的更高阶应用场景。事实上Redisson并没有不止步于此,在分布式锁的基础上还提供了联锁(MultiLock)
,读写锁(ReadWriteLock)
,公平锁(Fair Lock)
,红锁(RedLock)
,信号量(Semaphore)
,可过期性信号量(PermitExpirableSemaphore)
和闭锁(CountDownLatch)
这些实际当中对多线程高并发应用至关重要的基本部件。正是通过实现基于Redis的高阶应用方案,使Redisson成为构建分布式系统的重要工具。分布式执行服务
和分布式调度服务
里的远程任务。SpringBoot整合Redisson的示例代码可点击这里查看:cayzlh-starter
因为 Redisson 非常强大,实现分布式锁的方案非常简洁。
// 1.设置分布式锁
RLock lock = redisson.getLock("lock");
// 2.占用锁
lock.lock();
// 3.执行业务
...
// 4.释放锁
lock.unlock();
和之前 Redis 的方案相比,简洁很多。
基于 Redis 的 Redisson 分布式可重入读写锁RReadWriteLock
Java对象实现了java.util.concurrent.locks.ReadWriteLock
接口。其中读锁和写锁都继承了 RLock
接口。
写锁是一个排他锁(互斥锁),读锁是一个共享锁。
示例代码:
RReadWriteLock rwlock = redisson.getReadWriteLock("anyRWLock");
// 最常见的使用方法
rwlock.readLock().lock();
// 或
rwlock.writeLock().lock();
另外Redisson还通过加锁的方法提供了leaseTime
的参数来指定加锁的时间。超过这个时间后锁便自动解开了。
// 10秒钟以后自动解锁
// 无需调用unlock方法手动解锁
rwlock.readLock().lock(10, TimeUnit.SECONDS);
// 或
rwlock.writeLock().lock(10, TimeUnit.SECONDS);
// 尝试加锁,最多等待100秒,上锁以后10秒自动解锁
boolean res = rwlock.readLock().tryLock(100, 10, TimeUnit.SECONDS);
// 或
boolean res = rwlock.writeLock().tryLock(100, 10, TimeUnit.SECONDS);
...
lock.unlock();
上面几种方案的不断演进的过程中,知道了系统中哪些地方可能存在异常情况,以及该如何更好地进行处理。
参考资料
分享计划
博客内容将同步至腾讯云+社区,邀请大家一同入驻:https://cloud.tencent.com/
许可协议
本文采用 署名-非商业性使用-相同方式共享 4.0 国际 许可协议,转载请注明出处。