欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 文旅 > 文化 > 爬虫——将amazon集成到scrapy里面

爬虫——将amazon集成到scrapy里面

2025/4/2 17:21:47 来源:https://blog.csdn.net/qq_24680545/article/details/146649930  浏览:    关键词:爬虫——将amazon集成到scrapy里面

目录

  • mongodb转移
    • 改写原mongodb内容
    • 创建mongodb中间件
    • 编写process_spider_output
    • 编写scrapy中的Amazon类

这一篇章的任务是将我们前两个篇章的内容给转移到scrapy里面,相当于前面两章爬虫——playwright获取亚马逊数据、爬虫——将数据保存到MongoDB中在scrapy实现,整体上没多少变化。

mongodb转移

先前mongodb模块还可以继续使用,只不过做一些修改罢了,需求有这些:

  • 1.想要不同的平台使用不同的collection_name
  • 2.想要指定不同的key作为唯一标识。

改写原mongodb内容

class MongoDBManager:def __init__(self,mongo_uri,db_name,max_pool_size=10,**kwargs):"""# 删掉collection_name参数,:param mongo_uri: MongoDB 连接字符串:param db_name: 数据库名称(不存在时自动创建):param retry_attempts: 连接重试次数:param retry_delay: 重试间隔(秒)"""self.db_name = db_nameself.retry_attempts = kwargs.get('retry_attempts', 3)self.retry_delay = kwargs.get('retry_delay', 5)# 初始化连接self.client = MongoClient(mongo_uri,serverSelectionTimeoutMS=5000,server_api=ServerApi('1'),maxPoolSize=max_pool_size,**kwargs,)# 这里也删掉collection_nameself.db = self.client[self.db_name]self._connect()# ...# 修改_connect和_ensure_indexesdef _connect(self):for attempt in range(self.retry_attempts):try:# 检查/创建索引(幂等操作)self._ensure_indexes()print(f"成功连接 MongoDB | 数据库: {self.db_name}") # 删掉这个printreturn# ...def _ensure_indexes(self, collection_name='mydb'):# 添加collection_name参数"""创建必要索引(幂等操作)"""try:collection = self.db[collection_name] #增加这一步# 创建唯一索引(如果已存在会自动跳过)collection.create_index([("ASIN", 1)],unique=True,name="asin_unique_idx",background=True  # 后台创建不影响服务)except errors.OperationFailure as e:if "already exists" not in str(e):print(f"创建索引失败: {str(e)}")raise# 同样的修改其他增删改查的内容def insert_data(self, data, collection_name='mydb'):#...def upsert_batch(self, records, key_field="ASIN", collection_name='mydb', batch_size=50000, max_retries=3):# ...def delete_data(self, query, collection_name='mydb'):# ...def update_data(self, query, update_data, upsert=False, collection_name='mydb'):# ...def find_data(self, query=None, projection=None, limit=0, collection_name='mydb'):# ...def find_one_data(self, query=None, projection=None, collection_name='mydb'):# ...

创建mongodb中间件

​在Scrapy框架中,中间件是位于引擎与其他组件(如下载器、爬虫)之间的钩子框架,允许开发者在请求和响应的处理过程中插入自定义的处理逻辑。​
Scrapy提供了两种主要的中间件,Downloader MiddlewareSpider Middleware,不过中间件的内容基本上都差不多,能用就行,搞这个数据库中间件的目的,是为了将爬虫项目的parse返回的items进行处理,根据scrapy的设计框架,这个items最终会被中间件的process_spider_output接收,所以中间件里面写这个方法就可以实现mongodb的功能了。

