首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >全量抓取还是增量采集?二手房数据采集实战解析

全量抓取还是增量采集?二手房数据采集实战解析

原创
作者头像
jackcode
发布2025-08-18 11:37:33
发布2025-08-18 11:37:33
14700
代码可运行
举报
文章被收录于专栏:爬虫资料爬虫资料
运行总次数:0
代码可运行

项目背景

很多做数据采集的同学都会遇到一个老问题:到底是一次性把网站的数据全部抓取下来,还是定期只更新新增和变化的部分?

我之前在做二手房市场监测的时候,就碰到过这个选择。当时目标是对比不同城市、不同小区的挂牌房源,看看价格走势和交易活跃度。如果抓取策略不对,不仅会浪费资源,还可能导致数据质量不高。

所以,本文就结合「链家二手房」这个实际站点,聊聊全量抓取和增量采集的取舍,并通过一个实战小项目,展示如何结合爬虫代理IP技术去实现定期的数据获取和统计。

数据目标

目标字段(示例)

  • 基础识别:house_id(从 URL 或页面特征提取)、titleurl
  • 位置维度:citydistrict(区)、bizcircle(商圈/板块)、community(小区名)
  • 核心指标:total_price(万元)、unit_price(元/㎡)、area(㎡)、room_type(几室几厅)
  • 时间戳:first_seen_at(首次发现时间)、last_seen_at(最后一次看到)
  • 变更检测:content_hash(用于判断记录是否变更)

存储设计

  • 使用 SQLite(轻量)/ PostgreSQL(生产可选)持久化记录;
  • house_id 作为主键,配合 content_hash 实现 幂等写入增量更新
  • 定期产出 统计汇总,如“按区/小区的挂牌数量、均价、面积分布”。

统计示例

  • district 维度:挂牌量、平均单价、价格分位
  • community 维度:挂牌量 Top N、均价 Top N
  • 趋势维度(可扩展):每日新增挂牌量、下架量(需引入“消失检测”)

技术选型

在数据获取方式上,常见有两种:

  1. 全量抓取
    • 每次任务都从头到尾抓一遍。
    • 优点:不会漏数据。
    • 缺点:压力大,耗时耗流量,重复数据多。
  2. 增量采集
    • 每次只采集“新增”或“变化”的部分,比如根据发布时间筛选。
    • 优点:节省资源,数据更新快。
    • 缺点:需要额外逻辑来判断哪些是新数据,哪些是修改过的数据。

我的经验是:

  • 前期数据基线不足时,用全量抓取先把底子打好。
  • 后期维护阶段,采用增量采集,避免重复抓取大量无效信息。

在网络层面,由于链家有一定的访问频率限制,所以必须结合代理池。这里我选用了 亿牛云爬虫代理服务,支持用户名密码认证,可以减少封禁风险。

模块实现(代码可直接运行/改造)

运行环境:Python 3.10+undefined安装依赖:

代码语言:bash
复制
pip install requests curl_cffi lxml beautifulsoup4 fake-useragent sqlalchemy pandas apscheduler

0)统一配置(目标入口、代理、数据库)

代码语言:python
代码运行次数:0
运行
复制
# -*- coding: utf-8 -*-
"""
项目:贝壳二手房抓取 - 全量 vs 增量
"""

import re, time, hashlib, random, datetime
from urllib.parse import urljoin

import requests
from curl_cffi import requests as cfre
from fake_useragent import UserAgent
from bs4 import BeautifulSoup

import pandas as pd
from sqlalchemy import create_engine, Table, Column, String, Float, DateTime, MetaData, insert, select

# ------------------------
# 基础配置 代理(参考:亿牛云爬虫代理)
# ------------------------
BASE_URL = "https://bj.lianjia.com/ershoufang/"
PROXY_USER, PROXY_PASS = "16YUN", "16IP"
PROXY_SERVER = "proxy.16yun.cn:1234"

