多处理 - 管道与队列

Multiprocessing - Pipe vs Queue(多处理 - 管道与队列)

本文介绍了多处理 - 管道与队列的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

Python 的多处理包中的队列和管道的根本区别是什么?p>

在什么情况下应该选择一个而不是另一个?什么时候使用 Pipe() 比较有利?什么时候使用 Queue() 比较有利?

解决方案

  • 一个 Pipe() 只能有两个端点.

  • 一个 Queue() 可以有多个生产者和消费者.

何时使用它们

如果您需要两个以上的点进行通信,请使用 Queue().

如果您需要绝对性能,Pipe() 更快,因为 Queue() 是建立在 Pipe() 之上的.

绩效基准测试

假设您想要生成两个进程并尽快在它们之间发送消息.这些是使用 Pipe()Queue() 的类似测试之间的拉力赛的计时结果......这是在运行 Ubuntu 11.10 和 Python 2.7 的 ThinkpadT61 上.2.

仅供参考,我输入了 JoinableQueue() 作为奖励;JoinableQueue() 在调用 queue.task_done() 时计算任务(它甚至不知道具体的任务,它只计算队列中未完成的任务),以便 queue.join() 知道工作已完成.

此答案底部的每个代码...

mpenning@mpenning-T61:~$ python multi_pipe.py向 Pipe() 发送 10000 个数字需要 0.0369849205017 秒向 Pipe() 发送 100000 个数字需要 0.328398942947 秒向 Pipe() 发送 1000000 个数字需要 3.17266988754 秒mpenning@mpenning-T61:~$ python multi_queue.py向 Queue() 发送 10000 个号码需要 0.105256080627 秒向 Queue() 发送 100000 个号码需要 0.980564117432 秒向 Queue() 发送 1000000 个数字需要 10.1611330509 秒mpnening@mpenning-T61:~$ python multi_joinablequeue.py向 JoinableQueue() 发送 10000 个号码需要 0.172781944275 秒向 JoinableQueue() 发送 100000 个号码需要 1.5714070797 秒向 JoinableQueue() 发送 1000000 个号码需要 15.8527247906 秒mpenning@mpenning-T61:~$

总而言之,Pipe()Queue() 快大约三倍.甚至不要考虑 JoinableQueue() 除非你真的必须有好处.

奖励材料 2

多处理在信息流中引入了细微的变化,这使得调试变得困难,除非您知道一些捷径.例如,您可能有一个脚本在许多条件下通过字典索引时工作正常,但很少会因某些输入而失败.

通常当整个python进程崩溃时我们会得到失败的线索;但是,如果多处理功能崩溃,您不会将未经请求的崩溃回溯打印到控制台.如果不知道是什么导致了进程崩溃,就很难追踪未知的多进程崩溃.

我发现追踪多处理崩溃信息的最简单方法是将整个多处理函数包装在 try/except 中并使用 traceback.print_exc():

导入回溯定义运行(自我,参数):尝试:# 在这里插入要多处理的东西返回 args[0]['那个']除了:print "FATAL: reader({0}) exited while multiprocessing".format(args)追溯.print_exc()

现在,当您发现崩溃时,您会看到如下内容:

FATAL: reader([{'crash': 'this'}]) 在多处理时退出回溯(最近一次通话最后):__init__ 中的文件foo.py",第 19 行自我运行(参数)运行中的文件foo.py",第 46 行键错误:'那个'

源代码:

<小时>

<代码>"""multi_pipe.py"""从多处理导入过程,管道进口时间def reader_proc(管道):## 从管道中读取;这将作为一个单独的进程产生p_output, p_input = 管道p_input.close() # 我们只是在阅读而真:msg = p_output.recv() # 从输出管道读取,什么也不做如果味精=='完成':休息def writer(计数,p_input):对于 xrange(0, count) 中的 ii:p_input.send(ii) # 将 'count' 数字写入输入管道p_input.send('完成')如果 __name__=='__main__':对于 [10**4, 10**5, 10**6] 中的计数:# 管道是单向的,有两个端点:p_input ------>p_outputp_output, p_input = Pipe() # writer() 从 _this_ 进程写入 p_inputreader_p = Process(target=reader_proc, args=((p_output, p_input),))reader_p.daemon = Truereader_p.start() # 启动阅读器进程p_output.close() # 我们不再需要这部分 Pipe()_start = time.time()writer(count, p_input) # 发送很多东西到 reader_proc()p_input.close()reader_p.join()print("向 Pipe() 发送 {0} 个数字需要 {1} 秒".format(count,(time.time() - _start)))