import time
from scrapy import signals
from itemadapter import is_item, ItemAdapter
from firstpc.utils.database import MongoDBManager
from twisted.internet import reactor, threads
from twisted.internet.defer import DeferredLock
from twisted.internet import defer
from scrapy.exceptions import DropItem
from collections import deque
from scrapy import crawler
import loggingclass MongoDBMiddleware:"""MongoDB操作中间件(同时支持请求前处理和结果保存)"""def __init__(self,crawler, mongo_uri, db_name, collection_name, batch_size=100, flush_interval=5):self.manager = MongoDBManager(mongo_uri=mongo_uri,db_name=db_name,)self.flush_interval = flush_intervalself.last_flush_time = time.time()self.batch_size = batch_sizeself.collection_name = collection_name# 初始化数据结构self.batch_queue = deque()self.lock = DeferredLock()  # 使用Twisted的线程安全锁self.is_flushing = Falseself.crawler = crawlercrawler.spider.logger.info("MongoDB中间件已初始化!")  # 添加初始化日志@classmethoddef from_crawler(cls, crawler):# 从配置读取参数settings = crawler.settingsinstance = cls(crawler,mongo_uri=settings.get('MONGODB_URI'),db_name=settings.get('MONGODB_DATABASE'),collection_name=settings.get('MONGODB_COLLECTION'),batch_size = settings.getint('MONGODB_BATCH_SIZE', 100),flush_interval = settings.getint('MONGODB_FLUSH_INTERVAL', 5))# 注册信号处理器crawler.signals.connect(instance.spider_opened, signal=signals.spider_opened)crawler.signals.connect(instance.spider_closed, signal=signals.spider_closed)return instancedef spider_opened(self, spider):"""启动定时刷新任务"""self._schedule_flush()def spider_closed(self, spider):"""爬虫关闭时清理资源"""self.manager.client.close()spider.logger.info("MongoDB连接已关闭")

【细节讲解】

  • from_crawler:其是Scrapy中的一个约定,用于通过传入的crawler对象来创建中间件(或者其他扩展)的实例。
  • @classmethod:表明 from_crawler 方法是一个类方法,它绑定在类上而不是实例上。
  • crawler.signals.connect:注册两个信号处理器,分别监听spider_openedspider_closed这两个信号,如果接受到了信号,就执行实例的spider_opened方法或spider_closed方法。
  • Twisted:一个基于事件驱动的网络编程框架,在Scrapy中,Twisted被用作核心的异步网络引擎,负责管理大量并发的网络请求和响应,使得爬虫可以高效地抓取数据。

编写process_spider_output

class MongoDBMiddleware:# ...def _schedule_flush(self):"""定时刷新机制"""reactor.callLater(self.flush_interval, self._periodic_flush)def _periodic_flush(self):"""周期性强制刷新"""if self.batch_queue:self._flush_batch()self._schedule_flush()def process_spider_output(self, response, result, spider):"""处理输出并批量缓存"""item_count = 0for item in result:if is_item(item):collection_name = response.meta.get('collection_name',getattr(spider, 'mongodb_collection', self.collection_name))skip_save = response.request.meta.get('skip_save', False)if skip_save:yield itemcontinueprocessed_item = self._preprocess_item(item)self._add_to_batch(processed_item, spider, collection_name)item_count += 1yield itemspider.logger.info(f"本批次处理 {item_count} 个Item")# 触发条件检查self._check_flush_conditions()def _add_to_batch(self, item, spider, collection_name):"""线程安全添加数据到队列"""def _add(lock):try:data = ItemAdapter(item).asdict()self.batch_queue.append((data, collection_name))spider.logger.debug(f"Added item, queue size: {len(self.batch_queue)}")finally:lock.release()return Noned = self.lock.acquire()d.addCallback(_add)return ddef _check_flush_conditions(self):"""检查是否满足刷新条件"""now = time.time()if (len(self.batch_queue) >= self.batch_size or(now - self.last_flush_time) >= self.flush_interval):self._flush_batch()def _flush_batch(self):"""执行异步批量写入"""def _get_batch(lock):try:batch = list(self.batch_queue)self.batch_queue.clear()grouped = {}for data, _collection_name in batch:grouped.setdefault(_collection_name, []).append(data)return groupedfinally:lock.release()def _save_grouped(grouped):# 创建并行保存任务链deferreds = []for col_name, batch in grouped.items():d = threads.deferToThread(self._save_batch,batch=batch,collection_name=col_name)deferreds.append(d)return defer.DeferredList(deferreds)if self.is_flushing or not self.batch_queue:return defer.succeed(None)self.is_flushing = Trued = self.lock.acquire()d.addCallback(_get_batch)d.addCallback(_save_grouped)  # 新增分组保存逻辑d.addCallbacks(self._on_save_success,self._on_save_error,callbackArgs=('batch',),errbackArgs=('batch',))d.addBoth(self._reset_flush_state)return ddef _save_batch(self, batch, collection_name):"""实际保存方法(在子线程执行)"""return self.manager.upsert_batch(batch,collection_name=collection_name,key_field='ASIN',batch_size=len(batch))def _on_save_success(self, result, batch):"""写入成功回调"""success, fail = resultmsg = f"批量写入成功 | 总数: {len(batch)} | 成功: {success} | 失败: {fail}"self._log_result(msg, level=logging.INFO)def _on_save_error(self, failure, batch):"""写入错误处理"""err_msg = f"批量写入失败: {failure.getTraceback()}"self._log_result(err_msg, level=logging.ERROR)# 将失败数据重新放回队列self.batch_queue.extendleft(batch[::-1])  # 保持原始顺序def _reset_flush_state(self, _=None):"""重置刷新状态"""self.is_flushing = Falseself.last_flush_time = time.time()def _log_result(self, message, level=logging.INFO):"""统一的日志记录方法"""if self.crawler and self.crawler.spider:# 使用当前spider的loggerself.crawler.spider.logger.log(level, message)else:# 后备日志记录logging.log(level, f"[MongoDBMiddleware] {message}")