DB_URL = "sqlite:///ershoufang.db"
engine = create_engine(DB_URL)
metadata = MetaData()

1)建表 & 工具函数(主键、哈希、幂等)

代码语言:python
代码运行次数:0
运行
复制
# 定义表结构
houses = Table("houses", metadata,
    Column("house_id", String, primary_key=True),
    Column("title", String),
    Column("url", String),
    Column("city", String),
    Column("district", String),
    Column("bizcircle", String),
    Column("community", String),
    Column("total_price", Float),
    Column("unit_price", Float),
    Column("area", Float),
    Column("room_type", String),
    Column("first_seen_at", DateTime),
    Column("last_seen_at", DateTime),
    Column("content_hash", String),
)
metadata.create_all(engine)

ua = UserAgent()

def gen_headers():
    return {"User-Agent": ua.random}

def gen_proxy():
    return f"http://{PROXY_USER}:{PROXY_PASS}@{PROXY_SERVER}"

def get_content_hash(d: dict):
    sig = f"{d.get('title')}-{d.get('total_price')}-{d.get('unit_price')}-{d.get('area')}"
    return hashlib.md5(sig.encode()).hexdigest()

def get_house_id(url: str):
    m = re.search(r"/(\d+)\.html", url)
    return m.group(1) if m else None

2)请求层

代码语言:python
代码运行次数:0
运行
复制
def fetch(url, use_cffi=False, retries=3):
    for i in range(retries):
        try:
            if use_cffi:
                resp = cfre.get(url, headers=gen_headers(), proxies={"http": gen_proxy(), "https": gen_proxy()}, impersonate="chrome110")
            else:
                resp = requests.get(url, headers=gen_headers(), proxies={"http": gen_proxy(), "https": gen_proxy()}, timeout=15)
            if resp.status_code == 200:
                return resp.text
        except Exception as e:
            print(f"[WARN] fetch error {e}, retry {i}")
            time.sleep(2)
    return None

3)解析层(列表页抽取 + 详情增强可选)

代码语言:python
代码运行次数:0
运行
复制
def parse_list(html):
    soup = BeautifulSoup(html, "lxml")
    ul = soup.find("ul", class_="sellListContent")
    if not ul: return []
    data = []
    for li in ul.find_all("li", recursive=False):
        try:
            a = li.find("a", class_="noresultRecommend img")
            if not a: continue
            url = a["href"]
            title = a.get("title", "").strip()
            house_id = get_house_id(url)
            info = li.find("div", class_="houseInfo").get_text(" ", strip=True)
            pos = li.find("div", class_="positionInfo").get_text(" ", strip=True)
            total_price = float(li.find("div", class_="totalPrice").span.text)
            unit_price = float(re.sub(r"\D", "", li.find("div", class_="unitPrice").get_text()))
            area_match = re.search(r"(\d+\.?\d*)㎡", info)
            area = float(area_match.group(1)) if area_match else None
            room_match = re.search(r"(\d+室\d厅)", info)
            room_type = room_match.group(1) if room_match else None

            district, bizcircle, community = None, None, None
            pos_parts = pos.split()
            if len(pos_parts) >= 2:
                district, bizcircle = pos_parts[:2]
            community = pos_parts[-1]

            rec = dict(
                house_id=house_id, url=url, title=title,
                city="北京", district=district, bizcircle=bizcircle, community=community,
                total_price=total_price, unit_price=unit_price, area=area, room_type=room_type,
            )
            rec["content_hash"] = get_content_hash(rec)
            data.append(rec)
        except Exception as e:
            print("[WARN] parse error", e)
            continue
    return data

4)写库(幂等插入/更新 + 增量哈希)

