掌握 Python 异步编程:综合指南

发布时间:2024年01月19日

一、说明

?? 在不断发展的软件开发领域,效率和性能至关重要。Python 以其简单性和可读性而闻名,已成为各种应用程序的首选语言。然而,随着开发人员不断突破 Python 的可能性,对更复杂的编程技术的需求变得显而易见。这就是异步编程(一种允许同时处理多个任务的范例)成为游戏规则改变者的地方。

?? 本综合指南旨在阐明 Python 异步编程的复杂性。它面向初学者和经验丰富的程序员,旨在提供对异步编程模型及其在 Python 中的实际应用的透彻理解。

二、了解基础知识

?? 在深入研究 Python 中异步编程的复杂性之前,有必要对异步编程是什么以及它与传统同步编程的对比有一个基本的了解。这些知识不仅有助于稍后掌握 asyncio 更复杂的方面,而且有助于理解这种强大的编程范例的细微差别。

2.1 什么是异步编程?

?? 异步编程是一种允许程序同时执行多个操作的范例。它允许启动任务,然后将其搁置直到需要结果为止,从而允许其他任务同时运行。这种方法在涉及等待外部资源或长时间运行计算的情况下特别有用。

?? 想象一下餐厅的厨房。在同步烹饪中,厨师将完成一道菜的每一项任务,然后再开始下一道菜。在异步烹饪中,当一道菜在烤箱中时,厨师就开始准备下一道菜。

2.2 Python 异步编程的演变

?? 像Twisted这样的早期框架为异步编程提供了基础,但很复杂并且学习曲线陡峭。Python 3.3 中引入的基于生成器的协程简化了异步编程,从而导致了 Python 3.4 中的 asyncio。Python 3.5中async / await语法的引入进一步增强了代码的可读性和可维护性,标志着Python的异步编程能力向前迈进了一大步。

2.3 同步与异步编程

?? 同步和异步编程之间的主要区别在于任务的执行和管理方式。同步编程很简单,但效率可能较低,而异步编程更复杂,但在某些情况下效率明显更高。

2.4 异步示例:

import asyncio


async def fetch_data():
    print("Start fetching")
    await asyncio.sleep(2)  # Simulating an I/O operation
    print("Done fetching")
    return {'data': 1}

async def print_numbers():
    for i in range(10):
        print(i)
        await asyncio.sleep(1)

async def main():
    task1 = asyncio.create_task(fetch_data())
    task2 = asyncio.create_task(print_numbers())
    value = await task1
    print(value)

asyncio.run(main())


>>>> Start fetching
>>> 0
>>> 1
>>> Done fetching
>>> {'data': 1}
>>> 2
>>> 3
...
>>> 9

?? 此示例说明了使用 Python 进行基本异步编程asyncio。定义了两个任务:fetch_data模拟延迟的 I/O 操作,以及print_numbers延迟打印数字。这两个任务在函数中同时执行main。请注意如何await使用 来等待操作完成而不阻塞整个程序。

?? 同步示例:

2.5 同步代码示例

# Synchronous code example
import time


def fetch_data():
    print("Start fetching")
    time.sleep(2)  # Simulating a blocking I/O operation
    print("Done fetching")
    return {'data': 1}


def print_numbers():
    for i in range(10):
        print(i)
        time.sleep(1)


def main():
    value = fetch_data()
    print(value)
    print_numbers()


main()



>>> 开始获取
>>> 完成获取
>>> {'data': 1} 
>>> 0 
>>> 1 
>>> 2 
... 
>>> 9

?? 在这个同步版本中,程序fetch_data在启动之前就完成了功能print_numbers。执行的顺序性质很明显,因为仅在数据获取完成后才打印数字。

2.6 最佳实践和注意事项

?? 对于 I/O 密集型操作,异步编程通常是更好的选择,因为它可以防止程序在等待外部操作完成时被阻塞。
在计算密集且连续的 CPU 密集型任务中,传统的同步或多线程方法可能更可取。
?? 在异步编程中仔细管理共享资源至关重要,以避免竞争条件并确保线程安全。
极端案例和挑战
?? 将异步代码与同步库集成可能具有挑战性,因为它需要仔细管理事件循环。
由于其非线性执行流程,调试异步代码可能会更加复杂。
了解异步编程的这些核心原理为深入研究 Python 中 asyncio 的更复杂方面奠定了坚实的基础。对于旨在优化应用程序性能和效率的 Python 开发人员来说,了解何时以及如何有效地使用异步编程至关重要。

