
Today we'll look at how to use the browser-automation tool Selenium to scrape proxy IPs and build your own IP pool. In short: drive a browser to pull proxy data from an API, clean and validate it, store it, and finally let the crawler rotate through these IP addresses, which is an effective way around IP bans from anti-scraping measures.
Combining Selenium with a proxy API is a practical way to build a proxy IP pool, especially when the target API has anti-bot measures or requires JavaScript rendering. Here is the complete implementation:
Overall flow: fetch proxies from the API → validate them → store them in the IP pool → serve them to the spider.

Install the dependencies:

```bash
pip install selenium requests redis pymysql
```

Fetch proxy IPs through Selenium:

```python
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import time
import json


class ProxyAPIExtractor:
    def __init__(self, headless=True):
        self.options = webdriver.ChromeOptions()
        if headless:
            self.options.add_argument('--headless')
        self.options.add_argument('--disable-gpu')
        self.options.add_argument('--no-sandbox')
        self.driver = None

    def setup_driver(self):
        """Initialize the browser driver."""
        self.driver = webdriver.Chrome(options=self.options)

    def extract_proxies_from_api(self, api_url, wait_element=None, timeout=10):
        """
        Fetch proxy IPs from an API.
        :param api_url: API URL
        :param wait_element: CSS selector of an element to wait for
        :param timeout: timeout in seconds
        :return: list of proxies
        """
        if not self.driver:
            self.setup_driver()
        try:
            self.driver.get(api_url)
            # Wait for a specific element if requested
            if wait_element:
                WebDriverWait(self.driver, timeout).until(
                    EC.presence_of_element_located((By.CSS_SELECTOR, wait_element))
                )
            time.sleep(2)  # extra wait to make sure the data has finished loading
            # Read the page content and parse it as JSON
            page_content = self.driver.find_element(By.TAG_NAME, 'pre').text
            proxy_data = json.loads(page_content)
            return self.parse_proxy_data(proxy_data)
        except Exception as e:
            print(f"Failed to fetch proxies: {e}")
            return []

    def parse_proxy_data(self, data):
        """Parse the proxy data returned by the API."""
        proxies = []
        # Adjust the parsing logic to match the actual API response format
        if isinstance(data, list):
            for item in data:
                if 'ip' in item and 'port' in item:
                    proxy = f"{item['ip']}:{item['port']}"
                    proxy_type = item.get('type', 'http').lower()
                    proxies.append({
                        'proxy': proxy,
                        'type': proxy_type,
                        'anonymity': item.get('anonymity', 'unknown'),
                        'location': item.get('location', 'unknown'),
                        'response_time': item.get('response_time', 0)
                    })
        return proxies

    def close(self):
        """Close the browser."""
        if self.driver:
            self.driver.quit()


# Usage example
if __name__ == "__main__":
    extractor = ProxyAPIExtractor()
    # Example API URL (replace with a real proxy API)
    api_url = "https://api.example.com/proxies"
    proxies = extractor.extract_proxies_from_api(
        api_url,
        wait_element=".loading-complete"  # adjust to the actual page
    )
    print(f"Fetched {len(proxies)} proxies")
    extractor.close()
```
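The parser above only handles a bare JSON list. Many proxy APIs wrap the list in an envelope such as `{"data": [...]}`; that key is an assumption for illustration, not something from the original article. A small helper can adapt the same parser to that shape:

```python
def parse_wrapped_response(extractor, data):
    """Hypothetical helper for APIs that wrap the proxy list in an envelope."""
    if isinstance(data, dict):
        data = data.get('data', [])  # assumed envelope key; adjust to the real API
    return extractor.parse_proxy_data(data)
```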
Validate the proxies:

```python
import requests
import concurrent.futures
import time


class ProxyValidator:
    def __init__(self, test_url="http://httpbin.org/ip", timeout=5):
        self.test_url = test_url
        self.timeout = timeout

    def validate_proxy(self, proxy_info):
        """Check whether a single proxy works."""
        proxy_str = proxy_info['proxy']
        proxy_type = proxy_info['type']
        proxies = {
            'http': f'{proxy_type}://{proxy_str}',
            'https': f'{proxy_type}://{proxy_str}'
        }
        try:
            start_time = time.time()
            response = requests.get(
                self.test_url,
                proxies=proxies,
                timeout=self.timeout
            )
            response_time = time.time() - start_time
            if response.status_code == 200:
                proxy_info['valid'] = True
                proxy_info['response_time'] = response_time
                proxy_info['last_checked'] = time.time()
                return proxy_info
        except requests.RequestException:
            pass
        proxy_info['valid'] = False
        return proxy_info

    def validate_batch(self, proxies, max_workers=10):
        """Validate proxies concurrently."""
        valid_proxies = []
        with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
            future_to_proxy = {
                executor.submit(self.validate_proxy, proxy): proxy
                for proxy in proxies
            }
            for future in concurrent.futures.as_completed(future_to_proxy):
                result = future.result()
                if result['valid']:
                    valid_proxies.append(result)
        return valid_proxies
```
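A quick way to smoke-test the validator on a couple of entries (the addresses below are documentation placeholders, not real proxies):

```python
# Placeholder candidates; in practice these come from ProxyAPIExtractor.
candidates = [
    {'proxy': '203.0.113.10:8080', 'type': 'http'},
    {'proxy': '203.0.113.11:3128', 'type': 'http'},
]
validator = ProxyValidator()
alive = validator.validate_batch(candidates, max_workers=2)
print(f"{len(alive)} of {len(candidates)} candidates responded")
```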
Store validated proxies in Redis:

```python
import redis
import json
import time


class ProxyPool:
    def __init__(self, host='localhost', port=6379, db=0):
        self.redis_client = redis.Redis(host=host, port=port, db=db, decode_responses=True)
        self.proxy_key = "proxy_pool:valid_proxies"
        self.proxy_score_key = "proxy_pool:proxy_scores"

    def add_proxy(self, proxy_info):
        """Add a proxy to the pool."""
        proxy_str = proxy_info['proxy']
        # Store the proxy details
        self.redis_client.hset(
            self.proxy_key,
            proxy_str,
            json.dumps(proxy_info)
        )
        # Score the proxy by response time (faster proxies get a higher score)
        score = 10 / (proxy_info['response_time'] + 0.1)
        self.redis_client.zadd(self.proxy_score_key, {proxy_str: score})

    def get_random_proxy(self):
        """Return the best available proxy (highest score)."""
        # Take the highest-scored proxy
        proxies = self.redis_client.zrevrange(self.proxy_score_key, 0, 0)
        if proxies:
            proxy_str = proxies[0]
            proxy_info = self.redis_client.hget(self.proxy_key, proxy_str)
            return json.loads(proxy_info) if proxy_info else None
        return None

    def get_proxy_by_type(self, proxy_type):
        """Get proxies of a given type."""
        all_proxies = self.redis_client.hgetall(self.proxy_key)
        result = []
        for proxy_str, proxy_info in all_proxies.items():
            info = json.loads(proxy_info)
            if info['type'] == proxy_type and info['valid']:
                result.append(info)
        return result

    def remove_proxy(self, proxy_str):
        """Remove an invalid proxy."""
        self.redis_client.hdel(self.proxy_key, proxy_str)
        self.redis_client.zrem(self.proxy_score_key, proxy_str)

    def get_all_proxies(self):
        """Return every proxy in the pool."""
        all_proxies = self.redis_client.hgetall(self.proxy_key)
        return [json.loads(proxy_info) for proxy_info in all_proxies.values()]

    def cleanup_expired_proxies(self, expiry_hours=24):
        """Remove proxies that have not been checked recently."""
        all_proxies = self.get_all_proxies()
        current_time = time.time()
        for proxy_info in all_proxies:
            last_checked = proxy_info.get('last_checked', 0)
            if current_time - last_checked > expiry_hours * 3600:
                self.remove_proxy(proxy_info['proxy'])
```
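The scoring formula `score = 10 / (response_time + 0.1)` simply favors faster proxies: a 0.4 s proxy scores 10 / 0.5 = 20, while a 1.9 s proxy scores 5, so the faster one is handed out first. A minimal usage sketch, assuming a Redis instance on localhost and a made-up proxy address:

```python
import time

pool = ProxyPool(host='localhost', port=6379, db=0)
pool.add_proxy({
    'proxy': '203.0.113.10:8080',   # placeholder address
    'type': 'http',
    'valid': True,
    'response_time': 0.4,
    'last_checked': time.time(),
})
print(pool.get_random_proxy())  # -> the highest-scored entry in the pool
```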
Tie everything together with a scheduled pool manager:

```python
import schedule
import time
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class ProxyPoolManager:
    def __init__(self, api_urls, redis_config=None):
        self.api_urls = api_urls
        self.extractor = ProxyAPIExtractor(headless=True)
        self.validator = ProxyValidator()
        self.pool = ProxyPool(**(redis_config or {}))

    def fetch_and_validate_proxies(self):
        """Fetch and validate proxies."""
        all_proxies = []
        for api_url in self.api_urls:
            logger.info(f"Fetching proxies from {api_url}...")
            proxies = self.extractor.extract_proxies_from_api(api_url)
            all_proxies.extend(proxies)
        logger.info(f"Fetched {len(all_proxies)} proxies in total, starting validation...")
        valid_proxies = self.validator.validate_batch(all_proxies)
        logger.info(f"{len(valid_proxies)} proxies passed validation")
        for proxy in valid_proxies:
            self.pool.add_proxy(proxy)
        return valid_proxies

    def run_periodically(self, interval_minutes=30):
        """Run the proxy-fetching job on a schedule."""
        logger.info("Starting the proxy pool manager...")
        # Run once immediately
        self.fetch_and_validate_proxies()
        # Schedule the recurring job
        schedule.every(interval_minutes).minutes.do(
            self.fetch_and_validate_proxies
        )
        while True:
            try:
                schedule.run_pending()
                time.sleep(1)
            except KeyboardInterrupt:
                logger.info("Stopping the proxy pool manager")
                break
            except Exception as e:
                logger.error(f"Job error: {e}")
                time.sleep(60)

    def get_proxy_for_spider(self):
        """Hand a proxy to the spider."""
        return self.pool.get_random_proxy()


# Configuration and usage example
if __name__ == "__main__":
    # Proxy API list (replace with real APIs)
    api_urls = [
        "https://api1.example.com/proxies",
        "https://api2.example.com/proxies"
    ]
    redis_config = {
        'host': 'localhost',
        'port': 6379,
        'db': 0
    }
    manager = ProxyPoolManager(api_urls, redis_config)
    # Start the scheduled job (in production)
    # manager.run_periodically(interval_minutes=30)
    # Or run it once
    manager.fetch_and_validate_proxies()
    # Get a proxy for the spider
    proxy = manager.get_proxy_for_spider()
    print(f"Recommended proxy: {proxy}")
```
Use the pool from a spider:

```python
import requests
import logging
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

logger = logging.getLogger(__name__)


class SpiderWithProxy:
    def __init__(self, proxy_pool_manager):
        self.proxy_pool = proxy_pool_manager
        self.session = self._create_session()

    def _create_session(self):
        """Create a session with a retry strategy."""
        session = requests.Session()
        retry_strategy = Retry(
            total=3,
            backoff_factor=0.1,
            status_forcelist=[500, 502, 503, 504],
        )
        adapter = HTTPAdapter(max_retries=retry_strategy)
        session.mount("http://", adapter)
        session.mount("https://", adapter)
        return session

    def make_request(self, url, timeout=10):
        """Send a request through a proxy from the pool."""
        max_retries = 3
        for attempt in range(max_retries):
            proxy_info = self.proxy_pool.get_proxy_for_spider()
            if not proxy_info:
                raise Exception("No proxy available")
            proxies = {
                'http': f"{proxy_info['type']}://{proxy_info['proxy']}",
                'https': f"{proxy_info['type']}://{proxy_info['proxy']}"
            }
            try:
                response = self.session.get(
                    url,
                    proxies=proxies,
                    timeout=timeout
                )
                logger.info(f"Fetched {url} through proxy {proxy_info['proxy']}")
                return response
            except Exception as e:
                logger.warning(f"Proxy {proxy_info['proxy']} failed: {e}")
                # Mark the proxy as invalid
                self.proxy_pool.pool.remove_proxy(proxy_info['proxy'])
        raise Exception(f"All proxy attempts failed, could not fetch {url}")


# Usage example
if __name__ == "__main__":
    # Initialize the proxy pool manager (api_urls and redis_config as defined above)
    manager = ProxyPoolManager(api_urls, redis_config)
    # Initialize the spider
    spider = SpiderWithProxy(manager)
    # Send a request through a proxy
    try:
        response = spider.make_request("https://httpbin.org/ip")
        print(f"Response body: {response.text}")
    except Exception as e:
        print(f"Request failed: {e}")
```
Create a `Dockerfile`:

```dockerfile
FROM python:3.9-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install -r requirements.txt

COPY . .

# Install Chrome and ChromeDriver (curl and unzip are needed for the download steps)
RUN apt-get update && apt-get install -y \
    wget \
    gnupg \
    curl \
    unzip \
    && wget -q -O - https://dl-ssl.google.com/linux/linux_signing_key.pub | apt-key add - \
    && echo "deb [arch=amd64] http://dl.google.com/linux/chrome/deb/ stable main" >> /etc/apt/sources.list.d/google.list \
    && apt-get update \
    && apt-get install -y google-chrome-stable \
    && wget -O /tmp/chromedriver.zip https://chromedriver.storage.googleapis.com/$(curl -s https://chromedriver.storage.googleapis.com/LATEST_RELEASE)/chromedriver_linux64.zip \
    && unzip /tmp/chromedriver.zip -d /usr/bin/ \
    && rm /tmp/chromedriver.zip

CMD ["python", "proxy_pool_manager.py"]
```

Create `config.py`:
```python
# config.py
API_URLS = [
    "https://api.proxyscrape.com/v2/?request=displayproxies",
    "https://proxylist.geonode.com/api/proxy-list",
]

REDIS_CONFIG = {
    'host': 'redis',
    'port': 6379,
    'db': 0
}

SCHEDULE_INTERVAL = 30  # minutes
```

A few things to keep in mind:

1. Respect the API's terms of use: make sure you are authorized to use the target API.
2. Rate control: avoid hitting the API too frequently.
3. Proxy quality: clean out invalid proxies regularly.
4. Error handling: add thorough error handling and logging.
5. Resource cleanup: make sure browser instances are closed properly (see the sketch below).
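For point 5, one simple way to guarantee the browser always gets closed, even when extraction raises, is to wrap the extractor in a context manager. This is a small sketch built on the `ProxyAPIExtractor` class above, not part of the original code:

```python
from contextlib import contextmanager

@contextmanager
def managed_extractor(headless=True):
    """Yield a ProxyAPIExtractor and make sure its browser is closed afterwards."""
    extractor = ProxyAPIExtractor(headless=headless)
    try:
        yield extractor
    finally:
        extractor.close()

# with managed_extractor() as extractor:
#     proxies = extractor.extract_proxies_from_api("https://api.example.com/proxies")
```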
This scheme covers the full proxy IP pool workflow, from fetching and validating proxies through storage and finally serving them to the spider. You can adapt each module's implementation to your own needs.
In short, this setup effectively gives your spider a suit of "IP armor": by refreshing and maintaining the IP pool on a schedule, it can noticeably improve the spider's throughput. In practice, remember to throttle your request rate, follow each site's rules, and keep solid error handling and logging in place, and you will have a stable supply of usable proxies for the long run.
Original statement: this article was published on the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission.
For any infringement, please contact cloudcommunity@tencent.com for removal.