前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >day39(多进程) - 管道、进程池、进程池的返回值、进程回调函数、进程之间的数据共享

day39(多进程) - 管道、进程池、进程池的返回值、进程回调函数、进程之间的数据共享

原创
作者头像
少年包青菜
修改2020-12-17 10:36:53
1.9K0
修改2020-12-17 10:36:53
举报
文章被收录于专栏:Python 学习

1.管道.py (了解,基本都是用队列比较多)

代码语言:txt
复制
from multiprocessing import Process, Lock, Pipe  # 管道
import time
import random


def producer_action(producer, pro, consumer_acc):  # 生产端函数
    for item in range(20):
        time.sleep(random.random())
        data = pro + '生产数据' + str(item)
        producer.send(data)
        print(data)

    for item in range(consumer_acc):  # 有几个消费端就 send 几个 None
        producer.send(None)

    producer.close()


def consumer_action(consumer, pro, lock):  # 消费进程函数
    while True:
        lock.acquire()  # 加锁,防止多个进程同时取数据导致数据的不安全性
        time.sleep(random.random())
        data = consumer.recv()  # 接受数据
        lock.release()  # 释放锁
        if data:
            print('{} 收到了{}'.format(pro, data))
        else:
            consumer.close()  # 表示没有数据可以接受了,关闭consume
            break


if __name__ == '__main__':
    producer_obj, consumer_obj = Pipe()  # 管道,接受俩参数   pro传数据,  consume 接受数据
    lock_obj = Lock()  # 基于管道的不安全性,创建一把锁,保证子进程不能同时接受数据造成数据传递和丢失之间的混乱

    p_consumer_list = []
    consumer_num = 3  # 3 个消费端
    for i in range(consumer_num):
        p_consumer = Process(target=consumer_action, args=(consumer_obj, '消费端-> {}'.format(i), lock_obj))
        p_consumer_list.append(p_consumer)
        p_consumer.start()

    p_producer = Process(target=producer_action, args=(producer_obj, '生产端', consumer_num))
    p_producer.start()

    p_producer.join()
    producer_obj.close()  # 生产端结束

    for p_consumer in p_consumer_list:
        p_consumer.join()
    consumer_obj.close()  # 消费端结束

2.进程池,进程池的返回值,进程池的回调函数

# apply_async(func[, args[, kwds[, callback]]]) 它是非阻塞

# apply(func[, args[, kwds]])是阻塞的(理解区别,看例1例2结果区别)

# close() 关闭pool,使其不在接受新的任务。

# terminate() 结束工作进程,不再处理未完成的任务。

# join() 主进程阻塞,等待子进程的退出, join方法要在close或terminate之后使用。

# 返回结果先 append 进列表,否则变成了单进程

代码语言:txt
复制
from multiprocessing import Pool, JoinableQueue, Process
import time
import os


def producer(que):  # 模拟动态创建数据(请求)
    i = 1
    while 1:
        que.put('数据--> {}'.format(i))
        if i == 20:
            que.put(None)
        i += 1


def consumer(item):
    time.sleep(3)  # 进程花了一些时间干了一些事
    print('进程号', os.getpid(), '拿到了', item)
    return item + '的返回值'


def callback(item):
    print('callback : ', item)


if __name__ == '__main__':
    que_producer = JoinableQueue()  # 生产者队列
    p_producer = Process(target=producer, args=(que_producer,))
    p_producer.daemon = True
    p_producer.start()

    pool_obj = Pool(3)
    res_list = []
    while 1:
        data = que_producer.get()
        if data is None:
            pool_obj.close()  # 关闭进程池,使进程池不再接受新的任务
            break
        res = pool_obj.apply_async(consumer, args=(data,), callback=callback)  # 非阻塞的
        # print(res.get())  # 千万不要在这里打印结果,否则变成了单进程,先 append ,全部完成后 .get() 取值
        res_list.append(res)

    # if do something:  # 关闭线程池,不再处理任务
        # pool_obj.terminate()

    pool_obj.join()  # 主进程等待进程池的任务全部结束,先 close() 再 join(),否则报错

    for res in res_list:
        print(res)

3.进程之间的数据共享之 Manager

代码语言:txt
复制
from multiprocessing import Process, Manager, Lock


def action(data, lock, num=1):
    lock.acquire()
    data['number'] -= num
    print(data)
    lock.release()


if __name__ == '__main__':
    p_list = []
    # Manage 对象的字典,数据所有的进程都可以修改
    dic = Manager().dict({'number': 101})
    lock_obj = Lock()
    for i in range(100):
        # 起 100 个进程
        p = Process(target=action, args=(dic, lock_obj))
        p.start()
        p_list.append(p)

    for p in p_list:  # 必须全部 join(),否则主程序执行完毕直接报错 
        p.join()

    # 程序的最后打印这个值
    print('-->', dic)

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1.管道.py (了解,基本都是用队列比较多)
  • 2.进程池,进程池的返回值,进程池的回调函数
  • 3.进程之间的数据共享之 Manager
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档