前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >基于 Redis 实现高级限流器及其在队列任务处理中的应用

基于 Redis 实现高级限流器及其在队列任务处理中的应用

作者头像
学院君
发布于 2021-01-22 02:59:54
发布于 2021-01-22 02:59:54
1.5K00
代码可运行
举报
文章被收录于专栏:学院君的专栏学院君的专栏
运行总次数:0
代码可运行

更高级的限流器设计

上篇教程学院君给大家演示了如何通过 Redis 的字符串数据结构实现限流器,其中需要用到两个字符串键值对:一个用于设置单位时间窗口内的请求上限,另一个用于在这个时间窗口内对请求数进行统计,当请求数超出请求上限,则拒绝后续请求。

这是一个最简单的限流器实现,其原理是通过对指定时间窗口内的请求次数上限进行限定,一旦进入的请求数超出这个限制,则拒绝后续进来的请求,而不管之前进来的请求是否已经处理完毕,讲到这里,聪明的同学可能已经想到了更高级的限流器是怎么实现的 —— 那就是引入已处理请求这个变量动态统计当前限流器内的请求总量。

在这种情况下,当新请求进来后,依然会对请求总量做自增统计,所不同的是,当之前进入的请求被处理后,则释放掉这部分的请求总量。这样一来,请求总量就不再是只增不减,而是动态变化的。

这种限流器有两种实现模式,一种依然是基于时间窗口,限定请求数上限,只不过需要额外考虑已处理的请求,这就增加了限流系统实现的复杂性。另一种是不考虑时间窗口,只考虑同时支持的并发请求上限,这种情况下的请求数上限针对的是每个时间点,而不是前一种实现针对的是整个时间窗口。

两种设计能够支持的最高并发量是一致的(假设前一个版本所有请求在同一个时间点涌入),但是显然,后一种实现的限流器大大提高了系统总的吞吐量,因为请求进进出出,只要同一时间点的总数不超过上限即可,而不是单位时间内累计的总数。

如果更抽象一点看,后一种实现的限流器是基于请求进入/处理的速率,而前一种实现则只是请求进入量的简单累加,对于后一种实现而言,只要请求处理速率高于或等于进入速率,则永远不会触发请求上限,反之如果请求处理速率低于进入速率,则累计的未处理请求总量迟早会触发请求上限。显然,通过这种限流器可以更精确地控制系统的并发访问频率。

通常,请求进入的速率都是高于请求处理速率的,这是不是像极了我们日常生活中使用的上面大而粗、下面小而细的漏斗(Funnel)?

-w738

所以后一种限流器的实现算法有一个很形象的名字 —— 漏斗算法,漏斗内液体最快的流动速率就是该限流器的最高访问频率。

Redis 高级限流器的 Laravel 实现

在 Laravel 底层的 Redis 组件库中,已经通过 PHP 代码为我们实现了这两种限流器:

  • ConcurrencyLimiter 是一个基于漏斗算法实现的并发请求频率限流器;
  • DurationLimiter 则是一个基于时间窗口实现的限流器,我们在上篇教程中也演示了基于 Redis 缓存驱动实现的时间窗口限流器 RateLimiter,相较于 DurationLimiter,它是一个非常简单的实现。
限定并发请求访问上限

下面我们通过限定用户并发访问指定控制器动作的频率来演示基于漏斗算法实现的 Redis 限流器使用。

以文章浏览这个控制器动作为例,我们要实现最多支持 100 个并发请求,可以这么做:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
use Illuminate\Support\Facades\Redis;

// 浏览文章
public function show($id)
{
    // 定义一个限定并发访问频率的限流器,最多支持 100 个并发请求
    Redis::funnel("posts.${id}.show.concurrency")
        ->limit(100)
        ->then(function () use ($id) {
            // 正常访问
            $post = $this->postRepo->getById($id);
            event(new PostViewed($post));
            return "Show Post #{$post->id}, Views: {$post->views}";
        }, function () {
            // 触发并发访问上限
            abort(429, 'Too Many Requests');
        });
}

