更新时间:2021-09-18 22:26:27
一个multiprocessing.Pool
已经设置了必要的IPC机制,以允许您在作业启动后向其工作者提交作业,但是您无法通过ist队列或类似参数稍后作为参数.这就是为什么您的代码不起作用的原因.在子流程启动时,它必须知道如何与父流程进行通信.
A multiprocessing.Pool
already sets up the necessary IPC mechanisms to allow you to submit jobs to its workers after it has been started, but you can't pass ist a Queue or similar later as an argument. That's why your code doesn't work. At the time a subprocess is started, it aleady has to know how to communicate with its parent.
因此,如果您需要设置自己的队列,则应直接使用multiprocessing.Process
.另外,您要编写的是典型的工作程序,这些工作程序循环等待新作业并对其进行处理.在工作人员池上运行这样的工作人员不是您想要做的事情.
So if you need to set up your own Queues, you should use multiprocessing.Process
directly. Also, what you're writing are typical workers, which wait in a loop for new jobs and process them. Running such a worker on a pool of workers is not something you want to do.
这样,您的代码将起作用:
This way your code would work:
import multiprocessing
def produce(n, queue):
for i in xrange(n):
queue.put(i)
def worker(in_queue, out_queue):
for i in iter( in_queue.get, None):
out_queue.put(i*i)
def consumer(queue):
ans = []
for i in iter( queue.get, None):
print(i)
ans.append(i)
return ans
def main(n):
in_queue = multiprocessing.Queue()
out_queue = multiprocessing.Queue()
producer = multiprocessing.Process(target=produce, args=(n, in_queue))
for i in range(2):
w = multiprocessing.Process(target=worker, args=(in_queue, out_queue))
w.start()
producer.start()
res = consumer(out_queue)
main(200)
我已经在您的consumer
中添加了一条打印语句,以表明发生了某些事情. consumer
函数永远不会终止,因为您从队列中读取的代码会等待终止的None
,而该终止不会出现,因为工人和生产者都不曾将其放入队列...
I've added a print statement in your consumer
to show that there is something happening. The consumer
function just never terminates, because the code where you reading from the queues waits for a terminating None
which never comes because neither workers nor producer ever put one into the queues...