上篇教程《深入理解Python异步编程(上)》中,我们深入学习了异步编程的相关概念,如阻塞/非阻塞、同步/异步、并行/并发;也学习了异步的难点与在Python中的发展历程,学习了asyncio
的原型,知道了事件循环(Event Loop)、回调(Callback)、协程(Coroutine)、未来对象(Future)、任务(Task)之间的联系,彻底弄清楚了异步编程是什么、为什么、怎么样。此篇教程将带领大家学习如何使用asyncio
。
提示:本文共2万余字,建议先走马观花看个大概,再细读重点关心的部分。当然,也欢迎从头到尾仔细阅读。
内容安排 上篇
了解 异步编程及其紧密相关的概念,如阻塞/非阻塞、同步/异步、并发/并行等
理解 异步编程是什么,以及异步编程的困难之处
理解 为什么需要异步编程
熟悉 如何从同步阻塞发展到异步非阻塞的
掌握epoll + Callback + Event loop是如何工作的
掌握 Python 是如何逐步从回调到生成器再到原生协程以支持异步编程的
掌握 asyncio 的工作原理
掌握 asyncio 标准库基本使用
掌握 asyncio 的事件循环
掌握 协程与任务如何使用与管理(如调度与取消调度)
掌握 同步原语的使用(Lock、Event、Condition、Queue)
掌握 asyncio 和多进程、多线程结合使用
掌握 常用的异步编程模式和最佳实践
了解 Python3.6到3.12各版本的asyncio的变化
理解 GIL 对异步编程的影响
理解 asyncio 踩坑经验
理解 回调、协程、绿程(Green-Thread)、线程对比总结
掌握 多进程、多线程、协程各自的适用场景
了解 Gevent/libev、uvloop/libuv 与asyncio的区别和联系
掌握 Python异步编程的一些指导细则
掌握 asyncio 在大型应用中的可扩展性和性能优化
注:为减轻学习负担,本教程基于 Python 3.6,历史遗留的某些旧的、或不再推荐的用法将刻意避开。所以讨论细节时,除非文中特别提及其他版本做对比,可能只适应于 3.6。
注2:本文撰写时是Python3.6发布不久,更新时已是Python3.12。这期间的变动主要是性能改进、提供了新的上下文管理器和任务处理方法,并不影响本系列文章的主线内容,本文“中篇”会在文末列出3.6到3.12各版本的主要变化。若因作者疏漏导致文中有版本不兼容的代码示例,恳请大家斧正。
1 原生协程语法 1.1 async def
在Python 3.5之后,引入了async/await
关键字。用它们定义的函数/方法被称为原生协程。需注意,确切讲async def
定义的是一个原生协程函数,而调用协程函数得到的返回值才是该函数对应的协程。不过在口头交流中,一般不会刻意区分这两者。
async def read_data(db):
pass
# 类型为 function,原生协程函数
type(read_data)
# 类型为 coroutine,调用协程函数返回的才是协程
coro = read_data(db)
type(coro)
要注意的是,async def
的函数内可以没有await
语句。另外,3.5 和 3.6 有所区别:
async def func():
yield 42 # Python 3.5 语法错误
async def func():
yield 42 # Python 3.6 正确,func() 是一个异步生成器
async def func():
yield from coro() # Python 3.5 语法错误
async def func():
yield from coro() # Python 3.6语法错误
1.2 await
await
用于接受可被异步等待的awaitable
的对象的返回值,返回值可以绑定到一个变量。例如:
async def read_data(db):
data = await db.fetch('SELECT ...')
可被异步等待的awaitable
对象有如下几种:
调用原生协程函数返回的原生协程;
被
@asyncio.coroutine
装饰的生成器函数调用后返回的生成器协程;实现了
__await__
魔术方法的对象;__await__
方法必须返回迭代器,否则会报TypeError
。实现了__await__
方法的对象也被称为类似未来对象(Future-like)。
关键字await
后面只能跟着awaitable
对象,否则会报TypeError
。而且await
必须在async def
的函数内使用。那么问题来了:
async def say_hello():
print('in say_hello')
return 'Hello'
async def say_world():
print('in say_world')
return 'World'
async def say_helloworld():
print('in say_helloworld')
value = await say_hello() + await say_world()
return value
按上述代码,say_hello()
和say_world()
都使用await
发起调用并接收返回值。那say_helloworld()
这个入口又该怎么调用?又如何接收到它的结果呢?
2 协程的调用
在《上篇》中,我们已经学习了EventLoop、Task、Future、Coroutine
之间的联系。Future
用来放置异步操作的返回结果,Task
用来管理协程,把Coroutine
对象封装到Task
中,然后启动EventLoop
程序就开始执行了。接下来学习如何使用asyncio
来完成这一系列操作。
2.1 内置事件循环
在《上篇》中,我们学习到了如何让事件循环不用关心业务逻辑,而只做事件监听的技术操作。所以EventLoop
可以作为基础技术组件事先编写好,asyncio
就提供了对EventLoop
的封装。
2.1.1 事件循环策略
Python 内置的事件循环比我们在《上篇》中自己写的强大。首先,它引入了事件循环策略的概念。事件循环策略是全局对象,每进程一个。asyncio
提供了API可以程序员获取和更改每个Python进程内的事件循环策略。如此,让应用开发有了更多的选择,可以使用asyncio
默认的策略,也可以使用第三方提供或自己编写的策略。
asyncio.get_event_loop_policy() # 获取当前的事件循环策略
asyncio.set_event_loop_policy(policy) # 设置当前的事件循环策略
我们可能听说过uvloop
是一个很高效的第三方事件循环框架,以上两个接口就给我们提供了利用uvloop
代替内置事件循环的可能。
policy = uvloop.EventLoopPolicy() # 获取 uvloop 提供的事件循环策略
asyncio.set_event_loop_policy(policy) # 用uvloop的循环策略代替内置策略
事件循环策略中,又引入了事件循环上下文的概念。一个事件循环的上下文,就是指一个线程,即每个线程可以设置不同的事件循环。
2.1.2 事件循环
asyncio
提供了如下三个接口来获取或设置事件循环:
default_loop = asyncio.get_event_loop() # 获取当前线程的事件循环
default_loop = asyncio.get_event_loop_policy().get_event_loop() # 与上一行等效
asyncio.set_event_loop(loop) # 为当前线程设置事件循环
asyncio.get_event_loop_policy().set_event_loop(loop) # 与上一行等效
new_loop = asyncio.new_event_loop() # 根据当前事件循环策略生成一个新的事件循环
new_loop = asyncio.get_event_loop_policy().new_event_loop() # 与上一行等效
调用get_event_loop()
得到的是默认事件循环。如果是 Unix-like 系统,会调用asyncio.SelectorEventLoop()
得到基于epoll
或kqueue
选择机制的事件循环;如果是Windows系统,会调用asyncio.ProactorEventLoop()
得到基于IOPC
(I/O完成端口)的事件循环。
注意,这里我们遇到了asyncio
的第一个坑。每个线程可以设置不同的事件循环,但是每个进程又只能有一个事件循环策略。导致想要使用多种事件循环的情况混乱。具体现象下篇再解释。先记住经验1:同一个Python进程中,最好只使用一种事件循环策略,且在多线程情况下只使用该策略生成的事件循环对象。
第二个容易被忽略的坑是,使用默认策略时,主线程能够asyncio.get_event_loop()
得到默认事件循环,但是子线程内做此操作却不行。会报RuntimeError
,提示当前线程无事件循环。uvloop
的策略也是这样。再记住经验2:子线程中有异步时,需在子线程内先设置事件循环,或将主线程中获取到的循环对象传递给子线程。
从经验1和经验2可以推导出经验3:一个Python进程最好只有一种事件循环策略,只使用一个事件循环对象。
上述经验在Python < 3.6.x 版本中有效,随着Python的发展也许这些经验会发生变化。2.1.3 协程加入事件循环调度
在《上篇》教程中,我们在fetch()
协程内调用了selector
并在其上面注册了事件和对应的回调,然后再把协程封装到Task
中,loop()
里使用了与fetch()
内相同的selector
,所以loop()
启动后即可监听fetch()
内的事件并持续运行。
在使用asyncio
时,没有亲自调用selector
注册事件,如何将协程封装为Task
并加入到asyncio
提供的EventLoop
上呢?
# 获取事件循环对象
loop = asyncio.get_event_loop()
loop.create_task(say_helloworld())
按上述代码,我们就将say_helloworld()
这个协程封装为了Task
对象,并加入了loop
中开始调度。只等着loop
启动,say_helloworld()
就会被执行。
2.1.4 事件循环的启停
asyncio
的事件循环对象提供了下列方法控制它的启停和状态检查。
loop.run_forever()
:启动事件循环,直到stop()
方法被调用。若先调stop()
后调run_forever()
,则把已经加入到loop
中的任务全部执行完后退出退出;若先调run_forever()
后调stop()
则把当前已加入loop
的任务执行完然后退出。loop.run_until_complete(coro_or_future)
: 启动事件循环,直到传递给它的协程执行结束或未来对象set_result()
得到结果,并且返回协程或未来对象的结果,然后结束事件循环。故,此方法可以同时调度协程运行并得到返回值。如果在调用此方法前已经向loop
加入了其他任务,则其他任务也会执行。loop.is_running()
:loop
是否在运行中。loop.stop()
: 停止事件循环的运行,但不会立即停止,参看run_forever()
说明。loop.is_closed()
:loop
是否已被关闭。loop.close()
: 关闭loop
,所有已调度但未执行的任务将被强制清理,也不会等待正在执行的任务结束。这个操作是毁灭性的,需谨慎。当loop
被关闭之后,它则不可以再次运行,若要再次调度任务,必须使用新创建的loop
。若一个loop
正在运行中,则无法被关闭,调close()
是会报RuntimeError
的。
有些同学懵逼了,stop()
和close()
为啥要存在这两种不同的方法?因为这是两个不同的概念。loop
就像汽车的引擎,stop()
是熄火,close()
是报废。当引擎出现故障时,需要把它报废,报废之后必须换新的才能用。如果只是正常的跑完了目标距离,可以熄火,有了新任务重新点火就跑。正在运行中的引擎是不可以强制报废的。
2.2 放在一起执行
现在我们终于知道了如何解决 1.2 节末尾的问题。如下代码所示:
import asyncio
# 根据当前事件循环策略获取事件循环
# 得到的是 asyncio 默认的 UnixSelectorEventLoop
loop = asyncio.get_event_loop()
loop.create_task(say_helloworld())
loop.run_forever()
上述代码完成了say_helloworld()
协程的调度和执行,say_hello()
和say_world()
也会随之被调用。但是上述代码并不能得到say_helloworld()
的返回值,怎么才能输出它的返回值'helloworld'
呢?
import asyncio
import uvloop
# 顺便演示替换事件循环的用法
# 用 uvloop 的事件循环策略作为 asyncio 的默认策略
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
# 根据当前事件循环策略获取事件循环
# 得到的是 uvloop.Loop
loop = asyncio.get_event_loop()
retval = loop.run_until_complete(say_helloworld())
print(retval)
上述代码同样完成了对say_helloworld()
的调用,和前一种方案不同的是:事件循环被替换了,循环执行完任务以后会自动退出,接收到了协程的返回值。
2.3 总结
本节我们学习了如何定义原生协程,如何在协程内如何调用别的协程,如何在将协程封装为任务加入事件循环,如何启停事件循环,如何接收协程的返回值,如何替换asyncio
的默认事件循环。还介绍了一些应该注意的采坑经验。
到此为止,我们已经学习了利用asyncio
库调度协程运行的基本用法。接下来,我们将学习asyncio
提供的Task
和Future
提供了哪些接口以及如何使用。以免在面对复杂问题时不知所措。
3 Future 和 Task 3.1 回顾 asyncio 核心原理
在《上篇》中,我们学习了asyncio的核心运行原理,讲解了EventLoop + Coroutine + Task + Future
是如何协同完成异步程序的调度和执行的。
下面我们看看《上篇》异步爬虫的关键运行过程,asyncio
的框架的核心也是如此。
asyncio 核心原理图
如上图所示,asyncio 封装了事件循环、任务对象、未来对象。程序员用协程编写需要异步处理的业务,然后把这些协程按 asyncio 提供的接口转换为Task
任务对象,再启动事件循环,当事件发生以后,给对应的Future
对象设置结果,再执行到Task
里的step()
... 这样一直循环下去,就能执行完所有异步任务。
聪明的读者看到此处,应该会有两个疑惑:
不是改为原生协程了吗,为何图上还写着 yield 解释生成器版本的协程?
在第2节的学习中,并没创建
Future
对象,怎么回事?
先解释第一个疑惑,为什么上述原理图中还画着生成器协程。在《上篇》教程中我们提及过,基于async/await
语法的原生协程背后和生成器协程复用着共同的实现。来看一段代码:
# 复用 1.2 节的 say_hello(), say_world(), say_helloworld()
# 新写一个 run() 函数
def run(coro):
try:
coro.send(None)
except StopIteration as e:
return e.value
result = run(say_helloworld())
print(result)
上述代码执行后会打印出HelloWorld
,没有EventLoop
也可以执行协程,我们做到了。其原因就是async/await
的背后还是生成器。可以看出,原生协程里的return
其实是把返回值放到异常对象里,在处理异常时读出返回值。这点是不是和Tornado
一样?
本小节只是为了演示原生协程的背后和生成器的关系。实际编程过程中我们不要玩这个技巧。因为自己要正确处理协程的所有事情很不容易。还是乖乖地按照 asyncio 规定的接口去编程。
3.2 Future
Future
是用于产生表示未来执行结果对象的类。所有的异步任务的返回结果,实际都会被封装到future
对象中。
3.2.1 Future与Task的瓜葛
通过《上篇》的学习,我们已经掌握了自己怎么写Future
和Task
的核心部分。重新审视一下设计理念,Future
代表的是“未来的执行结果”。Task
是为了把协程包装成任务,并驱动协程的执行,实际就是说Task
表示“未来将要执行的异步任务”。
若把“任务”和“未来对象”做完善,那“任务”当然需要知道自己是否还在执行、其结果是什么、如何注册或注销等等,“未来对象”也需要知道自己等待的结果好了没有,可以选择等或者不等这个结果。它们的行为很相似。从3.1节的原理图中可以看出Task
与Future
的关系紧密。
再审视一下代码,Task
的初始化方法中使用到了一个Future
对象,这个未来对象除了代表一个None
值,没其他用处。而且“任务”和“未来对象”那么多相似行为,Task
的主要不同只有step()
方法。
基于多种因素考虑,asyncio
里的Task
是派生自Future
的子类。3.1节第二个疑惑的答案就在此处,create_task()
的时候,实际上也创建了future
对象。
3.2.2 Future 未来对象
注意要和concurrent.futures.Future
区分开,这两者目前不完全兼容。concurrent.futures.Future
是为多线程和多进程执行异步任务而准备的,而asyncio.Future
是为单线程内用协程做异步编程而准备的。
要使用Future
对象,首先得学会创建Future
对象。显式地创建和获取它有如下两种方式。
方式1:
fut = asyncio.Future(loop=None)
方式2:
fut = loop.create_future()
方式1创建对象时可以传递loop
参数,若不传递则为默认值None
,将使用asyncio.get_event_loop()
获取事件循环。方式2创建的则使用调用create_future()
的loop
。
显式地创建出来的Future
对象没有和任何协程关联,只是被创建出来等待我们主动使用它。Future
也实现了__await__
方法,它们是可以被await
关键字调用的。
这两种创建未来对象的区别在于,方式1的与其绑定的事件循环是默认用asyncio.get_event_loop()
得到的,而方式2的则是与调用create_future()
的loop
绑定。在实际使用时,根据需要选择创建方式。
下面了解asyncio.Future
对象具有的方法:
cancel()
:取消future
并执行它的回调函数。若它已经拿到结果或已被取消,返回False
,否则,取消它并执行回调,再返回True
。cancelled()
:若future
已被取消则返回True
。done()
:若future
已经拿到结果,或抛了异常,或被取消成功都返回True
。result()
:返future
代表的结果。若已被取消,则抛出CancelledError
;若还未拿到结果,则抛出InvalidStateError
;如果既拿到结果,也有异常,则抛出异常。exception()
:返回在future
上设置的异常。如果没有则为None
。add_done_callback(fn)
:为future
设置回调函数,这些回调函数将在future
拿到结果后执行。注意,这里只接受一个可调用的函数fn
,当执行回调的时候,只会给这个函数传递一个参数,就是future
自己,即fn(self)
。如果想给回调函数传递额外参数,需借助functools.partial
。remove_done_callback(fn)
:从future
的回调函数列表里删除fn
的所有实例。如果用fn==callback
为True
,则该callback就会被移除。set_result(result)
:为future
设置结果,并标记其状态为已完成。若future
已经被完成,则抛出InvalidStateError
。set_exception(exception)
:为future
设置异常,并标记其状态为已完成。若future
已经被完成,则抛出InvalidStateError
。
通过第2节的学习,我们学会了无需显式调用Future
和Task
类,也能够调度协程到EventLoop
上运行。但有些时候不够灵活,例如,我们想在某个Future
得到结果后,不仅仅是返回结果,而且要做一个其他操作,怎么办?这时候就需要显式地使用Future
对象。
import asyncio
loop = asyncio.get_event_loop()
async def take_exam(fut):
# 进行考试
await asyncio.sleep(1)
fut.set_result(100)
return 'Exam is completed.'
def check_score(fut):
score = fut.result()
if score >= 60:
print("Passed.")
else:
print("Failed.")
fut = asyncio.Future()
fut.add_done_callback(check_score)
task = asyncio.ensure_future(take_exam(fut))
retval = loop.run_until_complete(fut)
print(task.result()) # Exam is completed.
print(fut.result()) # 100
print(retval) # 100
在上述代码中,我们定义了take_exam()
协程函数,接收一个Future
对象作为参数,在协程内会进行异步非阻塞代替非阻塞I/O操作(用sleep()
代替),当异步操作完成以后,再设置fut
对象的值,最后该协程自己也返回一个数据。
asyncio.ensure_future(coro_or_future)
函数会将传递给它的协程封装为Task
,加入EventLoop
的调度,并返回该协程的Task
对象。故而task.result()
是协程return
的值。关于ensure_future()
的更多细节在《下篇》中继续讨论。
3.3 Task
Task
主要用于管理需调度到EventLoop
上执行的协程对象。它的行为与Future
很类似,它实际也是Future
的子类,所以Future
具有的操作它也有。参考 3.2.1节。
显式地创建Task
对象有如下三种方式:
方式1:
task = asyncio.Task(coro, *, loop=None)
方式2:
task = asyncio.ensure_future(coro, *, loop=None)
方式3:
task = loop.create_task(coro)
以上三种方式都能获取封装了coro
协程的task
对象,并且把task
调度到相应的事件循环上。区别在于,方式1和方式2可以接受一个可选关键字参数loop
,会把task
调度到传递的那个loop
上去,默认为asyncio.get_event_loop()
获取的loop
。而方式3则会将task
调度到调用create_task()
的loop
上。
3.3.1ensure_future()
create_task()
区别
ensure_future()
里实际调用的是create_task()
,create_task()
里实际调用了Task
来创建对象。为什么有了create_task()
以后还需要ensure_future()
呢?
如果传递给这两个函数的参数是协程,它们没什么区别。如果要把Future
或Future-like
(实现了__await__
方法)的对象传递给create_task()
就不行了,它只接受协程。ensure_future()
在接收到Future
对象时,什么也不做,直接返回该Future
对象,而接收到Future-like
对象时,需要先把Future-like
对象包装到一个协程里,再把协程传给create_task()
。
3.3.2 Task 对 Future 的扩展
Task
是Future
的子类,它除了具有Future
所具有的方法外,还扩展了如下几个方法:
classmethod all_tasks(loop=None)
:类方法,用于获取已调度到loop
上的所有任务。classmethod current_task(loop=None)
:类方法,用于获取loop
上正在执行的任务。get_stack(*, limit=None)
:获取任务对象的栈帧,若该任务对象状态已经为done
,则返回空列表。print_stack(*, limit=None, file=None)
:将任务对象的栈帧输出到文件。
得到Task
对象以后,可以通过add_done_callback()
来为它设置回调函数。所以可以通过Task
把 Twisted 这种回调风格的框架和 asyncio 这种原生协程的框架结合起来。
一般地,在Python 3.6及其之后,推荐尽可能使用原生协程来做异步编程。而在这种情况下,我们无需直接调用Task
类的初始化方法来得到协程的任务对象,而是用create_task()
或ensure_future()
。
3.4 总结
本节我们学习了asyncio
库中Future
和Task
的核心概念和使用方式。Future
对象作为异步操作的结果占位符,为异步编程提供了极大的灵活性和控制能力。而Task
,作为Future
的子类,进一步封装了协程的调度和管理,使得异步编程更加高效和易于管理。通过深入理解这些概念,读者可以更好地掌握如何使用asyncio
来构建高效、可读性强的异步应用。
4 协程的调度与执行 4.1 单个协程的调度
在异步编程中,理解如何正确地调度和执行单个协程是很重要的。
4.1.1 协程的定义
在Python中,协程通过async def
定义。这种定义创建了一个协程函数,调用这个函数会返回一个协程对象。
async def my_coroutine():
# 协程的逻辑
4.1.2 调度单个协程
Python 3.7及更高版本中,可以使用asyncio.run()
来启动和调度协程。这个函数接收一个协程对象,运行它,并自动管理事件循环。
import asyncio
async def main():
# 协程的逻辑
# 启动协程
asyncio.run(main())
4.1.3 协程的等待与结果获取
在协程内部,可以使用await
来等待另一个协程的结果。await
会暂停当前协程的执行,直到等待的协程完成。
async def fetch_data():
# 模拟数据获取
await asyncio.sleep(2)
return "data"
async def main():
data = await fetch_data()
print(data)
asyncio.run(main())
4.1.4 错误处理
协程中可能出现的异常可以使用try...except
块来捕获和处理。
async def might_fail():
raise Exception("出错了!")
async def main():
try:
await might_fail()
except Exception as e:
print(f"错误捕获: {e}")
asyncio.run(main())
以下是一个完整的示例,展示了从定义协程函数到启动协程,并获取结果的流程:
import asyncio
async def compute_square(number):
await asyncio.sleep(1)
return number * number
async def main():
result = await compute_square(2)
print(f"结果: {result}")
asyncio.run(main())
通过以上步骤,我们展示了如何创建、调度,并执行单个协程,同时处理可能出现的错误。这为深入理解Python中的异步编程奠定了基础。
4.2 同时执行多个协程
前文我们已经学习了Future
和Task
如何使用以及它们的区别和联系,也知道如何利用EventLoop
执行异步任务。
但仍有个问题,回到《上篇》中的爬虫代码,之所以利用异步编程提高了它的整体效率,是因为我们一次性创建了10个不同的抓取任务,EventLoop
同时监听这10个异步任务的sock
对象的状态。同理,转换到asyncio
库时,我们也需要能一次性创建多个任务,并执行。
方式1:
asyncio.wait(futures, *, loop=None, timeout=None, return_when=ALL_COMPLETED)
方式2:
asyncio.as_completed(fs, *, loop=None, timeout=None)
方式3:
asyncio.gather(*coros_or_futures, loop=None, return_exceptions=False)
用于等待多个未来对象或协程的执行。需注意本方法也是一个协程,需要用await
关键字来调用或使用EventLoop
调度协程执行的接口来执行。
asyncio.wait(futures, *, loop=None, timeout=None, return_when=ALL_COMPLETED)
futures
: 必选参数,表示多个Future
或Coroutine
对象,如果是Coroutine
则会被封装为Task
。loop
: 可选参数,调度futures
执行的事件循环对象,默认为通过get_event_loop()
获取。timeout
:可选参数,等待futures
执行的超时时间(秒),默认为没有超时限制。注意,并不会抛出超时异常,只是结束wait
。return_when
: 可选参数,表示wait
返回的时机。可以取的值为FIRST_COMPLETED
,FIRST_EXCEPTION
,ALL_COMPLETED
,分别代表“执行完一个之后”、“碰到第一个异常之后”、“所有都完成之后”。
asyncio.wait
返回一个包含两个元素的元组,分别为done
和pending
状态的未来对象或协程集合。默认情况下pending状态的集合为空,因为没有超时限制且等待了所有任务执行完成。
done, pending = await asyncio.wait([say_hello(), say_world()], timeout=1.5, return_when=asyncio.FIRST_COMPLETED)
上述代码演示了如何在协程内调用wait
等待多个异步任务的执行结果。示例中设置了1.5秒的超时时间,而且wait
的返回时机为FIRST_COMPLETED
第一个完成后。
注意:超时并非严格的指协程从头执行到尾是否超过该时间限制,某些情况下done
和pending
集合中得到的结果非预期;例如协程中有计算密集型任务耗时较多,而非阻塞异步操作的时间又恰好与超时限制一致,此时最稳妥的办法是遍历这两个集合中的任务,根据task.done()
自行判断任务的状态。当然这种极端情况很少,不过我们需要知道存在这种问题,以免遇到done
和pending
集合非预期时不知所措,更详细的部分在《下篇》中探讨。
如果要获取已经执行完成的异步任务的返回值,可以取出done
集合中的Task
对象,通过task.result()
获取。
4.2.2 asyncio.as_completed
asyncio.as_completed(fs, *, loop=None, timeout=None)
此函数接收一组Future
或Coroutine
对象,返回一个迭代器。可以接收loop
和timeout
参数,默认无超时限制。
其用法如下:
async def main():
values = []
for f in asyncio.as_completed([say_hello(), say_world()], timeout=2):
ret = await f
values.append(ret)
return values
需要注意三个点,一是迭代器返回的future或task对象的顺序与传参时的顺序无关,二是超时以后会抛出asyncio.TimeoutError
异常(与asyncio.wait()
函数的超时不同),三是迭代器中的每个future
对象最好都用await
等待其结果,否则本该被异步调用的任务没有被异步调用,程序可能会有非预期行为。
当有多个协程或未来对象需一个接一个地、顺序无关地执行,且需要设置超时限制时,本函数as_completed()
适用。否则,直接用for
循环迭代协程列表一个一个地await
就可以了。
4.2.3 asyncio.gather
asyncio.gather(*coros_or_futures, loop=None, return_exceptions=False)
本函数接收多个协程或未来对象,并返回一个future
对象,通过await
或loop
等待该future
的结果则可得到该组异步任务的结果列表,顺序与传参时对应。当return_exceptions
为False
时,则遇到异常就抛出,否则将异常作为结果返回到结果列表中,并不抛出异常。
当有多个协程或未来对象需同时执行,也要获取其返回结果时,本函数gather()
适用。
用法如下:
results = await asyncio.gather(say_hello(), say_world(), return_exceptions=True)
# 或者
results = loop.run_until_complete(asyncio.gather(*[say_hello(),say_world()]))
4.3 总结
本节我们学习了如何利用Future
和Task
对象对协程进行调度和执行。在 asyncio 中没有显式的调度器,只需要通过create_task
、ensure_future
、run_until_complete
等即可将协程加入loop
调度执行。我们还学习了单个协程和多个协程的调度执行。
5 同步操作
虽然大多数时候我们都是在单线程内使用协程并发地执行异步任务,但即便是单线程内,协程的调度以及完成顺序难以预测和控制。而在某些场景下,我们需要让一组有关联的协程按顺序执行,这时候我们就需要把异步的变成同步的。
asyncio
提供了锁和队列两种机制来把多个异步任务变成同步执行的。
5.1 锁
顾名思义,是一种保护措施。在计算机术语中,锁是指用来保护临界资源的机制。临界资源是指不能被被多个程序同时访问/操作的数据(包括用数据结构来表示的文件、网络连接等任何资源)。例如存储了库存、银行账户余额的变量,它们就是临界资源,不能够被多个程序同时去修改。
当一段程序想要访问临界资源时,必须取得保护该临界资源的锁的权限,才可以进一步操作该临界资源。站在临界资源的角度,锁是保护它的,使临界资源在整个生命周期内保持正确性。站在异步并发程序的角度,锁是一种限制它访问临界资源的同步机制,以避免并发产生的副作用。
单纯地讲“锁”这个术语,就是上述的概念,还比较基础和抽象。为了应付软件的复杂性和沟通的效率,在不同的情况下还会衍生出不同类型的锁,其命名也花样百出。有些叫法是从临界资源是什么格式的数据角度出发,例如“表锁、行锁、文件锁”;有些叫法是从保护临界资源不被什么行为操作而变更的角度出发,例如“读锁、写锁”;还有些叫法是从锁的实现原理上命名,例如“乐观锁、悲观锁、自旋锁”;还有的叫法则是从锁是否可以被多个程序共享或互斥性质角度,如“排他锁、共享锁、独占锁、互斥锁”;还有的称呼是描述并发程序在获取临界资源时而进入了某种特殊状态,如“死锁、活锁、饥饿”;还有的锁的实现依赖于分布式系统,则称之为“分布式锁”……
希望大家不要对些数不清的锁望而生畏,其实锁的基本原理、基本特征、存在的意义就本节开头表述的那一句话而已,不过是针对具体场景、具体实现、或者观察角度不同而叫法不同。
就像生活中,有“电子锁、机械锁、自行车锁、门锁、自动锁、指纹锁、U型锁、环形锁、铜锁、金锁、智能锁、愚蠢锁……”。锁是什么?锁就是锁!计算机软件中也一样。
asyncio
为协程提供了5种具体的实现,分别是Lock
、Event
、Condition
、Semaphore
、BoundedSemaphore
。下面介绍它们的作用与用法。
5.1.1Lock
asyncio.Lock
实例化以后可以得到原始的锁对象,它只具有locked
和unlocked
两种状态。刚初始化的Lock
对象的状态是unlocked
。可以通过acquire()
方法将状态从unlocked
更改为locked
,反之,通过release()
方法可以把locked
改为unlocked
。
Lock
对象主要会用到以下三个方法:
locked()
检查锁对象是否处于locked
状态,返回True
或False
。acquire()
加锁,把锁对象改为已加锁状态,并返回True
。如果多个协程都调用acquire()
方法来加锁,那么会按照调用顺序依次加锁。release()
释放锁,把锁对象从locked
改为unlocked
状态,然后返回,以便别的协程可以加锁。如果锁对象本就处于unlocked
状态,再调用release()
会报RuntimeError
。
注意,acquire()
是一个协程,需按协程的方式调用。Lock
对象还实现了上下文管理器,可以使用with
语句。
用生活中的场景类比,几个人(不同的任务)要吃同一口锅里的饭(共享数据),但只配了一把勺子(锁),只有拿到勺子的人可以去锅里打饭吃(访问或修改共享数据),而且一个人用完勺子(释放锁)后,该让谁(在争用锁的协程)接着用,是按先来后到的顺序。
根据这个比喻大家还可以思考一下,更多的人、更多的锅、更多的勺子、有的饭只能被一个勺子打、有的饭可以同时被多个勺子打等这多种场景应该如何处理?同样可以借鉴到编程中来,以下给出Lock
的使用场景:
互斥访问共享资源:当多个协程需要访问相同的共享资源(例如共享数据结构、文件等)时,
Lock
可以防止数据竞争和不一致性,确保每次只有一个协程能访问该资源。防止资源重入:在某些情况下,一个协程在完成前不应被再次调用。使用
Lock
可以防止这种重入,确保协程安全完成其工作。
Event
Event
是事件的实现。这种对象可以理解为事件通知器。多个协程可以利用它等待某个事件发生并获取通知。它内部有一个标志变量,初始化时为False
,调用它的wait()
方法后进入阻塞态,等待它的标志变为True
,即等待事件的发生。当一个事件发生后,可以通过set()
方法来改变标志状态为True
。
通过上述例子,可以看出Event
的作用跟比赛用的信号弹类似,信号弹爆炸了大家都得按它行动。一颗信号弹也只能用一次。以下给出Event
的使用场景:
协程间的信号通知:当一个协程需要等待来自另一个协程的信号时,可以使用
Event
。例如,在一个数据处理流程中,数据生产者可以通过设置Event
来通知数据消费者开始处理。状态变更通知:如果系统的某部分状态发生变化(例如配置更新、系统就绪等),并且这个状态变化需要通知给多个协程,
Event
可以作为一个简单的通知机制。
Condition
Condition
是条件变量的实现,它可以让一个或多个协程一直等待某个条件达成以后再继续运行。Conditon
和Event
很像,都是通过一个信号让别的协程继续运行下去,但有不同点:
Event
是一次性通知所有阻塞在该事件上的协程,而Condition
可以分别通知其中的一个或多个,而未被通知的继续等待。Condition
底层实现使用了Lock
对象,而Event
没有。
关于上述第1点我们参考如下示例:
如果仅按上述示例的用途,Condition
和Event
几乎没有差异,尤其是用cond.notify_all()
的时候,效果跟Event
一毛一样,这不是重复造轮子吗?那么Condition
的特别之处有啥?
源于上述第2点不同。那为啥Condition
就得依靠锁来实现呢?其实条件变量设计的本意比Event
复杂,它通常会和三种对象关联:指示条件是否已满足的对象、串行化访问该对象的互斥对象、等待条件的条件变量。
条件变量通常和决定是否成立的共享对象关联,所以条件变量的底层实现使用了互斥锁对象Lock
。提供了acquire()
和release()
方法,当某个协程想要改动共享数据,告诉别人条件已修改,就必须使用acquire
来取得修改权。
Condition
对象还提供了coroutine wait_for(predicate)
方法,参数predicate
需要是一个直接调用就可以得到返回值的函数。当predicate()
的返回值的布尔上下文为真时,代码将继续执行。上述例子我们还可以换成如下写法:
本小节讲解了asyncio
的同步机制之一:条件变量(Condition)。并展示了其三种主要使用方法,每种使用方法都适用于不同的情景,需在实践中不断体会。以下给出常用场景:
生产者-消费者问题:在生产者和消费者模式中,
Condition
可以用来确保消费者只在有数据可消费时才消费,同时允许生产者在生产新数据时通知消费者。这种情况下,Condition
用于同步生产者和消费者对共享资源(如队列)的访问。等待多个协程完成特定任务:在某些情况下,可能需要等待多个协程完成它们的任务,直到一个特定的条件被满足(如数据达到特定状态)。这时,
Condition
可以用来阻塞一个协程,直到另一个协程改变了状态并通知该条件对象。顺序控制:如果有多个协程必须按特定顺序执行,
Condition
可以用来控制执行流程,确保每个协程在前一个协程完成特定任务后才开始执行。定期检查共享资源:在某些应用中,可能需要定期检查共享资源的状态,并在状态达到特定条件时执行操作。
Condition
的wait_for()
方法可以在这种场景下很好地使用,它允许协程在条件满足时被唤醒并执行相应的操作。
在这些场景中使用Condition
的主要优势在于其能够提供比简单的Lock
或Event
更复杂的同步机制,允许在复杂的协程间同步场景中精确控制执行流程。
5.1.4Semaphore
Semaphore
信号量,原本是铁路交通系统中的一个术语,后被荷兰计算机科学家Dijkstra引入计算机科学领域。用于控制仅支持有限个用户同时操作的共享资源,在计算机系统中有着广泛的应用。
信号量对象的内部会维护一个计数值,取值范围是任意正整数,默认为1。当一个协程中对信号量进行一次acquire()
,其计数值就减1,release()
一次则加1。当计数值减至0时,后来的协程将阻塞在acquire()
操作上,直到有其他协程release()
该信号量。
当信号量对象内的计数值初始化为大于等于2的正整数,称其为一般信号量或计数信号量;计数器初始化为1时,则其变动范围只有{0,1}两种可能,又被称为二进制信号量,二进制信号量又被称为互斥锁。虽然asyncio
中的Lock
和Semaphore
分别由两个不同的类实现,但其源代码十分相似。从原理上我们已经知道,互斥锁是信号量的一个特例。
threading
模块也提供了Semaphore
,原理都一样,使用方法也非常相似。不再赘述。
asyncio信号量
上图示例演示了Semaphore
对象的用法,只允许3个worker同时被调度执行,第四个和第五个worker必须等待前面的release()之后,有“名额”了才能执行。以下给出信号量的使用场景:
限制资源访问:在有限资源(如数据库连接、网络带宽)的环境中,使用信号量可以限制同时访问这些资源的协程数量,防止资源过度使用。
控制并发度:在执行并发操作时,如API调用或网络请求,信号量可以用来控制同时运行的操作数量,以避免超出服务器或API的负载能力。
BoundedSemaphore
BoundedSemaphore
称为有界信号量,它和一般信号量只有在计数值管理上存在细微差别。有界信号量的计数值的初始化值就是上限,不允许越界,而Semaphore
的计数值在初始化之后却是可以通过多次release()
来提高上限。
以下给出有界信号量的使用场景:
严格控制资源访问上限:在需要严格限制同时访问某个资源的最大协程数时使用,确保资源不会因为程序错误而被过度使用。
防止程序逻辑错误:在设计需要精确控制资源访问计数的场景时,
BoundedSemaphore
可以帮助捕捉程序中的逻辑错误,如不恰当的release()调用。资源池管理:管理有限资源的池(例如数据库连接池),确保池中资源数量不会因错误操作而超出预设的界限。
队列本质是一个用于存储容器,它的使用模式是“存进去——取出来”,正是因为这样的过程,所以队列的基本职责就是“缓冲、暂存”。
在此基础上,让存于其中的元素们具有某种特征的存取顺序,就构成了不同类型的队列,常见的有先进先出队列(FIFO Queue),优先级队列(Priority Queue),后进先出队列(LIFO Queue)。
由队列的存储实质可以看出,存储组件(如磁盘、数据库)具有的考量点它都具有,比如需考察队列的读写能力(I/O吞吐能力)、存储容量、可靠性(数据防缺损)等等。
正因为是“存进去——取出来”模式,故而队列扮演的角色就是协调数据存入方与数据取出方,另一个称呼是数据的生产者和消费者。那么什么时候需要用到队列?很简单,既然是协调生产者和消费者,这两者不协调的时候就让它上!
那具体该如何思考?之前我们讲过,认识事物,从其时间结构和空间结构着手,当生产者与消费者在时间或空间上不协调时,队列就可以发挥作用。比如,生产者与消费者的工作速率不匹配,或者它们无法在同一个时间点直接交互(比如消费者数量不够),就是时间上的不协调。具体场景如高并发应用,产生请求的速率太高(QPS很高)而处理请求的程序(服务端的线程/协程数有限),所谓削峰填谷是也。再比如,生产者不论主动或被动地无法直接与消费者交互,这就是空间不协调,比如程序解耦,跨域(跨作用域、线程、进程、网络域等等,不同级别的程序,有不同级别的队列)通信等。
我们只有像上面这样探究,从根本性质出发,才不会被各种表象所迷惑,也不会人云亦云而遇到困难时手足无措。对事物的本质认识越深刻,掌握它越轻松,越能够应付千变万化。
现在我们要学习的是Python标准库asyncio
里提供的异步队列,无他,不过是“存——取”这两个操作支持异步罢了。
5.2.1Queue
asyncio.Queue
是一个搭配协程使用的先进先出队列,跟搭配线程使用的quque.Queue
和搭配进程使用的multiprocessing.Queue
很相似,只是搭配的对象不同,进而实现细节不同。
asyncio.Queue(maxsize=0,*,loop=None)
有2个初始化参数,loop
指明调度该队列的事件循环对象,maxsize
限制队列的最大长度。当maxsize <= 0 时,队列无限长度;当maxsize>0时,如果已经存入队列的元素数量超过限制(队列已满),则后来的元素想put()
存入队列,则会阻塞,直到别的元素被get()
取出留出空位。
asyncio.Queue
的常用方法如下:
empty()
检查队列是否为空,为空返回True,否则返回False。full()
检查队列是否已满,满了返回True,否则返回False,无限长队列只会返回False。coroutine get()
从队列中取出元素,如果队列为空,则一直等到有元素可取。get_nowait()
从队列中取出元素并立即返回该元素,如果队列为空则抛出QueueEmpty
异常,而不是等待有元素可取出。coroutine put(item)
向队列中存入元素,若队满则等着,直到有空位再存入。put_nowait()
向队列中存入元素,若队列已满,则抛出QueueFull
异常。qsize()
返回队列中已有元素的数量,注意这个值并不准确,因为非阻塞队列在被统计时,也有元素在不断地进出。coroutine join()
和task_done()
这两个方法是 Python 3.4.4 版才新加入的,它俩需要搭配使用。这两方法稍显鸡肋,后文分析。
Queue
示例代码:
大家先忽略上述代码中第13、14、26、27行,即调用task_done()
和join()
的地方,实践中也不必写这几行代码,它们对元素的存取并无影响。剩下的代码很容易理解,producer 不断地存入元素,只放了5个元素后退出,而 consumer 不断地取出元素进行处理,直到队列为空时退出。
join()
和task_done()
的探讨(不关心可忽略):Queue
内部维护了一个名为_unfinished_tasks
的计数变量,其初始值为0,每当调用put()
或者put_nowait()
时该变量的值加1,而调用task_done()
时,该变量值减1。调用join()
后,若该值大于0,则会阻塞着直到该值变为0,然后返回join()
处继续往下执行。
问题来了,名为“未完成”的计数变量其增加是因为存入元素,而较少却并不是因为其直接取出元素,通常做法是取出元素进行处理之后,调用task_done()
来减少该值。如果对这两个方法不熟悉的同学,可能并不会将它们配对使用,就会出错,比如程序执行到join()
处就不继续执行了,即便队列中元素已经取出完毕。
鸡肋的另一点原因,因为队列中不定时地有元素在进进出出,并不能确切地知道_unfinished_tasks
变量何时变为0。那么join()
阻塞后再被唤醒的时机不好控制,如果刻意控制则会使代码逻辑变得晦涩。
关于join()
和task_done()
的讨论我们就此打住,像更深入探讨的同学欢迎留言。驹哥更推荐使用asyncio.Event
或者其他同步机制来显式而明确地控制协程在某种条件下的切换时机。而join()
和task_done()
背后的实现就是基于asyncio.Event
的。
5.2.2 PriorityQueue
PriorityQueue
的对象所具有的方法跟Queue
完全一样,只是元素取出的顺序是按优先级取出。通常用(priority number, data)
这样的二元组表示一个元素,注意,priority number
值越小优先级越高。
PriorityQueue
底层实现依赖于heapq
,实际上,队列中个元素的比较是按**富比较(Rich-Compare)**的方式进行的,所以存入其中的元素不仅可以是上述二元组的形式,还可以是字符串、整数、小数、自定义的对象等各种凡是实现了富比较
方法的对象。
但需注意付比较的细节规则,比如(1,2,3)
不可以和(1,2,'3')
比较,因为他们的第三个元素的数据类型不同,整数无法与字符串直接比较。但(1,2)
是可以和(1,2,3)
比较的。关于富比较
的更多细节,本微信公众号"驹说码事"以后再发文解释。在彻底理解富比较之前,要存入优先级队列的元素先纯数字或二元组表示。一般情况下我们都应该这么做,让程序足够简明。
5.2.3LifoQueue
LIFO(Last-In-First-Out)顾名思义它是后进先出队列。它所拥有的操作方法也跟Queue
完全一样,仅仅是元素取出的顺序是越后存入队列的越先被取出。
其底层实现依赖于内置数据类型list
,put()
方法里直接调用list.append(element)
,get()
方法则直接调用了list.pop()
。
5.2.4 异步编程中Queue的使用场景
任务调度:在异步应用中,用于在生产者和消费者之间分发任务,特别是当任务生产和消费的速率不一致时。
流量控制:当处理高并发请求时,asyncio.Queue可以作为缓冲区,帮助控制数据流,以避免过载。
数据共享:在多个异步任务之间安全地共享数据。
异步工作流程:用于实现复杂的异步工作流程,如管道(pipeline)模式,其中数据逐步通过一系列的处理阶段。
事件处理:在事件驱动的应用中,用于管理和分发事件消息。
本节内容对asyncio
支持的同步操作的总结非常全面和详尽,介绍了asyncio
中多种同步机制的作用和应用场景。从锁(Lock
)的基本概念和用途,事件(Event
)的信号通知机制,条件变量(Condition
)的更复杂同步控制,到信号量(Semaphore
和BoundedSemaphore
)的资源访问控制,以及队列(Queue
,PriorityQueue
,LifoQueue
)在异步编程中的应用,这些内容详细描述了asyncio
库中不同同步操作的特点和使用方法。每个部分都配以合适的示例和解释,有助于深入理解异步编程中同步机制的重要性和实际应用。
6 回调函数的调度和执行
即便asyncio
提供了协程,但在有些情况下还是规避不了跟普通函数交互,即用不同函数写的回调函数。而且有些第三异步库的主要编程模式还是基于回调的,为了让这类框架要与asyncio
兼容也促使asyncio
提供了调度常规函数的方法。
6.1 调度接口
asyncio
调度回调函数有以下三种方式:
方式1:
EventLoop.call_soon(callback, *args)
方式2:
EventLoop.call_later(delay, callback, *args)
方式3:
EventLoop.call_at(when, callback, *args)
方式1call_soon
是立即调度callback
函数到EventLoop
中,将在下一次事件循环中被执行;方式2call_later
是延迟delay
秒后执行;方式3call_at
是在when
时刻执行。
注意,asyncio
内部维护了一个单调递增时钟EventLoop.time()
,其实就是time.monotonic()
,call_later
的delay
参数和call_at
的when
参数都是以它为标准,而不是time.time()
。最大的影响在于call_at()
的when
参数并不能直接传递某个具体的时间戳。
另外有个注意点,上述调度函数中的*args
是传递给callback
的参数,并不能接收关键字参数。如果要传递关键字参数时,则需使用functools.partical()
方法封一层。
三种方式的示例代码如下:
上述代码中需特别留意之处是定时调用call_at
函数,一定不能直接传递datetime
对象的时间戳。其实call_later
是基于call_at
实现的,请读者自己思考下实现方案。
6.2 周期性回调
asyncio
本身并没有提供周期性执行回调函数的方法,不像Tornado
提供了PeriodicCallback
类。按PEP 3156
的解释,asyncio
若是提供此方法是多此一举,如果是简单情况,那么不断地call_later()
自身就好了,也可以在循环里sleep()
,总之实现起来特别容易。如果是精度要求高的复杂情况,每种情境下边界要求又不一样,所以asyncio
里提供通用方法也不合适。
实际项目中往往要求很简单,精度也只需要控制在秒级,下面给出一段代码来实现类似Tornaodo
的PeriodicCallback
。
6.3 回调处理
前面介绍了如何通过三种方式(call_soon()
,call_later()
,call_at()
)去调度回调函数,下面我们介绍协程如何被用作回调函数,以及这些回调函数被调度后,又该如何控制?比如想取消怎么办?
6.3.1 协程作回调函数
在asyncio
中,协程可以通过特定的方法被用作回调函数。这通常涉及到将协程封装成一个可以被事件循环调用的普通函数。这可以通过asyncio.ensure_future
或loop.create_task
来实现。这些函数接受一个协程对象,并返回一个Task
对象,这个Task
对象可以被当作回调函数使用。
示例代码如下:
async def my_coroutine():
# 协程逻辑
loop = asyncio.get_event_loop()
task = loop.create_task(my_coroutine())
loop.call_soon(task)
6.3.2 回调处理器
在Python的asyncio
模块中,Handle
和TimerHandle
是与事件循环调度回调相关的类:
asyncio.Handle
用途:Handle是一个回调包装对象,由loop.call_soon()和loop.call_soon_threadsafe()返回。方法:
get_context(): 返回与句柄相关联的contextvars.Context对象。这是Python 3.12中的新功能。
cancel(): 取消回调。如果回调已经被取消或执行,此方法无效。
cancelled(): 如果回调已被取消,则返回True。这是Python 3.7中的新功能。
asyncio.TimerHandle
用途:TimerHandle是Handle的子类,由loop.call_later()和loop.call_at()返回,用于调度具有特定延迟或计划时间的回调。方法:
when(): 返回计划的回调时间(以浮点秒为单位)。时间是绝对时间戳,使用与loop.time()相同的时间参考。这是Python 3.7中的新功能。
这俩用于管理和控制事件循环中的回调。例如,可能会使用call_later()来调度一些需要在将来某个时刻执行的操作,并使用返回的TimerHandle
来管理这个操作(如取消或查询计划时间)。
示例代码:
import asyncio
import contextvars
# 定义一个简单的回调函数
def my_callback(name):
print(f"Hello, {name}!")
# 设置一个事件循环
loop = asyncio.get_event_loop()
# 调用 call_soon() 来计划回调的执行
handle = loop.call_soon(my_callback, "Alice")
# 检查回调是否被取消
print(f"Cancelled: {handle.cancelled()}")
# 取消回调
handle.cancel()
# 使用 call_later() 来延迟回调的执行
timer_handle = loop.call_later(5, my_callback, "Bob")
# 获取计划执行的时间
print(f"Scheduled at: {timer_handle.when()}")
# 取消计划的回调
timer_handle.cancel()
# 运行一次事件循环以使调度的回调有机会执行
loop.run_until_complete(asyncio.sleep(0.1))
loop.close()
6.4 异步读文件
在asyncio中,由于直接的文件IO操作通常不是非阻塞的,因此asyncio并没有提供直接的异步文件读取API。然而,我们可以通过使用线程池来实现异步读文件的效果。asyncio提供了loop.run_in_executor方法,该方法可以在指定的执行器(例如线程池)中运行给定的函数。
代码示例如下:
import asyncio
async def read_file_async(file_path):
loop = asyncio.get_event_loop()
with open(file_path, 'r') as file:
return await loop.run_in_executor(None, file.read)
# 使用协程读取文件
async def main():
content = await read_file_async('path/to/your/file.txt')
print(content)
asyncio.run(main())
上面只是个基本示例,仿佛看不出来这种异步读文件的用处是什么。首先,在广泛以异步编程为主导的应用中,提倡每一个操作都是一部的,避免阻塞其他协程,这对提高系统的整体效率和响应性非常重要,所以把文件操作也改为非阻塞IO操作。进一步,异步读取使得CPU可以在等待文件IO完成时执行其他任务,从而提高了资源的利用率。在处理大文件时,异步读取可以显著提高性能。在用户界面中,也可以即时响应,避免UI卡顿,提供更流畅的用户体验。
7 asyncio和多线程和多进程搭配
本节将学习为什么有了协程可以大幅提高程序性能的时候,还需要关心多线程和多进程?又如何结合使用?
7.1 理解使用场景
其一,对于I/O密集型或需要执行阻塞调用的任务,例如文件读写或者阻塞的网络通信,使用多线程可以避免阻塞整个事件循环,提高整体性能。
其二,在处理CPU密集型任务时,例如大规模数学计算,使用多进程可以有效利用多核处理器,因为Python的全局解释器锁(GIL)限制了同一时间只有一个线程执行Python字节码。
7.2 结合多线程使用
在使用asyncio
时,会遇到需要执行阻塞I/O操作或调用非异步兼容的库函数的情况。在这种情况下,将这些操作放在单独的线程中执行是一种有效的策略,以避免阻塞整个事件循环。asyncio
提供了loop.run_in_executor()
函数,可以线程池中执行阻塞调用。
run_in_executor
的第一个参数为None则默认在线程池中调度,当需要自定义线程池时可以再手工创建线程池对象传入。
示例代码:
import asyncio
def blocking_io():
# 执行阻塞I/O操作
print("start blocking I/O")
time.sleep(1)
print("end blocking I/O")
async def main():
loop = asyncio.get_running_loop()
# 在默认的线程池中运行阻塞I/O函数
await loop.run_in_executor(None, blocking_io)
print("main continues")
asyncio.run(main())
结合多线程使用的注意事项
需要确保线程安全尤其重要。多个线程可能会同时访问和修改共享资源,如全局变量,从而引发竞态条件。竞态条件可能导致数据不一致或程序行为异常。
可以采取的措施如下:
使用锁:利用
threading.Lock
或asyncio.Lock
(取决于是在普通线程还是在异步协程内)来确保在任何时刻只有一个线程可以访问共享资源。在读操作远多于写操作的场景中,使用读写锁(如...