<小时>

<代码>"""多队列.py"""从多处理导入进程,队列进口时间导入系统def reader_proc(队列):## 从队列中读取;这将作为一个单独的进程产生而真:msg = queue.get() # 从队列中读取,什么也不做如果(味精 == '完成'):休息def writer(计数,队列):## 写入队列对于范围内的 ii(0,计数):queue.put(ii) # 将 'count' 数写入队列queue.put('完成')如果 __name__=='__main__':pqueue = Queue() # writer() 从 _this_ 进程写入 pqueue对于 [10**4, 10**5, 10**6] 中的计数:### reader_proc() 作为单独的进程从 pqueue 读取reader_p = Process(target=reader_proc, args=((pqueue),))reader_p.daemon = Truereader_p.start() # 启动 reader_proc() 作为一个单独的 python 进程_start = time.time()writer(count, pqueue) # 发送很多东西给 reader()reader_p.join() # 等待阅读器完成print("发送 {0} 个数字到 Queue() 花费了 {1} 秒".format(count,(time.time() - _start)))

<小时>

<代码>"""multi_joinablequeue.py"""从多处理导入流程,JoinableQueue进口时间def reader_proc(队列):## 从队列中读取;这将作为一个单独的进程产生而真:msg = queue.get() # 从队列中读取,什么也不做queue.task_done()def writer(计数,队列):对于 xrange(0, count) 中的 ii:queue.put(ii) # 将 'count' 数写入队列如果 __name__=='__main__':对于 [10**4, 10**5, 10**6] 中的计数:jqueue = JoinableQueue() # writer() 从 _this_ 进程写入 jqueue# reader_proc() 作为不同的进程从 jqueue 中读取...reader_p = Process(target=reader_proc, args=((jqueue),))reader_p.daemon = Truereader_p.start() # 启动阅读器进程_start = time.time()writer(count, jqueue) # 向 reader_proc() 发送很多东西(在不同的进程中)jqueue.join() # 等待阅读器完成print("向 JoinableQueue() 发送 {0} 个数字需要 {1} 秒".format(count,(time.time() - _start)))

What are the fundamental differences between queues and pipes in Python's multiprocessing package?

In what scenarios should one choose one over the other? When is it advantageous to use Pipe()? When is it advantageous to use Queue()?

解决方案

  • A Pipe() can only have two endpoints.

  • A Queue() can have multiple producers and consumers.

When to use them

If you need more than two points to communicate, use a Queue().

If you need absolute performance, a Pipe() is much faster because Queue() is built on top of Pipe().

Performance Benchmarking

Let's assume you want to spawn two processes and send messages between them as quickly as possible. These are the timing results of a drag race between similar tests using Pipe() and Queue()... This is on a ThinkpadT61 running Ubuntu 11.10, and Python 2.7.2.

