from multiprocessing import Process, Lock, Pipe # 管道
import time
import random
def producer_action(producer, pro, consumer_acc): # 生产端函数
for item in range(20):
time.sleep(random.random())
data = pro + '生产数据' + str(item)
producer.send(data)
print(data)
for item in range(consumer_acc): # 有几个消费端就 send 几个 None
producer.send(None)
producer.close()
def consumer_action(consumer, pro, lock): # 消费进程函数
while True:
lock.acquire() # 加锁,防止多个进程同时取数据导致数据的不安全性
time.sleep(random.random())
data = consumer.recv() # 接受数据
lock.release() # 释放锁
if data:
print('{} 收到了{}'.format(pro, data))
else:
consumer.close() # 表示没有数据可以接受了,关闭consume
break
if __name__ == '__main__':
producer_obj, consumer_obj = Pipe() # 管道,接受俩参数 pro传数据, consume 接受数据
lock_obj = Lock() # 基于管道的不安全性,创建一把锁,保证子进程不能同时接受数据造成数据传递和丢失之间的混乱
p_consumer_list = []
consumer_num = 3 # 3 个消费端
for i in range(consumer_num):
p_consumer = Process(target=consumer_action, args=(consumer_obj, '消费端-> {}'.format(i), lock_obj))
p_consumer_list.append(p_consumer)
p_consumer.start()
p_producer = Process(target=producer_action, args=(producer_obj, '生产端', consumer_num))
p_producer.start()
p_producer.join()
producer_obj.close() # 生产端结束
for p_consumer in p_consumer_list:
p_consumer.join()
consumer_obj.close() # 消费端结束
# apply_async(func[, args[, kwds[, callback]]]) 它是非阻塞
# apply(func[, args[, kwds]])是阻塞的(理解区别,看例1例2结果区别)
# close() 关闭pool,使其不在接受新的任务。
# terminate() 结束工作进程,不再处理未完成的任务。
# join() 主进程阻塞,等待子进程的退出, join方法要在close或terminate之后使用。
# 返回结果先 append 进列表,否则变成了单进程
from multiprocessing import Pool, JoinableQueue, Process
import time
import os
def producer(que): # 模拟动态创建数据(请求)
i = 1
while 1:
que.put('数据--> {}'.format(i))
if i == 20:
que.put(None)
i += 1
def consumer(item):
time.sleep(3) # 进程花了一些时间干了一些事
print('进程号', os.getpid(), '拿到了', item)
return item + '的返回值'
def callback(item):
print('callback : ', item)
if __name__ == '__main__':
que_producer = JoinableQueue() # 生产者队列
p_producer = Process(target=producer, args=(que_producer,))
p_producer.daemon = True
p_producer.start()
pool_obj = Pool(3)
res_list = []
while 1:
data = que_producer.get()
if data is None:
pool_obj.close() # 关闭进程池,使进程池不再接受新的任务
break
res = pool_obj.apply_async(consumer, args=(data,), callback=callback) # 非阻塞的
# print(res.get()) # 千万不要在这里打印结果,否则变成了单进程,先 append ,全部完成后 .get() 取值
res_list.append(res)
# if do something: # 关闭线程池,不再处理任务
# pool_obj.terminate()
pool_obj.join() # 主进程等待进程池的任务全部结束,先 close() 再 join(),否则报错
for res in res_list:
print(res)
from multiprocessing import Process, Manager, Lock
def action(data, lock, num=1):
lock.acquire()
data['number'] -= num
print(data)
lock.release()
if __name__ == '__main__':
p_list = []
# Manage 对象的字典,数据所有的进程都可以修改
dic = Manager().dict({'number': 101})
lock_obj = Lock()
for i in range(100):
# 起 100 个进程
p = Process(target=action, args=(dic, lock_obj))
p.start()
p_list.append(p)
for p in p_list: # 必须全部 join(),否则主程序执行完毕直接报错
p.join()
# 程序的最后打印这个值
print('-->', dic)
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。