在传统的软件开发中,自动化单元测试是确定代码是否完成预期任务的主要方法。它帮助开发人员信任他们的代码,并可以自信地进行更改。破坏性的更改会被单元测试检测出来。从GitHub上许多科研库的情况来看,深度学习的从业者们还不喜欢这种方法。我希望,这篇文章至少能够说服某些人在他们的深度学习项目中使用单元测试。
本文最初发布于Tilman Krokotsch的个人博客,由InfoQ中文站翻译并分享。
深度学习是一门很难评估代码正确性的学科。随机初始化、大型数据集和权重有限的可解释性,所有这些都意味着,要找到模型无法训练的确切原因,大多数时候都需要反复试验。在传统的软件开发中,自动化单元测试是确定代码是否完成预期任务的主要方法。它帮助开发人员信任他们的代码,并可以自信地进行更改。破坏性的更改会被单元测试检测出来。
从GitHub上许多科研库的情况来看,深度学习的从业者们还不喜欢这种方法。从业者不知道他们的代码是否正常工作,他们觉得这样很好吗?通常,由于上述三个原因,定义学习系统中每个组件的预期行为并不容易。然而,我认为,从业者和研究人员应该重新审视他们对单元测试的抵触,因为它可以让研究过程更加顺利。你需要学习下如何信任你的代码。
显然,我不是第一个,也不是最后一个谈论深度学习单元测试的人。如果你对这个话题感兴趣,可以看看下面这些资料:
这篇文章的灵感就是来自于上述内容,可能还有很多我一时想不起来的。我想再补充一些内容,我们将重点关注如何编写可重用的单元测试,以便你“不再重复自己”。
本文的示例将测试一个用 PyTorch 编写的系统的组件,该系统在 MNIST 上训练变分自编码器(VAE)。
这里: https://www.github.com/tilman151/unittest_dl ,提供了本文用到的所有代码。
如果你熟悉单元测试,可以跳过此部分。对于其他人,我们一起看下 Python 中的单元测试是什么样子。简单起见,我们将使用内置的包unittest
,而不是其他复杂异常的包。
一般来说,单元测试的目的是检查代码是否能正确运行。通常,在文件末尾,你会看到这样的东西:
if __name__ == 'main':
net = Network()
x = torch.randn(4, 1, 32, 32)
y = net(x)
print(y.shape)
如果直接执行该文件,该代码片段将构建一个网络,执行前向传递并打印输出形状。这样,我们就可以看到前向传递是否会抛出错误,以及输出的形状是否合理。如果你将代码分发到了不同的文件中,则必须手动运行每个文件,并检查打印到控制台的内容。更糟糕的是,这个代码片段有时会在运行后被删除,在有变化时被重写。 原则上,这已经是一个基本的单元测试。我们所要做的就是将它形式化一点,使它很容易自动运行,如下所示:
import unittest
class MyFirstTest(unittest.TestCase):
def test_shape(self):
net = Network()
x = torch.randn(4, 1, 32, 32)
y = net(x)
self.assertEqual(torch.Size((10,)), y.shape)
unittest
包的主要组件是TestCase
类。单个单元测试是TestCase
子类的成员函数。在我们的例子中,包将自动检测类MyFirstTest
并运行函数test_shape
。如果满足assertEqual
调用的条件,则测试成功。否则,或者如果它崩溃了,则测试失败。
如果你需要进一步了解这个包是如何工作的,请点击这里查看官方文档。在实践中,我总是建议使用带有测试运行集成的 IDE,比如 PyCharm。你只需按下按钮就可以运行所需的测试。
现在,我们已经了解了单元测试的工作原理,下一个问题是我们应该测试什么。下面是我们的示例的代码结构:
|- src
|- dataset.py
|- model.py
|- trainer.py
|- run.py
前三个文件中的内容正如其文件名,而最后一个文件会创建所有训练组件并启动。我们将测试每个文件中的函数,run.py
除外,因为它是程序的入口点。
我们在示例中使用的数据集是torchvision
MNIST 类。因此,我们可以假设像加载图像和训练 / 测试分割这样的基本功能可以正常工作。然而,MNIST 类提供了充足的配置项,因此,我们应该测试是否正确配置了所有内容。dataset.py
文件包含一个名为MyMNIST
的类,它有两个成员变量。成员train_data
包含torchvision
MNIST 类的一个实例,它根据配置加载数据的训练分割,而test_data
中的实例则加载测试分割。它们都会在每幅图像的每条边上填充 2 个像素,并将像素值规格化为 [- 1,1] 之间的值。此外,train_data
会对每幅图像应用随机旋转变换来扩充数据。
为了继续使用上面的代码片段,我们将首先测试数据集输出是否是我们想要的形状。图像填充意味着,它们现在的大小应该是 32x32 像素。我们的测试看起来是这样的:
def test_shape(self):
dataset = MyMNIST()
sample, _ = dataset.train_data[0]
self.assertEqual(torch.Shape((1, 32, 32)), sample.shape)
现在,我们可以确定我们的填充操作符合预期。这可能看起来很琐碎,有一些人可能会认为我测试这个很迂腐,但是,我都数不清楚我有多少次遇到形状错误了,因为我没弄清楚填充函数是如何工作的。像这样的简单测试编写起来很快,并且以后可以为你省去许多麻烦。
下一项配置是数据缩放。在我们的例子中,这非常简单。我们希望确保每幅图像的像素值在 [- 1,1] 之间。与之前的测试相反,我们将对数据集中的所有图像运行测试。通过这种方式,我们就可以确定,对于整个数据集而言,我们的数据缩放是合理的。
def test_scaling(self):
dataset = MyMNIST()
for sample, _ in dataset.train_data:
self.assertGreaterEqual(1, sample.max())
self.assertLessEqual(-1, sample.min())
self.assertTrue(torch.any(sample < 0))
self.assertTrue(torch.any(sample > 0))
如你所见,我们不仅要测试每幅图像的最大值和最小值是否在范围内。我们还要测试,大于零和小于零的断言没有意外地将值缩放到 [0,1] 之间。这个测试之所以有效,是因为我们可以假设 MNIST 中的每幅图像都覆盖了整个取值范围。对于更复杂的数据,比如自然图像,我们需要一个更复杂的测试条件。如果缩放是基于数据的统计信息,那么测试一下是否只使用训练分割来计算这些统计信息会是一个不错的主意。
扩充训练数据有助于极大地提高模型的性能,特别是在数据量有限的情况下。另一方面,我们不会扩充我们的测试数据,因为我们想要保持模型评估的确定性。这意味着,我们应该测试我们的训练数据是否扩充了,而我们的测试数据没有。关于这一点,敏锐的读者会注意到一些重要的东西。到目前为止,我们的测试只涵盖了训练数据。有一点需要强调一下:
总是在训练和测试数据上运行测试。
代码在一个数据分割上没问题,并不能保证在另一个分割上不存在未检测到的 Bug。对于数据扩充,我们甚至希望针对每个分割断言代码的不同行为。
现在,对于我们的数据扩充问题,一个简单的测试是加载一个示例两次,然后检查两个版本是否相同。一个简单的解决方案是为每一个分割编写一个测试函数:
def test_augmentation_active_train_data(self):
dataset = MyMNIST()
are_same = []
for i in range(len(dataset.train_data)):
sample_1, _ = dataset.train_data[i]
sample_2, _ = dataset.train_data[i]
are_same.append(0 == torch.sum(sample_1 - sample_2))
self.assertTrue(not all(are_same))
def test_augmentation_inactive_test_data(self):
dataset = MyMNIST()
are_same = []
for i in range(len(dataset.test_data)):
sample_1, _ = dataset.test_data[i]
sample_2, _ = dataset.test_data[i]
are_same.append(0 == torch.sum(sample_1 - sample_2))
self.assertTrue(all(are_same))
这些函数测试我们想要测试的内容,但是,如你所见,它们几乎是互相重复。这主要有两个缺点。首先,如果在测试中需要更改某些内容,就必须得记住两个函数都要更改。其次,如果我们想另外添加一个分割,例如一个验证分割,就不得不第三次复制测试。要解决这个问题,我们应该将测试功能提取到一个单独的函数中,然后由真正的测试函数调用两次。重构后的测试类似下面这样:
def test_augmentation(self):
dataset = MyMNIST()
self._check_augmentation(dataset.train_data, active=True)
self._check_augmentation(dataset.test_data, active=False)
def _check_augmentation(self, data, active):
are_same = []
for i in range(len(data)):
sample_1, _ = data[i]
sample_2, _ = data[i]
are_same.append(0 == torch.sum(sample_1 - sample_2))
if active:
self.assertTrue(not all(are_same))
else:
self.assertTrue(all(are_same))
对于给定的数据集,_check_augmentation
函数会判断是否激活了扩充功能,有效地消除了代码中的重复内容。unittest
包不会自动运行函数,因为它不是以test_
开头的。鉴于我们的测试函数现在真的很短,我们把它们合并成一个组合函数。它们只测试一个概念,即扩充是如何工作的,因此应该属于相同的测试函数。但是,这种组合引入了另一个问题。现在,如果测试失败了,就很难直接看到哪一个分割失败了。这个包只会告诉我们组合函数的名称。现在看下subTest
函数。subTest
是TestCase
类的一个成员函数,它使得在测试函数中标记不同的测试组件成为可能。这样,包就可以准确地告诉我们测试的哪一部分失败了。最终的函数是这样的:
def test_augmentation(self):
dataset = MyMNIST()
with self.subTest(split='train'):
self._check_augmentation(dataset.train_data, active=True)
with self.subTest(split='test'):
self._check_augmentation(dataset.test_data, active=False)
现在,我们有了一个无重复、精确定位、可重用的测试功能。我们在此所使用的核心原则可以应用到我们在前面几节中编写的所有其他单元测试中。你可以在相应的存储库中查看最终的测试。
数据集的最后一类单元测试与我们的示例关系不大,因为我们使用的是内置数据集。无论如何我们都会把它包含进来,因为它是我们学习系统的一个重要组成部分。通常,你会在一个数据加载类中使用数据集,该类会控制批次并可以并行化加载过程。因此,测试下数据加载类在单进程和多进程模式下是否都可以处理该数据集是一个好主意。参考我们在扩充测试中所学到的内容,测试函数如下所示:
def test_single_process_dataloader(self):
dataset = MyMNIST()
with self.subTest(split='train'):
self._check_dataloader(dataset.train_data, num_workers=0)
with self.subTest(split='test'):
self._check_dataloader(dataset.test_data, num_workers=0)
def test_multi_process_dataloader(self):
dataset = MyMNIST()
with self.subTest(split='train'):
self._check_dataloader(dataset.train_data, num_workers=2)
with self.subTest(split='test'):
self._check_dataloader(dataset.test_data, num_workers=2)
def _check_dataloader(self, data, num_workers):
loader = DataLoader(data, batch_size=4, num_workers=num_workers)
for _ in loader:
pass
_check_dataloader
函数不会对加载的数据做任何测试。我们只是想检查加载过程有没有抛出错误。理论上,你也可以检查诸如正确的批次大小或填充不同长度的序列数据。因为我们使用了数据加载器的最基本配置,所以可以省略这些检查。
同样,这个测试可能看起来琐碎而没有必要,但让我们通过一个例子看看这个简单的检查如何拯救了我。这个项目需要从 pandas 数据帧加载序列数据,并从这些数据帧上的一个滑动窗口构建样本。我们的数据集太大了,无法装入内存,所以我们必须按需加载数据帧,并从中剪切出需要的序列。为了提高加载速度,我们决定用一个 LRU 缓存缓存一些数据帧。在我们早期的单流程实验中,它可以像预期的那样工作,因此,我们决定将它包含在代码库中。结果是,这个缓存并不适合多处理,但幸好,我们的单元测试提前发现了这个问题。在使用多处理时,我们停用了缓存,避免以后出现令人不快的意外。
有些人可能已经在我们的单元测试中看到了另一个重复模式。每个测试都会在训练数据上运行一次,在测试数据上运行一次,这会产生四行相同的代码:
with self.subTest(split='train'):
self._check_something(dataset.train_data)
with self.subTest(split='test'):
self._check_dataloader(dataset.test_data)
我们完全有理由消除这种重复。遗憾的是,这涉及创建一个高阶函数,该函数将_check_something
作为参数。有时,例如对于扩充测试,我们还需要向_check_something
函数传递额外的参数。最后,所需的程序结构的复杂性会大幅增加,并使得所要测试的概念更模糊。一般的规则是,测试代码的复杂性要满足可读性和可重用性的需要。
模型可以说是学习系统的核心组件,而且通常需要是完全可配置的。这意味着,还有很多东西需要测试。幸运的是,PyTorch 中用于神经网络模型的 API 非常简洁,大多数实践者都会严格遵守使用规范。这使得为模型编写可重用的单元测试相当容易。
我们的模型是一个简单的 VAE,由一个全连接的编码器和解码器组成(如果你不熟悉 VAE,请看这里)。前向函数接受输入图像,对其进行编码,执行再参量化操作,然后将隐编码解码为图像。虽然相对简单,但这种前向传递可以从几个方面说明为什么单元测试值得做。
在本文开头,我们看到的第一段代码是几乎每个人都要做的测试。我们也已经知道,这个测试是如何写成单元测试的。我们要做的唯一一件事就是添加要测试的正确形状。对于自动编码器,该形状和输入相同:
@torch.nograd()
def test_shape(self):
net = model.MLPVAE(input_shape=(1, 32, 32), bottleneck_dim=16)
inputs = torch.randn(4, 1, 32, 32)
outputs = net(x)
self.assertEqual(inputs.shape, outputs.shape)
这同样很简单,但有助于找到一些最恼人的 Bug。例如,在将模型输出从平面表示重新塑形时忘记添加通道维度。
对于我们的测试,最后要增加的是torch.nograd
装饰器。它告诉 PyTorch 这个函数不需要记录梯度,为我们带来了小幅的加速。对于单个测试来说可能不是很多,但是,你永远不知道需要编写多少测试。同样,对于单元测试,还有一句名言:
测试运行要快。否则,没有人会想要运行它们。
在开发期间,单元测试会运行得非常频繁。如果测试运行时间很长,那么你可能会跳过它们。
大多数时候,在 CPU 上训练深度神经网络都非常慢。这就是为什么我们使用 GPU 来加速。为此,我们所有的模型参数都必须驻留在 GPU 上。因此,我们应该断言我们的模型可以在设备(CPU 和多个 GPU)之间正确地移动。
我们可以用一个常见的错误来说明我们的示例 VAE 中存在的问题。从中可以看到,bottleneck
函数执行再参量化操作:
def bottleneck(self, mu, log_sigma):
noise = torch.randn(mu.shape)
latent_code = log_sigma.exp() * noise + mu
return latent_code
它接收潜在先验参数,从标准高斯分布中采样生成一个noise
张量,并使用参数对其进行变换。这在 CPU 上运行没有问题,但当模型移到 GPU 时会失败。Bug 原因是noise
张量默认是在 CPU 内存中创建的,不会移动到模型所在的设备上。一个简单的 Bug,一个简单的解决方案。我们只要用noise = torch.randn_like(mu)
替换这行有问题的代码。这就创建了一个与张量mu
形状相同的张量noise
,并且在相同的设备上。
可以帮助我们尽早捕获此类 Bug 的测试简单明了:
@torch.no_grad()
@unittest.skipUnless(torch.cuda.is_available(), 'No GPU was detected')
def test_device_moving(self):
net = model.MLPVAE(input_shape=(1, 32, 32), bottleneck_dim=16)
net_on_gpu = net.to('cuda:0')
net_back_on_cpu = net_on_gpu.cpu()
inputs = torch.randn(4, 1, 32, 32)
torch.manual_seed(42)
outputs_cpu = net(inputs)
torch.manual_seed(42)
outputs_gpu = net_on_gpu(inputs.to('cuda:0'))
torch.manual_seed(42)
outputs_back_on_cpu = net_back_on_cpu(inputs)
self.assertAlmostEqual(0., torch.sum(outputs_cpu - outputs_gpu.cpu()))
self.assertAlmostEqual(0., torch.sum(outputs_cpu - outputs_back_on_cpu))
我们把网络从 CPU 移到 GPU,然后再移回来,只是为了确保模型可移动。现在,我们的网络有了三份拷贝(网络移动时对它们进行了复制),并使用相同的输入张量向前传递。如果网络移动没问题,那么前向传递应该正常运行而不抛出错误,并且每次产生相同的输出。
要运行这个测试,我们显然需要一个 GPU,但也许我们想在笔记本电脑上做一些快速测试。如果 PyTorch 没有检测到 GPU,那么unittest.skipUnless
装饰器让我们可以跳过测试。这样可以避免失败的测试弄乱测试结果。
你还可以看到,我们在每一轮中都固定了 PyTorch 的随机种子。我们必须这样做,因为 VAE 是非确定性的,要不然我们会得到不同的结果。这说明了深度学习代码单元测试的另一个重要概念:
控制测试中的随机性。
如果不能确保模型达到罕见的边界条件,那么该如何测试模型的边界情况呢?如何确保模型的输出是确定的?如何知道一个测试的失败是由于随机的偶然还是由于你引入的 Bug ?通过手动设置深度学习框架的种子,可以消除等式中的随机性。此外,还应该将 CuDNN 设置为确定性模式。这主要影响卷积,但无论如何是一个好主意。要进一步了解如何保证 PyTorch 的确定性,请查阅文档。
注意,固定你正在使用的所有框架的种子。Numpy 和内置的Python 随机数生成器有它们自己的种子,必须分别设置。下面这样一个效用函数是很有用的:
def make_deterministic(seed=42):
# PyTorch
torch.manual_seed(seed)
if torch.cuda.is_available():
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False
# Numpy
np.random.seed(seed)
# Built-in Python
random.seed(seed)
在 99.99% 的情况下,你都想用某种形式的随机梯度下降来训练模型。你给模型一个(迷你)批次的样本,并计算它们的平均损失。批量处理训练样本的前提是你的模型可以逐个处理样本,就像你可以分别喂给它一样。换句话说,模型在处理批次中的样本时不会相互影响。这个假设很脆弱,如果在一个错误的张量维度上进行错位重塑或聚合,就会打破这个假设。
下面的测试通过执行与输入相关的前向和后向传递来检查样本独立性。在对批次损失取均值之前,我们把损失乘以零。如果我们的模型可以保证样本独立性,就会产生一个零梯度。我们唯一需要断言的是,掩蔽样本梯度是否为零:
def test_batch_independence(self):
inputs = torch.randn(4, 1, 32, 32)
inputs.requires_grad = True
net = model.MLPVAE(input_shape=(1, 32, 32), bottleneck_dim=16)
# Compute forward pass in eval mode to deactivate batch norm
net.eval()
outputs = net(inputs)
net.train()
# Mask loss for certain samples in batch
batch_size = inputs[0].shape[0]
mask_idx = torch.randint(0, batch_size, ())
mask = torch.ones_like(outputs)
mask[mask_idx] = 0
outputs = outputs * mask
# Compute backward pass
loss = outputs.mean()
loss.backward()
# Check if gradient exists and is zero for masked samples
for i, grad in enumerate(inputs.grad):
if i == mask_idx:
self.assertTrue(torch.all(grad == 0).item())
else:
self.assertTrue(not torch.all(grad == 0))
如果你仔细阅读了上述代码片段,就会注意到我们将模型设置为评估模式。这是因为批标准化(batch normalization)违反了我们上面的假设。运行均值和标准差对批次样本造成了交叉污染,所以我们通过评估模式停用样本更新。我们可以这样做,是因为我们的模型在训练和评估模式中的行为一致。如果你的模型不一致,那么为了测试,你将不得不另找一种方法来停用它。一个选项是临时用实例标准化来替代它。 上面的测试函数非常通用,可以按原样复制。如果你的模型接受多个输入则不在此列。有必要添加额外的处理代码。
下一个测试也与梯度有关。当网络架构变得越来越复杂时,比如 Inception 结构,很容易构建死子图。死子图是网络的一部分,其中包含可学习参数,要么前向传递不用,要么后向传递不用或两者都不用。这就像在构造函数中构建一个网络层,然后忘了在forward
函数中应用它。
要找出这些死子图,可以运行优化步骤并检查网络参数的梯度:
def test_all_parameters_updated(self):
net = model.MLPVAE(input_shape=(1, 32, 32), bottleneck_dim=16)
optim = torch.optim.SGD(net.parameters(), lr=0.1)
outputs = net(torch.randn(4, 1, 32, 32))
loss = outputs.mean()
loss.backward()
optim.step()
for param_name, param in self.net.named_parameters():
if param.requires_grad:
with self.subTest(name=param_name):
self.assertIsNotNone(param.grad)
self.assertNotEqual(0., torch.sum(param.grad ** 2))
parameters
函数返回的模型的所有参数在优化步骤后都应该有一个梯度张量。此外,我们所使用的损失不应该是零。测试假设模型中的所有参数都需要梯度。即使是那些不应该被更新的参数也会首先检查requires_grad
标志。如果任何参数在测试中失败,则子测试的名称可以提示你在哪里查找问题。
现在,我们已经编写了模型的所有测试,我们可以将它们作为一个整体分析一下。我们注意到,这些测试有两个共同点。所有测试都从创建模型和定义样本输入批次开始。与往常一样,这种冗余级别有可能导致拼写错误和不一致。此外,你不希望在更改模型的构造函数时分别更新每个测试。
幸运的是,unittest
针对这个问题提供了一个简单的解决方案,即setUp
函数。这个函数在执行TestCase
中的每个测试函数之前被调用,通常为空。通过在setUp
中将模型和输入定义为TestCase
的成员变量,我们可以在一个地方初始化测试组件。
class TestVAE(unittest.TestCase):
def setUp(self):
self.net = model.MLPVAE(input_shape=(1, 32, 32), bottleneck_dim=16)
self.test_input = torch.random(4, 1, 32, 32)
... # Test functions
现在,我们用相应的成员变量替换net
和input
的每次出现,这样就完成了。如果你想更进一步,在所有测试中使用相同的模型实例,那么你可以使用setUpClass
。这个函数在构造TestCase
时调用一次。如果构造速度很慢,并且你不想多次进行构建,那么这会非常有用。
在这一点上,我们有一个简洁的系统来测试我们的 VAE 模型。我们可以轻松地添加测试,并且可以确保每次都测试模型的相同版本。但是如果你想引入一种新的 VAE,一个带卷积层的,会发生什么呢?它将在相同的数据上运行,行为也应该相同,因此同样的测试也适用。
直接复制整个TestCase
显然不是首选的解决方案,但是使用setUp
,我们已经在正确的轨道上了。我们将所有测试函数转移到一个基类中,把setUp
作为一个抽象函数。
class AbstractTestVAE(unittest.TestCase):
def setUp(self):
raise NotImplementedError
... # Test functions
IDE 会提示,该类没有成员变量net
和test_input
,但 Python 并不关心这些。只要子类添加了它们,就没有问题。对于我们想要测试的每个模型,我们会创建这个抽象类的一个子类,在其中实现setUp
。为多个模型或同一个模型的多个配置创建TestCases
就像下面的代码这么简单:
class TestCNNVAE(AbstractTestVAE):
def setUp(self):
self.test_inputs = torch.randn(4, 1, 32, 32)
self.net = model.CNNVAE(input_shape=(1, 32, 32), bottleneck_dim=16)
class TestMLPVAE(AbstractTestVAE):
def setUp(self):
self.test_inputs = torch.randn(4, 1, 32, 32)
self.net = model.MLPVAE(input_shape=(1, 32, 32), bottleneck_dim=16)
极好。现在只剩下一个问题了。unittest
包发现并运行unittest.TestCase
的所有子类。因为其中包括不能实例化的抽象基类,所以我们的套件中总会有一个失败的测试。
解决方案是由一个流行的设计模式提出的。删除作为AbstractTestVAE
父类的TestCase
,它就不会再被发现了。取而代之,我们使具体的测试类有两个父类:TestCase
和AbstractTestVAE
。抽象类和具体类之间的关系不再是父类和子类之间的关系,而是具体类使用抽象类提供的共享功能。这种模式称为MixIn。
class AbstractTestVAE:
...
class TestCNNVAE(unittest.TestCase, AbstractTestVAE):
...
class TestMLPVAE(unittest.TestCase, AbstractTestVAE):
...
父类的顺序很重要,因为方法查找是从左到右进行的。这意味着TestCase
将覆盖AbstractTestVAE
的共享方法。在我们的例子中,这不是问题,但你最好知道下。
学习系统的最后一个组成部分是训练器类。它将所有的组件(数据集、优化器和模型)整合在一起,使用它们来训练模型。此外,它还实现了一个评价例程,输出测试数据上的平均损失。在训练时,所有的损失和度量都会被写入一个 TensorBoard 事件文件,以便实现可视化。
在这一部分中,编写可重用测试是最困难的,因为其实现自由度最高。有些从业者只在脚本文件中使用简单的代码进行训练,有些人将其封装在函数中,还有一些人试图采用更面向对象的风格。我不会建议你首选哪种方式。我唯一要说的是,根据我的经验,恰当封装的训练器类使单元测试变得更轻松。
然而,我们会发现,我们之前学过的一些原则在这里也适用。
大多数情况下,你会很愿意从torch.nn
模块中选择一个预先实现的损失函数。但话说回来,你所选择的损失函数可能没有实现。这种情况可能是由于实现相对简单,或者因为函数太小或者太新。无论如何,如果你自己实现了它,就应该测试它。
我们的例子使用 Kulback-Leibler (KL) 散度作为整体损失函数的一部分,这在 PyTorch 中是不存在的。我们的实现是这样的:
def _kl_divergence(log_sigma, mu):
return 0.5 * torch.sum((2 * log_sigma).exp() + mu ** 2 - 1 - 2 * log_sigma)
该函数接收多变量高斯分布的标准差对数和平均值,并计算闭合形式的标准高斯分布 KL 散度。
在这种情况下,检查损失的一种方法是手工计算,然后通过硬编码进行比较。更好的方法是在另一个包中找一个参考实现,并根据其输出检查代码。幸运的是,scipy
包有一个离散 KL 散度的实现可供我们使用:
@torch.no_grad()
def test_kl_divergence(self):
mu = np.random.randn(10) * 0.25 # means around 0.
sigma = np.random.randn(10) * 0.1 + 1. # stds around 1.
standard_normal_samples = np.random.randn(100000, 10)
transformed_normal_sample = standard_normal_samples * sigma + mu
bins = 1000
bin_range = [-2, 2]
expected_kl_div = 0
for i in range(10):
standard_normal_dist, _ = np.histogram(standard_normal_samples[:, i], bins, bin_range)
transformed_normal_dist, _ = np.histogram(transformed_normal_sample[:, i], bins, bin_range)
expected_kl_div += scipy.stats.entropy(transformed_normal_dist, standard_normal_dist)
actual_kl_div = self.vae_trainer._kl_divergence(torch.tensor(sigma).log(), torch.tensor(mu))
self.assertAlmostEqual(expected_kl_div, actual_kl_div.numpy(), delta=0.05)
我们首先从标准高斯函数和一个均值和标准差不同的高斯函数中抽取一个足够大的样本。然后我们用np.histogram
函数来得到底层 PDF 的离散逼近。利用这些,我们就可以使用scipy.stats.entropy
函数获得一个 KL 散度来做比较。我们使用相对较大的delta
进行比较,因为scipy.stats.entropy
只是一个近似值。
你可能已经注意到,我们没有创建一个Trainer
对象,而是使用了TestCase
的一个成员。我们在这里使用了与模型测试相同的技巧,在setUp
函数中创建了它。在那里,我们还固定了 PyTorch 和 NumPy 的种子。因为我们不需要任何梯度,所以在这里,我们还用@torch.no_grad
来装饰函数。
我们使用 TensorBoard 来记录训练过程的损失和度量。为此,我们希望确保所有的日志都按预期写入。一种方法是在训练后打开事件文件,查找正确的事件。这是种有效的方式,但我们将采用另一种方式,借助unittest
包中一个有趣的函数:mock
。
mock
让你可以替换或封装一个函数或对象,并可以查看它是如何被调用的。我们将替换summary
写入器的add_scalar
函数,并确保以这种方式记录我们关心的所有损失和度量。
def test_logging(self):
with mock.patch.object(self.vae_trainer.summary, 'add_scalar') as add_scalar_mock:
self.vae_trainer.train(1)
expected_calls = [mock.call('train/recon_loss', mock.ANY, 0),
mock.call('train/kl_div_loss', mock.ANY, 0),
mock.call('train/loss', mock.ANY, 0),
mock.call('test/loss', mock.ANY, 0)]
add_scalar_mock.assert_has_calls(expected_calls)
assert_has_calls
函数会将预期调用列表和实际调用记录进行匹配。使用mock.ANY
表示我们不关心记录的标量值,因为我们无论如何都不可能知道。
由于我们不需要对整个数据集进行迭代,所以我们在setUp
中将训练数据配置为只有一个批次。这样可以显著地提高测试速度。
最后一个问题也是最难回答的。我的训练最终会收敛吗?要确切地回答这个问题,我们需要用所有的数据进行一次完整的训练并进行评估。
由于这非常耗时,我们将使用一种更快的方法。我们将看看我们的训练是否能使模型在单个批次的数据上过度拟合。测试函数相当简单:
def test_overfit_on_one_batch(self):
self.vae_trainer.train(500)
self.assertGreaterEqual(30, self.vae_trainer.eval())
如前一节所述,setUp
函数创建训练器时使用了只包含一个批次的数据集。此外,我们还使用训练数据作为测试数据。这样,我们就可以从eval
函数中得到训练批次的损失,并将其与我们的预期损失进行比较。
对于分类问题,当完全过度拟合时,我们希望损失为零。VAE
的问题是,它是一个非确定性生成模型,零损失是不现实的。这就是为什么我们的预期损失为 30 的原因,这相当于每像素的误差为 0.04。
这是迄今为止运行时间最长的测试,因为它迭代了 500 次。最终,在我的笔记本电脑上,它用了大约一分半钟,这还算合理。在没有 GPU 的机器上,为了进一步提升速度,我们可以在setUp
中添加下面这行代码:
device = 'cuda:0' if torch.cuda.is_available() else 'cpu'
这样一来,如果有 GPU,我们就可以利用它,如果没有,就利用 CPU 进行训练。
在我们进行日志记录时,你可能会注意到,针对训练器的单元测试往往会使你的文件夹里满是事件文件。为了避免这种情况,我们使用tempfile
包为训练器创建一个临时日志目录。测试结束后,我们只需要删除该目录及其中的内容。为此,我们使用了孪生函数setUp
和tearDown
。该函数在每个测试函数后调用,清理过程如下所示,非常简单:
def tearDown(self):
shutil.rmtree(self.log_dir)
在文章最后,让我们评估一下我们这么费事得到了什么。
针对这个小例子,我们编写了一个包含 58 个单元测试的测试套件,整个运行需要大约三分半钟(在我 2012 款的笔记本电脑上)。对于这 58 个测试,我们只编写了 20 个函数。所有测试都可以确定地、独立地运行。如果有 GPU,我们可以运行额外的测试。大多数测试,例如数据集和模型测试,都可以在其他项目中轻松重用。我们可以:
setUp
和tearDown
函数一致地初始化和清理测试;torch.no_grad
装饰器禁用梯度计算(可能的时候);mock
模块检查函数是否被正确调用。最后我希望,我至少能够说服某些人在他们的深度学习项目中使用单元测试,可以以本文配套的存储库作为起点。如果你认为我漏掉了你最喜欢的测试,或者你想到一种方法可以提高这些测试的可重用性,请随时与我联系。我会尽量对这篇文章做相应的更新。
查看英文原文:
https://krokotsch.eu/cleancode/2020/08/11/Unit-Tests-for-Deep-Learning.html
领取专属 10元无门槛券
私享最新 技术干货