17.2. multiprocessing —— 基于进程的并行

源代码: Lib / multiprocessing /

17.2.1. 引言

multiprocessing是一个和threading模块类似,提供API,生成进程的模块。multiprocessing包提供本地和远程并发,通过使用子进程而不是线程有效地转移全局解释器锁因此,multiprocessing模块允许程序员充分利用给定机器上的多个处理器。它在Unix和Windows上都可以运行。

multiprocessing模块还引入了threading模块中没有的API。比如Pool对象,它能很方便地对多个函数或分布式数据并行执行和运算。以下示例演示了在模块中定义此类函数的常见做法,以便子进程可以成功导入该模块。这个使用Pool的数据并行性的基本示例,

from multiprocessing import Pool

def f(x):
    return x*x

if __name__ == '__main__':
    with Pool(5) as p:
        print(p.map(f, [1, 2, 3]))

将打印到标准输出

[1, 4, 9]

17.2.1.1. Process

multiprocessing中,通过创建Process对象,然后调用其start()方法来生成进程。Process遵循threading.Thread的API。多进程程序的一个简单例子是

from multiprocessing import Process

def f(name):
    print('hello', name)

if __name__ == '__main__':
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()

要显示涉及的单个进程ID,下面是一个扩展示例:

from multiprocessing import Process
import os

def info(title):
    print(title)
    print('module name:', __name__)
    print('parent process:', os.getppid())
    print('process id:', os.getpid())

def f(name):
    info('function f')
    print('hello', name)

if __name__ == '__main__':
    info('main line')
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()

关于为什么if __name__ == '__main__'是必要的,请参阅编程指南

17.2.1.2. 上下文和启动方法

根据平台,multiprocessing支持三种方式来启动进程。这些启动方法

spawn

The parent process starts a fresh python interpreter process. The child process will only inherit those resources necessary to run the process objects run() method. In particular, unnecessary file descriptors and handles from the parent process will not be inherited. Starting a process using this method is rather slow compared to using fork or forkserver.

Available on Unix and Windows. The default on Windows.

fork

The parent process uses os.fork() to fork the Python interpreter. The child process, when it begins, is effectively identical to the parent process. All resources of the parent are inherited by the child process. Note that safely forking a multithreaded process is problematic.

Available on Unix only. The default on Unix.

forkserver

When the program starts and selects the forkserver start method, a server process is started. From then on, whenever a new process is needed, the parent process connects to the server and requests that it fork a new process. The fork server process is single threaded so it is safe for it to use os.fork(). No unnecessary resources are inherited.

Available on Unix platforms which support passing file descriptors over Unix pipes.

版本3.4更新:所有Unix平台上添加spawn,以及为某些Unix平台添加forkserver在Windows上,子进程不再继承父进程所有可继承句柄。

在Unix上使用spawnforkserver启动方法还将启动一个信号量跟踪器进程,该进程跟踪由程序的进程创建的未链接的命名信号量。当所有进程退出后,信号量跟踪器取消任何剩余信号量的链接。通常应该没有,但如果一个过程被一个信号杀死,可能有一些“泄漏”的信号量。(取消命名信号量的链接是一件重要的事情,因为系统只允许有限的数量,并且它们将不会自动取消链接,直到下次重新启动。)

要选择启动的方法,请在main模块的if __name__ == '__main__'子句中使用set_start_method()例如:

import multiprocessing as mp

def foo(q):
    q.put('hello')

if __name__ == '__main__':
    mp.set_start_method('spawn')
    q = mp.Queue()
    p = mp.Process(target=foo, args=(q,))
    p.start()
    print(q.get())
    p.join()

在程序中set_start_method()应最多只使用一次。

或者,你可以使用get_context()来获取context对象。Context对象具有与multiprocessing模块相同的API,并允许在同一程序中使用多个启动方法。

import multiprocessing as mp

def foo(q):
    q.put('hello')

if __name__ == '__main__':
    ctx = mp.get_context('spawn')
    q = ctx.Queue()
    p = ctx.Process(target=foo, args=(q,))
    p.start()
    print(q.get())
    p.join()

注意,与context相关的对象可能与用于不同context的进程不兼容。特别地,使用fork context创建的锁不能传递到使用spawnforkserver启动方法启动的进程。

想要使用特定启动方法的库应该使用get_context(),以避免干扰库用户的选择。

17.2.1.3. 在进程之间交换对象

multiprocessing支持进程之间的两种类型的通信通道:

队列

The Queue class is a near clone of queue.Queue. For example:

from multiprocessing import Process, Queue def f(q): q.put([42, None, 'hello']) if __name__ == '__main__': q = Queue() p = Process(target=f, args=(q,)) p.start() print(q.get()) # prints "[42, None, 'hello']" p.join()

Queues are thread and process safe.

管道

The Pipe() function returns a pair of connection objects connected by a pipe which by default is duplex (two-way). For example:

from multiprocessing import Process, Pipe def f(conn): conn.send([42, None, 'hello']) conn.close() if __name__ == '__main__': parent_conn, child_conn = Pipe() p = Process(target=f, args=(child_conn,)) p.start() print(parent_conn.recv()) # prints "[42, None, 'hello']" p.join() 

The two connection objects returned by Pipe() represent the two ends of the pipe. Each connection object has send() and recv() methods (among others). Note that data in a pipe may become corrupted if two processes (or threads) try to read from or write to the same end of the pipe at the same time. Of course there is no risk of corruption from processes using different ends of the pipe at the same time.

17.2.1.4. 进程之间的同步

multiprocessing包含了全部和threading相同的同步原语 。对于一个实例可以使用锁来保证同一时间只有一个进程在使用标准输出。

from multiprocessing import Process, Lock

def f(l, i):
    l.acquire()
    try:
        print('hello world', i)
    finally:
        l.release()

if __name__ == '__main__':
    lock = Lock()

    for num in range(10):
        Process(target=f, args=(lock, num)).start()

没有使用锁从不同进程的输出容易得到所有混合。

17.2.1.5. 进程之间的状态共享

如上所述,当进行并发编程时,最好尽量避免使用共享状态。在使用多个进程时尤其如此。

但是,如果真的需要使用一些共享数据,则multiprocessing提供了几种方法。

共享内存

Data can be stored in a shared memory map using Value or Array. For example, the following code

from multiprocessing import Process, Value, Array def f(n, a): n.value = 3.1415927 for i in range(len(a)): a[i] = -a[i] if __name__ == '__main__': num = Value('d', 0.0) arr = Array('i', range(10)) p = Process(target=f, args=(num, arr)) p.start() p.join() print(num.value) print(arr[:]) 

will print

3.1415927 [0, -1, -2, -3, -4, -5, -6, -7, -8, -9] 

The 'd' and 'i' arguments used when creating num and arr are typecodes of the kind used by the array module: 'd' indicates a double precision float and 'i' indicates a signed integer. These shared objects will be process and thread-safe.

For more flexibility in using shared memory one can use the multiprocessing.sharedctypes module which supports the creation of arbitrary ctypes objects allocated from shared memory.

服务器进程

A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies.

A manager returned by Manager() will support types list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array. For example,

from multiprocessing import Process, Manager def f(d, l): d[1] = '1' d['2'] = 2 d[0.25] = None l.reverse() if __name__ == '__main__': with Manager() as manager: d = manager.dict() l = manager.list(range(10)) p = Process(target=f, args=(d, l)) p.start() p.join() print(d) print(l) 

will print

{0.25: None, 1: '1', '2': 2} [9, 8, 7, 6, 5, 4, 3, 2, 1, 0] 

Server process managers are more flexible than using shared memory objects because they can be made to support arbitrary object types. Also, a single manager can be shared by processes on different computers over a network. They are, however, slower than using shared memory.

17.2.1.6. 使用工作进程的进程池

Pool类表示工作进程的进程池。它具有允许将任务以几种不同的方式分配到工作进程的方法。

例如:

from multiprocessing import Pool, TimeoutError
import time
import os

def f(x):
    return x*x

if __name__ == '__main__':
    # start 4 worker processes
    with Pool(processes=4) as pool:

        # print "[0, 1, 4,..., 81]"
        print(pool.map(f, range(10)))

        # print same numbers in arbitrary order
        for i in pool.imap_unordered(f, range(10)):
            print(i)

        # evaluate "f(20)" asynchronously
        res = pool.apply_async(f, (20,))      # runs in *only* one process
        print(res.get(timeout=1))             # prints "400"

        # evaluate "os.getpid()" asynchronously
        res = pool.apply_async(os.getpid, ()) # runs in *only* one process
        print(res.get(timeout=1))             # prints the PID of that process

        # launching multiple evaluations asynchronously *may* use more processes
        multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)]
        print([res.get(timeout=1) for res in multiple_results])

        # make a single worker sleep for 10 secs
        res = pool.apply_async(time.sleep, (10,))
        try:
            print(res.get(timeout=1))
        except TimeoutError:
            print("We lacked patience and got a multiprocessing.TimeoutError")

        print("For the moment, the pool remains available for more work")

    # exiting the 'with'-block has stopped the pool
    print("Now the pool is closed and no longer available")

注意,进程池的方法只应该被创建它的进程使用。

注意

此包中的功能要求__main__模块可由子进程导入。这在编程指南中有所说明,但值得在这里指出。这意味着一些示例,如multiprocessing.pool.Pool示例在交互式解释器中不能工作。例如:

>>> from multiprocessing import Pool
>>> p = Pool(5)
>>> def f(x):
...     return x*x
...
>>> p.map(f, [1,2,3])
Process PoolWorker-1:
Process PoolWorker-2:
Process PoolWorker-3:
Traceback (most recent call last):
AttributeError: 'module' object has no attribute 'f'
AttributeError: 'module' object has no attribute 'f'
AttributeError: 'module' object has no attribute 'f'

