Multiprocessing Queue.get() hangs
Problem description

I'm trying to implement basic multiprocessing and I've run into an issue. The Python script is attached below.
import time, sys, random, threading
from multiprocessing import Process
from Queue import Queue
from FrequencyAnalysis import FrequencyStore, AnalyzeFrequency

append_queue = Queue(10)
database = FrequencyStore()

def add_to_append_queue(_list):
    append_queue.put(_list)

def process_append_queue():
    while True:
        item = append_queue.get()
        database.append(item)
        print("Appended to database in %.4f seconds" % database.append_time)
        append_queue.task_done()
    return

def main():
    database.load_db()
    print("Database loaded in %.4f seconds" % database.load_time)
    append_queue_process = Process(target=process_append_queue)
    append_queue_process.daemon = True
    append_queue_process.start()
    #t = threading.Thread(target=process_append_queue)
    #t.daemon = True
    #t.start()

    while True:
        path = raw_input("file: ")
        if path == "exit":
            break
        a = AnalyzeFrequency(path)
        a.analyze()
        print("Analyzed file in %.4f seconds" % a._time)
        add_to_append_queue(a.get_results())

    append_queue.join()
    #append_queue_process.join()
    database.save_db()
    print("Database saved in %.4f seconds" % database.save_time)
    sys.exit(0)

if __name__=="__main__":
    main()
The AnalyzeFrequency class analyzes the frequencies of words in a file, and get_results() returns a sorted list of said words and frequencies. The list is very large, perhaps 10000 items.
This list is then passed to the add_to_append_queue method, which adds it to a queue. process_append_queue takes the items one by one and adds the frequencies to a "database". This operation takes a bit longer than the actual analysis in main(), so I am trying to use a separate process for this method. When I try to do this with the threading module, everything works perfectly fine, no errors. When I try to use Process, the script hangs at item = append_queue.get().
Could someone please explain what is happening here, and perhaps direct me toward a fix?

All answers are appreciated!
Update

The pickle error was my fault, just a typo. Now I am using the Queue class from multiprocessing, but the append_queue.get() method still hangs. New code:
import time, sys, random
from multiprocessing import Process, Queue
from FrequencyAnalysis import FrequencyStore, AnalyzeFrequency

append_queue = Queue()
database = FrequencyStore()

def add_to_append_queue(_list):
    append_queue.put(_list)

def process_append_queue():
    while True:
        database.append(append_queue.get())
        print("Appended to database in %.4f seconds" % database.append_time)
    return

def main():
    database.load_db()
    print("Database loaded in %.4f seconds" % database.load_time)
    append_queue_process = Process(target=process_append_queue)
    append_queue_process.daemon = True
    append_queue_process.start()
    #t = threading.Thread(target=process_append_queue)
    #t.daemon = True
    #t.start()

    while True:
        path = raw_input("file: ")
        if path == "exit":
            break
        a = AnalyzeFrequency(path)
        a.analyze()
        print("Analyzed file in %.4f seconds" % a._time)
        add_to_append_queue(a.get_results())

    #append_queue.join()
    #append_queue_process.join()
    print str(append_queue.qsize())
    database.save_db()
    print("Database saved in %.4f seconds" % database.save_time)
    sys.exit(0)

if __name__=="__main__":
    main()
Update 2

Here is the database code:
class FrequencyStore:

    def __init__(self):
        self.sorter = Sorter()
        self.db = {}
        self.load_time = -1
        self.save_time = -1
        self.append_time = -1
        self.sort_time = -1

    def load_db(self):
        start_time = time.time()
        try:
            file = open("results.txt", 'r')
        except:
            raise IOError

        self.db = {}
        for line in file:
            word, count = line.strip("\n").split("=")
            self.db[word] = int(count)
        file.close()

        self.load_time = time.time() - start_time

    def save_db(self):
        start_time = time.time()

        _db = []
        for key in self.db:
            _db.append([key, self.db[key]])
        _db = self.sort(_db)

        try:
            file = open("results.txt", 'w')
        except:
            raise IOError
        file.truncate(0)

        for x in _db:
            file.write(x[0] + "=" + str(x[1]) + "\n")
        file.close()

        self.save_time = time.time() - start_time

    def create_sorted_db(self):
        _temp_db = []
        for key in self.db:
            _temp_db.append([key, self.db[key]])
        _temp_db = self.sort(_temp_db)
        _temp_db.reverse()
        return _temp_db

    def get_db(self):
        return self.db

    def sort(self, _list):
        start_time = time.time()
        _list = self.sorter.mergesort(_list)
        _list.reverse()
        self.sort_time = time.time() - start_time
        return _list

    def append(self, _list):
        start_time = time.time()
        for x in _list:
            if x[0] not in self.db:
                self.db[x[0]] = x[1]
            else:
                self.db[x[0]] += x[1]
        self.append_time = time.time() - start_time
Recommended answer
Comments suggest you're trying to run this on Windows. As I said in a comment:

If you're running this on Windows, it can't work - Windows doesn't have fork(), so each process gets its own Queue and they have nothing to do with each other. The entire module is imported "from scratch" by each process on Windows. You'll need to create the Queue in main(), and pass it as an argument to the worker function.
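To see the module-level-state point in isolation, here is a small demo (not part of the original answer) that forces Windows-style "spawn" process creation on any OS via Python 3's multiprocessing.get_context, so the behavior reproduces even on Linux:

```python
import multiprocessing as mp

shared = []  # module-level state; under spawn, each process gets its own copy

def worker():
    shared.append("from worker")   # mutates only the worker's copy
    print("worker sees:", shared)  # ['from worker']

def demo():
    ctx = mp.get_context("spawn")  # Windows-style process creation
    p = ctx.Process(target=worker)
    p.start()
    p.join()
    print("parent sees:", shared)  # [] -- the worker's append never happened here
    return shared

if __name__ == "__main__":
    demo()
```

Each process prints its own view of shared: the worker sees its append, while the parent's list stays empty, just as each process in the question ended up with its own unrelated Queue.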
Here's a fleshed-out version of what you need to do to make it portable, although I removed all the database stuff because it's irrelevant to the problems you've described so far. I also removed the daemon fiddling, because that's usually just a lazy way to avoid shutting down things cleanly, and will often come back to bite you later:
def process_append_queue(append_queue):
    while True:
        x = append_queue.get()
        if x is None:
            break
        print("processed %d" % x)
    print("worker done")

def main():
    import multiprocessing as mp
    append_queue = mp.Queue(10)
    append_queue_process = mp.Process(target=process_append_queue, args=(append_queue,))
    append_queue_process.start()

    for i in range(100):
        append_queue.put(i)
    append_queue.put(None)  # tell worker we're done
    append_queue_process.join()

if __name__=="__main__":
    main()
The output is the "obvious" stuff:
processed 0
processed 1
processed 2
processed 3
processed 4
...
processed 96
processed 97
processed 98
processed 99
worker done
Note: because Windows doesn't (can't) fork(), it's impossible for worker processes to inherit any Python object on Windows. Each process runs the entire program from its start. That's why your original program couldn't work: each process created its own Queue, wholly unrelated to the Queue in the other process. In the approach shown above, only the main process creates a Queue, and the main process passes it (as an argument) to the worker process.
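One consequence worth spelling out for the original program: since the worker runs in a separate process, any database.append calls happen in the worker's copy of the database, so main()'s database.save_db() would save nothing new. A hedged sketch (using a plain dict as a stand-in for FrequencyStore, which isn't runnable here) is to let the worker own the counts and ship the finished result back over a second queue once it sees the sentinel:

```python
import multiprocessing as mp

def process_append_queue(append_queue, result_queue):
    db = {}                              # the worker owns its own "database"
    while True:
        item = append_queue.get()
        if item is None:                 # sentinel: no more work coming
            break
        for word, count in item:         # stand-in for FrequencyStore.append
            db[word] = db.get(word, 0) + count
    result_queue.put(db)                 # ship the finished counts back to main()

def main():
    append_queue = mp.Queue()
    result_queue = mp.Queue()
    worker = mp.Process(target=process_append_queue,
                        args=(append_queue, result_queue))
    worker.start()

    append_queue.put([("foo", 2), ("bar", 3)])
    append_queue.put([("foo", 1)])
    append_queue.put(None)               # tell the worker we're done

    db = result_queue.get()              # receive before join() to avoid blocking
    worker.join()
    print(db)                            # {'foo': 3, 'bar': 3}
    return db

if __name__ == "__main__":
    main()
```

Receiving the result before join() matters: a Queue's feeder thread can keep the child alive until its data is drained, so joining first can deadlock on large results.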