我们从Redis撸了一些IP,是不是很酷?
async def get_proxy_ips(self):
? ? if self.PROXY_IPS is None:
? ? ? ? self.PROXY_IPS = await fetch_fresh_proxies()
? ? return self.PROXY_IPS
# 这个方法负责挥一挥大锤,有时候也会碰到点儿小麻烦。
async def fetch_url(self, session, url):
? ? if not self.PROXY_IPS:
? ? ? ? self.PROXY_IPS = await fetch_fresh_proxies()
? ? for attempt in range(self.MAX_RETRIES):
? ? ? ? if not self.PROXY_IPS:
? ? ? ? ? ? self.PROXY_IPS = await fetch_fresh_proxies()
? ? ? ? # ...
? ? ? ? try:
? ? ? ? ? ? # 尝试使用代理发起请求...
? ? ? ? except Exception as oops:
? ? ? ? ? ? # 出错啦,得记下这个不靠谱的代理
? ? ? ? ? ? await self.report_invalid_proxy(proxy_ip)
# 这是轮到淘汰机制出场的时候了
async def cleanup_after_run(self):
? ? # 假如有一堆失效的IP需要处理
? ? if self.invalid_proxies:
? ? ? ? await remove_these_nasty_proxies(self.invalid_proxies)
案例代码:
? ? ? ? 这段代码呢,是我之前做的一个案例,有很多接口(这里就不放进去了,但是,基本思路大家多看看就应该能明白)
import logging
import asyncio
import aiohttp
import aioredis
class SecondGet():
'''
传入一个id, 分别对X个url进行异步抓取html(不执行解析,只返回html)。
'''
MAX_RETRIES = 3 # 最大重试次数
PROXY_IPS = None # 初始化为None,并在需要时加载代理IP
def __init__(self, id):
# 初始化日志
self.logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)
# 存储id和各种请求需要的头信息
self.id = id
self.headers = {
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36',
}
# 需要爬取的urls
self.urls = {
'AA': f'http://www.XXXXX.com/zq/{id}.html',
......
}
# 存储失效的代理
self.invalid_proxies = []
# 假设这个方法会异步获取代理IPs
async def get_proxy_ips(self):
# 实际的实现应该是调用你封装好的异步函数
if self.PROXY_IPS is None:
self.PROXY_IPS = await some_async_task_to_get_proxies()
return self.PROXY_IPS
# 假设此函数会异步将失效的代理IP记录下来,稍后删除它们
async def report_invalid_proxy(self, proxy_ip):
self.invalid_proxies.append(proxy_ip)
# 删除所有记录的失效代理的函数,假设已经实现
async def remove_invalid_proxies(self):
if self.invalid_proxies:
await some_async_task_to_remove_proxies(self.invalid_proxies)
self.invalid_proxies = []
async def populate_proxy_ips(self):
try:
redis = await aioredis.create_redis_pool('redis://localhost')
self.PROXY_IPS += await redis.smembers('proxy_ips')
redis.close()
await redis.wait_closed()
except aioredis.RedisError as e:
self.logger.exception("从redis获取ip异常:", exc_info=e)
# 异步请求URL的函数
async def fetch_with_proxy(self, session, url, proxy):
try:
async with session.get(url, proxy=proxy, timeout=3, headers=self.headers) as response:
text = await response.text()
if '您的访问频率过快,请稍等' in text:
return None
return text
except Exception:
self.logger.exception(f"爬取相关url出错,使用的代理是{proxy}: {url}")
return None
async def fetch_url(self, session, url):
# 此函数需要调整以处理无效代理逻辑
# 如果首次调用为空,则尝试获取新的代理IP
if not self.PROXY_IPS:
self.PROXY_IPS = await get_new_proxies()
for retry in range(self.MAX_RETRIES):
if not self.PROXY_IPS: # 如果没有可用代理,尝试重新获取
self.PROXY_IPS = await get_new_proxies()
proxy_ip = self.PROXY_IPS.pop(0) # 获取第一个代理IP
proxy = f"http://{proxy_ip}"
try:
async with session.get(url, proxy=proxy, headers=self.headers) as response:
text = await response.text()
if '您的访问频率过快,请稍等' in text:
await self.report_invalid_proxy(proxy_ip) # 报告和重试
continue
return text # 爬取成功,返回文本
except Exception: # 捕获请求中发生的异常
await self.report_invalid_proxy(proxy_ip) # 报告和重试
# 所有重试都失败了,记录最后的代理IP并返回失败
await self.report_invalid_proxy(proxy_ip)
self.logger.error(f"所有重试失败: {url}")
return None # 返回失败
async def run(self):
# 存储结果的字典
results = {}
# 获取或刷新代理IP列表
self.PROXY_IPS = await get_new_proxies() # 假设函数返回代理IP列表
async with aiohttp.ClientSession() as session:
tasks = [asyncio.create_task(self.fetch_url(session, url)) for url in self.urls.values()]
responses = await asyncio.gather(*tasks, return_exceptions=True)
for name, response in zip(self.urls.keys(), responses):
if isinstance(response, Exception):
with open('爬取异常.txt', 'a') as error_file:
error_file.write(f"{name} URL: {self.urls[name]}\n异常详情: {response}\n")
results[name] = None
else:
results[name] = response
# 假设有一个函数来处理清理逻辑
await self.cleanup_after_run()
return results
async def cleanup_after_run(self):
# 执行一些清理工作,比如从Redis中删除无效的代理IP
# 假设有一个函数来异步完成此任务
await some_async_task_to_remove_invalid_proxies(self.invalid_proxies)
8个字:
保持灵活,见招拆招