(如果你尝试这样,它实际上会输出以半随机方式交错的三个完整的traceback,然后你可能不得不停止主进程。)

17.2.2. Reference

multiprocessing包大多复制threading模块的API。

17.2.2.1. Process and exceptions

class multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)

进程对象表示在单独进程中运行的活动。Process类具有threading.Thread的所有方法的等价物。

应始终使用关键字参数调用构造函数。应始终为None;它仅仅与threading.Thread兼容。target是要由run()方法调用的可调用对象。它默认为None,表示不调用任何内容。name是进程名称(有关详细信息,请参阅name)。args是目标调用的参数元组。kwargs是目标调用的关键字参数的字典。如果提供,则仅限关键字守护程序参数将进程daemon标记设置为TrueFalse如果None(默认值),则此标志将从创建过程继承。

默认情况下,没有参数传递给target

如果一个子类覆盖了构造函数,它必须确保在对进程做任何其他事情之前调用基类构造函数(Process.__init__())。

在版本3.3中已更改:添加了守护程序参数。

run()

表示进程活动的方法。

您可以在子类中覆盖此方法。标准run()方法调用传递给对象构造函数的可调用对象作为目标参数(如果有),其中顺序和关键字参数取自argskwargs参数。

start()

启动进程的活动。

每个进程对象最多只能调用一次。它安排对象的run()方法在一个单独的进程中被调用。

join([timeout])

如果可选参数timeoutNone(默认值),则该方法将阻塞,直到调用join()方法的进程终止。如果超时是正数,则它最多阻止超时秒。

一个过程可以连接多次。

进程不能自己加入,因为这将导致死锁。尝试在进程启动之前加入进程是一个错误。

name

进程的名称。名称是仅用于识别目的的字符串。它没有语义。可以给多个进程指定相同的名称。

初始名称由构造函数设置。如果没有为构造函数提供显式名称,则形式为'Process-N 1的名称:N 2:...:N k t2 >',其中每个N k是其父节点的第N个子节点。

is_alive()

返回进程是否存活。

粗略地说,从start()方法返回直到子进程终止的时刻,一个进程对象是活的。

daemon

进程的守护进程标志,一个布尔值。这必须在调用start()之前设置。

初始值从创建过程继承。

当进程退出时,它会尝试终止所有的daemonic子进程。

注意,不允许daemonic进程创建子进程。否则,一个守护进程会使其子进程成为孤立的,如果它的父进程退出时终止。此外,这些是而不是 Unix守护程序或服务,它们是正常的进程,如果非守护进程已退出,它们将被终止(而不是加入)。

除了threading.Thread API,Process对象还支持以下属性和方法:

pid

返回进程ID。在生成进程之前,这将是None

exitcode

孩子的退出代码。如果进程尚未终止,这将是None负值-N表示孩子被信号N终止。

authkey

进程的认证密钥(字节字符串)。

当初始化multiprocessing时,使用os.urandom()为主进程分配一个随机字符串。

当创建Process对象时,它将继承其父进程的认证密钥,但可以通过将authkey设置为另一个字节字符串来更改。

请参阅Authentication keys

sentinel

系统对象的数字句柄,在进程结束时将变为“就绪”。

如果要使用multiprocessing.connection.wait()一次等待多个事件,可以使用此值。否则调用join()更简单。

在Windows上,这是可与WaitForSingleObjectWaitForMultipleObjects API调用系列一起使用的操作系统句柄。在Unix上,这是一个文件描述器,可以使用来自select模块的原语。

版本3.3中的新功能。

terminate()

终止进程。在Unix上,这是使用SIGTERM信号完成的;在Windows上使用TerminateProcess()注意,退出处理程序和finally子句等不会被执行。

注意,进程的子进程将不会终止 - 它们将成为孤立的。

警告

如果在相关联的进程正在使用管道或队列时使用此方法,则管道或队列容易被损坏,并且可能变得不能由其他进程使用。类似地,如果进程已经获得锁或信号量等然后终止它可能导致其他进程死锁。

Note that the start(), join(), is_alive(), terminate() and exitcode methods should only be called by the process that created the process object.

Process的一些方法的示例用法:

>>> import multiprocessing, time, signal
>>> p = multiprocessing.Process(target=time.sleep, args=(1000,))
>>> print(p, p.is_alive())
<Process(Process-1, initial)> False
>>> p.start()
>>> print(p, p.is_alive())
<Process(Process-1, started)> True
>>> p.terminate()
>>> time.sleep(0.1)
>>> print(p, p.is_alive())
<Process(Process-1, stopped[SIGTERM])> False
>>> p.exitcode == -signal.SIGTERM
True
exception multiprocessing.ProcessError

所有multiprocessing异常的基类。

exception multiprocessing.BufferTooShort

当提供的缓冲区对象对于消息读取而言太小时,由Connection.recv_bytes_into()引发的异常。

如果eBufferTooShort的实例,则e.args[0]会将消息作为字节字符串。

exception multiprocessing.AuthenticationError

在出现身份验证错误时触发。

exception multiprocessing.TimeoutError

在超时超时时由方法引发。

17.2.2.2. Pipes and Queues

当使用多个进程时,通常使用消息传递来用于进程之间的通信,并避免必须使用任何同步原语,如锁。

对于传递消息,可以使用Pipe()(用于两个进程之间的连接)或队列(允许多个生产者和消费者)。

QueueSimpleQueueJoinableQueue类型是在queue.QueueThey differ in that Queue lacks the task_done() and join() methods introduced into Python 2.5’s queue.Queue class.

如果您使用JoinableQueue,那么您必须为每一个从队列中移除的任务去调用JoinableQueue.task_done()。否则的话,那些用于记录未完成的任务数量的信号量或许最终会溢出,从而引发异常。

请注意,还可以使用管理器对象创建共享队列 - 请参阅Managers

注意

multiprocessing使用通常的queue.Emptyqueue.Full例外来表示超时。它们在multiprocessing命名空间中不可用,因此您需要从queue中导入它们。

注意

当一个对象放在一个队列上时,该对象被腌制,后台线程随后将经过腌制的数据清洗到底层管道。这有一些后果是有点令人惊讶,但不应该导致任何实际困难 - 如果他们真的打扰你,那么你可以改为使用由manager创建的队列。

  1. 将对象放在空队列上之后,在队列empty()方法返回Falseget_nowait()之前可能会返回一个无限小的延迟而不提高queue.Empty
  2. 如果多个进程将对象排队,则可能在另一端无序地接收对象。然而,由相同进程排队的对象将始终以相对于彼此的期望顺序。

警告

If a process is killed using Process.terminate() or os.kill() while it is trying to use a Queue, then the data in the queue is likely to become corrupted. 这可能导致任何其他进程在尝试稍后使用队列时获得异常。

警告

如上所述,如果子进程已将项目放在队列上(并且它没有使用JoinableQueue.cancel_join_thread),那么该进程将不会终止,直到所有缓冲的项目都已刷新到管道。

这意味着,如果你尝试加入这个过程,你可能会得到一个死锁,除非你确定已经被放在队列上的所有项目已经消耗。类似地,如果子进程是非守护进程,则当进程尝试加入其所有非守护进程的子进程时,父进程可能在退出时挂起。

请注意,使用管理器创建的队列没有此问题。请参阅Programming guidelines

有关进程间通信的队列使用示例,请参见Examples

multiprocessing.Pipe([duplex])

返回表示管道末端的Connection对象的(conn1, conn2)

如果duplexTrue(默认值),则管道是双向的。如果duplexFalse,则管道是单向的:conn1只能用于接收消息,conn2用于发送消息。

class multiprocessing.Queue([maxsize])

返回进程共享的队列,底层使用管道和锁来实现。当进程首先将一个项目放在队列上时,启动了一个将对象从缓冲区传送到管道中的馈线线程。

来自标准库的queue模块的通常的queue.Emptyqueue.Full异常被引发以指示超时。

Queue实现除task_done()join()之外的queue.Queue的所有方法。

qsize()

返回队列的大致大小。由于多线程/多进程语义,这个数字是不可靠的。

请注意,这可能在Unix平台(如Mac OS X)上引发NotImplementedError,其中sem_getvalue()未实现。

empty()

如果队列为空,返回True,否则返回False由于多线程/多进程语义,这是不可靠的。

full()

如果队列已满,则返回True,否则返回False由于多线程/多进程语义,这是不可靠的。

put(obj[, block[, timeout]])

将obj放入队列。如果可选参数True(默认值)和超时None直到空闲时隙可用为止。如果超时是一个正数,它最多阻塞超时秒,如果在那段时间内没有空闲时隙,则引发queue.Full否则(False),如果空闲时隙立即可用,则将一个项目放在队列上,否则引发queue.Full异常在这种情况下忽略超时)。

put_nowait(obj)

等同于put(obj, False)

get([block[, timeout]])

从队列中删除并返回项目。如果可选的args True(默认值)和超时None直到项目可用。如果timeout是正数,则它最多阻塞超时秒,如果在该时间内没有可用的项目,则引发queue.Empty异常。Otherwise (block is False), return an item if one is immediately available, else raise the queue.Empty exception (timeout is ignored in that case).

get_nowait()

等同于get(False)

multiprocessing.Queue有一些在queue.Queue中找不到的其他方法。这些方法通常不需要大多数代码:

close()

指示当前进程不会在此队列上放置更多数据。The background thread will quit once it has flushed all buffered data to the pipe.当队列被垃圾回收时,这被自动调用。

join_thread()

加入后台线程。这只能在调用close()之后使用。它阻塞直到后台线程退出,确保缓冲区中的所有数据都已刷新到管道。

