
Crawler: Packaging an Entire Novel Website

2024/10/24 9:28:38 · Source: https://blog.csdn.net/iku_n/article/details/139509931

Scraping Novel Chapters with Python and Storing Them in MongoDB

1. Background

In this article, we show how to scrape chapter content from a novel website with Python and store it in a MongoDB database. We use requests for HTTP requests, lxml to parse HTML, re for regular-expression matching, threading for concurrency, and pymongo to connect to MongoDB.

(The honest backstory: I had no data for a course project, so I scraped some.)

2. Environment and Dependencies

Make sure the following packages are installed in your environment:

pip install requests lxml pymongo

3. Code Walkthrough

The complete implementation follows, explained module by module.

3.1 Importing the Required Libraries

First, import the libraries we need:

import requests
from lxml import etree
import re
import threading
import time
from pymongo import MongoClient
3.2 Setting the Request Headers

To reduce the chance of being blocked by the site, we set a User-Agent header (replace the placeholder with your own):

headers = {
    'User-Agent': 'your own User-Agent'
}
3.3 Initializing Globals and the Semaphore

To control concurrency and collect chapter content, we set up a global list and a semaphore:

zhangjie_content = []  # collected chapter content
semaphore = threading.Semaphore(20)  # cap concurrency at 20 threads
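The effect of the semaphore can be seen in isolation. This minimal sketch (independent of the crawler) records the peak number of threads inside the guarded section and shows that it never exceeds the limit:

```python
import threading
import time

semaphore = threading.Semaphore(2)  # at most 2 workers in the guarded section
lock = threading.Lock()
active = 0  # threads currently inside the guarded section
peak = 0    # highest value of `active` observed

def worker():
    global active, peak
    with semaphore:
        with lock:
            active += 1
            peak = max(peak, active)
        time.sleep(0.05)  # simulate work
        with lock:
            active -= 1

threads = [threading.Thread(target=worker) for _ in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print("peak concurrency:", peak)  # bounded by the semaphore's value of 2
```

The crawler's `Semaphore(20)` works the same way: thread number 21 blocks until one of the first 20 releases the semaphore.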
3.4 Connecting to MongoDB

Connect to the MongoDB database:

client = MongoClient('mongodb://localhost:27017/')
db = client.novel_database
collection = db.novels
3.5 Inserting Data into MongoDB

Define a helper that inserts a record into MongoDB:

def insert_to_mongodb(title, novel_type, author, update_time, chapters):
    data = {
        "title": title,
        "novel_type": novel_type,
        "author": author,
        "update_time": update_time,
        "zhangjie": chapters
    }
    collection.insert_one(data)
    print(f"Inserted {len(chapters)} chapters: {title}")
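For reference, this is the shape of the document that insert_to_mongodb writes (the values below are hypothetical placeholders, not real scraped data), with a matching read-back query shown as a comment:

```python
# Hypothetical example of one stored document
doc = {
    "title": "Example Novel",        # book title
    "novel_type": "Fantasy",         # category
    "author": "Anonymous",           # author name
    "update_time": "2024-06-01",     # last-update string from the page
    "zhangjie": [                    # list of scraped chapters
        {"chapter": "Chapter 1", "text": "Once upon a time..."},
    ],
}

# A later read-back would look like:
#   collection.find_one({"title": "Example Novel"})
print(len(doc["zhangjie"]))
```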
3.6 Scraping Chapter Content

Define the function that fetches a single chapter:

def neirong(ur, url, s, retries=3):
    while retries > 0:
        try:
            reps = requests.get(rf'{ur}{url}', headers=headers)
            reps.raise_for_status()
            html = etree.HTML(reps.text)
            if html is None:
                print(f"Error parsing HTML, URL: {ur}/{url}")
                return
            chapter = html.xpath('//*[@id="content"]/h1/text()')
            if not chapter:
                print(f"Chapter title not found, URL: {ur}/{url}")
                return
            chapter = chapter[0].strip()
            text = html.xpath('//*[@id="htmlContent"]/text()')
            if not text:
                print(f"Chapter content not found, URL: {ur}/{url}")
                return
            text = ''.join(text[1:])  # join the text fragments
            zhangjie_content.append({"chapter": chapter, "text": text})
            return
        except requests.RequestException as e:
            print(f"Request error, URL: {ur}{url}, error: {e}")
            retries -= 1
            time.sleep(1)  # wait before retrying
    print(f"Too many retries, giving up on URL: {ur}{url}")
3.7 Scraping the Chapter List

Define the function that scrapes a book's metadata and chapter list:

def zhangjie(url, retries=3):
    while retries > 0:
        try:
            reps = requests.get(url, headers=headers, timeout=10)
            reps.raise_for_status()
            html = etree.HTML(reps.text)
            if html is None:
                print(f"Error parsing HTML, URL: {url}")
                return
            title = html.xpath('//*[@id="info"]/h1/text()')
            title = title[0].strip() if title else "Unknown title"
            novel_type = html.xpath('//*[@id="maininfo"]/div[1]/a[2]/text()')
            novel_type = novel_type[0].strip() if novel_type else "Unknown type"
            author = html.xpath('//*[@id="info"]/p[1]/a/text()')
            author = author[0].strip() if author else "Unknown author"
            update_time = html.xpath('//*[@id="info"]/p[3]/text()')
            update_time = update_time[0].strip() if update_time else "Unknown time"
            option_texts = html.xpath('/html/body/div[4]/div/div/select/option/text()')
            if not option_texts:
                print(f"Page count not found, URL: {url}")
                return
            zhang = re.findall(r'第\s*(\d+)\s*页\(末页\)', option_texts[-1])
            if not zhang:
                print(f"Page count pattern not matched, URL: {url}")
                return
            zhang = int(zhang[0])
            print('Start crawling:', title)
            s = 0  # number of chapters crawled so far
            for i in range(1, zhang + 1):
                if s >= 100:
                    break  # stop after 100 chapters
                zhangjie_url = f'{url}/index_{i}.html'
                zhangjie_reps = requests.get(zhangjie_url, headers=headers, timeout=10)
                zhangjie_reps.raise_for_status()
                zhangjie_html = etree.HTML(zhangjie_reps.text)
                if zhangjie_html is None:
                    print(f"Error parsing HTML, URL: {zhangjie_url}")
                    break
                zhangjieLis = zhangjie_html.xpath('/html/body/div[4]/div/ul/li/a/@href')
                if not zhangjieLis:
                    print(f"Chapter list not found, URL: {zhangjie_url}")
                    break
                threads = []
                for j in zhangjieLis:
                    if s >= 100:
                        break  # stop after 100 chapters
                    thread = threading.Thread(target=crawl_with_semaphore, args=(neirong, url, j, s))
                    threads.append(thread)
                    thread.start()
                    time.sleep(0.1)
                    s += 1  # count the chapter
                for thread in threads:
                    thread.join()
            # Insert all scraped chapters into MongoDB
            insert_to_mongodb(title, novel_type, author, update_time, list(zhangjie_content))
            zhangjie_content.clear()  # reset the chapter list
            print(f"Data recorded successfully: {title}")
            return
        except requests.RequestException as e:
            print(f"Request error, URL: {url}, error: {e}")
            retries -= 1
            time.sleep(1)  # wait before retrying
    print(f"Too many retries, giving up on URL: {url}")
3.8 Controlling Concurrency with a Semaphore

Define a wrapper that runs the target function under the semaphore:

def crawl_with_semaphore(target, *args):
    with semaphore:  # limit the number of concurrent workers
        target(*args)
3.9 The Main Function

Define the main function, which scrapes the novel list from a ranking page and calls the chapter scraper for each book:

def main(i):
    main_url = rf'http://www.biqule.net/top/monthvisit/{i}.html'  # monthly-visits ranking page
    try:
        reps = requests.get(main_url, headers=headers, timeout=10)
        reps.raise_for_status()
        html = etree.HTML(reps.text)
        if html is None:
            print("Error parsing HTML for the ranking page")
            return
        novels = html.xpath('//div/ul/li/span[@class="sp_2"]/a')
        novel_urls = [{"title": novel.text.strip(), "url": novel.attrib['href']}
                      for novel in novels if novel.text and 'href' in novel.attrib]
        for novel in novel_urls:
            zhangjie(novel['url'])
    except requests.RequestException as e:
        print(f"Request error, URL: {main_url}, error: {e}")
3.10 Program Entry Point

Define the entry point and call the main function:

if __name__ == "__main__":
    for i in range(1, 51):  # crawl ranking pages 1 through 50
        main(i)

4. Summary

In this article we showed how to use Python to scrape chapter content from a novel website and store it in MongoDB.

5. Complete Code

import requests
from lxml import etree
import re
import threading
import time
from pymongo import MongoClient

# Request headers, to reduce the chance of being blocked
headers = {
    'User-Agent': 'your own User-Agent'
}

# Globals and semaphore for concurrency control and chapter storage
zhangjie_content = []  # collected chapter content
semaphore = threading.Semaphore(20)  # cap concurrency at 20 threads

# MongoDB connection
client = MongoClient('mongodb://localhost:27017/')
db = client.novel_database
collection = db.novels

def insert_to_mongodb(title, novel_type, author, update_time, chapters):
    """Helper that inserts a record into MongoDB."""
    data = {
        "title": title,
        "novel_type": novel_type,
        "author": author,
        "update_time": update_time,
        "zhangjie": chapters
    }
    collection.insert_one(data)
    print(f"Inserted {len(chapters)} chapters: {title}")

def neirong(ur, url, s, retries=3):
    while retries > 0:
        try:
            reps = requests.get(rf'{ur}{url}', headers=headers)
            reps.raise_for_status()
            html = etree.HTML(reps.text)
            if html is None:
                print(f"Error parsing HTML, URL: {ur}/{url}")
                return
            chapter = html.xpath('//*[@id="content"]/h1/text()')
            if not chapter:
                print(f"Chapter title not found, URL: {ur}/{url}")
                return
            chapter = chapter[0].strip()
            text = html.xpath('//*[@id="htmlContent"]/text()')
            if not text:
                print(f"Chapter content not found, URL: {ur}/{url}")
                return
            text = ''.join(text[1:])  # join the text fragments
            zhangjie_content.append({"chapter": chapter, "text": text})
            return
        except requests.RequestException as e:
            print(f"Request error, URL: {ur}{url}, error: {e}")
            retries -= 1
            time.sleep(1)  # wait before retrying
    print(f"Too many retries, giving up on URL: {ur}{url}")

def zhangjie(url, retries=3):
    while retries > 0:
        try:
            reps = requests.get(url, headers=headers, timeout=10)
            reps.raise_for_status()
            html = etree.HTML(reps.text)
            if html is None:
                print(f"Error parsing HTML, URL: {url}")
                return
            title = html.xpath('//*[@id="info"]/h1/text()')
            title = title[0].strip() if title else "Unknown title"
            novel_type = html.xpath('//*[@id="maininfo"]/div[1]/a[2]/text()')
            novel_type = novel_type[0].strip() if novel_type else "Unknown type"
            author = html.xpath('//*[@id="info"]/p[1]/a/text()')
            author = author[0].strip() if author else "Unknown author"
            update_time = html.xpath('//*[@id="info"]/p[3]/text()')
            update_time = update_time[0].strip() if update_time else "Unknown time"
            option_texts = html.xpath('/html/body/div[4]/div/div/select/option/text()')
            if not option_texts:
                print(f"Page count not found, URL: {url}")
                return
            zhang = re.findall(r'第\s*(\d+)\s*页\(末页\)', option_texts[-1])
            if not zhang:
                print(f"Page count pattern not matched, URL: {url}")
                return
            zhang = int(zhang[0])
            print('Start crawling:', title)
            s = 0  # number of chapters crawled so far
            for i in range(1, zhang + 1):
                if s >= 100:
                    break  # stop after 100 chapters
                zhangjie_url = f'{url}/index_{i}.html'
                zhangjie_reps = requests.get(zhangjie_url, headers=headers, timeout=10)
                zhangjie_reps.raise_for_status()
                zhangjie_html = etree.HTML(zhangjie_reps.text)
                if zhangjie_html is None:
                    print(f"Error parsing HTML, URL: {zhangjie_url}")
                    break
                zhangjieLis = zhangjie_html.xpath('/html/body/div[4]/div/ul/li/a/@href')
                if not zhangjieLis:
                    print(f"Chapter list not found, URL: {zhangjie_url}")
                    break
                threads = []
                for j in zhangjieLis:
                    if s >= 100:
                        break  # stop after 100 chapters
                    thread = threading.Thread(target=crawl_with_semaphore, args=(neirong, url, j, s))
                    threads.append(thread)
                    thread.start()
                    time.sleep(0.1)
                    s += 1  # count the chapter
                for thread in threads:
                    thread.join()
            # Insert all scraped chapters into MongoDB
            insert_to_mongodb(title, novel_type, author, update_time, list(zhangjie_content))
            zhangjie_content.clear()  # reset the chapter list
            print(f"Data recorded successfully: {title}")
            return
        except requests.RequestException as e:
            print(f"Request error, URL: {url}, error: {e}")
            retries -= 1
            time.sleep(1)  # wait before retrying
    print(f"Too many retries, giving up on URL: {url}")

def crawl_with_semaphore(target, *args):
    with semaphore:  # limit the number of concurrent workers
        target(*args)

# Main function
def main(i):
    main_url = rf'http://www.biqule.net/top/monthvisit/{i}.html'  # monthly-visits ranking page
    try:
        reps = requests.get(main_url, headers=headers, timeout=10)
        reps.raise_for_status()
        html = etree.HTML(reps.text)
        if html is None:
            print("Error parsing HTML for the ranking page")
            return
        novels = html.xpath('//div/ul/li/span[@class="sp_2"]/a')
        novel_urls = [{"title": novel.text.strip(), "url": novel.attrib['href']}
                      for novel in novels if novel.text and 'href' in novel.attrib]
        for novel in novel_urls:
            zhangjie(novel['url'])
    except requests.RequestException as e:
        print(f"Request error, URL: {main_url}, error: {e}")

if __name__ == "__main__":
    for i in range(1, 51):  # crawl ranking pages 1 through 50
        main(i)

5.2 Upgraded Version

  1. Use asynchronous I/O
    Async libraries such as aiohttp and asyncio can significantly improve the efficiency of network requests.

  2. Reduce waiting time
    Shorten the delay between consecutive requests.

  3. Use a connection pool
    A connection pool reuses connections, avoiding the overhead of opening new ones.
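For the synchronous version, point 3 boils down to replacing bare requests.get calls with a shared requests.Session, which keeps TCP connections alive and reuses them across requests. A minimal sketch (the pool sizes here are illustrative, not tuned):

```python
import requests
from requests.adapters import HTTPAdapter

session = requests.Session()
# Keep up to 20 pooled connections per host so repeated requests reuse sockets
adapter = HTTPAdapter(pool_connections=20, pool_maxsize=20)
session.mount('http://', adapter)
session.mount('https://', adapter)

# Then call session.get(url, headers=headers, timeout=10) wherever the
# crawler currently calls requests.get(...)
```

aiohttp gets the same benefit automatically: a single ClientSession maintains its own connection pool, which is one reason the async version below creates one session and passes it to every coroutine.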

Notes:

  1. Switched to aiohttp and asyncio for asynchronous I/O.
  2. A semaphore still caps the number of concurrent requests, to avoid bans from sending too many at once.
  3. Request logic is wrapped in a fetch function, and all network requests go through it.
  4. asyncio.gather runs multiple tasks concurrently.

These changes significantly speed up crawling by taking advantage of asynchronous I/O.

import aiohttp
import asyncio
from lxml import etree
import re
from pymongo import MongoClient

# Globals and semaphore for concurrency control and chapter storage
zhangjie_content = []  # collected chapter content
semaphore = asyncio.Semaphore(50)  # cap concurrency at 50 requests

# MongoDB connection
client = MongoClient('mongodb://localhost:27017/')
db = client.novel_database
collection = db.novels

def insert_to_mongodb(title, novel_type, author, update_time, chapters, img_url, jianjie):
    """Helper that inserts a record into MongoDB."""
    data = {
        "title": title,
        "novel_type": novel_type,
        "author": author,
        "update_time": update_time,
        "zhangjie": chapters,
        'img_url': img_url,
        'jianjie': jianjie
    }
    collection.insert_one(data)
    print(f"Inserted {len(chapters)} chapters: {title}")

async def fetch(session, url):
    async with semaphore:  # limit the number of concurrent requests
        try:
            async with session.get(url) as response:
                return await response.text()
        except Exception as e:
            print(f"Request error, URL: {url}, error: {e}")

async def neirong(session, base_url, url):
    try:
        html_str = await fetch(session, f'{base_url}{url}')
        html = etree.HTML(html_str)
        if html is None:
            print(f"Error parsing HTML, URL: {base_url}{url}")
            return
        chapter = html.xpath('//*[@id="content"]/h1/text()')
        if not chapter:
            print(f"Chapter title not found, URL: {base_url}{url}")
            return
        chapter = chapter[0].strip()
        text = html.xpath('//*[@id="htmlContent"]/text()')
        if not text:
            print(f"Chapter content not found, URL: {base_url}{url}")
            return
        text = ''.join(text[1:])  # join the text fragments
        zhangjie_content.append({"chapter": chapter, "text": text})
    except Exception as e:
        print(f"Error processing chapter content, URL: {base_url}{url}, error: {e}")

async def zhangjie(session, url):
    try:
        html_str = await fetch(session, url)
        html = etree.HTML(html_str)
        if html is None:
            print(f"Error parsing HTML, URL: {url}")
            return
        title = html.xpath('//*[@id="info"]/h1/text()')
        title = title[0].strip() if title else "Unknown title"
        novel_type = html.xpath('//*[@id="maininfo"]/div[1]/a[2]/text()')
        novel_type = novel_type[0].strip() if novel_type else "Unknown type"
        author = html.xpath('//*[@id="info"]/p[1]/a/text()')
        author = author[0].strip() if author else "Unknown author"
        update_time = html.xpath('//*[@id="info"]/p[3]/text()')
        update_time = update_time[0].strip() if update_time else "Unknown time"
        img_url = html.xpath('//*[@id="fmimg"]/img/@src')
        img_url = img_url[0].strip() if img_url else "Unknown cover"
        intro = html.xpath('//*[@id="intro"]//text()')
        jianjie = ''.join(intro).strip() if intro else "No description"
        option_texts = html.xpath('/html/body/div[4]/div/div/select/option/text()')
        if not option_texts:
            print(f"Page count not found, URL: {url}")
            return
        zhang = re.findall(r'第\s*(\d+)\s*页\(末页\)', option_texts[-1])
        if not zhang:
            print(f"Page count pattern not matched, URL: {url}")
            return
        zhang = int(zhang[0])
        print('Start crawling:', title)
        for i in range(1, zhang + 1):
            if len(zhangjie_content) >= 100:
                break  # stop after 100 chapters
            zhangjie_url = f'{url}/index_{i}.html'
            zhangjie_html_str = await fetch(session, zhangjie_url)
            zhangjie_html = etree.HTML(zhangjie_html_str)
            if zhangjie_html is None:
                print(f"Error parsing HTML, URL: {zhangjie_url}")
                break
            zhangjieLis = zhangjie_html.xpath('/html/body/div[4]/div/ul/li/a/@href')
            if not zhangjieLis:
                print(f"Chapter list not found, URL: {zhangjie_url}")
                break
            tasks = []
            for j in zhangjieLis:
                if len(zhangjie_content) >= 100:
                    break  # stop after 100 chapters
                tasks.append(asyncio.create_task(neirong(session, url, j)))
            await asyncio.gather(*tasks)
        # Insert all scraped chapters into MongoDB
        insert_to_mongodb(title, novel_type, author, update_time, list(zhangjie_content), img_url, jianjie)
        zhangjie_content.clear()  # reset the chapter list
        print(f"Data recorded successfully: {title}")
    except Exception as e:
        print(f"Error processing chapter info, URL: {url}, error: {e}")

async def main():
    async with aiohttp.ClientSession() as session:
        tasks = []
        for i in range(1, 51):
            main_url = f'http://www.biqule.net/top/monthvisit/{i}.html'  # monthly-visits ranking page
            tasks.append(asyncio.create_task(process_main_page(session, main_url)))
        await asyncio.gather(*tasks)

async def process_main_page(session, main_url):
    try:
        html_str = await fetch(session, main_url)
        html = etree.HTML(html_str)
        if html is None:
            print("Error parsing HTML for the ranking page")
            return
        novels = html.xpath('//div/ul/li/span[@class="sp_2"]/a')
        novel_urls = [{"title": novel.text.strip(), "url": novel.attrib['href']}
                      for novel in novels if novel.text and 'href' in novel.attrib]
        tasks = [asyncio.create_task(zhangjie(session, novel['url'])) for novel in novel_urls]
        await asyncio.gather(*tasks)
    except Exception as e:
        print(f"Error processing ranking page, URL: {main_url}, error: {e}")

if __name__ == "__main__":
    asyncio.run(main())
