Anyone who does data collection eventually runs into an old question: should you grab all of a site's data in one go, or periodically update only what is new or has changed?
I faced this choice while building a second-hand housing market monitor. The goal was to compare listings across cities and residential communities to track price trends and market activity. Pick the wrong crawling strategy and you not only waste resources, you may also end up with lower-quality data.
So in this article, using the real Lianjia second-hand housing site (链家二手房) as the example, I will discuss the trade-off between full crawls and incremental collection, and walk through a small hands-on project that uses crawler proxy IPs to fetch and aggregate data on a schedule.
Target fields (example)
- house_id (extracted from the URL or page features), title, url
- city, district, bizcircle (sub-district / area), community (name of the residential complex)
- total_price (10,000 CNY), unit_price (CNY/㎡), area (㎡), room_type (number of bedrooms and living rooms)
- first_seen_at (when the listing was first seen), last_seen_at (when it was last seen)
- content_hash (used to decide whether a record has changed)

Storage design
- house_id is the primary key; together with content_hash it gives us idempotent writes and incremental updates (a minimal sketch follows below).

Statistics examples
- district level: listing count, average unit price, price quantiles
- community level: Top N by listing count, Top N by average price

For data acquisition there are two common approaches: a full crawl that re-fetches every listing on each run, and an incremental crawl that only picks up new or changed listings. My experience: run an occasional full crawl as a baseline and rely on incremental crawls for routine updates.
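To make the storage design concrete: change detection boils down to hashing the fields we care about and comparing the hash against what is already stored. Below is a minimal sketch of that idea, using a plain dict in place of the real database table (the full SQLAlchemy version appears in the project code further down):

import hashlib, datetime

def content_hash(rec: dict) -> str:
    # Hash only the fields whose changes we care about
    sig = f"{rec['title']}-{rec['total_price']}-{rec['unit_price']}-{rec['area']}"
    return hashlib.md5(sig.encode()).hexdigest()

def upsert(store: dict, rec: dict):
    now = datetime.datetime.now()
    h = content_hash(rec)
    old = store.get(rec["house_id"])
    if old is None:                      # brand-new listing: insert
        store[rec["house_id"]] = {**rec, "content_hash": h,
                                  "first_seen_at": now, "last_seen_at": now}
    elif old["content_hash"] != h:       # listing changed: update fields and hash
        old.update(rec, content_hash=h, last_seen_at=now)
    else:                                # unchanged: only refresh last_seen_at
        old["last_seen_at"] = now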
At the network layer, Lianjia enforces fairly strict rate limits, so a proxy pool is a must. Here I used the 亿牛云 crawler proxy service, which supports username/password authentication and helps reduce the risk of getting blocked.
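With username/password authentication, the credentials are simply embedded in the proxy URL. A minimal sketch with requests (the credentials below are placeholders; the endpoint mirrors the project config used later):

import requests

proxy = "http://USERNAME:PASSWORD@proxy.16yun.cn:1234"   # placeholder credentials
resp = requests.get("https://bj.lianjia.com/ershoufang/",
                    proxies={"http": proxy, "https": proxy},
                    timeout=15)
print(resp.status_code)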
Runtime environment: Python 3.10+. Install the dependencies:
pip install requests curl_cffi lxml beautifulsoup4 fake-useragent sqlalchemy pandas openpyxl apscheduler
# -*- coding: utf-8 -*-
"""
Project: Lianjia (链家) second-hand housing crawler - full vs. incremental
"""
import re, time, hashlib, random, datetime
from urllib.parse import urljoin
import requests
from curl_cffi import requests as cfre
from fake_useragent import UserAgent
from bs4 import BeautifulSoup
import pandas as pd
from sqlalchemy import create_engine, Table, Column, String, Float, DateTime, MetaData, insert, select
# ------------------------
# Basic configuration: proxy settings (example: 亿牛云 crawler proxy)
# ------------------------
BASE_URL = "https://bj.lianjia.com/ershoufang/"
PROXY_USER, PROXY_PASS = "16YUN", "16IP"
PROXY_SERVER = "proxy.16yun.cn:1234"
DB_URL = "sqlite:///ershoufang.db"
engine = create_engine(DB_URL)
metadata = MetaData()
# Table schema: one row per listing, keyed by house_id
houses = Table("houses", metadata,
    Column("house_id", String, primary_key=True),
    Column("title", String),
    Column("url", String),
    Column("city", String),
    Column("district", String),
    Column("bizcircle", String),
    Column("community", String),
    Column("total_price", Float),
    Column("unit_price", Float),
    Column("area", Float),
    Column("room_type", String),
    Column("first_seen_at", DateTime),
    Column("last_seen_at", DateTime),
    Column("content_hash", String),
)
metadata.create_all(engine)
ua = UserAgent()
def gen_headers():
    # Rotate the User-Agent on every request
    return {"User-Agent": ua.random}

def gen_proxy():
    # Username/password-authenticated proxy URL (see the proxy config above)
    return f"http://{PROXY_USER}:{PROXY_PASS}@{PROXY_SERVER}"

def get_content_hash(d: dict):
    # Hash the fields we care about; if any of them changes, the hash changes
    sig = f"{d.get('title')}-{d.get('total_price')}-{d.get('unit_price')}-{d.get('area')}"
    return hashlib.md5(sig.encode()).hexdigest()

def get_house_id(url: str):
    # Listing URLs look like .../<digits>.html; the digits are the house id
    m = re.search(r"/(\d+)\.html", url)
    return m.group(1) if m else None
def fetch(url, use_cffi=False, retries=3):
    # Fetch a page through the proxy, retrying a few times on failure.
    # use_cffi=True switches to curl_cffi with browser impersonation.
    for i in range(retries):
        try:
            if use_cffi:
                resp = cfre.get(url, headers=gen_headers(),
                                proxies={"http": gen_proxy(), "https": gen_proxy()},
                                impersonate="chrome110")
            else:
                resp = requests.get(url, headers=gen_headers(),
                                    proxies={"http": gen_proxy(), "https": gen_proxy()},
                                    timeout=15)
            if resp.status_code == 200:
                return resp.text
        except Exception as e:
            print(f"[WARN] fetch error {e}, retry {i}")
        time.sleep(2)
    return None
def parse_list(html):
    # Parse one listing page into a list of dict records
    soup = BeautifulSoup(html, "lxml")
    ul = soup.find("ul", class_="sellListContent")
    if not ul:
        return []
    data = []
    for li in ul.find_all("li", recursive=False):
        try:
            a = li.find("a", class_="noresultRecommend img")
            if not a:
                continue
            url = a["href"]
            title = a.get("title", "").strip()
            house_id = get_house_id(url)
            info = li.find("div", class_="houseInfo").get_text(" ", strip=True)
            pos = li.find("div", class_="positionInfo").get_text(" ", strip=True)
            total_price = float(li.find("div", class_="totalPrice").span.text)
            unit_price = float(re.sub(r"\D", "", li.find("div", class_="unitPrice").get_text()))
            area_match = re.search(r"(\d+\.?\d*)㎡", info)
            area = float(area_match.group(1)) if area_match else None
            room_match = re.search(r"(\d+室\d厅)", info)
            room_type = room_match.group(1) if room_match else None
            # Assumes positionInfo splits as: district, bizcircle, ..., community (last token)
            district, bizcircle, community = None, None, None
            pos_parts = pos.split()
            if len(pos_parts) >= 2:
                district, bizcircle = pos_parts[:2]
                community = pos_parts[-1]
            rec = dict(
                house_id=house_id, url=url, title=title,
                city="北京", district=district, bizcircle=bizcircle, community=community,
                total_price=total_price, unit_price=unit_price, area=area, room_type=room_type,
            )
            rec["content_hash"] = get_content_hash(rec)
            data.append(rec)
        except Exception as e:
            # Skip malformed cards instead of failing the whole page
            print("[WARN] parse error", e)
            continue
    return data
def upsert_records(records):
    # Idempotent write: insert new listings, update changed ones (detected via
    # content_hash), and only refresh last_seen_at for unchanged ones.
    # Returns the number of newly inserted listings so the caller can early-stop.
    if not records:
        return 0
    now = datetime.datetime.now()
    new_count = 0
    with engine.begin() as conn:  # transaction commits on exit
        for rec in records:
            stmt = select(houses).where(houses.c.house_id == rec["house_id"])
            row = conn.execute(stmt).fetchone()
            if not row:
                rec["first_seen_at"] = now
                rec["last_seen_at"] = now
                conn.execute(insert(houses).values(**rec))
                new_count += 1
            elif row.content_hash != rec["content_hash"]:
                conn.execute(houses.update()
                             .where(houses.c.house_id == rec["house_id"])
                             .values(**rec, last_seen_at=now))
            else:
                conn.execute(houses.update()
                             .where(houses.c.house_id == rec["house_id"])
                             .values(last_seen_at=now))
    return new_count
def crawl_area(area_url, mode="incremental", max_pages=50):
    # In incremental mode, stop early after several consecutive pages
    # that contain no previously-unseen listings.
    stop_after_no_new = 5
    no_new_count = 0
    for page in range(1, max_pages + 1):
        url = f"{area_url}pg{page}/"
        html = fetch(url, use_cffi=True)
        if not html:
            break
        recs = parse_list(html)
        if not recs:
            break
        new_count = upsert_records(recs)
        if mode == "incremental":
            no_new_count = no_new_count + 1 if new_count == 0 else 0
            if no_new_count >= stop_after_no_new:
                print("[INFO] early stop, no new records found")
                break
        time.sleep(random.uniform(1, 3))  # polite random delay between pages
def run_city(city="bj", mode="incremental"):
    # Crawl a few districts of one city; extend the list as needed
    districts = ["dongcheng", "xicheng", "chaoyang", "haidian"]
    for d in districts:
        url = f"https://{city}.lianjia.com/ershoufang/{d}/"
        crawl_area(url, mode=mode, max_pages=30)
def export_stats():
    # Dump the raw table plus two simple aggregates to an Excel workbook
    df = pd.read_sql_table("houses", engine)
    now = datetime.datetime.now().strftime("%Y%m%d_%H%M")
    out = f"stats_{now}.xlsx"
    summary1 = (df.groupby("district")["unit_price"].mean()
                  .reset_index().rename(columns={"unit_price": "avg_price"}))
    summary2 = (df.groupby("community")["unit_price"].mean()
                  .reset_index().sort_values("unit_price", ascending=False)
                  .head(20).rename(columns={"unit_price": "avg_price"}))
    with pd.ExcelWriter(out) as writer:
        df.to_excel(writer, sheet_name="raw", index=False)
        summary1.to_excel(writer, sheet_name="by_district", index=False)
        summary2.to_excel(writer, sheet_name="by_community_top20", index=False)
    print("[INFO] stats exported", out)
from apscheduler.schedulers.blocking import BlockingScheduler

def scheduled_job():
    # One incremental pass over the target districts, then refresh the stats
    print("[INFO] crawl start", datetime.datetime.now())
    run_city("bj", mode="incremental")
    export_stats()

if __name__ == "__main__":
    # Run the incremental crawl twice a day, at 08:00 and 16:00
    scheduler = BlockingScheduler()
    scheduler.add_job(scheduled_job, "cron", hour="8,16", minute=0)
    print("[INFO] Scheduler started")
    scheduler.start()
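If you also want the periodic full-crawl "baseline calibration" recommended below, one option (a sketch, reusing the same scheduler) is to register a second job before calling scheduler.start():

# e.g. a weekly full crawl as a baseline, every Sunday at 03:00
scheduler.add_job(lambda: run_city("bj", mode="full"),
                  "cron", day_of_week="sun", hour=3, minute=0)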
In real projects, "full vs. incremental" is never an either/or choice; it is a staged trade-off and an engineering compromise. My suggestion is to build both modes into your framework: use full crawls for baseline calibration and incremental crawls for day-to-day maintenance, backed by content hashing, early stopping, proxies and rate control. That keeps the pipeline both stable and fast, with the lowest long-term operating cost.
Original content notice: this article is published on the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission.
For takedown requests regarding infringement, please contact cloudcommunity@tencent.com.