过完年后,更新博客的热情逐渐被备战暑期实习的焦虑感没过了,今天写项目时上网搜集资料实现了一版自动续期机制的Redis分布式锁,在这里记录巩固一下
Redis我们日常开发经常使用,而分布式锁的一个重要实现就是通过Redis完成,分布式锁要解决的核心问题是防止对某个资源进行重复或者过度请求,例如我们在分布式系统中创建订单之前,必须获取分布式锁才能创建订单,其要解决的主要问题有两个:
1)如果用户重复点击提交订单按钮,可以通过分布式锁避免重复创建订单
2)商品库存有限,通过分布式锁能够解决超卖问题
而分布式锁的实现也有很多种方法,例如:
数据库中有行级锁、表级锁等等,这里举一个轻量级的分布式调度框架xxl-job,它在分布式环境下的调度就是通过获取行级锁,也就是排它锁来实现的:
SELECT * FROM xxl_job_lock WHERE lock_name = 'schedule_lock' FOR UPDATE;
由于Redis中set nx
命令的原子性,只有在键值不存在的时候才能设置值,因此可以通过set nx
实现分布式锁,但是它有弊端就是:
如果业务还没有执行完,那么会导致业务没有执行完,锁就被释放了
因此延伸出了基于Redis延伸的Redisson分布式锁框架,它实现的原理在于使用Redis单线程模型执行SET NX命令lua脚本确保获取锁操作原子性,同时它内置了更为丰富的看门狗机制,满足了在业务执行过程中,自动续期锁
而上面都是基于Redis单一节点分配锁的情况,如果在主从架构下的Redis,主节点获取锁的过程中发生了宕机,这时从节点升级为了主节点,但是此时主节点没有获得锁,就可能导致多个节点同时获得锁的情况,对此就又延伸出了Red Lock(红锁),而红锁的获取就需要客户端依次向每个Redis实例获取锁,只有获得锁的数量超过了半数才说明获取锁成功
ZooKeeper是分布式的应用程序服务,它可以被用作注册中心,也可以用来实现分布式锁,由于ZooKeeper以节点(node)的形式存储数据,因此它的分布式锁主要基于其临时顺序节点实现
当多个客户端同时创建一个父节点下面的临时顺序节点是,ZooKeeper会为每个客户端分配一个唯一的顺序编号,如果客户端检查自己的结点编号是最小的结点编号,那么则获取到了锁,否则就需要等待锁的释放
上面都是基于理论介绍,接下来就实现基于Redis的分布式锁
在实现之前,我们要明确加锁、释放锁与续期锁的机制:
加锁:设置键值与过期时间,通过setnx px
实现,如果键值已经存在直接返回错误
释放锁:我们一定要确保释放的锁是自己加的锁,因此要判断value值是否是之前设置的value值,只有判断正确才能够释放锁
续期锁:与释放锁同理,只有当前锁是自己加的锁才续期
因此我们发现,释放锁与续期锁是两阶段操作,在高并发环境下可能出现错误,因此,加锁我们采用直接执行命令实现,释放锁与续期锁采用Redis执行lua脚本实现
const (
// 解锁lua
unLockScript = "if redis.call('get', KEYS[1]) == ARGV[1] " +
"then redis.call('del', KEYS[1]) return 1 " +
"else " +
"return 0 " +
"end"
// 看门狗lua
watchLogScript = "if redis.call('get', KEYS[1]) == ARGV[1] " +
"then return redis.call('expire', KEYS[1], ARGV[2]) " +
"else " +
"return 0 " +
"end"
)
定义的锁结构如下:
type DistributeRedisLock struct {
redis *Redis.Client //redis客户端
ctx context.Context //上下文信息
cancelFunc context.CancelFunc //取消上下文信息,停止使用这个信息的线程
key string //键
value string //值
expireTime time.Duration //过期时间
status bool //是否获得锁
waitTime time.Duration //重试时间
}
获取锁代码:
func (d *DistributeRedisLock) TryLock() (bool, error) {
if err := d.Lock(); err != nil {
return false, err
}
d.status = true
go d.Watchdog()
return true, nil
}
func (d *DistributeRedisLock) Lock() error {
now := time.Now()
for time.Since(now) < d.waitTime {
isLock, err := d.redis.SetNX(d.ctx, d.key, d.value, d.expireTime).Result()
if err != nil {
return err
}
if !isLock {
time.Sleep(100 * time.Millisecond)
} else {
return nil
}
}
return errors.New("lock timeout")
}
获取锁时,增加重试功能,等待时间可以默认设置为500ms,确保不是因为服务器抖动导致获取锁失败
func (d *DistributeRedisLock) Watchdog() {
// 创建一个定时器NewTicker, 每过期时间的2分之1触发一次
loopTime := time.Duration(d.expireTime*1000*1/2) * time.Millisecond
expTicker := time.NewTicker(loopTime)
for {
select {
case <-d.ctx.Done():
return
case <-expTicker.C:
if d.status {
args := []interface{}{d.key, d.value}
res, err := d.redis.EvalSha(d.ctx, watchLogScript, []string{d.key}, args...).Result()
if err != nil {
log.Debugf(d.ctx, "redis eval error: %v", err)
return
}
r, ok := res.(int64)
if !ok || r == 0 {
log.Debugf(d.ctx, "redis eval error: %v", res)
return
}
}
}
}
}
通过定时器,每隔一半的过期时间就对锁续期,这个时间可以设置为其他,在Redisson里面这个值被设为1/3
func (d *DistributeRedisLock) Unlock() (bool, error) {
d.cancelFunc()
if d.status {
err := d.redis.Eval(context.Background(), unLockScript, []string{d.key}, []string{d.value}).Err()
if err != nil {
return false, err
}
d.status = false
return true, nil
}
return false, errors.New("unlock error")
}
释放锁时,一个关键点在于要将cancelFunc
操作放到释放锁之前执行,否则可能出现释放锁失败,但是还执行看门狗机制续约的流程
完整代码如下:
package redis
import (
"context"
"errors"
"google.golang.org/appengine/log"
"time"
Redis "github.com/redis/go-redis/v9"
)
const (
// 解锁lua
unLockScript = "if redis.call('get', KEYS[1]) == ARGV[1] " +
"then redis.call('del', KEYS[1]) return 1 " +
"else " +
"return 0 " +
"end"
// 看门狗lua
watchLogScript = "if redis.call('get', KEYS[1]) == ARGV[1] " +
"then return redis.call('expire', KEYS[1], ARGV[2]) " +
"else " +
"return 0 " +
"end"
)
type DistributeRedisLock struct {
redis *Redis.Client
ctx context.Context
cancelFunc context.CancelFunc
key string
value string
expireTime time.Duration
status bool
waitTime time.Duration
}
func (d *DistributeRedisLock) TryLock() (bool, error) {
if err := d.Lock(); err != nil {
return false, err
}
d.status = true
go d.Watchdog()
return true, nil
}
func (d *DistributeRedisLock) Lock() error {
now := time.Now()
for time.Since(now) < d.waitTime {
isLock, err := d.redis.SetNX(d.ctx, d.key, d.value, d.expireTime).Result()
if err != nil {
return err
}
if !isLock {
time.Sleep(100 * time.Millisecond)
} else {
return nil
}
}
return errors.New("lock timeout")
}
func (d *DistributeRedisLock) Watchdog() {
// 创建一个定时器NewTicker, 每过期时间的2分之1触发一次
loopTime := time.Duration(d.expireTime*1000*1/2) * time.Millisecond
expTicker := time.NewTicker(loopTime)
for {
select {
case <-d.ctx.Done():
return
case <-expTicker.C:
if d.status {
args := []interface{}{d.key, d.value}
res, err := d.redis.EvalSha(d.ctx, watchLogScript, []string{d.key}, args...).Result()
if err != nil {
log.Debugf(d.ctx, "redis eval error: %v", err)
return
}
r, ok := res.(int64)
if !ok || r == 0 {
log.Debugf(d.ctx, "redis eval error: %v", res)
return
}
}
}
}
}
func (d *DistributeRedisLock) Unlock() (bool, error) {
d.cancelFunc()
if d.status {
err := d.redis.Eval(context.Background(), unLockScript, []string{d.key}, []string{d.value}).Err()
if err != nil {
return false, err
}
d.status = false
return true, nil
}
return false, errors.New("unlock error")
}
func NewDistributeRedisLock(key string, expireTime time.Duration, value string, waitTime ...time.Duration) *DistributeRedisLock {
wait := time.Second * 3
if len(waitTime) > 0 {
wait = waitTime[0]
}
ctx, cancelFunc := context.WithCancel(context.Background())
return &DistributeRedisLock{
redis: DB(),
key: key,
value: value,
expireTime: expireTime,
waitTime: wait,
ctx: ctx,
cancelFunc: cancelFunc,
}
}
自己通过Go语言实现分布式锁还是很有趣的,但是肯定还有可以优化的地方,希望对你有所帮助!!!
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。