下载地址:https://www.pan38.com/yun/share.php?code=JCnzE 提取密码:2211
这个项目包含三个主要文件:主爬虫程序(xy_spider.py)、工具函数(xy_utils.py)和配置文件(xy_config.py)。主程序实现了商品采集、价格监控和批量上架功能,使用Selenium模拟浏览器操作。工具文件包含各种辅助函数,配置文件则集中管理所有设置参数。
import requests
import json
import time
import random
from bs4 import BeautifulSoup
import pandas as pd
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import logging
import os
import schedule
# Configure logging: INFO level, timestamped single-line records,
# appended to xy_spider.log in the working directory.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    filename='xy_spider.log'
)
class XianYuSpider:
    """Xianyu (Taobao second-hand market) automation driven by headless Chrome.

    Provides product search/collection (``search_products``), CSV persistence
    (``save_to_csv``), price monitoring (``monitor_prices``), batch listing
    (``batch_upload``) and a scheduling main loop (``run``).

    NOTE(review): automated scraping and listing may violate the site's terms
    of service; all CSS selectors below are tied to a specific page layout and
    will break when the site changes — confirm against the live pages.
    """

    def __init__(self):
        # Browser-like headers for the plain-HTTP requests session.
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
            'Referer': 'https://2.taobao.com/'
        }
        self.session = requests.Session()
        self.session.headers.update(self.headers)
        # Headless Chrome instance used for all page interaction.
        self.chrome_options = Options()
        self.chrome_options.add_argument('--headless')
        self.chrome_options.add_argument('--disable-gpu')
        self.driver = webdriver.Chrome(options=self.chrome_options)
        self.wait = WebDriverWait(self.driver, 10)
        self.base_url = "https://2.taobao.com/"
        self.keywords = ["手机", "电脑", "耳机", "手表"]
        # Inclusive price window a product must fall into to be collected.
        self.min_price = 50
        self.max_price = 1000
        self.data_file = "xy_products.csv"
        self.login_status = False

    def login(self):
        """Log in through the Taobao login form and set ``self.login_status``.

        Fix: credentials are read from the XY_USERNAME / XY_PASSWORD
        environment variables instead of being hard-coded in source; the
        original placeholder strings remain as backward-compatible fallbacks.
        """
        try:
            self.driver.get("https://login.taobao.com/")
            self.wait.until(EC.presence_of_element_located((By.ID, "fm-login-id")))
            username = self.driver.find_element(By.ID, "fm-login-id")
            password = self.driver.find_element(By.ID, "fm-login-password")
            submit = self.driver.find_element(By.CSS_SELECTOR, ".fm-button.fm-submit.password-login")
            username.send_keys(os.environ.get("XY_USERNAME", "your_username"))
            password.send_keys(os.environ.get("XY_PASSWORD", "your_password"))
            submit.click()
            # A redirect back onto taobao.com is treated as the success signal.
            self.wait.until(EC.url_contains("taobao.com"))
            self.login_status = True
            logging.info("登录成功")
        except Exception as e:
            logging.error(f"登录失败: {str(e)}")
            self.login_status = False

    def search_products(self, keyword, pages=3):
        """Search ``keyword`` over ``pages`` result pages.

        Returns a list of product dicts (title/price/link/seller/location/
        keyword/timestamp) whose price lies within
        [``self.min_price``, ``self.max_price``]. Returns an empty list when
        login fails.
        """
        if not self.login_status:
            self.login()
        if not self.login_status:
            return []
        products = []
        try:
            for page in range(1, pages + 1):
                url = f"{self.base_url}search.htm?search={keyword}&page={page}"
                self.driver.get(url)
                # Randomized delays reduce the chance of anti-bot throttling.
                time.sleep(random.uniform(1, 3))
                # Wait for the product list to render before parsing.
                self.wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, ".item-lists .item")))
                soup = BeautifulSoup(self.driver.page_source, 'html.parser')
                items = soup.select('.item-lists .item')
                for item in items:
                    try:
                        title = item.select_one('.title a').get_text(strip=True)
                        price = float(item.select_one('.price').get_text(strip=True).replace('¥', ''))
                        link = item.select_one('.title a')['href']
                        seller = item.select_one('.seller-nick').get_text(strip=True)
                        location = item.select_one('.location').get_text(strip=True)
                        if self.min_price <= price <= self.max_price:
                            products.append({
                                'title': title,
                                'price': price,
                                'link': link,
                                'seller': seller,
                                'location': location,
                                'keyword': keyword,
                                'timestamp': int(time.time())
                            })
                    except Exception as e:
                        # One malformed listing must not abort the whole page.
                        logging.error(f"解析商品失败: {str(e)}")
                        continue
                logging.info(f"关键词: {keyword}, 第{page}页, 采集到{len(items)}个商品")
                time.sleep(random.uniform(2, 5))
        except Exception as e:
            logging.error(f"搜索商品失败: {str(e)}")
        return products

    def save_to_csv(self, products):
        """Append ``products`` to ``self.data_file``, de-duplicating on
        (title, price, seller) and keeping the earliest record."""
        try:
            df = pd.DataFrame(products)
            if os.path.exists(self.data_file):
                existing_df = pd.read_csv(self.data_file)
                df = pd.concat([existing_df, df], ignore_index=True)
            df.drop_duplicates(subset=['title', 'price', 'seller'], keep='first', inplace=True)
            # utf-8-sig keeps the CSV readable in Excel with Chinese text.
            df.to_csv(self.data_file, index=False, encoding='utf-8-sig')
            logging.info(f"成功保存{len(products)}条商品数据到{self.data_file}")
        except Exception as e:
            logging.error(f"保存数据失败: {str(e)}")

    def monitor_prices(self):
        """Re-fetch every saved product link and log a notice when the
        current listed price is lower than the recorded one."""
        try:
            if not os.path.exists(self.data_file):
                logging.warning("没有找到商品数据文件")
                return
            df = pd.read_csv(self.data_file)
            for _, row in df.iterrows():
                try:
                    self.driver.get(row['link'])
                    time.sleep(random.uniform(1, 3))
                    soup = BeautifulSoup(self.driver.page_source, 'html.parser')
                    current_price = float(soup.select_one('.price').get_text(strip=True).replace('¥', ''))
                    if current_price < row['price']:
                        logging.info(f"商品降价提醒: {row['title']}, 原价: {row['price']}, 现价: {current_price}")
                        # Hook point for email/SMS notification logic.
                except Exception as e:
                    # Keep monitoring the remaining rows on a per-item failure.
                    logging.error(f"监控价格失败: {str(e)}")
                    continue
        except Exception as e:
            logging.error(f"价格监控失败: {str(e)}")

    def batch_upload(self, products):
        """Publish each product via the listing form with a 20% markup.

        Returns True when the loop completed (individual failures are logged
        and skipped), False when login or navigation failed.

        NOTE(review): the publish page is opened once before the loop; if
        submitting navigates away, later iterations will fail — confirm
        against the live page.
        """
        if not self.login_status:
            self.login()
        if not self.login_status:
            return False
        try:
            self.driver.get("https://2.taobao.com/publish.htm")
            time.sleep(3)
            for product in products:
                try:
                    title_input = self.driver.find_element(By.NAME, "title")
                    price_input = self.driver.find_element(By.NAME, "price")
                    desc_input = self.driver.find_element(By.NAME, "desc")
                    title_input.clear()
                    price_input.clear()
                    desc_input.clear()
                    title_input.send_keys(product['title'])
                    # Fix: round the 20% markup to 2 decimals so the form never
                    # receives float noise such as "119.99999999999999".
                    price_input.send_keys(str(round(product['price'] * 1.2, 2)))
                    desc_input.send_keys(f"优质二手商品,{product['title']},成色良好")
                    # Image upload would go here; adjust to the actual form.
                    submit_btn = self.driver.find_element(By.CSS_SELECTOR, ".submit-btn")
                    submit_btn.click()
                    logging.info(f"成功上架商品: {product['title']}")
                    time.sleep(random.uniform(5, 10))
                except Exception as e:
                    logging.error(f"上架商品失败: {str(e)}")
                    continue
            return True
        except Exception as e:
            logging.error(f"批量上架失败: {str(e)}")
            return False

    def run(self):
        """Main loop: collect products for every keyword, persist them, and
        run the hourly price-monitor job until interrupted.

        Fix: the browser is now closed in a ``finally`` clause, so it is quit
        on every exit path instead of only on KeyboardInterrupt.
        """
        schedule.every(1).hours.do(self.monitor_prices)
        try:
            while True:
                try:
                    all_products = []
                    for keyword in self.keywords:
                        all_products.extend(self.search_products(keyword))
                    if all_products:
                        self.save_to_csv(all_products)
                    schedule.run_pending()
                    time.sleep(60)
                except KeyboardInterrupt:
                    logging.info("程序退出")
                    break
                except Exception as e:
                    logging.error(f"主循环错误: {str(e)}")
                    time.sleep(300)
                    continue
        finally:
            self.driver.quit()
