Web 应用程序开发中,及时高效处理常规任务至关重要,包括定时收集数据或管理任务计划。针对强大且性能卓越的?FastAPI?框架,我们可以通过几种策略来管理这些必要的定时任务。
本指南将探讨在 FastAPI 环境中管理定时任务的三种实用方法:使用 APScheduler,利用 Celery 任务队列的力量,以及利用内置的 asyncio 进行调度。
APScheduler 是 Python 调度库,以其灵活性和易于集成而著称。以下是如何在?FastAPI?中使用它:
pip install APScheduler
from apscheduler.schedulers.background import BackgroundScheduler
scheduler = BackgroundScheduler()
def tick():
print('Tick! The time is: %s' % datetime.now())
from fastapi import FastAPI
app = FastAPI()
@app.on_event("startup")
async def app_start():
scheduler.add_job(execute_periodic_function, 'interval', seconds=3)
scheduler.start()
Celery 是一个高效的分布式任务队列系统,可与 FastAPI 无缝集成。
pip install celery
from celery import Celery
celery_app = Celery('my_fastapi_app')
@celery_app.task
def celery_periodic_task():
print('执行了 Celery 任务')
from celery.schedules import crontab
@app.on_event("startup")
async def app_start():
celery_app.conf.beat_schedule = {
'每半分钟执行': {
'task': 'celery_periodic_task',
'schedule': 30.0,
},
}
Python 的原生异步库 asyncio 也可用于调度定时任务。
import asyncio
@app.on_event("startup")
async def app_start():
asyncio.create_task(async_cron())
async def async_cron():
while True:
print('执行 Async 定时任务')
await asyncio.sleep(10)
以下是完整的使用 APScheduler 管理定时任务的 FastAPI 应用示例:
from fastapi import FastAPI
from datetime import datetime
from apscheduler.schedulers.background import BackgroundScheduler
app = FastAPI()
scheduler = BackgroundScheduler()
def periodic_function():
print(f'定时执行的操作时间:{datetime.now()}')
@app.on_event("startup")
async def app_start():
scheduler.add_job(periodic_function, 'interval', seconds=3)
scheduler.start()
@app.get("/")
async def welcome():
return {"message": "欢迎访问定时任务演示"}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="127.0.0.1", port=8000)
使用?Apifox?这样的工具可以简化 API 测试,这是 Postman 等竞品的更强大的替代品。Apifox 将 Postman、Swagger、Mock?和 JMeter 的功能整合在一起,简化了对各种协议 API 的调试,提高了项目投产效率。
无论选择 APScheduler、Celery 还是 asyncio,FastAPI 都为实现定时任务提供了强大的解决方案。每种方法都有其优点,APScheduler 使用友好,asyncio 与 FastAPI 的异步特性相契合。根据您的具体需求和场景选择最合适的方法。
知识扩展:
参考链接: