18.5.9. Develop with asyncio¶
异步编程不同于经典的顺序编程。本页列出了常见的陷阱,并解释了如何避免它们。
18.5.9.1. Debug mode of asyncio¶
asyncio
的实现是为了性能而编写的。为了简化异步代码的开发,您可以启用调试模式。
要为应用程序启用所有调试检查:
- 通过将环境变量
PYTHONASYNCIODEBUG
设置为1
或通过调用AbstractEventLoop.set_debug()
- 将asyncio logger的日志级别设置为
logging.DEBUG
。例如,在启动时调用logging.basicConfig(level=logging.DEBUG)
。 - 配置
warnings
模块以显示ResourceWarning
警告。例如,使用Python的-Wdefault
命令行选项显示它们。
示例调试检查:
- 日志coroutines defined but never “yielded from”生成
call_soon()
和call_at()
方法如果从错误的线程调用,则引发异常。- 记录选择器的执行时间
- 日志回调需要超过100毫秒才能执行。
AbstractEventLoop.slow_callback_duration
属性是“慢”回调的最小持续时间(以秒为单位)。 ResourceWarning
警告是在传输和事件循环not closed explicitly时发出的。
18.5.9.2. Cancellation¶
取消任务在经典编程中不常见。在异步编程中,不仅它是常见的,但你必须准备你的代码来处理它。
可以使用Future.cancel()
方法显式取消Future和Task。当超时发生时,wait_for()
函数取消等待的任务。还有许多其他情况下,任务可以间接取消。
如果Future被取消,请不要调用Future
的set_result()
或set_exception()
例如,写:
if not fut.cancelled():
fut.set_result('done')
不要使用AbstractEventLoop.call_soon()
直接调用set_result()
或set_exception()
如果你等待未来,你应该早点检查,如果未来被取消,以避免无用的操作。例:
@coroutine
def slow_operation(fut):
if fut.cancelled():
return
# ... slow computation ...
yield from fut
# ...
shield()
函数也可用于忽略取消。
18.5.9.3. 并发和多线程¶
事件循环在线程中运行,并在同一线程中执行所有回调和任务。当任务在事件循环中运行时,没有其他任务在同一线程中运行。但是当任务使用yield 从
时,任务将被挂起,事件循环执行下一个任务。
要计划不同线程的回调,应使用AbstractEventLoop.call_soon_threadsafe()
方法。例:
loop.call_soon_threadsafe(callback, *args)
大多数asyncio对象不是线程安全的。你只需要担心如果你访问事件循环之外的对象。例如,要取消未来,请不要直接调用其Future.cancel()
方法,但是:
loop.call_soon_threadsafe(fut.cancel)
为了处理信号和执行子进程,事件循环必须在主线程中运行。
要从不同的线程调度协程对象,应使用run_coroutine_threadsafe()
函数。它返回concurrent.futures.Future
以访问结果:
future = asyncio.run_coroutine_threadsafe(coro_func(), loop)
result = future.result(timeout) # Wait for the result with a timeout
AbstractEventLoop.run_in_executor()
方法可以与线程池执行程序一起使用,以在不同线程中执行回调,从而不阻塞事件循环的线程。
18.5.9.4. 正确处理阻塞函数¶
不应该直接调用阻塞函数。例如,如果功能块阻塞1秒钟,其他任务会延迟1秒钟,这对性能有重要影响。
对于网络和子进程,asyncio
模块提供了高级API,如protocols。
执行器可用于在不同的线程中或甚至在不同的进程中运行任务,以不阻塞事件循环的线程。请参阅AbstractEventLoop.run_in_executor()
方法。
也可以看看
Delayed calls部分详细说明事件循环如何处理时间。
18.5.9.6. Detect coroutine objects never scheduled¶
当调用协程函数并且其结果未传递到ensure_future()
或传递给AbstractEventLoop.create_task()
方法时,协程对象的执行将永远被调度这可能是一个错误。Enable the debug mode of asyncio到log a warning以检测它。
示例与错误:
import asyncio
@asyncio.coroutine
def test():
print("never scheduled")
test()
调试模式下的输出:
Coroutine test() at test.py:3 was never yielded from
Coroutine object created at (most recent call last):
File "test.py", line 7, in <module>
test()
修复是使用协调对象调用ensure_future()
函数或AbstractEventLoop.create_task()
方法。
也可以看看
18.5.9.7. Detect exceptions never consumed¶
Python通常在未处理的异常上调用sys.displayhook()
。如果调用Future.set_exception()
,但该异常从不消耗,则不会调用sys.displayhook()
。相反,当未来被垃圾回收器删除时,发出a log is emitted,其中traceback引发异常。
未处理异常的示例:
import asyncio
@asyncio.coroutine
def bug():
raise Exception("not consumed")
loop = asyncio.get_event_loop()
asyncio.ensure_future(bug())
loop.run_forever()
loop.close()
输出:
Task exception was never retrieved
future: <Task finished coro=<coro() done, defined at asyncio/coroutines.py:139> exception=Exception('not consumed',)>
Traceback (most recent call last):
File "asyncio/tasks.py", line 237, in _step
result = next(coro)
File "asyncio/coroutines.py", line 141, in coro
res = func(*args, **kw)
File "test.py", line 5, in bug
raise Exception("not consumed")
Exception: not consumed
Enable the debug mode of asyncio以获取创建任务的回溯。调试模式下的输出:
Task exception was never retrieved
future: <Task finished coro=<bug() done, defined at test.py:3> exception=Exception('not consumed',) created at test.py:8>
source_traceback: Object created at (most recent call last):
File "test.py", line 8, in <module>
asyncio.ensure_future(bug())
Traceback (most recent call last):
File "asyncio/tasks.py", line 237, in _step
result = next(coro)
File "asyncio/coroutines.py", line 79, in __next__
return next(self.gen)
File "asyncio/coroutines.py", line 141, in coro
res = func(*args, **kw)
File "test.py", line 5, in bug
raise Exception("not consumed")
Exception: not consumed
有不同的选项来解决这个问题。第一个选项是在另一个协程中链接协程,并使用经典的try / except:
@asyncio.coroutine
def handle_exception():
try:
yield from bug()
except Exception:
print("exception consumed")
loop = asyncio.get_event_loop()
asyncio.ensure_future(handle_exception())
loop.run_forever()
loop.close()
另一个选项是使用AbstractEventLoop.run_until_complete()
函数:
task = asyncio.ensure_future(bug())
try:
loop.run_until_complete(task)
except Exception:
print("exception consumed")
也可以看看
18.5.9.8. Chain coroutines correctly¶
当协程函数调用其他协程函数和任务时,应使用yield 从
显式链接它们。否则,不能保证执行是顺序的。
使用asyncio.sleep()
来模拟慢操作的不同错误的示例:
import asyncio
@asyncio.coroutine
def create():
yield from asyncio.sleep(3.0)
print("(1) create file")
@asyncio.coroutine
def write():
yield from asyncio.sleep(1.0)
print("(2) write into file")
@asyncio.coroutine
def close():
print("(3) close file")
@asyncio.coroutine
def test():
asyncio.ensure_future(create())
asyncio.ensure_future(write())
asyncio.ensure_future(close())
yield from asyncio.sleep(2.0)
loop.stop()
loop = asyncio.get_event_loop()
asyncio.ensure_future(test())
loop.run_forever()
print("Pending tasks at exit: %s" % asyncio.Task.all_tasks(loop))
loop.close()
预期输出:
(1) create file
(2) write into file
(3) close file
Pending tasks at exit: set()
实际输出:
(3) close file
(2) write into file
Pending tasks at exit: {<Task pending create() at test.py:7 wait_for=<Future pending cb=[Task._wakeup()]>>}
Task was destroyed but it is pending!
task: <Task pending create() done at test.py:5 wait_for=<Future pending cb=[Task._wakeup()]>>
在create()
完成之前,close()
在write()
之前被调用,顺序:create()
,write()
,close()
。
要修正此示例,任务必须用yield 标记
:
@asyncio.coroutine
def test():
yield from asyncio.ensure_future(create())
yield from asyncio.ensure_future(write())
yield from asyncio.ensure_future(close())
yield from asyncio.sleep(2.0)
loop.stop()
或没有asyncio.ensure_future()
:
@asyncio.coroutine
def test():
yield from create()
yield from write()
yield from close()
yield from asyncio.sleep(2.0)
loop.stop()
18.5.9.9. Pending task destroyed¶
如果待处理的任务被销毁,则其包装的coroutine的执行未完成。这可能是一个错误,因此记录一个警告。
日志示例:
Task was destroyed but it is pending!
task: <Task pending coro=<kill_me() done, defined at test.py:5> wait_for=<Future pending cb=[Task._wakeup()]>>
Enable the debug mode of asyncio以获取创建任务的回溯。登录调试模式示例:
Task was destroyed but it is pending!
source_traceback: Object created at (most recent call last):
File "test.py", line 15, in <module>
task = asyncio.ensure_future(coro, loop=loop)
task: <Task pending coro=<kill_me() done, defined at test.py:5> wait_for=<Future pending cb=[Task._wakeup()] created at test.py:7> created at test.py:15>
18.5.9.10. Close transports and event loops¶
当不再需要传输时,调用其close()
方法释放资源。事件循环也必须明确地关闭。
如果传输或事件循环未显式关闭,则会在其析构函数中发出ResourceWarning
警告。默认情况下,忽略ResourceWarning
警告。Debug mode of asyncio部分介绍如何显示它们。