python multiprocessing .join() deadlock depends on worker function
Question
I am using the Python multiprocessing library to spawn 4 Process() objects to parallelize a CPU-intensive task. The task (inspiration and code from this great article) is to compute the prime factors of every integer in a list.
main.py:
import random
import multiprocessing
import sys

num_inputs = 4000
num_procs = 4
proc_inputs = num_inputs/num_procs
input_list = [int(1000*random.random()) for i in xrange(num_inputs)]

output_queue = multiprocessing.Queue()
procs = []

for p_i in xrange(num_procs):
    print "Process [%d]"%p_i
    proc_list = input_list[proc_inputs * p_i:proc_inputs * (p_i + 1)]
    print " - num inputs: [%d]"%len(proc_list)

    # Using target=worker1 HANGS on join
    p = multiprocessing.Process(target=worker1, args=(p_i, proc_list, output_queue))
    # Using target=worker2 RETURNS with success
    #p = multiprocessing.Process(target=worker2, args=(p_i, proc_list, output_queue))

    procs.append(p)
    p.start()

for p in procs:
    print "joining ", p, output_queue.qsize(), output_queue.full()
    p.join()
    print "joined ", p, output_queue.qsize(), output_queue.full()

print "Processing complete."
ret_vals = []
while output_queue.empty() == False:
    ret_vals.append(output_queue.get())
print len(ret_vals)
print sys.getsizeof(ret_vals)
Observations:
- If the target for each process is the function worker1, for an input list larger than 4000 elements the main thread gets stuck on .join(), waiting for the spawned processes to terminate, and never returns.
- If the target for each process is the function worker2, for the same input list the code works just fine and the main thread returns.
This is very confusing to me, as the only difference between worker1 and worker2 (see below) is that the former inserts individual lists in the Queue whereas the latter inserts a single list of lists for each process.
Why is there a deadlock when using worker1 as the target but not when using worker2?
Shouldn't both (or neither) go beyond the multiprocessing Queue maxsize limit of 32767?
worker1 vs worker2:
def worker1(proc_num, proc_list, output_queue):
    '''worker function which deadlocks'''
    for num in proc_list:
        output_queue.put(factorize_naive(num))

def worker2(proc_num, proc_list, output_queue):
    '''worker function that works'''
    workers_stuff = []
    for num in proc_list:
        workers_stuff.append(factorize_naive(num))
    output_queue.put(workers_stuff)
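Both workers call factorize_naive(), which the question takes from the linked article but does not reproduce. For readers who want to run main.py as-is, a minimal stand-in (my own trial-division sketch, not necessarily the article's exact code) is:

def factorize_naive(n):
    '''Naive trial-division factorization: return the list of prime factors of n.
    A stand-in for the article's helper, written from scratch for this sketch.'''
    factors = []
    d = 2
    while d * d <= n:
        while n % d == 0:
            factors.append(d)
            n //= d
        d += 1
    if n > 1:
        factors.append(n)
    return factors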
There are a lot of similar questions on SO, but I believe the core of this question is clearly distinct from all of them.
Related links:
- https://sopython.com/canon/82/programs-using-multiprocessing-hang-deadlock-and-never-complete/
- Python multiprocessing issue
- python multiprocessing - process hangs on join for large queue
- Process.join() and queue don't work with large numbers
- Python 3 Multiprocessing queue deadlock when calling join before the queue is empty
- Script using multiprocessing module does not terminate
- Why does multiprocessing.Process.join() hang?
- When to call .join() on a process?
- What exactly is the Python multiprocessing module's .join() method doing?
Recommended answer
The docs warn about this:
Warning: As mentioned above, if a child process has put items on a queue (and it has not used JoinableQueue.cancel_join_thread), then that process will not terminate until all buffered items have been flushed to the pipe.
This means that if you try joining that process you may get a deadlock unless you are sure that all items which have been put on the queue have been consumed. Similarly, if the child process is non-daemonic then the parent process may hang on exit when it tries to join all its non-daemonic children.
While a Queue appears to be unbounded, under the covers queued items are buffered in memory to avoid overloading inter-process pipes. A process cannot end normally before those memory buffers are flushed. Your worker1() puts a lot more items on the queue than your worker2() does, and that's all there is to it. Note that the number of items that can be queued before the implementation resorts to buffering in memory isn't defined: it can vary across OS and Python release.
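To make the buffering concrete, here is a minimal stand-alone sketch (the name flood and the item count are my own, not from the question): a single child puts far more data on a Queue than the underlying pipe can hold, so it cannot exit until the parent drains the queue, and joining before draining hangs.

import multiprocessing

def flood(q):
    # Far more data than the pipe buffer holds, so the child's feeder thread
    # blocks until the parent starts reading.
    for i in range(100000):
        q.put("item %d" % i)

if __name__ == "__main__":
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=flood, args=(q,))
    p.start()

    # Calling p.join() here would deadlock: the child cannot terminate while
    # its buffered items are still waiting to be flushed to the pipe.

    drained = [q.get() for _ in range(100000)]   # drain first ...
    p.join()                                     # ... then join returns promptly
    print("drained %d items, child joined" % len(drained))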
As the docs suggest, the normal way to avoid this is to .get() all the items off the queue before you attempt to .join() the processes. As you've discovered, whether it's necessary to do so depends in an undefined way on how many items have been put on the queue by each worker process.
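Applied to the main.py above, a sketch of that fix (my own rearrangement, keeping the question's Python 2 style) replaces the join loop and the trailing drain loop: pull the expected number of results off the queue first, then join.

# worker1 puts exactly one result per input, so num_inputs items in total.
ret_vals = []
for i in xrange(num_inputs):
    ret_vals.append(output_queue.get())   # drain everything first ...

for p in procs:
    p.join()                              # ... so every worker can flush and exit

print "Processing complete."
print len(ret_vals)
print sys.getsizeof(ret_vals)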