默认情况下,如果进程不是队列的创建者,那么在退出时它将尝试加入队列的后台线程。该进程可以调用cancel_join_thread()使join_thread()不执行任何操作。

cancel_join_thread()

阻止join_thread()阻止。特别地,这防止后台线程在进程退出时自动连接 - 参见join_thread()

此方法的更好名称可能是allow_exit_without_flush()它可能导致入队数据丢失,你几乎肯定不会需要使用它。它只是在那里,如果你需要当前的进程立即退出,而不等待刷新排队的数据到底层的管道,你不在乎丢失的数据。

注意

此类的功能需要在主机操作系统上运行共享信号量实现。没有一个,此类中的功能将被禁用,并尝试实例化Queue将导致ImportError有关其他信息,请参见问题3770这同样适用于下面列出的任何专门的队列类型。

class multiprocessing.SimpleQueue

它是简化的Queue类型,非常接近锁定的Pipe

empty()

如果队列为空,返回True,否则返回False

get()

从队列中删除并返回项目。

put(item)

放入队列。

class multiprocessing.JoinableQueue([maxsize])

JoinableQueueQueue子类是另外具有task_done()join()方法的队列。

task_done()

指示以前入队的任务已完成。由队列使用者使用。对于用于获取任务的每个get(),对task_done()的后续调用会告诉队列任务上的处理已完成。

If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue).

引发a ValueError如果调用的次数比在队列中放置的项目多。

join()

阻止,直到队列中的所有项目都已被获取和处理。

每当项目添加到队列时,未完成任务的计数就会增加。当消费者调用task_done()以指示该项目已检索并且其上的所有工作都已完成时,计数将减少。当未完成任务的计数下降为零时,join()解除阻塞。

17.2.2.3. Miscellaneous

multiprocessing.active_children()

返回当前进程的所有活的孩子的列表。

调用这会产生“加入”已经完成的任何进程的副作用。

multiprocessing.cpu_count()

返回系统中的CPU数。May引发NotImplementedError

也可以看看

os.cpu_count()

multiprocessing.current_process()

返回与当前进程相对应的Process对象。

类似threading.current_thread()

multiprocessing.freeze_support()

当使用multiprocessing的程序已冻结以产生Windows可执行文件时,添加支持。(已使用py2exePyInstallercx_Freeze进行测试。)

需要在之后直接调用此函数 __ name __ == '__ main __' t0 >主模块的线。例如:

from multiprocessing import Process, freeze_support

def f():
    print('hello world!')

if __name__ == '__main__':
    freeze_support()
    Process(target=f).start()

如果省略freeze_support()行,则尝试运行冻结的可执行文件将引发RuntimeError

调用freeze_support()在Windows以外的任何操作系统上调用时没有效果。此外,如果模块在Windows上由Python解释器正常运行(程序尚未冻结),则freeze_support()没有效果。

multiprocessing.get_all_start_methods()

返回支持的开始方法的列表,其中第一个是默认值。可能的启动方法是'fork''spawn''forkserver'在Windows上只有'spawn'可用。在Unix 'fork''spawn'始终受支持,'fork'

版本3.4中的新功能。

multiprocessing.get_context(method=None)

返回与multiprocessing模块具有相同属性的上下文对象。

如果方法,则返回默认上下文。否则方法应为'fork''spawn''forkserver'如果指定的start方法不可用,则会引发ValueError

版本3.4中的新功能。

multiprocessing.get_start_method(allow_none=False)

返回用于启动进程的start方法的名称。

如果start方法未修复,并且allow_none为false,则start方法将固定为默认值,并返回名称。如果start方法未修复,并且allow_none为true,则返回

返回值可以是'fork''spawn''forkserver''fork'是Unix上的默认值,而'spawn'是Windows上的默认值。

版本3.4中的新功能。

multiprocessing.set_executable()

设置在启动子进程时要使用的Python解释器的路径。(默认使用sys.executable)。Embedders可能需要做一些事情

set_executable(os.path.join(sys.exec_prefix, 'pythonw.exe'))

才能创建子进程。

在版本3.4中更改:现在在使用'spawn' start方法时支持Unix。

multiprocessing.set_start_method(method)

设置应用于启动子进程的方法。方法可以是'fork''spawn''forkserver'

请注意,这应该最多调用一次,并且应该在内保护if __ name __ == '__ main__ '子句。

版本3.4中的新功能。

17.2.2.4. Connection Objects

连接对象允许发送和接收可拾取对象或字符串。它们可以被认为是面向消息的连接套接字。

通常使用Pipe()创建连接对象 - 另请参阅Listeners and Clients

class multiprocessing.Connection
send(obj)

将对象发送到连接的另一端,应使用recv()读取该对象。

对象必须可拾取。非常大的pickles(大约32 MB +,虽然它取决于操作系统)可能引发一个ValueError异常。

recv()

使用send()返回从连接另一端发送的对象。阻塞,直到有它的东西要接收。引发EOFError如果没有剩下的东西要接收,另一端被关闭。

fileno()

返回文件描述器或连接使用的句柄。

close()

关闭连接。

当连接被垃圾回收时,这被自动调用。

poll([timeout])

返回是否有任何数据可供读取。

如果未指定timeout,则它将立即返回。如果timeout是一个数字,那么它指定阻止的最大时间(以秒为单位)。如果超时None,则使用无限超时。

请注意,可以使用multiprocessing.connection.wait()一次轮询多个连接对象。

send_bytes(buffer[, offset[, size]])

bytes-like object发送字节数据作为完整的消息。

如果给定offset,则从缓冲器中的该位置读取数据。如果给定size,那么将从缓冲区读取许多字节。非常大的缓冲区(大约32 MB +,尽管它取决于操作系统)可能引发ValueError异常

recv_bytes([maxlength])

返回从连接的另一端发送的字节数据的完整消息作为字符串。阻塞直到有东西要接收。引发EOFError如果没有任何内容要接收,另一端已关闭。

如果指定maxlength且消息长于maxlength,则会引发OSError,连接将不再可读。

在版本3.3中更改:此函数用于引发IOError,现在是OSError的别名。

recv_bytes_into(buffer[, offset])

将从连接另一端发送的字节数据的完整消息读入缓冲区,并返回消息中的字节数。阻塞直到有东西要接收。引发EOFError如果没有剩下的东西要接收,另一端被关闭。

缓冲区必须是可写入的bytes-like object如果给定offset,则消息将从该位置写入缓冲区。偏移量必须是小于缓冲区长度的非负整数(以字节为单位)。

如果缓冲区太短,则会引发BufferTooShort异常,并且完整消息可用为e.args[0],其中e exception实例。

在版本3.3中已更改:现在可以使用Connection.send()Connection.recv()在进程之间传输连接对象本身。

版本3.3中的新功能:连接对象现在支持上下文管理协议 - 请参阅Context Manager Types__enter__()返回连接对象,__exit__()调用close()

例如:

>>> from multiprocessing import Pipe
>>> a, b = Pipe()
>>> a.send([1, 'hello', None])
>>> b.recv()
[1, 'hello', None]
>>> b.send_bytes(b'thank you')
>>> a.recv_bytes()
b'thank you'
>>> import array
>>> arr1 = array.array('i', range(5))
>>> arr2 = array.array('i', [0] * 10)
>>> a.send_bytes(arr1)
>>> count = b.recv_bytes_into(arr2)
>>> assert count == len(arr1) * arr1.itemsize
>>> arr2
array('i', [0, 1, 2, 3, 4, 0, 0, 0, 0, 0])

警告

Connection.recv()方法会自动取消接收的数据,这可能是一种安全风险,除非您可以信任发送消息的进程。

因此,除非使用Pipe()生成连接对象,因此在执行某种排序后,应该只使用recv()send()请参阅Authentication keys

警告

如果进程在尝试读取或写入管道时被杀死,则管道中的数据可能会被破坏,因为可能无法确定消息边界位于何处。

17.2.2.5. Synchronization primitives

通常,在多进程程序中同步原语不像在多线程程序中那样是必要的。请参阅threading模块的文档。

请注意,还可以使用管理器对象创建同步原语 - 请参阅Managers

class multiprocessing.Barrier(parties[, action[, timeout]])

屏障对象:threading.Barrier的克隆。

版本3.3中的新功能。

class multiprocessing.BoundedSemaphore([value])

有界信号对象:threading.BoundedSemaphore的紧密模拟。

与其近似模拟的单独差异存在:其acquire方法的第一个参数被命名为,这与Lock.acquire()一致。

注意

在Mac OS X上,这与Semaphore无法区分,因为sem_getvalue()未在该平台上实现。

class multiprocessing.Condition([lock])

条件变量:threading.Condition的别名。

如果指定lock,那么它应该是来自multiprocessingLockRLock对象。

在版本3.3中已更改:添加了wait_for()方法。

class multiprocessing.Event

克隆threading.Event

class multiprocessing.Lock

非递归锁定对象:threading.Lock的紧密模拟。一旦进程或线程获得了锁,随后从任何进程或线程获取它的尝试将阻塞,直到它被释放;任何进程或线程都可以释放它。threading.Lock应用于线程的概念和行为在multiprocessing.Lock中复制,因为它适用于进程或线程,除非另有说明。

请注意,Lock实际上是一个工厂函数,它返回使用默认上下文初始化的multiprocessing.synchronize.Lock的实例。

Lock支持context manager协议,因此可用于with语句。

acquire(block=True, timeout=None)

获取锁定,阻止或非阻止。

参数设置为True(默认值),方法调用将阻塞,直到锁处于未锁定状态,然后将其设置为锁定并返回True请注意,此第一个参数的名称不同于threading.Lock.acquire()中的名称。