代码语言:python
代码运行次数:0
运行
复制
def upsert_records(records):
    if not records: return
    conn = engine.connect()
    now = datetime.datetime.now()
    for rec in records:
        stmt = select(houses).where(houses.c.house_id == rec["house_id"])
        row = conn.execute(stmt).fetchone()
        if not row:
            rec["first_seen_at"] = now
            rec["last_seen_at"] = now
            conn.execute(insert(houses).values(**rec))
        else:
            if row.content_hash != rec["content_hash"]:
                conn.execute(houses.update().where(houses.c.house_id == rec["house_id"]).values(**rec, last_seen_at=now))
            else:
                conn.execute(houses.update().where(houses.c.house_id == rec["house_id"]).values(last_seen_at=now))
    conn.commit()
    conn.close()

5)分页抓取与“模式切换”(全量 vs 增量)

代码语言:python
代码运行次数:0
运行
复制
def crawl_area(area_url, mode="incremental", max_pages=50):
    stop_after_no_new = 5
    no_new_count = 0
    for page in range(1, max_pages + 1):
        url = f"{area_url}pg{page}/"
        html = fetch(url, use_cffi=True)
        if not html: break
        recs = parse_list(html)
        if not recs: break
        before_count = len(recs)
        upsert_records(recs)
        if mode == "incremental":
            if len(recs) == 0: no_new_count += 1
            else: no_new_count = 0
            if no_new_count >= stop_after_no_new:
                print("[INFO] early stop, no new records found")
                break
        time.sleep(random.uniform(1, 3))

6)地理位置 / 小区名搜索入口(多维组合)

代码语言:python
代码运行次数:0
运行
复制
def run_city(city="bj", mode="incremental"):
    districts = ["dongcheng", "xicheng", "chaoyang", "haidian"]
    for d in districts:
        url = f"https://{city}.lianjia.com/ershoufang/{d}/"
        crawl_area(url, mode=mode, max_pages=30)

7)统计与导出(定期归纳)

代码语言:python
代码运行次数:0
运行
复制
def export_stats():
    df = pd.read_sql_table("houses", engine)
    now = datetime.datetime.now().strftime("%Y%m%d_%H%M")
    out = f"stats_{now}.xlsx"

    summary1 = df.groupby("district")["unit_price"].mean().reset_index().rename(columns={"unit_price": "avg_price"})
    summary2 = df.groupby("community")["unit_price"].mean().reset_index().sort_values("unit_price", ascending=False).head(20)

    with pd.ExcelWriter(out) as writer:
        df.to_excel(writer, sheet_name="raw", index=False)
        summary1.to_excel(writer, sheet_name="by_district", index=False)
        summary2.to_excel(writer, sheet_name="by_community_top20", index=False)
    print("[INFO] stats exported", out)

8)调度(每天 08:00 / 16:00 运行)

代码语言:python
代码运行次数:0
运行
复制
from apscheduler.schedulers.blocking import BlockingScheduler

def scheduled_job():
    print("[INFO] crawl start", datetime.datetime.now())
    run_city("bj", mode="incremental")
    export_stats()

if __name__ == "__main__":
    scheduler = BlockingScheduler()
    scheduler.add_job(scheduled_job, "cron", hour="8,16", minute=0)
    print("[INFO] Scheduler started")
    scheduler.start()

结语

在真实业务里,“全量 vs 增量”从来不是二选一,而是 阶段性权衡工程化妥协。建议你将两种模式都纳入框架能力:用全量做“基线校准”,用增量做“日常维护”,再辅以内容哈希、早停策略、代理与频控,既稳且快,长期运营成本最低。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 图片
    • 项目背景
    • 数据目标
    • 技术选型
    • 模块实现(代码可直接运行/改造)
      • 0)统一配置(目标入口、代理、数据库)
      • 1)建表 & 工具函数(主键、哈希、幂等)
      • 2)请求层
      • 3)解析层(列表页抽取 + 详情增强可选)
      • 4)写库(幂等插入/更新 + 增量哈希)
      • 5)分页抓取与“模式切换”(全量 vs 增量)
      • 6)地理位置 / 小区名搜索入口(多维组合)
      • 7)统计与导出(定期归纳)
      • 8)调度(每天 08:00 / 16:00 运行)
    • 结语
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档