三、深入了解 Asyncio

?? Asyncio 是 Python 中的一个库,它提供了使用 async/await 语法编写并发代码的框架。它主要用于编写单线程并发程序,非常适合 I/O 密集型和高级结构化网络代码。

3.1 了解事件循环

?? 事件循环是 Asyncio 库的核心。它是一种在程序中等待和分派事件或消息的编程结构。在 Asyncio 上下文中,事件循环运行异步任务和回调、执行网络 IO 操作并运行子进程。

?? 事件循环负责管理异步任务的执行。它跟踪所有正在运行的任务,并且当操作需要等待(例如 IO 等待)时,它会暂停任务并在操作可以继续时恢复任务。

3.2 事件循环如何工作

?? 1 运行任务和调度:循环执行任务,这些任务是计划运行的协程的实例。当任务等待 Future 时,循环会暂停该任务并继续运行其他任务。
??2处理IO和系统事件:除了运行任务之外,事件循环还处理IO和系统事件。它使用操作系统提供的选择或轮询等机制来监视多个流的活动。

import asyncio 


async  def  main (): 
    print ( 'Hello' ) 
    await asyncio.sleep( 1 ) 
    print ( 'World' ) 


loop=asyncio.get_event_loop() 
loop.run_until_complete(main()) 
loop.close()

?? 在此示例中,main() 是一个协程。Loop.run_until_complete(main()) 启动事件循环并运行主协程。 wait asyncio.sleep(1) 调用暂时暂停主协程,允许循环执行其他任务或处理 IO 事件。

3,3 自定义事件循环

?? Asyncio 允许自定义事件循环行为。开发人员可以使用不同的事件循环策略来更改默认事件循环或自定义其在不同操作系统上的行为。

if sys.platform == 'win32' : 
    Loop = asyncio.ProactorEventLoop()   # 对于 IOCP
     asyncio.set_event_loop(loop) 
else : 
    Loop = asyncio.SelectorEventLoop()   # 对于 Unix
     asyncio.set_event_loop(loop)

?? 此示例演示了根据操作系统设置不同的事件循环。

3.4 事件循环的最佳实践

?? 1始终用于asyncio.run()运行 asyncio 程序的顶级入口点。该函数创建一个新的事件循环,运行传递的协程,然后关闭循环。
?? 2除非有特定原因,否则请避免手动创建和管理事件循环。这有助于防止常见错误,例如在单线程程序中创建多个事件循环。

四、协程:异步(Asyncio) 的基础

?? 协程是 Asyncio 的基本构建块。它们是特殊的 Python 函数,旨在处理异步操作,使用async def. 与常规函数不同,协程可以暂停和恢复,使它们能够有效地处理非阻塞操作。

4.1 声明协程

?? 用 with 声明的协程async def在等待之前不会执行任何操作。该await关键字用于暂停协程的执行,将控制权交还给事件循环,然后事件循环可以继续运行其他任务或处理 I/O 操作。一旦等待的操作完成,协程就会从中断处恢复。

async  def  fetch_data (): 
    print ( "Start fetching" ) 
    wait asyncio.sleep( 2 )   # 模拟非阻塞等待
    print ( "Data fetched" )

?? 在此示例中,await asyncio.sleep(2)模拟非阻塞等待,允许事件循环在 2 秒暂停期间管理其他任务。

?? 请注意,简单地调用协程不会安排其执行:

>>> main()
<coroutine object main at 0x1053bb7c8>

?? 为了实际运行协程,asyncio 提供了以下机制:

  • asyncio.run()运行顶级入口点“main()”函数的函数(参见上面的示例。)
  • 正在等待协程。以下代码片段将在等待 1 秒后打印“hello”,然后再等待 2
    秒后打印“world ”:
import asyncio
import time


async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)


async def main():
    print(f"started at {time.strftime('%X')}")

    await say_after(1, 'hello')
    await say_after(2, 'world')

    print(f"finished at {time.strftime('%X')}")


asyncio.run(main())

预期输出:

>>> started at 17:13:52
>>> hello
>>> world
>>> finished at 17:13:55

asyncio.create_task()作为 asyncio 同时运行协程的函数Tasks。
让我们修改上面的示例并同时say_after运行两个协程:

async  def  main (): 
    task1 = asyncio.create_task( 
        say_after( 1 , 'hello' )) 

    task2 = asyncio.create_task( 
        say_after( 2 , 'world' )) 

    print ( f"started at {time.strftime( '%X ) ' )} " ) 

    # 等待两个任务完成(
    大约需要 2 秒。)
    await task1 
    wait task2 
    print ( f"finished at {time.strftime( '%X' )} " )

?? 请注意,预期输出现在显示该代码段的运行速度比以前快了 1 秒:

>>> started at 17:13:52
>>> hello
>>> world
>>> finished at 17:13:55

?? 该类asyncio.TaskGroup提供了更现代的替代方案create_task()。使用此 API,最后一个示例将变为:

async def main():
    task1 = asyncio.create_task(
        say_after(1, 'hello'))

    task2 = asyncio.create_task(
        say_after(2, 'world'))

    print(f"started at {time.strftime('%X')}")

    # Wait until both tasks are completed (should take
    # around 2 seconds.)
    await task1
    await task2

    print(f"finished at {time.strftime('%X')}")

?? 时间和输出应该与以前的版本相同。

4.2 任务:同时运行协程

?? Asyncio 中的任务用于同时调度协程。当您创建任务时,它会安排协程的执行:然后事件循环可以管理多个任务,同时运行它们。

?? 当协程被包装到具有类似协程功能的任务asyncio.create_task()中时,协程会自动安排很快运行:

import asyncio
async def nested():
    return 42
async def main():
    # Schedule nested() to run soon concurrently
    # with "main()".
    task = asyncio.create_task(nested())

    # "task" can now be used to cancel "nested()", or
    # can simply be awaited to wait until it is complete:
    await task
asyncio.run(main())

4.3 行为和特征

  • ?? 无线程并发:协程和任务允许并发,而不需要传统线程。这种并发性是通过协作式多任务处理来实现的,其中每个任务在某些点上产生对事件循环的控制await。
  • ?? 错误处理:协程中的错误处理方式与常规 Python 函数类似。可以在协程内引发和捕获异常。如果任务中发生异常且未处理,则会在等待任务时将异常传播到任务的调用方。
  • ?? 取消:任务可以被取消,这会引发一个asyncio.CancelledError等待的协程。这允许异步取消操作,这是响应式应用程序的一项关键功能。
async  def  main (): 
    task = asyncio.create_task(fetch_data()) 
    wait asyncio.sleep( 1 ) 
    task.cancel() 

    try :
        等待任务
    except asyncio.CancelledError: 
        print ( "fetch_data 被取消!" )

?? 本例中,fetch_data任务在 1 秒后被取消,演示了如何取消任务并处理取消。

五 Futures:管理异步结果

?? Asyncio 中的 Future 是比任务和协程更低级别的构造,主要用于管理异步操作的最终结果。Future 表示异步计算的最终结果。

5.1 Futures(期货)的作用

?? 期货充当尚未计算的结果的占位符。当创建 Future 对象时,它没有结果。异步任务完成后就会设置结果。Future 通常在 Asyncio 内部使用,但也可以直接用于更复杂的异步编程模式。

5.2 创建和使用 Futures

?? Futures 通常由 Asyncio 函数和方法创建,例如loop.create_future()。它们提供了一种跟踪异步操作何时完成并检索其结果的方法。

async  def  main (): 
    Loop = asyncio.get_running_loop() 
    future = loop.create_future() 

    # 安排结果的设置
    loop.call_soon(future.set_result, "Future is done!" ) 

    result = wait future 
    print (result)

?? 在此示例中,创建了一个 future,并使用 来设置其结果call_soon。未来await将等待,直到结果可用。

5.3 期货与任务比较

  • ?? 区别:任务用于调度和执行协程,而 future 是更通用的对象,用于表示异步操作的结果。任务实际上是 future 的子类。
  • ?? 用法:任务对于常规 Asyncio 编程通常更方便,因为它们是专门为协程设计的。Future 更适合与较低级别的异步操作集成或与其他异步系统进行互操作。

5.4 处理结果和异常

?? 获得结果:使用该result()方法获得未来的结果。如果 future 没有完成,调用result()将引发InvalidStateError. 如果未来被取消,它将引发CancelledError.
?? 错误处理:如果 future 封装的操作引发异常,则 future 会捕获该异常。可以使用该方法检索它exception()。