Redis::funnel 返回的是 ConcurrencyLimiter 限流器对应的构建器实例,然后我们通过 limit 方法指定并发请求上限,再通过 then 方法定义两个回调函数,第一个回调执行的是未触发并发上限时的正常业务逻辑,第二个回调执行的是触发并发上限后返回 429 响应的异常处理逻辑。

限定单位时间访问上限

如果想要使用基于时间窗口的限流器限定用户在单位时间内的请求上限,可以这么做:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
// 定义一个单位时间内限定请求上限的限流器,每 10 秒最多支持 100 个请求
Redis::throttle("posts.${id}.show.concurrency")
    ->allow(100)->every(10)
    ->then(function () use ($id) {
        // 正常访问
        $post = $this->postRepo->getById($id);
        event(new PostViewed($post));
        return "Show Post #{$post->id}, Views: {$post->views}";
    }, function () {
        // 触发并发访问上限
        abort(429, 'Too Many Requests');
    });

这里,我们通过 Redis::throttle 方法返回 DurationLimiter 限流器对应的构建器实例,再通过 allow 指定请求上限,通过 every 指定时间窗口,这里最高支持的并发请求也是 100,但是分散到 10 秒内的累积请求上限也是 100,所以吞吐量不及上面基于漏斗算法实现的限流器。

另外,需要注意的是不同于路由限流中间件 throttle,这里的 Redis 键不是基于用户标识的,而是基于文章 ID 标识的,所以会统计所有用户针对指定文章详情页的访问次数。

ConcurrencyLimiter 的底层实现

接下来,我们沿着 Redis::funnel 去分析 ConcurrencyLimiter 限流器的底层实现源码。

Redis 门面代理的是 RedisManager 对象实例,因此 Redis::funnel 调用的是 RedisManager 对象 connection 方法返回实例上的方法,这里我们配置的 Redis 客户端是 phpredis,所以 connection 方法返回的是 PhpRedisConnection 对象实例(相关源码请阅读 Illuminate\Redis\RedisManager 底层实现代码),因此,最终调用的也是这个对象实例上的 funnel 方法(定义在其父类 Illuminate\Redis\Connections\Connection 中):

该方法返回的是 ConcurrencyLimiter 限流器对应的构建器 ConcurrencyLimiterBuilder 对象实例,我们可以在这个构建器实例上通过 limit 方法设置并发请求频率上限,再通过 then 方法定义请求处理细节:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public function then(callable $callback, callable $failure = null)
{
    try {
        return (new ConcurrencyLimiter(
            $this->connection, $this->name, $this->maxLocks, $this->releaseAfter
        ))->block($this->timeout, $callback);
    } catch (LimiterTimeoutException $e) {
        if ($failure) {
            return $failure($e);
        }

        throw $e;
    }
}

在这里,我们会基于 Redis 连接、限流器名称、并发访问频率上限等变量值实例化 ConcurrencyLimiter 限流器实例,然后通过该实例的 block 方法传入 then 方法的第一个闭包函数定义正常请求回调代码,如果请求出现异常则执行 then 方法传入的第二个回调。

ConcurrencyLimiterblock 方法中包含了基于漏斗算法实现的限流器底层源码:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public function block($timeout, $callback = null)
{
    $starting = time();

    $id = Str::random(20);

    while (! $slot = $this->acquire($id)) {
        if (time() - $timeout >= $starting) {
            throw new LimiterTimeoutException;
        }

        usleep(250 * 1000);
    }

    if (is_callable($callback)) {
        try {
            return tap($callback(), function () use ($slot, $id) {
                $this->release($slot, $id);
            });
        } catch (Exception $exception) {
            $this->release($slot, $id);

            throw $exception;
        }
    }

    return true;
}

