前言
我们使用Python和它的数据处理库套件(如panda和scikiti -learn)进行大量数据处理时候,可能使用了大量的计算资源。如何监视程序的内存使用情况就显得尤为重要。
1.询问操作系统
跟踪内存使用情况的最简单方法是使用操作系统本身。您可以使用top来提供您在一段时间内使用的资源的概述。或者,如果您想要现场检查资源使用情况,您可以使用ps命令:
$ ps -m -o %cpu,%mem,command
%CPU %MEM COMMAND
23.4 7.2 python analyze_data.py
0.0 0.0 bash
m标志指示ps按照进程使用最多内存的顺序显示结果。o标志控制显示每个进程的哪些属性——在本例中是使用的CPU百分比、消耗的系统内存百分比和正在执行的进程的命令行。CPU百分比将一个完整的CPU核心计算为100%的使用率,因此如果您有一个4核的机器,可能会看到总计高达400%的CPU使用率。还有其他输出选项用于显示其他进程属性,以及用于控制显示哪些进程的ps的其他标志。
结合一些创造性的shell脚本,可以编写一个监视脚本,使用ps跟踪任务的内存使用情况。
2.tracemalloc
Python解释器的操作中有大量的hooks,可以在Python代码运行时用于监视和内省。pdb使用这些钩子来提供调试;覆盖率也使用它们来提供测试覆盖率。tracemalloc模块还使用它们来提供一个了解内存使用情况的窗口。
tracemalloc是在Python 3.4中添加的一个标准库模块,它跟踪Python解释器分配的每个单独的内存块。tracemalloc能够提供关于运行Python进程中内存分配的非常细粒度的信息:
import tracemalloc
tracemalloc.start()
my_complex_analysis_method()
current, peak = tracemalloc.get_traced_memory()
print(f"Current memory usage is {current / 10**6}MB; Peak was {peak / 10**6}MB")
tracemalloc.stop()
调用tracemplugin .start()启动跟踪进程。在进行跟踪时,您可以询问分配了哪些内容的详细信息;在本例中,我们只要求当前和峰值内存分配。调用tracemplugin .stop()将删除hook并清除已经收集的任何跟踪。
不过,这种程度的细节是要付出代价的。tracemalloc将自己深深地注入到正在运行的Python进程中——正如您所预期的那样,这会带来性能损失。在我们的测试中,我们观察到在运行分析时使用tracemalloc的速度下降了30%。在分析单个进程时,这可能是可以的,但在生产中,您确实不希望仅仅为了监视内存使用情况而降低30%的性能。
3.抽样
幸运的是,Python标准库提供了另一种观察内存使用情况的方法—resource模块。resource模块为程序分配的资源提供基本控制,包括内存使用:
import resource
usage = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
getrusage()的调用返回程序所使用的资源。常量RUSAGE_SELF表示我们只对这个进程使用的资源感兴趣,而不是它的子进程。返回的对象是一个结构,它包含一系列操作系统资源,包括CPU时间、信号、上下文切换等;但就我们的目的而言,我们感兴趣的是maxrss——最大驻留集大小——它是进程当前在RAM中持有的内存量。
但是,与tracemalloc模块不同的是,资源模块不随时间跟踪使用情况—它只提供点采样。因此,我们需要实现一种方法来随时间对内存使用情况进行采样。
首先,我们定义一个类来执行内存监控:
import resource
from time import sleep
class MemoryMonitor:
def __init__(self):
self.keep_measuring = True
def measure_usage(self):
max_usage = 0
while self.keep_measuring:
max_usage = max(
max_usage,
resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
)
sleep(0.1)
return max_usage
在这个类的实例上调用measure_usage()时,它将进入一个循环,每0.1秒测量一次内存使用情况。将跟踪内存使用量的任何增加,并在循环退出时返回最大内存分配。
但是什么告诉循环退出呢?我们在哪里调用被监视的代码?我们在单独的线程中完成。
from concurrent.futures import ThreadPoolExecutor
with ThreadPoolExecutor() as executor:
monitor = MemoryMonitor()
mem_thread = executor.submit(monitor.measure_usage)
try:
fn_thread = executor.submit(my_analysis_function)
result = fn_thread.result()
finally:
monitor.keep_measuring = False
max_usage = mem_thread.result()
print(f"Peak memory usage: {max_usage}")
ThreadPoolExecutor为提交要在线程中执行的任务提供了一种方便的方法。我们向执行程序提交两个任务——监视器和my_analysis_function(如果分析函数需要额外的参数,可以通过提交调用传入它们)。对fn_thread.result()的调用将被阻塞,直到分析函数完成并获得其结果,此时我们可以通知监视器停止并获得最大内存。try/finally模块确保了如果分析函数抛出异常,内存线程仍然会被终止。
使用这种方法,我们可以有效地随时间对内存使用情况进行抽样。大部分工作将在主分析线程中完成;但是每0.1秒,监视器线程就会被唤醒,进行一次内存测量,如果内存使用量增加就将其存储,然后返回睡眠状态。