当前位置: 首页 > news >正文

分布式爬虫架构设计

随着互联网数据的爆炸式增长,单机爬虫已经难以满足大规模数据采集的需求。分布式爬虫应运而生,它通过多节点协作,实现了数据采集的高效性和容错性。本文将深入探讨分布式爬虫的架构设计,包括常见的架构模式、关键技术组件、完整项目示例以及面临的挑战与优化方向。

一、常见架构模式

1. 主从架构(Master - Slave)

架构组成
  • 主节点(Master) :作为整个爬虫系统的控制中心,负责全局调度工作。

  • 从节点(Slave) :接受主节点分配的任务,执行具体的网页爬取和数据解析操作。

工作原理
  • 任务调度 :主节点维护一个待爬取 URL 队列,按照预设策略(如轮询、权重分配等)将任务分配给从节点。

  • 负载均衡 :主节点实时监控从节点的负载情况(如 CPU 使用率、内存占用、网络带宽等),根据监控数据动态调整任务分配策略,确保各从节点的负载相对均衡。

  • 容错管理 :主节点定期检测从节点的运行状态,一旦发现节点故障(如宕机、网络中断等),会将该节点未完成的任务重新分配给其他正常节点,保障任务的持续执行。

通信方式
  • HTTP/RPC 调用 :从节点通过调用主节点提供的 HTTP API 或 RPC 接口获取任务。

  • 消息队列 :主节点利用 Redis、Kafka 等消息队列中间件推送任务,从节点订阅相应的队列接收任务。

代码示例(Python + Redis)
  • 主节点:任务分发

# _*_ coding: utf - 8 _*_
import redis
import time
import threadingclass MasterNode:def __init__(self):self.r = redis.Redis(host='master', port=6379, decode_responses=True)self.task_timeout = 20  # 任务超时时间(秒)def task_distribute(self):# 清空之前的任务队列和任务状态记录self.r.delete('task_queue')self.r.delete('task_status')# 初始化待爬取的 URL 列表urls = ['https://example.com/page1', 'https://example.com/page2', 'https://example.com/page3']# 将任务推入 Redis 队列for url in urls:task_id = self.r.incr('task_id')  # 生成任务 IDtask_info = {'task_id': task_id, 'url': url, 'status': 'waiting', 'assign_time': time.time()}self.r.hset('task_status', task_id, str(task_info))self.r.lpush('task_queue', task_id)print("任务分发完成,共有", len(urls), "个任务")def monitor_tasks(self):while True:# 获取所有任务状态task_statuses = self.r.hgetall('task_status')current_time = time.time()for task_id, task_info in task_statuses.items():task_info_dict = eval(task_info)if task_info_dict['status'] == 'processing':# 检查任务是否超时if current_time - task_info_dict['assign_time'] > self.task_timeout:print(f"任务 {task_id} 超时,重新分配")# 重新分配任务task_info_dict['status'] = 'waiting'self.r.hset('task_status', task_id, str(task_info_dict))self.r.rpush('task_queue', task_id)# 适当间隔检测time.sleep(5)if __name__ == "__main__":master = MasterNode()master.task_distribute()# 启动任务监控线程monitor_thread = threading.Thread(target=master.monitor_tasks)monitor_thread.daemon = Truemonitor_thread.start()# 阻止主线程退出monitor_thread.join()
  • 从节点:任务执行
# _*_ coding: utf - 8 _*_
import redis
import requests
from bs4 import BeautifulSoup
import time
import threadingclass SlaveNode:def __init__(self):self.r = redis.Redis(host='master', port=6379, decode_responses=True)def task_execute(self):while True:# 从 Redis 队列中获取任务 IDtask_id = self.r.brpop('task_queue', 10)if task_id:task_id = task_id[1]# 更新任务状态为处理中task_info = eval(self.r.hget('task_status', task_id))task_info['status'] = 'processing'self.r.hset('task_status', task_id, str(task_info))print(f"从节点获取到任务 {task_id}:", task_info['url'])try:response = requests.get(task_info['url'], timeout=5)if response.status_code == 200:# 解析网页数据soup = BeautifulSoup(response.text, 'html.parser')title = soup.find('title').get_text() if soup.find('title') else '无标题'links = [link.get('href') for link in soup.find_all('a') if link.get('href')]# 更新任务状态为完成task_info['status'] = 'completed'task_info['result'] = {'url': task_info['url'], 'title': title, 'links': links}self.r.hset('task_status', task_id, str(task_info))print(f"任务 {task_id} 处理完成,结果已存储")else:# 更新任务状态为失败task_info['status'] = 'failed'task_info['error'] = f"请求失败,状态码:{response.status_code}"self.r.hset('task_status', task_id, str(task_info))print(f"任务 {task_id} 请求失败,状态码:{response.status_code}")except Exception as e:# 更新任务状态为失败task_info['status'] = 'failed'task_info['error'] = f"请求或解析过程中出现错误:{str(e)}"self.r.hset('task_status', task_id, str(task_info))print(f"任务 {task_id} 处理过程中出现错误:", str(e))else:print("任务队列为空,等待新任务...")# 为避免频繁轮询,设置一个短暂的休眠时间time.sleep(1)if __name__ == "__main__":slave = SlaveNode()slave.task_execute()

 二、对等架构(Peer - to - Peer)