这里,我们首先通过 acquire 方法为所有的请求槽位上锁(默认有效期是 60s):

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
protected function acquire($id)
{
    $slots = array_map(function ($i) {
        return $this->name.$i;
    }, range(1, $this->maxLocks));

    return $this->redis->eval(...array_merge(
        [$this->lockScript(), count($slots)],
        array_merge($slots, [$this->name, $this->releaseAfter, $id])
    ));
}

lockScript 对应的实现代码如下,就是通过一个循环结构依次为所有请求槽位上锁:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
protected function lockScript()
{
    return <<<'LUA'
for index, value in pairs(redis.call('mget', unpack(KEYS))) do
    if not value then
        redis.call('set', KEYS[index], ARGV[3], "EX", ARGV[2])
        return ARGV[1]..index
    end
end
LUA;
}

以后每处理完一次请求,则释放对应槽位的锁,这样,下一次请求过来就可以获取对应的锁,如果请求处理异常也释放锁。

如果用户请求进来获取锁失败,则表示所有锁都被其他请求持有,这就意味着用户并发请求已达上限,如果获取锁超时(默认超时时间是 3s),则在上一层 then 方法中捕获到超时异常,执行其第二个参数对应的异常回调函数,否则通过 usleep 方法阻塞 250ms,等待槽位空出,继续处理请求。

由于这里使用了基于 Lua 脚本的 Redis 分布式锁,所以可以保证操作的原子性。

DurationLimiter 的底层实现

基于时间窗口的限流器 DurationLimiter 对应的底层实现源码前面的流程也是和 ConcurrencyLimiter 一样的,我们直接从 then 方法开始分析,执行其第一个回调时,最终调用的是 DurationLimiterblock 方法:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public function block($timeout, $callback = null)
{
    $starting = time();

    // 获取锁操作
    while (! $this->acquire()) {
        // 超时抛出异常
        if (time() - $timeout >= $starting) {
            throw new LimiterTimeoutException;
        }

        // 获取失败,则阻塞 750ms 后重试
        usleep(750 * 1000);
    }

    // 获取锁成功(还未触发上限),则执行回调函数
    if (is_callable($callback)) {
        return $callback();
    }

    return true;
}

ConcurrencyLimiter 类似,这里也使用了 Redis 分布式锁来确保操作的原子性。

获取锁失败时,意味着限流器已触发请求上限,没有剩余槽位可用,这个时候会先判断是否超时(默认超时时间 3s),如果超时,抛出超时异常,否则阻塞 750ms 后重试。如果获取锁成功,意味着还没有触发请求上限,则执行上一层构建器 then 方法传入的第一个回调函数。

接下来,我们来看 acquire 方法的实现代码:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public function acquire()
{
    // 通过 Redis Lua 脚本设置锁,然后从返回值读取信息
    $results = $this->redis->eval(
        $this->luaScript(), 1, $this->name, microtime(true), time(), $this->decay, $this->maxLocks
    );

    // 时间窗口过期时间点(当前时间 + 传入的时间窗口大小)
    $this->decaysAt = $results[1];

    // 剩余支持的请求槽位(传入的请求上限 - 已处理请求数)
    $this->remaining = max(0, $results[2]);

    // 是否获取锁成功(基于是否还有剩余请求槽位判断)
    return (bool) $results[0];
}

