在大多数抓取项目里,任务去重看上去是个再普通不过的小功能。可当采集规模一旦上到成千上万条请求,它的影响就不止是“多爬了几次网页”这么简单,而是直接关系到 系统性能、代理池成本、甚至数据准确性。
在软件架构的世界里,这类问题其实早就有对应的思路,那就是 责任链模式(Chain of Responsibility)。
想象一下,你在流水线上检验产品。每一层质检员都负责不同的环节,有人检查尺寸,有人看外观,有人测性能。任务从一头进来,层层检查、层层传递。
在我们的抓取系统里,请求就像那条流水线上的产品,而每个“质检员”——也就是中间件——都承担不同的逻辑,比如:
这样的解耦让整个系统更干净、更有条理,也方便以后扩展或替换任何一个模块。
如果你做过分布式采集,肯定见过那种“任务爆炸”的场面:
同一个关键词被多个节点同时抓,结果浪费了带宽、代理、CPU,还导致数据库里一堆重复数据。
要解决这个问题,单纯在抓取逻辑里加个 if not in 远远不够。
更优雅的方式,是借助设计模式,把复杂逻辑拆分成一组能协同工作的“模块”,各司其职。
比如可以这么类比:
模块 | 对应的设计模式 | 作用 |
|---|---|---|
去重中间件 | 单例 + 责任链 | 负责判定请求是否重复,并阻止重复抓取 |
分发中间件 | 策略模式 | 根据关键词、代理负载等条件分配任务 |
调度控制器 | 观察者模式 | 实时监控任务完成率,并动态调整策略 |
这样的设计并不只是让代码更“学术”,它能让系统在面对复杂情况时更稳、更灵活。
比如,一个节点挂了,任务可以自动被分发到其他代理上;某个关键词短时间内重复提交,也会被过滤掉。
说理论容易,关键是怎么落地。
我们可以用 eBay 的商品搜索页面作为实验场景,试着让一个简单的抓取具备“去重 + 分发”的能力。
它的运行逻辑大致是这样:
代码如下:
import hashlib
import json
import random
import requests
from bs4 import BeautifulSoup
from urllib.parse import quote
# ========= 代理配置(亿牛云示例) =========
PROXY_HOST = "proxy.16yun.com"
PROXY_PORT = "3100"
PROXY_USER = "16YUN"
PROXY_PASS = "16IP"
def get_proxy():
"""生成带认证的代理配置"""
return {
"http": f"http://{PROXY_USER}:{PROXY_PASS}@{PROXY_HOST}:{PROXY_PORT}",
"https": f"http://{PROXY_USER}:{PROXY_PASS}@{PROXY_HOST}:{PROXY_PORT}"
}
# ========= 去重中间件 =========
class DeduplicationMiddleware:
"""任务去重逻辑"""
def __init__(self):
self.visited = set()
def is_duplicate(self, url):
key = hashlib.md5(url.encode('utf-8')).hexdigest()
if key in self.visited:
return True
self.visited.add(key)
return False
# ========= 分发中间件 =========
class DistributionMiddleware:
"""根据任务特征选择代理"""
def select_proxy(self, keyword):
# 简单策略:根据关键词长度动态分配
return get_proxy()
# ========= 爬虫核心逻辑 =========
class EbaySpider:
def __init__(self, keywords):
self.keywords = keywords
self.dedup = DeduplicationMiddleware()
self.dist = DistributionMiddleware()
self.headers = {
"User-Agent": random.choice([
"Mozilla/5.0 (Windows NT 10.0; Win64; x64)",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7)"
]),
"Accept-Language": "en-US,en;q=0.9"
}
def fetch(self, keyword):
search_url = f"https://www.ebay.com/sch/i.html?_nkw={quote(keyword)}"
if self.dedup.is_duplicate(search_url):
print(f"跳过重复任务:{keyword}")
return
proxies = self.dist.select_proxy(keyword)
try:
response = requests.get(search_url, headers=self.headers, proxies=proxies, timeout=10)
response.raise_for_status()
self.parse(response.text, keyword)
except Exception as e:
print(f"抓取失败:{keyword} -> {e}")
def parse(self, html, keyword):
soup = BeautifulSoup(html, "html.parser")
items = soup.select(".s-item")
for item in items:
title = item.select_one(".s-item__title")
price = item.select_one(".s-item__price")
location = item.select_one(".s-item__location")
time = item.select_one(".s-item__listingDate")
data = {
"keyword": keyword,
"title": title.text.strip() if title else None,
"price": price.text.strip() if price else None,
"location": location.text.strip() if location else None,
"time": time.text.strip() if time else None
}
print(json.dumps(data, ensure_ascii=False))
def run(self):
for kw in self.keywords:
self.fetch(kw)
# ========= 运行入口 =========
if __name__ == "__main__":
keywords = ["laptop", "headphones", "watch", "camera"]
spider = EbaySpider(keywords)
spider.run()运行之后,你会发现它的行为更像一个有判断力的系统:
不会重复抓取同一个搜索词,也会在代理之间智能分配流量。
写中间件的过程,某种程度上像是在给系统装上“神经系统”。
它能判断、能决策、还能不断学习。
对于采集来说,这种能力非常宝贵,因为网络环境变化太快,网站结构、反爬机制、请求参数都在不停调整。
如果系统本身能通过中间件层做策略分离,就能在修改逻辑时保持底层稳定。
更深层的意义在于——
我们开始不只是“写代码去抓数据”,而是 在构建一个有调度、有反馈、有演化能力的数据系统。
去重,是让系统不浪费;分发,是让资源更高效;中间件,就是让这一切井然有序的关键角色。
中间件的设计,不只是写几行逻辑判断,更是一种系统思维的体现。
当我们用设计模式去思考抓取问题,代码就从“能跑”变成了“能成长”。
很多人觉得抓取只是技术活,但其实做得久了你会发现,它更像一门关于 秩序与复杂性管理的艺术。
而中间件——正是维持这种秩序的那双看不见的手。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。