async  def  main (): 
    future = asyncio.Future() 

    # 设置异常 future.set_exception
     (RuntimeError( 'There was an error' )) 

    try : 
        result = wait future 
    except RuntimeError as e: 
        print ( f"Caught error: { e} " ) 


asyncio.run(main())

?? 此代码演示了将来设置和处理异常。

六、 流:处理网络操作

?? Asyncio 中的流是处理网络 I/O 操作(例如读取和写入套接字)的抽象。它们提供了一个高级接口来处理网络通信,抽象了低级套接字操作的复杂性。

6.1 流的组成部分:

  • Reader:表示连接的可读端的对象。它提供了、readline、read 、readexactly 等API,用于各种读取操作。
  • Writer:代表可写端的对象。它提供了类似write和的方法drain来促进写入连接。

6.2 创建和使用流

?? Asyncio 提供asyncio.open_connection()建立 TCP 连接的功能,该连接返回读取器和写入器对象。

async def tcp_echo_client(message):
    reader, writer = await asyncio.open_connection(
        '127.0.0.1', 8888)

    print(f'Send: {message}')
    writer.write(message.encode())

    data = await reader.read(100)
    print(f'Received: {data.decode()}')

    print('Close the connection')
    writer.close()
    await writer.wait_closed()
asyncio.run(tcp_echo_client( 'Hello World!' )

?? 在此示例中,建立了与在端口 8888 上的本地主机上运行的服务器的 TCP 连接。消息“你好世界!” 已发送,等待回复。

6.3 使用 Stream 处理服务器端

?? asyncio.start_server()用于启动服务器。它接受客户端处理程序协程,每次建立新的客户端连接时都会使用读取器和写入器对象调用该协程。

async def handle_echo(reader, writer):
    data = await reader.read(100)
    message = data.decode()
    addr = writer.get_extra_info('peername')

    print(f"Received {message} from {addr}")

    print(f"Send: {message}")
    writer.write(data)
    await writer.drain()

    print("Close the connection")
    writer.close()


async def main():
    server = await asyncio.start_server(
        handle_echo, '127.0.0.1', 8888)

    addr = server.sockets[0].getsockname()
    print(f'Serving on {addr}')

    async with server:
        await server.serve_forever()
asyncio.run(main())

?? 在这里,服务器在端口 8888 上侦听 localhost。对于每个连接,它读取数据、打印数据、回显数据,然后关闭连接。

七、流媒体功能和最佳实践

?? 缓冲和流量控制:流在内部处理缓冲和流量控制,从而更容易管理大型或零星数据流。
?? 错误处理:流操作中正确的错误处理至关重要。始终确保在出现错误或操作完成时关闭连接。
?? SSL/TLS 支持:Streams 支持开箱即用的 SSL/TLS,以最少的额外配置实现安全连接。

7.1 流的高级使用

?? 对于更复杂的场景,例如处理并发连接或实现自定义协议,请考虑深入研究较低级别的构造,例如传输和协议,它们提供更多控制,但使用起来更复杂。

7.2 同步原语:确保线程安全

?? 同步原语是帮助协调并发操作、确保资源高效、安全使用的工具。在 Asyncio 中,这些主要用于防止数据竞争并确保异步编程中处理共享资源时的线程安全操作。

7.3 常见的 Asyncio 同步原语

?? 锁:锁用于保证对资源的独占访问。一次只有一个协程可以持有锁。当持有锁时,任何其他尝试获取该锁的协程都将暂停,直到锁被释放。

async def locked_task(lock, name):
    async with lock:
        print(f"{name} has the lock")
        await asyncio.sleep(1)
    print(f"{name} released the lock")

async def main():
    lock = asyncio.Lock()
    await asyncio.gather(
        locked_task(lock, 'First'),
        locked_task(lock, 'Second')
    )
asyncio.run(main())

?? 此示例演示了两个任务尝试获取同一锁。“First”任务首先获取它,“Second”任务必须等待直到锁被释放。

?? 事件:事件用于通知多个协程某些条件已变为真。事件对象管理一个内部标志,可以使用set()方法将其设置为 true,并使用 方法将其重置为 false clear()。

async def waiter(event):
    print('waiting for the event')
    await event.wait()
    print('event is set')


async def main():
    event = asyncio.Event()

    waiter_task = asyncio.create_task(waiter(event))
    await asyncio.sleep(1)

    print('setting the event')
    event.set()

    await waiter_task
asyncio.run(main())

?? 在此示例中,“waiter”协程等待设置事件。该事件在一秒延迟后设置,并且“waiter”协程恢复。

?? 信号量:信号量用于限制一次可以访问特定资源的协程数量。它是用一个计数器初始化的,当获取信号量时计数器递减,释放信号量时计数器递增。

async def resource_access(semaphore, name):
    async with semaphore:
        print(f"{name} acquired the semaphore")
        await asyncio.sleep(1)
    print(f"{name} released the semaphore")


async def main():
    semaphore = asyncio.Semaphore(2)  # Allow 2 concurrent accesses
    await asyncio.gather(
        resource_access(semaphore, 'Task 1'),
        resource_access(semaphore, 'Task 2'),
        resource_access(semaphore, 'Task 3')
    )
asyncio.run(main())

?? 此示例显示了一个信号量,允许两个任务同时访问资源,而第三个任务必须等待。

7.4 同步原语的最佳实践

?? 避免死锁:小心死锁,如果协程以循环方式相互等待,则可能会发生死锁。正确构建控制流并避免长时间持有锁可以缓解这种情况。
??作用域锁定:使用async with语句来管理锁,确保它们始终被释放,即使出现异常也是如此。
首选更高级别的构造:只要有可能,请使用 Asyncio 提供的高级同步原语,因为它们旨在与事件循环和协程无缝协作。

八、 Asyncio 中的高级主题

?? 当我们深入研究 Asyncio 的世界时,我们遇到了一个领域,异步编程的基本原理与该库更复杂、更强大的功能交织在一起。“Asyncio 高级主题”部分专为那些已掌握 Asyncio 基础知识并准备探索其更复杂功能的人员而设计。

8.1 传输和协议:较低级别的网络处理

??传输和协议是 Asyncio 网络层的核心组件,提供比流更低级别的接口来处理网络通信。它们提供更多的控制和灵活性,使其适合实现自定义通信协议和处理复杂的网络场景。

8.1.1 传输

??功能:传输负责数据的实际传输。它们抽象了各种类型的网络通信(TCP、UDP、SSL等)的细节,并提供了发送和接收数据的统一接口。
自定义传输:虽然 Asyncio 提供标准传输,但您还可以实现自定义传输来处理独特的网络行为或与不同的网络库集成。
协议

8.1.2 行为

??协议定义网络连接的应用程序级行为。他们解析传入的数据并决定如何响应。这是您实现通信逻辑的地方。
状态管理:协议通常维护有关连接的状态信息,其中可以包括接收的数据量、通信序列的当前阶段或错误状态等信息。

import asyncio
class EchoProtocol(asyncio.Protocol):
    def connection_made(self, transport):
        self.transport = transport
    def data_received(self, data):
        self.transport.write(data)  # Echoing back received data
async def main():
    loop = asyncio.get_running_loop()
    server = await loop.create_server(EchoProtocol, '127.0.0.1', 8888)
    await server.serve_forever()
asyncio.run(main())

??此示例说明了基本的 TCP 回显服务器。当接收到数据时,将其发送回客户端。该协议单独处理每个连接。

8.2 极端情况和处理异常

??连接管理:正确处理连接的打开和关闭。确保即使在错误状态下也能释放资源。
??数据边界问题:注意 TCP 协议中的数据边界问题。TCP 是一种面向流的协议,没有固有的消息边界,因此您的协议必须实现自己的消息帧。

8.3 最佳实践

??资源清理:始终清理资源。确保当连接丢失或终止时正确关闭传输。
错误处理:在协议方法中实现全面的错误处理。网络通信容易出现各种问题,例如连接超时、数据损坏和断开连接。
测试:在各种网络条件下彻底测试您的协议和传输实现,以确保可靠性和稳健性。

8.4 高级用法

??自定义协议实现:对于特殊需求,例如专有通信协议,实现您自己的协议类,定义如何解析传入数据并对其进行响应。
与外部库集成:为了与本身不支持 Asyncio 的网络库集成,可以创建自定义传输来弥补差距,从而允许这些库在 Asyncio 生态系统中使用。

九、 子流程:使用外部流程

??Asyncio 提供对运行子进程并与它们异步交互的支持。此功能对于运行外部命令、处理其输出以及在 Asyncio 事件循环中管理其执行特别有用。

9.1 创建和管理子流程

??在 Asyncio 中运行子流程的主要方式是通过asyncio.create_subprocess_exec和asyncio.create_subprocess_shell函数。这些函数是协程,并返回一个Process表示正在运行的子进程的对象。

async  def  run_command (): 
    process = wait asyncio.create_subprocess_shell( 
        'echo "Hello World"' , 
        stdout=asyncio.subprocess.PIPE, 
        stderr=asyncio.subprocess.PIPE) 

    stdout, stderr = wait process.communicate() 
    print ( f "[stdout]\n {stdout.decode()} " ) 
    print ( f"[stderr]\n {stderr.decode()} " ) 


asyncio.run(run_command())

??此示例演示异步运行 shell 命令并捕获其输出。

9.2 与子流程交互

??与子进程通信:您可以通过读取和写入其 stdin、stdout 和 stderr 流(这些流可用作asyncio.StreamReader和asyncio.StreamWriter对象)来与子进程进行交互。
??等待完成:该Process对象提供了诸如wait()等待进程完成、communicate()将数据发送到 stdin 以及从 stdout 和 stderr 读取等方法。

9.3 处理输入和输出

??读取输出stdout:使用对象的属性异步读取子进程的输出Process。
发送输入stdin:使用StreamWriter与其关联的对象write的方法写入子进程。

9.4 最佳实践和注意事项

??处理大输出:对于生成大输出的进程,持续读取其 stdout 和 stderr 以避免死锁至关重要。
??管理流程生命周期:确保子流程生命周期的正确管理。关闭所有管道并正确终止进程以避免资源泄漏。
??安全注意事项:使用时asyncio.create_subprocess_shell,谨防shell注入漏洞。更喜欢asyncio.create_subprocess_exec运行已知命令。
高级用法

??自定义流程管理:对于复杂的场景,您可以创建自定义例程来管理多个子流程、处理其 I/O 流并同时监控其状态。
??与其他 Asyncio 组件集成:子进程可以与其他 Asyncio 组件(如队列或事件)集成,以实现复杂的进程管理和 IPC(进程间通信)。

十、队列:管理数据流

??Asyncio 中的队列是线程安全的异步数据结构,可以在应用程序的不同部分之间(特别是生产者协程和消费者协程之间)传递数据。它们对于管理异步应用程序中的数据流和协调任务至关重要。

10.1 创建和使用队列

??队列创建:Asyncio 队列是使用创建的asyncio.Queue。它可以容纳由参数定义的固定数量的项目maxsize。具有(默认)的队列maxsize=0是无界的。

queue = asyncio.Queue(maxsize=10) 

??将项目添加到队列:使用协程将项目添加到队列put。如果队列已满,put将阻塞直到有可用空间。

await queue.put(item)

从队列中删除项目:使用协程从队列中检索项目get。如果队列为空,get将阻塞直到有项目可用。

item = await queue.get()

10.2 生产者-消费者模式

??生产者:生产者协程将项目放入队列中。这些项目可以是从网络获取的数据、生成的计算等。
??消费者:消费者协程从队列中获取项目进行处理。多个消费者协程可用于并行处理项目。

async def producer(queue):
    for i in range(5):
        await queue.put(f'item {i}')
        await asyncio.sleep(1)


async def consumer(queue):
    while True:
        item = await queue.get()
        print(f'Processed {item}')
        queue.task_done()


async def main():
    queue = asyncio.Queue()

    producers = [asyncio.create_task(producer(queue))]
    consumers = [asyncio.create_task(consumer(queue)) for _ in range(2)]

    await asyncio.gather(*producers)
    await queue.join()  # Wait until all items are processed

    for c in consumers:
        c.cancel()


asyncio.run(main())

10.3 最佳实践和注意事项

??流量控制:使用maxsize参数来控制应用程序中的数据流并防止队列无限增长。
??任务完成:用于task_done()指示先前排队的任务已完成。queue.join()当用于等待所有项目被处理时,这一点尤其重要。
??处理生产者-消费者终止:确保生产者和消费者协程正常终止,特别是在长时间运行的应用程序中。
高级用法和技术

??优先队列:对于需要在其他项目之前处理某些项目的场景,asyncio.PriorityQueue可以使用。
??LIFO 队列:如果需要后进先出顺序,asyncio.LifoQueue则提供此功能。
??扩展 Asyncio:创建自定义组件
扩展 Asyncio 涉及创建与 Asyncio 架构无缝集成的自定义组件,例如自定??义事件循环、传输、协议或实用程序。这允许定制解决方案,满足标准库未涵盖的特定应用程序要求。

??自定义事件循环

??目的:有时,默认事件循环可能不适合特定需求(例如与其他框架集成或针对特定 I/O 模式的优化)。
??实现:创建自定义事件循环涉及子类化asyncio.AbstractEventLoop。您需要提供诸如run_forever、run_until_complete、 以及任务的调度和处理之类的基本方法的实现。

# This is a conceptual example. Actual implementation details may vary.
class CustomEventLoop(asyncio.AbstractEventLoop):
    def run_forever(self):
        while True:
            events = external_event_check()  # Hypothetical external event check
            for event in events:
                self._process_event(event)

??在这里,CustomEventLoop将外部事件系统集成到 Asyncio 的事件循环中。这对于依赖非标准事件源的应用程序可能是必要的。

10.4 自定义执行器

??目的:Asyncio 中的执行器允许在单独的线程或进程中执行同步代码,从而实现与同步库或长时间运行的任务的非阻塞集成。
??定制:您可能需要一个专门的执行器来管理同步任务的运行方式,也许与特定的线程或进程管理系统集成。通过实现自定义执行器,您可以精确定义如何处理、计划和执行这些任务。

import concurrent.futures


class CustomExecutor(concurrent.futures.ThreadPoolExecutor):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Additional initialization here

    # Custom behavior can be implemented here


# Usage
executor = CustomExecutor()
asyncio.get_event_loop().run_in_executor(executor, some_blocking_io_task)

??这CustomExecutor扩展了标准线程池执行器,允许自定义线程管理,可以针对特定类型的阻塞 I/O 任务进行定制。

10.5 创建自定义传输和协议

??自定义传输:虽然 Asyncio 为 TCP、UDP 和其他协议提供标准传输,但您可能会遇到需要专门传输层的情况。例如,与非标准网络库或硬件集成。在这种情况下,您可以创建从asyncio.BaseTransport.
??自定义协议:同样,如果现有的协议抽象不能满足您的需求,您可以定义自定义协议类。这对于实现专有通信协议或处理特定格式的数据可能是必要的。

class CustomProtocol(asyncio.Protocol):
    def connection_made(self, transport):
        self.transport = transport
        print("Custom Protocol Connection Made")

    def data_received(self, data):
        print(f"Data received: {data.decode()}")
        self.transport.write(b"Echo: " + data)


async def main():
    loop = asyncio.get_running_loop()
    server = await loop.create_server(
        lambda: CustomProtocol(),
        '127.0.0.1', 8888)
    async with server:
        await server.serve_forever()


asyncio.run(main())

??此示例演示了回显接收到的数据的基本自定义协议。该协议可以进一步扩展以处理更复杂的数据处理。

10.6 扩展 Asyncio 的最佳实践

??确保兼容性:扩展 Asyncio 时,保持与其异步模型的兼容性。自定义组件不应阻塞事件循环。
??强大的测试:在各种条件下广泛测试自定义组件,以确保它们与 Asyncio 生态系统可靠地工作。
??性能注意事项:请注意自定义扩展的性能影响,尤其是在高负载场景的情况下。

十一、结论

??在本文中,我们探索了 Python 中 Asyncio 的多方面世界,探索了它从基本概念到高级功能的关键特性。我们从异步编程的基础知识开始,然后逐步了解核心 Asyncio 组件,例如事件循环、协程、任务和 future。然后,我们讨论了更复杂的主题,包括网络操作流、同步原语以及自定义传输、协议和子进程管理等高级领域。

??Asyncio 的深入之旅揭示了它在处理并发操作方面的强大功能和多功能性,使其成为现代 Python 开发人员的宝贵工具。随着 Asyncio 的不断发展,它为高效且可扩展的应用程序开发提供了不断扩展的环境。

??在您的 Python 项目中不断试验和利用 Asyncio 库,并加入充满活力的社区,在这个令人兴奋的异步编程领域持续学习和成长。

文章来源:https://blog.csdn.net/gongdiwudu/article/details/135695326
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。