这里的实现并不是基于 Redis 的字符串数据结构,而是基于 Hash 数据结构,因此更加复杂一些:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
protected function luaScript()
{
    return <<<'LUA'
local function reset()
    redis.call('HMSET', KEYS[1], 'start', ARGV[2], 'end', ARGV[2] + ARGV[3], 'count', 1)
    return redis.call('EXPIRE', KEYS[1], ARGV[3] * 2)
end

-- 第一次请求会初始化一个 Hash 结构作为限流器,键名是外部传入的名称,键值是包含起始时间、结束时间和请求统计数的对象
-- 返回值的第一个对应的是是否获取锁成功,即是否可以继续请求,第二个是有效期结束时间点,第三个是剩余的请求槽位数
if redis.call('EXISTS', KEYS[1]) == 0 then
    return {reset(), ARGV[2] + ARGV[3], ARGV[4] - 1}
end

-- 如果限流器已存在,并且还处于有效期对应的时间窗口内,则对请求统计数做自增操作
-- 这里,我们会限定其值不能超过请求上限,否则获取锁失败,有效期结束时间点不便,剩余槽位数=请求上限-当前请求统计数
if ARGV[1] >= redis.call('HGET', KEYS[1], 'start') and ARGV[1] <= redis.call('HGET', KEYS[1], 'end') then
    return {
        tonumber(redis.call('HINCRBY', KEYS[1], 'count', 1)) <= tonumber(ARGV[4]),
        redis.call('HGET', KEYS[1], 'end'),
        ARGV[4] - redis.call('HGET', KEYS[1], 'count')
    }
end

-- 如果限流器已过期,则和第一个请求一样,重置这个限流器,重新开始统计
return {reset(), ARGV[2] + ARGV[3], ARGV[4] - 1}
LUA;
    }
}

详细细节,我已经在通过注释标注了,从这个 Lua 脚本中会返回是否可以继续请求(是否可以获取锁)、有效期结束时间点和剩余可用请求槽位三个值,以便在 acquire 方法中使用。

可以看出,在 block 方法中获取锁成功并执行回调函数处理请求后,并没有重置剩余可用槽位和当前请求数统计,所以目前而言,这个限流器的功能和上篇教程实现的是一样的,如果触发请求上限,只能等到时间窗口结束才能继续发起请求。

不过,如果需要的话,你是可以在处理完请求后,去更新 Redis Hash 数据结构中的当前请求统计数的,只是这里没有提供这种实现罢了。

通过限流器限制队列任务处理频率

除了用于处理用户请求频率外,还可以在处理队列任务的时候使用限流器,限定队列任务的处理频率。这一点,在 Laravel 队列文档中已有体现。

PostViewsIncrement 这个队列任务为例,要限定最多支持 60 个并发处理进程,可以这么做:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public function handle()
{
    Redis::funnel('post.views.increment')
        ->limit(60)
        ->then(function () {
            // 队列任务正常处理逻辑
            if ($this->post->increment('views')) {
                Redis::zincrby('popular_posts', 1, $this->post->id);
            }
        }, function () {
            // 超出处理频率上限,延迟 60s 再执行
            $this->release(60);
        });
}

和处理路由请求不同,如果触发并发处理进程上限,则使用 release 方法延迟 60s 执行这个任务。

如果想要通过时间窗口限定处理频率,比如每分钟最多执行 60 次,可以这么做:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
Redis::throttle('posts.views.increment')
    ->allow(60)->every(60)
    ->then(function () {
        // 队列任务正常处理逻辑
        if ($this->post->increment('views')) {
            Redis::zincrby('popular_posts', 1, $this->post->id);
        }
    }, function () {
        // 超出处理频率上限,延迟60s再执行
        $this->release(60);
    });

如果你已经深刻洞悉对应 Redis 限流器底层的执行逻辑和实现原理,就可以驾轻就熟地使用这两个限流器来解决系统的并发瓶颈问题或者有效抵御垃圾流量。关于 Redis 限流器我们就简单介绍到这里,下一篇教程,我们一起来看看如何通过 Redis 实现用户 UV 统计功能。