参数设置为False,方法调用不会阻止。如果锁定当前处于锁定状态,则返回False;否则将锁设置为锁定状态,并返回True

当对超时使用正值浮点值调用时,只要不能获取锁定,最多只能阻止超时指定的秒数。对于超时的负值的调用相当于零的超时使用超时None的调用(默认值)将超时时间设置为无限。请注意,超时的负值或None值与threading.Lock.acquire()中实现的行为不同。如果参数设置为False,则timeout参数没有实际意义,因此被忽略。如果已获取锁定,则返回True,如果已超过超时时间,则返回False

release()

释放锁。这可以从任何进程或线程调用,而不仅仅是最初获取锁的进程或线程。

行为与threading.Lock.release()中的行为相同,只是当在未锁定的锁上调用时,会引发ValueError

class multiprocessing.RLock

递归锁定对象:threading.RLock的紧密模拟。递归锁必须由获取它的进程或线程释放。一旦进程或线程已经获得递归锁,相同的进程或线程可以再次获取它而不阻塞;该进程或线程必须每次释放它一次它已被获取。

注意,RLock实际上是一个工厂函数,它返回使用默认上下文初始化的multiprocessing.synchronize.RLock的实例。

RLock支持context manager协议,因此可以在with语句。

acquire(block=True, timeout=None)

获取锁定,阻止或非阻止。

使用参数设置为True调用时,阻塞直到锁处于未锁定状态(不由任何进程或线程拥有),除非锁已由当前进程或线程。当前进程或线程然后获取锁的所有权(如果它还没有所有权),并且锁中的递归级别增加1,导致返回值True请注意,与执行threading.RLock.acquire()相比,此第一个参数的行为有几个不同,从参数本身的名称开始。

使用参数设置为False调用时,不要阻止。如果锁已经被另一进程或线程获取(并因此被拥有),则当前进程或线程不取得所有权,并且锁中的递归级别不改变,导致返回值False如果锁处于未锁定状态,则当前进程或线程获取所有权,递归级别递增,从而返回值True

timeout参数的使用和行为与Lock.acquire()中的相同。注意,超时的一些行为与threading.RLock.acquire()中实现的行为不同。

release()

释放锁,递减递归级别。如果在递减之后递归级别为零,将锁重置为解锁(不由任何进程或线程拥有),并且如果任何其他进程或线程被阻塞等待锁被解锁,则只允许其中一个进程继续。如果递减之后递归级别仍然为非零,则锁保持锁定并由调用进程或线程拥有。

只有在调用进程或线程拥有锁时才调用此方法。如果此方法由除所有者之外的进程或线程调用或锁定处于未锁定(无主)状态,则会引发AssertionError请注意,在这种情况下引发的异常类型与threading.RLock.release()中实现的行为不同。

class multiprocessing.Semaphore([value])

信号量对象:threading.Semaphore的紧密模拟。

与其近似模拟的单独差异存在:其acquire方法的第一个参数被命名为,这与Lock.acquire()一致。

注意

在Mac OS X上,sem_timedwait不受支持,因此使用超时调用acquire()将使用睡眠循环来模拟该函数的行为。

注意

If the SIGINT signal generated by Ctrl-C arrives while the main thread is blocked by a call to BoundedSemaphore.acquire(), Lock.acquire(), RLock.acquire(), Semaphore.acquire(), Condition.acquire() or Condition.wait() then the call will be immediately interrupted and KeyboardInterrupt will be raised.

这与threading的行为不同,在等待阻塞调用正在进行时,SIGINT将被忽略。

注意

此程序包的某些功能需要在主机操作系统上运行共享信号量实现。没有一个,multiprocessing.synchronize模块将被禁用,并尝试导入它将导致ImportError有关其他信息,请参见问题3770

17.2.2.6. Shared ctypes Objects

可以使用可以由子进程继承的共享内存创建共享对象。

multiprocessing.Value(typecode_or_type, *args, lock=True)

返回从共享内存分配的ctypes对象。默认情况下,返回值实际上是对象的同步包装器。对象本身可以通过Value属性访问。

typecode_or_type确定返回对象的类型:它是ctypes类型或array模块使用的类型的一个字符类型代码。* args被传递给类型的构造函数。

如果lockTrue(默认值),则创建一个新的递归锁对象,以同步对该值的访问。如果lockLockRLock对象,那么它将用于同步对该值的访问。如果lockFalse,那么对返回的对象的访问将不会被锁自动保护,因此它不一定是“进程安全的”。

诸如+=之类的操作(其涉及读取和写入)不是原子的。所以如果,对于实例,你想原子地递增一个共享值,这是不够的,只是做

counter.value += 1

假设相关的锁是递归的(默认情况下),你可以改为做

with counter.get_lock():
    counter.value += 1

请注意,lock是一个仅关键字的参数。

multiprocessing.Array(typecode_or_type, size_or_initializer, *, lock=True)

返回从共享内存分配的ctypes数组。默认情况下,返回值实际上是数组的同步包装器。

typecode_or_type确定返回数组的元素类型:它是ctypes类型或array模块使用的类型的一个字符类型代码。如果size_or_initializer是一个整数,则它确定数组的长度,并且数组将初始置零。否则,size_or_initializer是用于初始化数组的序列,其长度决定了数组的长度。

如果lockTrue(默认值),则创建一个新的锁对象,以同步对该值的访问。如果lockLockRLock对象,那么它将用于同步对该值的访问。如果lockFalse,那么对返回的对象的访问将不会被锁自动保护,因此它不一定是“进程安全的”。

请注意,lock是一个仅关键字的参数。

请注意,ctypes.c_char的数组具有raw属性,允许使用它来存储和检索字符串。

17.2.2.6.1. The multiprocessing.sharedctypes module

multiprocessing.sharedctypes模块提供了从共享内存中分配ctypes对象的功能,这些对象可以由子进程继承。

注意

虽然可以在共享存储器中存储指针,但请记住这将指向特定进程的地址空间中的位置。然而,指针很可能在第二进程的上下文中是无效的,并且试图从第二进程解引用指针可能导致崩溃。

multiprocessing.sharedctypes.RawArray(typecode_or_type, size_or_initializer)

返回从共享内存分配的ctypes数组。

typecode_or_type确定返回数组的元素类型:它是ctypes类型或array模块使用的类型的一个字符类型代码。如果size_or_initializer是一个整数,那么它确定数组的长度,并且数组将初始置零。否则size_or_initializer是用于初始化数组的序列,其长度决定数组的长度。

注意,设置和获取元素可能是非原子的 - 使用Array()来确保访问是使用锁自动同步的。

multiprocessing.sharedctypes.RawValue(typecode_or_type, *args)

返回从共享内存分配的ctypes对象。

typecode_or_type确定返回对象的类型:它是ctypes类型或array模块使用的类型的一个字符类型代码。* args被传递给类型的构造函数。

请注意,设置和获取值可能是非原子的 - 使用Value()而不是确保使用锁自动同步访问。

Note that an array of ctypes.c_char has value and raw attributes which allow one to use it to store and retrieve strings – see documentation for ctypes.

multiprocessing.sharedctypes.Array(typecode_or_type, size_or_initializer, *, lock=True)

RawArray()相同,但根据lock的值,可能会返回一个进程安全同步包装,而不是原始ctypes数组。

如果lockTrue(默认值),则创建一个新的锁对象,以同步对该值的访问。如果lockLockRLock对象,那么它将用于同步对该值的访问。如果lockFalse,那么对返回的对象的访问将不会被锁自动保护,因此它不一定是“进程安全的”。

请注意,lock是一个强制关键字参数。

multiprocessing.sharedctypes.Value(typecode_or_type, *args, lock=True)

RawValue()相同,只是根据lock的值,可能会返回一个进程安全同步包装,而不是原始ctypes对象。

如果lockTrue(默认值),则创建一个新的锁对象,以同步对该值的访问。如果lockLockRLock对象,那么它将用于同步对该值的访问。如果lockFalse,那么对返回的对象的访问将不会被锁自动保护,因此它不一定是“进程安全的”。

请注意,lock是一个强制关键字参数。

multiprocessing.sharedctypes.copy(obj)

返回从共享内存分配的ctypes对象,它是ctypes对象obj的副本。

multiprocessing.sharedctypes.synchronized(obj[, lock])

返回一个使用lock来同步访问的ctypes对象的进程安全包装器对象。如果lockNone(默认值),则会自动创建multiprocessing.RLock对象。

同步包装器除了包装的对象外还有两个方法:get_obj()返回包装对象,get_lock()返回用于同步的锁定对象。

注意,通过包装器访问ctypes对象比访问raw ctypes对象要慢得多。

在3.5版本中已更改:同步对象支持context manager协议。

下表比较了使用正常ctypes语法从共享内存创建共享ctypes对象的语法。(在表MyStruct中是ctypes.Structure的某个子类)

ctypes使用类型的共享类型使用类型代码的共享类型
c_double(2.4)RawValue(c_double,2.4)RawValue('d',2.4)
MyStruct(4,6)RawValue(MyStruct,4,6)
(c_short * 7)()RawArray(c_short,7)RawArray('h',7)
(c_int * 3)(9,2,8)RawArray(c_int,(9,2,8))RawArray('i',(9,2,8))

下面是一个例子,其中一些子进程修改了一些ctypes对象:

from multiprocessing import Process, Lock
from multiprocessing.sharedctypes import Value, Array
from ctypes import Structure, c_double

class Point(Structure):
    _fields_ = [('x', c_double), ('y', c_double)]

def modify(n, x, s, A):
    n.value **= 2
    x.value **= 2
    s.value = s.value.upper()
    for a in A:
        a.x **= 2
        a.y **= 2