架构特点
  • 节点平等 :所有节点地位平等,不存在中心化的控制节点,每个节点既是任务的执行者,也是任务的协调者。

  • 自主协调 :节点通过分布式算法自主协调任务,实现任务的自分配和数据去重。

工作原理
  • 任务自分配 :每个节点根据一定的规则(如 URL 哈希值)自主决定要爬取的任务。例如,采用一致性哈希算法将 URL 映射到特定的节点上。

  • 数据去重 :利用布隆过滤器或分布式哈希表(DHT)等技术避免重复爬取,确保每个 URL 只被一个节点处理。

通信方式
  • 哈希算法 :一致性哈希算法将 URL 映射到一个哈希环上,根据哈希值确定对应的节点。

  • P2P 协议 :节点之间通过 P2P 协议直接通信,传递任务信息、数据以及节点状态等。

代码示例(Node.js + RabbitMQ)
  • 任务分配与发送

// _*_ coding: utf - 8 _*_
const amqplib = require('amqplib');
const crypto = require('crypto');async function sendTask(url) {try {// 连接 RabbitMQ 服务器const conn = await amqplib.connect('amqp://localhost');const ch = await conn.createChannel();// 定义任务队列const queueName = 'task_queue';await ch.assertQueue(queueName, { durable: true });// 计算 URL 哈希值并确定节点const hash = crypto.createHash('md5').update(url).digest('hex');const nodeId = hash % 3; // 假设有 3 个节点console.log(`URL ${url} 分配给节点 node_${nodeId}`);// 将任务发送到对应节点的队列const nodeQueue = `node_${nodeId}`;await ch.assertQueue(nodeQueue, { durable: true });ch.sendToQueue(nodeQueue, Buffer.from(url), { persistent: true });await ch.close();await conn.close();} catch (err) {console.error('发送任务出错:', err);}
}// 测试发送多个任务
const urls = ['https://example.com/page1', 'https://example.com/page2', 'https://example.com/page3', 'https://example.com/page4'];
urls.forEach(url => {sendTask(url);
});
  • 节点接收与处理任务
// _*_ coding: utf - 8 _*_
const amqplib = require('amqplib');
const axios = require('axios');async function receiveTask(nodeId) {try {// 连接 RabbitMQ 服务器const conn = await amqplib.connect('amqp://localhost');const ch = await conn.createChannel();// 定义节点对应的队列const queueName = `node_${nodeId}`;await ch.assertQueue(queueName, { durable: true });console.log(`节点 node_${nodeId} 开始接收任务...`);// 从队列中接收任务并处理ch.consume(queueName, async (msg) => {if (msg !== null) {const url = msg.content.toString();console.log(`节点 node_${nodeId} 获取到任务:${url}`);try {// 发送 HTTP 请求获取网页内容const response = await axios.get(url, { timeout: 5000 });if (response.status === 200) {// 解析网页数据(此处仅为简单示例,实际解析逻辑可根据需求定制)const data = {url: url,status: response.status,contentLength: response.headers['content-length']};console.log(`节点 node_${nodeId} 处理任务完成:`, data);// 这里可以将结果存储到分布式数据库等} else {console.log(`请求失败,状态码:${response.status}`);}} catch (error) {console.error(`请求或解析过程中出错:${error.message}`);} finally {// 确认任务已处理,可以从队列中移除ch.ack(msg);}}}, { noAck: false });} catch (err) {console.error('接收任务出错:', err);}
}// 启动多个节点接收任务(实际应用中每个节点运行独立的进程)
receiveTask(0);
receiveTask(1);
receiveTask(2);

三、关键技术组件

(1)任务调度与队列

  1. Redis 队列 :提供简单列表结构,实现任务缓冲与分发。主节点用 LPUSH 推任务,从节点用 BRPOP 获取任务,适用于小规模爬虫。

  2. Kafka/RabbitMQ :适合大规模场景,支持高吞吐量任务流。Kafka 的分布式架构可将任务分配到多分区,实现并行消费,提升处理效率。

(2)数据存储

  1. 分布式数据库 :MongoDB 分片功能实现数据水平扩展,按业务需求设计分片策略,提升存储容量与读写速度。

  2. 分布式文件系统 :HDFS 存储大规模非结构化数据,采用冗余存储机制,保障数据安全可靠,便于后续解析处理。

(3)负载均衡策略

  1. 轮询调度 :主节点按固定顺序分配任务,实现简单但不适用节点性能差异大场景。

  2. 动态权重 :主节点依从节点性能动态调任务分配权重,充分利用资源,但需准确获取性能信息且算法复杂。

四、完整项目示例(Scrapy - Redis)

(1)settings.py

# _*_ coding: utf - 8 _*_
# Scrapy settings for scrapy_redis_example project
SCHEDULER = "scrapy_redis.scheduler.Scheduler"
DUPEFILTER_CLASS = "scrapy_redis.dupefilter.RFPDupeFilter"
REDIS_URL = 'redis://master:6379/0'
SCHEDULER_PERSIST = True
DOWNLOAD_DELAY = 1
RANDOMIZE_DOWNLOAD_DELAY = True
CONCURRENT_REQUESTS = 16
CONCURRENT_REQUESTS_PER_DOMAIN = 8

(2)items.py

# _*_ coding: utf - 8 _*_
import scrapyclass ScrapyRedisExampleItem(scrapy.Item):title = scrapy.Field()url = scrapy.Field()content = scrapy.Field()

(3)spiders/distributed_spider.py

# _*_ coding: utf - 8 _*_
import scrapy
from scrapy_redis.spiders import RedisSpider
from scrapy_redis_example.items import ScrapyRedisExampleItemclass DistributedSpider(RedisSpider):name = 'distributed_spider'redis_key = 'distributed_spider:start_urls'def __init__(self, *args, **kwargs):super(DistributedSpider, self).__init__(*args, **kwargs)def parse(self, response):# 解析网页数据item = ScrapyRedisExampleItem()item['title'] = response.css('h1::text').get()item['url'] = response.urlitem['content'] = response.css('div.content::text').get()yield item# 提取新的 URL 并生成请求new_urls = response.css('a::attr(href)').getall()for url in new_urls:# 过滤掉非绝对 URLif url.startswith('http'):yield scrapy.Request(url, callback=self.parse)

(4)启动爬虫

在主节点上启动 Redis 服务器,然后运行以下命令启动爬虫。在从节点上,只需安装相同的 Scrapy - Redis 项目,并连接到同一 Redis 服务器即可。

scrapy crawl distributed_spider

五、挑战与优化方向

(1)反爬对抗

  1. 动态 IP 代理池 :构建动态 IP 代理池,从节点用不同代理 IP 发请求,防被目标网站封禁,可参考 proxy_pool 开源项目。

  2. 请求频率伪装 :随机延迟请求发送时间,轮换 User - Agent,打乱请求模式,降低被识别风险。

(2)容错机制

  1. 任务超时重试 :设任务超时时间,从节点未按时完成,主节点重试或转交其他节点,借鉴 Celery retry 机制。

  2. 节点心跳检测 :用 ZooKeeper 等服务监控节点存活,节点定期发心跳信号,主节点监听判断故障,及时重分配任务。

http://www.xdnf.cn/news/651061.html

相关文章:

  • Python 实现简单OCR文本识别
  • HTML应用指南:利用GET请求获取全国捞王锅物料理门店位置信息
  • cannot access ‘/etc/mysql/debian.cnf‘: No such file or directory
  • 解决 iTerm2 中 nvm 不生效的问题(Mac 环境)
  • “安康杯”安全生产知识竞赛活动流程方案
  • 课上实验111111
  • 4、docker compose
  • 汽配快车道:助力汽车零部件行业的产业重构与数字化出海
  • 关于OT IIOT系统远程访问的零信任安全
  • 从“黑箱”到透明化:MES如何重构生产执行全流程?
  • NV211NV212美光科技颗粒NV219NV220
  • Python——day37早停策略和模型权重的保存
  • 第九届水动力学与能源电力系统国际学术会议(HEEPS 2025)
  • Linux笔记---分页存储管理
  • 从OTA双雄「共舞」,透视旅游持续繁荣背后的结构性跃迁
  • BERT分类器和朴素贝叶斯分类器比较
  • 大语言模型 提示词的少样本案例的 演示选择与排序新突破
  • Baklib内容中台效能跃升实践
  • 什么是3D全景视角?3D全景有什么魅力?
  • 大语言模型(LLM)入门项目推荐
  • Java设计模式之模板方法模式:从基础到高级的全面解析(最详解)
  • Docker基础 -- Ubuntu 22.04 AArch64 交叉编译 Docker 镜像构建指南
  • Linux Docker 安装oracle19c数据库教程
  • ceph 剔除 osd
  • Serverless成本优化实战:从资源浪费到精准管控的架构演进
  • RabbitMQ 应用 - SpringBoot
  • OpenSSL 与 C++ 搭建一个支持 TLS 1.3 的服务器
  • 图论:floyed算法
  • Go语言开发的GMQT物联网MQTT消息服务器(mqtt Broker)支持海量MQTT连接和快速低延时消息传输-提供源码可二次开发定制需求
  • 支持向量机(SVM)例题