本系列教程首发在Laravel学院(laravelacademy.org)

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2021-01-12,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 极客书房 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
Git撤销&回滚操作(git reset 和 get revert)
俗话说,老虎也有打盹的时候。我们提交代码,也会有出错的时候。 我今天不小心把不该提交的文件给提交了。
赵云龙龙
2020/02/13
38.4K0
Git撤销&回滚操作(git reset 和 get revert)
使用 Git,10个最需要常备的后悔药
https://segmentfault.com/a/1190000022951517
前端老道
2020/07/07
1K0
使用 Git,10个最需要常备的后悔药
Git知识总览(五) Git中的merge、rebase、cherry-pick以及交互式rebase
上篇博客聊了《git分支管理之rebase 以及 cherry-pick相关操作》本篇博客我们就以Learning Git中的关卡进行展开。下方列举了LearningGit中的 merge、rebase、reset、revert、cherry-pick 以及交互式rebase相关关卡的操作以及对应的解析。后边在聊交互式rebase操作是,不单单给出了LearningGit中的内容,而且给出了真正的Git分支在交互式rebase操作时的具体案例。 learngitbranching的地址为:https://l
lizelu
2018/03/28
16K0
Git知识总览(五) Git中的merge、rebase、cherry-pick以及交互式rebase
珍藏多年的 Git 问题和操作清单
本文整理自工作多年以来遇到的所有 Git 问题汇总,之前都是遗忘的时候去看一遍操作,这次重新整理了一下,发出来方便大家收藏以及需要的时候查找答案。
猿天地
2019/09/03
1.4K0
珍藏多年的 Git 问题和操作清单
GIT常用指令
注:并不是修改上一次commit,而是生成新的commit取代上一次commit。
Careteen
2022/02/14
4220
GIT常用指令
一文读懂Git
版本控制就是记录项目文件的历史变化。它为我们查阅日志,回退,协作等方面提供了有力的帮助。
onlythinking
2020/06/02
6600
一文读懂Git
学会这 11 条,你离 Git 大神就不远了!
支持使用 merge 的开发者,他们认为仓库的提交历史就是记录实际发生过什么,它是针对于历史的一个文档,本身其实是有价值的,我们不应该随意修改。我们改变历史的话,就相当于使用“谎言”来掩盖实际发生过的事情,而这些痕迹是应该被保留的。可能,这样并不是很好。
民工哥
2021/04/21
3620
Git 修改已提交 commit 的信息
由于 Github 和公司 Git 使用账号不一样,偶尔没注意,提交出错后就需要修改 commit 信息。
叨叨软件测试
2020/10/27
107.7K0
常用的Git Tips
一、Configuration:配置 列举所有的别名与配置 git config --list Git 别名配置 git config --global alias. git config --global alias.st status 设置git为大小写敏感 git config --global core.ignorecase false 二、Help:常用的辅助查询命令 在git 命令行里查看everyday git git help everyday 显示git常用的帮助命令 git help -g 获取Git Bash的自动补全 ~/.git-completion.bash && echo '[ -f ~/.git-completion.bash ] && . ~/.git-completion.bash' >> ~/.bashrc 设置自动更正 git config --global help.autocorrect 1 三、Remote:远端仓库配置 获取所有远端引用配置 git remote 或者 git remote show 修改某个远端的地址 git remote set-url origin URL Repo 查看当前仓库中的所有未打包的objects和磁盘占用 git count-objects --human-readable 从object数据库中删除所有不可达的object git gc --prune=now --aggressive 四、文件类操作 ,Cache:缓存,Track:文件追踪, 展示所有被追踪的文件 git ls-files -t 展示所有未被追踪的分支 git ls-files --others 展示所有被忽略的文件 git ls-files --others -i --exclude-standard git check-ignore* git status --ignored Manipulation:操作 停止追踪某个文件但是不删除它 git rm --cached <file_path 或者 git rm --cached -r <directory_path 强制删除未被追踪的文件或者目录 git clean -f git clean -f -d git clean -df 清空.gitignore git clean -X -f Changes:修改 Info:信息查看 查看上次提交之后的未暂存文件 git diff 查看准备用于提交的暂存了的修改的文件 git diff --cached 显示所有暂存与未暂存的文件 git diff HEAD 查看最新的文件版本与Stage中区别 git diff --staged dd:追踪某个修改,准备提交 Stage某个文件的部分修改而不是全部 git add -p Reset:修改重置 以HEAD中的最新的内容覆盖某个本地文件的修改 git checkout -- <file_name> Stash:贮存 Info:信息查看 展示所有保存的Stashes git stash list Manipulation:操作 Save:保存 保存当前追踪的文件修改状态而不提交,并使得工作空间恢复干净 git stash 或者 git stash save 保存所有文件修改,包括未追踪的文件 git stash save -u 或者 git stash save --include-untracked Apply:应用 应用任何的Stash而不从Stash列表中删除 git stash apply <stash@{n}> 应用并且删除Stash列表中的最后一个 git stash pop 或者 git stash apply stash@{0} && git stash drop stash@{0} 删除全部存储的Stashes git stash clear 或者 git stash drop <stash@{n}> 从某个Stash中应用单个文件 git checkout <stash@{n}> -- <file_path> 或者 git checkout stash@{0} -- <file_path> Commit:提交 检索某个提交的Hash值 git rev-list --reverse HEAD | head -1 Info:信息查看 List:Commit列表 查看自Fork Master以来的全部提交 git log --no-merges --stat --reverse master.. 展示当前分支中所有尚未合并到Master中的提交 git cherry -v master 或者 git cherry -v master <branch-to-be-merged> 可视化地查看整个Version树
竹清
2018/08/31
7120
我在工作中是如何使用Git的
最近在网上有个真实发生的案例比较火,说的是一个新入职的员工,不会用 Git 拉代码,第二天被开除。由此,可见 Git 对我们工作的重要性,无论是前端后端,都是离不开 Git 的,下面就让我们一探究竟吧。
政采云前端团队
2021/07/19
1.9K0
git使用步骤_小猪酸奶的使用步骤
接触Git也些年头了,对于Git的使用也算是略有心得,想着 出于自己日后回顾,也便于他人查阅学习的目的,遂有此文, 相信看完此文你的Git使用会更进一步,谢谢~
全栈程序员站长
2022/11/08
1.1K0
git使用步骤_小猪酸奶的使用步骤
git 奇技淫巧
和 revert 的区别:reset 命令会抹去某个 commit id 之后的所有 commit
s_在路上
2018/09/30
6600
一些常用的 Git 进阶知识与技巧
假设我们在同一电脑上拥有多个 Git 账号,例如公司内部使用的是 Gitlab,个人使用的是 Github 或者 Gitee。那就会遇到一种情况,上班时想给个人开源项目提交代码,但是 Git 绑定的是公司的账号。
谭光志
2022/03/24
4420
一些常用的 Git 进阶知识与技巧
【linux命令讲解大全】013.Git:分布式版本控制系统的先驱和常用命令清单(二)
我还遇到了如下面错误,lab默认给master分支加了保护,不允许强制覆盖。Project(项目)->Setting->Repository 菜单下面的Protected branches把master的保护去掉就可以了。修改完之后,建议把master的保护再加回来,毕竟强推不是件好事。
全栈若城
2024/03/02
1140
十分钟了解 git 那些 “不常用” 命令
链接:https://segmentfault.com/a/1190000022107836
好好学java
2020/03/31
5020
代码管理工具的扛把子-Git
但是最近小❤发现很多人(包括我自己)只熟悉日常代码的拉取和提交,连 git revert/rebase 都不知道怎么用,太尴尬了 T.T
xin猿意码
2023/10/18
3430
代码管理工具的扛把子-Git
Git 实用指南
Git 是一个分布式的版本控制工具,因此远程和本地可以视为两个独立的 Git 仓库。上图是一张经典的 Git 中的数据流与存储级别的介绍,其中储存级别主要包含几部分:
grain先森
2019/05/06
6590
Git 实用指南
深入理解 git 一切皆 commit
git 入门教程推荐: 简介 - Git教程 - 廖雪峰的官方网站 Git入门图文教程(1.5W字40图)🔥🔥—深入浅出、图文并茂 - 安木夕 - 博客园
jgrass
2024/12/25
970
深入理解 git 一切皆 commit
相关推荐
Git撤销&回滚操作(git reset 和 get revert)
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档