asyncio:不成功便成仁

asyncio:不成功便成仁


常规异步编程方案有一种“不成功便成仁”式的傲骨侠风,这就是问题所在。你孤注一掷重写代码,要么彻底避免阻塞,要么纯属浪费时间。 —— Alvaro Videla 和 Jason J. W. Williams

当你了解Python的协程之后,你会和我一样对这段话无比赞同。

协程代码长什么样? #

在Python中,协程是指一个可以挂起自身并在以后恢复的函数。Python协程通常在事件循环(也在同一个线程中)的监督下在单个线程中运行。协程支持协作式多任务处理:一个协程必须使用await关键字显式放弃控制权,另一个协程才可以并发(而非并行)开展工作。这意味着,协程中只要有导致阻塞的代码,事件循环和其他所有协程的执行就都会受到阻塞。

《流畅的Python》一书中有一个非常经典的使用例:执行一段比较耗时的函数时,在命令行中显示一个不停旋转的指针:

import asyncio
import itertools

async def spin(msg: str):  # 用`async def`关键字定义一个协程
    for char in itertools.cycle(r'\|/-'):
        status = f'\r{char} {msg}'
        print(status, flush=True, end='')
        try:
            await asyncio.sleep(.1)  # 用`await`关键字暂停当前任务, 允许其他任务执行
        except asyncio.CancelledError:  # 当接收到`asyncio.CancelledError`异常时, 跳出循环并结束此任务
            break
        blanks = ' ' * len(status)
        print(f'\r{blanks}\r', end='')

async def slow() -> int:
    await asyncio.sleep(3)  # 需要使用`asyncio.sleep`代替`time.sleep`, 否则将阻塞整个线程
    return 42

async def supervisor() -> int:
    spinner = asyncio.create_task(spin("thinking!"))  # 将协程封装成任务, 并调度其执行
    print(f'spinner object: {spinner}')
    result = await slow()  # 在执行`slow()`时, 等待结果的同时允许其他任务执行
    spinner.cancel()  # task的`cancel`方法将在协程内部抛出一个`asyncio.CancelledError`异常
    return result

def main():
    result = asyncio.run(supervisor())  # 运行一个协程, 并开启事件循环
    print(f'Answer: {result}')

if __name__ == "__main__":
    main()

在阅读协程代码时,如果看到代码运行到await,那就意味着该协程可以在此处暂停,事件循环会在等待结果的同时去寻找并执行其他已调度的协程。

这段示例代码中,我们定义了三个协程:spin用于显示旋转指针,slow是我们定义的耗时任务,supervisor则是所有异步代码的入口。

supervisor中,我们先执行并调度了spin("thinking!"),然后用await关键字调用slow()。此时我们就可以简单地认为:在主线程中有两个协程任务(spin("thinking!")slow()),当代码执行到slow()await asyncio.sleep(3)时,会将线程的执行权转交给另一个协程spin("thinking!");而当代码执行到spin("thinking!")await asyncio.sleep(.1)时,同样也会转交线程的执行权给另一个协程(在本示例中由于只有两个协程任务,因此只能转交回给slow())。线程在两个协程(任务)之间不断切换执行,这样就实现了一个线程并发执行了两个协程(任务)。

不同于多进程/多线程,协程的调度切换是用户手动控制的,因此更加灵活,并且协程与协程之间切换的开销要小于多进程/多线程,因此理论上协程代码的运行效率要优于多进程/多线程代码。

为什么我不喜欢写协程代码? #

一处协程,处处协程 #

async定义一个协程,而await也只能wait一个协程任务(而不是一个普通的函数)。这意味着如果你想把自己的多进程/多线程代码重构成协程,那么很多代码都不能复用,必须得重写。

你得把你已经用得很上手了的 requests 库替换成 aiohttp 库,你得把你已经写好的函数全部加上async,你还得琢磨应该在协程内部的何处await以避免长时间阻塞,最后,你还得花大量的时间进行测试和调试(协程代码调试起来可比多进程/多线程代码困难多了)。

一旦你编写了第一个async def之后,那么你的代码中一定会有越来越多的asyncawait,此时再想使用非异步库,难度可想而知。

不能await该怎么办? #

当你沉迷于写协程代码后,你就会很无奈地发现,能够await的场景实在是很有限。

比如:你遇到了需要本地读写文件的场景,但Python底层并没有提供异步文件IO的支持,那该怎么办呢?总不能在等待文件IO期间一直阻塞吧。

办法还是有的:利用run_in_executor将阻塞的代码“转化”为协程并丢到另一个线程或进程中执行:

import asyncio

def blocking_io():
    with open('/dev/urandom', 'rb') as f:
        return f.read(100)

async def main():
    loop = asyncio.get_running_loop()

    result = await loop.run_in_executor(None, blocking_io)
    print('default thread pool', result)

if __name__ == '__main__':
    asyncio.run(main())

“转化”一词当然要加引号,run_in_executor只是让阻塞的代码变得可以await而已。

很显然,在某种程度上,这违背了协程的“终极理想”。

其实还有一种更投机的办法:上面这个例子中不是从/dev/urandom中读取100个字节嘛,那么我们可以每读几个字节就asyncio.sleep一下,好让其他协程也有机会执行:

import asyncio

async def aio():
    bytes_list = []
    with open('/dev/urandom', 'rb') as f:
        # 每次读10个字节, 循环10次
        for _ in range(10):
            bytes_list.append(f.read(10))
            await asyncio.sleep(0)  # `asyncio.sleep(0)`并不是没有意义的, 倒不如说很常见
    return b''.join(bytes_list)

async def main():
    result = await aio()
    print(result)

if __name__ == '__main__':
    asyncio.run(main())

但,这又是何必呢?

别跟我提 aiofiles 库,简单分析一下源代码你就会发现,本质上还是run_in_executor

仍然无法绕过的GIL #

协程的本质是在一个线程中调度并执行多个任务,这意味着协程仍然受到GIL的影响,因此在CPU密集型场景不适合用协程。

总结 #

抨击线程的往往是系统开发者,他们考虑的使用场景对一般的应用程序开发者来说,也许一生都不会遇到。……应用程序开发者遇到的使用场景,99%的情况下只需知道如何派生一堆独立的线程,然后利用队列收集结果。 —— Michelle Simionato

多线程、多进程、协程,这些都是实现并发编程的手段,各有优劣。在某个场景下或许有一个最佳方案,但不存在能通吃所有场景的方案。

作为一名Python程序员,当你遇到并发场景时,绝大多数情况下只需生成一个进程池或线程池,之后把任务丢到里边,然后等待收集结果即可,就像农民种地一样:开垦、播种、收获。

请记住,你拥有一台多核且性能不错的PC或服务器,是为了让你写多线程/多进程并发代码以充分发挥其多核优势和性能优势,而不是如戴着镣铐跳舞一般绞尽脑汁写协程代码只为了让自己感觉“高人一等”。

参考资料 #

  1. 《流畅的Python(第2版)》
  2. 并发编程(进程、线程、协程) - Martin8866 - 博客园
  3. Python异步编程详解 - HatBoy的个人主页

Table of Contents