上次说了很多Linux下进程相关知识,这边不再复述,下面来说说Python的并发编程,如有错误欢迎提出~
如果遇到听不懂的可以看上一次的文章:
Python3 与 C# 并发编程之~ 上篇 (Net)
官方文档:https://docs.python.org/3/library/multiprocessing.html
Python的进程创建非常方便,看个案例:(这种方法通用,fork只适用于Linux系)
import os
# 注意一下,导入的是Process不是process(Class是大写开头)
from multiprocessing import Process
def test(name):
print("[子进程-%s]PID:%d,PPID:%d" % (name, os.getpid(), os.getppid()))
def main():
print("[父进程]PID:%d,PPID:%d" % (os.getpid(), os.getppid()))
p = Process(target=test, args=("萌萌哒", )) # 单个元素的元组表达别忘了(x,)
p.start()
p.join() # 父进程回收子进程资源(内部调用了wait系列方法)
if __name__ == '__main__':
main()
运行结果:
[父进程]PID:25729,PPID:23434
[子进程-萌萌哒]PID:25730,PPID:25729
创建子进程时,传入一个执行函数和参数,用start()方法来启动进程即可
join()
方法是父进程回收子进程的封装(主要是回收僵尸子进程(点我))
其他参数可以参考源码 or 文档,贴一下源码的 init
方法:
def__init__(self,group=None,target=None,name=None,args=(),kwargs={},*,daemon=None)
扩展: name:为当前进程实例的别名
p.is_alive()
判断进程实例p是否还在执行p.terminate()
终止进程(发 SIGTERM
信号)上面的案例如果用OOP来实现就是这样:(如果不指定方法,默认调Run方法)
import os
from multiprocessing import Process
class My_Process(Process):
# 重写了Proce类的Init方法
def __init__(self, name):
self.__name = name
Process.__init__(self) # 调用父类方法
# 重写了Process类的run()方法
def run(self):
print("[子进程-%s]PID:%d,PPID:%d" % (self.__name, os.getpid(),
os.getppid()))
def main():
print("[父进程]PID:%d,PPID:%d" % (os.getpid(), os.getppid()))
p = My_Process("萌萌哒") # 如果不指定方法,默认调Run方法
p.start()
p.join() # 父进程回收子进程资源(内部调用了wait系列方法)
if __name__ == '__main__':
main()
现在说说里面的一些门道(只像用的可以忽略)
新版本的封装可能多层,这时候可以看看Python3.3.X系列(这个算是Python3早期版本了,很多代码都暴露出来,比较明了直观)
multiprocessing.process.py
# 3.4.x开始,Process有了一个BaseProcess
# https://github.com/python/cpython/blob/3.7/Lib/multiprocessing/process.py
# https://github.com/lotapp/cpython3/tree/master/Lib/multiprocessing/process.py
def join(self, timeout=None):
'''一直等到子进程over'''
self._check_closed()
# 断言(False就触发异常,提示就是后面的内容
# 开发中用的比较多,部署的时候可以python3 -O xxx 去除所以断言
assert self._parent_pid == os.getpid(), "只能 join 一个子进程"
assert self._popen is not None, "只能加入一个已启动的进程"
res = self._popen.wait(timeout) # 本质就是用了我们之前讲的wait系列
if res is not None:
_children.discard(self) # 销毁子进程
multiprocessing.popen_fork.py
# 3.4.x开始,在popen_fork文件中(以前是multiprocessing.forking.py)
# https://github.com/python/cpython/blob/3.7/Lib/multiprocessing/popen_fork.py
# https://github.com/lotapp/cpython3/tree/master/Lib/multiprocessing/popen_fork.py
def wait(self, timeout=None):
if self.returncode is None:
# 设置超时的一系列处理
if timeout is not None:
from multiprocessing.connection import wait
if not wait([self.sentinel], timeout):
return None
# 核心操作
return self.poll(os.WNOHANG if timeout == 0.0 else 0)
return self.returncode
# 回顾一下上次说的:os.WNOHANG - 如果没有子进程退出,则不阻塞waitpid()调用
def poll(self, flag=os.WNOHANG):
if self.returncode is None:
try:
# 他的内部调用了waitpid
pid, sts = os.waitpid(self.pid, flag)
except OSError as e:
# 子进程尚未创建
# e.errno == errno.ECHILD == 10
return None
if pid == self.pid:
if os.WIFSIGNALED(sts):
self.returncode = -os.WTERMSIG(sts)
else:
assert os.WIFEXITED(sts), "Status is {:n}".format(sts)
self.returncode = os.WEXITSTATUS(sts)
return self.returncode
关于断言的简单说明:(别泛滥)
如果条件为真,它什么都不做,反之它触发一个带可选错误信息的AssertionError
def test(a, b):
assert b != 0, "哥哥,分母不能为0啊"
return a / b
def main():
test(1, 0)
if __name__ == '__main__':
main()
结果:
Traceback (most recent call last):
File "0.assert.py", line 11, in <module>
main()
File "0.assert.py", line 7, in main
test(1, 0)
File "0.assert.py", line 2, in test
assert b != 0, "哥哥,分母不能为0啊"
AssertionError: 哥哥,分母不能为0啊
运行的时候可以指定 -O参数
来忽略所以 assert
,eg:
python3-O0.assert.py
Traceback (most recent call last):
File "0.assert.py", line 11, in <module>
main()
File "0.assert.py", line 7, in main
test(1, 0)
File "0.assert.py", line 3, in test
return a / b
ZeroDivisionError: division by zero
扩展:
https://docs.python.org/3/library/unittest.html
https://www.cnblogs.com/shangren/p/8038935.html
多个进程就不需要自己手动去管理了,有Pool来帮你完成,先看个案例:
import os
import time
from multiprocessing import Pool # 首字母大写
def test(name):
print("[子进程-%s]PID=%d,PPID=%d" % (name, os.getpid(), os.getppid()))
time.sleep(1)
def main():
print("[父进程]PID=%d,PPID=%d" % (os.getpid(), os.getppid()))
p = Pool(5) # 设置最多5个进程(不设置就默认为CPU核数)
for i in range(10):
# 异步执行
p.apply_async(test, args=(i, )) # 同步用apply(如非必要不建议用)
p.close() # 关闭池,不再加入新任务
p.join() # 等待所有子进程执行完毕回收资源(join可以指定超时时间,eg:`p.join(1)`)
print("over")
if __name__ == '__main__':
main()
图示:(join可以指定超时时间,eg: p.join(1)
)
调用 join()
之前必须先调用 close()
,调用 close()
之后就不能继续添加新的 Process
了
验证一下Pool的默认大小是CPU的核数,看源码:
multiprocessing.pool.py
# https://github.com/python/cpython/blob/3.7/Lib/multiprocessing/pool.py
# https://github.com/lotapp/cpython3/tree/master/Lib/multiprocessing/pool.py
class Pool(object):
def __init__(self, processes=指定的进程数,...):
if processes is None:
processes = os.cpu_count() or 1 # os.cpu_count() ~ CPU的核数
源码里面 apply_async
方法,是有回调函数(callback)的
def apply_async(self,func,args=(),kwds={},callback=None,error_callback=None):
if self._state != RUN:
raise ValueError("Pool not running")
result = ApplyResult(self._cache, callback, error_callback)
self._taskqueue.put(([(result._job, 0, func, args, kwds)], None))
return result
来看个例子:(和JQ很像)
import os
import time
from multiprocessing import Pool # 首字母大写
def test(name):
print("[子进程%s]PID=%d,PPID=%d" % (name, os.getpid(), os.getppid()))
time.sleep(1)
return name
def error_test(name):
print("[子进程%s]PID=%d,PPID=%d" % (name, os.getpid(), os.getppid()))
raise Exception("[子进程%s]啊,我挂了~" % name)
def callback(result):
"""成功之后的回调函数"""
print("[子进程%s]执行完毕" % result) # 没有返回值就为None
def error_callback(msg):
"""错误之后的回调函数"""
print(msg)
def main():
print("[父进程]PID=%d,PPID=%d" % (os.getpid(), os.getppid()))
p = Pool() # CPU默认核数
for i in range(5):
# 搞2个出错的看看
if i > 2:
p.apply_async(
error_test,
args=(i, ),
callback=callback,
error_callback=error_callback) # 异步执行
else:
# 异步执行,成功后执行callback函数(有点像jq)
p.apply_async(test, args=(i, ), callback=callback)
p.close() # 关闭池,不再加入新任务
p.join() # 等待所有子进程执行完毕回收资源
print("over")
if __name__ == '__main__':
main()
输出:
[父进程]PID=12348,PPID=10999
[子进程0]PID=12349,PPID=12348
[子进程2]PID=12351,PPID=12348
[子进程1]PID=12350,PPID=12348
[子进程3]PID=12352,PPID=12348
[子进程4]PID=12352,PPID=12348
[子进程3]啊,我挂了~
[子进程4]啊,我挂了~
[子进程0]执行完毕
[子进程2]执行完毕
[子进程1]执行完毕
over
常用的就普及完了,这些比较简单,过几天有下篇继续深入