那天凌晨三点,我盯着终端上那行熟悉的报错信息,整个人是懵的。
任务堆积、线程阻塞、超时重试——看起来像是平常的小毛病,但这次不一样,整个抓取进程几乎陷入瘫痪。
我用的是一个典型的任务分发架构:
听起来很正常对吧?但随着关键词搜索的热点量级飙升,比如“中美关系”“AI新政策”“新能源车”,队列瞬间爆炸了。重试任务在队列里越堆越多,优先级机制形同虚设。
我意识到问题的关键不是“任务多”,而是“调度策略太蠢”。于是,开始了一次“抓取任务队列瘦身”的改造行动。
以前的逻辑是,任务失败就立刻重试。结果导致网络抖动时同一个URL被疯狂重试十几次。
改成延迟队列后,我们给每个失败任务一个“冷静时间”,比如第一次失败延迟30秒、第二次失败延迟2分钟,以此类推。
这让系统变得温柔多了。抓取不再像无头苍蝇,而是更像个“耐心的猎人”。
关键词搜索任务其实并不一样,“AI政策”类的实时性要求更高,而“电影解说”这种可以晚点再爬。
所以我们加了一个优先级字段,根据关键词热度动态调整抓取顺序。
这样一来,高优先级任务先执行,低优先级的在资源紧张时自动让路。
有些URL在多次尝试后仍然返回404,再爬也没意义。我们为每个任务设置了最大重试次数和退避算法(Exponential Backoff),超过阈值就直接丢弃。
这一步的收益意外地大——CPU占用下降了20%,内存占用下降了40%。
最终的方案,是这三种机制的融合体。
我们引入一个智能任务调度器,用Redis实现延迟与优先级队列,同时在逻辑层面加入回退策略:
下面是一段精简版实现,使用了 requests + Redis + 爬虫代理,以今日头条为目标,抓取关键词“人工智能”的热点新闻。
import requests
import redis
import json
import time
from datetime import datetime
from random import randint
# Redis连接
r = redis.StrictRedis(host='localhost', port=6379, db=0)
# 代理配置 参考亿牛云爬虫代理加强版
proxy_host = "proxy.16yun.cn"
proxy_port = "3100"
proxy_user = "16YUN"
proxy_pass = "16IP"
proxies = {
"http": f"http://{proxy_user}:{proxy_pass}@{proxy_host}:{proxy_port}",
"https": f"http://{proxy_user}:{proxy_pass}@{proxy_host}:{proxy_port}",
}
# 添加任务到优先级队列
def add_task(keyword, priority=1):
task = json.dumps({"keyword": keyword, "retries": 0})
r.zadd("priority_queue", {task: priority})
# 模拟延迟队列插入
def add_delay_task(task_data, delay_seconds):
timestamp = time.time() + delay_seconds
r.zadd("delay_queue", {task_data: timestamp})
# 从队列中获取任务
def fetch_task():
# 优先取优先级队列
task = r.zrevrange("priority_queue", 0, 0)
if task:
r.zrem("priority_queue", task[0])
return json.loads(task[0])
# 再取延迟队列中已到期的任务
now = time.time()
delay_task = r.zrangebyscore("delay_queue", 0, now, start=0, num=1)
if delay_task:
r.zrem("delay_queue", delay_task[0])
return json.loads(delay_task[0])
return None
# 执行任务
def run_task(task):
keyword = task["keyword"]
retries = task["retries"]
url = f"https://www.toutiao.com/search/?keyword={keyword}"
try:
resp = requests.get(url, proxies=proxies, timeout=5)
if resp.status_code == 200:
print(f"[{datetime.now()}] 抓取成功:{keyword}")
else:
raise Exception("非200响应")
except Exception as e:
retries += 1
print(f"[{datetime.now()}] 抓取失败({retries}):{keyword},错误:{e}")
if retries < 3:
delay = retries * 30 + randint(5, 10)
add_delay_task(json.dumps({"keyword": keyword, "retries": retries}), delay)
else:
print(f"[{datetime.now()}] 放弃任务:{keyword}")
# 示例运行
if __name__ == "__main__":
add_task("人工智能", priority=10)
add_task("新能源车", priority=8)
add_task("影视娱乐", priority=3)
while True:
task = fetch_task()
if not task:
print("当前无可执行任务,休息10秒")
time.sleep(10)
continue
run_task(task)这段代码在实践中已经帮我避免了“雪崩式重试”的坑。延迟机制让系统喘口气,优先级机制让资源更聚焦,回退策略防止浪费。
如果说之前的抓取系统像一群抢活干的工人,现在的版本更像一个有节奏的流水线:谁急谁先上,谁失败谁先冷静。
延迟队列让系统“稳”,优先级队列让调度“聪明”,回退策略让失败“有底线”。
抓取的稳定性、资源利用率、响应速度都上了一个台阶。
——有时候,优化不是让它“更快”,而是让它“更会等”。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。