Efficient Crawlers: Design Thinking and Implementation

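I keep asking myself what kind of tool or system could fully free crawler developers from routine work. I believe crawler engineers should be solving system-level and performance problems, not spending every day working out how to scrape one particular website or app. In principle, a specific business requirement should be handled by the people who need the data: they customize the crawl themselves on a shared system or tool.

In 2016 I made a first simple attempt and factored the crawl flow out into a configuration file. In 2017 I tried to design a configuration-driven crawling system. Throughout 2018 I worked on building configuration-driven crawling tools.

This article summarizes my thinking and practice from those years of designing efficient crawler systems.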


1. The evolution of crawler systems

1.1 The script era

The most primitive crawler is just a script:

import requests
from bs4 import BeautifulSoup

url = "https://example.com/list"
resp = requests.get(url)
soup = BeautifulSoup(resp.text, 'html.parser')
for item in soup.select('.item'):
    print(item.text)

Problems:

  • No concurrency, so throughput is low
  • No error handling, so it is unstable
  • Duplicated code, so it is hard to maintain

1.2 The framework era

Frameworks such as Scrapy solved the engineering problems:

import scrapy

class ExampleSpider(scrapy.Spider):
    name = 'example'
    start_urls = ['https://example.com']
    
    def parse(self, response):
        for item in response.css('.item'):
            yield {'title': item.css('h2::text').get()}

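What this solved:

  • Concurrent scheduling
  • Retry on failure
  • Data pipelines

What still remained:

  • Every website still needs its own code
  • Anti-scraping countermeasures need custom work
  • Operational costs are high

1.3 The configuration era

My goal: let business users complete a data crawl through configuration alone.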

# spider.yaml
name: example_spider
source:
  type: web
  url: "https://example.com/list?page={page}"
  pagination:
    start: 1
    end: 100

extract:
  selector: ".item"
  fields:
    title: "h2::text"
    link: "a::attr(href)"
    price: ".price::text | float"

output:
  type: mongodb
  collection: products
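
One way to read the `pagination` block is as a template expansion: each page number is substituted into the `{page}` placeholder of `url`. The following is a minimal sketch of that idea, assuming the YAML has already been parsed into a plain dict and that `end` is inclusive. `expand_urls` is a hypothetical helper name, not part of any tool described in this article.

```python
from typing import Iterator


def expand_urls(source: dict) -> Iterator[str]:
    """Yield one URL per page by filling the {page} placeholder."""
    pagination = source.get("pagination") or {}
    start = pagination.get("start", 1)
    end = pagination.get("end", start)   # assumption: `end` is inclusive
    step = pagination.get("step", 1)
    for page in range(start, end + 1, step):
        yield source["url"].format(page=page)


# The `source` section of spider.yaml, written out as a plain dict
# with a shorter page range for illustration.
source = {
    "type": "web",
    "url": "https://example.com/list?page={page}",
    "pagination": {"start": 1, "end": 3},
}
urls = list(expand_urls(source))
```

Because the function is a generator, a range of 100 pages or 100,000 pages costs the same amount of memory; the scheduler can consume URLs as it goes.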

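2. Architecture design for an efficient crawler

2.1 Layered architecture

┌─────────────────────────────────────────────────────────────┐
│                    Task scheduling layer                    │
│  (dispatch, priority queue, deduplication, rate limiting)   │
├─────────────────────────────────────────────────────────────┤
│                  Download execution layer                   │
│  (request building, proxies, retries, browser rendering)    │
├─────────────────────────────────────────────────────────────┤
│                    Data processing layer                    │
│  (parse/extract, clean/transform, entity recognition)       │
├─────────────────────────────────────────────────────────────┤
│                    Storage output layer                     │
│  (databases, message queues, file systems)                  │
└─────────────────────────────────────────────────────────────┘

2.2 Core components

Task scheduler (Scheduler)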

from redis import Redis
from dataclasses import dataclass
from typing import Optional
import json
import hashlib

@dataclass
class Task:
    url: str
    method: str = 'GET'
    headers: Optional[dict] = None
    body: Optional[dict] = None
    meta: Optional[dict] = None
    priority: int = 0
    retry_count: int = 0

    @property
    def fingerprint(self) -> str:
        """Task fingerprint, used for deduplication."""
        s = f"{self.method}:{self.url}:{json.dumps(self.body, sort_keys=True)}"
        return hashlib.md5(s.encode()).hexdigest()

class Scheduler:
    def __init__(self, redis_url: str, queue_key: str):
        self.redis = Redis.from_url(redis_url)
        self.queue_key = queue_key
        self.seen_key = f"{queue_key}:seen"
    
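    def push(self, task: Task) -> bool:
        """Add a task, deduplicating automatically."""
        # SADD returns 0 when the fingerprint is already present, so the
        # membership check and the insert happen in one atomic command.
        is_new = self.redis.sadd(self.seen_key, task.fingerprint)
        # A retried task shares its fingerprint with the first attempt,
        # so retries are allowed past the dedup filter.
        if not is_new and task.retry_count == 0:
            return False

        # Use a sorted set as the priority queue
        self.redis.zadd(self.queue_key, {json.dumps(task.__dict__): task.priority})
        return True

    def pop(self) -> Optional[Task]:
        """Pop the highest-priority task."""
        result = self.redis.zpopmax(self.queue_key)
        if not result:
            return None
        task_json, _ = result[0]
        return Task(**json.loads(task_json))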
    
    def size(self) -> int:
        return self.redis.zcard(self.queue_key)
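
To make the scheduler's two responsibilities (fingerprint-based deduplication and priority ordering) easier to try without a Redis server, here is a minimal in-memory sketch of the same idea. `InMemoryScheduler` is a hypothetical stand-in built on `heapq`, not the Redis-backed class above. One difference is that it breaks priority ties in insertion order, whereas a Redis sorted set orders equal scores by member value.

```python
import hashlib
import heapq
import itertools
import json
from typing import Optional


def fingerprint(method: str, url: str, body: Optional[dict] = None) -> str:
    """Same recipe as Task.fingerprint: method, URL and canonical JSON body."""
    raw = f"{method}:{url}:{json.dumps(body, sort_keys=True)}"
    return hashlib.md5(raw.encode()).hexdigest()


class InMemoryScheduler:
    def __init__(self) -> None:
        self._heap = []                    # entries: (-priority, sequence, url)
        self._seen = set()                 # fingerprints already accepted
        self._counter = itertools.count()  # tie-breaker for equal priorities

    def push(self, url: str, priority: int = 0,
             method: str = "GET", body: Optional[dict] = None) -> bool:
        fp = fingerprint(method, url, body)
        if fp in self._seen:
            return False
        self._seen.add(fp)
        # heapq is a min-heap, so negate the priority to pop the largest first.
        heapq.heappush(self._heap, (-priority, next(self._counter), url))
        return True

    def pop(self) -> Optional[str]:
        if not self._heap:
            return None
        return heapq.heappop(self._heap)[2]

    def size(self) -> int:
        return len(self._heap)


scheduler = InMemoryScheduler()
first = scheduler.push("https://example.com/a", priority=1)
second = scheduler.push("https://example.com/b", priority=5)
duplicate = scheduler.push("https://example.com/a", priority=9)  # same fingerprint
order = [scheduler.pop(), scheduler.pop(), scheduler.pop()]
```

Note that the duplicate is rejected even though it arrives with a higher priority: the fingerprint deliberately ignores priority, so a URL is queued at most once.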

Downloader

import asyncio
import json
import aiohttp
from typing import Optional
from dataclasses import dataclass

@dataclass
class Response:
    url: str
    status: int
    headers: dict
    body: bytes
    encoding: str = 'utf-8'
    
    @property
    def text(self) -> str:
        return self.body.decode(self.encoding, errors='replace')
    
    @property
    def json(self) -> dict:
        return json.loads(self.text)

class Downloader:
    def __init__(self, 
                 proxy_pool=None,
                 timeout: int = 30,
                 max_retries: int = 3,
                 concurrent_limit: int = 100):
        self.proxy_pool = proxy_pool
        self.timeout = aiohttp.ClientTimeout(total=timeout)
        self.max_retries = max_retries
        self.semaphore = asyncio.Semaphore(concurrent_limit)
    
    async def fetch(self, task: Task) -> Optional[Response]:
        """Download a task, with retries and optional proxy support."""
        async with self.semaphore:
            for attempt in range(self.max_retries):
                # Bind proxy before the try block so the except branch can always read it
                proxy = None
                try:
                    proxy = self.proxy_pool.get() if self.proxy_pool else None

                    async with aiohttp.ClientSession(timeout=self.timeout) as session:
                        async with session.request(
                            method=task.method,
                            url=task.url,
                            headers=task.headers,
                            json=task.body if task.method == 'POST' else None,
                            proxy=proxy
                        ) as resp:
                            body = await resp.read()
                            return Response(
                                url=str(resp.url),
                                status=resp.status,
                                headers=dict(resp.headers),
                                body=body
                            )
                except Exception:
                    if self.proxy_pool and proxy:
                        self.proxy_pool.report_bad(proxy)
                    if attempt == self.max_retries - 1:
                        raise
                    await asyncio.sleep(2 ** attempt)  # exponential backoff
        return None
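
The retry loop inside `fetch` can be isolated into a small helper, which makes the backoff schedule easy to test on its own. This is a sketch under assumptions: `retry_with_backoff` and its `sleep` parameter are hypothetical names introduced here, and the injectable `sleep` exists only so the demo does not actually wait.

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def retry_with_backoff(
    operation: Callable[[], Awaitable[T]],
    max_retries: int = 3,
    base_delay: float = 1.0,
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
) -> T:
    """Run `operation`, waiting base_delay * 2**attempt between failed attempts."""
    for attempt in range(max_retries):
        try:
            return await operation()
        except Exception:
            if attempt == max_retries - 1:
                raise  # out of attempts: surface the last error
            await sleep(base_delay * 2 ** attempt)
    raise RuntimeError("max_retries must be at least 1")


async def _demo():
    calls = {"n": 0}
    delays = []

    async def flaky() -> str:
        # Fails twice, then succeeds on the third call.
        calls["n"] += 1
        if calls["n"] < 3:
            raise ConnectionError("temporary failure")
        return "ok"

    async def fake_sleep(seconds: float) -> None:
        delays.append(seconds)  # record the delay instead of waiting

    result = await retry_with_backoff(flaky, sleep=fake_sleep)
    return result, calls["n"], delays


demo_result = asyncio.run(_demo())
```

In production it is common to add random jitter to each delay so that many workers hitting the same failing host do not all retry at the same instant.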

Data extractor (Extractor)

from parsel import Selector
from typing import Any, Dict, List
import re

class Extractor:
    def __init__(self, config: dict):
        """
        Example config:
        {
            "selector": ".item",
            "fields": {
                "title": "h2::text",
                "price": ".price::text | float",
                "link": "a::attr(href) | abs_url",
                "tags": ".tag::text | list"
            }
        }
        """
        self.config = config
    
    def extract(self, html: str, base_url: str = '') -> List[Dict[str, Any]]:
        sel = Selector(text=html)
        items = []
        
        container_selector = self.config.get('selector', 'body')
        for container in sel.css(container_selector):
            item = {}
            for field_name, field_selector in self.config['fields'].items():
                value = self._extract_field(container, field_selector, base_url)
                item[field_name] = value
            items.append(item)
        
        return items
    
    def _extract_field(self, sel: Selector, selector: str, base_url: str) -> Any:
        """Parse a field selector, with support for pipe-style processors."""
        parts = selector.split('|')
        css_selector = parts[0].strip()
        processors = [p.strip() for p in parts[1:]]

        # Extract the raw values
        if '::text' in css_selector or '::attr' in css_selector:
            values = sel.css(css_selector).getall()
        else:
            values = sel.css(css_selector + '::text').getall()

        # Apply the processors
        result = values
        for proc in processors:
            result = self._apply_processor(result, proc, base_url)

        # Without an explicit `list` processor a field is single-valued,
        # so collapse the raw match list to its first element.
        if isinstance(result, list) and 'list' not in processors:
            result = result[0] if result else None
        return result

    def _apply_processor(self, value, processor: str, base_url: str) -> Any:
        if processor == 'list':
            return value if isinstance(value, list) else [value]

        # Scalar processors work on the first match only
        if isinstance(value, list):
            value = value[0] if value else None

        if value is None:
            return None

        if processor == 'float':
            # Match a well-formed decimal so stray dots cannot break float()
            nums = re.findall(r'\d+(?:\.\d+)?', str(value))
            return float(nums[0]) if nums else None
        elif processor == 'int':
            nums = re.findall(r'\d+', str(value))
            return int(nums[0]) if nums else None
        elif processor == 'strip':
            return value.strip()
        elif processor == 'abs_url':
            from urllib.parse import urljoin
            return urljoin(base_url, value)

        return value
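
The pipe syntax in a field spec (`"selector | proc1 | proc2"`) is the core of the configuration language, so it is worth looking at in isolation. Below is a minimal standard-library sketch that splits a spec into its selector and processor names and runs the processors over an already-extracted string. `parse_field_spec`, `run_processors` and the `PROCESSORS` table are hypothetical names for illustration; the sketch also assumes the selector itself never contains a `|` character.

```python
import re
from typing import Any, Callable, Dict, List, Optional, Tuple


def parse_field_spec(spec: str) -> Tuple[str, List[str]]:
    """Split 'selector | proc1 | proc2' into (selector, [proc1, proc2])."""
    parts = [part.strip() for part in spec.split("|")]
    return parts[0], parts[1:]


def to_float(value: str) -> Optional[float]:
    """Pull the first decimal number out of a string such as '$19.90'."""
    match = re.search(r"\d+(?:\.\d+)?", value)
    return float(match.group()) if match else None


# A lookup table keeps the processor set open for extension.
PROCESSORS: Dict[str, Callable[[str], Any]] = {
    "strip": str.strip,
    "float": to_float,
}


def run_processors(value: Any, names: List[str]) -> Any:
    """Apply the named processors left to right, stopping at None."""
    for name in names:
        if value is None:
            break
        value = PROCESSORS[name](value)
    return value


selector, processors = parse_field_spec(".price::text | strip | float")
price = run_processors("  $19.90 ", processors)
```

Using a dictionary of callables instead of an if/elif chain means a new processor is a one-line registration, which matters once business users start asking for their own transformations.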

2.3 Workflow engine

The engine wires the components together into a complete workflow:

import asyncio
from typing import Callable, List

class CrawlerEngine:
    def __init__(self,
                 scheduler: Scheduler,
                 downloader: Downloader,
                 extractor: Extractor,
                 pipelines: List[Callable] = None,
                 workers: int = 10):
        self.scheduler = scheduler
        self.downloader = downloader
        self.extractor = extractor
        self.pipelines = pipelines or []
        self.workers = workers
        self.running = False
    
    async def run(self):
        """Start the crawler engine."""
        self.running = True
        workers = [self._worker(i) for i in range(self.workers)]
        await asyncio.gather(*workers)

    async def _worker(self, worker_id: int):
        """Worker coroutine."""
        while self.running:
            # Note: pop() is a synchronous Redis call, so it briefly blocks the event loop
            task = self.scheduler.pop()
            if not task:
                await asyncio.sleep(1)
                continue

            try:
                # Download
                response = await self.downloader.fetch(task)
                if not response or response.status != 200:
                    continue

                # Extract
                items = self.extractor.extract(response.text, response.url)

                # Run each item through the pipelines
                for item in items:
                    for pipeline in self.pipelines:
                        item = await pipeline(item)
                        if item is None:
                            break

            except Exception:
                # Retry logic
                if task.retry_count < 3:
                    task.retry_count += 1
                    task.priority -= 1  # lower the priority
                    self.scheduler.push(task)
    
    def stop(self):
        self.running = False
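
The engine expects each pipeline to be an async callable that takes an item and returns either the (possibly modified) item or `None` to drop it. The following sketch shows that contract with two hypothetical pipelines, `drop_missing_title` and `normalize_title`, plus a small `run_pipelines` helper that mirrors the inner loop of `_worker`.

```python
import asyncio
from typing import Awaitable, Callable, List, Optional

Item = dict
Pipeline = Callable[[Item], Awaitable[Optional[Item]]]


async def drop_missing_title(item: Item) -> Optional[Item]:
    """Returning None tells the engine to discard the item."""
    return item if item.get("title") else None


async def normalize_title(item: Item) -> Optional[Item]:
    """Return a cleaned copy rather than mutating the input."""
    return {**item, "title": item["title"].strip()}


async def run_pipelines(item: Item, pipelines: List[Pipeline]) -> Optional[Item]:
    """Pass the item through each pipeline in order; stop at the first None."""
    for pipeline in pipelines:
        item = await pipeline(item)
        if item is None:
            return None
    return item


async def _demo() -> list:
    pipelines = [drop_missing_title, normalize_title]
    raw_items = [{"title": "  Widget "}, {"title": ""}]
    return [await run_pipelines(item, pipelines) for item in raw_items]


processed = asyncio.run(_demo())
```

Ordering matters: the filter runs first, so `normalize_title` never sees an item without a title.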

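3. Performance optimization in practice

3.1 Choosing a concurrency model

Model           | Best suited for                        | Pros and cons
Multithreading  | I/O-bound work, simple scenarios       | Simple to implement; limited by the GIL
Multiprocessing | CPU-bound work, strong isolation needs | High resource usage; complex inter-process communication
Async I/O       | High-concurrency network requests      | Best performance; higher code complexity
Hybrid          | Large-scale production environments    | Multiprocessing + coroutines; makes full use of multiple cores

Recommended: multiprocessing + coroutines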

from multiprocessing import Process
import asyncio

def run_worker(worker_id: int, config: dict):
    """Each process runs its own asyncio event loop."""
    async def main():
        engine = CrawlerEngine(**config)
        await engine.run()

    asyncio.run(main())

def start_crawler(config: dict, process_count: int = 4):
    """Start the multi-process crawler."""
    processes = []
    for i in range(process_count):
        p = Process(target=run_worker, args=(i, config))
        p.start()
        processes.append(p)
    
    for p in processes:
        p.join()

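3.2 Connection pool tuning

import aiohttp

async def create_session() -> aiohttp.ClientSession:
    # Build the shared connector inside a coroutine so it binds to the
    # running event loop; call this once at startup and reuse the session.
    connector = aiohttp.TCPConnector(
        limit=100,            # total connection limit
        limit_per_host=10,    # per-host connection limit
        ttl_dns_cache=300,    # DNS cache TTL, in seconds
        keepalive_timeout=30  # how long idle connections are kept, in seconds
    )
    return aiohttp.ClientSession(connector=connector)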

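3.3 Memory optimization

import json

# 1. Use a generator to process large data sets
def iter_items(file_path: str):
    with open(file_path) as f:
        for line in f:
            yield json.loads(line)

# 2. Release response bodies promptly
# `session` is an aiohttp.ClientSession; `process_chunk` is any callable taking bytes
async def fetch_and_process(session, url: str, process_chunk):
    async with session.get(url) as resp:
        # Stream the body in 64 KiB chunks instead of loading it all at once
        async for chunk in resp.content.iter_chunked(64 * 1024):
            process_chunk(chunk)
        # The connection is released when the with block exits

# 3. Write in batches to reduce I/O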
class BatchWriter:
    def __init__(self, collection, batch_size: int = 1000):
        self.collection = collection
        self.batch_size = batch_size
        self.buffer = []
    
    async def write(self, item: dict):
        self.buffer.append(item)
        if len(self.buffer) >= self.batch_size:
            await self.flush()
    
    async def flush(self):
        if self.buffer:
            await self.collection.insert_many(self.buffer)
            self.buffer = []

4. Distributed architecture

4.1 Architecture diagram

                    ┌─────────────┐
                    │   Master    │
                    │ (scheduler) │
                    └──────┬──────┘
                           │
           ┌───────────────┼───────────────┐
           │               │               │
    ┌──────▼──────┐ ┌──────▼──────┐ ┌──────▼──────┐
    │   Worker 1  │ │   Worker 2  │ │   Worker N  │
    │ (download)  │ │ (download)  │ │ (download)  │
    └──────┬──────┘ └──────┬──────┘ └──────┬──────┘
           │               │               │
           └───────────────┼───────────────┘
                           │
                    ┌──────▼──────┐
                    │    Redis    │
                    │ (job queue) │
                    └──────┬──────┘
                           │
                    ┌──────▼──────┐
                    │   MongoDB   │
                    │  (storage)  │
                    └─────────────┘

4.2 Task distribution strategy

# Task distribution based on consistent hashing
import bisect
import hashlib
from typing import List, Optional

class ConsistentHash:
    def __init__(self, nodes: List[str], replicas: int = 100):
        self.replicas = replicas
        self.ring = {}
        self.sorted_keys = []

        for node in nodes:
            self.add_node(node)

    def add_node(self, node: str):
        # Each node gets `replicas` virtual points to even out the distribution
        for i in range(self.replicas):
            key = self._hash(f"{node}:{i}")
            self.ring[key] = node
            self.sorted_keys.append(key)
        self.sorted_keys.sort()

    def get_node(self, key: str) -> Optional[str]:
        """Return the node responsible for the given key."""
        if not self.ring:
            return None

        # Binary-search for the first ring position >= the key's hash,
        # wrapping around to the start of the ring.
        idx = bisect.bisect_left(self.sorted_keys, self._hash(key))
        if idx == len(self.sorted_keys):
            idx = 0
        return self.ring[self.sorted_keys[idx]]

    def _hash(self, key: str) -> int:
        return int(hashlib.md5(key.encode()).hexdigest(), 16)
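
The reason to prefer consistent hashing over a plain `hash(key) % N` is what happens when the worker pool changes: only the keys claimed by the new node move, and everything else stays where it was. The sketch below checks that property on a compact, self-contained ring. `Ring`, the worker names and the URL pattern are hypothetical and exist only for this demonstration.

```python
import bisect
import hashlib
from typing import Dict, List


def _hash(key: str) -> int:
    return int(hashlib.md5(key.encode()).hexdigest(), 16)


class Ring:
    def __init__(self, nodes: List[str], replicas: int = 100) -> None:
        self.replicas = replicas
        self._points: List[int] = []      # sorted virtual-node positions
        self._owner: Dict[int, str] = {}  # position -> physical node
        for node in nodes:
            self.add_node(node)

    def add_node(self, node: str) -> None:
        for i in range(self.replicas):
            point = _hash(f"{node}:{i}")
            self._owner[point] = node
            bisect.insort(self._points, point)

    def get_node(self, key: str) -> str:
        if not self._points:
            raise ValueError("the ring has no nodes")
        # First position clockwise from the key's hash, wrapping to the start.
        idx = bisect.bisect_left(self._points, _hash(key))
        if idx == len(self._points):
            idx = 0
        return self._owner[self._points[idx]]


keys = [f"https://example.com/item/{i}" for i in range(1000)]
ring = Ring(["worker-1", "worker-2", "worker-3"])
before = {key: ring.get_node(key) for key in keys}

ring.add_node("worker-4")
after = {key: ring.get_node(key) for key in keys}

# Keys whose owner changed when the fourth worker joined.
moved = [key for key in keys if before[key] != after[key]]
```

Every key in `moved` now belongs to `worker-4`; no key is shuffled between the three original workers. For a crawler this means per-host state such as cookies, rate-limit counters or warm connections mostly stays on the worker that already holds it.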

4.3 Monitoring and alerting

from prometheus_client import Counter, Histogram, Gauge, start_http_server

# Define the metrics
REQUESTS_TOTAL = Counter('crawler_requests_total', 'Total requests', ['status'])
REQUEST_LATENCY = Histogram('crawler_request_latency_seconds', 'Request latency')
QUEUE_SIZE = Gauge('crawler_queue_size', 'Queue size')
ACTIVE_WORKERS = Gauge('crawler_active_workers', 'Active workers')

# Use them in code (`downloader` is a Downloader instance created elsewhere)
async def fetch_with_metrics(task: Task):
    with REQUEST_LATENCY.time():
        try:
            response = await downloader.fetch(task)
            REQUESTS_TOTAL.labels(status=str(response.status)).inc()
            return response
        except Exception:
            REQUESTS_TOTAL.labels(status='error').inc()
            raise

# Start the metrics HTTP server
start_http_server(8000)

5. Implementing the configuration-driven system

5.1 Configuration schema

# crawler_config.yaml
version: "1.0"
name: "example_crawler"

# Data source
source:
  type: web           # web | api | app
  method: GET
  url: "https://api.example.com/items"
  params:
    page: "{page}"
    size: 20
  headers:
    User-Agent: "Mozilla/5.0"
  pagination:
    type: page_number  # page_number | cursor | offset
    start: 1
    end: null          # null means detect the last page automatically
    step: 1

# Authentication
auth:
  type: cookies        # cookies | token | oauth
  source: file
  path: "./cookies.json"

# Data extraction
extract:
  type: json           # json | html | xml
  root: "data.items"   # JSON Path
  fields:
    id: "$.id"
    title: "$.title"
    price: 
      path: "$.price"
      type: float
    tags:
      path: "$.tags[*]"
      type: list

# Data processing
transform:
  - type: filter
    condition: "price > 0"
  - type: map
    field: title
    func: strip
  - type: dedupe
    keys: [id]

# Output
output:
  type: mongodb
  uri: "mongodb://localhost:27017"
  database: "crawler"
  collection: "items"
  mode: upsert         # insert | upsert | replace
  upsert_keys: [id]

# Runtime
runtime:
  workers: 10
  rate_limit: 10       # requests per second
  retry_times: 3
  timeout: 30
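
The `rate_limit` setting caps requests per second. One common way to enforce such a cap is a token bucket, sketched below. This is an illustration under assumptions rather than the limiter used by the system described here: `TokenBucket` and its `clock` parameter are hypothetical names, and the injectable clock is there only to make the demo deterministic.

```python
import time
from typing import Callable, Optional


class TokenBucket:
    """Allow `rate` acquisitions per second, with bursts up to `capacity`."""

    def __init__(self, rate: float, capacity: Optional[float] = None,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.rate = rate
        self.capacity = capacity if capacity is not None else rate
        self._clock = clock
        self._tokens = self.capacity  # start with a full bucket
        self._last = clock()

    def try_acquire(self) -> bool:
        """Take one token if available; never blocks."""
        now = self._clock()
        # Refill in proportion to elapsed time, capped at capacity.
        self._tokens = min(self.capacity,
                           self._tokens + (now - self._last) * self.rate)
        self._last = now
        if self._tokens >= 1:
            self._tokens -= 1
            return True
        return False


# Drive the bucket with a fake clock so the outcome does not depend on timing.
now = [0.0]
bucket = TokenBucket(rate=10, clock=lambda: now[0])

burst = sum(bucket.try_acquire() for _ in range(15))       # all at t = 0
now[0] = 0.5                                               # half a second later
after_wait = sum(bucket.try_acquire() for _ in range(15))
```

With `rate_limit: 10`, the first burst admits 10 requests and rejects the rest; half a second later 5 more tokens have accumulated. A worker that gets `False` would typically sleep briefly and try again instead of dropping the task.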

5.2 Parsing and executing the configuration

import yaml
from typing import Any, Optional
from dataclasses import dataclass

@dataclass
class CrawlerConfig:
    name: str
    source: dict
    extract: dict
    output: dict
    runtime: dict
    auth: Optional[dict] = None
    transform: Optional[list] = None
    version: str = "1.0"  # the YAML file carries a top-level `version` key

def load_config(path: str) -> CrawlerConfig:
    with open(path, encoding="utf-8") as f:
        data = yaml.safe_load(f)
    return CrawlerConfig(**data)

class ConfigurableCrawler:
    def __init__(self, config: CrawlerConfig):
        self.config = config
        self._init_components()

    def _init_components(self):
        """Initialize each component according to the configuration."""
        # The source, extractor and output classes used below are assumed
        # to be defined elsewhere.
        # Initialize the data source
        source_type = self.config.source['type']
        if source_type == 'web':
            self.source = WebSource(self.config.source)
        elif source_type == 'api':
            self.source = ApiSource(self.config.source)

        # Initialize the extractor
        extract_type = self.config.extract['type']
        if extract_type == 'json':
            self.extractor = JsonExtractor(self.config.extract)
        elif extract_type == 'html':
            self.extractor = HtmlExtractor(self.config.extract)

        # Initialize the output
        output_type = self.config.output['type']
        if output_type == 'mongodb':
            self.output = MongoOutput(self.config.output)
        elif output_type == 'file':
            self.output = FileOutput(self.config.output)

    async def run(self):
        """Run the crawl."""
        async for page_data in self.source.iter_pages():
            items = self.extractor.extract(page_data)

            # Apply the transforms
            if self.config.transform:
                items = self._apply_transforms(items)

            # Write the output
            await self.output.write(items)
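
The `transform` list in the schema (filter, map, dedupe) can be interpreted as a sequence of list-to-list steps. The sketch below is one possible reading, not the original `_apply_transforms`: `apply_transforms` is a hypothetical standalone function, the condition grammar is limited to `<field> <operator> <number>`, and only a few string functions are registered. It deliberately parses the condition instead of calling `eval`, since configuration files are written by end users.

```python
import operator
import re
from typing import Any, Dict, List

_OPS = {">": operator.gt, ">=": operator.ge, "<": operator.lt,
        "<=": operator.le, "==": operator.eq, "!=": operator.ne}
# Assumed grammar: "<field> <op> <number>", for example "price > 0".
_CONDITION = re.compile(r"^\s*(\w+)\s*(>=|<=|==|!=|>|<)\s*(-?\d+(?:\.\d+)?)\s*$")
_FUNCS = {"strip": str.strip, "lower": str.lower, "upper": str.upper}


def apply_transforms(items: List[Dict[str, Any]],
                     transforms: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    for spec in transforms:
        kind = spec["type"]
        if kind == "filter":
            match = _CONDITION.match(spec["condition"])
            if not match:
                raise ValueError(f"unsupported condition: {spec['condition']!r}")
            field, compare, number = (match.group(1), _OPS[match.group(2)],
                                      float(match.group(3)))
            items = [item for item in items
                     if item.get(field) is not None and compare(item[field], number)]
        elif kind == "map":
            func, field = _FUNCS[spec["func"]], spec["field"]
            items = [{**item, field: func(item[field])} for item in items]
        elif kind == "dedupe":
            seen, unique = set(), []
            for item in items:
                key = tuple(item.get(k) for k in spec["keys"])
                if key not in seen:  # keep the first item for each key
                    seen.add(key)
                    unique.append(item)
            items = unique
        else:
            raise ValueError(f"unknown transform type: {kind!r}")
    return items


# The transform section of crawler_config.yaml, as parsed data.
transforms = [
    {"type": "filter", "condition": "price > 0"},
    {"type": "map", "field": "title", "func": "strip"},
    {"type": "dedupe", "keys": ["id"]},
]
raw = [
    {"id": 1, "title": " A ", "price": 9.5},
    {"id": 2, "title": "B", "price": 0},
    {"id": 1, "title": "A again", "price": 3.0},
]
cleaned = apply_transforms(raw, transforms)
```

Here the second item is filtered out by `price > 0`, the first title is stripped, and the third item is dropped as a duplicate of `id` 1.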

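6. Summary and outlook

Implemented so far

  1. Framework: a modular, extensible crawler engine
  2. Configuration: most common scenarios can be handled through configuration alone
  3. Distribution: multi-node deployment and task scheduling
  4. Monitoring: metrics collection and alerting

Future directions

  1. Intelligence

    • Automatic recognition of page structure
    • Automatic generation of extraction rules
    • Automatic diagnosis and repair of failures
  2. Visualization

    • Drag-and-drop rule configuration
    • Real-time data preview
    • Task-status dashboard
  3. Ecosystem

    • A rule marketplace (sharing and reuse)
    • A plugin system
    • Multi-language SDKs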

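Appendix: recommended tools

Frameworks

Name       | Language           | Notes
Scrapy     | Python             | Mature and stable, rich ecosystem
Colly      | Go                 | High performance, suited to large-scale crawling
Playwright | Multiple languages | Browser automation, handles JS rendering
Crawlee    | TypeScript         | Modern, with built-in anti-blocking features

Tools

Name      | Purpose
mitmproxy | Traffic-capturing proxy
Splash    | JS rendering service
Scrapyd   | Scrapy deployment service
Gerapy    | Scrapy management platform

Services

Name            | Purpose
ScraperAPI      | Proxy + rendering service
Zyte (Crawlera) | Smart proxy
Diffbot         | Automatic extraction API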

Changelog

  • 2025-02-19 Completed the full article
  • 2018-12-15 Initialized the outline

Written by

Zoe

AI Infra Engineer · LLM Serving · GPU/RDMA · obsessive toolmaker