if __name__ == "__main__":
    # Script entry point: build the spider and start its collect/monitor loop.
    spider = XianYuSpider()
    spider.run()
import json
import hashlib
import time
import random
import string
import pandas as pd
from datetime import datetime
def generate_product_id(title, seller):
    """Derive a 32-char hex product ID from title, seller and current time.

    Note: the ID embeds the current second, so repeated calls with the same
    title/seller can yield different IDs.
    """
    raw = "_".join((title, seller, str(int(time.time()))))
    return hashlib.md5(raw.encode('utf-8')).hexdigest()
def filter_products(products, min_price=0, max_price=9999, keywords=None):
    """Keep products priced within [min_price, max_price] whose title contains
    any of the given keywords (case-insensitive).

    An empty or None keyword list disables the title filter entirely.
    """
    wanted = [kw.lower() for kw in keywords] if keywords else []

    def _keep(item):
        if not (min_price <= item['price'] <= max_price):
            return False
        return not wanted or any(kw in item['title'].lower() for kw in wanted)

    return [item for item in products if _keep(item)]
def analyze_price_trend(data_file):
    """Compute per-day average price per keyword plus day-over-day % change.

    ``data_file`` is anything ``pd.read_csv`` accepts; the CSV must contain
    'timestamp' (epoch seconds), 'keyword' and 'price' columns. Returns a
    dict with 'daily_avg' and 'price_change' nested dicts, or None on error.
    """
    try:
        frame = pd.read_csv(data_file)
        frame['date'] = pd.to_datetime(frame['timestamp'], unit='s')
        # Mean price per (day, keyword); unstack puts keywords in columns.
        by_day = frame.groupby([frame['date'].dt.date, 'keyword'])['price'].mean().unstack()
        # Day-over-day percentage change; the first day has no baseline -> 0.
        change = by_day.pct_change().fillna(0) * 100
        return {
            'daily_avg': by_day.to_dict(),
            'price_change': change.to_dict()
        }
    except Exception as e:
        print(f"分析价格趋势失败: {str(e)}")
        return None
def send_notification(message, method='console'):
    """Dispatch a notification; only the 'console' channel is implemented.

    Unknown methods are silently ignored (placeholder for email/SMS).
    """
    if method != 'console':
        return
    stamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    print(f"[通知] {stamp} - {message}")
def generate_random_string(length=8):
    """Return a random alphanumeric (A-Za-z0-9) string of ``length`` chars."""
    alphabet = string.ascii_letters + string.digits
    return ''.join(random.choice(alphabet) for _ in range(length))
def save_to_json(data, filename):
    """Write ``data`` as pretty-printed UTF-8 JSON (non-ASCII kept literal).

    Returns True on success; prints the error and returns False otherwise.
    """
    try:
        with open(filename, 'w', encoding='utf-8') as fh:
            json.dump(data, fh, ensure_ascii=False, indent=2)
    except Exception as e:
        print(f"保存JSON失败: {str(e)}")
        return False
    return True
def load_from_json(filename):
    """Parse and return the JSON content of ``filename``.

    Prints the error and returns None when the file is missing or invalid.
    """
    try:
        with open(filename, encoding='utf-8') as fh:
            return json.load(fh)
    except Exception as e:
        print(f"加载JSON失败: {str(e)}")
        return None
# Spider configuration: search terms, price window, paging and browser setup.
SPIDER_CONFIG = {
    'keywords': ['手机', '笔记本电脑', '耳机', '智能手表', '相机'],
    'price_range': {
        'min': 50,
        'max': 2000
    },
    'search_pages': 3,
    'interval': 3600,  # collection interval in seconds
    'user_agents': [
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
        'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
        'Mozilla/5.0 (iPhone; CPU iPhone OS 14_6 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0 Mobile/15E148 Safari/604.1'
    ],
    'proxy': None,  # proxy settings (None disables the proxy)
    'headless': True  # whether to run the browser headless
}
# Storage configuration: backend type and target file/table names.
DATABASE_CONFIG = {
    'type': 'csv',  # csv/json/sqlite
    'filename': 'xy_products.csv',
    'table_name': 'products'
}
# Notification configuration for price-drop alerts.
# NOTE(review): the SMTP username/password below are placeholders — supply
# real credentials through a secrets mechanism, never commit them to source.
NOTIFICATION_CONFIG = {
    'email': {
        'enabled': False,
        'smtp_server': 'smtp.example.com',
        'smtp_port': 587,
        'username': 'your_email@example.com',
        'password': 'your_password',
        'recipients': ['recipient@example.com']
    },
    'sms': {
        'enabled': False,
        'api_key': 'your_sms_api_key'
    }
}
# Listing (re-upload) configuration.
UPLOAD_CONFIG = {
    'price_markup': 0.2,  # markup ratio applied when relisting
    'min_stock': 1,
    'max_stock': 10,
    'auto_renew': True,
    'category_mapping': {
        '手机': '数码产品',
        '笔记本电脑': '数码产品',
        '耳机': '数码产品',
        '智能手表': '数码产品',
        '相机': '数码产品'
    }
}
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。