【细节讲解】

  • _schedule_flush:利用Twisted框架的reactor.callLater方法,在指定的时间间隔flush_interval后调用_periodic_flush方法,实现定时触发批量写入操作。之前定义的flush_interval时间是5秒,所以这个会每5秒执行一次。
  • _periodic_flush:检查batch_queue是否有待处理的项目,如果存在,则调用_flush_batch执行批量写入操作。随后重新安排下一次的定时刷新,形成循环。
  • process_spider_output:作为Scrapy的钩子方法,处理爬虫生成的结果,在该方法中,对每个Item进行预处理,并添加到批量队列batch_queue中。处理完成后,检查是否满足刷新条件,决定是否执行批量写入操作。
  • _add_to_batch:添加数据到批量队列,使用TwistedDeferredLock实现线程安全地将数据添加到batch_queue队列中,确保在多线程环境下的数据一致性。
  • _check_flush_conditions:检查刷新条件,判断当前批量队列的大小是否达到设定的batch_size,或距离上次刷新时间是否超过 flush_interval,以决定是否触发批量写入操作。
  • _flush_batch:负责执行异步的批量写入操作。首先获取批量队列中的数据,并按集合名称进行分组。然后利用deferToThread 将写入操作委托给线程池执行,避免阻塞主线程。在TwistedDeferred回调链中,参数的传递遵循“链式自动传递”机制:
    • d = self.lock.acquire():获取异步锁,
    • d.addCallback(_get_batch):传递锁对象给_get_batch,并接收return,作为下一个函数的参数
    • d.addCallback(_save_grouped) 将_get_batch的返回值,传递给_save_grouped
    • addCallbacks:是Deferred链自动传递的双路特殊处理方式,当前面两个成功回调时则调用第一个self._on_save_success函数,如果失败则调用第二个self._on_save_error函数。看到这两个函数都有额外的参数,所以需要额外传递参数进去。

编写scrapy中的Amazon类

