从底层到第三方库,全面讲解python的异步编程。这节讲述的是asyncio实现异步,详细了解需要配合下一节观看哦。纯干货,无概念,代码实例讲解。
本系列有6章左右,点击头像或者专栏查看更多内容,陆续更新,欢迎关注。
部分资料来源及参考链接:
https://www.bilibili.com/video/BV1Li4y1j7RY/
https://docs.python.org/zh-cn/3.7/library/asyncio-eventloop.html
异步并不是多线程,只是它自己控制自己,有一个挂起和恢复的操作。例如同步的情况下:A>>>B>>>C>>>D ,B任务非常耗时,此时就可以对B任务进行挂起,先处理C,D任务,当B任务快要完成时再进行恢复从而提高效率。
所以,要记住,异步不是多线程
。
asyncio就是python用来编写并发代码的库。开始学习吧
来看一个最简单的吧
import asyncio
async def get_data():#async 只是标记这个函数为异步函数,没实际意义
return 11
#调用方法一(推荐)
async def main():
# get_data() #错误写法 异步函数不能直接调用
d = await get_data() #await 为等待的意思
print(d)
asyncio.run(main())
方法二可以这样:
async def main():
task = asyncio.create_task(get_data())#create_task()作用是在运行某个任务的同时可以并发的运行多个任务
data = await task
print(data)
asyncio.run(main())
重点:
异步常用于 IO 密集型
用在 CPU 密集型 发挥作用不大
后面用的比较多,先学习一下
轮询
。前面说到了是检测任务状态,而这就是用轮询来实现的,如果需要轮询的状态过多,速度反而会变慢,所以cpu密集型发挥就一般
事件处理程序(Event_handler):https://en.wikipedia.org/wiki/Event_(computing)#Event_handler
事件循环(Event loop):https://en.wikipedia.org/wiki/Event_loop
事件驱动的编程(Event-driven programming):https://en.wikipedia.org/wiki/Event-driven_programming
消息传递接口(Message Passing Interface):https://en.wikipedia.org/wiki/Message_Passing_Interface
阻塞(Blocking):https://en.wikipedia.org/wiki/Blocking_(computing)
任务只有两种状态:完成的任务
,未完成的任务
import asyncio
async def liangzai():
print('正在暂停')
await asyncio.sleep(3)
print('正在恢复')
print('\n普通协程类型:{}'.format(type(liangzai())))#错误写法,但输出了协程类型
one_data = type(asyncio.ensure_future(liangzai()))#
print ('ensure_future类型:{}'.format(one_data))#输出类型
two_data = type(asyncio.Future())#标准future
print('Future类型:{}'.format(two_data))#输出类型
输出结果为:
通过issubclass,可以判断出来,future是task的子类
关系是这样的:
coroutine(协程):单纯的孤儿,没啥API
Task(任务):将coroutine(协程)封装起来,提供大量API,用于管理孤儿
Futrue(未来):作为Task(任务)的基类,提供大量API,用于管理Task(任务)
如果你要运行单个协程,直接 await coroutine(协程)即可
如果你要并发多个协程,可以 await Task(任务) 或者 await Futrue(未来)
官方推荐:await Task(任务)
官方不推荐日常开发使用:await Futrue(未来),作为基类会暴露大量API
跟上面的例子不同,现在已经有更加明了的api来创建Task。
官方链接:
https://docs.python.org/zh-cn/3.7/library/asyncio-task.html#asyncio.create_task
asyncio.create_task(coro)
将 coro 协程 打包为一个 Task 排入日程准备执行。返回 Task 对象。
该任务会在 get_running_loop() 返回的循环中执行,如果当前线程没有在运行的循环则会引发 RuntimeError。
此函数 在 Python 3.7 中被加入。在 Python 3.7 之前,可以改用低层级的 asyncio.ensure_future() 函数。
这里提到了get_running_loop()
,也就是前面提到的事件循环
相关内容,再来看看这方面资料
官方链接:
https://docs.python.org/zh-cn/3.7/library/asyncio-eventloop.html
asyncio.get_running_loop()
返回当前 OS 线程中正在运行的事件循环。如果没有正在运行的事件循环则会引发 RuntimeError。 此函数只能由协程或回调来调用。
3.7 新版功能.
asyncio.get_event_loop() 获取当前事件循环。
如果当前 OS 线程没有设置当前事件循环,该 OS 线程为主线程,并且 set_event_loop() 还没有被调用,则 asyncio
将创建一个新的事件循环并将其设为当前事件循环。由于此函数具有相当复杂的行为(特别是在使用了自定义事件循环策略的时候),更推荐在协程和回调中使用 get_running_loop()
函数而非 get_event_loop()。应该考虑使用 asyncio.run() 函数而非使用低层级函数来手动创建和关闭事件循环。
事件循环也拿到了,那么如何运行呢?再翻阅一下官方文档,有这些东西
这里,咱就使用loop.run_until_complete
来试试
import asyncio
async def get_data():
print('执行开始\n')
await asyncio.sleep(1)
print('执行完毕')
return 'abc'
loop = asyncio.get_event_loop()
task = loop.create_task(get_data())#作用是在运行某个任务的同时可以并发的运行多个任务
print('\n运行情况:', task)
loop.run_until_complete(task)
print('再看下运行情况:', task)# task存在了result
print('\n我是返回值:{}'.format(task.result()))#读取返回值
loop.close()
执行结果是这样的:
可以看到task
有一个状态的变化,从pending到finished,执行完毕后,又多出了一个result的值。
其实此时我们已经在走向底层了,如果你学过《微机原理》,在微机原理中,进程的状态可以分为就绪态(Ready State)、运行态(Running State)和阻塞态(Blocked State)
,用于描述进程的不同运行情况。是不是觉得有些熟悉呢
现在,我们就可以用异步来再次实现协程了
首先,回忆几个关键点
1. 异步模型是事件驱动模型的基础
2. 异步活动的执行模型可以只有一个单一的主控制流
3. 能在单核心系统和多核心中运行,在并发执行的异步模型中,许多任务被穿插在同一时间线上,所有的任务都有一个控制流执行(单一线程)
事件驱动表示,是由于某种事件,才会有阻塞恢复行为。重要的是,它是一个单一线程,千万不能和多线程搞混了。只是在阻塞空闲的时候做额外的事。
来看看下面的代码:
import asyncio
import time
async def get_data(i):
print('正在执行:{}'.format(i))
await asyncio.sleep(3)#异步睡眠时间
print('执行完毕:{}'.format(i))
if __name__ == '__main__':
start_time = time.time()#程序启动时间
loop = asyncio.get_event_loop()#获取本机事件循环
tasks = [loop.create_task(get_data(i)) for i in range(4)]#生成4个任务
loop.run_until_complete(asyncio.wait(tasks))# run_until_complete()直到所有循环 循环结束
# asyncio.wait()为等待任务
loop.close()#事件循环关闭
print('程序总耗时:{}'.format(time.time() - start_time))
上述的代码,你觉得运行结果应该是什么呢?
运行结果如下:
如果是顺序结构,那么3*4应该运行时间为12秒,但是这里为3秒多,说明异步确实提高了效率。同时后面执行完毕的顺序也表明,任务实现了挂起,没有顺序执行。
协程的初步实现就完成了。
上述的例子,我们初步实现了协程。但是协程中产生的错误,在批量提交任务时,我们要如何捕获呢?
import asyncio
import time
#定义异步函数
async def get_data(i):
print('正在执行:',i)
await asyncio.sleep(3)
print('执行完毕:',i)
return '返回值:{}'.format(i)
if __name__ == '__main__':
start_time = time.time()#程序启动时间
loop = asyncio.get_event_loop()#获取本机事件循环
tasks = [loop.create_task(get_data(i)) for i in range(4)]
loop.run_until_complete(asyncio.wait(tasks))
print()
for task in tasks:
print(task.result())
loop.close()#事件循环关闭
print('程序总耗时:{}'.format(time.time() - start_time))
其实,就变化了一点点。在异步函数中,出现了return关键字,同时可以使用tasks的result方法拿出值。这个原理前面也说了,其实就是产生了StopIteration,然后对返回值进行了捕获。
这里有一个细节,如果设置的沉睡时间是特殊的,你会发现,无论运行多少次,某些任务的完成时间,总是较快的。其实通过这个,就可以控制任务执行的优先级,很多框架也叫权重。
异步当然也是可以上锁的,用于保证同一时间的单一协程的独立资源访问。
官方链接:
https://docs.python.org/zh-cn/3.7/library/asyncio-sync.html#lock
总共有三个方法,非常简单。
acquire() #获取锁
release() #解除锁
locked() #返回布尔值,表示当前锁的状态
使用上下文管理器,来完成自动开关锁。
lock = asyncio.Lock()
# ... later
async with lock:
xxxxx
这就等价于
lock = asyncio.Lock()
# ... later
await lock.acquire()
try:
# access shared state
finally:
lock.release()