18.5.3. 任务和协同¶
18.5.3.1. 协同¶
与asyncio
一起使用的协程可以使用async def
语句或使用generators。在Python 3.5中添加了协程的async def
类型,如果不需要支持旧的Python版本,则推荐使用。
基于生成器的协程应该用@asyncio.coroutine
来装饰,尽管这不是严格执行的。装饰器能够与async def
协程兼容,并且还作为文档。基于生成器的协程使用在 PEP 380中引入的产生 来自 而不是原始的
yield
语法。
“协程”一词与“生成器”一样,用于两个不同的(虽然相关的)概念:
- 定义协程的函数(使用
async def
或用@asyncio.coroutine
装饰的函数定义) 。如果需要消除歧义,我们称之为协程函数(iscoroutinefunction()
返回True
)。 - 通过调用协程函数获得的对象。此对象表示将最终完成的计算或I / O操作(通常是组合)。如果需要消除歧义,我们将它称为协程对象(
iscoroutine()
返回True
)。
协程可以做的事:
result = await future
orresult = yield from future
– suspends the coroutine until the future is done, then returns the future’s result, or raises an exception, which will be propagated. (如果未来被取消,则会引发CancelledError
异常。)注意任务是 futures,关于 futures 的一切也适用于任务。结果 = await 协程
或t6> = yield 从 协程
- 等待另一个协程产生结果或引发异常,其将被传播)。coroutine
表达式必须是到另一个协程的调用。返回 表达式
- 为使用await
或yield from的协程产生一个结果
。提高 异常
- 使用await
或引发协程中正在等待此异常的异常, yield 来自
。
调用协程不会启动其代码运行 - 调用返回的协程对象在您安排其执行之前不会执行任何操作。开始运行有两种基本方法:调用await 协程
或yield from 协程
从另一个协程(假设另一个协程已经运行!),或使用ensure_future()
函数或AbstractEventLoop.create_task()
方法调度其执行。
协程(和任务)只能在事件循环运行时运行。
-
@
asyncio.
coroutine
¶ 装饰器来标记基于生成器的协程。This enables the generator use
yield from
to callasync def
coroutines, and also enables the generator to be called byasync def
coroutines, for instance using anawait
expression.没有必要装饰
async def
协程本身。如果生成器在销毁之前没有生成,则会记录一条错误消息。请参见Detect coroutines never scheduled。
注意
在本文档中,一些方法被记录为协程,即使它们是返回Future
的纯Python函数。这是有意在未来有调整这些职能的执行的自由。如果需要在回调式代码中使用这样的函数,请用ensure_future()
包围其结果。
18.5.3.1.1. Example: Hello World coroutine¶
协程显示“Hello 世界”
的示例:
import asyncio
async def hello_world():
print("Hello World!")
loop = asyncio.get_event_loop()
# Blocking call which returns when the hello_world() coroutine is done
loop.run_until_complete(hello_world())
loop.close()
也可以看看
The Hello World with call_soon() example uses the AbstractEventLoop.call_soon()
method to schedule a callback.
18.5.3.1.2. Example: Coroutine displaying the current date¶
协程使用sleep()
函数在每5秒钟内显示当前日期的示例:
import asyncio
import datetime
async def display_date(loop):
end_time = loop.time() + 5.0
while True:
print(datetime.datetime.now())
if (loop.time() + 1.0) >= end_time:
break
await asyncio.sleep(1)
loop = asyncio.get_event_loop()
# Blocking call which returns when the display_date() coroutine is done
loop.run_until_complete(display_date(loop))
loop.close()
使用生成器实现的同一协议:
@asyncio.coroutine
def display_date(loop):
end_time = loop.time() + 5.0
while True:
print(datetime.datetime.now())
if (loop.time() + 1.0) >= end_time:
break
yield from asyncio.sleep(1)
也可以看看
display the current date with call_later()示例使用带有AbstractEventLoop.call_later()
方法的回调显示当前日期。
18.5.3.1.3. Example: Chain coroutines¶
示例链接协程:
import asyncio
async def compute(x, y):
print("Compute %s + %s ..." % (x, y))
await asyncio.sleep(1.0)
return x + y
async def print_sum(x, y):
result = await compute(x, y)
print("%s + %s = %s" % (x, y, result))
loop = asyncio.get_event_loop()
loop.run_until_complete(print_sum(1, 2))
loop.close()
compute()
链接到print_sum()
:print_sum()
协程等待直到compute()
然后返回其结果。
序列图示例:
“任务”是由AbstractEventLoop.run_until_complete()
方法创建的,当它获取一个协程对象而不是一个任务。
该图显示了控制流,它没有准确描述内部如何工作。例如,睡眠协程在其内部创建一个 future 来调用 AbstractEventLoop.call_later()
来实现 在1秒钟内唤醒任务。
18.5.3.4. Future¶
- class
asyncio.
Future
(*, loop=None)¶ 此类别几乎与
concurrent.futures.Future
兼容。区别:
result()
和exception()
不需要超时参数,当 future 不再需要他们的时候,会抛出一个异常- 始终通过事件循环的
call_soon_threadsafe()
调用通过add_done_callback()注册
的回调。 - 此类与
concurrent.futures
包中的wait()
和as_completed()
函数不兼容。
此类为not thread safe。
-
cancel
()¶ 取消future并安排回调。
如果future已经完成或取消,返回
False
。否则,将future的状态更改为“已取消”,安排回调并返回True
。
-
cancelled
()¶ 如果未来被取消,返回
True
。
-
done
()¶ 如果未来完成,返回True。
完成意味着结果/异常可用,或未来被取消。
-
result
()¶ 返回这个未来所代表的结果。
如果未来已取消,引发
CancelledError
。如果未来的结果尚不可用,引发InvalidStateError
。如果未来完成并且设置了异常,则会引发此异常。
-
exception
()¶ 返回在此未来设置的异常。
如果未来完成,则会返回异常(如果没有设置异常,则为
None
)。如果未来已取消,引发CancelledError
。如果未来还没有完成,引发InvalidStateError
。
-
add_done_callback
(fn)¶ 添加回调以便在未来完成时运行。
回调使用单个参数调用 - 未来对象。如果未来在调用时已经完成,则使用
call_soon()
计划回调。Use functools.partial to pass parameters to the callback。例如,
fut.add_done_callback(functools.partial(print, “Future:”, flush = True)) t0 >将调用
print(“Future:”, fut, flush = True)
。
-
remove_done_callback
(fn)¶ 从“调用完成时”列表中删除回调的所有实例。
返回已删除的回调的数量。
-
set_result
(result)¶ 标记未来完成并设置其结果。
如果调用此方法时未来已经完成,则引发
InvalidStateError
。
-
set_exception
(exception)¶ 标记未来完成并设置异常。
如果调用此方法时未来已经完成,则引发
InvalidStateError
。
18.5.3.4.1. Example: Future with run_until_complete()¶
组合Future
和coroutine function的示例:
import asyncio
@asyncio.coroutine
def slow_operation(future):
yield from asyncio.sleep(1)
future.set_result('Future is done!')
loop = asyncio.get_event_loop()
future = asyncio.Future()
asyncio.ensure_future(slow_operation(future))
loop.run_until_complete(future)
print(future.result())
loop.close()
协程函数负责计算(需要1秒),并将结果存储到未来。run_until_complete()
方法等待未来的完成。
注意
run_until_complete()
方法在内部使用add_done_callback()
方法在未来完成时通知您。
18.5.3.4.2. Example: Future with run_forever()¶
可以使用Future.add_done_callback()
方法来不同地编写前面的示例来明确描述控制流:
import asyncio
@asyncio.coroutine
def slow_operation(future):
yield from asyncio.sleep(1)
future.set_result('Future is done!')
def got_result(future):
print(future.result())
loop.stop()
loop = asyncio.get_event_loop()
future = asyncio.Future()
asyncio.ensure_future(slow_operation(future))
future.add_done_callback(got_result)
try:
loop.run_forever()
finally:
loop.close()
在此示例中,未来用于将slow_operation()
链接到got_result()
:当slow_operation()
完成时,got_result()
与结果一起调用。
18.5.3.5. Task¶
- class
asyncio.
Task
(coro, *, loop=None)¶ 调度 协程 的执行︰ 封装成future。一个任务就是
Future
的一个子类。一项任务负责在事件循环中执行一个协程对象。如果包装的协程从未来产生,则任务暂停包装的协程的执行并等待未来的完成。当future完成时,包装的协程的执行重新开始与result或future的异常一起。
事件循环使用协同调度:事件循环一次只运行一个任务。如果其他事件循环在不同线程中运行,则其他任务可以并行运行。当任务等待未来的完成时,事件循环执行新任务。
任务的取消与未来的取消不同。调用
cancel()
会向包装的协程中抛出CancelledError
。cancelled()
only returnsTrue
if the wrapped coroutine did not catch theCancelledError
exception, or raised aCancelledError
exception.如果待处理的任务被销毁,则其包装的coroutine的执行未完成。它可能是一个错误,并记录了一个警告:请参阅Pending task destroyed。
不要直接创建
Task
实例:使用ensure_future()
函数或AbstractEventLoop.create_task()
方法。此类为not thread safe。
- classmethod
all_tasks
(loop=None)¶ 返回一组事件循环的所有任务。
默认情况下,将返回当前事件循环的所有任务。
- classmethod
current_task
(loop=None)¶ 在事件循环或
None
中返回当前正在运行的任务。默认情况下,返回当前事件循环的当前任务。
当不在
Task
的上下文中调用时返回None
。
-
cancel
()¶ 请求此任务取消本身。
这安排在通过事件循环的下一个循环中将
CancelledError
抛出到包装的协程中。协程然后有机会使用try / except / finally清理或甚至拒绝请求。与
Future.cancel()
不同,这不能保证任务将被取消:异常可能被捕获和执行,延迟任务的取消或完全阻止取消。任务也可能返回值或引发不同的异常。在调用此方法后,
cancelled()
不会返回True
(除非任务已取消)。当包装的协程以CancelledError
异常(即使cancel()
未被调用)终止时,任务将被标记为已取消。
-
get_stack
(*, limit=None)¶ 返回此任务的协程的堆栈帧的列表。
如果协程未完成,则返回它被暂停的堆栈。如果协程已成功完成或被取消,则返回一个空列表。如果协程由异常终止,则返回回溯帧列表。
帧总是从最旧到最新排序。
可选限制给出了要返回的最大帧数;默认情况下返回所有可用的帧。其含义根据是否返回堆栈或回溯而有所不同:返回堆栈的最新帧,但返回回溯的最早帧。(这与追溯模块的行为相匹配。)
由于我们无法控制的原因,对于暂停的协程只返回一个堆栈帧。
-
print_stack
(*, limit=None, file=None)¶ 打印此任务的协程的堆栈或回溯。
这为由get_stack()检索的帧产生与追溯模块类似的输出。limit参数传递给get_stack()。文件参数是写入输出的I / O流;默认情况下输出写入sys.stderr。
- classmethod
18.5.3.5.1. Example: Parallel execution of tasks¶
并行执行3个任务(A,B,C)的示例:
import asyncio
@asyncio.coroutine
def factorial(name, number):
f = 1
for i in range(2, number+1):
print("Task %s: Compute factorial(%s)..." % (name, i))
yield from asyncio.sleep(1)
f *= i
print("Task %s: factorial(%s) = %s" % (name, number, f))
loop = asyncio.get_event_loop()
tasks = [
asyncio.ensure_future(factorial("A", 2)),
asyncio.ensure_future(factorial("B", 3)),
asyncio.ensure_future(factorial("C", 4))]
loop.run_until_complete(asyncio.gather(*tasks))
loop.close()
输出:
Task A: Compute factorial(2)...
Task B: Compute factorial(2)...
Task C: Compute factorial(2)...
Task A: factorial(2) = 2
Task B: Compute factorial(3)...
Task C: Compute factorial(3)...
Task B: factorial(3) = 6
Task C: Compute factorial(4)...
Task C: factorial(4) = 24
任务在创建时自动计划执行。所有任务完成后,事件循环停止。
18.5.3.6. Task functions¶
注意
在下面的函数中,可选的loop参数允许显式设置基础任务或协程使用的事件循环对象。如果它不提供,则使用默认的事件循环。
-
asyncio.
as_completed
(fs, *, loop=None, timeout=None)¶ 返回一个迭代器,其值在等待时为
Future
实例。如果超时在所有期货完成之前发生,则引发
asyncio.TimeoutError
。例:
for f in as_completed(fs): result = yield from f # The 'yield from' may raise # Use result
注意
期货
f
不一定是fs的成员。
-
asyncio.
ensure_future
(coro_or_future, *, loop=None)¶ 调度执行一个 coroutine object:并且它封装成future。返回
任务
对象。如果参数是
Future
,则直接返回。版本3.4.4中的新功能。
在版本3.5.1中更改:该函数接受任何awaitable对象。
也可以看看
-
asyncio.
async
(coro_or_future, *, loop=None)¶ ensure_future()
的已弃用别名。自3.4.4版起已弃用。
-
asyncio.
gather
(*coros_or_futures, loop=None, return_exceptions=False)¶ 自给定的协程对象或futures返回一个future汇总结果。
所有futures必须共享相同的事件循环。如果所有任务都成功完成,则返回的future的结果是结果列表(按照原始序列的顺序,不一定是结果到达的顺序)。如果return_exceptions为True,则任务中的异常被视为与成功结果相同,并在结果列表中收集;否则,第一个引发的异常将立即传播到返回的future。
取消:如果外部未来被取消,所有的孩子(还没有完成)也被取消。如果有任何子项被取消,系统会将其视为
CancelledError
- 外部未来在此情况下取消。(这是为了防止取消一个孩子导致其他孩子被取消。)
-
asyncio.
iscoroutine
(obj)¶ 返回
True
if obj是coroutine object,其可以基于生成器或async def
协程。
-
asyncio.
iscoroutinefunction
(func)¶ 如果func被确定为coroutine function,则返回
True
,其可以是装饰的生成器函数或async def
函数。
-
asyncio.
run_coroutine_threadsafe
(coro, loop)¶ 将coroutine object提交到给定的事件循环。
返回
concurrent.futures.Future
以访问结果。此函数旨在从与运行事件循环的线程不同的线程调用。用法:
# Create a coroutine coro = asyncio.sleep(1, result=3) # Submit the coroutine to a given loop future = asyncio.run_coroutine_threadsafe(coro, loop) # Wait for the result with an optional timeout argument assert future.result(timeout) == 3
如果在协程中出现异常,则将通知返回的未来。它也可以用来取消事件循环中的任务:
try: result = future.result(timeout) except asyncio.TimeoutError: print('The coroutine took too long, cancelling the task...') future.cancel() except Exception as exc: print('The coroutine raised an exception: {!r}'.format(exc)) else: print('The coroutine returned: {!r}'.format(result))
请参阅文档的concurrency and multithreading部分。
注意
与模块中的其他函数不同,
run_coroutine_threadsafe()
需要明确传递循环参数。版本3.5.1中的新功能。
- coroutine
asyncio.
sleep
(delay, result=None, *, loop=None)¶ 创建一个在给定时间(以秒为单位)后完成的协程。如果提供result,则当协程完成时,将生成给调用者。
睡眠的分辨率取决于事件循环的granularity of the event loop
此函数是coroutine。
-
asyncio.
shield
(arg, *, loop=None)¶ 等待未来,屏蔽它取消。
该声明:
res = yield from shield(something())
正好等于语句:
res = yield from something()
除了,如果包含它的协程被取消,在
something()
中运行的任务不会被取消。从something()
的角度来看,取消没有发生。但是它的调用者仍然被取消,所以yield-from表达式仍然引发CancelledError
。注意:如果something()
被其他方法取消,这仍然会取消shield()
。如果要完全忽略取消(不推荐),您可以将
shield()
与try / except子句组合,如下所示:try: res = yield from shield(something()) except CancelledError: res = None
- coroutine
asyncio.
wait
(futures, *, loop=None, timeout=None, return_when=ALL_COMPLETED)¶ 等待由序列futures给出的Futures和协程对象完成。协程将被包裹在任务中。返回含两个集合的
Future
:(done,pending)。序列futures不能为空。
timeout可用于控制返回前等待的最大秒数。timeout可以是一个int或float。如果未指定超时或
None
,则等待时间没有限制。return_when指示此函数应返回的时间。它必须是
concurrent.futures
模块的以下常量之一:常量 描述 FIRST_COMPLETED
当任何未来完成或被取消时,该函数将返回。 FIRST_EXCEPTION
当任何future通过抛出异常完成时,函数将返回。如果没有未来引发异常,则它等效于 ALL_COMPLETED
。ALL_COMPLETED
当所有期货完成或被取消时,该函数将返回。 此函数是coroutine。
用法:
done, pending = yield from asyncio.wait(fs)
注意
这不引发
asyncio.TimeoutError
!在超时发生时未完成的期货在第二组中返回。
- coroutine
asyncio.
wait_for
(fut, timeout, *, loop=None)¶ 等待单个
Future
或coroutine object完成超时。如果超时为None
,则阻止直到未来完成。协程将包裹在
Task
中。返回未来或协程的结果。当超时发生时,它取消任务并引发
asyncio.TimeoutError
。要避免任务取消,请将其封装在shield()
中。如果等待被取消,未来fut也会被取消。
此函数是coroutine,用法:
result = yield from asyncio.wait_for(fut, 60.0)
在版本3.4.3中更改:如果等待被取消,未来fut现在也被取消。