if __name__ == '__main__':
    lock = Lock()

    n = Value('i', 7)
    x = Value(c_double, 1.0/3.0, lock=False)
    s = Array('c', b'hello world', lock=lock)
    A = Array(Point, [(1.875,-6.25), (-5.75,2.0), (2.375,9.5)], lock=lock)

    p = Process(target=modify, args=(n, x, s, A))
    p.start()
    p.join()

    print(n.value)
    print(x.value)
    print(s.value)
    print([(a.x, a.y) for a in A])

打印的结果是

49
0.1111111111111111
HELLO WORLD
[(3.515625, 39.0625), (33.0625, 4.0), (5.640625, 90.25)]

17.2.2.7. Managers

管理器提供一种创建可在不同进程之间共享的数据的方法,包括在不同机器上运行的进程之间通过网络共享。管理员对象控制管理共享对象的服务器进程。其他进程可以通过使用代理访问共享对象。

multiprocessing.Manager()

返回开始的SyncManager对象,可用于在进程之间共享对象。返回的管理器对象对应于生成的子进程,并且具有将创建共享对象并返回相应代理的方法。

管理器进程将在垃圾收集或其父进程退出时立即关闭。管理器类在multiprocessing.managers模块中定义:

class multiprocessing.managers.BaseManager([address[, authkey]])

创建一个BaseManager对象。

一旦创建,应调用start()get_server().serve_forever(),以确保管理器对象引用启动的管理器进程。

address是管理器进程侦听新连接的地址。如果addressNone,则选择任意一个。

authkey是将用于检查到服务器进程的传入连接的有效性的认证密钥。如果authkeyNone,则使用current_process().authkey否则使用authkey,它必须是字节字符串。

start([initializer[, initargs]])

启动子过程以启动管理器。如果初始化器不是None,则子过程在启动时将调用initializer(*initargs)

get_server()

返回Server对象,它表示在Manager控制下的实际服务器。Server对象支持serve_forever()方法:

>>> from multiprocessing.managers import BaseManager
>>> manager = BaseManager(address=('', 50000), authkey=b'abc')
>>> server = manager.get_server()
>>> server.serve_forever()

Server还具有address属性。

connect()

本地管理器对象连接到远程管理器进程:

>>> from multiprocessing.managers import BaseManager
>>> m = BaseManager(address=('127.0.0.1', 5000), authkey=b'abc')
>>> m.connect()
shutdown()

停止经理使用的进程。仅当start()用于启动服务器进程时,此选项才可用。

这可以多次调用。

register(typeid[, callable[, proxytype[, exposed[, method_to_typeid[, create_method]]]]])

可以用于向管理器类注册类型或可调用的类方法。

typeid是用于标识特定类型的共享对象的“类型标识符”。这必须是字符串。

callable是用于为此类型标识符创建对象的可调用项。如果一个manager实例将使用connect()方法连接到服务器,或者create_method参数是False作为None

proxytypeBaseProxy的子类,用于使用此typeid创建共享对象的代理。如果None,则会自动创建代理类。

exposed用于指定应允许使用BaseProxy._callmethod()访问此类型代理的方法名称序列。(如果曝光None,则使用proxytype._exposed _代替,如果存在,则使用proxytype._exposed_在未指定公开列表的情况下,将可访问共享对象的所有“公共方法”。(这里的“公共方法”是指具有__call__()方法并且其名称不以'_'开头的任何属性。

method_to_typeid是用于指定应返回代理的那些公开方法的返回类型的映射。它将方法名映射到typeid字符串。(如果method_to_typeidNone,那么将使用proxytype._method_to_typeid_,如果存在)。如果方法的名称不是此映射的键,或者映射None,则方法返回的对象将按值复制。

create_method确定是否应使用名称typeid创建方法,该方法可用于告知服务器进程创建新的共享对象并为其返回代理。默认情况下为True

BaseManager实例也有一个只读属性:

address

管理器使用的地址。

在版本3.3中更改: Manager对象支持上下文管理协议 - 请参见Context Manager Types__enter__()启动服务器进程(如果尚未启动),然后返回manager对象。__exit__()调用shutdown()

在以前的版本中,如果尚未启动管理器的服务器进程,则__enter__()未启动。

class multiprocessing.managers.SyncManager

BaseManager的子类,可用于进程的同步。此类型的对象由multiprocessing.Manager()返回。

它还支持创建共享列表和字典。

Barrier(parties[, action[, timeout]])

创建共享threading.Barrier对象,并为其返回代理。

版本3.3中的新功能。

BoundedSemaphore([value])

创建共享threading.BoundedSemaphore对象,并为其返回代理。

Condition([lock])

创建共享threading.Condition对象并为其返回代理。

如果提供lock,那么它应该是threading.Lockthreading.RLock对象的代理。

在版本3.3中已更改:添加了wait_for()方法。

Event()

创建共享threading.Event对象并为其返回代理。

Lock()

创建共享threading.Lock对象,并为其返回代理。

Namespace()

创建共享Namespace对象,并为其返回代理。

Queue([maxsize])

创建共享queue.Queue对象并返回其代理。

RLock()

创建共享threading.RLock对象,并为其返回代理。

Semaphore([value])

创建共享threading.Semaphore对象,并为其返回代理。

Array(typecode, sequence)

创建一个数组并返回一个代理。

Value(typecode, value)

创建具有可写入value属性的对象,并为其返回代理。

dict()
dict 映射
dict 序列

创建共享dict对象,并为其返回代理。

list()
列表 序列

创建共享list对象,并为其返回代理。

注意

对dict和列表代理中的可变值或项的修改不会通过管理器传播,因为代理无法知道其值或项目何时被修改。要修改此类项,您可以将修改的对象重新分配给容器代理:

# create a list proxy and append a mutable object (a dictionary)
lproxy = manager.list()
lproxy.append({})
# now mutate the dictionary
d = lproxy[0]
d['a'] = 1
d['b'] = 2
# at this point, the changes to d are not yet synced, but by
# reassigning the dictionary, the proxy is notified of the change
lproxy[0] = d
class multiprocessing.managers.Namespace

可以使用SyncManager注册的类型。

命名空间对象没有公共方法,但有可写属性。其表示显示其属性的值。

但是,当为名称空间对象使用代理时,以'_'开头的属性将是代理的属性,而不是引用对象的属性:

>>> manager = multiprocessing.Manager()
>>> Global = manager.Namespace()
>>> Global.x = 10
>>> Global.y = 'hello'
>>> Global._z = 12.3    # this is an attribute of the proxy
>>> print(Global)
Namespace(x=10, y='hello')

17.2.2.7.1. Customized managers

要创建自己的管理器,需要创建BaseManager的子类,并使用register() classmethod向管理器类注册新类型或可调用项。例如:

from multiprocessing.managers import BaseManager

class MathsClass:
    def add(self, x, y):
        return x + y
    def mul(self, x, y):
        return x * y

class MyManager(BaseManager):
    pass

MyManager.register('Maths', MathsClass)

if __name__ == '__main__':
    with MyManager() as manager:
        maths = manager.Maths()
        print(maths.add(4, 3))         # prints 7
        print(maths.mul(7, 8))         # prints 56

17.2.2.7.2. Using a remote manager

可以在一台计算机上运行管理器服务器,并让客户端从其他计算机使用它(假设涉及的防火墙允许它)。

运行以下命令为远程客户端可以访问的单个共享队列创建服务器:

>>> from multiprocessing.managers import BaseManager
>>> import queue
>>> queue = queue.Queue()
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue', callable=lambda:queue)
>>> m = QueueManager(address=('', 50000), authkey=b'abracadabra')
>>> s = m.get_server()
>>> s.serve_forever()

一个客户端可以访问服务器,如下所示:

>>> from multiprocessing.managers import BaseManager
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue')
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey=b'abracadabra')
>>> m.connect()
>>> queue = m.get_queue()
>>> queue.put('hello')

另一个客户端也可以使用它:

>>> from multiprocessing.managers import BaseManager
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue')
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey=b'abracadabra')
>>> m.connect()
>>> queue = m.get_queue()
>>> queue.get()
'hello'

本地进程也可以访问该队列,使用上面的代码在客户端上远程访问它:

>>> from multiprocessing import Process, Queue
>>> from multiprocessing.managers import BaseManager
>>> class Worker(Process):
...     def __init__(self, q):
...         self.q = q
...         super(Worker, self).__init__()
...     def run(self):
...         self.q.put('local hello')
...
>>> queue = Queue()
>>> w = Worker(queue)
>>> w.start()
>>> class QueueManager(BaseManager): pass
...
>>> QueueManager.register('get_queue', callable=lambda: queue)
>>> m = QueueManager(address=('', 50000), authkey=b'abracadabra')
>>> s = m.get_server()
>>> s.serve_forever()

17.2.2.8. Proxy Objects

代理是指向到一个共享对象的对象,该对象在不同的​​进程中存在(可能)。共享对象被称为代理的指示符多个代理对象可以具有相同的指示。

代理对象具有调用其指示对象的相应方法的方法(尽管不是指示对象的每个方法都必须通过代理可用)。代理通常可以以其指示的大多数相同的方式使用:

>>> from multiprocessing import Manager
>>> manager = Manager()
>>> l = manager.list([i*i for i in range(10)])
>>> print(l)
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
>>> print(repr(l))
<ListProxy object, typeid 'list' at 0x...>
>>> l[4]
16
>>> l[2:5]
[4, 9, 16]

注意,将str()应用于代理将返回指示对象的表示,而应用repr()将返回代理的表示。

代理对象的一个​​重要特征是它们是可拾取的,因此它们可以在进程之间传递。但是,请注意,如果代理被发送到相应的管理器的进程,则取消它将产生指示符本身。这意味着,例如,一个共享对象可以包含第二个:

>>> a = manager.list()
>>> b = manager.list()
>>> a.append(b)         # referent of a now contains referent of b
>>> print(a, b)
[[]] []
>>> b.append('hello')
>>> print(a, b)
[['hello']] ['hello']

注意

multiprocessing中的代理类型不支持按值进行比较。所以,对于实例,我们有:

>>> manager.list([1,2,3]) == [1,2,3]
False

在进行比较时,应该只使用指示物的副本。

class multiprocessing.managers.BaseProxy

代理对象是BaseProxy的子类的实例。

_callmethod(methodname[, args[, kwds]])

调用并返回代理引用对象的方法的结果。

如果proxy是所指对象为obj的代理,则表达式

proxy._callmethod(methodname, args, kwds)

将计算表达式

getattr(obj, methodname)(*args, **kwds)

在经理的过程中。

返回的值将是调用结果的副本或新的共享对象的代理 - 请参阅BaseManager.register()method_to_typeid参数的文档。

如果调用引发异常,则通过_callmethod()重新生成异常。如果在管理器的进程中引发了一些其他异常,那么这将被转换为RemoteError异常,并由_callmethod()引发。

特别要注意的是,如果methodname没有暴露,则会引发异常。

_callmethod()的用法示例:

>>> l = manager.list(range(10))
>>> l._callmethod('__len__')
10
>>> l._callmethod('__getitem__', (slice(2, 7),)) # equivalent to l[2:7]
[2, 3, 4, 5, 6]
>>> l._callmethod('__getitem__', (20,))          # equivalent to l[20]
Traceback (most recent call last):
...
IndexError: list index out of range
_getvalue()

返回指示对象的副本。

如果引用是不可拆分的,那么这将引发异常。

__repr__()

返回代理对象的表示。

__str__()

返回指示对象的表示。

17.2.2.8.1. Cleanup

代理对象使用一个weakref回调,所以当它被垃圾收集时,它从拥有其指示对象的管理器中注销它自己。

当不再有任何代理引用它时,共享对象将从管理器进程中删除。

17.2.2.9. 进程池

可以用Pool类创建一个进程池来执行提交给它的任务。

class multiprocessing.pool.Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])

一个进程池对象,控制一个包含工作进程的进程池,作业可以提交给它。它支持具有超时和回调的异步结果,并具有并行映射实现。

processes是要使用的工作进程数。如果processesNone,则使用os.cpu_count()返回的数字。

如果initializer不是None,则每个工作进程在启动时将调用initializer(*initargs)

maxtasksperchild是工作进程在退出并由新工作进程替换之前可以完成的任务数,以使未使用的资源可以释放。默认的maxtasksperchild是None,这意味着工作进程的生存时间将与进程池一样长。

context可用于指定用于启动工作进程的上下文。通常使用multiprocessing.Pool()或上下文对象的Pool()方法创建进程池。在这两种情况下,都会正确地设置context

注意,进程池对象的方法只能由创建进程池的进程调用。

版本3.2中的新功能: maxtasksperchild

版本3.4中的新功能: context

Pool内的工作进程通常在进程池的工作队列的整个持续时间内生效。在其他系统(例如Apache、mod_wsgi等)中,释放工作进程持有的资源的常见模式是允许进程池内的工作进程只完成一定数量的作业,然后再退出、清理并生成一个新的进程来替换旧的进程。Poolmaxtasksperchild参数就是向用户提供这个功能。

apply(func[, args[, kwds]])

使用参数args和关键字参数kwds调用func它阻塞直到结果完成。相比这个代码块,apply_async()更适合并行执行工作。此外,func仅在进程池的其中一个工作进程中执行。

apply_async(func[, args[, kwds[, callback[, error_callback]]]])

apply()方法的一个变体,它返回一个结果对象。

如果指定callback,那么它应该是一个可接受单个参数的可调用对象。当结果完成时就对它应用callback,在调用失败的情况下则应用error_callback

如果指定error_callback,那么它应该是一个接受单个参数的可调用对象。如果目标函数失败,则以该异常实例调用error_callback

回调应该立即完成,否则处理结果的线程将被阻塞。

map(func, iterable[, chunksize])

map()内置函数(虽然它只支持一个iterable参数)等价的并行方式。它阻塞直到结果准备好。

该方法将iterable分成多个块,然后将这些块作为单独的任务提交到进程池。这些块的(近似)大小可以通过将chunksize设置为正整数来指定。

map_async(func, iterable[, chunksize[, callback[, error_callback]]])

返回结果对象的map()方法的变体。

如果指定callback,那么它应该是一个可接受单个参数的可调用对象。当结果完成时就对它应用callback,在调用失败的情况下则应用error_callback

如果指定error_callback,那么它应该是一个接受单个参数的可调用对象。如果目标函数失败,则以该异常实例调用error_callback

回调应该立即完成,否则处理结果的线程将被阻塞。

imap(func, iterable[, chunksize])

map()的惰性版本。

chunksize参数与map()方法使用的参数相同。对于非常长的iterable,chunksize使用一个较大值的可以使得作业比使用默认值1更快地完成。

此外,如果chunksize1,那么imap()方法返回的迭代器的next()方法有一个可选的参数timeout:如果在timeout秒内无法返回结果,那么next(timeout)将引发multiprocessing.TimeoutError

imap_unordered(func, iterable[, chunksize])

imap()相同,除了返回的迭代器的结果的顺序应该被认为是任意的。(只有当只有一个工作进程时,其顺序才保证是“正确的”。)

starmap(func, iterable[, chunksize])

类似map(),除了iterable的元素应该是可迭代对象,它们将分拆为参数。

因此,[(1,2), (3, 4)]这个iterable将导致[func(1,2), func(3,4)]

版本3.3中的新功能。

starmap_async(func, iterable[, chunksize[, callback[, error_back]]])

starmap()map_async()的组合,在可迭代对象组成的iterable上迭代并以分拆后的可迭代对象调用func返回一个结果对象。

版本3.3中的新功能。

close()

阻止任何更多的任务提交到进程池。一旦所有任务完成,工作进程将退出。

terminate()

立即停止工作进程,而不需要完成未完成的工作。当进程池对象被垃圾收集,terminate()将被立即调用。

join()

等待工作进程退出。在使用join()之前,必须调用close()terminate()

版本3.3中的新功能:进程池对象现在支持上下文管理协议 —— 请参阅上下文管理器类型__enter__()返回进程池对象,__exit__()调用terminate()

class multiprocessing.pool.AsyncResult

The class of the result returned by Pool.apply_async() and Pool.map_async().

get([timeout])

返回结果到达时。如果超时不是None且结果在超时秒内未到达,则会引发multiprocessing.TimeoutError如果远程调用引发异常,那么该异常将由get()重写。

wait([timeout])

等待结果可用或直到超时秒。

ready()

返回呼叫是否已完成。

successful()

返回调用是否完成而不引发异常。如果结果未准备就会引发AssertionError

以下示例演示了使用进程池:

from multiprocessing import Pool
import time

def f(x):
    return x*x

if __name__ == '__main__':
    with Pool(processes=4) as pool:         # start 4 worker processes
        result = pool.apply_async(f, (10,)) # evaluate "f(10)" asynchronously in a single process
        print(result.get(timeout=1))        # prints "100" unless your computer is *very* slow

        print(pool.map(f, range(10)))       # prints "[0, 1, 4,..., 81]"

        it = pool.imap(f, range(10))
        print(next(it))                     # prints "0"
        print(next(it))                     # prints "1"
        print(it.next(timeout=1))           # prints "4" unless your computer is *very* slow

        result = pool.apply_async(time.sleep, (10,))
        print(result.get(timeout=1))        # raises multiprocessing.TimeoutError

17.2.2.10. Listeners and Clients

通常,使用队列或使用Pipe()返回的Connection对象来完成进程之间的消息传递。

但是,multiprocessing.connection模块允许一些额外的灵活性。它基本上提供了一个高级的面向消息的API来处理套接字或Windows命名管道。它还支持摘要认证使用hmac模块,并且同时轮询多个连接。

multiprocessing.connection.deliver_challenge(connection, authkey)

将随机生成的消息发送到连接的另一端,并等待回复。

如果回复与使用authkey作为键的消息摘要相匹配,则会向连接的另一端发送欢迎消息。否则会出现AuthenticationError

multiprocessing.connection.answer_challenge(connection, authkey)

接收消息,使用authkey作为键计算消息的摘要,然后发送摘要。

如果未收到欢迎消息,则会引发AuthenticationError

multiprocessing.connection.Client(address[, family[, authenticate[, authkey]]])

尝试设置与使用地址地址的侦听器的连接,返回Connection

