
点击上方“Deephub Imba”,关注公众号,好文章不错过 !
深度学习模型参数量和训练数据集的爆炸式增长,以 Llama 3.1 为例:4050 亿参数、15.6 万亿 token 的训练量,如果仅靠单 GPU可能需要数百年才能跑完,或者根本无法加载模型。
并行计算(Parallelism)通过将训练任务分发到多个 GPU(单机多卡或多机多卡),并利用通信原语同步状态,能让训练过程变得可控且高效。

本文讲详细探讨Pytorch的数据并行(Data Parallelism)
扩展训练规模无非两种方式:纵向扩展(Vertical Scaling) 和 横向扩展(Horizontal Scaling)。
纵向扩展:

简单粗暴地升级硬件。比如把 10GB 显存的显卡换成 30GB 的。这种方式不需要改动代码,原本跑不起来的模型现在能跑了或者可以调大 batch size加快训练速度。
横向扩展:

增加机器数量。比如增加 7 台同配置(10GB)的机器,通过网络互联每台机器可以挂载单卡或多卡。这种方式需要代码层面的适配,利用 PyTorch 的分布式模块进行通信。
数据并行 (Data Parallelism):

当模型本身能塞进单张 GPU,但数据量太大时,我们可以将模型复制到所有 GPU 上,将数据切分(Split),每个 GPU 并行处理一部分数据,在反向传播时同步梯度。
模型并行 (Model Parallelism):

当模型大到单张 GPU 放不下时使用。将模型切分成不同部分,分配给不同 GPU。每个 GPU 负责计算前向/后向传播中的一部分层。
实际超大模型训练中,通常是两者的混合。
在讲 DDP 之前,先回顾一个基础技巧:梯度累积(Gradient Accumulation)。PyTorch 的设计中,loss.backward() 计算出的梯度默认是累加的,而非覆盖。
import torch
# Let us define a tensor with requires_grad = True
x = torch.tensor(4.0, requires_grad=True)
# Creating a function y=x^2
y = x*x
# Calculating dy/dx
y.backward(retain_graph=True)
# retain_graph = True because pytorch automatically removes the computation
# graph and intermediate tensors once backward is called to save memory
# If we want to call backward again, we need to tell pytorch not to delete
# the computation graph and intermediate tensors
print(f"Gradient of y w.r.t x after the first backward: {x.grad}")
# Output: 8.0 as dy/dx = 2*x = 2*4
# Now let us try to call backward again
y.backward()
print(f"Gradient of y w.r.t x after the second backward: {x.grad}")
# Output = 16 because 8 from the previous backward is added up here利用这个特性,当大 Batch Size 导致 OOM 时,可以将其切分为多个 Mini-batch,连续执行多次 backward() 累积梯度,最后执行一次 optimizer.step()。这是单卡解决显存瓶颈的常用手段。
PyTorch 的 DistributedDataParallel (DDP) 是实现数据并行的核心模块,基于 c10d 的 ProcessGroup 进行通信,每个进程(Process)通常控制一个 GPU 并持有一个模型副本。
DDP 的标准执行流程如下:

分布式训练中,点对点(Point-to-Point)通信效率太低。假设要把 5MB 参数发给 5 个节点,逐个发送会导致耗时线性增长。集合通信(Collective Communication) 利用拓扑结构(如树状、环状)并行传输,是高性能计算的基石。

DDP 主要依赖两个原语:
在分布式环境中,节点故障是常态,一旦某个 Rank 挂了,重启整个集群从零训练成本过高。必须使用 Checkpointing:定期将模型权重写入共享存储(Shared Disk)。

