编者按: 如今传统的单机单卡模式已经无法满足超大模型进行训练的要求,如何更好地、更轻松地利用多个?GPU?资源进行模型训练成为了人工智能领域的热门话题。
我们今天为大家带来的这篇文章详细介绍了一种名为?DDP(Distributed Data Parallel)的并行训练技术,作者认为这项技术既高效又易于实现。
文章要点如下:
(1)DDP?的核心思想是将模型和数据复制到多个?GPU 上并行训练,然后汇总平均梯度。
(2)DDP?比传统的?DP?模式更加高效,可以轻松扩展到多节点,同时也介绍了?DDP?的局限性。
(3)DDP?的?Python?实现非常简洁,主要分为进程初始化、设置?Distributed DataLoader(分布式数据加载器)和模型训练与测试三步。
(4)文中还解析了?DDP?中?Node、Master Node、Local Rank、Global Rank?等关键术语的具体含义。
(5)提供了从单?GPU?到单节点多?GPU?再到多节点场景的?DDP?应用案例源代码。
DDP?要求将整个模型加载到一个GPU上,这使得大模型的训练需要使用额外复杂的设置进行模型分片。期待未来有更多简单、高效、易用,还能满足大模型场景的模型训练并行技术出现!
作者 | Fran?ois Porcher
编译?|?岳扬
🚢🚢🚢欢迎小伙伴们加入AI技术软件及技术交流群,追踪前沿热点,共探技术难题~
本文将介绍一种名为?DDP?(Distributed Data Parallel)的技术,使用这种技术可以实现同时在多个?GPU?上训练模型。
我上学的时候只能用云服务平台的?GPU?进行训练。然而,当我进入企业上班后,情况就不同了。如果你所在的公司在人工智能领域投入了大量资源,特别是如果你在一家科技巨头公司工作,那么很可能你可以随时使用大量的GPU集群。
本教程旨在让读者掌握如何同时利用多个GPU,实现快速高效的训练。而且,也许会让你惊讶的是,这种技术比你想象的还要简单!在你继续阅读本文之前,建议先去充分了解?PyTorch(一种机器学习框架)相关内容,包括其核心组件,如Datasets、?DataLoaders、Optimizers、CUDA?和?Training loop。
一开始,我也认为?DDP?是一种复杂的、几乎无法实现的技术,认为它需要一个庞大的技术团队来建立必要的基础设施。不过,我向你们保证,DDP不仅直观易懂,而且简洁明了,只需要几行代码就可以实现。 让我们一起踏上这段充满启迪的旅程吧!
分布式数据并行(DDP)是一个简单明了的概念。假如我们拥有一个由?4?个?GPU?组成的?GPU?集群。在DDP中,我们将相同的模型复制到每个GPU上进行训练。每个GPU都有自己的优化器,用于更新模型的参数。重点在于数据的划分。(译者注:通常情况下,我们将训练数据划分为多个?mini-batches,然后将这些?mini-batches 分配给多个GPU进行并行处理。每个GPU独立地计算梯度和更新参数,然后将结果同步到其他GPU上。)
DDP,图片摘自?PyTorch?教程[1]
如果你对深度学习比较熟悉,应该会知道?DataLoader,这是一种将数据集划分成不同?batches?的工具。通常情况下,我们会将整个数据集分成多个?batches ,模型在每个?batch?上进行计算,并根据计算结果更新模型参数。
DDP?进一步细化了这一过程,将每个?batch 划分为?“sub-batches”。实质上,每个模型副本都会处理?primary batch?的一个部分,从而让每个?GPU?都能独立地计算梯度,并根据其处理的数据片段来更新模型的参数。
在DDP中,我们通过一种名为?DistributedSampler?的工具将?batch 分成?sub-batches ,如下图所示:
DDP,图片摘自?PyTorch?教程[1]
在将每个?sub-batch 分配给各个GPU后,每个GPU都会独立地对其所处理的数据进行计算,并计算出自己独特的梯度(gradient)。
DDP,图片摘自?PyTorch?教程[1]
DDP,图片摘自?PyTorch?教程[1]
在深入代码之前,先了解?DDP?技术相关术语的含义十分重要。来解释一下这些术语的含义:
从这个角度来看,如果只使用一台机器,情况就会简单明了,因为?Local Rank?等同于?Global Rank。
可以用一张图片来说明这一点:
Local rank,图片摘自?PyTorch?教程[2]
Global?rank,图片摘自?PyTorch?教程[2]
分布式数据并行(DDP)在许多深度学习工作流中都起到了变革性的作用,但了解其局限性也很重要。
DDP?的局限性主要在于其内存消耗。使用?DDP?时,每个?GPU?都会加载模型副本、优化器和对应?batch?的数据副本。
GPU?的内存大小通常从几?GB?到?80GB?不等。对于较小的模型,使用单个?GPU?都不是问题。但是,当涉及大语言模型(LLM)领域或类似于?GPT?的架构时,单个?GPU?的内存可能就不够用了。
在计算机视觉领域,虽然有大量轻量级模型,但当?batch sizes 增加时,特别是在涉及三维图像或物体检测任务的场景下,就会面临挑战。
全分片数据并行(Fully Sharded Data Parallel,FSDP)应运而生了。FSDP不仅将数据分布到不同的GPU上,还将模型和优化器的状态也分散到各个?GPU?的内存中。虽然这种方法看起来很好,但?FSDP?增多了?GPU?之间的通信,可能会降低训练速度。
总之:
在?PyTorch?的介绍网页中,其实是有两个选项的:?DP?和?DDP。但本文此处提及这一内容只是为了避免读者迷失或混淆:?实践中只需使用?DDP,它更快速,而且不局限于单个?Node。
DP?和?DDP?的比较,内容摘自?PyTorch?教程[3]
实现分布式深度学习比我们想象的要更简单。它的美妙之处在于,我们不再需要被复杂的GPU配置或梯度分布所困扰。
可以在以下链接找到所有的代码模板和脚本:
https://github.com/FrancoisPorcher/awesome-ai-tutorials
下面是详细步骤分解:
首先,编写下面这段代码,在单?GPU?上加载数据集、创建模型并进行end-to-end(端到端)的训练。这是项目的起点:
import?torch
import?torch.nn.functional?as?F
from?torch.utils.data?import?Dataset,?DataLoader
from?sklearn.datasets?import?load_wine
from?sklearn.model_selection?import?train_test_split
from?sklearn.preprocessing?import?StandardScaler
import?numpy?as?np
class WineDataset(Dataset):
def __init__(self,?data,?targets):
????????self.data?=?data
????????self.targets?=?targets
def __len__(self):
return len(self.data)
def __getitem__(self,?idx):
return?torch.tensor(self.data[idx],?dtype=torch.float),?torch.tensor(self.targets[idx],?dtype=torch.long)
class SimpleNN(torch.nn.Module):
def __init__(self):
super(SimpleNN,?self).__init__()
????????self.fc1?=?torch.nn.Linear(13, 64)
????????self.fc2?=?torch.nn.Linear(64, 3)
def forward(self,?x):
????????x?=?F.relu(self.fc1(x))
????????x?=?self.fc2(x)
return?x
class Trainer():
def __init__(self,?model,?train_data,?optimizer,?gpu_id,?save_every):
????????self.model?=?model
????????self.train_data?=?train_data
????????self.optimizer?=?optimizer
????????self.gpu_id?=?gpu_id
????????self.save_every?=?save_every
????????self.losses?= []
def _run_batch(self,?source,?targets):
????????self.optimizer.zero_grad()
????????output?=?self.model(source)
????????loss?=?F.cross_entropy(output,?targets)
????????loss.backward()
????????self.optimizer.step()
return?loss.item()
def _run_epoch(self,?epoch):
????????total_loss?= 0.0
????????num_batches?= len(self.train_data)
for?source,?targets?in?self.train_data:
????????????source?=?source.to(self.gpu_id)
????????????targets?=?targets.to(self.gpu_id)
????????????loss?=?self._run_batch(source,?targets)
????????????total_loss?+=?loss
????????avg_loss?=?total_loss?/?num_batches
????????self.losses.append(avg_loss)
print(f"Epoch?{epoch},?Loss:?{avg_loss:.4f}")
def _save_checkpoint(self,?epoch):
????????checkpoint?=?self.model.state_dict()
????????PATH?= f"model_{epoch}.pt"
????????torch.save(checkpoint,?PATH)
print(f"Epoch?{epoch}?|?Model saved to?{PATH}")
def train(self,?max_epochs):
????????self.model.train()
for?epoch?in range(max_epochs):
????????????self._run_epoch(epoch)
if?epoch?%?self.save_every?== 0:
????????????????self._save_checkpoint(epoch)
def load_train_objs():
????wine_data?=?load_wine()
????X?=?wine_data.data
????y?=?wine_data.target
#?Normalize and split
????X_train,?X_test,?y_train,?y_test?=?train_test_split(X,?y,?test_size=0.2,?random_state=42)
????scaler?=?StandardScaler().fit(X_train)
????X_train?=?scaler.transform(X_train)
????X_test?=?scaler.transform(X_test)
????train_set?=?WineDataset(X_train,?y_train)
????test_set?=?WineDataset(X_test,?y_test)
print("Sample from dataset:")
????sample_data,?sample_target?=?train_set[0]
print(f"Data:?{sample_data}")
print(f"Target:?{sample_target}")
????model?=?SimpleNN()
????optimizer?=?torch.optim.Adam(model.parameters(),?lr=0.001)
return?train_set,?model,?optimizer
def prepare_dataloader(dataset,?batch_size):
return?DataLoader(dataset,?batch_size=batch_size,?pin_memory=True,?shuffle=True)
def main(device,?total_epochs,?save_every,?batch_size):
????dataset,?model,?optimizer?=?load_train_objs()
????train_data?=?prepare_dataloader(dataset,?batch_size)
????trainer?=?Trainer(model,?train_data,?optimizer,?device,?save_every)
????trainer.train(total_epochs)
main(device=torch.device("cuda:0" if?torch.cuda.is_available() else "cpu"),?total_epochs=100,?save_every=50,?batch_size=32)
现在,我们将在一个?Node?上使用所有?GPU,步骤如下:
对于所需的库,我们可以通过以下操作导入:
import?torch.multiprocessing?as?mp
from?torch.utils.data.distributed?import?DistributedSampler
from?torch.nn.parallel?import?DistributedDataParallel?as?DDP
from?torch.distributed?import?init_process_group,?destroy_process_group
import?os
如果在一个?Node?上有8个GPU,我们将会调用以下函数?8?次,为每个GPU设置一个单独的进程,并且为每个进程指定正确的local_rank参数。
def ddp_setup(rank,?world_size):
"""
????Set up the distributed environment.
????Args:
????????rank:?The rank of the current process.?Unique identifier for each process in the distributed training.
????????world_size:?Total number of processes participating in the distributed training.
????"""
#?Address of the main node.?Since we are doing single-node training,?it's set to localhost.
????os.environ["MASTER_ADDR"] = "localhost"
#?Port on which the master node is expected to listen for communications from workers.
????os.environ["MASTER_PORT"] = "12355"
#?Initialize the process group.?
#?'backend'?specifies the communication backend to be used,?"nccl"?is optimized for GPU training.
????init_process_group(backend="nccl",?rank=rank,?world_size=world_size)
#?Set the current CUDA device to the specified device?(identified by rank).
#?This ensures that each process uses a different GPU in a multi-GPU setup.
????torch.cuda.set_device(rank)
关于该函数的一些解释:
然后需要对?Trainer?类稍作更改。我们只需用?DDP?函数对模型进行封装即可:
class Trainer():
def __init__(self,?model,?train_data,?optimizer,?gpu_id,?save_every):
????????self.model?=?model.to(gpu_id)
????????self.train_data?=?train_data
????????self.optimizer?=?optimizer
????????self.gpu_id?=?gpu_id
????????self.save_every?=?save_every
????????self.losses?= []
#?This changes
????????self.model?=?DDP(self.model,?device_ids=[gpu_id])
Trainer?类的其他部分都是一样的,amazing!
这种情况下需要调整数据加载器(dataloader),以便在多GPU训练中正确地将批次数据分发到每个GPU上进行处理。
def prepare_dataloader(dataset:?Dataset,?batch_size: int):
return?DataLoader(
????????dataset,
????????batch_size=batch_size,
????????pin_memory=True,
????????shuffle=False,
????????sampler=DistributedSampler(dataset)
)
现在,我们可以修改?main?函数,每个进程都将调用该函数(本文这种情况是调用?8?次):
def main(rank: int,?world_size: int,?save_every: int,?total_epochs: int,?batch_size: int):
"""
????Main training function for distributed data parallel?(DDP)?setup.
????Args:
????????rank?(int):?The rank of the current process?(0?<=?rank?<?world_size).?Each process is assigned a unique rank.
????????world_size?(int):?Total number of processes involved in the distributed training.
????????save_every?(int):?Frequency of model checkpoint saving,?in terms of epochs.
????????total_epochs?(int):?Total number of epochs for training.
????????batch_size?(int):?Number of samples processed in one iteration?(forward and backward pass).
????"""
#?Set up the distributed environment,?including setting the master address,?port,?and backend.
????ddp_setup(rank,?world_size)
#?Load the necessary training objects?-?dataset,?model,?and optimizer.
????dataset,?model,?optimizer?=?load_train_objs()
#?Prepare the data loader for distributed training.?It partitions the dataset across the processes and handles shuffling.
????train_data?=?prepare_dataloader(dataset,?batch_size)
#?Initialize the trainer instance with the loaded model,?data,?and other configurations.
????trainer?=?Trainer(model,?train_data,?optimizer,?rank,?save_every)
#?Train the model for the specified number of epochs.
????trainer.train(total_epochs)
#?Cleanup the distributed environment after training is complete.
????destroy_process_group()
最后,在执行脚本时,我们将需要启动8个进程。这可以通过使用mp.spawn()函数来实现(译者注:mp.spawn()函数是PyTorch提供的用于在多个进程中启动训练任务的功能,它可以方便地启动多个进程,并为每个进程分配相应的GPU和其他资源。):
if?__name__?== "__main__":
import?argparse
????parser?=?argparse.ArgumentParser(description='simple distributed training job')
????parser.add_argument('total_epochs', type=int, help='Total epochs to train the model')
????parser.add_argument('save_every', type=int, help='How often to save a snapshot')
????parser.add_argument('--batch_size',?default=32, type=int, help='Input batch size on each device?(default:?32)')
????args?=?parser.parse_args()
????world_size?=?torch.cuda.device_count()
????mp.spawn(main,?args=(world_size,?args.save_every,?args.total_epochs,?args.batch_size),?nprocs=world_size)
恭喜您到达了最后一步!这一步是在不同?Node?上调用所有可用的?GPU。如果您理解了前文所做的工作,这一步就非常容易了。
在跨多个?Node?进行扩展时,关键区别在于从?local_rank?到?global_rank?的转变。这一点十分重要,因为每个进程都需要一个唯一的标识符。例如,如果使用两个?Node?,每个?Node?有?8?个?GPU,那么进程?0?和进程?8?的?local_rank?都是?0。
global_rank 的计算公式非常直观:
global_rank?=?node_rank?*?world_size_per_node?+?local_rank
因此,我们首先要修改?ddp_setup?函数:
def ddp_setup(local_rank,?world_size_per_node,?node_rank):
????os.environ["MASTER_ADDR"] = "MASTER_NODE_IP" #?<--?Replace with your master node IP
????os.environ["MASTER_PORT"] = "12355"
????global_rank?=?node_rank?*?world_size_per_node?+?local_rank
????init_process_group(backend="nccl",?rank=global_rank,?world_size=world_size_per_node*torch.cuda.device_count())
????torch.cuda.set_device(local_rank)
还需要调整主函数,该函数现在需要接受world_size_per_node作为参数。
def main(local_rank: int,?world_size_per_node: int,?save_every: int,?total_epochs: int,?batch_size: int,?node_rank: int):
????ddp_setup(local_rank,?world_size_per_node,?node_rank)
#?...?(rest of the main function)
最后,我们还调整了?mp.spawn()?函数的?world_size_per_node?值:
if?__name__?== "__main__":
import?argparse
????parser?=?argparse.ArgumentParser(description='simple distributed training job')
????parser.add_argument('total_epochs', type=int, help='Total epochs to train the model')
????parser.add_argument('save_every', type=int, help='How often to save a snapshot')
????parser.add_argument('--batch_size',?default=32, type=int, help='Input batch size on each device?(default:?32)')
????parser.add_argument('--node_rank',?default=0, type=int, help='The rank of the node in multi-node training')
????args?=?parser.parse_args()
????world_size_per_node?=?torch.cuda.device_count()
????mp.spawn(main,?args=(world_size_per_node,?args.save_every,?args.total_epochs,?args.batch_size,?args.node_rank),?nprocs=world_size_per_node)
现在您已经准备好将训练任务发送到集群上。非常简单,你只需调用所需的节点数即可。
以下是?SLURM?脚本的模板:
#!/bin/bash
#SBATCH?--job-name=DDPTraining???????#?Name of the job
#SBATCH?--nodes=$1???????????????????#?Number of nodes specified by the user
#SBATCH?--ntasks-per-node=1??????????#?Ensure only one task runs per node
#SBATCH?--cpus-per-task=1????????????#?Number of CPU cores per task
#SBATCH?--gres=gpu:1?????????????????#?Number of GPUs per node
#SBATCH?--time=01:00:00??????????????#?Time limit hrs:min:sec?(1 hour in this example)
#SBATCH?--mem=4GB????????????????????#?Memory limit per GPU
#SBATCH?--output=training_%j.log?????#?Output and error log name?(%j expands to jobId)
#SBATCH?--partition=gpu??????????????#?Specify the partition or queue
srun python3 your_python_script.py?--total_epochs?10 --save_every?2 --batch_size?32 --node_rank?$SLURM_NODEID
现在您可以使用以下命令从终端启动训练:
sbatch train_net.sh?2 #?for using 2 nodes
Congratulations,?you’ve made it!
Thanks for reading!
END
参考资料
[1]https://www.youtube.com/watch?v=Cvdhwx-OBBo
[2]https://www.youtube.com/watch?v=KaAJtI1T2x4
[3]https://pytorch.org/tutorials/beginner/ddp_series_theory.html
本文经原作者授权,由Baihai IDP编译。如需转载译文,请联系获取授权。
原文链接:
https://towardsdatascience.com/a-comprehensive-guide-of-distributed-data-parallel-ddp-2bb1d8b5edfb