FYI, I threw in results for JoinableQueue() as a bonus; JoinableQueue() accounts for tasks when queue.task_done() is called (it doesn't even know about the specific task, it just counts unfinished tasks in the queue), so that queue.join() knows the work is finished.

The code for each at bottom of this answer...

mpenning@mpenning-T61:~$ python multi_pipe.py 
Sending 10000 numbers to Pipe() took 0.0369849205017 seconds
Sending 100000 numbers to Pipe() took 0.328398942947 seconds
Sending 1000000 numbers to Pipe() took 3.17266988754 seconds
mpenning@mpenning-T61:~$ python multi_queue.py 
Sending 10000 numbers to Queue() took 0.105256080627 seconds
Sending 100000 numbers to Queue() took 0.980564117432 seconds
Sending 1000000 numbers to Queue() took 10.1611330509 seconds
mpnening@mpenning-T61:~$ python multi_joinablequeue.py 
Sending 10000 numbers to JoinableQueue() took 0.172781944275 seconds
Sending 100000 numbers to JoinableQueue() took 1.5714070797 seconds
Sending 1000000 numbers to JoinableQueue() took 15.8527247906 seconds
mpenning@mpenning-T61:~$

In summary Pipe() is about three times faster than a Queue(). Don't even think about the JoinableQueue() unless you really must have the benefits.

BONUS MATERIAL 2

Multiprocessing introduces subtle changes in information flow that make debugging hard unless you know some shortcuts. For instance, you might have a script that works fine when indexing through a dictionary in under many conditions, but infrequently fails with certain inputs.

Normally we get clues to the failure when the entire python process crashes; however, you don't get unsolicited crash tracebacks printed to the console if the multiprocessing function crashes. Tracking down unknown multiprocessing crashes is hard without a clue to what crashed the process.

The simplest way I have found to track down multiprocessing crash informaiton is to wrap the entire multiprocessing function in a try / except and use traceback.print_exc():

import traceback
def run(self, args):
    try:
        # Insert stuff to be multiprocessed here
        return args[0]['that']
    except:
        print "FATAL: reader({0}) exited while multiprocessing".format(args) 
        traceback.print_exc()

Now, when you find a crash you see something like:

FATAL: reader([{'crash': 'this'}]) exited while multiprocessing
Traceback (most recent call last):
  File "foo.py", line 19, in __init__
    self.run(args)
  File "foo.py", line 46, in run
    KeyError: 'that'

Source Code:


"""
multi_pipe.py
"""
from multiprocessing import Process, Pipe
import time

def reader_proc(pipe):
    ## Read from the pipe; this will be spawned as a separate Process
    p_output, p_input = pipe
    p_input.close()    # We are only reading
    while True:
        msg = p_output.recv()    # Read from the output pipe and do nothing
        if msg=='DONE':
            break

def writer(count, p_input):
    for ii in xrange(0, count):
        p_input.send(ii)             # Write 'count' numbers into the input pipe
    p_input.send('DONE')

if __name__=='__main__':
    for count in [10**4, 10**5, 10**6]:
        # Pipes are unidirectional with two endpoints:  p_input ------> p_output
        p_output, p_input = Pipe()  # writer() writes to p_input from _this_ process
        reader_p = Process(target=reader_proc, args=((p_output, p_input),))
        reader_p.daemon = True
        reader_p.start()     # Launch the reader process

        p_output.close()       # We no longer need this part of the Pipe()
        _start = time.time()
        writer(count, p_input) # Send a lot of stuff to reader_proc()
        p_input.close()
        reader_p.join()
        print("Sending {0} numbers to Pipe() took {1} seconds".format(count,
            (time.time() - _start)))


"""
multi_queue.py
"""

from multiprocessing import Process, Queue
import time
import sys

def reader_proc(queue):
    ## Read from the queue; this will be spawned as a separate Process
    while True:
        msg = queue.get()         # Read from the queue and do nothing
        if (msg == 'DONE'):
            break

def writer(count, queue):
    ## Write to the queue
    for ii in range(0, count):
        queue.put(ii)             # Write 'count' numbers into the queue
    queue.put('DONE')

if __name__=='__main__':
    pqueue = Queue() # writer() writes to pqueue from _this_ process
    for count in [10**4, 10**5, 10**6]:             
        ### reader_proc() reads from pqueue as a separate process
        reader_p = Process(target=reader_proc, args=((pqueue),))
        reader_p.daemon = True
        reader_p.start()        # Launch reader_proc() as a separate python process

        _start = time.time()
        writer(count, pqueue)    # Send a lot of stuff to reader()
        reader_p.join()         # Wait for the reader to finish
        print("Sending {0} numbers to Queue() took {1} seconds".format(count, 
            (time.time() - _start)))


"""
multi_joinablequeue.py
"""
from multiprocessing import Process, JoinableQueue
import time

def reader_proc(queue):
    ## Read from the queue; this will be spawned as a separate Process
    while True:
        msg = queue.get()         # Read from the queue and do nothing
        queue.task_done()

def writer(count, queue):
    for ii in xrange(0, count):
        queue.put(ii)             # Write 'count' numbers into the queue

if __name__=='__main__':
    for count in [10**4, 10**5, 10**6]:
        jqueue = JoinableQueue() # writer() writes to jqueue from _this_ process
        # reader_proc() reads from jqueue as a different process...
        reader_p = Process(target=reader_proc, args=((jqueue),))
        reader_p.daemon = True
        reader_p.start()     # Launch the reader process
        _start = time.time()
        writer(count, jqueue) # Send a lot of stuff to reader_proc() (in different process)
        jqueue.join()         # Wait for the reader to finish
        print("Sending {0} numbers to JoinableQueue() took {1} seconds".format(count, 
            (time.time() - _start)))

这篇关于多处理 - 管道与队列的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持编程学习网!

本文标题为:多处理 - 管道与队列

基础教程推荐