一直以来都是用的单机单卡训练模型,虽然很多情况下已经足够了,但总有一些情况得上分布式训练:
由于还没遇到过一张显卡放不下整个模型的情况,本文的分布式训练仅限数据并行。主要从数据并行的原理和一些简单的实践例子进行说明。
与每个step一个batch数据相比,数据并行是指每个step用更多的数据(多个batch)进行计算——即多个batch的数据并行进行前向计算。既然是并行,那么就涉及到多张卡一起计算。单卡和多卡训练过程如下图1所示,主要有三个过程:
如果不使用数据并行,在显存足够的情况下,我们可以将batch_size设大,这和数据并行的区别在哪呢?如果只将batch_size设大,计算还是在一张卡上完成,速度相对来说是不如将数据均分后放在不同卡上并行计算的。当然,考虑到卡之间的通信问题,要发挥多卡并行的力量需要进行一定权衡。
torch中主要有两种数据并行方式:DP和DDP。
DP是较简单的一种数据并行方式,直接将模型复制到多个GPU上并行计算,每个GPU计算batch中的一部分数据,各自完成前向和反向后,将梯度汇总到主GPU上。其基本流程:
在DP中,只有一个主进程,主进程下有多个线程,每个线程管理一个device的训练。因此,DP中内存中只存在一份数据,各个线程间是共享这份数据的。DP和Parameter Server的方式很像。
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
# 假设我们有一个简单的数据集类
class SimpleDataset(Dataset):
def __init__(self, data, target):
self.data = data
self.target = target
def __len__(self):
return len(self.data)
def __getitem__(self, idx):
return self.data[idx], self.target[idx]
# 假设我们有一个简单的神经网络模型
class SimpleModel(nn.Module):
def __init__(self, input_dim):
super(SimpleModel, self).__init__()
self.fc = nn.Linear(input_dim, 1)
def forward(self, x):
return torch.sigmoid(self.fc(x))
# 假设我们有一些数据
n_sample = 100
n_dim = 10
batch_size = 10
X = torch.randn(n_sample, n_dim)
Y = torch.randint(0, 2, (n_sample, )).float()
dataset = SimpleDataset(X, Y)
data_loader = DataLoader(dataset, batch_size=batch_size, shuffle=True)
# ===== 注意:刚创建的模型是在 cpu 上的 ===== #
device_ids = [0, 1, 2]
model = SimpleModel(n_dim).to(device_ids[0])
model = nn.DataParallel(model, device_ids=device_ids)
optimizer = optim.SGD(model.parameters(), lr=0.01)
for epoch in range(10):
for batch_idx, (inputs, targets) in enumerate(data_loader):
inputs, targets = inputs.to('cuda'), targets.to('cuda')
outputs = model(inputs)
loss = nn.BCELoss()(outputs, targets.unsqueeze(1))
optimizer.zero_grad()
loss.backward()
optimizer.step()
print(f'Epoch {epoch}, Batch {batch_idx}, Loss: {loss.item()}')
其中最重要的一行便是:
model = nn.DataParallel(model, device_ids=device_ids)
注意,模型的参数和缓冲区都要放在device_ids[0]
上。在执行forward
函数时,模型会被复制到各个GPU上,对模型的属性进行更新并不会产生效果,因为前向完后各个卡上的模型就被销毁了。只有在device_ids[0]
上对模型的参数或者buffer进行的更新才会生效!2
DDP,顾名思义,即分布式的数据并行,每个进程独立进行训练,每个进程会加载完整的数据,但是读取不重叠的数据。DDP执行流程3:
准备阶段
init_process_group
。model = DDP(model).to(local_rank)
。准备数据
训练阶段
find_unused_parameter
为true
时,其会在forward
结束时,启动一个回溯,标记未用到的参数,提前将这些设置为ready
。ready
;ready
时,reducer开始对这个bucket的所有参数开始一个异步的all-reduce梯度平均操作;import argparse
from tqdm import tqdm
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, Dataset
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
# 1. 基础模块 ###
class SimpleModel(nn.Module):
def __init__(self, input_dim):
super(SimpleModel, self).__init__()
self.fc = nn.Linear(input_dim, 1)
cnt = torch.tensor(0)
self.register_buffer('cnt', cnt)
def forward(self, x):
self.cnt += 1
# print("In forward: ", self.cnt, "Rank: ", self.fc.weight.device)
return torch.sigmoid(self.fc(x))
class SimpleDataset(Dataset):
def __init__(self, data, target):
self.data = data
self.target = target
def __len__(self):
return len(self.data)
def __getitem__(self, idx):
return self.data[idx], self.target[idx]
# 2. 初始化我们的模型、数据、各种配置 ####
## DDP:从外部得到local_rank参数。从外面得到local_rank参数,在调用DDP的时候,其会自动给出这个参数
parser = argparse.ArgumentParser()
parser.add_argument("--local_rank", default=-1, type=int)
FLAGS = parser.parse_args()
local_rank = FLAGS.local_rank
## DDP:DDP backend初始化
torch.cuda.set_device(local_rank)
dist.init_process_group(backend='nccl')
## 假设我们有一些数据
n_sample = 100
n_dim = 10
batch_size = 25
X = torch.randn(n_sample, n_dim) # 100个样本,每个样本有10个特征
Y = torch.randint(0, 2, (n_sample, )).float()
dataset = SimpleDataset(X, Y)
sampler = torch.utils.data.distributed.DistributedSampler(dataset)
data_loader = DataLoader(dataset, batch_size=batch_size, sampler=sampler)
## 构造模型
model = SimpleModel(n_dim).to(local_rank)
## DDP: Load模型要在构造DDP模型之前,且只需要在master上加载就行了。
ckpt_path = None
if dist.get_rank() == 0 and ckpt_path is not None:
model.load_state_dict(torch.load(ckpt_path))
## DDP: 构造DDP model —————— 必须在 init_process_group 之后才可以调用 DDP
model = DDP(model, device_ids=[local_rank], output_device=local_rank)
## DDP: 要在构造DDP model之后,才能用model初始化optimizer。
optimizer = torch.optim.SGD(model.parameters(), lr=0.001)
loss_func = nn.BCELoss().to(local_rank)
# 3. 网络训练 ###
model.train()
num_epoch = 100
iterator = tqdm(range(100))
for epoch in iterator:
# DDP:设置sampler的epoch,
# DistributedSampler需要这个来指定shuffle方式,
# 通过维持各个进程之间的相同随机数种子使不同进程能获得同样的shuffle效果。
data_loader.sampler.set_epoch(epoch)
# 后面这部分,则与原来完全一致了。
for data, label in data_loader:
data, label = data.to(local_rank), label.to(local_rank)
optimizer.zero_grad()
prediction = model(data)
loss = loss_func(prediction, label.unsqueeze(1))
loss.backward()
iterator.desc = "loss = %0.3f" % loss
optimizer.step()
# DDP:
# 1. save模型的时候,和DP模式一样,有一个需要注意的点:保存的是model.module而不是model。
# 因为model其实是DDP model,参数是被`model=DDP(model)`包起来的。
# 2. 只需要在进程0上保存一次就行了,避免多次保存重复的东西。
if dist.get_rank() == 0 and epoch == num_epoch - 1:
torch.save(model.module.state_dict(), "%d.ckpt" % epoch)
结合上面的代码,一个简化版的DDP流程:
local_rank
;dist.init_process_group
;DistributedSampler
);DDP的通常启动方式:
CUDA_VISIBLE_DEVICES="0,1" python -m torch.distributed.launch --nproc_per_node 2 ddp.py
以上过程中涉及到一些陌生的概念,其实走一遍DDP的过程就会很好理解:每个进程是一个独立的训练流程,不同进程之间共享同一份数据。为了避免不同进程使用重复的数据训练,以及训练后同步梯度,进程间需要同步。因此,其中一个重点就是每个进程序号,或者说使用的GPU的序号。
node
:节点,可以是物理主机,也可以是容器;rank
和local_rank
:都表示进程在整个分布式任务中的编号。rank
是进程在全局的编号,local_rank
是进程在所在节点上的编号。显然,如果只有一个节点,那么二者是相等的。在启动脚本中的--nproc_per_node
即指定一个节点上有多少进程;world_size
:即整个分布式任务中进程的数量。本文利用pytorch进行数据并行训练进行了一个粗浅的介绍。包括DP和DDP的基本原理,以及简单的例子。实际在分布式过程中涉及到的东西还是挺多的,比如DP/DDP中梯度的回收是如何进行的,DDP中数据采样的细节,DDP中的数据同步操作等。更多的还是要基于真实的需求出发才能真的体会得到。