Using a a manager for updating a Queue in a Python multiprocess(在 Python 多进程中使用管理器更新队列)
问题描述
我正在设计一个 Python 多处理代码以在一个队列中工作,该队列可能会在处理过程中更新.下面的代码有时会起作用,或者卡住,或者出现 Empty 错误.
I am designing a Python multiprocessing code to work in a queue that might be updated along the processing. The following code sometimes works, or get stuck, or rises an Empty error.
import multiprocessing as mp
def worker(working_queue, output_queue):
while True:
if working_queue.empty() is True:
break
else:
picked = working_queue.get_nowait()
if picked % 2 == 0:
output_queue.put(picked)
else:
working_queue.put(picked+1)
return
if __name__ == '__main__':
manager = mp.Manager()
static_input = xrange(100)
working_q = manager.Queue()
output_q = mp.Queue()
for i in static_input:
working_q.put(i)
processes = [mp.Process(target=worker,args=(working_q, output_q)) for i in range(mp.cpu_count())]
for proc in processes:
proc.start()
for proc in processes:
proc.join()
results_bank = []
while True:
if output_q.empty() is True:
break
results_bank.append(output_q.get_nowait())
print len(results_bank) # length of this list should be equal to static_input, which is the range used to populate the input queue. In other words, this tells whether all the items placed for processing were actually processed.
results_bank.sort()
print results_bank
我应该使用列表作为全局变量并锁定它,而不是 manager.Queue()?
Should I use a list as a global variable, and lock it, instead of a manager.Queue()?
推荐答案
我刚刚添加了一个 try:
和 except Exception:
来处理 Empty 错误.现在的结果似乎是一致的.如果您发现我在此解决方案中忽略的问题,请告诉我.
I just added a try:
and except Exception:
to handle the Empty error. The results seem to be consistent now. Please let me know If you find problems I overlooked in this solution.
import multiprocessing as mp
def worker(working_queue, output_queue):
while True:
try:
if working_queue.empty() is True:
break
else:
picked = working_queue.get_nowait()
if picked % 2 == 0:
output_queue.put(picked)
else:
working_queue.put(picked+1)
except Exception:
continue
return
if __name__ == '__main__':
#Manager seem to be unnecessary.
#manager = mp.Manager()
#working_q = manager.Queue()
working_q = mp.Queue()
output_q = mp.Queue()
static_input = xrange(100)
for i in static_input:
working_q.put(i)
processes = [mp.Process(target=worker,args=(working_q, output_q)) for i in range(mp.cpu_count())]
for proc in processes:
proc.start()
for proc in processes:
proc.join()
results_bank = []
while True:
if output_q.empty() is True:
break
results_bank.append(output_q.get_nowait())
print len(results_bank) # length of this list should be equal to static_input, which is the range used to populate the input queue. In other words, this tells whether all the items placed for processing were actually processed.
results_bank.sort()
print results_bank
这篇关于在 Python 多进程中使用管理器更新队列的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持编程学习网!
本文标题为:在 Python 多进程中使用管理器更新队列
基础教程推荐
- 线程时出现 msgbox 错误,GUI 块 2022-01-01
- 何时使用 os.name、sys.platform 或 platform.system? 2022-01-01
- 筛选NumPy数组 2022-01-01
- Python kivy 入口点 inflateRest2 无法定位 libpng16-16.dll 2022-01-01
- Dask.array.套用_沿_轴:由于额外的元素([1]),使用dask.array的每一行作为另一个函数的输入失败 2022-01-01
- 如何让 python 脚本监听来自另一个脚本的输入 2022-01-01
- 如何在海运重新绘制中自定义标题和y标签 2022-01-01
- 使用PyInstaller后在Windows中打开可执行文件时出错 2022-01-01
- 用于分类数据的跳跃记号标签 2022-01-01
- 在 Python 中,如果我在一个“with"中返回.块,文件还会关闭吗? 2022-01-01