master + slave:master控制队列,过滤,传递任务;slave负责执行
升级策略2:分离响应异步下载与异步处理,避免一方阻塞另一方
升级策略3:日志监控捕获错误,并实时通报。ELK
还有一种,master 只负责过滤重复请求;slave自己负责维护自己的队列,只需要 slave 执行任务前询问 master是否有重复值即可
队列组件
过滤器组件
下载器组件
异步组件
数据解析提取组件
数据清洗组件
数据存储组件
程序监控组件
可视化控制组件
import requests
from spiderSystem.response import Response
class RequestsDownloader(object):
"""根据request发起请求,构建response对象"""
def fetch(self, request):
if request.method.upper() == "GET":
resp = requests.get(request.with_query_url, headers=request.headers)
elif request.method.upper() == "POST":
resp = requests.post(request.with_query_url, headers=request.headers, body=request.body)
else:
raise Exception('only support GET or POST Method')
return Response(request, status_code=resp.status_code, url=resp.url, headers=resp.headers, body=resp.content)
from .request_manager import RequestManager
from .request_manager.utils.redis_tools import get_redis_queue_cls
from .downloader import RequestsDownloader
from .request import Request
FIFO_QUEUE = get_redis_queue_cls('fifo')
class Slave(object):
def __init__(self, spiders, project_name, request_manager_config):
self.filter_queue = FIFO_QUEUE("filter_queue", host="localhost")
self.request_manager = RequestManager(**request_manager_config)
self.downloader = RequestsDownloader() # 用 requests 同步请求的下载器
self.spiders = spiders
self.project_name = project_name
def handle_request(self):
# 1. 获取一个请求
request = self.request_manager.get_request(self.project_name)
# 2. 发起请求
response = self.downloader.fetch(request) # 每次都同步去请求 !!!
# 3. 获取爬虫对象
spider = self.spiders[request.name]()
# 4. 处理 response
for result in spider.parse(response):
if result is None:
raise Exception('不允许返回None')
elif isinstance(result, Request):
self.filter_queue.put(result)
else:
# 意味着是一个数据
new_result = spider.data_clean(result)
spider.data_save(new_result)
def run(self):
while True:
self.handle_request()
from tornado.httpclient import HTTPClient, HTTPRequest, AsyncHTTPClient
from spiderSystem.response import Response
# tornado 也有同步请求方式 (可以忽略)
class TornadoDownloader(object):
def __init__(self):
self.httpclient = HTTPClient()
def fetch(self, request):
print("tornado 同步客户端发的请求")
tornado_request = HTTPRequest(request.with_query_url, method=request.method.upper(), headers=request.headers)
tornado_response = self.httpclient.fetch(tornado_request)
return Response(request=request, status_code=tornado_response.code, url=tornado_response.effective_url,
body=tornado_response.buffer.read())
"""
同步的请求,不能复用,需要用完后关闭
"""
def __del__(self):
self.httpclient.close()
# tornado 也有异步请求方式
class AsyncTornadoDownloader(object):
def __init__(self):
self.async_http_client = AsyncHTTPClient()
async def fetch(self, request): # 开启协程
print("tornado 异步客户端发的请求")
tornado_request = HTTPRequest(request.with_query_url, method=request.method.upper(), headers=request.headers)
tornado_response = await self.async_http_client.fetch(tornado_request) # 等待
return Response(request=request, status_code=tornado_response.code, url=tornado_response.effective_url,
headers=request.headers,
body=tornado_response.buffer.read())
import asyncio
import tornado.ioloop
from .request_manager import RequestManager
from .request_manager.utils.redis_tools import get_redis_queue_cls
from .downloader import RequestsDownloader, TornadoDownloader, AsyncTornadoDownloader
from .request import Request
FIFO_QUEUE = get_redis_queue_cls('fifo')
class Slave(object):
def __init__(self, spiders, project_name, request_manager_config):
self.filter_queue = FIFO_QUEUE("filter_queue", host="localhost")
self.request_manager = RequestManager(**request_manager_config)
self.downloader = AsyncTornadoDownloader() # 异步下载器
self.spiders = spiders
self.project_name = project_name
async def handle_request(self):
# request = self.request_manager.get_request(self.project_name) 阻塞改异步
io_loop = tornado.ioloop.IOLoop.current()
# 1. 获取一个请求
future = io_loop.run_in_executor(None, self.request_manager.get_request,self.project_name) # 不支持协程的函数,可以自己获取事件循环,去定义执行,让其支持协程
request = await future
# 2. 发起请求
response = await self.downloader.fetch(request)
# 3. 获取爬虫对象
spider = self.spiders[request.name]()
# 4. 处理 response
for result in spider.parse(response):
if result is None:
raise Exception('不允许返回None')
elif isinstance(result, Request):
# self.filter_queue.put(result) 可能阻塞,改异步
await io_loop.run_in_executor(None, self.filter_queue.put,result)
else:
# 意味着是一个数据
new_result = spider.data_clean(result)
spider.data_save(new_result)
async def run(self):
while True:
# 不能写成 await self.handle_request(),否则,也是相当于同步请求了
await asyncio.wait([
self.handle_request(),
self.handle_request(),
])
if __name__ == '__main__':
spiders = {BaiduSpider.name: BaiduSpider}
# 同步请求,用 requests 发请求
# Slave(spiders, project_name=PROJECT_NAME, request_manager_config=REQUEST_MANAGER_CONFIG).run()
# 要用异步方式去请求
slave = Slave(spiders, project_name=PROJECT_NAME, request_manager_config=REQUEST_MANAGER_CONFIG)
io_loop = tornado.ioloop.IOLoop.current()
io_loop.run_sync(slave.run)
tornado库 io_loop.run_sync 用于将阻塞函数转换为同步函数并在 IOLoop 上执行,它会阻塞当前协程。 io_loop.run_in_executor 用于在指定的线程池中异步执行耗时的、阻塞的操作,不会阻塞当前协程,并允许 IOLoop 继续处理其他事件。
asyncio库 实现类似于 run_sync 的效果:您可以使用 loop.run_until_complete 方法来运行一个协程并等待其完成。这个方法会阻塞当前线程,直到协程执行完毕 实现类似于 run_in_executor 的效果:您可以使用 loop.run_in_executor 方法将耗时的、阻塞的操作转移到一个线程池中执行,以避免阻塞事件循环。
- 下载器中,用到的所有异步的地方,必须是协程 async 定义
- await 后面跟着的,一定是支持协程的方法,要不是一个 协程对象,future 或者 task 对象,比如 self.async_http_client.fetch ,如果不支持协程,会报错
- 连带着的,所有调用 async 的方法,也必须是协程函数
- 对于不支持协程的函数,可以自己获取事件循环,去定义执行,让其支持协程;如果一个函数是一个协程函数后,如果这个协程函数中,有任意可以阻塞的,或耗时操作,都应该改成异步的 await ,不然可能会阻塞整个线程
# self.request_manager.get_request 本身不支持异步,或者改造成异步,嵌套要改的太深,可以用 io_loop.run_in_executor 来替代
io_loop = tornado.ioloop.IOLoop.current()
# 1. 获取一个请求
future = io_loop.run_in_executor(None, self.request_manager.get_request,self.project_name)
request = await future
- 在最开始调用的地方,比如 run ,启动的方式,必须是 用 asyncio.wait 或用其他方式启动(asyncio.gather 或 asyncio.as_completed)
# 开启2个协程,去执行。asyncio.wait 能让其变成一个异步关系
async def run(self):
while True:
# 不能写成 await self.handle_request(),否则,也是相当于同步请求了
await asyncio.wait([
self.handle_request(),
self.handle_request(),
])
def run(self):
# self.run_start_requests()
# self.run_filter_queue()
# 两个线程去做
threading.Thread(target=self.run_start_requests).start()
threading.Thread(target=self.run_filter_queue).start()
├── setup.py
├── spiderSystem
├── README.md
from setuptools import setup, find_packages
setup(
name="spiderSystem",
version="0.1",
description="spiderSystem module",
author='raoju',
url="url",
license="license",
packages=find_packages(exclude=[]), # 当前所有模块都安装
install_requires=[
"tornado >= 5.1",
"pycurl",
]
)
.markdown-body pre,.markdown-body pre>code.hljs{color:#333;background:#f8f8f8}.hljs-comment,.hljs-quote{color:#998;font-style:italic}.hljs-keyword,.hljs-selector-tag,.hljs-subst{color:#333;font-weight:700}.hljs-literal,.hljs-number,.hljs-tag .hljs-attr,.hljs-template-variable,.hljs-variable{color:teal}.hljs-doctag,.hljs-string{color:#d14}.hljs-section,.hljs-selector-id,.hljs-title{color:#900;font-weight:700}.hljs-subst{font-weight:400}.hljs-class .hljs-title,.hljs-type{color:#458;font-weight:700}.hljs-attribute,.hljs-name,.hljs-tag{color:navy;font-weight:400}.hljs-link,.hljs-regexp{color:#009926}.hljs-bullet,.hljs-symbol{color:#990073}.hljs-built_in,.hljs-builtin-name{color:#0086b3}.hljs-meta{color:#999;font-weight:700}.hljs-deletion{background:#fdd}.hljs-addition{background:#dfd}.hljs-emphasis{font-style:italic}.hljs-strong{font-weight:700}
master + slave:master控制队列,过滤,传递任务;slave负责执行
升级策略2:分离响应异步下载与异步处理,避免一方阻塞另一方
升级策略3:日志监控捕获错误,并实时通报。ELK
还有一种,master 只负责过滤重复请求;slave自己负责维护自己的队列,只需要 slave 执行任务前询问 master是否有重复值即可
队列组件
过滤器组件
下载器组件
异步组件
数据解析提取组件
数据清洗组件
数据存储组件
程序监控组件
可视化控制组件
import requests
from spiderSystem.response import Response
class RequestsDownloader(object):
"""根据request发起请求,构建response对象"""
def fetch(self, request):
if request.method.upper() == "GET":
resp = requests.get(request.with_query_url, headers=request.headers)
elif request.method.upper() == "POST":
resp = requests.post(request.with_query_url, headers=request.headers, body=request.body)
else:
raise Exception('only support GET or POST Method')
return Response(request, status_code=resp.status_code, url=resp.url, headers=resp.headers, body=resp.content)
from .request_manager import RequestManager
from .request_manager.utils.redis_tools import get_redis_queue_cls
from .downloader import RequestsDownloader
from .request import Request
FIFO_QUEUE = get_redis_queue_cls('fifo')
class Slave(object):
def __init__(self, spiders, project_name, request_manager_config):
self.filter_queue = FIFO_QUEUE("filter_queue", host="localhost")
self.request_manager = RequestManager(**request_manager_config)
self.downloader = RequestsDownloader() # 用 requests 同步请求的下载器
self.spiders = spiders
self.project_name = project_name
def handle_request(self):
# 1. 获取一个请求
request = self.request_manager.get_request(self.project_name)
# 2. 发起请求
response = self.downloader.fetch(request) # 每次都同步去请求 !!!
# 3. 获取爬虫对象
spider = self.spiders[request.name]()
# 4. 处理 response
for result in spider.parse(response):
if result is None:
raise Exception('不允许返回None')
elif isinstance(result, Request):
self.filter_queue.put(result)
else:
# 意味着是一个数据
new_result = spider.data_clean(result)
spider.data_save(new_result)
def run(self):
while True:
self.handle_request()
from tornado.httpclient import HTTPClient, HTTPRequest, AsyncHTTPClient
from spiderSystem.response import Response
# tornado 也有同步请求方式 (可以忽略)
class TornadoDownloader(object):
def __init__(self):
self.httpclient = HTTPClient()
def fetch(self, request):
print("tornado 同步客户端发的请求")
tornado_request = HTTPRequest(request.with_query_url, method=request.method.upper(), headers=request.headers)
tornado_response = self.httpclient.fetch(tornado_request)
return Response(request=request, status_code=tornado_response.code, url=tornado_response.effective_url,
body=tornado_response.buffer.read())
"""
同步的请求,不能复用,需要用完后关闭
"""
def __del__(self):
self.httpclient.close()
# tornado 也有异步请求方式
class AsyncTornadoDownloader(object):
def __init__(self):
self.async_http_client = AsyncHTTPClient()
async def fetch(self, request): # 开启协程
print("tornado 异步客户端发的请求")
tornado_request = HTTPRequest(request.with_query_url, method=request.method.upper(), headers=request.headers)
tornado_response = await self.async_http_client.fetch(tornado_request) # 等待
return Response(request=request, status_code=tornado_response.code, url=tornado_response.effective_url,
headers=request.headers,
body=tornado_response.buffer.read())
import asyncio
import tornado.ioloop
from .request_manager import RequestManager
from .request_manager.utils.redis_tools import get_redis_queue_cls
from .downloader import RequestsDownloader, TornadoDownloader, AsyncTornadoDownloader
from .request import Request
FIFO_QUEUE = get_redis_queue_cls('fifo')
class Slave(object):
def __init__(self, spiders, project_name, request_manager_config):
self.filter_queue = FIFO_QUEUE("filter_queue", host="localhost")
self.request_manager = RequestManager(**request_manager_config)
self.downloader = AsyncTornadoDownloader() # 异步下载器
self.spiders = spiders
self.project_name = project_name
async def handle_request(self):
# request = self.request_manager.get_request(self.project_name) 阻塞改异步
io_loop = tornado.ioloop.IOLoop.current()
# 1. 获取一个请求
future = io_loop.run_in_executor(None, self.request_manager.get_request,self.project_name) # 不支持协程的函数,可以自己获取事件循环,去定义执行,让其支持协程
request = await future
# 2. 发起请求
response = await self.downloader.fetch(request)
# 3. 获取爬虫对象
spider = self.spiders[request.name]()
# 4. 处理 response
for result in spider.parse(response):
if result is None:
raise Exception('不允许返回None')
elif isinstance(result, Request):
# self.filter_queue.put(result) 可能阻塞,改异步
await io_loop.run_in_executor(None, self.filter_queue.put,result)
else:
# 意味着是一个数据
new_result = spider.data_clean(result)
spider.data_save(new_result)
async def run(self):
while True:
# 不能写成 await self.handle_request(),否则,也是相当于同步请求了
await asyncio.wait([
self.handle_request(),
self.handle_request(),
])
if __name__ == '__main__':
spiders = {BaiduSpider.name: BaiduSpider}
# 同步请求,用 requests 发请求
# Slave(spiders, project_name=PROJECT_NAME, request_manager_config=REQUEST_MANAGER_CONFIG).run()
# 要用异步方式去请求
slave = Slave(spiders, project_name=PROJECT_NAME, request_manager_config=REQUEST_MANAGER_CONFIG)
io_loop = tornado.ioloop.IOLoop.current()
io_loop.run_sync(slave.run)
tornado库 io_loop.run_sync 用于将阻塞函数转换为同步函数并在 IOLoop 上执行,它会阻塞当前协程。 io_loop.run_in_executor 用于在指定的线程池中异步执行耗时的、阻塞的操作,不会阻塞当前协程,并允许 IOLoop 继续处理其他事件。
asyncio库 实现类似于 run_sync 的效果:您可以使用 loop.run_until_complete 方法来运行一个协程并等待其完成。这个方法会阻塞当前线程,直到协程执行完毕 实现类似于 run_in_executor 的效果:您可以使用 loop.run_in_executor 方法将耗时的、阻塞的操作转移到一个线程池中执行,以避免阻塞事件循环。
- 下载器中,用到的所有异步的地方,必须是协程 async 定义
- await 后面跟着的,一定是支持协程的方法,要不是一个 协程对象,future 或者 task 对象,比如 self.async_http_client.fetch ,如果不支持协程,会报错
- 连带着的,所有调用 async 的方法,也必须是协程函数
- 对于不支持协程的函数,可以自己获取事件循环,去定义执行,让其支持协程;如果一个函数是一个协程函数后,如果这个协程函数中,有任意可以阻塞的,或耗时操作,都应该改成异步的 await ,不然可能会阻塞整个线程
# self.request_manager.get_request 本身不支持异步,或者改造成异步,嵌套要改的太深,可以用 io_loop.run_in_executor 来替代
io_loop = tornado.ioloop.IOLoop.current()
# 1. 获取一个请求
future = io_loop.run_in_executor(None, self.request_manager.get_request,self.project_name)
request = await future
- 在最开始调用的地方,比如 run ,启动的方式,必须是 用 asyncio.wait 或用其他方式启动(asyncio.gather 或 asyncio.as_completed)
# 开启2个协程,去执行。asyncio.wait 能让其变成一个异步关系
async def run(self):
while True:
# 不能写成 await self.handle_request(),否则,也是相当于同步请求了
await asyncio.wait([
self.handle_request(),
self.handle_request(),
])
def run(self):
# self.run_start_requests()
# self.run_filter_queue()
# 两个线程去做
threading.Thread(target=self.run_start_requests).start()
threading.Thread(target=self.run_filter_queue).start()
├── setup.py
├── spiderSystem
├── README.md
from setuptools import setup, find_packages
setup(
name="spiderSystem",
version="0.1",
description="spiderSystem module",
author='raoju',
url="url",
license="license",
packages=find_packages(exclude=[]), # 当前所有模块都安装
install_requires=[
"tornado >= 5.1",
"pycurl",
]
)
如果你对Python感兴趣,想要学习python,这里给大家分享一份Python全套学习资料,都是我自己学习时整理的,希望可以帮到你,一起加油!
😝有需要的小伙伴,可以点击下方链接免费领取或者V扫描下方二维码免费领取🆓
Python全套学习资料
对于从来没有接触过Python的同学,我们帮你准备了详细的学习成长路线图。可以说是最科学最系统的学习路线,你可以按照上面的知识点去找对应的学习资源,保证自己学得较为全面。
还有很多适合0基础入门的学习视频,有了这些视频,轻轻松松上手Python~
每节视频课后,都有对应的练习题哦,可以检验学习成果哈哈!
学习Python常用的开发软件都在这里了!每个都有详细的安装教程,保证你可以安装成功哦!
光学理论是没用的,要学会跟着一起敲代码,动手实操,才能将自己的所学运用到实际当中去,这时候可以搞点实战案例来学习。100+实战案例源码等你来拿!
如果觉得上面的实战案例有点枯燥,可以试试自己用Python编写小游戏,让你的学习过程中增添一点趣味!
我们学会了Python之后,有了技能就可以出去找工作啦!下面这些面试题是都来自阿里、腾讯、字节等一线互联网大厂,并且有阿里大佬给出了权威的解答,刷完这一套面试资料相信大家都能找到满意的工作。
上述所有资料 ?? ,朋友们如果有需要的,可以扫描下方👇👇👇二维码免费领取🆓