我希望在执行多线程编程时,在Python中有一个单一的生产者,多个使用者架构。我希望做这样的手术:
所以我需要所有的消费者从生产者那里得到同样的数据。
当我使用队列来执行这个任务时,我意识到,除了第一个使用者之外,所有的用户都需要使用我所拥有的实现。
一种可能的解决方案是为每个使用者线程创建一个唯一的队列,其中生产者在多个队列中推送相同的数据。有更好的方法吗?
from threading import Thread
import time
import random
from Queue import Queue
my_queue = Queue(0)
def Producer():
global my_queue
my_list = []
for each in range (50):
my_list.append(each)
my_queue.put(my_list)
def Consumer1():
print "Consumer1"
global my_queue
print my_queue.get()
my_queue.task_done()
def Consumer2():
print "Consumer2"
global my_queue
print my_queue.get()
my_queue.task_done()
P = Thread(name = "Producer", target = Producer)
C1 = Thread(name = "Consumer1", target = Consumer1)
C2 = Thread(name = "Consumer2", target = Consumer2)
P.start()
C1.start()
C2.start()
在上面的示例中,C2将无限期地被阻塞,因为C1会消耗P1生成的数据。我希望C1和C2都能够访问P1生成的相同数据。
谢谢你的代码/指针!
发布于 2015-02-05 08:43:09
您的制作人只创建了一项工作:
my_queue.put(my_list)
例如,放置两次my_list,两个消费者都可以工作:
def Producer():
global my_queue
my_list = []
for each in range (50):
my_list.append(each)
my_queue.put(my_list)
my_queue.put(my_list)
因此,通过这种方式,您可以使用相同的列表将两个作业放入队列。
但是,我必须警告您:在没有线程同步的情况下在不同线程中修改相同的数据通常是个坏主意。
无论如何,使用一个队列的方法不适用于您,因为一个队列应该使用相同算法的线程进行处理。
因此,我建议您继续为每个消费者提供唯一的队列,因为其他解决方案并不是那么简单。
发布于 2015-02-05 08:43:57
那么每个线程队列如何?
作为启动每个使用者的一部分,您还将创建另一个队列,并将其添加到“所有线程队列”的列表中。然后启动生产者,将所有队列的列表传递给它,然后他可以将数据推入所有队列中。
发布于 2021-01-18 13:45:53
一个单一的生产者和五个消费者的例子,证实了。
from multiprocessing import Process, JoinableQueue
import time
import os
q = JoinableQueue()
def producer():
for item in range(30):
time.sleep(2)
q.put(item)
pid = os.getpid()
print(f'producer {pid} done')
def worker():
while True:
item = q.get()
pid = os.getpid()
print(f'pid {pid} Working on {item}')
print(f'pid {pid} Finished {item}')
q.task_done()
for i in range(5):
p = Process(target=worker, daemon=True).start()
producers = []
# it is easy to extend it to multi producers.
for i in range(1):
p = Process(target=producer)
producers.append(p)
p.start()
# make sure producers done
for p in producers:
p.join()
# block until all workers are done
q.join()
print('All work completed')
解释:
https://stackoverflow.com/questions/28349302
复制