连接类型由family参数确定,但通常可以省略,因为它通常可以从地址的格式推断。(请参阅Address Formats

如果authenticateTrueauthkey是字节字符串,那么使用摘要认证。用于认证的密钥将是authkeycurrent_process().authkey if authkeyNone如果身份验证失败,则会出现AuthenticationError请参阅Authentication keys

class multiprocessing.connection.Listener([address[, family[, backlog[, authenticate[, authkey]]]]])

用于绑定套接字或Windows命名管道的“包”用于侦听连接。

address是侦听器对象的绑定套接字或命名管道使用的地址。

注意

如果使用地址“0.0.0.0”,则该地址将不是Windows上的可连接终点。如果需要可连接的端点,则应使用“127.0.0.1”。

family是要使用的套接字(或命名管道)的类型。这可以是字符串'AF_INET'(对于TCP套接字),'AF_UNIX'(对于Unix域套接字)或'AF_PIPE'其中只有第一个保证可用。如果familyNone,那么系列将根据地址的格式推断。如果地址也是None,则选择默认值。此默认值是假定为最快可用的系列。请参阅Address Formats请注意,如果family'AF_UNIX'且地址是None,那么套接字将创建在使用tempfile.mkstemp()

如果侦听器对象使用套接字,则一旦套接字已被绑定,backlog(默认为1)将传递给套接字的listen()

If authenticate is True (False by default) or authkey is not None then digest authentication is used.

如果authkey是字节字符串,那么它将被用作认证密钥;否则必须为

如果authkeyNone验证True,则current_process().authkey用作认证密钥。如果authkeyNone验证False,则不进行验证。如果身份验证失败,则会出现AuthenticationError请参阅Authentication keys

accept()

在侦听器对象的绑定套接字或命名管道上接受连接,并返回Connection对象。如果尝试认证并失败,则会引发AuthenticationError

close()

关闭侦听器对象的绑定套接字或命名管道。当侦听器被垃圾回收时,这被自动调用。但是建议明确调用它。

侦听器对象具有以下只读属性:

address

Listener对象正在使用的地址。

last_accepted

最后接受的连接来自的地址。如果这不可用,则None

版本3.3中的新功能:监听器对象现在支持上下文管理协议 - 请参阅Context Manager Types__enter__()返回侦听器对象,__exit__()调用close()

multiprocessing.connection.wait(object_list, timeout=None)

等待object_list中的对象准备就绪。返回object_list中已准备好的对象的列表。如果timeout是一个浮点数,那么调用会阻塞最多几秒钟。如果超时None,那么它将阻塞无限期。负超时相当于零超时。

对于Unix和Windows,如果对象可以出现在object_list

当有数据可以从中读取或另一端已关闭时,连接或套接字对象就绪。

Unix: wait(object_list, timeout) almost equivalent select.select(object_list, [], [], timeout). The difference is that, if select.select() is interrupted by a signal, it can raise OSError with an error number of EINTR, whereas wait() will not.

Windows: An item in object_list must either be an integer handle which is waitable (according to the definition used by the documentation of the Win32 function WaitForMultipleObjects()) or it can be an object with a fileno() method which returns a socket handle or pipe handle. (请注意,管道句柄和套接字句柄是而不是可等待句柄。)

版本3.3中的新功能。

示例

以下服务器代码创建一个侦听器,它使用'secret password'作为验证密钥。然后它等待连接并向客户端发送一些数据:

from multiprocessing.connection import Listener
from array import array

address = ('localhost', 6000)     # family is deduced to be 'AF_INET'

with Listener(address, authkey=b'secret password') as listener:
    with listener.accept() as conn:
        print('connection accepted from', listener.last_accepted)

        conn.send([2.25, None, 'junk', float])

        conn.send_bytes(b'hello')

        conn.send_bytes(array('i', [42, 1729]))

以下代码连接到服务器并从服务器接收一些数据:

from multiprocessing.connection import Client
from array import array

address = ('localhost', 6000)

with Client(address, authkey=b'secret password') as conn:
    print(conn.recv())                  # => [2.25, None, 'junk', float]

    print(conn.recv_bytes())            # => 'hello'

    arr = array('i', [0, 0, 0, 0, 0])
    print(conn.recv_bytes_into(arr))    # => 8
    print(arr)                          # => array('i', [42, 1729, 0, 0, 0])

以下代码使用wait()等待来自多个进程的消息:

import time, random
from multiprocessing import Process, Pipe, current_process
from multiprocessing.connection import wait

def foo(w):
    for i in range(10):
        w.send((i, current_process().name))
    w.close()

if __name__ == '__main__':
    readers = []

    for i in range(4):
        r, w = Pipe(duplex=False)
        readers.append(r)
        p = Process(target=foo, args=(w,))
        p.start()
        # We close the writable end of the pipe now to be sure that
        # p is the only process which owns a handle for it.  This
        # ensures that when p closes its handle for the writable end,
        # wait() will promptly report the readable end as being ready.
        w.close()

    while readers:
        for r in wait(readers):
            try:
                msg = r.recv()
            except EOFError:
                readers.remove(r)
            else:
                print(msg)

17.2.2.10.1. Address Formats

  • 'AF_INET'地址是(主机名, 端口)形式的元组,其中/ t5>是一个字符串,port是一个整数。

  • 'AF_UNIX'地址是表示文件系统上的文件名的字符串。

  • 'AF_PIPE'地址是一个形式的字符串

    r'\\。\ pipe \ PipeName '要使用Client()连接到名为ServerName的远程计算机上的命名管道,应使用r'\\ ServerName \ pipe \ PipeName '

请注意,任何以两个反斜杠开头的字符串默认为'AF_PIPE'地址,而不是'AF_UNIX'地址。

17.2.2.11. Authentication keys

当使用Connection.recv时,接收的数据将自动取消。不幸的是,从不受信任的源解压数据是一种安全风险。因此,ListenerClient()使用hmac模块提供摘要身份验证。

认证密钥是可以被认为是密码的字节字符串:一旦建立连接,两端将要求证明另一方知道认证密钥。(演示两端都使用相同的键,涉及通过连接发送密钥)。

如果请求认证但未指定认证密钥,则使用current_process().authkey的返回值(参见Process)。此值将由当前进程创建的任何Process对象自动继承。这意味着(默认地)多进程程序的所有进程将共享单个认证密钥,当在它们之间建立连接时可以使用该认证密钥。

也可以使用os.urandom()生成合适的验证密钥。

17.2.2.12. Logging

一些支持日志记录可用。但请注意,logging包不使用进程共享锁,因此可能(取决于处理程序类型)来自不同进程的消息混淆。

multiprocessing.get_logger()

返回multiprocessing使用的记录器。如果需要,将创建一个新的。

当第一次创建记录器具有logging.NOTSET级别且没有默认处理程序。发送到此记录器的消息将不会默认传播到根记录器。

请注意,在Windows子进程将只继承父进程的日志记录器的级别 - 记录器的任何其他定制将不会继承。

multiprocessing.log_to_stderr()

此函数执行对get_logger()的调用,但除了返回由get_logger创建的记录器之外,还添加了一个处理程序,该处理程序使用格式将输出发送到sys.stderr '[%(levelname)s /%(processName)s] %(message)s'

以下是启用日志记录的示例会话:

>>> import multiprocessing, logging
>>> logger = multiprocessing.log_to_stderr()
>>> logger.setLevel(logging.INFO)
>>> logger.warning('doomed')
[WARNING/MainProcess] doomed
>>> m = multiprocessing.Manager()
[INFO/SyncManager-...] child process calling self.run()
[INFO/SyncManager-...] created temp directory /.../pymp-...
[INFO/SyncManager-...] manager serving at '/.../listener-...'
>>> del m
[INFO/MainProcess] sending shutdown message to manager
[INFO/SyncManager-...] manager exiting with exitcode 0

有关日志记录级别的完整表,请参阅logging模块。

17.2.2.13. The multiprocessing.dummy module

multiprocessing.dummy复制multiprocessing的API,但只不过是threading模块的包装器。

17.2.3. Programming guidelines

使用multiprocessing时,应遵循一定的准则和惯用语。

17.2.3.1. All start methods

以下适用于所有启动方法。

避免共享状态

尽可能避免在进程之间移动大量数据。

最好坚持使用队列或管道进行进程之间的通信,而不是使用较低的级别同步原语。

可取性

确保代理方法的参数是可选的。

线程安全代理

Do not use a proxy object from more than one thread unless you protect it with a lock.

(There is never a problem with different processes using the same proxy.)

加入僵尸进程

On Unix when a process finishes but has not been joined it becomes a zombie. There should never be very many because each time a new process starts (or active_children() is called) all completed processes which have not yet been joined will be joined. Also calling a finished process’s Process.is_alive will join the process. Even so it is probably good practice to explicitly join all the processes that you start.

更好地继承比pickle / unpickle

When using the spawn or forkserver start methods many types from multiprocessing need to be picklable so that child processes can use them. However, one should generally avoid sending shared objects to other processes using pipes or queues. Instead you should arrange the program so that a process which needs access to a shared resource created elsewhere can inherit it from an ancestor process.

避免终止进程

Using the Process.terminate method to stop a process is liable to cause any shared resources (such as locks, semaphores, pipes and queues) currently being used by the process to become broken or unavailable to other processes.

Therefore it is probably best to only consider using Process.terminate on processes which never use any shared resources.

加入使用队列的进程

Bear in mind that a process that has put items in a queue will wait before terminating until all the buffered items are fed by the “feeder” thread to the underlying pipe. (The child process can call the Queue.cancel_join_thread method of the queue to avoid this behaviour.)

This means that whenever you use a queue you need to make sure that all items which have been put on the queue will eventually be removed before the process is joined. Otherwise you cannot be sure that processes which have put items on the queue will terminate. Remember also that non-daemonic processes will be joined automatically.

An example which will deadlock is the following:

from multiprocessing import Process, Queue def f(q): q.put('X' * 1000000) if __name__ == '__main__': queue = Queue() p = Process(target=f, args=(queue,)) p.start() p.join() # this deadlocks obj = queue.get() 

A fix here would be to swap the last two lines (or simply remove the p.join() line).

将资源显式传递给子进程

On Unix using the fork start method, a child process can make use of a shared resource created in a parent process using a global resource. However, it is better to pass the object as an argument to the constructor for the child process.

Apart from making the code (potentially) compatible with Windows and the other start methods this also ensures that as long as the child process is still alive the object will not be garbage collected in the parent process. This might be important if some resource is freed when the object is garbage collected in the parent process.

So for instance

from multiprocessing import Process, Lock def f(): ... do something using "lock" ... if __name__ == '__main__': lock = Lock() for i in range(10): Process(target=f).start() 

should be rewritten as

from multiprocessing import Process, Lock def f(l): ... do something using "l" ... if __name__ == '__main__': lock = Lock() for i in range(10): Process(target=f, args=(lock,)).start() 

小心用“文件类对象”替换sys.stdin

multiprocessing originally unconditionally called:

os.close(sys.stdin.fileno()) 

in the multiprocessing.Process._bootstrap() method — this resulted in issues with processes-in-processes. This has been changed to:

sys.stdin.close() sys.stdin = open(os.open(os.devnull, os.O_RDONLY), closefd=False) 

Which solves the fundamental issue of processes colliding with each other resulting in a bad file descriptor error, but introduces a potential danger to applications which replace sys.stdin() with a “file-like object” with output buffering. This danger is that if multiple processes call close() on this file-like object, it could result in the same data being flushed to the object multiple times, resulting in corruption.

If you write a file-like object and implement your own caching, you can make it fork-safe by storing the pid whenever you append to the cache, and discarding the cache when the pid changes. For example:

@property def cache(self): pid = os.getpid() if pid != self._pid: self._pid = pid self._cache = [] return self._cache 

For more information, see issue 5155, issue 5313 and issue 5331

17.2.3.2. The spawn and forkserver start methods

有一些额外的限制不适用于fork start方法。

更多picklability

Ensure that all arguments to Process.__init__() are picklable. Also, if you subclass Process then make sure that instances will be picklable when the Process.start method is called.

全局变量

Bear in mind that if code run in a child process tries to access a global variable, then the value it sees (if any) may not be the same as the value in the parent process at the time that Process.start was called.

However, global variables which are just module level constants cause no problems.

安全导入主模块

Make sure that the main module can be safely imported by a new Python interpreter without causing unintended side effects (such a starting a new process).

For example, using the spawn or forkserver start method running the following module would fail with a RuntimeError:

from multiprocessing import Process def foo(): print('hello') p = Process(target=foo) p.start() 

Instead one should protect the “entry point” of the program by using if __name__ == '__main__': as follows:

from multiprocessing import Process, freeze_support, set_start_method def foo(): print('hello') if __name__ == '__main__': freeze_support() set_start_method('spawn') p = Process(target=foo) p.start() 

(The freeze_support() line can be omitted if the program will be run normally instead of frozen.)

This allows the newly spawned Python interpreter to safely import the module and then run the module’s foo() function.

Similar restrictions apply if a pool or manager is created in the main module.

17.2.4. Examples

演示如何创建和使用自定义管理器和代理:

from multiprocessing import freeze_support
from multiprocessing.managers import BaseManager, BaseProxy
import operator

##

class Foo:
    def f(self):
        print('you called Foo.f()')
    def g(self):
        print('you called Foo.g()')
    def _h(self):
        print('you called Foo._h()')

# A simple generator function
def baz():
    for i in range(10):
        yield i*i

# Proxy type for generator objects
class GeneratorProxy(BaseProxy):
    _exposed_ = ['__next__']
    def __iter__(self):
        return self
    def __next__(self):
        return self._callmethod('__next__')

# Function to return the operator module
def get_operator_module():
    return operator

##

class MyManager(BaseManager):
    pass

# register the Foo class; make `f()` and `g()` accessible via proxy
MyManager.register('Foo1', Foo)

# register the Foo class; make `g()` and `_h()` accessible via proxy
MyManager.register('Foo2', Foo, exposed=('g', '_h'))

# register the generator function baz; use `GeneratorProxy` to make proxies
MyManager.register('baz', baz, proxytype=GeneratorProxy)

# register get_operator_module(); make public functions accessible via proxy
MyManager.register('operator', get_operator_module)

##

def test():
    manager = MyManager()
    manager.start()

    print('-' * 20)

    f1 = manager.Foo1()
    f1.f()
    f1.g()
    assert not hasattr(f1, '_h')
    assert sorted(f1._exposed_) == sorted(['f', 'g'])

    print('-' * 20)

    f2 = manager.Foo2()
    f2.g()
    f2._h()
    assert not hasattr(f2, 'f')
    assert sorted(f2._exposed_) == sorted(['g', '_h'])

    print('-' * 20)

    it = manager.baz()
    for i in it:
        print('<%d>' % i, end=' ')
    print()

    print('-' * 20)

    op = manager.operator()
    print('op.add(23, 45) =', op.add(23, 45))
    print('op.pow(2, 94) =', op.pow(2, 94))
    print('op._exposed_ =', op._exposed_)

##

if __name__ == '__main__':
    freeze_support()
    test()

使用Pool

import multiprocessing
import time
import random
import sys

#
# Functions used by test code
#

def calculate(func, args):
    result = func(*args)
    return '%s says that %s%s = %s' % (
        multiprocessing.current_process().name,
        func.__name__, args, result
        )

def calculatestar(args):
    return calculate(*args)

def mul(a, b):
    time.sleep(0.5 * random.random())
    return a * b

def plus(a, b):
    time.sleep(0.5 * random.random())
    return a + b

def f(x):
    return 1.0 / (x - 5.0)

def pow3(x):
    return x ** 3

def noop(x):
    pass

#
# Test code
#

def test():
    PROCESSES = 4
    print('Creating pool with %d processes\n' % PROCESSES)

    with multiprocessing.Pool(PROCESSES) as pool:
        #
        # Tests
        #

        TASKS = [(mul, (i, 7)) for i in range(10)] + \
                [(plus, (i, 8)) for i in range(10)]

        results = [pool.apply_async(calculate, t) for t in TASKS]
        imap_it = pool.imap(calculatestar, TASKS)
        imap_unordered_it = pool.imap_unordered(calculatestar, TASKS)

        print('Ordered results using pool.apply_async():')
        for r in results:
            print('\t', r.get())
        print()

        print('Ordered results using pool.imap():')
        for x in imap_it:
            print('\t', x)
        print()

        print('Unordered results using pool.imap_unordered():')
        for x in imap_unordered_it:
            print('\t', x)
        print()

        print('Ordered results using pool.map() --- will block till complete:')
        for x in pool.map(calculatestar, TASKS):
            print('\t', x)
        print()

        #
        # Test error handling
        #

        print('Testing error handling:')

        try:
            print(pool.apply(f, (5,)))
        except ZeroDivisionError:
            print('\tGot ZeroDivisionError as expected from pool.apply()')
        else:
            raise AssertionError('expected ZeroDivisionError')

        try:
            print(pool.map(f, list(range(10))))
        except ZeroDivisionError:
            print('\tGot ZeroDivisionError as expected from pool.map()')
        else:
            raise AssertionError('expected ZeroDivisionError')

        try:
            print(list(pool.imap(f, list(range(10)))))
        except ZeroDivisionError:
            print('\tGot ZeroDivisionError as expected from list(pool.imap())')
        else:
            raise AssertionError('expected ZeroDivisionError')

        it = pool.imap(f, list(range(10)))
        for i in range(10):
            try:
                x = next(it)
            except ZeroDivisionError:
                if i == 5:
                    pass
            except StopIteration:
                break
            else:
                if i == 5:
                    raise AssertionError('expected ZeroDivisionError')

        assert i == 9
        print('\tGot ZeroDivisionError as expected from IMapIterator.next()')
        print()

        #
        # Testing timeouts
        #

        print('Testing ApplyResult.get() with timeout:', end=' ')
        res = pool.apply_async(calculate, TASKS[0])
        while 1:
            sys.stdout.flush()
            try:
                sys.stdout.write('\n\t%s' % res.get(0.02))
                break
            except multiprocessing.TimeoutError:
                sys.stdout.write('.')
        print()
        print()

        print('Testing IMapIterator.next() with timeout:', end=' ')
        it = pool.imap(calculatestar, TASKS)
        while 1:
            sys.stdout.flush()
            try:
                sys.stdout.write('\n\t%s' % it.next(0.02))
            except StopIteration:
                break
            except multiprocessing.TimeoutError:
                sys.stdout.write('.')
        print()
        print()


if __name__ == '__main__':
    multiprocessing.freeze_support()
    test()

显示如何使用队列将任务提供给工作进程的容器并收集结果的示例:

import time
import random

from multiprocessing import Process, Queue, current_process, freeze_support

#
# Function run by worker processes
#

def worker(input, output):
    for func, args in iter(input.get, 'STOP'):
        result = calculate(func, args)
        output.put(result)

#
# Function used to calculate result
#

def calculate(func, args):
    result = func(*args)
    return '%s says that %s%s = %s' % \
        (current_process().name, func.__name__, args, result)

#
# Functions referenced by tasks
#

def mul(a, b):
    time.sleep(0.5*random.random())
    return a * b

def plus(a, b):
    time.sleep(0.5*random.random())
    return a + b

#
#
#

def test():
    NUMBER_OF_PROCESSES = 4
    TASKS1 = [(mul, (i, 7)) for i in range(20)]
    TASKS2 = [(plus, (i, 8)) for i in range(10)]

    # Create queues
    task_queue = Queue()
    done_queue = Queue()

    # Submit tasks
    for task in TASKS1:
        task_queue.put(task)

    # Start worker processes
    for i in range(NUMBER_OF_PROCESSES):
        Process(target=worker, args=(task_queue, done_queue)).start()

    # Get and print results
    print('Unordered results:')
    for i in range(len(TASKS1)):
        print('\t', done_queue.get())

    # Add more tasks using `put()`
    for task in TASKS2:
        task_queue.put(task)

    # Get and print some more results
    for i in range(len(TASKS2)):
        print('\t', done_queue.get())

    # Tell child processes to stop
    for i in range(NUMBER_OF_PROCESSES):
        task_queue.put('STOP')


if __name__ == '__main__':
    freeze_support()
    test()