当前位置: 动力学知识库 > 问答 > 编程问答 >

Python multiprocessing Pool Queues communication

问题描述:

I'm trying to implement a pool of two processes that run in parallel and communicate through a queue.

The goal is to have a writer process that passes a message to a reader process by using a queue.

Each process is printing a feedback on the terminal in order to have a feedback.

Here is the code:

#!/usr/bin/env python

import os

import time

import multiprocessing as mp

import Queue

def writer(queue):

pid = os.getpid()

for i in range(1,4):

msg = i

print "### writer ", pid, " -> ", msg

queue.put(msg)

time.sleep(1)

msg = 'Done'

print '### '+msg

queue.put(msg)

def reader(queue):

pid = os.getpid()

time.sleep(0.5)

while True:

print "--- reader ", pid, " -> ",

msg = queue.get()

print msg

if msg == 'Done':

break

if __name__ == "__main__":

print "Initialize the experiment PID: ", os.getpid()

mp.freeze_support()

queue = mp.Queue()

pool = mp.Pool()

pool.apply_async(writer, (queue))

pool.apply_async(reader, (queue))

pool.close()

pool.join()

The output I am expecting should be something like this:

Initialize the experiment PID: 2341

writer 2342 -> 1

reader 2343 -> 1

writer 2342 -> 2

reader 2343 -> 2

writer 2342 -> 3

reader 2343 -> 3

Done

However I only get the line:

Initialize the experiment PID: 2341

then the script quits.

What is the correct way to implement the interprocess communication of two processes in a pool that communicates through a queue?

网友答案:

I Used mp.Manager().Queue() as the queue because we couldn't directly pass Queue. Trying to directly use the Queue was causing exceptions but getting unhandled since we were using apply_async.

I updated your codes to:

#!/usr/bin/env python

import os
import time
import multiprocessing as mp
import Queue

def writer(queue):
    pid = os.getpid()
    for i in range(1,4):
        msg = i
        print "### writer ", pid, " -> ", msg
        queue.put(msg)
        time.sleep(1)
        msg = 'Done'
    print '### '+msg
    queue.put(msg)

def reader(queue):
    pid = os.getpid()
    time.sleep(0.5)
    while True:
        print "--- reader ", pid, " -> ",
        msg = queue.get()
        print msg
        if msg == 'Done':
            break

if __name__ == "__main__":
    print "Initialize the experiment PID: ", os.getpid()
    manager = mp.Manager()

    queue = manager.Queue()

    pool = mp.Pool()
    pool.apply_async(writer, (queue,))
    pool.apply_async(reader, (queue,))

    pool.close()
    pool.join()

And I got this output:

Initialize the experiment PID:  46182
### writer  46210  ->  1
--- reader  46211  ->  1
### writer  46210  ->  2
--- reader  46211  ->  2
### writer  46210  ->  3
--- reader  46211  ->  3
### Done
--- reader  46211  ->  Done

I believe this is what you expected.

分享给朋友:
您可能感兴趣的文章:
随机阅读: