Parallel training (data parallelism and model parallelism) and distributed training are two common ways to speed up training in deep learning. Compared with single-process parallel training, distributed training is the better-performing option, and it is also the approach officially recommended by PyTorch:
Multi-Process Single-GPU
This is the highly recommended way to use DistributedDataParallel, with multiple processes, each of which operates on a single GPU. This is currently the fastest approach to do data parallel training using PyTorch and applies to both single-node(multi-GPU) and multi-node data parallel training. It is proven to be significantly faster than torch.nn.DataParallel for single-node multi-GPU data parallel training.
My take: distributed training is really just another form of parallel training; it simply differs from plain data parallelism and model parallelism. Put simply, distributed training targets multi-node, multi-GPU setups, whereas data parallelism (torch.nn.DataParallel) targets a single node with multiple GPUs.
Distributed Training Code
The notes below focus on the main differences between distributed training code and a regular training procedure.
Imports
The libraries involved in distributed training are mainly torch.nn.parallel, torch.distributed, torch.utils.data.distributed, and torch.multiprocessing. Note that the multiprocessing start method must be set to spawn or forkserver (Python 3 only), because the default method, fork, can easily lead to deadlocks.
```python
import time
```
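Beyond standard-library imports such as time, a rough sketch of the distributed-specific imports and the start-method setting described above might look like this (the dist and mp aliases are a common convention, not a requirement):

```python
import sys

import torch
import torch.nn as nn
import torch.nn.parallel
import torch.distributed as dist
import torch.multiprocessing as mp
import torch.utils.data
import torch.utils.data.distributed

if __name__ == "__main__":
    # The default "fork" start method can deadlock when combined with CUDA;
    # "spawn" (or "forkserver" on Python 3) avoids this.
    mp.set_start_method("spawn")
```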
Train Function
The first difference: in distributed training, the data must be moved to the GPU with its non_blocking attribute set to True. This lets the copy of the data to each GPU overlap with computation. The training function also prints statistics during training so that we can track progress over the whole run.
```python
def train(train_loader, model, criterion, optimizer, epoch):
```
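A minimal sketch of such a training function, assuming the model and criterion already live on this process's GPU (the logging interval and timing details are illustrative):

```python
def train(train_loader, model, criterion, optimizer, epoch):
    model.train()
    end = time.time()
    for i, (inp, target) in enumerate(train_loader):
        # non_blocking=True lets the host-to-GPU copy overlap with computation
        inp = inp.cuda(non_blocking=True)
        target = target.cuda(non_blocking=True)

        output = model(inp)
        loss = criterion(output, target)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Print running statistics so training progress can be tracked
        if i % 10 == 0:
            print("Epoch {} [{}/{}]  loss {:.4f}  {:.3f} s/batch".format(
                epoch, i, len(train_loader), loss.item(), time.time() - end))
        end = time.time()
```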
Validation Function
Same as the training function; the only distributed-specific detail is again setting non_blocking=True when moving the data to the GPU.
```python
def validation(val_loader, model, criterion):
```
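A matching sketch of the validation function; the loss bookkeeping here is only illustrative:

```python
def validation(val_loader, model, criterion):
    model.eval()
    total_loss, total_samples = 0.0, 0
    with torch.no_grad():
        for inp, target in val_loader:
            inp = inp.cuda(non_blocking=True)
            target = target.cuda(non_blocking=True)
            output = model(inp)
            total_loss += criterion(output, target).item() * inp.size(0)
            total_samples += inp.size(0)
    avg_loss = total_loss / max(total_samples, 1)
    print("Validation loss: {:.4f}".format(avg_loss))
    return avg_loss
```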
Inputs
Compared with standard model training, defining the inputs for a distributed run is also slightly different, and some parameters are specific to the distributed training job. They are described below; a sketch with example values follows the list.
- batch_size - batch size for each process in the distributed training group. Total batch size across the distributed model is batch_size * world_size
- workers - number of worker processes used with the dataloaders in each process
- num_epochs - total number of epochs to train for
- starting_lr - starting learning rate for training
- world_size - number of processes in the distributed training environment
- dist_backend - backend to use for distributed training communication (i.e. NCCL, Gloo, MPI, etc.). In this tutorial, since we are using several multi-GPU nodes, NCCL is suggested.
- dist_url - URL specifying the initialization method of the process group. This may contain the IP address and port of the rank 0 process, or be a non-existent file on a shared file system. Here, since we do not have a shared file system, this will incorporate the node0 private IP and the port on node0 to use.
1 | print("Collect Inputs...") |
Initialize Process Group
Set up the process group. This is done with torch.distributed.init_process_group, whose arguments are described below:
- backend - the backend to use (i.e. NCCL, Gloo, MPI, etc.)
- init_method - either a URL containing the address and port of the rank 0 machine, or a path to a non-existent file on the shared file system. Note, to use the file init_method, all machines must have access to the file; similarly, for the URL method, all machines must be able to communicate on the network, so make sure to configure any firewalls and network settings to accommodate this.
- rank - the rank of this process when run
- world_size - the number of processes in the collective
The init_method input can also be "env://". In this case, the address and port of the rank 0 machine will be read from the environment variables MASTER_ADDR and MASTER_PORT, respectively. If the rank and world_size arguments are not specified in the init_process_group call, they can likewise be read from the environment variables RANK and WORLD_SIZE.
Also set the process's local_rank. This is used to assign a device (i.e. which GPU) to the process, and later to specify the device when constructing the DistributedDataParallel model.
1 | print("Initialize Process Group...") |
Initialize Model
When defining the model, it must be set up in distributed mode, i.e. wrapped with DistributedDataParallel.
1 | print("Initialize Model...") |
Initialize Dataloaders
The last distributed-specific step is to give the training data a DistributedSampler, which is used in conjunction with the DistributedDataParallel model.
1 | print("Initialize Dataloaders...") |
Training Loop
Now start training. The only difference from a standard training loop is that the DistributedSampler's epoch must be updated at the start of each epoch.
```python
for epoch in range(num_epochs):
```
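A sketch of the loop; learning-rate scheduling and checkpointing are omitted for brevity:

```python
best_loss = float("inf")
for epoch in range(num_epochs):
    # Tell the DistributedSampler which epoch this is, so that each epoch
    # shuffles the data differently across processes
    train_sampler.set_epoch(epoch)

    train(train_loader, model, criterion, optimizer, epoch)
    val_loss = validation(val_loader, model, criterion)
    best_loss = min(best_loss, val_loss)
```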
Summary
In PyTorch, distributed training differs from standard training mainly in the following ways:
- Set the non_blocking attribute when moving the input data to the GPU.

```python
input = input.cuda(non_blocking=True)
```
- Initialize the process group and set the local rank.

```python
dist.init_process_group(backend=dist_backend, init_method=dist_url,
                        rank=int(sys.argv[1]), world_size=world_size)
```
- Wrap the model in DistributedDataParallel.

```python
model = torch.nn.parallel.DistributedDataParallel(model, device_ids=dp_device_ids,
                                                  output_device=local_rank)
```
- Use a DistributedSampler for the training dataset.

```python
train_sampler = torch.utils.data.distributed.DistributedSampler(trainset)
```