且构网

分享程序员开发的那些事...
且构网 - 分享程序员编程开发的那些事

如何在多处理中使用队列设置管道

更新时间:2021-09-18 22:26:27

一个multiprocessing.Pool已经设置了必要的IPC机制,以允许您在作业启动后向其工作者提交作业,但是您无法通过ist队列或类似参数稍后作为参数.这就是为什么您的代码不起作用的原因.在子流程启动时,它必须知道如何与父流程进行通信.

A multiprocessing.Pool already sets up the necessary IPC mechanisms to allow you to submit jobs to its workers after it has been started, but you can't pass ist a Queue or similar later as an argument. That's why your code doesn't work. At the time a subprocess is started, it aleady has to know how to communicate with its parent.

因此,如果您需要设置自己的队列,则应直接使用multiprocessing.Process.另外,您要编写的是典型的工作程序,这些工作程序循环等待新作业并对其进行处理.在工作人员池上运行这样的工作人员不是您想要做的事情.

So if you need to set up your own Queues, you should use multiprocessing.Process directly. Also, what you're writing are typical workers, which wait in a loop for new jobs and process them. Running such a worker on a pool of workers is not something you want to do.

这样,您的代码将起作用:

This way your code would work:

import multiprocessing


def produce(n, queue):
    for i in xrange(n):
        queue.put(i)

def worker(in_queue, out_queue):
    for i in iter( in_queue.get, None):
        out_queue.put(i*i)

def consumer(queue):
    ans = []
    for i in iter( queue.get, None):
        print(i)
        ans.append(i)
    return ans


def main(n):
    in_queue = multiprocessing.Queue()
    out_queue = multiprocessing.Queue()
    producer = multiprocessing.Process(target=produce, args=(n, in_queue))
    for i in range(2):
        w = multiprocessing.Process(target=worker, args=(in_queue, out_queue))
        w.start()
    producer.start()
    res = consumer(out_queue)

main(200)

我已经在您的consumer中添加了一条打印语句,以表明发生了某些事情. consumer函数永远不会终止,因为您从队列中读取的代码会等待终止的None,而该终止不会出现,因为工人和生产者都不曾将其放入队列...

I've added a print statement in your consumer to show that there is something happening. The consumer function just never terminates, because the code where you reading from the queues waits for a terminating None which never comes because neither workers nor producer ever put one into the queues...