详细讲解Python中的aioschedule定时任务操作

发布时间:2024年01月17日

前言

如果下面的函数库无法执行,出现类似:(前提是python3.7以上)

AttributeError: module ‘asyncio‘ has no attribute ‘run‘

请检查run是否可跳转,如果无法跳转,尝试安装asyncio版本号为最新:pip install asuncio==3.4.3

1. 基本概念

aioschedule 是一个基于 asyncio 的 Python 库,用于在异步应用程序中进行任务调度。

它提供了一种方便的方式来安排和执行异步任务,类似于传统的 schedule 库,但适用于异步编程。

先科普下schedule
aioschedule 和 schedule 都是用于任务调度的 Python 库,但它们在异步和同步

编程环境执行方式依赖环境
schedule 适用于同步编程环境schedule 使用阻塞式的方式执行任务schedule 不依赖 asyncio 库
aioschedule 适用于异步编程环境aioschedule 使用非阻塞的异步方式执行任务aioschedule 基于 asyncio

两者的代码相似:

import schedule
import time

def job():
    print("Job executed!")

# 注册每隔5秒执行一次的任务
schedule.every(5).seconds.do(job)

while True:
    schedule.run_pending()
    time.sleep(1)

另一个:

import asyncio
import aioschedule

async def job():
    print("Job executed!")

# 注册每隔5秒执行一次的任务
aioschedule.every(5).seconds.do(job)

async def main():
    while True:
        await aioschedule.run_pending()
        await asyncio.sleep(1)

asyncio.run(main())

2. 基本API

aioschedule 提供了一些常用的 API 来实现任务调度。

常用的API如下:

  • every(interval): 用于指定任务执行的时间间隔。
aioschedule.every(5).seconds.do(job)
  • do(job_function, *args, **kwargs): 用于注册要执行的任务函数,可以传递参数给任务函数。
aioschedule.every(10).minutes.do(job, arg1, kwarg1='value')
  • to(target): 用于指定任务执行的终止时间,即任务不再执行的时间点。
aioschedule.every().day.at("14:30").do(job).to("15:00")
  • tag(tag): 为任务添加标签,可以通过标签取消任务。
aioschedule.every().hour.do(job).tag('hourly')
  • cancel(tag): 取消具有指定标签的所有任务。
aioschedule.cancel('hourly')
  • run_pending(): 执行所有待处理的任务。
await aioschedule.run_pending()
  • clear(tag=None): 清除所有任务或特定标签的任务。
aioschedule.clear()
# 或
aioschedule.clear('daily')

3. Demo

示例代码如下:

import asyncio
import aioschedule

async def job():
    print("Job executed!")

# 注册任务,每隔5秒执行一次
aioschedule.every(1).seconds.do(job)

async def main():
    # 异步等待,保持主程序运行
    while True:
        await aioschedule.run_pending()
        await asyncio.sleep(1)

# 运行主程序
asyncio.run(main())

截图如下:

在这里插入图片描述

另一个Demo:

import asyncio
import aioschedule

async def job(name, count):
    print(f"Job {name} executed {count} times!")

# 注册每隔3秒执行一次的任务,并传递参数
aioschedule.every(3).seconds.do(job, 'TaskA', count=1)

# 注册每隔5秒执行一次的任务,并传递参数
aioschedule.every(5).seconds.do(job, 'TaskB', count=1)

# 注册每小时执行一次的任务,并传递参数
aioschedule.every().hour.do(job, 'TaskC', count=1)

async def main():
    # 异步等待,保持主程序运行
    for i in range(10):
        # 执行所有待处理的任务
        await aioschedule.run_pending()

        # 异步等待1秒,防止事件循环阻塞
        await asyncio.sleep(1)

# 运行主程序
asyncio.run(main())

截图如下:

在这里插入图片描述

实战中的Demo也大同小异:

在这里插入图片描述

文章来源:https://blog.csdn.net/weixin_47872288/article/details/135612542
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。