欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 文旅 > 美景 > Python爬虫技术 第21节 多线程和异步编程

Python爬虫技术 第21节 多线程和异步编程

2024/11/30 14:35:45 来源:https://blog.csdn.net/hummhumm/article/details/140764904  浏览:    关键词:Python爬虫技术 第21节 多线程和异步编程

在 Python 中,多线程和异步编程是处理 I/O 密集型任务(比如网络请求)的两种常用方法。下面我将详细介绍这两种方法,并给出具体的代码示例。
在这里插入图片描述

多线程

多线程是一种允许程序在单个进程中执行多个线程的方法。每个线程可以独立执行不同的任务或处理不同的数据。在 Python 中,我们可以使用标准库 threading 来创建和管理线程。

多线程爬虫

假设我们要从多个网站抓取数据,我们可以创建多个线程来并行处理这些请求。

import threading
import requests
from queue import Queueclass Worker(threading.Thread):def __init__(self, work_queue):threading.Thread.__init__(self)self.work_queue = work_queueself.start()def run(self):while True:url = self.work_queue.get()if url is None:breaktry:response = requests.get(url, timeout=5)print(f"Thread {self.name} fetched URL: {url}")except Exception as e:print(f"Failed to fetch {url}: {e}")finally:self.work_queue.task_done()def main():urls = ["https://example.com"] * 100  # 假设我们需要抓取100个页面work_queue = Queue()# 创建并启动线程threads = []num_worker_threads = 5for i in range(num_worker_threads):t = Worker(work_queue)threads.append(t)# 将任务放入队列for url in urls:work_queue.put(url)# 等待所有任务完成work_queue.join()# 让线程退出for i in range(num_worker_threads):work_queue.put(None)for t in threads:t.join()if __name__ == "__main__":main()

异步编程

异步编程允许你在等待某些操作(如网络请求)完成的同时继续执行其他任务。Python 中最常用的异步编程库是 asyncio

异步爬虫

使用 asyncioaiohttp 库来实现异步爬虫。

import asyncio
import aiohttpasync def fetch(session, url):async with session.get(url) as response:return await response.text()async def main(urls):async with aiohttp.ClientSession() as session:tasks = [fetch(session, url) for url in urls]responses = await asyncio.gather(*tasks)for url, resp in zip(urls, responses):print(f"Fetched URL: {url}")urls = ["https://example.com"] * 100
loop = asyncio.get_event_loop()
loop.run_until_complete(main(urls))

分析与总结

  • 多线程 在 Python 中由于全局解释器锁(GIL)的存在,并不会显著提高 CPU 密集型任务的性能,但对于 I/O 密集型任务(如网络请求)仍然非常有用。
  • 异步编程 通过非阻塞的方式处理 I/O 密集型任务,可以在等待某些操作时继续执行其他任务,从而提高程序的整体性能。
  • 性能对比:对于网络请求这类 I/O 密集型任务,异步编程通常比多线程更高效,因为它避免了线程上下文切换带来的开销,并能更好地利用单个 CPU 核心。

接下来,我会为之前提供的多线程和异步爬虫示例添加一些额外的功能,包括错误处理、日志记录以及更详细的输出信息。

案例 1: 多线程爬虫的功能的增加

示例:
import threading
import requests
from queue import Queue
import logging# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')class Worker(threading.Thread):def __init__(self, work_queue):threading.Thread.__init__(self)self.work_queue = work_queueself.start()def run(self):while True:url = self.work_queue.get()if url is None:breaktry:response = requests.get(url, timeout=5)logging.info(f"Thread {self.name} fetched URL: {url}")except Exception as e:logging.error(f"Failed to fetch {url}: {e}")finally:self.work_queue.task_done()def main():urls = ["https://example.com"] * 100  # 假设我们需要抓取100个页面work_queue = Queue()# 创建并启动线程threads = []num_worker_threads = 5for i in range(num_worker_threads):t = Worker(work_queue)threads.append(t)# 将任务放入队列for url in urls:work_queue.put(url)# 等待所有任务完成work_queue.join()# 让线程退出for i in range(num_worker_threads):work_queue.put(None)for t in threads:t.join()if __name__ == "__main__":main()

案例 2: 异步爬虫功能的增加

示例:
import asyncio
import aiohttp
import logging# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')async def fetch(session, url):try:async with session.get(url) as response:text = await response.text()logging.info(f"Fetched URL: {url}")return textexcept Exception as e:logging.error(f"Failed to fetch {url}: {e}")async def main(urls):async with aiohttp.ClientSession() as session:tasks = [fetch(session, url) for url in urls]responses = await asyncio.gather(*tasks, return_exceptions=True)for url, resp in zip(urls, responses):if isinstance(resp, Exception):logging.error(f"Error fetching {url}: {resp}")else:logging.info(f"Fetched content for {url}")urls = ["https://example.com"] * 100
loop = asyncio.get_event_loop()
loop.run_until_complete(main(urls))

更新说明

  1. 日志记录:使用 logging 库记录爬虫的状态和错误信息。
  2. 错误处理:对于网络请求失败的情况,增加了异常捕获,并记录了错误信息。
  3. 输出改进:通过日志记录提供更详细的输出信息,包括请求成功或失败的消息。

接下来,我将对之前的多线程和异步爬虫示例进行进一步的优化,包括更详细的错误处理、增加重试机制、以及使用更现代的异步库如 httpx。这将使代码更加健壮并且易于维护。

案例 1: 多线程爬虫的优化

示例:
import threading
import requests
from queue import Queue
import logging
from time import sleep# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')class Worker(threading.Thread):def __init__(self, work_queue):threading.Thread.__init__(self)self.work_queue = work_queueself.start()def run(self):while True:url = self.work_queue.get()if url is None:breaktry:response = requests.get(url, timeout=5)logging.info(f"Thread {self.name} fetched URL: {url}")except requests.RequestException as e:logging.error(f"Failed to fetch {url}: {e}")self.work_queue.put(url)  # 将失败的URL重新放回队列finally:self.work_queue.task_done()def main():urls = ["https://example.com"] * 100  # 假设我们需要抓取100个页面work_queue = Queue()# 创建并启动线程threads = []num_worker_threads = 5for i in range(num_worker_threads):t = Worker(work_queue)threads.append(t)# 将任务放入队列for url in urls:work_queue.put(url)# 等待所有任务完成work_queue.join()# 让线程退出for i in range(num_worker_threads):work_queue.put(None)for t in threads:t.join()if __name__ == "__main__":main()

案例 2: 异步爬虫的优化

示例:
import asyncio
import httpx
import logging# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')async def fetch(session, url, max_retries=3):retries = 0while retries < max_retries:try:async with session.get(url) as response:text = await response.text()logging.info(f"Fetched URL: {url}")return textexcept Exception as e:logging.error(f"Failed to fetch {url}: {e}")retries += 1if retries < max_retries:logging.info(f"Retrying {url}... Attempt {retries}/{max_retries}")await asyncio.sleep(1)  # 等待一段时间后重试logging.error(f"Max retries reached for {url}.")return Noneasync def main(urls):async with httpx.AsyncClient() as session:tasks = [fetch(session, url) for url in urls]responses = await asyncio.gather(*tasks, return_exceptions=True)for url, resp in zip(urls, responses):if isinstance(resp, Exception):logging.error(f"Error fetching {url}: {resp}")elif resp is None:logging.error(f"Failed to fetch {url} after multiple retries.")else:logging.info(f"Fetched content for {url}")urls = ["https://example.com"] * 100
loop = asyncio.get_event_loop()
loop.run_until_complete(main(urls))

更新说明

  1. 错误处理:对于网络请求失败的情况,增加了异常捕获,并记录了错误信息。
  2. 重试机制:对于失败的请求,加入了重试机制,以提高爬虫的可靠性。
  3. 使用 httpx:使用了 httpx 替代 aiohttp,因为 httpx 提供了更多现代特性,如同步和异步 API 的统一。
  4. 日志记录:使用 logging 库记录爬虫的状态和错误信息,包括重试次数和最终失败的情况。

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com