这是Python数据采集系列原创文章。
相关前文回顾:
【Python数据采集系列】利用协程并发采集豆瓣TOP250电影信息(源码解析)
【Python程序开发系列】一文总结API的基本概念、功能分类、认证方式、使用方法和开发流程
【Python程序开发系列】一文教你使用协程处理多任务(案例+源码)
【Python程序开发系列】进程、线程、协程?一文全面梳理多任务并发编程基本概念
本期相关知识:
获取网页上的静态信息通常分为两种:
-?无API情况(Cookie)
????- 无接口,需要借助第三方库对html进行解析(分为面向单个网页和面向多个网页),提取出所需的相关信息。
????- Info = response.text()
????- soup = BeautifulSoup(response.text, 'html.parser')
-?有API情况(Access_token)
????- 有接口,人家已经把相关的信息采集好了以json格式进行了组织,不需要再去解析html,直接从json读取所需信息即可。例如:利用GitHub和Gitee 的API获取PR相关信息。
????- Info = response.json()
要想API的url的get请求有响应,必须要有这个API的key,也就是access_token,在进行get请求时将这个key放在header里面,如果时多页,需要在将页数放在params里面。本文将用协程的方式并发执行获取每一页仓库信息的任务。
import time
import aiohttp
import asyncio
import openpyxl
import itertools
import?requests
async def get_repository_names_onepage(session, org_name, access_token, page):
url = f'https://gitee.com/api/v5/orgs/{org_name}/repos'
params = {'page': page}
headers = {'Authorization': f'Bearer {access_token}'}
????async?with?session.get(url,?headers=headers,?params=params)?as?response:
error_page_list = []
try:
repositories = await response.json()
error_page_list.append(None)
return [repo['name'] for repo in repositories], error_page_list
except:
print(f"Error while fetching repositories {page}")
error_page_list.append(page)
return [], error_page_list
async def get_all_repository_names(org_name, access_token, num_pages):
async with aiohttp.ClientSession() as session:
tasks = [get_repository_names_onepage(session, org_name, access_token, page) for page in range(1, num_pages + 1)]
res = await asyncio.gather(*tasks)
????????return?res
????????
????????
async def get_error_repository_names(org_name, access_token, error_pages):
async with aiohttp.ClientSession() as session:
tasks = [get_repository_names_onepage(session, org_name, access_token, page) for page in error_pages]
res = await asyncio.gather(*tasks)
return res
async def get_total_pages(org_name, access_token):
url = f'https://gitee.com/api/v5/orgs/{org_name}/repos'
headers = {'Authorization': f'Bearer {access_token}'}
response = requests.get(url, headers=headers)
print(response.headers)
total_pages = int(response.headers.get("total_page"))
total_counts = int(response.headers.get("total_count"))
return total_pages, total_counts
async def write_to_excel(org_name, repository_names):
wb = openpyxl.Workbook()
ws = wb.active
ws.append(['Repository Name'])
for name in repository_names:
ws.append([str(name)]) # Convert to string before appending
excel_filename = f'{org_name}_repositories.xlsx'
wb.save(excel_filename)
print(f'Repositories information written to {excel_filename}')
async def main():
org_name = 'src-oepkgs'
access_token = 'ed1d0fb3d6aa397c514569b4b965e3a9'
num_pages, total_repo = await get_total_pages(org_name, access_token)
print(num_pages)
print(total_repo)
repository_names_page = await get_all_repository_names(org_name, access_token, num_pages)
repository_names = [tup[0] for tup in repository_names_page]
repository_names = list(itertools.chain(*repository_names))
pages = [tup[1] for tup in repository_names_page]
error_pages = list(itertools.chain(*pages))
error_pages = list(filter(None, error_pages))
print(error_pages)
start_time = time.time() # 记录循环开始的时间
while error_pages and time.time() - start_time < 300:
repository_names_page = await get_error_repository_names(org_name, access_token, error_pages)
repository_names_error_page = [tup[0] for tup in repository_names_page]
repository_names_error_page = list(itertools.chain(*repository_names_error_page))
repository_names.extend(repository_names_error_page)
pages = [tup[1] for tup in repository_names_page]
error_pages = list(itertools.chain(*pages))
error_pages = list(filter(None, error_pages))
await write_to_excel(org_name, repository_names)
if __name__ == '__main__':
loop = asyncio.get_event_loop()
????loop.run_until_complete(main())
代码逻辑
先获取该组织项目列表的总页数,将采集每一页的仓库名定义为一个协程函数,然后并发执行,返回组织下所有页数的仓库名,试了一下可行,但是某页页可能会报错:
将发生异常的页码记录下来,然后增加一个时间延迟,再去尝试并发获取这些异常页码的信息,添加到之前采集的列表里,依次循环,直到error_page的页码为空。最后加一个统计检验,对比采集的repo_name数量和组织实际的项目仓库总数,如果两者相等,则说明没有遗漏,否则打印出遗漏的数量。
可能报错:RuntimeError: Event loop is closed
这个错误通常出现在事件循环(event loop)已经关闭的情况下,但后续仍然有异步操作尝试使用事件循环。这可能与 asyncio.run(main())
结束后,事件循环已关闭,但在某些异步任务或资源的清理中还在尝试使用事件循环有关。尝试使用传统的方式来运行异步代码,以便更好地控制事件循环的生命周期。请使用 loop.run_until_complete(main())
代替 asyncio.run(main())
。
src-oepkgs_repositories.xlsx:
作者简介:
读研期间发表6篇SCI数据挖掘相关论文,现在某研究院从事数据算法相关科研工作,结合自身科研实践经历不定期分享关于Python、机器学习、深度学习、人工智能系列基础知识与应用案例。致力于只做原创,以最简单的方式理解和学习,关注我一起交流成长。需要数据集和源码的小伙伴可以关注底部公众号添加作者微信!