tornado.queues – 协程的队列

4.2 新版功能.

Classes

Queue

class tornado.queues.Queue(maxsize=0)[源代码]

协调生产者消费者协程.

如果maxsize 是0(默认配置)意味着队列的大小是无限的.

from tornado import gen
from tornado.ioloop import IOLoop
from tornado.queues import Queue

q = Queue(maxsize=2)

@gen.coroutine
def consumer():
    while True:
        item = yield q.get()
        try:
            print('Doing work on %s' % item)
            yield gen.sleep(0.01)
        finally:
            q.task_done()

@gen.coroutine
def producer():
    for item in range(5):
        yield q.put(item)
        print('Put %s' % item)

@gen.coroutine
def main():
    # Start consumer without waiting (since it never finishes).
    IOLoop.current().spawn_callback(consumer)
    yield producer()     # Wait for producer to put all tasks.
    yield q.join()       # Wait for consumer to finish all tasks.
    print('Done')

IOLoop.current().run_sync(main)
Put 0
Put 1
Doing work on 0
Put 2
Doing work on 1
Put 3
Doing work on 2
Put 4
Doing work on 3
Doing work on 4
Done

在Python 3.5, Queue 实现了异步迭代器协议, 所以 consumer() 可以被重写为:

async def consumer():
    async for item in q:
        try:
            print('Doing work on %s' % item)
            yield gen.sleep(0.01)
        finally:
            q.task_done()

在 4.3 版更改: 为Python 3.5添加 async for 支持 in Python 3.5.

maxsize

队列中允许的最大项目数.

qsize()[源代码]

当前队列中的项目数.

put(item, timeout=None)[源代码]

将一个项目放入队列中, 可能需要等待直到队列中有空间.

返回一个Future对象, 如果超时会抛出 tornado.gen.TimeoutError .

put_nowait(item)[源代码]

非阻塞的将一个项目放入队列中.

如果没有立即可用的空闲插槽, 则抛出 QueueFull.

get(timeout=None)[源代码]

从队列中删除并返回一个项目.

返回一个Future对象, 当项目可用时resolve, 或者在超时后抛出 tornado.gen.TimeoutError .

get_nowait()[源代码]

非阻塞的从队列中删除并返回一个项目.

如果有项目是立即可用的则返回该项目, 否则抛出 QueueEmpty.

task_done()[源代码]

表明前面排队的任务已经完成.

被消费者队列使用. 每个 get 用来获取一个任务, 随后(subsequent) 调用 task_done 告诉队列正在处理的任务已经完成.

如果 join 正在阻塞, 它会在所有项目都被处理完后调起; 即当每个 put 都被一个 task_done 匹配.

如果调用次数超过 put 将会抛出 ValueError .

join(timeout=None)[源代码]

阻塞(block)直到队列中的所有项目都处理完.

返回一个Future对象, 超时后会抛出 tornado.gen.TimeoutError 异常.

PriorityQueue

class tornado.queues.PriorityQueue(maxsize=0)[源代码]

一个有优先级的 Queue 最小的最优先.

写入的条目通常是元组, 类似 (priority number, data).

from tornado.queues import PriorityQueue

q = PriorityQueue()
q.put((1, 'medium-priority item'))
q.put((0, 'high-priority item'))
q.put((10, 'low-priority item'))

print(q.get_nowait())
print(q.get_nowait())
print(q.get_nowait())
(0, 'high-priority item')
(1, 'medium-priority item')
(10, 'low-priority item')

LifoQueue

class tornado.queues.LifoQueue(maxsize=0)[源代码]

一个后进先出(Lifo)的 Queue.

from tornado.queues import LifoQueue

q = LifoQueue()
q.put(3)
q.put(2)
q.put(1)

print(q.get_nowait())
print(q.get_nowait())
print(q.get_nowait())
1
2
3

Exceptions

QueueEmpty

exception tornado.queues.QueueEmpty[源代码]

当队列中没有项目时, 由 Queue.get_nowait 抛出.

QueueFull

exception tornado.queues.QueueFull[源代码]

当队列为最大size时, 由 Queue.put_nowait 抛出.