asyncio:不成功便成仁
常规异步编程方案有一种“不成功便成仁”式的傲骨侠风,这就是问题所在。你孤注一掷重写代码,要么彻底避免阻塞,要么纯属浪费时间。 —— Alvaro Videla 和 Jason J. W. Williams
当你了解Python的协程之后,你会和我一样对这段话无比赞同。
协程代码长什么样? #
在Python中,协程是指一个可以挂起自身并在以后恢复的函数。Python协程通常在事件循环(也在同一个线程中)的监督下在单个线程中运行。协程支持协作式多任务处理:一个协程必须使用await关键字显式放弃控制权,另一个协程才可以并发(而非并行)开展工作。这意味着,协程中只要有导致阻塞的代码,事件循环和其他所有协程的执行就都会受到阻塞。
《流畅的Python》一书中有一个非常经典的使用例:执行一段比较耗时的函数时,在命令行中显示一个不停旋转的指针:
import asyncio
import itertools
async def spin(msg: str): # 用`async def`关键字定义一个协程
for char in itertools.cycle(r'\|/-'):
status = f'\r{char} {msg}'
print(status, flush=True, end='')
try:
await asyncio.sleep(.1) # 用`await`关键字暂停当前任务, 允许其他任务执行
except asyncio.CancelledError: # 当接收到`asyncio.CancelledError`异常时, 跳出循环并结束此任务
break
blanks = ' ' * len(status)
print(f'\r{blanks}\r', end='')
async def slow() -> int:
await asyncio.sleep(3) # 需要使用`asyncio.sleep`代替`time.sleep`, 否则将阻塞整个线程
return 42
async def supervisor() -> int:
spinner = asyncio.create_task(spin("thinking!")) # 将协程封装成任务, 并调度其执行
print(f'spinner object: {spinner}')
result = await slow() # 在执行`slow()`时, 等待结果的同时允许其他任务执行
spinner.cancel() # task的`cancel`方法将在协程内部抛出一个`asyncio.CancelledError`异常
return result
def main():
result = asyncio.run(supervisor()) # 运行一个协程, 并开启事件循环
print(f'Answer: {result}')
if __name__ == "__main__":
main()
在阅读协程代码时,如果看到代码运行到await
,那就意味着该协程可以在此处暂停,事件循环会在等待结果的同时去寻找并执行其他已调度的协程。
这段示例代码中,我们定义了三个协程:spin
用于显示旋转指针,slow
是我们定义的耗时任务,supervisor
则是所有异步代码的入口。
在supervisor
中,我们先执行并调度了spin("thinking!")
,然后用await
关键字调用slow()
。此时我们就可以简单地认为:在主线程中有两个协程任务(spin("thinking!")
和slow()
),当代码执行到slow()
的await asyncio.sleep(3)
时,会将线程的执行权转交给另一个协程spin("thinking!")
;而当代码执行到spin("thinking!")
的await asyncio.sleep(.1)
时,同样也会转交线程的执行权给另一个协程(在本示例中由于只有两个协程任务,因此只能转交回给slow()
)。线程在两个协程(任务)之间不断切换执行,这样就实现了一个线程并发执行了两个协程(任务)。
不同于多进程/多线程,协程的调度切换是用户手动控制的,因此更加灵活,并且协程与协程之间切换的开销要小于多进程/多线程,因此理论上协程代码的运行效率要优于多进程/多线程代码。
为什么我不喜欢写协程代码? #
一处协程,处处协程 #
async
定义一个协程,而await
也只能wait一个协程任务(而不是一个普通的函数)。这意味着如果你想把自己的多进程/多线程代码重构成协程,那么很多代码都不能复用,必须得重写。
你得把你已经用得很上手了的 requests 库替换成 aiohttp 库,你得把你已经写好的函数全部加上async
,你还得琢磨应该在协程内部的何处await
以避免长时间阻塞,最后,你还得花大量的时间进行测试和调试(协程代码调试起来可比多进程/多线程代码困难多了)。
一旦你编写了第一个async def
之后,那么你的代码中一定会有越来越多的async
和await
,此时再想使用非异步库,难度可想而知。
不能await该怎么办? #
当你沉迷于写协程代码后,你就会很无奈地发现,能够await
的场景实在是很有限。
比如:你遇到了需要本地读写文件的场景,但Python底层并没有提供异步文件IO的支持,那该怎么办呢?总不能在等待文件IO期间一直阻塞吧。
办法还是有的:利用run_in_executor
将阻塞的代码“转化”为协程并丢到另一个线程或进程中执行:
import asyncio
def blocking_io():
with open('/dev/urandom', 'rb') as f:
return f.read(100)
async def main():
loop = asyncio.get_running_loop()
result = await loop.run_in_executor(None, blocking_io)
print('default thread pool', result)
if __name__ == '__main__':
asyncio.run(main())
“转化”一词当然要加引号,
run_in_executor
只是让阻塞的代码变得可以await
而已。
很显然,在某种程度上,这违背了协程的“终极理想”。
其实还有一种更投机的办法:上面这个例子中不是从/dev/urandom
中读取100个字节嘛,那么我们可以每读几个字节就asyncio.sleep
一下,好让其他协程也有机会执行:
import asyncio
async def aio():
bytes_list = []
with open('/dev/urandom', 'rb') as f:
# 每次读10个字节, 循环10次
for _ in range(10):
bytes_list.append(f.read(10))
await asyncio.sleep(0) # `asyncio.sleep(0)`并不是没有意义的, 倒不如说很常见
return b''.join(bytes_list)
async def main():
result = await aio()
print(result)
if __name__ == '__main__':
asyncio.run(main())
但,这又是何必呢?
别跟我提 aiofiles 库,简单分析一下源代码你就会发现,本质上还是run_in_executor。
仍然无法绕过的GIL #
协程的本质是在一个线程中调度并执行多个任务,这意味着协程仍然受到GIL的影响,因此在CPU密集型场景不适合用协程。
总结 #
抨击线程的往往是系统开发者,他们考虑的使用场景对一般的应用程序开发者来说,也许一生都不会遇到。……应用程序开发者遇到的使用场景,99%的情况下只需知道如何派生一堆独立的线程,然后利用队列收集结果。 —— Michelle Simionato
多线程、多进程、协程,这些都是实现并发编程的手段,各有优劣。在某个场景下或许有一个最佳方案,但不存在能通吃所有场景的方案。
作为一名Python程序员,当你遇到并发场景时,绝大多数情况下只需生成一个进程池或线程池,之后把任务丢到里边,然后等待收集结果即可,就像农民种地一样:开垦、播种、收获。
请记住,你拥有一台多核且性能不错的PC或服务器,是为了让你写多线程/多进程并发代码以充分发挥其多核优势和性能优势,而不是如戴着镣铐跳舞一般绞尽脑汁写协程代码只为了让自己感觉“高人一等”。
参考资料 #
- 《流畅的Python(第2版)》
- 并发编程(进程、线程、协程) - Martin8866 - 博客园
- Python异步编程详解 - HatBoy的个人主页