DeepSpeed is a deep learning optimization library that makes distributed training and inference easy, efficient, and effective.
argparse
Turn the argparse parser into a DeepSpeed-aware parser with:
parser = deepspeed.add_config_arguments(parser)
import argparse
import deepspeed

def add_argument():
    parser = argparse.ArgumentParser(description='CIFAR')

    # data
    # cuda
    parser.add_argument('--with_cuda', default=False, action='store_true',
                        help='use CPU in case there\'s no GPU support')
    parser.add_argument('--use_ema', default=False, action='store_true',
                        help='whether use exponential moving average')

    # train
    parser.add_argument('-b', '--batch_size', default=32, type=int,
                        help='mini-batch size (default: 32)')
    parser.add_argument('-e', '--epochs', default=30, type=int,
                        help='number of total epochs (default: 30)')
    parser.add_argument('--local_rank', type=int, default=-1,
                        help='local rank passed from distributed launcher')

    # Include DeepSpeed configuration arguments
    parser = deepspeed.add_config_arguments(parser)

    args = parser.parse_args()
    return args
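For reference, a hedged note not spelled out in the original: add_config_arguments registers DeepSpeed's own command-line flags on the parser, including the --deepspeed and --deepspeed_config options used by the launch command at the end of this post, so the parsed args carry the config path into deepspeed.initialize. A minimal sketch:

# Sketch: what the parsed args carry after add_config_arguments.
# Run e.g.:  python train.py --deepspeed --deepspeed_config ds_config.json
args = add_argument()
print(args.deepspeed)         # True when --deepspeed is given
print(args.deepspeed_config)  # 'ds_config.json'
print(args.local_rank)        # local rank filled in by the distributed launcher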
Pass deepspeed.initialize the args together with the model structure and parameters. The dataloader then becomes the DeepSpeed version: deepspeed.initialize can build a distributed dataloader for you (pass in trainset), or you can define your own (no need to pass trainset).

def initialize(args,
               model,
               optimizer=None,
               model_parameters=None,
               training_data=None,
               lr_scheduler=None,
               mpu=None,
               dist_init_required=True,
               collate_fn=None):
parameters = filter(lambda p: p.requires_grad, net.parameters())
args = add_argument()

# Initialize DeepSpeed to use the following features
# 1) Distributed model
# 2) Distributed data loader
# 3) DeepSpeed optimizer

# Pass in `trainset`: DeepSpeed builds the distributed data loader
model_engine, optimizer, trainloader, _ = deepspeed.initialize(
    args=args, model=net, model_parameters=parameters, training_data=trainset)

# Do not pass `trainset`: bring your own data loader
model_engine, optimizer, _, _ = deepspeed.initialize(
    args=args, model=net, model_parameters=parameters, training_data=None)
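If you take the second path and build the dataloader yourself, each rank still needs its own shard of the data, which is what DistributedSampler provides. A minimal sketch under the assumptions that trainset and args exist as above and that the process group is already set up (e.g. after deepspeed.initialize or deepspeed.init_distributed()); the batch size and worker count are illustrative:

import torch
from torch.utils.data import DataLoader
from torch.utils.data.distributed import DistributedSampler

# Sketch: manual data loader for the "training_data=None" path.
sampler = DistributedSampler(trainset) if args.local_rank >= 0 else None
trainloader = DataLoader(trainset,
                         batch_size=32,            # per-GPU batch size (illustrative)
                         sampler=sampler,
                         shuffle=(sampler is None),  # shuffle only when no sampler is set
                         pin_memory=True,
                         num_workers=2)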
DistributedSampler
Inside DeepSpeed's own data loader, the sampler is chosen from local_rank:

# Distributed: use DistributedSampler
if local_rank >= 0:
    if data_sampler is None:
        data_sampler = DistributedSampler(dataset)
    device_count = 1
# Not distributed: use RandomSampler
else:
    if data_sampler is None:
        data_sampler = RandomSampler(dataset)
    device_count = torch.cuda.device_count()
    batch_size *= device_count

self.dataloader = DataLoader(self.dataset,
                             batch_size=self.batch_size,
                             pin_memory=self.pin_memory,
                             sampler=self.data_sampler,
                             num_workers=self.num_local_io_workers)
IterableDataset
The DeepSpeed data loader also implements the iterator protocol itself:

def __iter__(self):
    self._create_dataloader()
    return self

def __len__(self):
    return self.len

def __next__(self):
    if self.tput_timer:
        self.tput_timer.start()
    return next(self.data)
import torch
import logging
from torch.utils.data import DataLoader, RandomSampler
from torch.utils.data.distributed import DistributedSampler
from tqdm import tqdm


class DeepSpeedDataLoader(object):
    def __init__(self,
                 dataset,
                 batch_size,
                 pin_memory,
                 local_rank,
                 tput_timer,
                 collate_fn=None,
                 num_local_io_workers=None,
                 data_sampler=None):
        self.tput_timer = tput_timer
        self.batch_size = batch_size

        if local_rank >= 0:  # distributed: use DistributedSampler
            if data_sampler is None:
                data_sampler = DistributedSampler(dataset)
            device_count = 1
        else:  # not distributed: use RandomSampler
            if data_sampler is None:
                data_sampler = RandomSampler(dataset)
            device_count = torch.cuda.device_count()
            batch_size *= device_count

        if num_local_io_workers is None:
            num_local_io_workers = 2 * device_count

        self.num_local_io_workers = num_local_io_workers
        self.data_sampler = data_sampler
        self.dataset = dataset
        self.collate_fn = collate_fn
        self.device_count = device_count
        self.batch_size = batch_size
        self.pin_memory = pin_memory
        self.len = len(self.data_sampler)
        self.data = None

    def __iter__(self):
        self._create_dataloader()
        return self

    def __len__(self):
        return self.len

    def __next__(self):
        if self.tput_timer:
            self.tput_timer.start()
        return next(self.data)

    def _create_dataloader(self):
        if self.collate_fn is None:
            self.dataloader = DataLoader(self.dataset,
                                         batch_size=self.batch_size,
                                         pin_memory=self.pin_memory,
                                         sampler=self.data_sampler,
                                         num_workers=self.num_local_io_workers)
        else:
            self.dataloader = DataLoader(self.dataset,
                                         batch_size=self.batch_size,
                                         pin_memory=self.pin_memory,
                                         sampler=self.data_sampler,
                                         collate_fn=self.collate_fn,
                                         num_workers=self.num_local_io_workers)
        self.data = (x for x in self.dataloader)
        return self.dataloader
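Normally deepspeed.initialize constructs this loader internally and returns it as trainloader, but for illustration only, a hedged sketch of driving it by hand (values are illustrative; assumes trainset from above and at least one CUDA device, since the non-distributed branch multiplies batch_size by torch.cuda.device_count()):

# Sketch: instantiating DeepSpeedDataLoader directly (not needed in the tutorial).
loader = DeepSpeedDataLoader(dataset=trainset,
                             batch_size=4,
                             pin_memory=True,
                             local_rank=-1,   # -1 takes the RandomSampler path
                             tput_timer=None)
for batch in loader:
    inputs, labels = batch   # default collate yields [inputs, labels]
    break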
data
Put each batch on the GPU that belongs to this process: inputs, labels = data[0].to(model_engine.local_rank), data[1].to(model_engine.local_rank)

loss and optimizer
The backward pass and the optimizer step become model_engine.backward(loss) and model_engine.step(); optimizer.zero_grad() is not needed. (Zeroing the gradients is handled automatically by DeepSpeed after the weights have been updated using a mini-batch.)

for i, data in enumerate(trainloader):
    # get the inputs; data is a list of [inputs, labels]
    inputs, labels = data[0].to(model_engine.local_rank), data[1].to(model_engine.local_rank)

    outputs = model_engine(inputs)
    loss = criterion(outputs, labels)

    model_engine.backward(loss)
    model_engine.step()
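Putting the pieces together, a hedged end-to-end sketch of the training loop. The loss function (nn.CrossEntropyLoss for CIFAR classification) is an assumption, and net, trainset, and parameters are the objects defined in the snippets above, not a verified script:

import torch.nn as nn

# Sketch: wrap the inner loop above in an epoch loop driven by args.epochs.
criterion = nn.CrossEntropyLoss()

args = add_argument()
model_engine, optimizer, trainloader, _ = deepspeed.initialize(
    args=args, model=net, model_parameters=parameters, training_data=trainset)

for epoch in range(args.epochs):
    for i, data in enumerate(trainloader):
        inputs = data[0].to(model_engine.local_rank)
        labels = data[1].to(model_engine.local_rank)

        loss = criterion(model_engine(inputs), labels)
        model_engine.backward(loss)
        model_engine.step()  # also zeroes gradients after the update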
JSON file (ds_config.json)

{
  "train_batch_size": 4,
  "steps_per_print": 2000,
  "optimizer": {
    "type": "Adam",
    "params": {
      "lr": 0.001,
      "betas": [0.8, 0.999],
      "eps": 1e-8,
      "weight_decay": 3e-7
    }
  },
  "scheduler": {
    "type": "WarmupLR",
    "params": {
      "warmup_min_lr": 0,
      "warmup_max_lr": 0.001,
      "warmup_num_steps": 1000
    }
  },
  "wall_clock_breakdown": false
}
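A hedged side note not covered by the config above: when a per-GPU micro-batch size or gradient accumulation is also configured, DeepSpeed expects the batch-size fields to be mutually consistent. A small sketch of that relationship (illustrative values for a 2-GPU run):

# Sketch: consistency DeepSpeed enforces between batch-size config fields.
train_micro_batch_size_per_gpu = 4
gradient_accumulation_steps = 2
world_size = 2  # number of GPUs in the run (illustrative)
train_batch_size = train_micro_batch_size_per_gpu * gradient_accumulation_steps * world_size  # 16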
Launch training with the deepspeed launcher:
$ deepspeed deepspeed.py --deepspeed --deepspeed_config ds_config.json
deepspeed.init_inference() returns an inference engine of type InferenceEngine.
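A minimal sketch of how this might be called; the model and the dtype keyword are illustrative, and the accepted keyword arguments vary across DeepSpeed versions, so check deepspeed.init_inference for your release:

import torch
import deepspeed
import torchvision.models as models

# Sketch: wrap a trained model for inference (illustrative model and kwargs).
net = models.resnet18()
engine = deepspeed.init_inference(net, dtype=torch.half)

# The returned InferenceEngine is callable like the original module.
x = torch.randn(1, 3, 224, 224, dtype=torch.half, device='cuda')
with torch.no_grad():
    y = engine(x)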