Building an Efficient Crawler: Design Thinking and Implementation
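I keep coming back to one question: what kind of tool or system would fully free crawler developers from per-site work? I believe crawler engineers should spend their time on system-level and performance problems, not on working out, day after day, how to scrape one more website or app. In principle, a specific business requirement should be handled by the people who need the data, by customizing a shared system or tool themselves.
In 2016 I made a first, simple attempt and factored the crawl flow out into a configuration file. In 2017 I tried to design a full configuration-driven crawling system. I spent all of 2018 building what I called a configuration-driven crawling tool.
This article is a systematic summary of what I have thought about and built in efficient crawler design over those years.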
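1. Evolution of Crawler Systems
1.1 The Script Era
The most primitive crawler is just a script:

    import requests
    from bs4 import BeautifulSoup

    url = "https://example.com/list"
    resp = requests.get(url)
    soup = BeautifulSoup(resp.text, 'html.parser')
    for item in soup.select('.item'):
        print(item.text)

Problems:
- No concurrency, so throughput is low
- No error handling, so it is fragile
- Duplicated code, so it is hard to maintain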
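1.2 The Framework Era
Frameworks such as Scrapy solved the engineering problems:

    import scrapy


    class ExampleSpider(scrapy.Spider):
        name = 'example'
        start_urls = ['https://example.com']

        def parse(self, response):
            for item in response.css('.item'):
                yield {'title': item.css('h2::text').get()}

What this solved:
- Concurrent scheduling
- Retry on failure
- Data pipelines
What remains:
- Every website still needs its own code
- Anti-scraping countermeasures still need per-site customization
- Operational cost is high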
1.3 The Configuration Era
My goal: let business users complete a data-collection job through configuration alone.

    # spider.yaml
    name: example_spider
    source:
      type: web
      url: "https://example.com/list?page={page}"
      pagination:
        start: 1
        end: 100
    extract:
      selector: ".item"
      fields:
        title: "h2::text"
        link: "a::attr(href)"
        price: ".price::text | float"
    output:
      type: mongodb
      collection: products
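To make the `source` section concrete, here is a minimal sketch of how a URL template and a pagination range might expand into the list of pages to fetch. The function name `expand_urls` is hypothetical, and the sketch assumes `end` is inclusive, that `step` defaults to 1, and that the `{page}` placeholder is filled with `str.format`.

```python
from typing import Iterator


def expand_urls(source: dict) -> Iterator[str]:
    """Yield one URL per page for a `source` section like the one above."""
    template = source["url"]
    pagination = source.get("pagination")
    if not pagination:
        # No pagination block: the URL is used as-is.
        yield template
        return
    start = pagination.get("start", 1)
    end = pagination["end"]            # assumed to be inclusive
    step = pagination.get("step", 1)   # assumed default
    for page in range(start, end + 1, step):
        yield template.format(page=page)


source = {
    "url": "https://example.com/list?page={page}",
    "pagination": {"start": 1, "end": 100},
}
urls = list(expand_urls(source))
```

A generator keeps memory flat even for very long page ranges, because URLs are produced only as the scheduler asks for them.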
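2. Architecture of an Efficient Crawler
2.1 Layered Architecture

    ┌──────────────────────────────────────────────────────────────────────┐
    │                           Scheduling layer                           │
    │    (task dispatch, priority queue, deduplication, rate limiting)     │
    ├──────────────────────────────────────────────────────────────────────┤
    │                            Download layer                            │
    │   (request building, proxy management, retries, browser rendering)   │
    ├──────────────────────────────────────────────────────────────────────┤
    │                           Processing layer                           │
    │  (parsing/extraction, cleaning/transformation, entity recognition)   │
    ├──────────────────────────────────────────────────────────────────────┤
    │                       Storage and output layer                       │
    │              (databases, message queues, file systems)               │
    └──────────────────────────────────────────────────────────────────────┘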
2.2 Core Components
Scheduler

    import hashlib
    import json
    from dataclasses import dataclass
    from typing import Optional

    from redis import Redis


    @dataclass
    class Task:
        url: str
        method: str = 'GET'
        headers: Optional[dict] = None
        body: Optional[dict] = None
        meta: Optional[dict] = None
        priority: int = 0
        retry_count: int = 0

        @property
        def fingerprint(self) -> str:
            """Task fingerprint, used for deduplication."""
            s = f"{self.method}:{self.url}:{json.dumps(self.body, sort_keys=True)}"
            return hashlib.md5(s.encode()).hexdigest()


    class Scheduler:
        def __init__(self, redis_url: str, queue_key: str):
            self.redis = Redis.from_url(redis_url)
            self.queue_key = queue_key
            self.seen_key = f"{queue_key}:seen"

        def push(self, task: Task) -> bool:
            """Add a task, skipping it if it has already been seen."""
            # SADD returns the number of members actually added, so one call
            # both checks and records the fingerprint without a race between
            # a separate SISMEMBER and SADD.
            if not self.redis.sadd(self.seen_key, task.fingerprint):
                return False
            # A sorted set serves as the priority queue
            self.redis.zadd(self.queue_key, {json.dumps(task.__dict__): task.priority})
            return True

        def pop(self) -> Optional[Task]:
            """Return the highest-priority task, or None if the queue is empty."""
            result = self.redis.zpopmax(self.queue_key)
            if not result:
                return None
            task_json, _ = result[0]
            return Task(**json.loads(task_json))

        def size(self) -> int:
            return self.redis.zcard(self.queue_key)
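The push/pop contract of the scheduler (deduplicate on push, highest priority first on pop) can be tried out without a Redis server. The following is an in-memory sketch only: `InMemoryScheduler` is a hypothetical stand-in, it takes a fingerprint string rather than a `Task`, and it breaks priority ties in insertion order, which is a choice of this sketch rather than the behavior of a Redis sorted set.

```python
import heapq
import itertools
from typing import Optional


class InMemoryScheduler:
    """Hypothetical stand-in with the same push/pop contract, no Redis."""

    def __init__(self) -> None:
        self._seen = set()
        self._heap = []
        self._counter = itertools.count()  # tie-breaker: FIFO within a priority

    def push(self, fingerprint: str, priority: int = 0) -> bool:
        if fingerprint in self._seen:
            return False  # duplicate, not queued again
        self._seen.add(fingerprint)
        # heapq is a min-heap, so negate the priority to pop the largest first.
        heapq.heappush(self._heap, (-priority, next(self._counter), fingerprint))
        return True

    def pop(self) -> Optional[str]:
        if not self._heap:
            return None
        return heapq.heappop(self._heap)[2]


sched = InMemoryScheduler()
results = [sched.push("a", 1), sched.push("b", 5), sched.push("a", 9)]
order = [sched.pop(), sched.pop(), sched.pop()]
```

Note that the third push is rejected even though it carries a higher priority: deduplication happens before prioritization, so the first priority a task is seen with is the one that sticks.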
Downloader

    import asyncio
    import json
    from dataclasses import dataclass
    from typing import Optional

    import aiohttp

    # Task is the dataclass defined in the Scheduler section


    @dataclass
    class Response:
        url: str
        status: int
        headers: dict
        body: bytes
        encoding: str = 'utf-8'

        @property
        def text(self) -> str:
            return self.body.decode(self.encoding, errors='replace')

        @property
        def json(self) -> dict:
            return json.loads(self.text)


    class Downloader:
        def __init__(self,
                     proxy_pool=None,
                     timeout: int = 30,
                     max_retries: int = 3,
                     concurrent_limit: int = 100):
            self.proxy_pool = proxy_pool
            self.timeout = aiohttp.ClientTimeout(total=timeout)
            self.max_retries = max_retries
            self.semaphore = asyncio.Semaphore(concurrent_limit)

        async def fetch(self, task: Task) -> Optional[Response]:
            """Download a task, with retries and optional proxy support."""
            async with self.semaphore:
                for attempt in range(self.max_retries):
                    proxy = None  # bound before the try so the except block can read it
                    try:
                        proxy = self.proxy_pool.get() if self.proxy_pool else None
                        async with aiohttp.ClientSession(timeout=self.timeout) as session:
                            async with session.request(
                                method=task.method,
                                url=task.url,
                                headers=task.headers,
                                json=task.body if task.method == 'POST' else None,
                                proxy=proxy
                            ) as resp:
                                body = await resp.read()
                                return Response(
                                    url=str(resp.url),
                                    status=resp.status,
                                    headers=dict(resp.headers),
                                    body=body
                                )
                    except Exception:
                        if self.proxy_pool and proxy:
                            self.proxy_pool.report_bad(proxy)
                        if attempt == self.max_retries - 1:
                            raise
                        await asyncio.sleep(2 ** attempt)  # exponential backoff
            return None
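The retry loop in `fetch` sleeps for exactly `2 ** attempt` seconds. When many workers fail at the same moment (for example, when a proxy goes down), they will all retry at the same moment too. A common refinement is to add random jitter and cap the delay. The sketch below is one way to do that, not part of the original downloader: `backoff_delay` and `retry` are hypothetical helpers, and the `base` and `cap` defaults are arbitrary.

```python
import asyncio
import random
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


def backoff_delay(attempt: int, base: float = 1.0, cap: float = 30.0) -> float:
    """Full-jitter delay: uniform in [0, min(cap, base * 2**attempt)]."""
    return random.uniform(0, min(cap, base * 2 ** attempt))


async def retry(op: Callable[[], Awaitable[T]],
                max_retries: int = 3,
                sleep: Callable[[float], Awaitable[None]] = asyncio.sleep) -> T:
    """Call `op` until it succeeds or `max_retries` attempts have failed."""
    for attempt in range(max_retries):
        try:
            return await op()
        except Exception:
            if attempt == max_retries - 1:
                raise  # out of attempts: surface the last error
            await sleep(backoff_delay(attempt))
    raise ValueError("max_retries must be at least 1")


calls = []


async def flaky() -> str:
    """Simulated operation that fails twice and then succeeds."""
    calls.append(1)
    if len(calls) < 3:
        raise ConnectionError("simulated failure")
    return "ok"


async def no_sleep(_: float) -> None:
    """Stand-in for asyncio.sleep so the example finishes instantly."""
    return None


result = asyncio.run(retry(flaky, max_retries=3, sleep=no_sleep))
```

Passing `sleep` in as a parameter is only there to make the helper easy to exercise without waiting; production code would leave it at the default.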
Extractor

    import re
    from typing import Any, Dict, List
    from urllib.parse import urljoin

    from parsel import Selector


    class Extractor:
        def __init__(self, config: dict):
            """
            Example config:
            {
                "selector": ".item",
                "fields": {
                    "title": "h2::text",
                    "price": ".price::text | float",
                    "link": "a::attr(href) | abs_url",
                    "tags": ".tag::text | list"
                }
            }
            """
            self.config = config

        def extract(self, html: str, base_url: str = '') -> List[Dict[str, Any]]:
            sel = Selector(text=html)
            items = []
            container_selector = self.config.get('selector', 'body')
            for container in sel.css(container_selector):
                item = {}
                for field_name, field_selector in self.config['fields'].items():
                    value = self._extract_field(container, field_selector, base_url)
                    item[field_name] = value
                items.append(item)
            return items

        def _extract_field(self, sel: Selector, selector: str, base_url: str) -> Any:
            """Parse a field selector; '|' chains processors onto the CSS selector."""
            parts = selector.split('|')
            css_selector = parts[0].strip()
            processors = [p.strip() for p in parts[1:]]
            # Extract the raw values; default to the element's text content
            if '::text' in css_selector or '::attr' in css_selector:
                values = sel.css(css_selector).getall()
            else:
                values = sel.css(css_selector + '::text').getall()
            # Apply processors left to right; with no processors the result
            # stays a list of all matches
            result = values
            for proc in processors:
                result = self._apply_processor(result, proc, base_url)
            return result

        def _apply_processor(self, value, processor: str, base_url: str) -> Any:
            if processor == 'list':
                return value if isinstance(value, list) else [value]
            # Every other processor works on a single value: take the first match
            if isinstance(value, list):
                value = value[0] if value else None
            if value is None:
                return None
            if processor == 'float':
                # Match a well-formed number so a lone '.' cannot reach float()
                nums = re.findall(r'\d+(?:\.\d+)?', str(value))
                return float(nums[0]) if nums else None
            elif processor == 'int':
                nums = re.findall(r'\d+', str(value))
                return int(nums[0]) if nums else None
            elif processor == 'strip':
                return value.strip()
            elif processor == 'abs_url':
                return urljoin(base_url, value)
            return value
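The pipe syntax in a field spec (`".price::text | float"`) is the heart of the extractor, so it may help to see it in isolation. This sketch splits a spec into its selector and processor names and runs the processors over a raw string. It swaps the `if/elif` chain for a lookup table, which is a design alternative and not what the class above does; `parse_field_spec`, `run_processors`, and `PROCESSORS` are hypothetical names, and unlike the extractor this version raises if no number is found.

```python
import re
from typing import Any, Callable, Dict, List, Tuple


def parse_field_spec(spec: str) -> Tuple[str, List[str]]:
    """Split 'selector | proc1 | proc2' into the selector and processor names."""
    selector, *processors = (part.strip() for part in spec.split("|"))
    return selector, processors


# Lookup table of single-value processors (a subset, for illustration).
PROCESSORS: Dict[str, Callable[[str], Any]] = {
    "strip": str.strip,
    "int": lambda v: int(re.search(r"\d+", v).group()),
    "float": lambda v: float(re.search(r"\d+(?:\.\d+)?", v).group()),
}


def run_processors(raw: str, processors: List[str]) -> Any:
    """Apply the named processors left to right."""
    value: Any = raw
    for name in processors:
        value = PROCESSORS[name](value)
    return value


selector, procs = parse_field_spec(".price::text | strip | float")
price = run_processors("  $19.90 ", procs)
```

With a table, supporting a new processor is a one-line registration, and an unknown name fails loudly with a `KeyError` instead of being silently ignored.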
2.3 Workflow Engine
The engine wires the components into a complete workflow:

    import asyncio
    import json
    from typing import Callable, List, Optional


    class CrawlerEngine:
        def __init__(self,
                     scheduler: Scheduler,
                     downloader: Downloader,
                     extractor: Extractor,
                     pipelines: Optional[List[Callable]] = None,
                     workers: int = 10):
            self.scheduler = scheduler
            self.downloader = downloader
            self.extractor = extractor
            self.pipelines = pipelines or []
            self.workers = workers
            self.running = False

        async def run(self):
            """Start the crawler engine."""
            self.running = True
            workers = [self._worker(i) for i in range(self.workers)]
            await asyncio.gather(*workers)

        async def _worker(self, worker_id: int):
            """Worker coroutine: pop a task, download, extract, run pipelines."""
            while self.running:
                task = self.scheduler.pop()
                if not task:
                    await asyncio.sleep(1)
                    continue
                try:
                    # Download
                    response = await self.downloader.fetch(task)
                    if not response or response.status != 200:
                        continue
                    # Extract
                    items = self.extractor.extract(response.text, response.url)
                    # Run each item through the pipelines; None drops the item
                    for item in items:
                        for pipeline in self.pipelines:
                            item = await pipeline(item)
                            if item is None:
                                break
                except Exception:
                    # Retry with a lower priority
                    if task.retry_count < 3:
                        task.retry_count += 1
                        task.priority -= 1
                        # Scheduler.push() would reject this task as a duplicate,
                        # because its fingerprint is already in the seen set, so
                        # put it back on the sorted set directly.
                        self.scheduler.redis.zadd(
                            self.scheduler.queue_key,
                            {json.dumps(task.__dict__): task.priority},
                        )

        def stop(self):
            self.running = False
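The engine treats each pipeline as an async callable that takes an item and returns either a (possibly modified) item or `None` to drop it. A small sketch of two such pipelines and the same control flow as the worker's inner loop follows. The pipeline functions and field names (`title`, `price`) are illustrative assumptions, not part of the engine.

```python
import asyncio
from typing import Awaitable, Callable, List, Optional

Item = dict
Pipeline = Callable[[Item], Awaitable[Optional[Item]]]


async def strip_title(item: Item) -> Optional[Item]:
    """Return a copy of the item with whitespace trimmed from the title."""
    return {**item, "title": item["title"].strip()}


async def drop_free_items(item: Item) -> Optional[Item]:
    """Returning None tells the caller to stop processing this item."""
    return item if item.get("price", 0) > 0 else None


async def run_pipelines(item: Item, pipelines: List[Pipeline]) -> Optional[Item]:
    """Same control flow as the inner loop of the worker."""
    current: Optional[Item] = item
    for pipeline in pipelines:
        current = await pipeline(current)
        if current is None:
            return None
    return current


pipelines = [strip_title, drop_free_items]
kept = asyncio.run(run_pipelines({"title": "  Widget ", "price": 9.5}, pipelines))
dropped = asyncio.run(run_pipelines({"title": "Sample", "price": 0}, pipelines))
```

Because order matters, cheap filters are usually best placed before expensive steps such as database writes.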
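3. Performance Optimization in Practice
3.1 Choosing a Concurrency Model
| Model | Best suited for | Trade-offs |
|---|---|---|
| Multithreading | I/O-bound work, simple scenarios | Simple to implement; limited by the GIL |
| Multiprocessing | CPU-bound work, strong isolation requirements | High resource usage; inter-process communication is complex |
| Async I/O | Highly concurrent network requests | Highest throughput for network I/O; more complex code |
| Hybrid | Large-scale production | Multiple processes + coroutines; makes full use of multiple cores |
Recommended: multiple processes + coroutines

    import asyncio
    from multiprocessing import Process


    def run_worker(worker_id: int, config: dict):
        """Run one asyncio event loop per process."""
        async def main():
            engine = CrawlerEngine(**config)
            await engine.run()
        asyncio.run(main())


    def start_crawler(config: dict, process_count: int = 4):
        """Start the crawler across multiple processes."""
        processes = []
        for i in range(process_count):
            p = Process(target=run_worker, args=(i, config))
            p.start()
            processes.append(p)
        for p in processes:
            p.join()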
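3.2 Connection Pool Tuning

    import aiohttp


    async def create_session() -> aiohttp.ClientSession:
        # Build the connector inside a coroutine so it binds to the running
        # event loop; create the session once and share it across requests.
        connector = aiohttp.TCPConnector(
            limit=100,             # total connection limit
            limit_per_host=10,     # per-host connection limit
            ttl_dns_cache=300,     # DNS cache TTL, in seconds
            keepalive_timeout=30,  # how long idle connections are kept, in seconds
        )
        return aiohttp.ClientSession(connector=connector)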
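3.3 Memory Optimization

    import json


    # 1. Use a generator to process large files one line at a time
    def iter_items(file_path: str):
        with open(file_path) as f:
            for line in f:
                yield json.loads(line)


    # 2. Release response bodies promptly
    async def fetch_and_process(session, url: str):
        async with session.get(url) as resp:
            # Stream the body in chunks instead of loading it whole;
            # process_chunk is a placeholder for your own handler
            async for chunk in resp.content.iter_chunked(64 * 1024):
                process_chunk(chunk)
        # The connection is released on leaving the with block


    # 3. Write in batches to reduce I/O
    class BatchWriter:
        def __init__(self, collection, batch_size: int = 1000):
            self.collection = collection
            self.batch_size = batch_size
            self.buffer = []

        async def write(self, item: dict):
            self.buffer.append(item)
            if len(self.buffer) >= self.batch_size:
                await self.flush()

        async def flush(self):
            if self.buffer:
                # Swap the buffer out before awaiting, so items appended by
                # other coroutines during the insert are not lost.
                batch, self.buffer = self.buffer, []
                # insert_many is awaited, so collection is assumed to come
                # from an async driver (for example, Motor)
                await self.collection.insert_many(batch)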
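One detail a batch writer leaves to the caller: items that never fill a whole batch stay in the buffer until `flush()` is called explicitly. The sketch below shows one way to make that hard to forget, by wrapping the writer in an async context manager. `FakeCollection` is a hypothetical in-memory stand-in for an async MongoDB collection, and the `__aenter__`/`__aexit__` methods are an addition for illustration, not part of the class above.

```python
import asyncio


class FakeCollection:
    """Hypothetical in-memory stand-in for an async MongoDB collection."""

    def __init__(self):
        self.batches = []

    async def insert_many(self, docs):
        self.batches.append(list(docs))


class BatchWriter:
    def __init__(self, collection, batch_size: int = 1000):
        self.collection = collection
        self.batch_size = batch_size
        self.buffer = []

    async def write(self, item: dict):
        self.buffer.append(item)
        if len(self.buffer) >= self.batch_size:
            await self.flush()

    async def flush(self):
        if self.buffer:
            batch, self.buffer = self.buffer, []
            await self.collection.insert_many(batch)

    # Added for this sketch: flush the remainder when the block exits.
    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc, tb):
        await self.flush()


async def main():
    collection = FakeCollection()
    async with BatchWriter(collection, batch_size=3) as writer:
        for i in range(7):
            await writer.write({"id": i})
    return [len(batch) for batch in collection.batches]


batch_sizes = asyncio.run(main())
```

Seven items with a batch size of three produce two full batches during the loop and one final batch of a single item when the block exits.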
4. Distributed Architecture
4.1 Architecture Diagram

                    ┌─────────────┐
                    │   Master    │
                    │ (scheduler) │
                    └──────┬──────┘
                           │
           ┌───────────────┼───────────────┐
           │               │               │
    ┌──────▼──────┐ ┌──────▼──────┐ ┌──────▼──────┐
    │  Worker 1   │ │  Worker 2   │ │  Worker N   │
    │ (download)  │ │ (download)  │ │ (download)  │
    └──────┬──────┘ └──────┬──────┘ └──────┬──────┘
           │               │               │
           └───────────────┼───────────────┘
                           │
                    ┌──────▼──────┐
                    │    Redis    │
                    │(task queue) │
                    └──────┬──────┘
                           │
                    ┌──────▼──────┐
                    │   MongoDB   │
                    │(data store) │
                    └─────────────┘
4.2 Task Distribution Strategy

    # Task distribution based on consistent hashing
    import bisect
    import hashlib
    from typing import List, Optional


    class ConsistentHash:
        def __init__(self, nodes: List[str], replicas: int = 100):
            self.replicas = replicas
            self.ring = {}
            self.sorted_keys = []
            for node in nodes:
                self.add_node(node)

        def add_node(self, node: str):
            # Each node gets `replicas` virtual points on the ring
            for i in range(self.replicas):
                key = self._hash(f"{node}:{i}")
                self.ring[key] = node
                self.sorted_keys.append(key)
            self.sorted_keys.sort()

        def get_node(self, key: str) -> Optional[str]:
            """Return the node responsible for key, or None if the ring is empty."""
            if not self.ring:
                return None
            h = self._hash(key)
            # Binary-search for the first ring position >= h, wrapping
            # around to the start of the ring past the last position
            idx = bisect.bisect_left(self.sorted_keys, h)
            if idx == len(self.sorted_keys):
                idx = 0
            return self.ring[self.sorted_keys[idx]]

        def _hash(self, key: str) -> int:
            return int(hashlib.md5(key.encode()).hexdigest(), 16)
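The reason to prefer consistent hashing over `hash(key) % node_count` is what happens when a worker joins: only the keys that land on the new worker's ring positions move, and everything else stays put. The sketch below checks that property on a compact re-implementation of the ring. The worker names and URLs are made up for illustration, and the helper functions are hypothetical.

```python
import bisect
import hashlib
from typing import Dict, List


def _hash(key: str) -> int:
    return int(hashlib.md5(key.encode()).hexdigest(), 16)


def build_ring(nodes: List[str], replicas: int = 100) -> Dict[int, str]:
    """Map every virtual point on the ring to its owning node."""
    return {_hash(f"{node}:{i}"): node for node in nodes for i in range(replicas)}


def lookup(ring: Dict[int, str], points: List[int], key: str) -> str:
    # First point clockwise from the key's hash; the modulo wraps past the end.
    idx = bisect.bisect_left(points, _hash(key)) % len(points)
    return ring[points[idx]]


def assign(nodes: List[str], keys: List[str]) -> Dict[str, str]:
    ring = build_ring(nodes)
    points = sorted(ring)
    return {key: lookup(ring, points, key) for key in keys}


keys = [f"https://example.com/item/{i}" for i in range(2000)]
before = assign(["worker-1", "worker-2", "worker-3"], keys)
after = assign(["worker-1", "worker-2", "worker-3", "worker-4"], keys)

moved = [k for k in keys if before[k] != after[k]]
moved_fraction = len(moved) / len(keys)
```

Going from three workers to four, about a quarter of the keys are expected to move, and every key that moves goes to the new worker. With modulo hashing, most keys would change owner instead.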
4.3 Monitoring and Alerting

    from prometheus_client import Counter, Histogram, Gauge, start_http_server

    # Define the metrics
    REQUESTS_TOTAL = Counter('crawler_requests_total', 'Total requests', ['status'])
    REQUEST_LATENCY = Histogram('crawler_request_latency_seconds', 'Request latency')
    QUEUE_SIZE = Gauge('crawler_queue_size', 'Queue size')
    ACTIVE_WORKERS = Gauge('crawler_active_workers', 'Active workers')


    # Use them in code
    async def fetch_with_metrics(downloader: Downloader, task: Task):
        with REQUEST_LATENCY.time():
            try:
                response = await downloader.fetch(task)
                # fetch() may return None, so guard before reading the status
                status = response.status if response else 'none'
                REQUESTS_TOTAL.labels(status=status).inc()
                return response
            except Exception:
                REQUESTS_TOTAL.labels(status='error').inc()
                raise


    # Start the metrics HTTP server
    start_http_server(8000)
5. Implementing the Configuration-Driven System
5.1 Configuration Schema

    # crawler_config.yaml
    version: "1.0"
    name: "example_crawler"

    # Data source
    source:
      type: web  # web | api | app
      method: GET
      url: "https://api.example.com/items"
      params:
        page: "{page}"
        size: 20
      headers:
        User-Agent: "Mozilla/5.0"
      pagination:
        type: page_number  # page_number | cursor | offset
        start: 1
        end: null  # null means auto-detect
        step: 1

    # Authentication
    auth:
      type: cookies  # cookies | token | oauth
      source: file
      path: "./cookies.json"

    # Data extraction
    extract:
      type: json  # json | html | xml
      root: "data.items"  # JSON path
      fields:
        id: "$.id"
        title: "$.title"
        price:
          path: "$.price"
          type: float
        tags:
          path: "$.tags[*]"
          type: list

    # Data processing
    transform:
      - type: filter
        condition: "price > 0"
      - type: map
        field: title
        func: strip
      - type: dedupe
        keys: [id]

    # Output
    output:
      type: mongodb
      uri: "mongodb://localhost:27017"
      database: "crawler"
      collection: "items"
      mode: upsert  # insert | upsert | replace
      upsert_keys: [id]

    # Runtime
    runtime:
      workers: 10
      rate_limit: 10  # requests per second
      retry_times: 3
      timeout: 30
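The `rate_limit` setting needs something to enforce it. A token bucket is one common choice, sketched here under stated assumptions: the class name `TokenBucket`, the `capacity` parameter (how large a burst is allowed), and the injectable `clock` are all illustrative and do not come from the configuration schema.

```python
import time
from typing import Callable


class TokenBucket:
    """Allow up to `rate` acquisitions per second, with bursts up to `capacity`."""

    def __init__(self, rate: float, capacity: float,
                 clock: Callable[[], float] = time.monotonic):
        self.rate = rate
        self.capacity = capacity
        self.clock = clock
        self.tokens = capacity      # start full, so an initial burst is allowed
        self.updated = clock()

    def try_acquire(self) -> bool:
        now = self.clock()
        # Refill in proportion to elapsed time, never above capacity.
        self.tokens = min(self.capacity,
                          self.tokens + (now - self.updated) * self.rate)
        self.updated = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False


# A fake clock keeps the example deterministic.
now = [0.0]
bucket = TokenBucket(rate=10, capacity=10, clock=lambda: now[0])

burst = sum(bucket.try_acquire() for _ in range(15))     # all at t = 0
now[0] = 0.5
refilled = sum(bucket.try_acquire() for _ in range(15))  # half a second later
```

This limiter is local to one process. With several worker processes, either each one gets a share of the global rate or the bucket state has to live in shared storage such as Redis.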
5.2 Parsing and Executing the Configuration

    from dataclasses import dataclass
    from typing import Optional

    import yaml


    @dataclass
    class CrawlerConfig:
        name: str
        source: dict
        extract: dict
        output: dict
        runtime: dict
        version: str = "1.0"  # the sample config has a top-level version key
        auth: Optional[dict] = None
        transform: Optional[list] = None


    def load_config(path: str) -> CrawlerConfig:
        with open(path) as f:
            data = yaml.safe_load(f)
        return CrawlerConfig(**data)


    class ConfigurableCrawler:
        # WebSource, ApiSource, JsonExtractor, HtmlExtractor, MongoOutput,
        # FileOutput and _apply_transforms are not shown in this article.
        def __init__(self, config: CrawlerConfig):
            self.config = config
            self._init_components()

        def _init_components(self):
            """Initialize each component according to the configuration."""
            # Data source
            source_type = self.config.source['type']
            if source_type == 'web':
                self.source = WebSource(self.config.source)
            elif source_type == 'api':
                self.source = ApiSource(self.config.source)
            # Extractor
            extract_type = self.config.extract['type']
            if extract_type == 'json':
                self.extractor = JsonExtractor(self.config.extract)
            elif extract_type == 'html':
                self.extractor = HtmlExtractor(self.config.extract)
            # Output
            output_type = self.config.output['type']
            if output_type == 'mongodb':
                self.output = MongoOutput(self.config.output)
            elif output_type == 'file':
                self.output = FileOutput(self.config.output)

        async def run(self):
            """Run the crawl."""
            async for page_data in self.source.iter_pages():
                items = self.extractor.extract(page_data)
                # Apply transforms
                if self.config.transform:
                    items = self._apply_transforms(items)
                # Write output
                await self.output.write(items)
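The crawler calls `_apply_transforms`, which is not shown. As a rough idea of what such a step could look like for the three transform types in the sample configuration (`filter`, `map`, `dedupe`), here is a standalone sketch. It is an assumption-laden illustration: conditions are limited to the form `<field> <operator> <number>`, the `OPS` and `FUNCS` tables are hypothetical, and `eval()` is deliberately avoided because the configuration is written by users.

```python
import operator
from typing import Any, Callable, Dict, List

Item = Dict[str, Any]

# Comparison operators allowed in a filter condition.
OPS: Dict[str, Callable[[Any, Any], bool]] = {
    ">": operator.gt, "<": operator.lt, ">=": operator.ge,
    "<=": operator.le, "==": operator.eq, "!=": operator.ne,
}
# Named functions allowed in a map transform.
FUNCS: Dict[str, Callable[[Any], Any]] = {"strip": str.strip, "lower": str.lower}


def apply_transforms(items: List[Item], transforms: List[dict]) -> List[Item]:
    """Apply the configured transforms in order and return the new item list."""
    for t in transforms:
        if t["type"] == "filter":
            # Only "<field> <op> <number>" is supported in this sketch.
            field, op, raw = t["condition"].split()
            items = [i for i in items if OPS[op](i[field], float(raw))]
        elif t["type"] == "map":
            field, func = t["field"], FUNCS[t["func"]]
            items = [{**i, field: func(i[field])} for i in items]
        elif t["type"] == "dedupe":
            seen, unique = set(), []
            for i in items:
                key = tuple(i[k] for k in t["keys"])
                if key not in seen:  # keep the first occurrence only
                    seen.add(key)
                    unique.append(i)
            items = unique
        else:
            raise ValueError(f"unknown transform type: {t['type']}")
    return items


transforms = [
    {"type": "filter", "condition": "price > 0"},
    {"type": "map", "field": "title", "func": "strip"},
    {"type": "dedupe", "keys": ["id"]},
]
items = [
    {"id": 1, "title": " A ", "price": 9.0},
    {"id": 2, "title": "B", "price": 0.0},
    {"id": 1, "title": "A again", "price": 5.0},
]
result = apply_transforms(items, transforms)
```

Note that `dedupe` here only removes duplicates within one page of items. Deduplicating across pages would need state that outlives the call, or the `upsert` output mode from the configuration.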
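6. Summary and Outlook
What is done
- Framework: a modular, extensible crawler engine
- Configuration: most common scenarios can be handled through configuration alone
- Distribution: multi-node deployment and task scheduling
- Monitoring: comprehensive metrics collection and alerting
Future directions
- Intelligence
  - Automatic recognition of page structure
  - Automatic generation of extraction rules
  - Automatic diagnosis and repair of failures
- Visualization
  - Drag-and-drop rule configuration
  - Live data preview
  - Task-status dashboard
- Ecosystem
  - Rule marketplace (sharing and reuse)
  - Plugin system
  - SDKs for multiple languages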
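Appendix: Recommended Tools
Frameworks
| Name | Language | Notes |
|---|---|---|
| Scrapy | Python | Mature and stable, rich ecosystem |
| Colly | Go | High performance, suited to large-scale crawling |
| Playwright | Multiple | Browser automation, handles JS rendering |
| Crawlee | TypeScript | Modern, with built-in anti-blocking features |
Tools
| Name | Purpose |
|---|---|
| mitmproxy | Traffic-capturing proxy |
| Splash | JS rendering service |
| Scrapyd | Scrapy deployment service |
| Gerapy | Scrapy management platform |
Services
| Name | Purpose |
|---|---|
| ScraperAPI | Proxy + rendering service |
| Zyte (Crawlera) | Smart proxy |
| Diffbot | Automatic extraction API |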
Changelog
2025-02-19 Completed the full article
2018-12-15 Initialized the outline

Written by
Zoe
AI Infra Engineer · LLM Serving · GPU/RDMA · Obsessive toolmaker