asyncio -- 异步 I/O

asyncio 是用来编写 并发 代码的库,使用 async/await 语法。

asyncio 被用作多个提供高性能 Python 异步框架的基础,包括网络和网站服务,数据库连接库,分布式任务队列等等。

asyncio 往往是构建 IO 密集型和高层级 结构化 网络代码的最佳选择。

asyncio 提供一组 高层级 API 用于:

  • 并发地 运行 Python 协程 并对其执行过程实现完全控制;

  • 执行 网络 IO 和 IPC;

  • 控制 子进程;

  • 通过 队列 实现分布式任务;

  • 同步 并发代码;

此外,还有一些 低层级 API 以支持 库和框架的开发者 实现:

  • 创建和管理 事件循环,以提供异步 API 用于 网络化, 运行 子进程,处理 OS 信号 等等;

  • 使用 transports 实现高效率协议;

  • 通过 async/await 语法 桥接 基于回调的库和代码。

协程与任务

协程通过 async/await 语法进行声明,是编写异步应用的推荐方式。

要真正运行一个协程,asyncio 提供了以下主要机制:

asyncio.run

asyncio.run(coro, *, debug=False):执行 coroutine coro 并返回结果。

此函数运行传入的协程,负责管理 asyncio 事件循环并 完结异步生成器

当有其他 asyncio 事件循环在同一线程中运行时,此函数不能被调用。

如果 debugTrue,事件循环将以调试模式运行。

此函数总是会创建一个新的事件循环并在结束时关闭之。它应当被用作 asyncio 程序的主入口点,理想情况下应当只被调用一次。

import asyncio
import time


async def main():
    print(f"started at {time.strftime('%X')}")

    print("hello")
    await asyncio.sleep(1)
    print('world')

    print(f"finished at {time.strftime('%X')}")


asyncio.run(main())

预期的输出:

started at 12:14:28
hello
world
finished at 12:14:29

注意:简单地调用一个协程并不会将其加入执行日程:

main()  # <coroutine object main at 0x10cd11c20>

以下代码段会在等待 1 秒后打印 "hello",然后 再次 等待 2 秒后打印 "world";也就是两个任务实际是同步执行

import asyncio
import time


async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)


async def main():
    print(f"started at {time.strftime('%X')}")

    await say_after(1, 'hello')
    await say_after(2, 'world')

    print(f"finished at {time.strftime('%X')}")


asyncio.run(main())

预期的输出:

started at 17:13:52
hello
world
finished at 17:13:55

asyncio.create_task

asyncio.create_task(coro):将 coro 协程 打包为一个 Task 排入日程准备执行,返回 Task 对象。主要用来并发运行作为 asyncio 任务 的多个协程

该任务会在 get_running_loop() 返回的循环中执行,如果当前线程没有在运行的循环则会引发 RuntimeError

此函数 在 Python 3.7 中被加入。在 Python 3.7 之前,可以改用低层级的 asyncio.ensure_future() 函数,同样得到Task对象。

让我们修改以上示例,并发 运行两个 say_after 协程:

import asyncio
import time


async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)


async def main():
    # In Python 3.7+
    task1 = asyncio.create_task(say_after(1, 'hello'))
	# This works in all Python versions but is less readable
    task2 = asyncio.ensure_future(say_after(2, 'world'))

    print(f"started at {time.strftime('%X')}")

    # Wait until both tasks are completed (should take
    # around 2 seconds.)
    await task1
    await task2

    print(f"finished at {time.strftime('%X')}")


asyncio.run(main())

注意,预期的输出显示代码段的运行时间比之前快了 1 秒:

started at 12:03:07
hello
world
finished at 12:03:09

以下演示Task对象的一些常用属性和方法:

import asyncio


async def say_after(delay, what):
    await asyncio.sleep(delay)
    return f'{what}{delay}'


async def main():
    # In Python 3.7+
    task1 = asyncio.create_task(say_after(1, 'hello'))
    # This works in all Python versions but is less readable
    task2 = asyncio.ensure_future(say_after(2, 'world'))

    task1.cancel()  # 请求取消 Task 对象

    try:
        await task1
    except asyncio.CancelledError:
        print("main(): cancel_me is cancelled now")
    await task2
    print(f'task1 cancel:{task1.cancelled()}')  # 判断 Task 对象是否取消,取消则返回True,否则返回False
    print(f'task2 end:{task2.done()}')  # 如果 Task 对象 已完成 则返回 True
    print(f'task2 result: {task2.result()}')  # 返回 Task 任务的执行结果


asyncio.run(main())

预期的输出:

main(): cancel_me is cancelled now
task1 cancel:True
task2 end:True
task2 result: world2

asyncio.current_task(loop=None):返回当前运行的 Task 实例,如果没有正在运行的任务则返回 None

asyncio.all_tasks(loop=None): 返回事件循环所运行的未完成的 Task 对象的集合。

如果 loopNone,则会使用 get_running_loop() 获取当前事件循环。

import asyncio


async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(asyncio.current_task()) 
    return f'{what}{delay}'


async def main():
    task1 = asyncio.create_task(say_after(1, 'hello'))
    task2 = asyncio.create_task(say_after(2, 'world'))

    print(asyncio.all_tasks()) 
    await task1
    await task2


asyncio.run(main())

预期的输出:

{
    <Task pending coro=<say_after() running at /Users/lsf/PycharmProjects/pythonProject/demo.py:4>>, 
    <Task pending coro=<say_after() running at /Users/lsf/PycharmProjects/pythonProject/demo.py:4>>, 
    <Task pending coro=<main() running at /Users/lsf/PycharmProjects/pythonProject/demo.py:16> cb=[_run_until_complete_cb() at /usr/local/opt/python@3.7/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/base_events.py:157]>
}
<Task pending coro=<say_after() running at /Users/lsf/PycharmProjects/pythonProject/demo.py:6> cb=[<TaskWakeupMethWrapper object at 0x109b65e10>()]>
<Task pending coro=<say_after() running at /Users/lsf/PycharmProjects/pythonProject/demo.py:6> cb=[<TaskWakeupMethWrapper object at 0x109b4c4d0>()]>

asyncio.sleep

asyncio.sleep(delay, result=None, *, loop=None): 阻塞 delay 指定的秒数。

如果指定了 result,则当协程完成时将其返回给调用者。

sleep() 总是会挂起当前任务,以允许其他任务运行。

以下协程示例运行 5 秒,每秒显示一次当前日期:

import asyncio
import datetime


async def main():
    loop = asyncio.get_running_loop()
    end_time = loop.time() + 5
    while True:
        if loop.time() > end_time:
            break
        print(datetime.datetime.now())
        await asyncio.sleep(1)


asyncio.run(main())

asyncio.gather

asyncio.gather(*aws, loop=None, return_exceptions=False): 并发 运行 aws 序列中的 可等待对象

如果所有可等待对象都成功完成,结果将是一个由所有返回值聚合而成的列表。结果值的顺序与 aws 中可等待对象的顺序一致

如果 return_exceptionsFalse (默认),所引发的首个异常会立即传播给等待 gather() 的任务。aws 序列中的其他可等待对象 不会被取消 并将继续运行。

如果 return_exceptionsTrue,异常会和成功的结果一样处理,并聚合至结果列表。

如果 gather() 被取消,所有被提交 (尚未完成) 的可等待对象也会 被取消

如果 aws 序列中的任一 Task 或 Future 对象 被取消,它将被当作引发了 CancelledError 一样处理 -- 在此情况下 gather() 调用 不会 被取消。这是为了防止一个已提交的 Task/Future 被取消导致其他 Tasks/Future 也被取消。

import asyncio
import datetime
import time


async def say_after(delay, what):
    loop = asyncio.get_running_loop()
    end_time = loop.time() + 5
    while True:
        if loop.time() > end_time:
            break
        print(datetime.datetime.now(), what)
        await asyncio.sleep(delay)
    return f'{what}{delay}'


async def main():
    print(f"started at {time.strftime('%X')}")

    results = await asyncio.gather(
        say_after(1, 'AAA'),
        say_after(2, 'BBB'),
        say_after(3, 'CCC'),
    )
    print(results)
    print(f"finished at {time.strftime('%X')}")


asyncio.run(main())

预期的输出:

started at 14:57:10
2021-10-31 14:57:10.341888 AAA
2021-10-31 14:57:10.341930 BBB
2021-10-31 14:57:10.341954 CCC
2021-10-31 14:57:11.342177 AAA
2021-10-31 14:57:12.342331 BBB
2021-10-31 14:57:12.342488 AAA
2021-10-31 14:57:13.345196 CCC
2021-10-31 14:57:13.345289 AAA
2021-10-31 14:57:14.342706 BBB
2021-10-31 14:57:14.346219 AAA
['AAA1', 'BBB2', 'CCC3']
finished at 14:57:16

asyncio.wait_for

asyncio.wait_for(aw, timeout, *, loop=None):等待 aw 可等待对象 完成,指定 timeout 秒数后超时。

timeout 可以为 None,也可以为 float 或 int 型数值表示的等待秒数。如果 timeoutNone,则等待直到完成。

如果发生超时,任务将取消并引发 asyncio.TimeoutError

import asyncio


async def eternity():
    # Sleep for one hour
    await asyncio.sleep(3600)
    print('yay!')


async def main():
    # Wait for at most 1 second
    try:
        await asyncio.wait_for(eternity(), timeout=1.0)
    except asyncio.TimeoutError:
        print('timeout!')


asyncio.run(main())

asyncio.wait

asyncio.wait(aws, *, loop=None, timeout=None, return_when=ALL_COMPLETED):并发运行 aws 指定的 可等待对象 并阻塞线程直到满足 return_when 指定的条件。

返回两个 Task/Future 集合: (done, pending)

用法:

done, pending = await asyncio.wait(aws)

如指定 timeout (float 或 int 类型) 则它将被用于控制返回之前等待的最长秒数。

请注意此函数不会引发 asyncio.TimeoutError。当超时发生时,未完成的 Future 或 Task 将在指定秒数后被返回。

return_when 指定此函数应在何时返回。它必须为以下常数之一:

常数 描述
FIRST_COMPLETED 函数将在任意可等待对象结束或取消时返回。
FIRST_EXCEPTION 函数将在任意可等待对象因引发异常而结束时返回。当没有引发任何异常时它就相当于 ALL_COMPLETED
ALL_COMPLETED 函数将在所有可等待对象结束或取消时返回。

wait_for() 不同,wait() 在超时发生时不会取消可等待对象。

import asyncio


async def say_after(delay, what):
    await asyncio.sleep(delay)
    return f'{what}{delay}'


async def main():
    task1 = asyncio.create_task(say_after(2, 'hello'))
    task2 = asyncio.create_task(say_after(4, 'world'))

    done, pending = await asyncio.wait([task2, task1], timeout=3)
    print(done)
    print(pending)
    if task1 not in pending:
        print(f'task1 result: {task1.result()}')
    if task2 in done:
        print(f'task2 result: {task2.result()}')

    await asyncio.wait(pending)
    print("task2 end")
    print(f'task2 result: {task2.result()}')


asyncio.run(main())

预期的输出:

{<Task finished coro=<say_after() done, defined at /Users/lsf/PycharmProjects/pythonProject/demo.py:4> result='hello2'>}
{<Task pending coro=<say_after() running at /Users/lsf/PycharmProjects/pythonProject/demo.py:5> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x10e72f150>()]>>}
task1 result: hello2
task2 end
task2 result: world4

asyncio.as_completed

asyncio.as_completed(aws, *, loop=None, timeout=None)

并发地运行 aws 集合中的 可等待对象。返回一个 Future 对象的迭代器。返回的每个 Future 对象代表来自剩余可等待对象集合的最早结果。

如果在所有 Future 对象完成前发生超时则将引发 asyncio.TimeoutError

import asyncio


async def say_after(delay, what):
    await asyncio.sleep(delay)
    return f'{what}{delay}'


async def main():
    task1 = asyncio.create_task(say_after(2, 'hello'))
    task2 = asyncio.create_task(say_after(4, 'world'))

    for f in asyncio.as_completed([task2, task1]):
        earliest_result = await f
        print(f'task result: {earliest_result}')


asyncio.run(main())

预期的输出:

task result: hello2
task result: world4

异步回调

爬取某个平台下的所有图片的时候,我们需要下载图片,如果你一个个地下载会出现这样的情况:

  1. 如果某个请求堵塞,整个队列都会被堵塞
  2. 如果是小文件,单线程下载太慢

这时候异步下载就派上用场了,在你请求第一个图片获得数据的时候,它会切换请求第二个图片或其他图片,等第一个图片获得所有数据后再切换回来。从而实现多线程批量下载的功能,速度超快,下载超清大图用这个方法可以一秒一张。

async def download_img(session, url):
    name = url.split('/')[-1]

    img = await session.get(url)

    imgcode = await img.read()

    with open("img/" + str(name), 'wb') as f:
        f.write(imgcode)

    return str(url)


async def download_all_img(URL):
    async with aiohttp.ClientSession() as session:
        tasks = [asyncio.create_task(download_img(session, url)) for url in URL]
        all_results = await asyncio.gather(*tasks)


async def main():
    url = [
        'http://pic1.win4000.com/wallpaper/2018-09-11/5b9784aa5ec31.jpg',
        'http://pic1.win4000.com/wallpaper/2018-09-11/5b9784aaef17f.jpg',
    ]
    await asyncio.create_task(download_all_img(url))


if __name__ == '__main__':
    asyncio.run(main())