from urllib.parse import urljoin
import scrapy
from scrapy import Spider, signals
from firstpc.items import AmazonItem
class AmazonSpider(Spider):name = "amazon"allowed_domains = ["www.amazon.com"]# 设置起始URLtarget_url = "https://www.amazon.com"start_urls = ["https://www.amazon.com/s?k=键盘&__mk_zh_CN=亚马逊网站"]def __init__(self, *args, **kwargs):super().__init__(*args, **kwargs)self.max_page = Noneself.mongodb_collection = 'products' #就用这个字段名称,来命名collection_name,就是上面的@classmethoddef from_crawler(cls, crawler, *args, **kwargs):spider = super().from_crawler(crawler, *args, **kwargs)# 注册关闭信号crawler.signals.connect(spider.spider_closed, signal=signals.spider_closed)return spiderdef spider_closed(self, spider):print(f"\n[完成] 爬虫 {spider.name} 已关闭")stats = spider.crawler.stats.get_stats()print(f"总请求数: {stats.get('downloader/request_count', 0)}")print(f"抓取商品数: {stats.get('item_scraped_count', 0)}")def parse(self, response):products = response.css('div[data-component-type="s-search-result"]')if self.max_page is None:self.max_page = response.css('span.s-pagination-item.s-pagination-disabled:not(.s-pagination-previous)::text').get()current_page = response.css('span.s-pagination-item.s-pagination-selected::text').get()for product in products:item = AmazonItem()# 使用CSS选择器提取数据item['ASIN'] = product.attrib.get('data-asin') or Nonetext = product.css('h2.a-size-medium span::text, h2.a-size-base-plus span::text').get()item['Product'] = item['Product'] = text.replace('\n', ' ').replace('  ', ' ').strip() if text else Noneitem['Price'] = self._extract_price(product) or Noneitem['ImageURL'] = product.css('img.s-image::attr(src)').get() or Noneitem['URL'] = urljoin(self.target_url, product.css('a[class*="a-link-normal"][class*="s-line-clamp-"]::attr(href)').get()) if product.css('a[class*="a-link-normal"][class*="s-line-clamp-"]::attr(href)').get() else Noneitem['UUID'] = product.attrib.get('data-uuid', '') or Noneif all(item.values()):  # 当所有值均为非 None 时返回 Trueyield itemnext_page = response.css('a.s-pagination-next::attr(href)').get()if next_page:next_page_url = urljoin(response.url, next_page)yield scrapy.Request(next_page_url,callback=self.parse,meta={**response.request.meta,'current_page': current_page,  # 传递当前页'max_page': self.max_page,  # 传递最大页'skip_save': False,  # 跳过保存'page': response.meta.get('page', 1) + 1,"collection_name": self.mongodb_collection})def _extract_price(self, product):"""价格提取逻辑"""price_whole = product.css('span.a-price-whole::text').get()price_fraction = product.css('span.a-price-fraction::text').get()if price_whole:return f"${price_whole}.{price_fraction if price_fraction else ''}"# 备用提取逻辑:尝试从其他位置提取价格secondary_price = product.css('div.a-row.a-size-base.a-color-secondary span.a-color-base::text').get()return secondary_price.replace('US', '') if secondary_price else "暂无价格"

【细节讲解】

  • start_urls:是启动时的初始URL列表,Scrapy将从这些地址开始爬取数据。后续的请求将基于这些起始页面提取的数据生成。
  • div[data-component-type="s-search-result"]:选择所有具有属性data-component-type且其值为s-search-result<div>元素。
  • span.s-pagination-item.s-pagination-disabled:not(.s-pagination-previous)::text:选择所有同时具有类s-pagination-items-pagination-disabled,但不具有类s-pagination-previous<span>元素的文本内容。
  • h2.a-size-medium span::text, h2.a-size-base-plus span::text
  • img.s-image::attr(src):选择具有类s-image<img>元素的src属性值。
  • a[class*="a-link-normal"][class*="s-line-clamp-"]::attr(href):选择类属性中同时包含a-link-normals-line-clamp-<a>元素的href属性值。使用*=的原因是,元素的class属性可能包含多个类名,且类名的顺序和组合可能不同。​通过匹配包含特定子字符串的方式,可以更灵活地选取目标元素。

就酱~
在这里插入图片描述

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词