Python multiprocessing - tracking the process of pool.map operation(Python多处理——跟踪pool.map操作的过程)
问题描述
我有一个函数可以执行一些模拟和返回一个字符串格式的数组.
我想运行模拟(函数)不同的输入参数值,超过 10000 个可能的输入值,并将结果写入单个文件.
我正在使用多处理,特别是 pool.map 函数并行运行模拟.
自整个过程运行模拟功能超过10000次需要很长的时间,我真的很想跟踪整个操作的过程.
我认为下面我当前代码中的问题是,pool.map 运行该函数 10000 次,在这些操作期间没有任何进程跟踪.一旦并行处理完成运行 10000 个模拟(可能是几小时到几天),然后我会继续跟踪 10000 个模拟结果何时保存到文件中.所以这并不是真正跟踪 pool.map 操作的处理.
我的代码是否有一个简单的修复方法可以允许进程跟踪?
def simFunction(输入):# 进行一些模拟并输出 simResult返回 str(simResult)# 并行处理输入 = np.arange(0,10000,1)如果 __name__ == "__main__":numCores = multiprocessing.cpu_count()池 = 多处理.池(进程 = numCores)t = pool.map(simFunction,输入)与 open('results.txt','w') 一样:print("开始模拟" + str(len(inputs)) + "输入值...")计数器 = 0对于我在 t:out.write(i + '
')计数器 = 计数器 + 1如果计数器%100==0:print(str(counter) + " of " + str(len(inputs)) + " 输入值模拟")print('完成了!!!!')
请注意,我使用的是 pathos.multiprocessing
而不是 multiprocessing
. 它只是 multiprocessing
的一个分支,使您能够使用多个输入执行 map
函数,具有更好的序列化,并允许您执行 map
在任何地方调用(不仅仅是在 __main__
中).您也可以使用 multiprocessing
来执行以下操作,但是代码会略有不同.
如果您使用迭代的 map
函数,则很容易跟踪进度.
from pathos.multiprocessing import ProcessingPool as Pooldef simFunction(x,y):进口时间时间.sleep(2)返回 x**2 + yx,y = 范围(100),范围(-100,100,2)res = Pool().imap(simFunction, x,y)与 open('results.txt', 'w') 一样:对于 x 中的 i:out.write("%s
" % res.next())如果 i%10 为 0:打印%s of %s 模拟";% (i, len(x))
<块引用>
0 of 100 模拟100 个模拟中的 10 个100 个模拟中的 20 个100 个模拟中的 30 个100 个模拟中的 40 个100 个模拟中的 50 个100 个模拟中的 60 个100 个模拟中的 70 个100 个模拟中的 80 个100 个模拟中的 90 个
或者,您可以使用异步 map
.在这里我会做一些不同的事情,只是为了混合起来.
导入时间res = Pool().amap(simFunction, x,y)虽然不是 res.ready():打印等待..."时间.sleep(5)
<块引用>
等待中...等待...等待...等待...
res.get()[-100、-97、-92、-85、-76、-65、-52、-37、-20、-1、20、43、68、95、124、155、188、223、260、299, 340, 383, 428, 475, 524, 575, 628, 683, 740, 799, 860, 923, 988, 1055, 1124, 1195, 1268, 1343, 1420, 1499, 1580, 1663, 1748, 1663, 1748,,2015,2108,2203,2300,2399,2500,2603,2708,2815,2924,3035,3148,3263,3380,3499,30020,323,3868,3995,4124,4255,4388,4124,4255,4388,4523,4660,4799,4940,5083,5228,5375,5524,5675,5828,5983,6140,6299,6460,6623,6788,6955,7124,7295,7468,7124,7820,7999,7643,7820,7999,8180,8363,8548,8735,8924, 9115, 9308, 9503, 9700, 9899]
无论是迭代的还是异步的map
,您都可以编写任何您想要更好地跟踪流程的代码.例如,传递一个唯一的id".到每个工作,并观察哪个返回,或者让每个工作返回它的进程 ID.有很多方法可以跟踪进度和过程……但以上内容应该可以为您提供一个开始.
您可以在这里获取pathos
.
I have a function which performs some simulation and returns an array in string format.
I want to run the simulation (the function) for varying input parameter values, over 10000 possible input values, and write the results to a single file.
I am using multiprocessing, specifically, pool.map function to run the simulations in parallel.
Since the whole process of running the simulation function over 10000 times takes a very long time, I really would like to track the process of the entire operation.
I think the problem in my current code below is that, pool.map runs the function 10000 times, without any process tracking during those operations. Once the parallel processing finishes running 10000 simulations (could be hours to days.), then I keep tracking when 10000 simulation results are being saved to a file..So this is not really tracking the processing of pool.map operation.
Is there an easy fix to my code that will allow process tracking?
def simFunction(input):
# Does some simulation and outputs simResult
return str(simResult)
# Parallel processing
inputs = np.arange(0,10000,1)
if __name__ == "__main__":
numCores = multiprocessing.cpu_count()
pool = multiprocessing.Pool(processes = numCores)
t = pool.map(simFunction, inputs)
with open('results.txt','w') as out:
print("Starting to simulate " + str(len(inputs)) + " input values...")
counter = 0
for i in t:
out.write(i + '
')
counter = counter + 1
if counter%100==0:
print(str(counter) + " of " + str(len(inputs)) + " input values simulated")
print('Finished!!!!')
Note that I'm using pathos.multiprocessing
instead of multiprocessing
. It's just a fork of multiprocessing
that enables you to do map
functions with multiple inputs, has much better serialization, and allows you to execute map
calls anywhere (not just in __main__
). You could use multiprocessing
to do the below as well, however the code would be very slightly different.
If you use an iterated map
function, it's pretty easy to keep track of progress.
from pathos.multiprocessing import ProcessingPool as Pool
def simFunction(x,y):
import time
time.sleep(2)
return x**2 + y
x,y = range(100),range(-100,100,2)
res = Pool().imap(simFunction, x,y)
with open('results.txt', 'w') as out:
for i in x:
out.write("%s
" % res.next())
if i%10 is 0:
print "%s of %s simulated" % (i, len(x))
0 of 100 simulated 10 of 100 simulated 20 of 100 simulated 30 of 100 simulated 40 of 100 simulated 50 of 100 simulated 60 of 100 simulated 70 of 100 simulated 80 of 100 simulated 90 of 100 simulated
Or, you can use an asynchronous map
. Here I'll do things a little differently, just to mix it up.
import time
res = Pool().amap(simFunction, x,y)
while not res.ready():
print "waiting..."
time.sleep(5)
waiting... waiting... waiting... waiting...
res.get()
[-100, -97, -92, -85, -76, -65, -52, -37, -20, -1, 20, 43, 68, 95, 124, 155, 188, 223, 260, 299, 340, 383, 428, 475, 524, 575, 628, 683, 740, 799, 860, 923, 988, 1055, 1124, 1195, 1268, 1343, 1420, 1499, 1580, 1663, 1748, 1835, 1924, 2015, 2108, 2203, 2300, 2399, 2500, 2603, 2708, 2815, 2924, 3035, 3148, 3263, 3380, 3499, 3620, 3743, 3868, 3995, 4124, 4255, 4388, 4523, 4660, 4799, 4940, 5083, 5228, 5375, 5524, 5675, 5828, 5983, 6140, 6299, 6460, 6623, 6788, 6955, 7124, 7295, 7468, 7643, 7820, 7999, 8180, 8363, 8548, 8735, 8924, 9115, 9308, 9503, 9700, 9899]
Either an iterated or asynchronous map
will enable you to write whatever code you want to do better process tracking. For example, pass a unique "id" to each job, and watch which come back, or have each job return it's process id. There are lots of ways to track progress and processes… but the above should give you a start.
You can get pathos
here.
这篇关于Python多处理——跟踪pool.map操作的过程的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持编程学习网!
本文标题为:Python多处理——跟踪pool.map操作的过程
基础教程推荐
- Dask.array.套用_沿_轴:由于额外的元素([1]),使用dask.array的每一行作为另一个函数的输入失败 2022-01-01
- Python kivy 入口点 inflateRest2 无法定位 libpng16-16.dll 2022-01-01
- 筛选NumPy数组 2022-01-01
- 线程时出现 msgbox 错误,GUI 块 2022-01-01
- 用于分类数据的跳跃记号标签 2022-01-01
- 在 Python 中,如果我在一个“with"中返回.块,文件还会关闭吗? 2022-01-01
- 如何让 python 脚本监听来自另一个脚本的输入 2022-01-01
- 何时使用 os.name、sys.platform 或 platform.system? 2022-01-01
- 使用PyInstaller后在Windows中打开可执行文件时出错 2022-01-01
- 如何在海运重新绘制中自定义标题和y标签 2022-01-01