tornado.locks – 同步原语

4.2 新版功能.

使用和标准库提供给线程相似的同步原语协调协程.

(请注意, 这些原语不是线程安全的, 不能被用来代替标准库中的–它 们是为了协调在单线程app中的Tornado协程, 而不是为了在一个多线程 app中保护共享对象.)

Condition

class tornado.locks.Condition[源代码]

允许一个或多个协程等待直到被通知的条件.

就像标准的 threading.Condition, 但是不需要一个被获取和释放的底层锁.

通过 Condition, 协程可以等待着被其他协程通知:

from tornado import gen
from tornado.ioloop import IOLoop
from tornado.locks import Condition

condition = Condition()

@gen.coroutine
def waiter():
    print("I'll wait right here")
    yield condition.wait()  # Yield a Future.
    print("I'm done waiting")

@gen.coroutine
def notifier():
    print("About to notify")
    condition.notify()
    print("Done notifying")

@gen.coroutine
def runner():
    # Yield two Futures; wait for waiter() and notifier() to finish.
    yield [waiter(), notifier()]

IOLoop.current().run_sync(runner)
I'll wait right here
About to notify
Done notifying
I'm done waiting

wait 有一个可选参数 timeout , 要不然是一个绝对的时间戳:

io_loop = IOLoop.current()

# Wait up to 1 second for a notification.
yield condition.wait(timeout=io_loop.time() + 1)

...或一个 datetime.timedelta 相对于当前时间的一个延时:

# Wait up to 1 second.
yield condition.wait(timeout=datetime.timedelta(seconds=1))

这个方法将抛出一个 tornado.gen.TimeoutError 如果在最后时间之前都 没有通知.

wait(timeout=None)[源代码]

等待 notify.

返回一个 Future 对象, 如果条件被通知则为 True , 或者在超时之后为 False .

notify(n=1)[源代码]

唤醒 n 个等待者(waiters) .

notify_all()[源代码]

唤醒全部的等待者(waiters) .

Event

class tornado.locks.Event[源代码]

一个阻塞协程的事件直到它的内部标识设置为True.

类似于 threading.Event.

协程可以等待一个事件被设置. 一旦它被设置, 调用 yield event.wait() 将不会被阻塞除非该事件已经被清除:

from tornado import gen
from tornado.ioloop import IOLoop
from tornado.locks import Event

event = Event()

@gen.coroutine
def waiter():
    print("Waiting for event")
    yield event.wait()
    print("Not waiting this time")
    yield event.wait()
    print("Done")

@gen.coroutine
def setter():
    print("About to set the event")
    event.set()

@gen.coroutine
def runner():
    yield [waiter(), setter()]

IOLoop.current().run_sync(runner)
Waiting for event
About to set the event
Not waiting this time
Done
is_set()[源代码]

如果内部标识是true将返回 True .

set()[源代码]

设置内部标识为 True. 所有的等待者(waiters)都被唤醒.

一旦该标识被设置调用 wait 将不会阻塞.

clear()[源代码]

重置内部标识为 False.

调用 wait 将阻塞直到 set 被调用.

wait(timeout=None)[源代码]

阻塞直到内部标识为true.

返回一个Future对象, 在超时之后会抛出一个 tornado.gen.TimeoutError 异常.

Semaphore

class tornado.locks.Semaphore(value=1)[源代码]

可以在阻塞之前获得固定次数的锁.

一个信号量管理着代表 release 调用次数减去 acquire 的 调用次数的计数器, 加一个初始值. 如果必要的话,`.acquire` 方 法将会阻塞, 直到它可以返回, 而不使该计数器成为负值.

信号量限制访问共享资源. 为了允许两个worker同时获得权限:

from tornado import gen
from tornado.ioloop import IOLoop
from tornado.locks import Semaphore

sem = Semaphore(2)

@gen.coroutine
def worker(worker_id):
    yield sem.acquire()
    try:
        print("Worker %d is working" % worker_id)
        yield use_some_resource()
    finally:
        print("Worker %d is done" % worker_id)
        sem.release()

@gen.coroutine
def runner():
    # Join all workers.
    yield [worker(i) for i in range(3)]

IOLoop.current().run_sync(runner)
Worker 0 is working
Worker 1 is working
Worker 0 is done
Worker 2 is working
Worker 1 is done
Worker 2 is done

Workers 0 和 1 允许并行运行, 但是worker 2将等待直到 信号量被worker 0释放.

acquire 是一个上下文管理器, 所以 worker 可以被写为:

@gen.coroutine
def worker(worker_id):
    with (yield sem.acquire()):
        print("Worker %d is working" % worker_id)
        yield use_some_resource()

    # Now the semaphore has been released.
    print("Worker %d is done" % worker_id)

在 Python 3.5 中, 信号量自身可以作为一个异步上下文管理器:

async def worker(worker_id):
    async with sem:
        print("Worker %d is working" % worker_id)
        await use_some_resource()

    # Now the semaphore has been released.
    print("Worker %d is done" % worker_id)

在 4.3 版更改: 添加对 Python 3.5 async with 的支持.

release()[源代码]

增加counter 并且唤醒一个waiter.

acquire(timeout=None)[源代码]

递减计数器. 返回一个 Future 对象.

如果计数器(counter)为0将会阻塞, 等待 release. 在超时之后 Future 对象将会抛出 TimeoutError .

BoundedSemaphore

class tornado.locks.BoundedSemaphore(value=1)[源代码]

一个防止release() 被调用太多次的信号量.

如果 release 增加信号量的值超过初始值, 它将抛出 ValueError. 信号量通常是通过限制容量来保护资源, 所以一个信号量释放太多次是 一个错误的标志.

release()[源代码]

增加counter 并且唤醒一个waiter.

acquire(timeout=None)

递减计数器. 返回一个 Future 对象.

如果计数器(counter)为0将会阻塞, 等待 release. 在超时之后 Future 对象将会抛出 TimeoutError .

Lock

class tornado.locks.Lock[源代码]

协程的锁.

一个Lock开始解锁, 然后它立即 acquire 锁. 虽然它是锁着的, 一个协程yield acquire 并等待, 直到另一个协程调用 release.

释放一个没锁住的锁将抛出 RuntimeError.

在所有Python 版本中 acquire 支持上下文管理协议:

>>> from tornado import gen, locks
>>> lock = locks.Lock()
>>>
>>> @gen.coroutine
... def f():
...    with (yield lock.acquire()):
...        # Do something holding the lock.
...        pass
...
...    # Now the lock is released.

在Python 3.5, Lock 也支持异步上下文管理协议(async context manager protocol). 注意在这种情况下没有 acquire, 因为 async with 同时包含 yieldacquire (就像 threading.Lock):

>>> async def f():  
...    async with lock:
...        # Do something holding the lock.
...        pass
...
...    # Now the lock is released.

在 3.5 版更改: 添加Python 3.5 的 async with 支持.

acquire(timeout=None)[源代码]

尝试锁. 返回一个Future 对象.

返回一个Future 对象, 在超时之后将抛出 tornado.gen.TimeoutError .

release()[源代码]

Unlock.

在队列中等待 acquire 的第一个 coroutine 获得锁.

如果没有锁, 将抛出 RuntimeError.