17.2. multiprocessing
—— 基于进程的并行¶
17.2.1. 引言¶
multiprocessing
是一个和threading
模块类似,提供API,生成进程的模块。multiprocessing
包提供本地和远程并发,通过使用子进程而不是线程有效地转移全局解释器锁。因此,multiprocessing
模块允许程序员充分利用给定机器上的多个处理器。它在Unix和Windows上都可以运行。
multiprocessing
模块还引入了threading
模块中没有的API。比如Pool
对象,它能很方便地对多个函数或分布式数据并行执行和运算。以下示例演示了在模块中定义此类函数的常见做法,以便子进程可以成功导入该模块。这个使用Pool
的数据并行性的基本示例,
from multiprocessing import Pool
def f(x):
return x*x
if __name__ == '__main__':
with Pool(5) as p:
print(p.map(f, [1, 2, 3]))
将打印到标准输出
[1, 4, 9]
17.2.1.1. Process
类¶
在multiprocessing
中,通过创建Process
对象,然后调用其start()
方法来生成进程。Process
遵循threading.Thread
的API。多进程程序的一个简单例子是
from multiprocessing import Process
def f(name):
print('hello', name)
if __name__ == '__main__':
p = Process(target=f, args=('bob',))
p.start()
p.join()
要显示涉及的单个进程ID,下面是一个扩展示例:
from multiprocessing import Process
import os
def info(title):
print(title)
print('module name:', __name__)
print('parent process:', os.getppid())
print('process id:', os.getpid())
def f(name):
info('function f')
print('hello', name)
if __name__ == '__main__':
info('main line')
p = Process(target=f, args=('bob',))
p.start()
p.join()
关于为什么if __name__ == '__main__'
是必要的,请参阅编程指南。
17.2.1.2. 上下文和启动方法¶
根据平台,multiprocessing
支持三种方式来启动进程。这些启动方法是
- spawn
The parent process starts a fresh python interpreter process. The child process will only inherit those resources necessary to run the process objects
run()
method. In particular, unnecessary file descriptors and handles from the parent process will not be inherited. Starting a process using this method is rather slow compared to using fork or forkserver.Available on Unix and Windows. The default on Windows.
- fork
The parent process uses
os.fork()
to fork the Python interpreter. The child process, when it begins, is effectively identical to the parent process. All resources of the parent are inherited by the child process. Note that safely forking a multithreaded process is problematic.Available on Unix only. The default on Unix.
- forkserver
When the program starts and selects the forkserver start method, a server process is started. From then on, whenever a new process is needed, the parent process connects to the server and requests that it fork a new process. The fork server process is single threaded so it is safe for it to use
os.fork()
. No unnecessary resources are inherited.Available on Unix platforms which support passing file descriptors over Unix pipes.
版本3.4更新:所有Unix平台上添加spawn,以及为某些Unix平台添加forkserver。在Windows上,子进程不再继承父进程所有可继承句柄。
在Unix上使用spawn或forkserver启动方法还将启动一个信号量跟踪器进程,该进程跟踪由程序的进程创建的未链接的命名信号量。当所有进程退出后,信号量跟踪器取消任何剩余信号量的链接。通常应该没有,但如果一个过程被一个信号杀死,可能有一些“泄漏”的信号量。(取消命名信号量的链接是一件重要的事情,因为系统只允许有限的数量,并且它们将不会自动取消链接,直到下次重新启动。)
要选择启动的方法,请在main模块的if __name__ == '__main__'
子句中使用set_start_method()
。例如:
import multiprocessing as mp
def foo(q):
q.put('hello')
if __name__ == '__main__':
mp.set_start_method('spawn')
q = mp.Queue()
p = mp.Process(target=foo, args=(q,))
p.start()
print(q.get())
p.join()
在程序中set_start_method()
应最多只使用一次。
或者,你可以使用get_context()
来获取context对象。Context对象具有与multiprocessing模块相同的API,并允许在同一程序中使用多个启动方法。
import multiprocessing as mp
def foo(q):
q.put('hello')
if __name__ == '__main__':
ctx = mp.get_context('spawn')
q = ctx.Queue()
p = ctx.Process(target=foo, args=(q,))
p.start()
print(q.get())
p.join()
注意,与context相关的对象可能与用于不同context的进程不兼容。特别地,使用fork context创建的锁不能传递到使用spawn或forkserver启动方法启动的进程。
想要使用特定启动方法的库应该使用get_context()
,以避免干扰库用户的选择。
17.2.1.3. 在进程之间交换对象¶
multiprocessing
支持进程之间的两种类型的通信通道:
队列
The
Queue
class is a near clone ofqueue.Queue
. For example:from multiprocessing import Process, Queue def f(q): q.put([42, None, 'hello']) if __name__ == '__main__': q = Queue() p = Process(target=f, args=(q,)) p.start() print(q.get()) # prints "[42, None, 'hello']" p.join()Queues are thread and process safe.
管道
The
Pipe()
function returns a pair of connection objects connected by a pipe which by default is duplex (two-way). For example:from multiprocessing import Process, Pipe def f(conn): conn.send([42, None, 'hello']) conn.close() if __name__ == '__main__': parent_conn, child_conn = Pipe() p = Process(target=f, args=(child_conn,)) p.start() print(parent_conn.recv()) # prints "[42, None, 'hello']" p.join()The two connection objects returned by
Pipe()
represent the two ends of the pipe. Each connection object hassend()
andrecv()
methods (among others). Note that data in a pipe may become corrupted if two processes (or threads) try to read from or write to the same end of the pipe at the same time. Of course there is no risk of corruption from processes using different ends of the pipe at the same time.
17.2.1.4. 进程之间的同步¶
multiprocessing
包含了全部和threading
相同的同步原语 。对于一个实例可以使用锁来保证同一时间只有一个进程在使用标准输出。
from multiprocessing import Process, Lock
def f(l, i):
l.acquire()
try:
print('hello world', i)
finally:
l.release()
if __name__ == '__main__':
lock = Lock()
for num in range(10):
Process(target=f, args=(lock, num)).start()
没有使用锁从不同进程的输出容易得到所有混合。
17.2.1.5. 进程之间的状态共享¶
如上所述,当进行并发编程时,最好尽量避免使用共享状态。在使用多个进程时尤其如此。
但是,如果真的需要使用一些共享数据,则multiprocessing
提供了几种方法。
共享内存
Data can be stored in a shared memory map using
Value
orArray
. For example, the following codefrom multiprocessing import Process, Value, Array def f(n, a): n.value = 3.1415927 for i in range(len(a)): a[i] = -a[i] if __name__ == '__main__': num = Value('d', 0.0) arr = Array('i', range(10)) p = Process(target=f, args=(num, arr)) p.start() p.join() print(num.value) print(arr[:])will print
3.1415927 [0, -1, -2, -3, -4, -5, -6, -7, -8, -9]The
'd'
and'i'
arguments used when creatingnum
andarr
are typecodes of the kind used by thearray
module:'d'
indicates a double precision float and'i'
indicates a signed integer. These shared objects will be process and thread-safe.For more flexibility in using shared memory one can use the
multiprocessing.sharedctypes
module which supports the creation of arbitrary ctypes objects allocated from shared memory.
服务器进程
A manager object returned by
Manager()
controls a server process which holds Python objects and allows other processes to manipulate them using proxies.A manager returned by
Manager()
will support typeslist
,dict
,Namespace
,Lock
,RLock
,Semaphore
,BoundedSemaphore
,Condition
,Event
,Barrier
,Queue
,Value
andArray
. For example,from multiprocessing import Process, Manager def f(d, l): d[1] = '1' d['2'] = 2 d[0.25] = None l.reverse() if __name__ == '__main__': with Manager() as manager: d = manager.dict() l = manager.list(range(10)) p = Process(target=f, args=(d, l)) p.start() p.join() print(d) print(l)will print
{0.25: None, 1: '1', '2': 2} [9, 8, 7, 6, 5, 4, 3, 2, 1, 0]Server process managers are more flexible than using shared memory objects because they can be made to support arbitrary object types. Also, a single manager can be shared by processes on different computers over a network. They are, however, slower than using shared memory.
17.2.1.6. 使用工作进程的进程池¶
Pool
类表示工作进程的进程池。它具有允许将任务以几种不同的方式分配到工作进程的方法。
例如:
from multiprocessing import Pool, TimeoutError
import time
import os
def f(x):
return x*x
if __name__ == '__main__':
# start 4 worker processes
with Pool(processes=4) as pool:
# print "[0, 1, 4,..., 81]"
print(pool.map(f, range(10)))
# print same numbers in arbitrary order
for i in pool.imap_unordered(f, range(10)):
print(i)
# evaluate "f(20)" asynchronously
res = pool.apply_async(f, (20,)) # runs in *only* one process
print(res.get(timeout=1)) # prints "400"
# evaluate "os.getpid()" asynchronously
res = pool.apply_async(os.getpid, ()) # runs in *only* one process
print(res.get(timeout=1)) # prints the PID of that process
# launching multiple evaluations asynchronously *may* use more processes
multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)]
print([res.get(timeout=1) for res in multiple_results])
# make a single worker sleep for 10 secs
res = pool.apply_async(time.sleep, (10,))
try:
print(res.get(timeout=1))
except TimeoutError:
print("We lacked patience and got a multiprocessing.TimeoutError")
print("For the moment, the pool remains available for more work")
# exiting the 'with'-block has stopped the pool
print("Now the pool is closed and no longer available")
注意,进程池的方法只应该被创建它的进程使用。
注意
此包中的功能要求__main__
模块可由子进程导入。这在编程指南中有所说明,但值得在这里指出。这意味着一些示例,如multiprocessing.pool.Pool
示例在交互式解释器中不能工作。例如:
>>> from multiprocessing import Pool
>>> p = Pool(5)
>>> def f(x):
... return x*x
...
>>> p.map(f, [1,2,3])
Process PoolWorker-1:
Process PoolWorker-2:
Process PoolWorker-3:
Traceback (most recent call last):
AttributeError: 'module' object has no attribute 'f'
AttributeError: 'module' object has no attribute 'f'
AttributeError: 'module' object has no attribute 'f'
(如果你尝试这样,它实际上会输出以半随机方式交错的三个完整的traceback,然后你可能不得不停止主进程。)
17.2.2. Reference¶
multiprocessing
包大多复制threading
模块的API。
17.2.2.1. Process
and exceptions¶
- class
multiprocessing.
Process
(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)¶ 进程对象表示在单独进程中运行的活动。
Process
类具有threading.Thread
的所有方法的等价物。应始终使用关键字参数调用构造函数。组应始终为
None
;它仅仅与threading.Thread
兼容。target是要由run()
方法调用的可调用对象。它默认为None
,表示不调用任何内容。name是进程名称(有关详细信息,请参阅name
)。args是目标调用的参数元组。kwargs是目标调用的关键字参数的字典。如果提供,则仅限关键字守护程序参数将进程daemon
标记设置为True
或False
。如果None
(默认值),则此标志将从创建过程继承。默认情况下,没有参数传递给target。
如果一个子类覆盖了构造函数,它必须确保在对进程做任何其他事情之前调用基类构造函数(
Process.__init__()
)。在版本3.3中已更改:添加了守护程序参数。
-
join
([timeout])¶ 如果可选参数timeout为
None
(默认值),则该方法将阻塞,直到调用join()
方法的进程终止。如果超时是正数,则它最多阻止超时秒。一个过程可以连接多次。
进程不能自己加入,因为这将导致死锁。尝试在进程启动之前加入进程是一个错误。
-
name
¶ 进程的名称。名称是仅用于识别目的的字符串。它没有语义。可以给多个进程指定相同的名称。
初始名称由构造函数设置。如果没有为构造函数提供显式名称,则形式为'Process-N 1的名称:N 2:...:N k t2 >',其中每个N k是其父节点的第N个子节点。
-
daemon
¶ 进程的守护进程标志,一个布尔值。这必须在调用
start()
之前设置。初始值从创建过程继承。
当进程退出时,它会尝试终止所有的daemonic子进程。
注意,不允许daemonic进程创建子进程。否则,一个守护进程会使其子进程成为孤立的,如果它的父进程退出时终止。此外,这些是而不是 Unix守护程序或服务,它们是正常的进程,如果非守护进程已退出,它们将被终止(而不是加入)。
除了
threading.Thread
API,Process
对象还支持以下属性和方法:-
pid
¶ 返回进程ID。在生成进程之前,这将是
None
。
-
exitcode
¶ 孩子的退出代码。如果进程尚未终止,这将是
None
。负值-N表示孩子被信号N终止。
-
authkey
¶ 进程的认证密钥(字节字符串)。
当初始化
multiprocessing
时,使用os.urandom()
为主进程分配一个随机字符串。
-
sentinel
¶ 系统对象的数字句柄,在进程结束时将变为“就绪”。
如果要使用
multiprocessing.connection.wait()
一次等待多个事件,可以使用此值。否则调用join()
更简单。在Windows上,这是可与
WaitForSingleObject
和WaitForMultipleObjects
API调用系列一起使用的操作系统句柄。在Unix上,这是一个文件描述器,可以使用来自select
模块的原语。版本3.3中的新功能。
-
terminate
()¶ 终止进程。在Unix上,这是使用
SIGTERM
信号完成的;在Windows上使用TerminateProcess()
。注意,退出处理程序和finally子句等不会被执行。注意,进程的子进程将不会终止 - 它们将成为孤立的。
警告
如果在相关联的进程正在使用管道或队列时使用此方法,则管道或队列容易被损坏,并且可能变得不能由其他进程使用。类似地,如果进程已经获得锁或信号量等然后终止它可能导致其他进程死锁。
Note that the
start()
,join()
,is_alive()
,terminate()
andexitcode
methods should only be called by the process that created the process object.Process
的一些方法的示例用法:>>> import multiprocessing, time, signal >>> p = multiprocessing.Process(target=time.sleep, args=(1000,)) >>> print(p, p.is_alive()) <Process(Process-1, initial)> False >>> p.start() >>> print(p, p.is_alive()) <Process(Process-1, started)> True >>> p.terminate() >>> time.sleep(0.1) >>> print(p, p.is_alive()) <Process(Process-1, stopped[SIGTERM])> False >>> p.exitcode == -signal.SIGTERM True
-
- exception
multiprocessing.
ProcessError
¶ 所有
multiprocessing
异常的基类。
- exception
multiprocessing.
BufferTooShort
¶ 当提供的缓冲区对象对于消息读取而言太小时,由
Connection.recv_bytes_into()
引发的异常。如果
e
是BufferTooShort
的实例,则e.args[0]
会将消息作为字节字符串。
- exception
multiprocessing.
AuthenticationError
¶ 在出现身份验证错误时触发。
- exception
multiprocessing.
TimeoutError
¶ 在超时超时时由方法引发。
17.2.2.2. Pipes and Queues¶
当使用多个进程时,通常使用消息传递来用于进程之间的通信,并避免必须使用任何同步原语,如锁。
对于传递消息,可以使用Pipe()
(用于两个进程之间的连接)或队列(允许多个生产者和消费者)。
Queue
,SimpleQueue
和JoinableQueue
类型是在queue.Queue
They differ in that Queue
lacks the task_done()
and join()
methods introduced into Python 2.5’s queue.Queue
class.
如果您使用JoinableQueue
,那么您必须为每一个从队列中移除的任务去调用JoinableQueue.task_done()
。否则的话,那些用于记录未完成的任务数量的信号量或许最终会溢出,从而引发异常。
请注意,还可以使用管理器对象创建共享队列 - 请参阅Managers。
注意
multiprocessing
使用通常的queue.Empty
和queue.Full
例外来表示超时。它们在multiprocessing
命名空间中不可用,因此您需要从queue
中导入它们。
注意
当一个对象放在一个队列上时,该对象被腌制,后台线程随后将经过腌制的数据清洗到底层管道。这有一些后果是有点令人惊讶,但不应该导致任何实际困难 - 如果他们真的打扰你,那么你可以改为使用由manager创建的队列。
- 将对象放在空队列上之后,在队列
empty()
方法返回False
和get_nowait()
之前可能会返回一个无限小的延迟而不提高queue.Empty
。 - 如果多个进程将对象排队,则可能在另一端无序地接收对象。然而,由相同进程排队的对象将始终以相对于彼此的期望顺序。
警告
If a process is killed using Process.terminate()
or os.kill()
while it is trying to use a Queue
, then the data in the queue is likely to become corrupted. 这可能导致任何其他进程在尝试稍后使用队列时获得异常。
警告
如上所述,如果子进程已将项目放在队列上(并且它没有使用JoinableQueue.cancel_join_thread
),那么该进程将不会终止,直到所有缓冲的项目都已刷新到管道。
这意味着,如果你尝试加入这个过程,你可能会得到一个死锁,除非你确定已经被放在队列上的所有项目已经消耗。类似地,如果子进程是非守护进程,则当进程尝试加入其所有非守护进程的子进程时,父进程可能在退出时挂起。
请注意,使用管理器创建的队列没有此问题。请参阅Programming guidelines。
有关进程间通信的队列使用示例,请参见Examples。
-
multiprocessing.
Pipe
([duplex])¶ 返回表示管道末端的
Connection
对象的(conn1, conn2)
如果duplex是
True
(默认值),则管道是双向的。如果duplex是False
,则管道是单向的:conn1
只能用于接收消息,conn2
用于发送消息。
- class
multiprocessing.
Queue
([maxsize])¶ 返回进程共享的队列,底层使用管道和锁来实现。当进程首先将一个项目放在队列上时,启动了一个将对象从缓冲区传送到管道中的馈线线程。
来自标准库的
queue
模块的通常的queue.Empty
和queue.Full
异常被引发以指示超时。Queue
实现除task_done()
和join()
之外的queue.Queue
的所有方法。-
qsize
()¶ 返回队列的大致大小。由于多线程/多进程语义,这个数字是不可靠的。
请注意,这可能在Unix平台(如Mac OS X)上引发
NotImplementedError
,其中sem_getvalue()
未实现。
-
empty
()¶ 如果队列为空,返回
True
,否则返回False
。由于多线程/多进程语义,这是不可靠的。
-
full
()¶ 如果队列已满,则返回
True
,否则返回False
。由于多线程/多进程语义,这是不可靠的。
-
put
(obj[, block[, timeout]])¶ 将obj放入队列。如果可选参数块为
True
(默认值)和超时为None
直到空闲时隙可用为止。如果超时是一个正数,它最多阻塞超时秒,如果在那段时间内没有空闲时隙,则引发queue.Full
。否则(块是False
),如果空闲时隙立即可用,则将一个项目放在队列上,否则引发queue.Full
异常在这种情况下忽略超时)。
-
put_nowait
(obj)¶ 等同于
put(obj, False)
。
-
get
([block[, timeout]])¶ 从队列中删除并返回项目。如果可选的args 块为
True
(默认值)和超时为None
直到项目可用。如果timeout是正数,则它最多阻塞超时秒,如果在该时间内没有可用的项目,则引发queue.Empty
异常。Otherwise (block isFalse
), return an item if one is immediately available, else raise thequeue.Empty
exception (timeout is ignored in that case).
-
get_nowait
()¶ 等同于
get(False)
。
multiprocessing.Queue
有一些在queue.Queue
中找不到的其他方法。这些方法通常不需要大多数代码:-
close
()¶ 指示当前进程不会在此队列上放置更多数据。The background thread will quit once it has flushed all buffered data to the pipe.当队列被垃圾回收时,这被自动调用。
-
join_thread
()¶ 加入后台线程。这只能在调用
close()
之后使用。它阻塞直到后台线程退出,确保缓冲区中的所有数据都已刷新到管道。默认情况下,如果进程不是队列的创建者,那么在退出时它将尝试加入队列的后台线程。该进程可以调用
cancel_join_thread()
使join_thread()
不执行任何操作。
-
cancel_join_thread
()¶ 阻止
join_thread()
阻止。特别地,这防止后台线程在进程退出时自动连接 - 参见join_thread()
。此方法的更好名称可能是
allow_exit_without_flush()
。它可能导致入队数据丢失,你几乎肯定不会需要使用它。它只是在那里,如果你需要当前的进程立即退出,而不等待刷新排队的数据到底层的管道,你不在乎丢失的数据。
注意
此类的功能需要在主机操作系统上运行共享信号量实现。没有一个,此类中的功能将被禁用,并尝试实例化
Queue
将导致ImportError
。有关其他信息,请参见问题3770。这同样适用于下面列出的任何专门的队列类型。-
- class
multiprocessing.
SimpleQueue
¶ -
empty
()¶ 如果队列为空,返回
True
,否则返回False
。
-
get
()¶ 从队列中删除并返回项目。
-
put
(item)¶ 将项放入队列。
-
- class
multiprocessing.
JoinableQueue
([maxsize])¶ JoinableQueue
,Queue
子类是另外具有task_done()
和join()
方法的队列。-
task_done
()¶ 指示以前入队的任务已完成。由队列使用者使用。对于用于获取任务的每个
get()
,对task_done()
的后续调用会告诉队列任务上的处理已完成。If a
join()
is currently blocking, it will resume when all items have been processed (meaning that atask_done()
call was received for every item that had beenput()
into the queue).引发a
ValueError
如果调用的次数比在队列中放置的项目多。
-
join
()¶ 阻止,直到队列中的所有项目都已被获取和处理。
每当项目添加到队列时,未完成任务的计数就会增加。当消费者调用
task_done()
以指示该项目已检索并且其上的所有工作都已完成时,计数将减少。当未完成任务的计数下降为零时,join()
解除阻塞。
-
17.2.2.3. Miscellaneous¶
-
multiprocessing.
active_children
()¶ 返回当前进程的所有活的孩子的列表。
调用这会产生“加入”已经完成的任何进程的副作用。
-
multiprocessing.
cpu_count
()¶ 返回系统中的CPU数。May引发
NotImplementedError
。也可以看看
-
multiprocessing.
freeze_support
()¶ 当使用
multiprocessing
的程序已冻结以产生Windows可执行文件时,添加支持。(已使用py2exe,PyInstaller和cx_Freeze进行测试。)需要在
之后直接调用此函数 __ name __ == '__ main __' t0 >主模块的线。
例如:from multiprocessing import Process, freeze_support def f(): print('hello world!') if __name__ == '__main__': freeze_support() Process(target=f).start()
如果省略
freeze_support()
行,则尝试运行冻结的可执行文件将引发RuntimeError
。调用
freeze_support()
在Windows以外的任何操作系统上调用时没有效果。此外,如果模块在Windows上由Python解释器正常运行(程序尚未冻结),则freeze_support()
没有效果。
-
multiprocessing.
get_all_start_methods
()¶ 返回支持的开始方法的列表,其中第一个是默认值。可能的启动方法是
'fork'
,'spawn'
和'forkserver'
。在Windows上只有'spawn'
可用。在Unix'fork'
和'spawn'
始终受支持,'fork'
版本3.4中的新功能。
-
multiprocessing.
get_context
(method=None)¶ 返回与
multiprocessing
模块具有相同属性的上下文对象。如果方法为无,则返回默认上下文。否则方法应为
'fork'
,'spawn'
,'forkserver'
如果指定的start方法不可用,则会引发ValueError
。版本3.4中的新功能。
-
multiprocessing.
get_start_method
(allow_none=False)¶ 返回用于启动进程的start方法的名称。
如果start方法未修复,并且allow_none为false,则start方法将固定为默认值,并返回名称。如果start方法未修复,并且allow_none为true,则返回无。
返回值可以是
'fork'
,'spawn'
,'forkserver'
或无。'fork'
是Unix上的默认值,而'spawn'
是Windows上的默认值。版本3.4中的新功能。
-
multiprocessing.
set_executable
()¶ 设置在启动子进程时要使用的Python解释器的路径。(默认使用
sys.executable
)。Embedders可能需要做一些事情set_executable(os.path.join(sys.exec_prefix, 'pythonw.exe'))
才能创建子进程。
在版本3.4中更改:现在在使用
'spawn'
start方法时支持Unix。
-
multiprocessing.
set_start_method
(method)¶ 设置应用于启动子进程的方法。方法可以是
'fork'
,'spawn'
或'forkserver'
。请注意,这应该最多调用一次,并且应该在
内保护if __ name __ == '__ main__ '
子句。版本3.4中的新功能。
17.2.2.4. Connection Objects¶
连接对象允许发送和接收可拾取对象或字符串。它们可以被认为是面向消息的连接套接字。
通常使用Pipe()
创建连接对象 - 另请参阅Listeners and Clients。
- class
multiprocessing.
Connection
¶ -
send
(obj)¶ 将对象发送到连接的另一端,应使用
recv()
读取该对象。对象必须可拾取。非常大的pickles(大约32 MB +,虽然它取决于操作系统)可能引发一个
ValueError
异常。
-
fileno
()¶ 返回文件描述器或连接使用的句柄。
-
close
()¶ 关闭连接。
当连接被垃圾回收时,这被自动调用。
-
poll
([timeout])¶ 返回是否有任何数据可供读取。
如果未指定timeout,则它将立即返回。如果timeout是一个数字,那么它指定阻止的最大时间(以秒为单位)。如果超时为
None
,则使用无限超时。请注意,可以使用
multiprocessing.connection.wait()
一次轮询多个连接对象。
-
send_bytes
(buffer[, offset[, size]])¶ 从bytes-like object发送字节数据作为完整的消息。
如果给定offset,则从缓冲器中的该位置读取数据。如果给定size,那么将从缓冲区读取许多字节。非常大的缓冲区(大约32 MB +,尽管它取决于操作系统)可能引发
ValueError
异常
-
recv_bytes
([maxlength])¶ 返回从连接的另一端发送的字节数据的完整消息作为字符串。阻塞直到有东西要接收。引发
EOFError
如果没有任何内容要接收,另一端已关闭。如果指定maxlength且消息长于maxlength,则会引发
OSError
,连接将不再可读。
-
recv_bytes_into
(buffer[, offset])¶ 将从连接另一端发送的字节数据的完整消息读入缓冲区,并返回消息中的字节数。阻塞直到有东西要接收。引发
EOFError
如果没有剩下的东西要接收,另一端被关闭。缓冲区必须是可写入的bytes-like object。如果给定offset,则消息将从该位置写入缓冲区。偏移量必须是小于缓冲区长度的非负整数(以字节为单位)。
如果缓冲区太短,则会引发
BufferTooShort
异常,并且完整消息可用为e.args[0]
,其中e
exception实例。
在版本3.3中已更改:现在可以使用
Connection.send()
和Connection.recv()
在进程之间传输连接对象本身。版本3.3中的新功能:连接对象现在支持上下文管理协议 - 请参阅Context Manager Types。
__enter__()
返回连接对象,__exit__()
调用close()
。-
例如:
>>> from multiprocessing import Pipe
>>> a, b = Pipe()
>>> a.send([1, 'hello', None])
>>> b.recv()
[1, 'hello', None]
>>> b.send_bytes(b'thank you')
>>> a.recv_bytes()
b'thank you'
>>> import array
>>> arr1 = array.array('i', range(5))
>>> arr2 = array.array('i', [0] * 10)
>>> a.send_bytes(arr1)
>>> count = b.recv_bytes_into(arr2)
>>> assert count == len(arr1) * arr1.itemsize
>>> arr2
array('i', [0, 1, 2, 3, 4, 0, 0, 0, 0, 0])
警告
Connection.recv()
方法会自动取消接收的数据,这可能是一种安全风险,除非您可以信任发送消息的进程。
因此,除非使用Pipe()
生成连接对象,因此在执行某种排序后,应该只使用recv()
和send()
请参阅Authentication keys。
警告
如果进程在尝试读取或写入管道时被杀死,则管道中的数据可能会被破坏,因为可能无法确定消息边界位于何处。
17.2.2.5. Synchronization primitives¶
通常,在多进程程序中同步原语不像在多线程程序中那样是必要的。请参阅threading
模块的文档。
请注意,还可以使用管理器对象创建同步原语 - 请参阅Managers。
- class
multiprocessing.
Barrier
(parties[, action[, timeout]])¶ 屏障对象:
threading.Barrier
的克隆。版本3.3中的新功能。
- class
multiprocessing.
BoundedSemaphore
([value])¶ 有界信号对象:
threading.BoundedSemaphore
的紧密模拟。与其近似模拟的单独差异存在:其
acquire
方法的第一个参数被命名为块,这与Lock.acquire()
一致。注意
在Mac OS X上,这与
Semaphore
无法区分,因为sem_getvalue()
未在该平台上实现。
- class
multiprocessing.
Condition
([lock])¶ 条件变量:
threading.Condition
的别名。如果指定lock,那么它应该是来自
multiprocessing
的Lock
或RLock
对象。在版本3.3中已更改:添加了
wait_for()
方法。
- class
multiprocessing.
Event
¶
- class
multiprocessing.
Lock
¶ 非递归锁定对象:
threading.Lock
的紧密模拟。一旦进程或线程获得了锁,随后从任何进程或线程获取它的尝试将阻塞,直到它被释放;任何进程或线程都可以释放它。threading.Lock
应用于线程的概念和行为在multiprocessing.Lock
中复制,因为它适用于进程或线程,除非另有说明。请注意,
Lock
实际上是一个工厂函数,它返回使用默认上下文初始化的multiprocessing.synchronize.Lock
的实例。Lock
支持context manager协议,因此可用于with
语句。-
acquire
(block=True, timeout=None)¶ 获取锁定,阻止或非阻止。
将块参数设置为
True
(默认值),方法调用将阻塞,直到锁处于未锁定状态,然后将其设置为锁定并返回True
。请注意,此第一个参数的名称不同于threading.Lock.acquire()
中的名称。将块参数设置为
False
,方法调用不会阻止。如果锁定当前处于锁定状态,则返回False
;否则将锁设置为锁定状态,并返回True
。当对超时使用正值浮点值调用时,只要不能获取锁定,最多只能阻止超时指定的秒数。对于超时的负值的调用相当于零的超时。使用超时值
None
的调用(默认值)将超时时间设置为无限。请注意,超时的负值或None
值与threading.Lock.acquire()
中实现的行为不同。如果块参数设置为False
,则timeout参数没有实际意义,因此被忽略。如果已获取锁定,则返回True
,如果已超过超时时间,则返回False
。
-
release
()¶ 释放锁。这可以从任何进程或线程调用,而不仅仅是最初获取锁的进程或线程。
行为与
threading.Lock.release()
中的行为相同,只是当在未锁定的锁上调用时,会引发ValueError
。
-
- class
multiprocessing.
RLock
¶ 递归锁定对象:
threading.RLock
的紧密模拟。递归锁必须由获取它的进程或线程释放。一旦进程或线程已经获得递归锁,相同的进程或线程可以再次获取它而不阻塞;该进程或线程必须每次释放它一次它已被获取。注意,
RLock
实际上是一个工厂函数,它返回使用默认上下文初始化的multiprocessing.synchronize.RLock
的实例。RLock
支持context manager协议,因此可以在with
语句。-
acquire
(block=True, timeout=None)¶ 获取锁定,阻止或非阻止。
使用块参数设置为
True
调用时,阻塞直到锁处于未锁定状态(不由任何进程或线程拥有),除非锁已由当前进程或线程。当前进程或线程然后获取锁的所有权(如果它还没有所有权),并且锁中的递归级别增加1,导致返回值True
。请注意,与执行threading.RLock.acquire()
相比,此第一个参数的行为有几个不同,从参数本身的名称开始。使用块参数设置为
False
调用时,不要阻止。如果锁已经被另一进程或线程获取(并因此被拥有),则当前进程或线程不取得所有权,并且锁中的递归级别不改变,导致返回值False
如果锁处于未锁定状态,则当前进程或线程获取所有权,递归级别递增,从而返回值True
。timeout参数的使用和行为与
Lock.acquire()
中的相同。注意,超时的一些行为与threading.RLock.acquire()
中实现的行为不同。
-
release
()¶ 释放锁,递减递归级别。如果在递减之后递归级别为零,将锁重置为解锁(不由任何进程或线程拥有),并且如果任何其他进程或线程被阻塞等待锁被解锁,则只允许其中一个进程继续。如果递减之后递归级别仍然为非零,则锁保持锁定并由调用进程或线程拥有。
只有在调用进程或线程拥有锁时才调用此方法。如果此方法由除所有者之外的进程或线程调用或锁定处于未锁定(无主)状态,则会引发
AssertionError
。请注意,在这种情况下引发的异常类型与threading.RLock.release()
中实现的行为不同。
-
- class
multiprocessing.
Semaphore
([value])¶ 信号量对象:
threading.Semaphore
的紧密模拟。与其近似模拟的单独差异存在:其
acquire
方法的第一个参数被命名为块,这与Lock.acquire()
一致。
注意
在Mac OS X上,sem_timedwait
不受支持,因此使用超时调用acquire()
将使用睡眠循环来模拟该函数的行为。
注意
If the SIGINT signal generated by Ctrl-C
arrives while the main thread is blocked by a call to BoundedSemaphore.acquire()
, Lock.acquire()
, RLock.acquire()
, Semaphore.acquire()
, Condition.acquire()
or Condition.wait()
then the call will be immediately interrupted and KeyboardInterrupt
will be raised.
这与threading
的行为不同,在等待阻塞调用正在进行时,SIGINT将被忽略。
注意
此程序包的某些功能需要在主机操作系统上运行共享信号量实现。没有一个,multiprocessing.synchronize
模块将被禁用,并尝试导入它将导致ImportError
。有关其他信息,请参见问题3770。
17.2.2.7. Managers¶
管理器提供一种创建可在不同进程之间共享的数据的方法,包括在不同机器上运行的进程之间通过网络共享。管理员对象控制管理共享对象的服务器进程。其他进程可以通过使用代理访问共享对象。
返回开始的
SyncManager
对象,可用于在进程之间共享对象。返回的管理器对象对应于生成的子进程,并且具有将创建共享对象并返回相应代理的方法。
管理器进程将在垃圾收集或其父进程退出时立即关闭。管理器类在multiprocessing.managers
模块中定义:
- class
multiprocessing.managers.
BaseManager
([address[, authkey]])¶ 创建一个BaseManager对象。
一旦创建,应调用
start()
或get_server().serve_forever()
,以确保管理器对象引用启动的管理器进程。address是管理器进程侦听新连接的地址。如果address为
None
,则选择任意一个。authkey是将用于检查到服务器进程的传入连接的有效性的认证密钥。如果authkey是
None
,则使用current_process().authkey
。否则使用authkey,它必须是字节字符串。-
start
([initializer[, initargs]])¶ 启动子过程以启动管理器。如果初始化器不是
None
,则子过程在启动时将调用initializer(*initargs)
。
-
get_server
()¶ 返回
Server
对象,它表示在Manager控制下的实际服务器。Server
对象支持serve_forever()
方法:>>> from multiprocessing.managers import BaseManager >>> manager = BaseManager(address=('', 50000), authkey=b'abc') >>> server = manager.get_server() >>> server.serve_forever()
Server
还具有address
属性。
-
connect
()¶ 本地管理器对象连接到远程管理器进程:
>>> from multiprocessing.managers import BaseManager >>> m = BaseManager(address=('127.0.0.1', 5000), authkey=b'abc') >>> m.connect()
-
register
(typeid[, callable[, proxytype[, exposed[, method_to_typeid[, create_method]]]]])¶ 可以用于向管理器类注册类型或可调用的类方法。
typeid是用于标识特定类型的共享对象的“类型标识符”。这必须是字符串。
callable是用于为此类型标识符创建对象的可调用项。如果一个manager实例将使用
connect()
方法连接到服务器,或者create_method参数是False
作为None
。proxytype是
BaseProxy
的子类,用于使用此typeid创建共享对象的代理。如果None
,则会自动创建代理类。exposed用于指定应允许使用
BaseProxy._callmethod()
访问此类型代理的方法名称序列。(如果曝光是None
,则使用proxytype._exposed _代替,如果存在,则使用proxytype._exposed_
在未指定公开列表的情况下,将可访问共享对象的所有“公共方法”。(这里的“公共方法”是指具有__call__()
方法并且其名称不以'_'
开头的任何属性。method_to_typeid是用于指定应返回代理的那些公开方法的返回类型的映射。它将方法名映射到typeid字符串。(如果method_to_typeid是
None
,那么将使用proxytype._method_to_typeid_
,如果存在)。如果方法的名称不是此映射的键,或者映射None
,则方法返回的对象将按值复制。create_method确定是否应使用名称typeid创建方法,该方法可用于告知服务器进程创建新的共享对象并为其返回代理。默认情况下为
True
。
BaseManager
实例也有一个只读属性:-
address
¶ 管理器使用的地址。
在版本3.3中更改: Manager对象支持上下文管理协议 - 请参见Context Manager Types。
__enter__()
启动服务器进程(如果尚未启动),然后返回manager对象。__exit__()
调用shutdown()
。在以前的版本中,如果尚未启动管理器的服务器进程,则
__enter__()
未启动。-
- class
multiprocessing.managers.
SyncManager
¶ BaseManager
的子类,可用于进程的同步。此类型的对象由multiprocessing.Manager()
返回。它还支持创建共享列表和字典。
-
Barrier
(parties[, action[, timeout]])¶ 创建共享
threading.Barrier
对象,并为其返回代理。版本3.3中的新功能。
-
BoundedSemaphore
([value])¶ 创建共享
threading.BoundedSemaphore
对象,并为其返回代理。
-
Condition
([lock])¶ 创建共享
threading.Condition
对象并为其返回代理。如果提供lock,那么它应该是
threading.Lock
或threading.RLock
对象的代理。在版本3.3中已更改:添加了
wait_for()
方法。
-
Event
()¶ 创建共享
threading.Event
对象并为其返回代理。
-
Lock
()¶ 创建共享
threading.Lock
对象,并为其返回代理。
-
Queue
([maxsize])¶ 创建共享
queue.Queue
对象并返回其代理。
-
RLock
()¶ 创建共享
threading.RLock
对象,并为其返回代理。
-
Semaphore
([value])¶ 创建共享
threading.Semaphore
对象,并为其返回代理。
-
Array
(typecode, sequence)¶ 创建一个数组并返回一个代理。
-
Value
(typecode, value)¶ 创建具有可写入
value
属性的对象,并为其返回代理。
-
dict
()¶ dict
( 映射 )dict
( 序列 )创建共享
dict
对象,并为其返回代理。
-
list
()¶ 列表
( 序列 )创建共享
list
对象,并为其返回代理。
注意
对dict和列表代理中的可变值或项的修改不会通过管理器传播,因为代理无法知道其值或项目何时被修改。要修改此类项,您可以将修改的对象重新分配给容器代理:
# create a list proxy and append a mutable object (a dictionary) lproxy = manager.list() lproxy.append({}) # now mutate the dictionary d = lproxy[0] d['a'] = 1 d['b'] = 2 # at this point, the changes to d are not yet synced, but by # reassigning the dictionary, the proxy is notified of the change lproxy[0] = d
-
- class
multiprocessing.managers.
Namespace
¶ 可以使用
SyncManager
注册的类型。命名空间对象没有公共方法,但有可写属性。其表示显示其属性的值。
但是,当为名称空间对象使用代理时,以
'_'
开头的属性将是代理的属性,而不是引用对象的属性:>>> manager = multiprocessing.Manager() >>> Global = manager.Namespace() >>> Global.x = 10 >>> Global.y = 'hello' >>> Global._z = 12.3 # this is an attribute of the proxy >>> print(Global) Namespace(x=10, y='hello')
17.2.2.7.1. Customized managers¶
要创建自己的管理器,需要创建BaseManager
的子类,并使用register()
classmethod向管理器类注册新类型或可调用项。例如:
from multiprocessing.managers import BaseManager
class MathsClass:
def add(self, x, y):
return x + y
def mul(self, x, y):
return x * y
class MyManager(BaseManager):
pass
MyManager.register('Maths', MathsClass)
if __name__ == '__main__':
with MyManager() as manager:
maths = manager.Maths()
print(maths.add(4, 3)) # prints 7
print(maths.mul(7, 8)) # prints 56
17.2.2.7.2. Using a remote manager¶
可以在一台计算机上运行管理器服务器,并让客户端从其他计算机使用它(假设涉及的防火墙允许它)。
运行以下命令为远程客户端可以访问的单个共享队列创建服务器:
>>> from multiprocessing.managers import BaseManager
>>> import queue
>>> queue = queue.Queue()
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue', callable=lambda:queue)
>>> m = QueueManager(address=('', 50000), authkey=b'abracadabra')
>>> s = m.get_server()
>>> s.serve_forever()
一个客户端可以访问服务器,如下所示:
>>> from multiprocessing.managers import BaseManager
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue')
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey=b'abracadabra')
>>> m.connect()
>>> queue = m.get_queue()
>>> queue.put('hello')
另一个客户端也可以使用它:
>>> from multiprocessing.managers import BaseManager
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue')
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey=b'abracadabra')
>>> m.connect()
>>> queue = m.get_queue()
>>> queue.get()
'hello'
本地进程也可以访问该队列,使用上面的代码在客户端上远程访问它:
>>> from multiprocessing import Process, Queue
>>> from multiprocessing.managers import BaseManager
>>> class Worker(Process):
... def __init__(self, q):
... self.q = q
... super(Worker, self).__init__()
... def run(self):
... self.q.put('local hello')
...
>>> queue = Queue()
>>> w = Worker(queue)
>>> w.start()
>>> class QueueManager(BaseManager): pass
...
>>> QueueManager.register('get_queue', callable=lambda: queue)
>>> m = QueueManager(address=('', 50000), authkey=b'abracadabra')
>>> s = m.get_server()
>>> s.serve_forever()
17.2.2.8. Proxy Objects¶
代理是指向到一个共享对象的对象,该对象在不同的进程中存在(可能)。共享对象被称为代理的指示符。多个代理对象可以具有相同的指示。
代理对象具有调用其指示对象的相应方法的方法(尽管不是指示对象的每个方法都必须通过代理可用)。代理通常可以以其指示的大多数相同的方式使用:
>>> from multiprocessing import Manager
>>> manager = Manager()
>>> l = manager.list([i*i for i in range(10)])
>>> print(l)
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
>>> print(repr(l))
<ListProxy object, typeid 'list' at 0x...>
>>> l[4]
16
>>> l[2:5]
[4, 9, 16]
注意,将str()
应用于代理将返回指示对象的表示,而应用repr()
将返回代理的表示。
代理对象的一个重要特征是它们是可拾取的,因此它们可以在进程之间传递。但是,请注意,如果代理被发送到相应的管理器的进程,则取消它将产生指示符本身。这意味着,例如,一个共享对象可以包含第二个:
>>> a = manager.list()
>>> b = manager.list()
>>> a.append(b) # referent of a now contains referent of b
>>> print(a, b)
[[]] []
>>> b.append('hello')
>>> print(a, b)
[['hello']] ['hello']
注意
multiprocessing
中的代理类型不支持按值进行比较。所以,对于实例,我们有:
>>> manager.list([1,2,3]) == [1,2,3]
False
在进行比较时,应该只使用指示物的副本。
- class
multiprocessing.managers.
BaseProxy
¶ 代理对象是
BaseProxy
的子类的实例。-
_callmethod
(methodname[, args[, kwds]])¶ 调用并返回代理引用对象的方法的结果。
如果
proxy
是所指对象为obj
的代理,则表达式proxy._callmethod(methodname, args, kwds)
将计算表达式
getattr(obj, methodname)(*args, **kwds)
在经理的过程中。
返回的值将是调用结果的副本或新的共享对象的代理 - 请参阅
BaseManager.register()
的method_to_typeid参数的文档。如果调用引发异常,则通过
_callmethod()
重新生成异常。如果在管理器的进程中引发了一些其他异常,那么这将被转换为RemoteError
异常,并由_callmethod()
引发。特别要注意的是,如果methodname没有暴露,则会引发异常。
_callmethod()
的用法示例:>>> l = manager.list(range(10)) >>> l._callmethod('__len__') 10 >>> l._callmethod('__getitem__', (slice(2, 7),)) # equivalent to l[2:7] [2, 3, 4, 5, 6] >>> l._callmethod('__getitem__', (20,)) # equivalent to l[20] Traceback (most recent call last): ... IndexError: list index out of range
-
_getvalue
()¶ 返回指示对象的副本。
如果引用是不可拆分的,那么这将引发异常。
-
__repr__
()¶ 返回代理对象的表示。
-
__str__
()¶ 返回指示对象的表示。
-
17.2.2.9. 进程池¶
可以用Pool
类创建一个进程池来执行提交给它的任务。
- class
multiprocessing.pool.
Pool
([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])¶ 一个进程池对象,控制一个包含工作进程的进程池,作业可以提交给它。它支持具有超时和回调的异步结果,并具有并行映射实现。
processes是要使用的工作进程数。如果processes为
None
,则使用os.cpu_count()
返回的数字。如果initializer不是
None
,则每个工作进程在启动时将调用initializer(*initargs)
。maxtasksperchild是工作进程在退出并由新工作进程替换之前可以完成的任务数,以使未使用的资源可以释放。默认的maxtasksperchild是None,这意味着工作进程的生存时间将与进程池一样长。
context可用于指定用于启动工作进程的上下文。通常使用
multiprocessing.Pool()
或上下文对象的Pool()
方法创建进程池。在这两种情况下,都会正确地设置context。注意,进程池对象的方法只能由创建进程池的进程调用。
版本3.2中的新功能: maxtasksperchild
版本3.4中的新功能: context
注
Pool
内的工作进程通常在进程池的工作队列的整个持续时间内生效。在其他系统(例如Apache、mod_wsgi等)中,释放工作进程持有的资源的常见模式是允许进程池内的工作进程只完成一定数量的作业,然后再退出、清理并生成一个新的进程来替换旧的进程。Pool
的maxtasksperchild参数就是向用户提供这个功能。-
apply
(func[, args[, kwds]])¶ 使用参数args和关键字参数kwds调用func。它阻塞直到结果完成。相比这个代码块,
apply_async()
更适合并行执行工作。此外,func仅在进程池的其中一个工作进程中执行。
-
apply_async
(func[, args[, kwds[, callback[, error_callback]]]])¶ apply()
方法的一个变体,它返回一个结果对象。如果指定callback,那么它应该是一个可接受单个参数的可调用对象。当结果完成时就对它应用callback,在调用失败的情况下则应用error_callback。
如果指定error_callback,那么它应该是一个接受单个参数的可调用对象。如果目标函数失败,则以该异常实例调用error_callback。
回调应该立即完成,否则处理结果的线程将被阻塞。
-
map
(func, iterable[, chunksize])¶ 与
map()
内置函数(虽然它只支持一个iterable参数)等价的并行方式。它阻塞直到结果准备好。该方法将iterable分成多个块,然后将这些块作为单独的任务提交到进程池。这些块的(近似)大小可以通过将chunksize设置为正整数来指定。
-
map_async
(func, iterable[, chunksize[, callback[, error_callback]]])¶ 返回结果对象的
map()
方法的变体。如果指定callback,那么它应该是一个可接受单个参数的可调用对象。当结果完成时就对它应用callback,在调用失败的情况下则应用error_callback。
如果指定error_callback,那么它应该是一个接受单个参数的可调用对象。如果目标函数失败,则以该异常实例调用error_callback。
回调应该立即完成,否则处理结果的线程将被阻塞。
-
imap
(func, iterable[, chunksize])¶ map()
的惰性版本。chunksize参数与
map()
方法使用的参数相同。对于非常长的iterable,chunksize使用一个较大值的可以使得作业比使用默认值1
更快地完成。此外,如果chunksize是
1
,那么imap()
方法返回的迭代器的next()
方法有一个可选的参数timeout:如果在timeout秒内无法返回结果,那么next(timeout)
将引发multiprocessing.TimeoutError
。
-
imap_unordered
(func, iterable[, chunksize])¶ 与
imap()
相同,除了返回的迭代器的结果的顺序应该被认为是任意的。(只有当只有一个工作进程时,其顺序才保证是“正确的”。)
-
starmap
(func, iterable[, chunksize])¶ 类似
map()
,除了iterable的元素应该是可迭代对象,它们将分拆为参数。因此,
[(1,2), (3, 4)]
这个iterable将导致[func(1,2), func(3,4)]
。版本3.3中的新功能。
-
starmap_async
(func, iterable[, chunksize[, callback[, error_back]]])¶ starmap()
和map_async()
的组合,在可迭代对象组成的iterable上迭代并以分拆后的可迭代对象调用func。返回一个结果对象。版本3.3中的新功能。
-
close
()¶ 阻止任何更多的任务提交到进程池。一旦所有任务完成,工作进程将退出。
-
terminate
()¶ 立即停止工作进程,而不需要完成未完成的工作。当进程池对象被垃圾收集,
terminate()
将被立即调用。
-
join
()¶ 等待工作进程退出。在使用
join()
之前,必须调用close()
或terminate()
。
版本3.3中的新功能:进程池对象现在支持上下文管理协议 —— 请参阅上下文管理器类型。
__enter__()
返回进程池对象,__exit__()
调用terminate()
。-
- class
multiprocessing.pool.
AsyncResult
¶ The class of the result returned by
Pool.apply_async()
andPool.map_async()
.-
get
([timeout])¶ 返回结果到达时。如果超时不是
None
且结果在超时秒内未到达,则会引发multiprocessing.TimeoutError
。如果远程调用引发异常,那么该异常将由get()
重写。
-
wait
([timeout])¶ 等待结果可用或直到超时秒。
-
ready
()¶ 返回呼叫是否已完成。
-
successful
()¶ 返回调用是否完成而不引发异常。如果结果未准备就会引发
AssertionError
。
-
以下示例演示了使用进程池:
from multiprocessing import Pool
import time
def f(x):
return x*x
if __name__ == '__main__':
with Pool(processes=4) as pool: # start 4 worker processes
result = pool.apply_async(f, (10,)) # evaluate "f(10)" asynchronously in a single process
print(result.get(timeout=1)) # prints "100" unless your computer is *very* slow
print(pool.map(f, range(10))) # prints "[0, 1, 4,..., 81]"
it = pool.imap(f, range(10))
print(next(it)) # prints "0"
print(next(it)) # prints "1"
print(it.next(timeout=1)) # prints "4" unless your computer is *very* slow
result = pool.apply_async(time.sleep, (10,))
print(result.get(timeout=1)) # raises multiprocessing.TimeoutError
17.2.2.10. Listeners and Clients¶
通常,使用队列或使用Pipe()
返回的Connection
对象来完成进程之间的消息传递。
但是,multiprocessing.connection
模块允许一些额外的灵活性。它基本上提供了一个高级的面向消息的API来处理套接字或Windows命名管道。它还支持摘要认证使用hmac
模块,并且同时轮询多个连接。
-
multiprocessing.connection.
deliver_challenge
(connection, authkey)¶ 将随机生成的消息发送到连接的另一端,并等待回复。
如果回复与使用authkey作为键的消息摘要相匹配,则会向连接的另一端发送欢迎消息。否则会出现
AuthenticationError
。
-
multiprocessing.connection.
answer_challenge
(connection, authkey)¶ 接收消息,使用authkey作为键计算消息的摘要,然后发送摘要。
如果未收到欢迎消息,则会引发
AuthenticationError
。
-
multiprocessing.connection.
Client
(address[, family[, authenticate[, authkey]]])¶ 尝试设置与使用地址地址的侦听器的连接,返回
Connection
。连接类型由family参数确定,但通常可以省略,因为它通常可以从地址的格式推断。(请参阅Address Formats)
如果authenticate是
True
或authkey是字节字符串,那么使用摘要认证。用于认证的密钥将是authkey或current_process().authkey
if authkey是None
。如果身份验证失败,则会出现AuthenticationError
。请参阅Authentication keys。
- class
multiprocessing.connection.
Listener
([address[, family[, backlog[, authenticate[, authkey]]]]])¶ 用于绑定套接字或Windows命名管道的“包”用于侦听连接。
address是侦听器对象的绑定套接字或命名管道使用的地址。
注意
如果使用地址“0.0.0.0”,则该地址将不是Windows上的可连接终点。如果需要可连接的端点,则应使用“127.0.0.1”。
family是要使用的套接字(或命名管道)的类型。这可以是字符串
'AF_INET'
(对于TCP套接字),'AF_UNIX'
(对于Unix域套接字)或'AF_PIPE'
其中只有第一个保证可用。如果family是None
,那么系列将根据地址的格式推断。如果地址也是None
,则选择默认值。此默认值是假定为最快可用的系列。请参阅Address Formats。请注意,如果family是'AF_UNIX'
且地址是None
,那么套接字将创建在使用tempfile.mkstemp()
。如果侦听器对象使用套接字,则一旦套接字已被绑定,backlog(默认为1)将传递给套接字的
listen()
If authenticate is
True
(False
by default) or authkey is notNone
then digest authentication is used.如果authkey是字节字符串,那么它将被用作认证密钥;否则必须为无。
如果authkey是
None
和验证是True
,则current_process().authkey
用作认证密钥。如果authkey是None
和验证是False
,则不进行验证。如果身份验证失败,则会出现AuthenticationError
。请参阅Authentication keys。-
accept
()¶ 在侦听器对象的绑定套接字或命名管道上接受连接,并返回
Connection
对象。如果尝试认证并失败,则会引发AuthenticationError
。
-
close
()¶ 关闭侦听器对象的绑定套接字或命名管道。当侦听器被垃圾回收时,这被自动调用。但是建议明确调用它。
侦听器对象具有以下只读属性:
-
address
¶ Listener对象正在使用的地址。
-
last_accepted
¶ 最后接受的连接来自的地址。如果这不可用,则
None
。
版本3.3中的新功能:监听器对象现在支持上下文管理协议 - 请参阅Context Manager Types。
__enter__()
返回侦听器对象,__exit__()
调用close()
。-
-
multiprocessing.connection.
wait
(object_list, timeout=None)¶ 等待object_list中的对象准备就绪。返回object_list中已准备好的对象的列表。如果timeout是一个浮点数,那么调用会阻塞最多几秒钟。如果超时是
None
,那么它将阻塞无限期。负超时相当于零超时。对于Unix和Windows,如果对象可以出现在object_list中
- 可读的
Connection
对象; - 连接且可读的
socket.socket
对象;要么 Process
对象的sentinel
属性。
当有数据可以从中读取或另一端已关闭时,连接或套接字对象就绪。
Unix:
wait(object_list, timeout)
almost equivalentselect.select(object_list, [], [], timeout)
. The difference is that, ifselect.select()
is interrupted by a signal, it can raiseOSError
with an error number ofEINTR
, whereaswait()
will not.Windows: An item in object_list must either be an integer handle which is waitable (according to the definition used by the documentation of the Win32 function
WaitForMultipleObjects()
) or it can be an object with afileno()
method which returns a socket handle or pipe handle. (请注意,管道句柄和套接字句柄是而不是可等待句柄。)版本3.3中的新功能。
- 可读的
示例
以下服务器代码创建一个侦听器,它使用'secret password'
作为验证密钥。然后它等待连接并向客户端发送一些数据:
from multiprocessing.connection import Listener
from array import array
address = ('localhost', 6000) # family is deduced to be 'AF_INET'
with Listener(address, authkey=b'secret password') as listener:
with listener.accept() as conn:
print('connection accepted from', listener.last_accepted)
conn.send([2.25, None, 'junk', float])
conn.send_bytes(b'hello')
conn.send_bytes(array('i', [42, 1729]))
以下代码连接到服务器并从服务器接收一些数据:
from multiprocessing.connection import Client
from array import array
address = ('localhost', 6000)
with Client(address, authkey=b'secret password') as conn:
print(conn.recv()) # => [2.25, None, 'junk', float]
print(conn.recv_bytes()) # => 'hello'
arr = array('i', [0, 0, 0, 0, 0])
print(conn.recv_bytes_into(arr)) # => 8
print(arr) # => array('i', [42, 1729, 0, 0, 0])
以下代码使用wait()
等待来自多个进程的消息:
import time, random
from multiprocessing import Process, Pipe, current_process
from multiprocessing.connection import wait
def foo(w):
for i in range(10):
w.send((i, current_process().name))
w.close()
if __name__ == '__main__':
readers = []
for i in range(4):
r, w = Pipe(duplex=False)
readers.append(r)
p = Process(target=foo, args=(w,))
p.start()
# We close the writable end of the pipe now to be sure that
# p is the only process which owns a handle for it. This
# ensures that when p closes its handle for the writable end,
# wait() will promptly report the readable end as being ready.
w.close()
while readers:
for r in wait(readers):
try:
msg = r.recv()
except EOFError:
readers.remove(r)
else:
print(msg)
17.2.2.11. Authentication keys¶
当使用Connection.recv
时,接收的数据将自动取消。不幸的是,从不受信任的源解压数据是一种安全风险。因此,Listener
和Client()
使用hmac
模块提供摘要身份验证。
认证密钥是可以被认为是密码的字节字符串:一旦建立连接,两端将要求证明另一方知道认证密钥。(演示两端都使用相同的键,不涉及通过连接发送密钥)。
如果请求认证但未指定认证密钥,则使用current_process().authkey
的返回值(参见Process
)。此值将由当前进程创建的任何Process
对象自动继承。这意味着(默认地)多进程程序的所有进程将共享单个认证密钥,当在它们之间建立连接时可以使用该认证密钥。
也可以使用os.urandom()
生成合适的验证密钥。
17.2.2.12. Logging¶
一些支持日志记录可用。但请注意,logging
包不使用进程共享锁,因此可能(取决于处理程序类型)来自不同进程的消息混淆。
-
multiprocessing.
get_logger
()¶ 返回
multiprocessing
使用的记录器。如果需要,将创建一个新的。当第一次创建记录器具有
logging.NOTSET
级别且没有默认处理程序。发送到此记录器的消息将不会默认传播到根记录器。请注意,在Windows子进程将只继承父进程的日志记录器的级别 - 记录器的任何其他定制将不会继承。
-
multiprocessing.
log_to_stderr
()¶ 此函数执行对
get_logger()
的调用,但除了返回由get_logger创建的记录器之外,还添加了一个处理程序,该处理程序使用格式将输出发送到
。sys.stderr
'[%(levelname)s /%(processName)s] %(message)s'
以下是启用日志记录的示例会话:
>>> import multiprocessing, logging
>>> logger = multiprocessing.log_to_stderr()
>>> logger.setLevel(logging.INFO)
>>> logger.warning('doomed')
[WARNING/MainProcess] doomed
>>> m = multiprocessing.Manager()
[INFO/SyncManager-...] child process calling self.run()
[INFO/SyncManager-...] created temp directory /.../pymp-...
[INFO/SyncManager-...] manager serving at '/.../listener-...'
>>> del m
[INFO/MainProcess] sending shutdown message to manager
[INFO/SyncManager-...] manager exiting with exitcode 0
有关日志记录级别的完整表,请参阅logging
模块。
17.2.2.13. The multiprocessing.dummy
module¶
multiprocessing.dummy
复制multiprocessing
的API,但只不过是threading
模块的包装器。
17.2.3. Programming guidelines¶
使用multiprocessing
时,应遵循一定的准则和惯用语。
17.2.3.1. All start methods¶
以下适用于所有启动方法。
避免共享状态
尽可能避免在进程之间移动大量数据。
最好坚持使用队列或管道进行进程之间的通信,而不是使用较低的级别同步原语。
可取性
确保代理方法的参数是可选的。
线程安全代理
Do not use a proxy object from more than one thread unless you protect it with a lock.
(There is never a problem with different processes using the same proxy.)
加入僵尸进程
On Unix when a process finishes but has not been joined it becomes a zombie. There should never be very many because each time a new process starts (oractive_children()
is called) all completed processes which have not yet been joined will be joined. Also calling a finished process’sProcess.is_alive
will join the process. Even so it is probably good practice to explicitly join all the processes that you start.
更好地继承比pickle / unpickle
When using the spawn or forkserver start methods many types frommultiprocessing
need to be picklable so that child processes can use them. However, one should generally avoid sending shared objects to other processes using pipes or queues. Instead you should arrange the program so that a process which needs access to a shared resource created elsewhere can inherit it from an ancestor process.
避免终止进程
Using the
Process.terminate
method to stop a process is liable to cause any shared resources (such as locks, semaphores, pipes and queues) currently being used by the process to become broken or unavailable to other processes.Therefore it is probably best to only consider using
Process.terminate
on processes which never use any shared resources.
加入使用队列的进程
Bear in mind that a process that has put items in a queue will wait before terminating until all the buffered items are fed by the “feeder” thread to the underlying pipe. (The child process can call the
Queue.cancel_join_thread
method of the queue to avoid this behaviour.)This means that whenever you use a queue you need to make sure that all items which have been put on the queue will eventually be removed before the process is joined. Otherwise you cannot be sure that processes which have put items on the queue will terminate. Remember also that non-daemonic processes will be joined automatically.
An example which will deadlock is the following:
from multiprocessing import Process, Queue def f(q): q.put('X' * 1000000) if __name__ == '__main__': queue = Queue() p = Process(target=f, args=(queue,)) p.start() p.join() # this deadlocks obj = queue.get()A fix here would be to swap the last two lines (or simply remove the
p.join()
line).
将资源显式传递给子进程
On Unix using the fork start method, a child process can make use of a shared resource created in a parent process using a global resource. However, it is better to pass the object as an argument to the constructor for the child process.
Apart from making the code (potentially) compatible with Windows and the other start methods this also ensures that as long as the child process is still alive the object will not be garbage collected in the parent process. This might be important if some resource is freed when the object is garbage collected in the parent process.
So for instance
from multiprocessing import Process, Lock def f(): ... do something using "lock" ... if __name__ == '__main__': lock = Lock() for i in range(10): Process(target=f).start()should be rewritten as
from multiprocessing import Process, Lock def f(l): ... do something using "l" ... if __name__ == '__main__': lock = Lock() for i in range(10): Process(target=f, args=(lock,)).start()
小心用“文件类对象”替换sys.stdin
multiprocessing
originally unconditionally called:os.close(sys.stdin.fileno())in the
multiprocessing.Process._bootstrap()
method — this resulted in issues with processes-in-processes. This has been changed to:sys.stdin.close() sys.stdin = open(os.open(os.devnull, os.O_RDONLY), closefd=False)Which solves the fundamental issue of processes colliding with each other resulting in a bad file descriptor error, but introduces a potential danger to applications which replace
sys.stdin()
with a “file-like object” with output buffering. This danger is that if multiple processes callclose()
on this file-like object, it could result in the same data being flushed to the object multiple times, resulting in corruption.If you write a file-like object and implement your own caching, you can make it fork-safe by storing the pid whenever you append to the cache, and discarding the cache when the pid changes. For example:
@property def cache(self): pid = os.getpid() if pid != self._pid: self._pid = pid self._cache = [] return self._cacheFor more information, see issue 5155, issue 5313 and issue 5331
17.2.3.2. The spawn and forkserver start methods¶
有一些额外的限制不适用于fork start方法。
更多picklability
Ensure that all arguments toProcess.__init__()
are picklable. Also, if you subclassProcess
then make sure that instances will be picklable when theProcess.start
method is called.
全局变量
Bear in mind that if code run in a child process tries to access a global variable, then the value it sees (if any) may not be the same as the value in the parent process at the time that
Process.start
was called.However, global variables which are just module level constants cause no problems.
安全导入主模块
Make sure that the main module can be safely imported by a new Python interpreter without causing unintended side effects (such a starting a new process).
For example, using the spawn or forkserver start method running the following module would fail with a
RuntimeError
:from multiprocessing import Process def foo(): print('hello') p = Process(target=foo) p.start()Instead one should protect the “entry point” of the program by using
if __name__ == '__main__':
as follows:from multiprocessing import Process, freeze_support, set_start_method def foo(): print('hello') if __name__ == '__main__': freeze_support() set_start_method('spawn') p = Process(target=foo) p.start()(The
freeze_support()
line can be omitted if the program will be run normally instead of frozen.)This allows the newly spawned Python interpreter to safely import the module and then run the module’s
foo()
function.Similar restrictions apply if a pool or manager is created in the main module.
17.2.4. Examples¶
演示如何创建和使用自定义管理器和代理:
from multiprocessing import freeze_support
from multiprocessing.managers import BaseManager, BaseProxy
import operator
##
class Foo:
def f(self):
print('you called Foo.f()')
def g(self):
print('you called Foo.g()')
def _h(self):
print('you called Foo._h()')
# A simple generator function
def baz():
for i in range(10):
yield i*i
# Proxy type for generator objects
class GeneratorProxy(BaseProxy):
_exposed_ = ['__next__']
def __iter__(self):
return self
def __next__(self):
return self._callmethod('__next__')
# Function to return the operator module
def get_operator_module():
return operator
##
class MyManager(BaseManager):
pass
# register the Foo class; make `f()` and `g()` accessible via proxy
MyManager.register('Foo1', Foo)
# register the Foo class; make `g()` and `_h()` accessible via proxy
MyManager.register('Foo2', Foo, exposed=('g', '_h'))
# register the generator function baz; use `GeneratorProxy` to make proxies
MyManager.register('baz', baz, proxytype=GeneratorProxy)
# register get_operator_module(); make public functions accessible via proxy
MyManager.register('operator', get_operator_module)
##
def test():
manager = MyManager()
manager.start()
print('-' * 20)
f1 = manager.Foo1()
f1.f()
f1.g()
assert not hasattr(f1, '_h')
assert sorted(f1._exposed_) == sorted(['f', 'g'])
print('-' * 20)
f2 = manager.Foo2()
f2.g()
f2._h()
assert not hasattr(f2, 'f')
assert sorted(f2._exposed_) == sorted(['g', '_h'])
print('-' * 20)
it = manager.baz()
for i in it:
print('<%d>' % i, end=' ')
print()
print('-' * 20)
op = manager.operator()
print('op.add(23, 45) =', op.add(23, 45))
print('op.pow(2, 94) =', op.pow(2, 94))
print('op._exposed_ =', op._exposed_)
##
if __name__ == '__main__':
freeze_support()
test()
使用Pool
:
import multiprocessing
import time
import random
import sys
#
# Functions used by test code
#
def calculate(func, args):
result = func(*args)
return '%s says that %s%s = %s' % (
multiprocessing.current_process().name,
func.__name__, args, result
)
def calculatestar(args):
return calculate(*args)
def mul(a, b):
time.sleep(0.5 * random.random())
return a * b
def plus(a, b):
time.sleep(0.5 * random.random())
return a + b
def f(x):
return 1.0 / (x - 5.0)
def pow3(x):
return x ** 3
def noop(x):
pass
#
# Test code
#
def test():
PROCESSES = 4
print('Creating pool with %d processes\n' % PROCESSES)
with multiprocessing.Pool(PROCESSES) as pool:
#
# Tests
#
TASKS = [(mul, (i, 7)) for i in range(10)] + \
[(plus, (i, 8)) for i in range(10)]
results = [pool.apply_async(calculate, t) for t in TASKS]
imap_it = pool.imap(calculatestar, TASKS)
imap_unordered_it = pool.imap_unordered(calculatestar, TASKS)
print('Ordered results using pool.apply_async():')
for r in results:
print('\t', r.get())
print()
print('Ordered results using pool.imap():')
for x in imap_it:
print('\t', x)
print()
print('Unordered results using pool.imap_unordered():')
for x in imap_unordered_it:
print('\t', x)
print()
print('Ordered results using pool.map() --- will block till complete:')
for x in pool.map(calculatestar, TASKS):
print('\t', x)
print()
#
# Test error handling
#
print('Testing error handling:')
try:
print(pool.apply(f, (5,)))
except ZeroDivisionError:
print('\tGot ZeroDivisionError as expected from pool.apply()')
else:
raise AssertionError('expected ZeroDivisionError')
try:
print(pool.map(f, list(range(10))))
except ZeroDivisionError:
print('\tGot ZeroDivisionError as expected from pool.map()')
else:
raise AssertionError('expected ZeroDivisionError')
try:
print(list(pool.imap(f, list(range(10)))))
except ZeroDivisionError:
print('\tGot ZeroDivisionError as expected from list(pool.imap())')
else:
raise AssertionError('expected ZeroDivisionError')
it = pool.imap(f, list(range(10)))
for i in range(10):
try:
x = next(it)
except ZeroDivisionError:
if i == 5:
pass
except StopIteration:
break
else:
if i == 5:
raise AssertionError('expected ZeroDivisionError')
assert i == 9
print('\tGot ZeroDivisionError as expected from IMapIterator.next()')
print()
#
# Testing timeouts
#
print('Testing ApplyResult.get() with timeout:', end=' ')
res = pool.apply_async(calculate, TASKS[0])
while 1:
sys.stdout.flush()
try:
sys.stdout.write('\n\t%s' % res.get(0.02))
break
except multiprocessing.TimeoutError:
sys.stdout.write('.')
print()
print()
print('Testing IMapIterator.next() with timeout:', end=' ')
it = pool.imap(calculatestar, TASKS)
while 1:
sys.stdout.flush()
try:
sys.stdout.write('\n\t%s' % it.next(0.02))
except StopIteration:
break
except multiprocessing.TimeoutError:
sys.stdout.write('.')
print()
print()
if __name__ == '__main__':
multiprocessing.freeze_support()
test()
显示如何使用队列将任务提供给工作进程的容器并收集结果的示例:
import time
import random
from multiprocessing import Process, Queue, current_process, freeze_support
#
# Function run by worker processes
#
def worker(input, output):
for func, args in iter(input.get, 'STOP'):
result = calculate(func, args)
output.put(result)
#
# Function used to calculate result
#
def calculate(func, args):
result = func(*args)
return '%s says that %s%s = %s' % \
(current_process().name, func.__name__, args, result)
#
# Functions referenced by tasks
#
def mul(a, b):
time.sleep(0.5*random.random())
return a * b
def plus(a, b):
time.sleep(0.5*random.random())
return a + b
#
#
#
def test():
NUMBER_OF_PROCESSES = 4
TASKS1 = [(mul, (i, 7)) for i in range(20)]
TASKS2 = [(plus, (i, 8)) for i in range(10)]
# Create queues
task_queue = Queue()
done_queue = Queue()
# Submit tasks
for task in TASKS1:
task_queue.put(task)
# Start worker processes
for i in range(NUMBER_OF_PROCESSES):
Process(target=worker, args=(task_queue, done_queue)).start()
# Get and print results
print('Unordered results:')
for i in range(len(TASKS1)):
print('\t', done_queue.get())
# Add more tasks using `put()`
for task in TASKS2:
task_queue.put(task)
# Get and print some more results
for i in range(len(TASKS2)):
print('\t', done_queue.get())
# Tell child processes to stop
for i in range(NUMBER_OF_PROCESSES):
task_queue.put('STOP')
if __name__ == '__main__':
freeze_support()
test()