事例网站链接: https://xk.scjgj.sh.gov.cn/xzxk_wbjg/#/tzdwSYDJList
一.数据获取流程
1️⃣对列表页请求获取有关详情页的字段值
2️⃣构造详情页的URL获取详情页数据
3️⃣将数据存入excel
二.异步代码
import asyncio
import logging
import pandas as pd
from aiohttp import ClientSession
from httpx._urlparse import quote
from openpyxl import Workbook
from openpyxl.utils.dataframe import dataframe_to_rows
import ssl# 设置日志
logging.basicConfig(level=logging.INFO)# 请求头和数据定义
url_template = 'https://xk.scjgj.sh.gov.cn/xzxk_wbjg/query/public/sydjQueryDeviceEtInfo'
url_template2 = 'https://xk.scjgj.sh.gov.cn/xzxk_wbjg/query/public/useLicInfo/{}/{}'
headers = {'Accept': 'application/json, text/plain, */*','Accept-Language': 'zh-CN,zh;q=0.9','Cache-Control': 'no-cache','Connection': 'keep-alive','Content-Type': 'application/json;charset=UTF-8','User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/129.0.0.0 Safari/537.36',
}# 初始化DataFrame
columns = ['applyId', '设备名称', '设备种类', '注册代码', '证书编号', '维保单位', '制造单位', '使用单位', '产品编号','单位内编号', '使用单位地址', '发证日期', '登记机关', '设备类别']
df = pd.DataFrame(columns=columns)# 创建一个新的工作簿用于保存数据
wb = Workbook()
ws = wb.active# 定义一个锁来确保在保存Excel时没有其他操作
save_lock = asyncio.Lock()# 并发请求的限制
concurrency_limit = 8 # 限制并发请求数量为32# Semaphore用于控制并发数
semaphore = asyncio.Semaphore(concurrency_limit)# 创建一个忽略SSL验证的上下文
context = ssl.create_default_context()
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE# 协程函数用于发送HTTP请求
async def fetch(session, url, data):async with semaphore: # 控制并发数logging.warning(f"send URL: {url}")async with session.post(url, json=data, headers=headers, ssl=context) as response:content_type = response.headers.get('Content-Type', '')if 'application/json' in content_type:return await response.json()else:logging.warning(f"Unexpected Content-Type: {content_type} for URL: {url}")return None# 协程函数用于处理一页数据
async def process_page(page):global dflogging.info(f"start processing page {page}.")data = {"rows": 50, "zszl": "00206", "page": page}async with ClientSession() as session:res1 = await fetch(session, url_template, data)if res1 is None:logging.warning(f"Failed to fetch initial data for page {page}.")returnresultList = res1['data']['resultList']tasks = []for item in resultList:encoded_lic_unique_id = quote(item['licUniqueId'])url2 = url_template2.format(item['applyId'], encoded_lic_unique_id)task = asyncio.create_task(fetch(session, url2, {}))tasks.append(task)results = await asyncio.gather(*tasks)for result, item in zip(results, resultList):if result is None:logging.warning(f"Failed to fetch secondary data for applyId {item['applyId']}.inpage:{page}")row = pd.DataFrame({'applyId': [item['applyId']],'设备名称': [item['devName']],'设备种类': [item['devSuperclass']],'注册代码': [item['deviceCode']],'证书编号': [item['useLicNo']],'维保单位': [item['maintainComName']],'制造单位': [item['makeComName']],'使用单位': [item['useComName']],'产品编号': None,'单位内编号': None,'使用单位地址': None,'发证日期': None,'登记机关': None,'设备类别': None})df = pd.concat([df, row], ignore_index=True)continuerow = pd.DataFrame({'applyId': [item['applyId']],'设备名称': [item['devName']],'设备种类': [item['devSuperclass']],'注册代码': [item['deviceCode']],'证书编号': [item['useLicNo']],'维保单位': [item['maintainComName']],'制造单位': [item['makeComName']],'使用单位': [item['useComName']],'产品编号': [result['data'].get('productCode')],'单位内编号': [result['data'].get('innerCode')],'使用单位地址': [result['data'].get('usePlace')],'发证日期': [result['data'].get('qfsj')],'登记机关': [result['data'].get('fzjgmc')],'设备类别': [result['data'].get('devSubclass')]})df = pd.concat([df, row], ignore_index=True)logging.info(f"Finished processing page {page}.")# 异步保存数据函数
async def save_data(df):global wsasync with save_lock:try:logging.info("Saving data.")# 等待当前所有任务完成pending_tasks = [task for task in asyncio.all_tasks() if task is not asyncio.current_task()]if pending_tasks:logging.info(f"Waiting for {len(pending_tasks)} tasks to complete before saving.")await asyncio.gather(*pending_tasks)# 先保存数据for r_idx, row in enumerate(dataframe_to_rows(df, index=False, header=ws.max_row == 1), start=1):for c_idx, value in enumerate(row, start=1):ws.cell(row=r_idx, column=c_idx, value=value)# 保存Excel文件wb.save('./yb.xlsx')# 清空DataFramedf.drop(df.index, inplace=True)logging.info("Data saved.")except Exception as e:logging.error(f"Failed to save data: {e}")# 异步主函数
async def main():global df, last_saved_pagelogging.warning("start")pages = list(range(8000, 9000))tasks = [process_page(page) for page in pages]logging.warning("listtaskend")await asyncio.gather(*tasks)# 检查是否有剩余数据需要保存if not df.empty:await save_data(df)# 确保所有任务都已完成pending_tasks = [task for task in asyncio.all_tasks() if task is not asyncio.current_task()]if pending_tasks:logging.info(f"Waiting for {len(pending_tasks)} final tasks to complete.")await asyncio.gather(*pending_tasks)# 释放锁save_lock.release()# 运行异步主函数
if __name__ == '__main__':asyncio.run(main())