17.7. queue
- 同步队列类¶
源代码: Lib / queue.py
queue
模块实现多生产者,多消费者队列。它特别适用于信息必须在多个线程间安全地交换的多线程程序中。该模块中的Queue
类实现了所有需要的锁定语义。这取决于Python中线程支持的可用性;请参见threading
模块。
模块实现了三类队列,主要差别在于取得数据的顺序上。在FIFO(First In First Out,先进先出)队列中,最早加入的任务会被最先得到。在LIFO(Last In First Out,后进先出)队列中,最后加入的任务会被最先得到(就像栈一样)。在优先队列中,任务被保持有序(使用heapq
模块),拥有最小值的任务(优先级最高)被最先得到。
queue
模块定义了以下类和异常:
- class
queue.
Queue
(maxsize=0)¶ 构造一个FIFO队列。maxsize是个整数,指明了队列中能存放的数据个数的上限。一旦达到上限,插入会导致阻塞,直到队列中的数据被消费掉。如果maxsize小于或者等于0,队列大小没有限制。
- class
queue.
LifoQueue
(maxsize=0)¶ 构造一个LIFO队列。maxsize是个整数,指明了队列中能存放的数据个数的上限。一旦达到上限,插入会导致阻塞,直到队列中的数据被消费掉。如果maxsize小于或者等于0,队列大小没有限制。
- class
queue.
PriorityQueue
(maxsize=0)¶ 构造一个优先队列。maxsize是个整数,指明了队列中能存放的数据个数的上限。一旦达到上限,插入会导致阻塞,直到队列中的数据被消费掉。如果maxsize小于或者等于0,队列大小没有限制。
首先检索最低值条目(最低值条目是由
sorted(list(entries))[0]
返回的条目。任务的典型模式就是如(priority_number, data)
这样的元组。
- exception
queue.
Empty
¶ 在空的
get()
对象上调用非阻塞的get_nowait()
(或者Queue
)会抛出此异常。
- exception
queue.
Full
¶ 在满的
put()
对象上调用非阻塞的put_nowait()
(或者Queue
)会抛出此异常。
17.7.1. Queue Objects¶
Queue对象(Queue
、LifoQueue
和PriorityQueue
)提供了下述的公共方法。
-
Queue.
qsize
()¶ 返回队列的近似大小。注意,qsize()> 0不保证随后的get()不会阻塞,也不会保证qsize()
-
Queue.
empty
()¶ 如果队列为空,返回
True
,否则返回False
。如果empty()返回True
,它不保证后续调用put()不会阻塞。类似的,如果empty()返回False
也不能保证接下来的get()调用不会被阻塞。
-
Queue.
full
()¶ 如果队列已满,则返回
True
,否则返回False
。如果full()返回True
,它不保证后续调用get()不会阻塞。类似的,如果full()返回False
并不能保证接下来的put()调用不会被阻塞。
-
Queue.
put
(item, block=True, timeout=None)¶ 将item放入队列中。如果可选的参数block为真且timeout为空对象(默认的情况,阻塞调用,无超时),如有必要(比如队列满),阻塞调用线程,直到有空闲槽可用。如果超时是正数,则它最多阻塞超时秒,如果在该时间内没有空闲插槽,则引发
Full
异常。如果block为假,如果有空闲槽可用将数据放入队列,否则立即抛出Full
异常(非阻塞调用,timeout被忽略)。
-
Queue.
put_nowait
(item)¶ 等同于
put(item, False)
(非阻塞调用)。
-
Queue.
get
(block=True, timeout=None)¶ 从队列中移除并返回一个数据。如果可选的参数block为真且timeout为空对象(默认的情况,阻塞调用,无超时),阻塞调用进程直到有数据可用。如果超时是正数,则它最多阻塞超时秒,如果在该时间内没有可用的项,则引发
Empty
异常。如果block为假,如果有数据可用返回数据,否则立即抛出Empty
异常(非阻塞调用,timeout被忽略)。
-
Queue.
get_nowait
()¶ 等同于
get(False)
(非阻塞调用)。
为了跟踪入队任务被消费者线程完全的处理掉,Queue对象提供了两个额外的方法。
-
Queue.
task_done
()¶ 意味着之前入队的一个任务已经完成。由队列的消费者线程调用。每一个
get()
调用得到一个任务,接下来的task_done()
调用告诉队列该任务已经处理完毕。If a
join()
is currently blocking, it will resume when all items have been processed (meaning that atask_done()
call was received for every item that had beenput()
into the queue).如果该方法被调用的次数多于被放入队列中的任务的个数,
ValueError
异常会被抛出。
-
Queue.
join
()¶ 阻塞调用线程,直到队列中的所有任务被处理掉。
只要有数据被加入队列,未完成的任务数就会增加。当消费者线程调用
task_done()
以指示该项目已检索并且其上的所有工作都已完成时,计数将减少。当未完成的任务数降到0,join()
解除阻塞。
等待入队任务被怎样完成的例子:
def worker():
while True:
item = q.get()
if item is None:
break
do_work(item)
q.task_done()
q = queue.Queue()
threads = []
for i in range(num_worker_threads):
t = threading.Thread(target=worker)
t.start()
threads.append(t)
for item in source():
q.put(item)
# block until all tasks are done
q.join()
# stop workers
for i in range(num_worker_threads):
q.put(None)
for t in threads:
t.join()
参见
- 类
multiprocessing.Queue
- 用于在多进程(而不是多线程)上下文中使用的队列类。
collections.deque
是不需要锁定的快速原子append()
和popleft()
操作的无界队列的替代实现。