目录

在-Python-异步编程中使用-awaitable-对象

在 Python 异步编程中使用 awaitable 对象

在 Python 异步编程中使用 awaitable 对象

awaitable 对象上,我们可以应用 await 语句。 asyncio 中的大部分函数和模块被设计为可以与 awaitable 对象一起使用。但是大部分 Python 对象和第三方库,不是为异步编程构建的。当构建异步应用程序时,有一点非常重要,那就是选择提供可用 awaitble 对象的兼容库。

awaitalbe 对象主要被分成三种类型:协程、任务和期物 (Futures)。我们已经讨论过了协程和任务。期物是一种低层次对象,像一种回调机制,用来处理来自于 async/await 的结果。期物对象通常不会暴露于用户级编程。

并发运行任务

如果我们一定要并行运行多个任务,我们可以使用 await 关键字,就像我们在之前的例子中做的那样。但是有一种更好的方法做这件事,就是使用 gather 函数。这个函数会按提供的序列运行 awaitable 对象。如果任何 awaitable 对象是协程,它将被调度为任务。后面的例子中我们会演示 gather 函数的使用。

使用队列分发任务

asyncio 包中的 Queue 队列和 Queue 模块类似,但是不是线程安全的。但是 asyncio 模块提供了多种队列实现,比如 FIFO 队列、优先级队列和 LIFO 队列。 asyncio 模块中的队列可以用来把工作负载发布为任务。

为了演示任务对列的使用,我们会写一个小程序,通过随机休眠一段时间来模拟实际函数的执行时间。为 10 次这样的执行计算随机休眠时间,并且由主进程作为工作项添加到 Queue 对象中。 Queue 队列被传递到一个有三个任务的池中。任务池中的每一个任务执行给定的协程,协程会消费每一个对它可见的队列记录中的执行时间(该协程根据其可用的队列条目消耗执行时间)。下面是完整的代码示例:

import time

async def executer(name, queue):
    while True:
        exec_time = await queue.get()
        await asyncio.sleep(exec_time)
        queue.task_done()

async def main():
    myqueue = asyncio.Queue()
    calc_execution_time = 0
    for _ in range(10):
        sleep_for = random.uniform(0.4, 0.8)
        calc_execution_time += sleep_for
        myqueue.put_nowait(sleep_for)

    tasks = []
    for id in range(3):
        task = asyncio.create_task(executer(f'任务-{id + 1}', myqueue))
        tasks.append(task)

    start_time = time.monotonic()
    await myqueue.join()
    total_exec_time = time.monotonic() - start_time

    for task in tasks:
        task.cancel()

    await asyncio.gather(*tasks, return_exceptions=True)

    print(f"执行时间:{calc_execution_time:0.2f}")
    print(f"实际执行时间:{total_exec_time:0.2f}")


asyncio.run(main())

我们使用 Queue 队列的 put_nowait 函数,因为它是一个非阻塞操作。上面的代码的执行结果是:

执行时间:6.22
实际执行时间:2.40

这清晰的演示了任务是并行执行的,执行时间是顺序执行的近三分之一。

<完>