在当今的大数据时代,单机爬虫的能力已远远无法满足海量数据采集的需求。分布式爬虫通过将爬取任务分发到多台机器(节点)上并行执行,极大地提升了效率和规模。然而,这种强大的能力也带来了新的挑战:如何避免因并发过高而给目标网站带来过大压力?如何防止所有节点因使用同一IP池而导致整个集群被大规模封禁?
解决这些问题的核心,在于实施有效的全局请求间隔协调与IP轮换策略。这正是分布式爬虫区别于单机爬虫、能否稳定、高效、友好运行的关键。
在单机爬虫中,我们通常设置一个固定的 DOWNLOAD_DELAY
或使用自动限速(AutoThrottle)来控制请求频率。这套机制在该台机器内部运作良好。
但在分布式环境中,如果每台机器都只管理自己的请求频率,就会出现严重问题:
10 / 2秒 = 5次/秒
,这很可能超过网站的容忍阈值,导致所有节点的IP都被封禁。因此,我们必须引入一个全局协调中心,来统一管理和调度所有节点的请求行为,确保从整个集群的视角来看,请求频率和IP使用是合规、合理的。
一个典型的具备全局协调能力的分布式爬虫系统通常包含以下组件:
我们以最常用的 Redis
作为全局状态中心,Scrapy
作为爬虫框架来阐述实现过程。
目标:确保对同一域名 www.example.com
的请求,无论来自哪个节点,间隔都不小于 N
秒。
原理:在Redis中为每个域名设置一个最后请求时间戳。任何节点在执行请求前,需要检查当前时间与Redis中记录的最后请求时间的差值,如果小于设定的间隔 N
,则需等待至间隔期满,才能执行请求并更新最后请求时间。
实现(使用Scrapy中间件)
# middlewares.py
import redis
import time
from scrapy import signals
from scrapy.downloadermiddlewares.httpproxy import HttpProxyMiddleware
class GlobalThrottleMiddleware:
"""
全局分布式频率控制中间件
"""
def __init__(self, redis_host, redis_port, redis_db, default_delay=2.0):
self.redis_client = redis.StrictRedis(
host=redis_host, port=redis_port, db=redis_db, decode_responses=True
)
self.default_delay = default_delay # 全局默认请求间隔,单位秒
@classmethod
def from_crawler(cls, crawler):
s = cls(
redis_host=crawler.settings.get('REDIS_HOST', 'localhost'),
redis_port=crawler.settings.get('REDIS_PORT', 6379),
redis_db=crawler.settings.get('REDIS_DB', 0),
default_delay=crawler.settings.get('GLOBAL_DOWNLOAD_DELAY', 2.0)
)
crawler.signals.connect(s.spider_opened, signal=signals.spider_opened)
return s
def process_request(self, request, spider):
# 获取请求的域名(或IP)作为Redis键的一部分
domain = request.url.split('/')[2]
redis_key = f"global_throttle:{domain}"
# 使用Redis的pipeline保证原子性操作
with self.redis_client.pipeline() as pipe:
while True:
try:
# 监听这个key,防止多个客户端同时修改
pipe.watch(redis_key)
last_request_time = pipe.get(redis_key)
current_time = time.time()
if last_request_time is not None:
last_request_time = float(last_request_time)
elapsed = current_time - last_request_time
if elapsed < self.default_delay:
# 还需要等待多久
wait_time = self.default_delay - elapsed
time.sleep(wait_time)
current_time = time.time() # 等待后更新当前时间
# 获取到锁后,更新最后请求时间
pipe.multi()
pipe.set(redis_key, current_time)
pipe.execute()
break
except redis.WatchError:
# 如果key被其他客户端改变,重试
continue
return None
# settings.py
DOWNLOADER_MIDDLEWARES = {
'myproject.middlewares.GlobalThrottleMiddleware': 543, # 需要在HttpProxyMiddleware之前执行
# ...
}
目标:让每个请求使用不同的代理IP,并自动淘汰失效的IP。
原理:在Redis中维护一个有序集合(Sorted Set),成员是IP地址,分数是IP的“健康分数”(或最后成功使用的时间)。节点需要代理时,从集合中选取分数最高(最健康)的IP使用。根据请求的成功失败情况,动态调整IP的分数。
实现(结合上述中间件):
# middlewares.py (续)
import base64
import redis
import time
from scrapy.downloadermiddlewares.httpproxy import HttpProxyMiddleware
# 代理服务器信息
proxyHost = "www.16yun.cn"
proxyPort = "5445"
proxyUser = "16QMSOML"
proxyPass = "280651"
class GlobalIPRotationMiddleware(HttpProxyMiddleware):
"""
全局IP代理池管理与轮换中间件(带认证信息)
"""
def __init__(self, redis_client, ip_pool_key='ip_proxy_pool'):
self.redis_client = redis_client
self.ip_pool_key = ip_pool_key
# 生成代理认证信息
self.proxy_auth = self.generate_proxy_auth(proxyUser, proxyPass)
@classmethod
def from_crawler(cls, crawler):
redis_client = redis.StrictRedis(
host=crawler.settings.get('REDIS_HOST', 'localhost'),
port=crawler.settings.get('REDIS_PORT', 6379),
db=crawler.settings.get('REDIS_DB', 0),
decode_responses=True
)
return cls(redis_client)
def generate_proxy_auth(self, username, password):
"""生成代理认证信息"""
auth_string = f"{username}:{password}"
encoded_auth = base64.b64encode(auth_string.encode()).decode()
return f"Basic {encoded_auth}"
def process_request(self, request, spider):
# 只有当请求需要代理时才执行
if 'proxy' in request.meta or getattr(spider, 'use_proxy', False):
proxy_url = self.get_best_proxy()
if proxy_url:
# 设置代理URL
request.meta['proxy'] = proxy_url
# 添加代理认证头信息
request.headers['Proxy-Authorization'] = self.proxy_auth
# 可选:添加其他必要的代理头信息
request.headers['Connection'] = 'close'
def get_best_proxy(self):
"""
从Redis中获取最佳代理
返回格式:http://host:port 或 https://host:port
"""
# 示例策略:获取分数最低(最久未使用)的IP
proxies = self.redis_client.zrange(self.ip_pool_key, 0, 0, withscores=True)
if proxies:
proxy_url, last_used = proxies[0]
# 确保代理URL格式正确
if not proxy_url.startswith(('http://', 'https://')):
proxy_url = f"http://{proxy_url}"
# 更新这个IP的最后使用时间为当前时间(分数)
current_time = time.time()
self.redis_client.zadd(self.ip_pool_key, {proxy_url: current_time})
return proxy_url
# 如果没有从Redis获取到代理,使用默认代理
return self.get_default_proxy()
def get_default_proxy(self):
"""获取默认代理(当Redis中没有代理时使用)"""
return f"http://{proxyHost}:{proxyPort}"
def process_exception(self, request, exception, spider):
"""处理请求异常,降低代理IP分数"""
if 'proxy' in request.meta:
proxy = request.meta['proxy']
try:
# 大幅降低分数,例如减去100。失败次数越多,分数越低。
self.redis_client.zincrby(self.ip_pool_key, -100, proxy)
# 检查分数是否低于阈值,如果是则移除
score = self.redis_client.zscore(self.ip_pool_key, proxy)
if score and score < -500: # 设置移除阈值为-500
self.redis_client.zrem(self.ip_pool_key, proxy)
spider.logger.warning(f"Removed invalid proxy: {proxy}")
except Exception as e:
spider.logger.error(f"Error updating proxy score: {e}")
def process_response(self, request, response, spider):
"""处理响应,更新代理IP健康状态"""
if 'proxy' in request.meta:
proxy = request.meta['proxy']
try:
if response.status != 200:
# 非200响应,轻微惩罚
self.redis_client.zincrby(self.ip_pool_key, -10, proxy)
spider.logger.debug(f"Proxy {proxy} penalized for status {response.status}")
else:
# 成功请求,增加奖励
self.redis_client.zincrby(self.ip_pool_key, 5, proxy)
except Exception as e:
spider.logger.error(f"Error updating proxy health: {e}")
return response
def format_proxy_url(self, proxy_host, proxy_port, scheme='http'):
"""格式化代理URL"""
return f"{scheme}://{proxy_host}:{proxy_port}"
# settings.py 配置示例
"""
DOWNLOADER_MIDDLEWARES = {
'myproject.middlewares.GlobalThrottleMiddleware': 542,
'myproject.middlewares.GlobalIPRotationMiddleware': 543, # 在Throttle之后执行
'scrapy.downloadermiddlewares.httpproxy.HttpProxyMiddleware': None, # 禁用默认的代理中间件
}
# 代理相关设置
PROXY_ENABLED = True
PROXY_HOST = "www.16yun.cn"
PROXY_PORT = "5445"
PROXY_USER = "16QMSOML"
PROXY_PASS = "280651"
# Redis配置
REDIS_HOST = 'localhost'
REDIS_PORT = 6379
REDIS_DB = 0
"""
robots.txt
协议和相关法律法规,尊重网站的数据产权和用户隐私。分布式爬虫的全局请求间隔协调与IP轮换策略,是其能否在商业环境中稳定、高效、长期运行的生命线。通过引入一个强大的全局状态中心(如Redis),并设计精巧的中间件逻辑,我们可以将分散的爬虫节点整合成一个行为可控、资源分配合理的有机整体。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。