这样恢复训练时,可以从最新的 Checkpoint 加载权重。这里需要注意的是只允许 Rank 0 写入 Checkpoint,避免多进程同时写文件造成损坏。
下面通过代码演示 DDP 的完整流程。先以 CPU 模拟多进程环境,再迁移到 GPU。
基础组件:Dataset 与 Mode
import torch
import torch.nn as nn
from torch.utils.data import Dataset
class SimpleDataset(Dataset):
def __init__(self, size=100):
self.size = size
self.data = torch.randn(size, 10) # generate 100 samples each having dimension 10
self.labels = torch.randn(size, 1) # generate labels corresponding to each sample
def __len__(self):
return self.size
def __getitem__(self, idx):
return self.data[idx], self.labels[idx]
class SimpleModel(nn.Module):
def __init__(self):
super().__init__()
self.fc1 = nn.Linear(10, 5)
self.fc2 = nn.Linear(5, 1)
def forward(self, x):
x = torch.relu(self.fc1(x))
return self.fc2(x)初始化环境
setup 函数负责建立进程组。
import os
import torch.distributed as dist
def setup(rank, world_size):
os.environ['MASTER_ADDR'] = 'localhost' # IP of the "master" node
os.environ['MASTER_PORT'] = '12355' # Port of communication
# If we have 4 processes, each process independently calls setup() with
# its own rank
dist.init_process_group(backend='gloo', rank=rank, world_size=world_size)
# 'gloo' is the collective communication backend used for CPU
# nccl is used for CUDA
print(f"\n{'='*60}")
print(f"[Rank {rank}] Process initialized!")
print(f"[Rank {rank}] Backend: {dist.get_backend()}")
print(f"[Rank {rank}] World Size: {dist.get_world_size()}")
print(f"{'='*60}\n")数据切分:DistributedSampler
这是数据并行的关键。DistributedSampler 会根据 Rank ID 自动切分数据集,确保每个进程拿到不重叠的数据子集。
注意:必须在每个 epoch 开始前调用 set_epoch(epoch),否则每个 epoch 的数据切分顺序将完全一样导致模型只见过部分数据,影响泛化能力。
# Example usage conceptual
# Create DistributedSampler for each rank
# sampler_rank0 = DistributedSampler(dataset, num_replicas=4, rank=0)
# ...
# Loop
for epoch in range(num_epochs):
train_sampler.set_epoch(epoch) # Different shuffle each epoch
for batch in train_loader:
# Training code核心训练 Loop (Worker)
from torch.nn.parallel import DistributedDataParallel as DDP
def print_separator(rank, message):
print(f"\n[Rank {rank}] {'-'*40}")
print(f"[Rank {rank}] {message}")
print(f"[Rank {rank}] {'-'*40}")
def train_worker(rank, world_size, num_epochs=2, batch_size=8):
# setup the distributed environment
setup(rank, world_size)
model = SimpleModel()
# wrap the model with DDP
# This is where weights are synchronized across ranks
ddp_model = DDP(model)
dataset = SimpleDataset(size=32)
sampler = DistributedSampler(dataset, num_replicas=world_size, rank=rank, shuffle=False)
dataloader = DataLoader(dataset, batch_size=batch_size, sampler=sampler)
optimizer = optim.SGD(ddp_model.parameters(), lr=0.01)
criterion = nn.MSELoss()
for epoch in range(num_epochs):
sampler.set_epoch(epoch) # Ensure different shuffling per epoch
epoch_loss = 0.0
for batch_idx, (data, target) in enumerate(dataloader):
optimizer.zero_grad()
output = ddp_model(data)
loss = criterion(output, target)
# Backward pass - THIS IS WHERE DDP MAGIC HAPPENS
loss.backward()
# Gradients are synchronized (All-Reduce) here automatically
optimizer.step()
epoch_loss += loss.item()
dist.destroy_process_group()
print(f"[Rank {rank}] Training completed and cleaned up!\n")验证集不能像训练集那样随意。有两种处理策略:
Rank 0 独占:只在 Rank 0 上跑全量验证集。这个方法比较简单但会造成其他 GPU 等待所以效率低。
分布式验证:各 Rank 跑一部分最后聚合 Loss 和样本数,一般都会用这个方案。
def validate(model, val_loader, criterion, rank, epoch):
model.eval()
val_loss = 0.0
num_samples = 0
with torch.no_grad():
for batch_idx, (data, target) in enumerate(val_loader):
output = model(data)
loss = criterion(output, target)
val_loss += loss.item() * data.size(0)
num_samples += data.size(0)
# Aggregate metrics across all processes
total_loss_tensor = torch.tensor([val_loss])
total_samples_tensor = torch.tensor([num_samples])
# Sum across all processes
dist.all_reduce(total_loss_tensor, op=dist.ReduceOp.SUM)
dist.all_reduce(total_samples_tensor, op=dist.ReduceOp.SUM)
global_avg_loss = total_loss_tensor.item() / total_samples_tensor.item()
return global_avg_loss启动进程,CPU 演示通常用 mp.spawn:
defmain():
world_size=2
mp.spawn(
demo_worker,
args=(world_size,),
nprocs=world_size,
join=True
)在实际 GPU 训练中,需要做 5 点改动:
gloo ->nccl (NVIDIA 专用,速度最快)。model.cuda(rank)。DDP(model, device_ids=[rank])。data.cuda(rank)。torch.cuda.set_device(rank)。启动方式不再推荐使用mp.spawn,而是直接使用Torch自带的CLI工具 torchrun,它能自动处理环境变量(RANK, WORLD_SIZE, LOCAL_RANK)并支持故障重启。
# Code expects env vars
rank = int(os.environ["RANK"])
local_rank = int(os.environ["LOCAL_RANK"])
world_size = int(os.environ["WORLD_SIZE"])
demo_worker(rank, world_size) 运行命令:
torchrun --nproc_per_node=4 train.pyPyTorch DDP 之所以快,不仅仅是因为分了数据,更在于它对通信的优化。
通信与计算重叠 (Communication Overlap)
我们可能认为要等所有层的梯度算完再同步,但这会导致 GPU 出现长时间空闲(Wait)。所以DDP 的做法是一旦某层的梯度算出来,如果不被依赖,就立刻发送同步。

如上图,Layer 4 的梯度一算好,在计算 Layer 3 的同时,Rank 0 已经在处理 Layer 4 的同步了。
分桶 (Bucketing)
为了避免频繁发送小包导致网络拥塞,DDP 会将梯度打包进 Bucket(默认 25MB)。
当一个 Bucket 被填满(例如包含 Layer 4, 5, 6 的梯度),就会触发一次 All-Reduce。这种批量处理大幅降低了通信开销。

这是一个为您准备的结尾总结,保持了之前设定的专业且行动导向的风格,同时也呼应了原作者关于“下一篇讲解模型并行”的预告:
我们已经拆解了 PyTorch DDP 的核心运作机制:从底层的 ProcessGroup 通信握手,到梯度的 All-Reduce 同步,再到 Bucket 分桶与计算通信重叠的性能优化。掌握这些,你已经具备了将单卡代码低成本迁移到多卡集群的能力,不再受限于单机的训练时长。
作者:Trinanjan Mitra
本文分享自 DeepHub IMBA 微信公众号,前往查看
如有侵权,请联系 cloudcommunity@tencent.com 删除。
本文参与 腾讯云自媒体同步曝光计划 ,欢迎热爱写作的你一起参与!