python多处理:某些函数完成后不返回(队列材料太大)

python multiprocessing: some functions do not return when they are complete (queue material too big)(python多处理:某些函数完成后不返回(队列材料太大))

本文介绍了python多处理:某些函数完成后不返回(队列材料太大)的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在使用多处理的进程和队列.我并行启动了几个函数,并且大多数函数都表现良好:它们完成,它们的输出进入它们的队列,它们显示为 .is_alive() == False.但是由于某种原因,一些函数没有运行.它们总是显示 .is_alive() == True,即使在函数的最后一行(打印语句说完成")完成之后也是如此.无论我启动了哪些功能,都会发生这种情况,即使它只有一个.如果不并行运行,则函数运行良好并正常返回.什么种类可能是问题?

I am using multiprocessing's Process and Queue. I start several functions in parallel and most behave nicely: they finish, their output goes to their Queue, and they show up as .is_alive() == False. But for some reason a couple of functions are not behaving. They always show .is_alive() == True, even after the last line in the function (a print statement saying "Finished") is complete. This happens regardless of the set of functions I launch, even it there's only one. If not run in parallel, the functions behave fine and return normally. What kind of thing might be the problem?

这是我用来管理作业的通用函数.我没有展示的只是我传递给它的函数.它们很长,经常使用 matplotlib,有时会启动一些 shell 命令,但我不知道失败的命令有什么共同点.

Here's the generic function I'm using to manage the jobs. All I'm not showing is the functions I'm passing to it. They're long, often use matplotlib, sometimes launch some shell commands, but I cannot figure out what the failing ones have in common.

def  runFunctionsInParallel(listOf_FuncAndArgLists):
    """
    Take a list of lists like [function, arg1, arg2, ...]. Run those functions in parallel, wait for them all to finish, and return the list of their return values, in order.   
    """
    from multiprocessing import Process, Queue

    def storeOutputFFF(fff,theArgs,que): #add a argument to function for assigning a queue
        print 'MULTIPROCESSING: Launching %s in parallel '%fff.func_name
        que.put(fff(*theArgs)) #we're putting return value into queue
        print 'MULTIPROCESSING: Finished %s in parallel! '%fff.func_name
        # We get this far even for "bad" functions
        return

    queues=[Queue() for fff in listOf_FuncAndArgLists] #create a queue object for each function
    jobs = [Process(target=storeOutputFFF,args=[funcArgs[0],funcArgs[1:],queues[iii]]) for iii,funcArgs in enumerate(listOf_FuncAndArgLists)]
    for job in jobs: job.start() # Launch them all
    import time
    from math import sqrt
    n=1
    while any([jj.is_alive() for jj in jobs]): # debugging section shows progress updates
        n+=1
        time.sleep(5+sqrt(n)) # Wait a while before next update. Slow down updates for really long runs.
        print('
---------------------------------------------------
'+ '	'.join(['alive?','Job','exitcode','Func',])+ '
---------------------------------------------------')
        print('
'.join(['%s:	%s:	%s:	%s'%(job.is_alive()*'Yes',job.name,job.exitcode,listOf_FuncAndArgLists[ii][0].func_name) for ii,job in enumerate(jobs)]))
        print('---------------------------------------------------
')
    # I never get to the following line when one of the "bad" functions is running.
    for job in jobs: job.join() # Wait for them all to finish... Hm, Is this needed to get at the Queues?
    # And now, collect all the outputs:
    return([queue.get() for queue in queues])

推荐答案

好吧,好像函数的输出太大时,用来填充Queue的管道被堵塞了(我粗略的理解?这是一个未解决的/关闭的错误?http://bugs.python.org/issue8237).我已经修改了我的问题中的代码,以便有一些缓冲(在进程运行时定期清空队列),这解决了我所有的问题.所以现在这需要一组任务(函数及其参数),启动它们,并收集输出.我希望它看起来更简单/更干净.

Alright, it seems that the pipe used to fill the Queue gets plugged when the output of a function is too big (my crude understanding? This is an unresolved/closed bug? http://bugs.python.org/issue8237). I have modified the code in my question so that there is some buffering (queues are regularly emptied while processes are running), which solves all my problems. So now this takes a collection of tasks (functions and their arguments), launches them, and collects the outputs. I wish it were simpler /cleaner looking.

编辑(2014 年 9 月;2017 年 11 月更新:重写以提高可读性):我正在使用我此后所做的增强来更新代码.新代码(功能相同,但功能更好)在这里:https://gitlab.com/cpbl/cpblUtilities/blob/master/parallel.py

Edit (2014 Sep; update 2017 Nov: rewritten for readability): I'm updating the code with the enhancements I've made since. The new code (same function, but better features) is here: https://gitlab.com/cpbl/cpblUtilities/blob/master/parallel.py

调用说明也在下方.

def runFunctionsInParallel(*args, **kwargs):
    """ This is the main/only interface to class cRunFunctionsInParallel. See its documentation for arguments.
    """
    return cRunFunctionsInParallel(*args, **kwargs).launch_jobs()

###########################################################################################
###
class cRunFunctionsInParallel():
    ###
    #######################################################################################
    """Run any list of functions, each with any arguments and keyword-arguments, in parallel.
The functions/jobs should return (if anything) pickleable results. In order to avoid processes getting stuck due to the output queues overflowing, the queues are regularly collected and emptied.
You can now pass os.system or etc to this as the function, in order to parallelize at the OS level, with no need for a wrapper: I made use of hasattr(builtinfunction,'func_name') to check for a name.
Parameters
----------
listOf_FuncAndArgLists : a list of lists 
    List of up-to-three-element-lists, like [function, args, kwargs],
    specifying the set of functions to be launched in parallel.  If an
    element is just a function, rather than a list, then it is assumed
    to have no arguments or keyword arguments. Thus, possible formats
    for elements of the outer list are:
      function
      [function, list]
      [function, list, dict]
kwargs: dict
    One can also supply the kwargs once, for all jobs (or for those
    without their own non-empty kwargs specified in the list)
names: an optional list of names to identify the processes.
    If omitted, the function name is used, so if all the functions are
    the same (ie merely with different arguments), then they would be
    named indistinguishably
offsetsSeconds: int or list of ints
    delay some functions' start times
expectNonzeroExit: True/False
    Normal behaviour is to not proceed if any function exits with a
    failed exit code. This can be used to override this behaviour.
parallel: True/False
    Whenever the list of functions is longer than one, functions will
    be run in parallel unless this parameter is passed as False
maxAtOnce: int
    If nonzero, this limits how many jobs will be allowed to run at
    once.  By default, this is set according to how many processors
    the hardware has available.
showFinished : int
    Specifies the maximum number of successfully finished jobs to show
    in the text interface (before the last report, which should always
    show them all).
Returns
-------
Returns a tuple of (return codes, return values), each a list in order of the jobs provided.
Issues
-------
Only tested on POSIX OSes.
Examples
--------
See the testParallel() method in this module
    """

这篇关于python多处理:某些函数完成后不返回(队列材料太大)的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持编程学习网!

本文标题为:python多处理:某些函数完成后不返回(队列材料太大)

基础教程推荐