首页
学习
活动
专区
圈层
工具
发布
社区首页 >专栏 >抓取任务队列精简化:延迟队列、优先级队列与回退策略设计

抓取任务队列精简化:延迟队列、优先级队列与回退策略设计

原创
作者头像
jackcode
发布2025-11-04 11:27:06
发布2025-11-04 11:27:06
1160
举报
文章被收录于专栏:爬虫资料爬虫资料

一、时间线:从“任务卡死”到“队列瘦身”

那天凌晨三点,我盯着终端上那行熟悉的报错信息,整个人是懵的。

任务堆积、线程阻塞、超时重试——看起来像是平常的小毛病,但这次不一样,整个抓取进程几乎陷入瘫痪。

我用的是一个典型的任务分发架构:

  • 主任务列表里存放要爬取的URL;
  • 多线程消费者从队列中拉取任务执行;
  • 遇到失败的任务会自动重试。

听起来很正常对吧?但随着关键词搜索的热点量级飙升,比如“中美关系”“AI新政策”“新能源车”,队列瞬间爆炸了。重试任务在队列里越堆越多,优先级机制形同虚设。

我意识到问题的关键不是“任务多”,而是“调度策略太蠢”。于是,开始了一次“抓取任务队列瘦身”的改造行动。

二、方案分析:三种策略的较量

1. 延迟队列:给失败任务“冷静期”

以前的逻辑是,任务失败就立刻重试。结果导致网络抖动时同一个URL被疯狂重试十几次。

改成延迟队列后,我们给每个失败任务一个“冷静时间”,比如第一次失败延迟30秒、第二次失败延迟2分钟,以此类推。

这让系统变得温柔多了。抓取不再像无头苍蝇,而是更像个“耐心的猎人”。

2. 优先级队列:让热点任务排在前面

关键词搜索任务其实并不一样,“AI政策”类的实时性要求更高,而“电影解说”这种可以晚点再爬。

所以我们加了一个优先级字段,根据关键词热度动态调整抓取顺序。

这样一来,高优先级任务先执行,低优先级的在资源紧张时自动让路。

3. 回退策略:从“重试”到“放弃”

有些URL在多次尝试后仍然返回404,再爬也没意义。我们为每个任务设置了最大重试次数和退避算法(Exponential Backoff),超过阈值就直接丢弃。

这一步的收益意外地大——CPU占用下降了20%,内存占用下降了40%。

三、架构改进方案:让队列更聪明

最终的方案,是这三种机制的融合体。

我们引入一个智能任务调度器,用Redis实现延迟与优先级队列,同时在逻辑层面加入回退策略:

  • 任务生产者:根据关键词生成搜索任务。
  • 延迟队列(Redis Sorted Set):对失败任务按延迟时间排序。
  • 优先级队列(Redis Priority Queue):对任务按权重进行优先调度。
  • 任务消费者:定期扫描两类队列,从高优先级、到过期延迟任务依次取出执行。
  • 失败回退模块:记录失败次数、执行退避、必要时丢弃。

四、核心代码示例

下面是一段精简版实现,使用了 requests + Redis + 爬虫代理,以今日头条为目标,抓取关键词“人工智能”的热点新闻。

代码语言:python
复制
import requests
import redis
import json
import time
from datetime import datetime
from random import randint

# Redis连接
r = redis.StrictRedis(host='localhost', port=6379, db=0)

# 代理配置 参考亿牛云爬虫代理加强版
proxy_host = "proxy.16yun.cn"
proxy_port = "3100"
proxy_user = "16YUN"
proxy_pass = "16IP"

proxies = {
    "http": f"http://{proxy_user}:{proxy_pass}@{proxy_host}:{proxy_port}",
    "https": f"http://{proxy_user}:{proxy_pass}@{proxy_host}:{proxy_port}",
}

# 添加任务到优先级队列
def add_task(keyword, priority=1):
    task = json.dumps({"keyword": keyword, "retries": 0})
    r.zadd("priority_queue", {task: priority})

# 模拟延迟队列插入
def add_delay_task(task_data, delay_seconds):
    timestamp = time.time() + delay_seconds
    r.zadd("delay_queue", {task_data: timestamp})

# 从队列中获取任务
def fetch_task():
    # 优先取优先级队列
    task = r.zrevrange("priority_queue", 0, 0)
    if task:
        r.zrem("priority_queue", task[0])
        return json.loads(task[0])
    # 再取延迟队列中已到期的任务
    now = time.time()
    delay_task = r.zrangebyscore("delay_queue", 0, now, start=0, num=1)
    if delay_task:
        r.zrem("delay_queue", delay_task[0])
        return json.loads(delay_task[0])
    return None

# 执行任务
def run_task(task):
    keyword = task["keyword"]
    retries = task["retries"]
    url = f"https://www.toutiao.com/search/?keyword={keyword}"

    try:
        resp = requests.get(url, proxies=proxies, timeout=5)
        if resp.status_code == 200:
            print(f"[{datetime.now()}] 抓取成功:{keyword}")
        else:
            raise Exception("非200响应")
    except Exception as e:
        retries += 1
        print(f"[{datetime.now()}] 抓取失败({retries}):{keyword},错误:{e}")
        if retries < 3:
            delay = retries * 30 + randint(5, 10)
            add_delay_task(json.dumps({"keyword": keyword, "retries": retries}), delay)
        else:
            print(f"[{datetime.now()}] 放弃任务:{keyword}")

# 示例运行
if __name__ == "__main__":
    add_task("人工智能", priority=10)
    add_task("新能源车", priority=8)
    add_task("影视娱乐", priority=3)

    while True:
        task = fetch_task()
        if not task:
            print("当前无可执行任务,休息10秒")
            time.sleep(10)
            continue
        run_task(task)

这段代码在实践中已经帮我避免了“雪崩式重试”的坑。延迟机制让系统喘口气,优先级机制让资源更聚焦,回退策略防止浪费。

五、总结

如果说之前的抓取系统像一群抢活干的工人,现在的版本更像一个有节奏的流水线:谁急谁先上,谁失败谁先冷静。

延迟队列让系统“稳”,优先级队列让调度“聪明”,回退策略让失败“有底线”。

抓取的稳定性、资源利用率、响应速度都上了一个台阶。

——有时候,优化不是让它“更快”,而是让它“更会等”。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、时间线:从“任务卡死”到“队列瘦身”
  • 二、方案分析:三种策略的较量
    • 1. 延迟队列:给失败任务“冷静期”
    • 2. 优先级队列:让热点任务排在前面
    • 3. 回退策略:从“重试”到“放弃”
  • 三、架构改进方案:让队列更聪明
  • 四、核心代码示例
  • 五、总结
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档