Distributed Training with PyTorch

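Parallel training (data parallelism and model parallelism) and distributed training are two common ways to speed up deep learning training. Compared with parallel training, distributed training is the better way to accelerate, and it is also the approach recommended by the official PyTorch documentation: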
Multi-Process Single-GPU
This is the highly recommended way to use DistributedDataParallel, with multiple processes, each of which operates on a single GPU. This is currently the fastest approach to do data parallel training using PyTorch and applies to both single-node(multi-GPU) and multi-node data parallel training. It is proven to be significantly faster than torch.nn.DataParallel for single-node multi-GPU data parallel training.

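My understanding: distributed training is itself a form of parallel training; it simply differs from plain data parallelism and model parallelism. Roughly speaking, distributed training is aimed at multi-machine, multi-GPU setups, while data parallelism (torch.nn.DataParallel) is aimed at a single machine with multiple GPUs, although, as the quote above notes, DistributedDataParallel also works on a single node.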

Distributed Training Code

The sections below point out the main differences between distributed training code and an ordinary training script.

Imports

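Distributed training mainly relies on torch.nn.parallel, torch.distributed, torch.utils.data.distributed, and torch.multiprocessing. Note that the multiprocessing start method must be set to spawn or forkserver (supported on Python 3 only), because the default method, fork, is prone to deadlocks.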

import time
import sys
import torch

if __name__ == '__main__':
    torch.multiprocessing.set_start_method('spawn')

import torch.nn as nn
import torch.nn.parallel
import torch.distributed as dist
import torch.optim
import torch.utils.data
import torch.utils.data.distributed
import torchvision.transforms as transforms
import torchvision.datasets as datasets
import torchvision.models as models
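
To make the start-method choice more concrete, here is a minimal sketch that uses the standard-library multiprocessing module, which torch.multiprocessing wraps. It only inspects and selects a start method and does not launch any workers. Using get_context rather than set_start_method is a choice made for this sketch, not something the script above does.

```python
import multiprocessing as mp

# Start methods supported on this platform (on Linux: fork, spawn, forkserver)
available = mp.get_all_start_methods()
print("available start methods:", available)

# get_context returns an object with the same API as the multiprocessing module,
# bound to one start method, without changing the process-wide default
ctx = mp.get_context("spawn")
print("selected start method:", ctx.get_start_method())
```

A context object can be handy in library code, because set_start_method may only be called once per program.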

Train Function

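The first difference is that the data is moved to the GPU with non_blocking=True. This makes the host-to-GPU copy asynchronous, so the transfer can overlap with computation (the copy is only truly asynchronous when the source tensor lives in pinned memory). The function also prints training statistics so that we can track the progress of training.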

def train(train_loader, model, criterion, optimizer, epoch):
    # switch to train mode
    model.train()

    # running sum of the loss, used to report the average so far
    loss_sum = 0.0

    for i, (input, target) in enumerate(train_loader):
        # Move the batch to the GPU with asynchronous (non_blocking) copies
        input = input.cuda(non_blocking=True)
        target = target.cuda(non_blocking=True)

        # compute output
        output = model(input)
        loss = criterion(output, target)

        # compute gradients in a backward pass
        optimizer.zero_grad()
        loss.backward()

        # Call step of optimizer to update model params
        optimizer.step()

        # print the current loss and the running average every 10 iterations
        loss_sum += loss.item()
        if i % 10 == 0:
            print('Epoch: [{0}][{1}/{2}]\t'
                  'Loss {3:.4f} ({4:.4f})'.format(
                      epoch, i, len(train_loader), loss.item(), loss_sum / (i + 1)))

def adjust_learning_rate(initial_lr, optimizer, epoch):
    """Sets the learning rate to the initial LR decayed by 10 every 30 epochs"""
    lr = initial_lr * (0.1 ** (epoch // 30))
    for param_group in optimizer.param_groups:
        param_group['lr'] = lr
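
The step-decay schedule in adjust_learning_rate can be checked with plain arithmetic. The sketch below uses a hypothetical helper, step_decay_lr, that reproduces the same formula without an optimizer; the parameter names drop and every are introduced here for illustration only.

```python
def step_decay_lr(initial_lr, epoch, drop=0.1, every=30):
    """Learning rate at `epoch`: multiplied by `drop` once per `every` epochs."""
    return initial_lr * (drop ** (epoch // every))

# The rate stays constant inside each 30-epoch window and drops at its boundary
for epoch in (0, 29, 30, 59, 60, 90):
    print("epoch {:2d} -> lr {:.6f}".format(epoch, step_decay_lr(0.1, epoch)))
```

With a starting rate of 0.1, the rate is 0.1 for epochs 0 to 29, 0.01 for epochs 30 to 59, and 0.001 from epoch 60. A run of only a couple of epochs therefore never reaches the first decay step.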

Validation Function

This is the same as in ordinary training; the only difference is that non_blocking=True is set when moving the data to the GPU.

def validate(val_loader, model, criterion):
    # switch to evaluate mode
    model.eval()

    # running sum of the loss, used to report the average so far
    loss_sum = 0.0

    with torch.no_grad():
        for i, (input, target) in enumerate(val_loader):
            input = input.cuda(non_blocking=True)
            target = target.cuda(non_blocking=True)

            # compute output
            output = model(input)
            loss = criterion(output, target)

            # print the current loss and the running average every 100 iterations
            loss_sum += loss.item()
            if i % 100 == 0:
                print('Test: [{0}/{1}]\t'
                      'Loss {2:.4f} ({3:.4f})'.format(
                          i, len(val_loader), loss.item(), loss_sum / (i + 1)))
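
Both functions above move each batch with non_blocking=True. The following is a small sketch of that transfer pattern in isolation. It assumes a random tensor in place of a real batch and falls back to the CPU when no GPU is available, in which case the copy is simply a no-op. The explicit pin_memory() call is an addition for illustration; the DataLoader option pin_memory=True has the same effect for each batch.

```python
import torch

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Stand-in for one batch of images (assumption: shape chosen for illustration)
batch = torch.randn(8, 3, 32, 32)
if device.type == "cuda":
    # Page-locked (pinned) host memory is what lets the copy run asynchronously
    batch = batch.pin_memory()

# With a pinned source this call can return before the copy has finished;
# on a CPU-only machine the tensor is returned unchanged
on_device = batch.to(device, non_blocking=True)

# Work queued on the same CUDA stream runs after the copy, so the result is valid
total = on_device.sum().item()
print("device:", on_device.device, "sum:", total)
```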

Inputs

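Compared with standard model training, the inputs for distributed training are also defined slightly differently, and some of them are specific to distributed training. The parameters are: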

  • batch_size - batch size for each process in the distributed training group. Total batch size across the distributed model is batch_size*world_size
  • workers - number of worker processes used with the dataloaders in each process
  • num_epochs - total number of epochs to train for
  • starting_lr - starting learning rate for training
  • world_size - number of processes in the distributed training environment
  • dist_backend - backend to use for distributed training communication (e.g. NCCL, Gloo, MPI). In this tutorial, since we are using several multi-GPU nodes, NCCL is suggested.
  • dist_url - URL to specify the initialization method of the process group. This may contain the IP address and port of the rank0 process or be a non-existent file on a shared file system. Here, since we do not have a shared file system, this will incorporate the node0-privateIP and the port on node0 to use.
print("Collect Inputs...")

# Batch Size for training and testing
batch_size = 32

# Number of additional worker processes for dataloading
workers = 2

# Number of epochs to train for
num_epochs = 2

# Starting Learning Rate
starting_lr = 0.1

# Number of distributed processes
world_size = 4

# Distributed backend type
dist_backend = 'nccl'

# Url used to setup distributed training
dist_url = "tcp://172.31.22.234:23456"
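
As a quick sanity check on these settings, the sketch below works out the effective global batch size and the number of iterations each process runs per epoch. The dataset size of 5000 is an assumption: it is the size of the STL10 'train' split that the dataloader section uses later.

```python
import math

batch_size = 32      # per-process batch size (from the settings above)
world_size = 4       # number of processes (from the settings above)
dataset_size = 5000  # assumption: size of the STL10 'train' split

# Every optimizer step consumes one batch from each process
global_batch = batch_size * world_size

# Each process only sees its own share of the dataset
samples_per_process = math.ceil(dataset_size / world_size)
steps_per_epoch = math.ceil(samples_per_process / batch_size)

print("global batch size:", global_batch)
print("samples per process:", samples_per_process)
print("iterations per epoch per process:", steps_per_epoch)
```

Under these assumptions the global batch size is 128, each process handles 1250 samples, and an epoch takes 40 iterations per process.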

Initialize process group

  1. Set up the process group. This is done with torch.distributed.init_process_group, which takes the following arguments:

    • backend - the backend to use (e.g. NCCL, Gloo, MPI)
    • init_method - either a URL containing the address and port of the rank0 machine or a path to a non-existent file on the shared file system. Note that to use the file init_method, all machines must have access to the file; similarly, for the URL method, all machines must be able to communicate on the network, so make sure to configure any firewalls and network settings to accommodate this.
    • rank - the rank of this process when run
    • world_size - the number of processes in the collective
      The init_method input can also be "env://". In this case, the address and port of the rank0 machine will be read from the following two environment variables respectively: MASTER_ADDR, MASTER_PORT. If the rank and world_size arguments are not specified in the init_process_group function, they can both be read from the following two environment variables respectively as well: RANK, WORLD_SIZE.
  2. Set the local_rank of the process. This assigns a device to the process (that is, which GPU it uses) and is also used to specify the device when creating the distributed data parallel model.

print("Initialize Process Group...")
# Initialize Process Group
# v1 - init with url
dist.init_process_group(backend=dist_backend, init_method=dist_url, rank=int(sys.argv[1]), world_size=world_size)
# v2 - init with file
# dist.init_process_group(backend="nccl", init_method="file:///home/ubuntu/pt-distributed-tutorial/trainfile", rank=int(sys.argv[1]), world_size=world_size)
# v3 - init with environment variables
# dist.init_process_group(backend="nccl", init_method="env://", rank=int(sys.argv[1]), world_size=world_size)

# Establish Local Rank and set device on this node
local_rank = int(sys.argv[2])
dp_device_ids = [local_rank]
torch.cuda.set_device(local_rank)
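
The script above reads the global rank from sys.argv[1] and the local rank from sys.argv[2]. The sketch below shows how those two values, and the environment variables used by "env://", could be laid out across machines. The layout of 2 nodes with 2 GPUs each is an assumption chosen to match world_size = 4, and launch_plan and train.py are hypothetical names used only for illustration.

```python
from urllib.parse import urlparse

def launch_plan(dist_url, num_nodes, gpus_per_node):
    """List the rank, local rank and rendezvous settings for every process."""
    url = urlparse(dist_url)
    world_size = num_nodes * gpus_per_node
    plan = []
    for node in range(num_nodes):
        for local_rank in range(gpus_per_node):
            rank = node * gpus_per_node + local_rank  # unique across all nodes
            plan.append({
                "node": node,
                "rank": rank,
                "local_rank": local_rank,             # GPU index on this node
                "env": {                              # what "env://" would read
                    "MASTER_ADDR": url.hostname,
                    "MASTER_PORT": str(url.port),
                    "RANK": str(rank),
                    "WORLD_SIZE": str(world_size),
                },
            })
    return plan

plan = launch_plan("tcp://172.31.22.234:23456", num_nodes=2, gpus_per_node=2)
for p in plan:
    print("node {node}: python train.py {rank} {local_rank}".format(**p))
```

The global rank is unique across the whole job, whereas the local rank restarts at 0 on every machine because it only selects a GPU on that machine.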

Initialize Model

When defining the model, it must be wrapped in DistributedDataParallel so that it runs in distributed mode.

print("Initialize Model...")
# Construct Model
model = models.resnet18(pretrained=False).cuda()
# Make model DistributedDataParallel
model = torch.nn.parallel.DistributedDataParallel(model, device_ids=dp_device_ids, output_device=local_rank)

# define loss function (criterion) and optimizer
criterion = nn.CrossEntropyLoss().cuda()
optimizer = torch.optim.SGD(model.parameters(), starting_lr, momentum=0.9, weight_decay=1e-4)
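
For trying out the wrapping step without a GPU cluster, here is a minimal single-process sketch. It makes several assumptions that differ from the script above: the gloo backend on CPU instead of NCCL, a world size of 1, a file-based init method in a temporary directory, and a tiny nn.Linear in place of ResNet-18.

```python
import os
import tempfile

import torch
import torch.distributed as dist
import torch.nn as nn
from torch.nn.parallel import DistributedDataParallel

# Rendezvous through a file that does not exist yet (the "file://" init method)
init_file = os.path.join(tempfile.mkdtemp(), "ddp_init")
dist.init_process_group(backend="gloo", init_method="file://" + init_file,
                        rank=0, world_size=1)

# On CPU no device_ids are passed; on GPU they would be [local_rank] as above
model = DistributedDataParallel(nn.Linear(4, 2))
optimizer = torch.optim.SGD(model.parameters(), lr=0.1)

output = model(torch.randn(8, 4))
loss = output.sum()
optimizer.zero_grad()
loss.backward()   # gradients are averaged across processes during backward
optimizer.step()

output_shape = tuple(output.shape)
wrapped = model.module   # the original module is kept under .module
dist.destroy_process_group()
print("output shape:", output_shape, "wrapped type:", type(wrapped).__name__)
```

Because the wrapper keeps the original network under .module, checkpoints are often saved from model.module.state_dict() so they can be loaded without the wrapper.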

Initialize Dataloaders

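The last piece that is specific to distributed training is to feed the training data through a DistributedSampler, which is used together with the DistributedDataParallel model.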

print("Initialize Dataloaders...")
transform = transforms.Compose(
    [transforms.Resize(224),
     transforms.ToTensor(),
     transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])

# Initialize Datasets. STL10 will automatically download if not present
trainset = datasets.STL10(root='./data', split='train', download=True, transform=transform)
valset = datasets.STL10(root='./data', split='test', download=True, transform=transform)

# Create DistributedSampler to handle distributing the dataset across nodes when training
# This can only be called after torch.distributed.init_process_group is called
train_sampler = torch.utils.data.distributed.DistributedSampler(trainset)

# Create the Dataloaders to feed data to the training and validation steps
train_loader = torch.utils.data.DataLoader(trainset, batch_size=batch_size, shuffle=(train_sampler is None), num_workers=workers, pin_memory=False, sampler=train_sampler)
val_loader = torch.utils.data.DataLoader(valset, batch_size=batch_size, shuffle=False, num_workers=workers, pin_memory=False)
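
To illustrate what the sampler does with the dataset, the sketch below partitions a list of indices the way a distributed sampler does when shuffling is off: pad the index list to a multiple of the world size, then give each rank every world_size-th index. This is a plain-Python illustration, not the library's code, and shard_indices is a hypothetical helper.

```python
import math

def shard_indices(dataset_size, world_size, rank):
    """Indices one process would see: pad to a multiple of world_size, then stride."""
    indices = list(range(dataset_size))
    per_rank = math.ceil(dataset_size / world_size)
    total = per_rank * world_size
    # Repeat leading indices so every rank gets the same number of samples
    indices += indices[: total - dataset_size]
    return indices[rank:total:world_size]

shards = [shard_indices(10, 4, r) for r in range(4)]
for r, s in enumerate(shards):
    print("rank", r, "->", s)
```

With 10 samples and 4 processes, each rank receives 3 indices, and indices 0 and 1 appear twice because of the padding. Note that the validation loader above has no sampler, so every process evaluates the full validation set.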

Training Loop

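Now start training. The only difference from standard model training is that the epoch of the DistributedSampler must be updated at the start of every epoch.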

for epoch in range(num_epochs):
    # Set epoch count for DistributedSampler
    train_sampler.set_epoch(epoch)

    # Adjust learning rate according to schedule
    adjust_learning_rate(starting_lr, optimizer, epoch)

    # train for one epoch
    print("\nBegin Training Epoch {}".format(epoch+1))
    train(train_loader, model, criterion, optimizer, epoch)

    # evaluate on validation set
    print("Begin Validation @ Epoch {}".format(epoch+1))
    validate(val_loader, model, criterion)
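
The reason set_epoch matters is that DistributedSampler derives its shuffle from a seed plus the current epoch. The sketch below imitates that idea with Python's random module instead of a PyTorch generator, so the actual orderings differ from what PyTorch produces; epoch_order is a hypothetical helper.

```python
import random

def epoch_order(dataset_size, epoch, seed=0):
    """Shuffled index order that depends only on the seed and the epoch."""
    order = list(range(dataset_size))
    random.Random(seed + epoch).shuffle(order)
    return order

# Every process computes the same order for a given epoch without communicating
same_epoch_two_processes = epoch_order(100, epoch=0) == epoch_order(100, epoch=0)

# Advancing the epoch changes the order; without it every epoch repeats epoch 0
different_epochs = epoch_order(100, epoch=0) != epoch_order(100, epoch=1)

print("identical across processes:", same_epoch_two_processes)
print("changes between epochs:", different_epochs)
```

If set_epoch is never called, the epoch stays at its initial value and each process sees its shard in the same order in every epoch.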

Summary

In PyTorch, distributed training differs from standard training mainly in the following ways:

  • Pass non_blocking=True when moving the input data to the GPU.
input = input.cuda(non_blocking=True)
target = target.cuda(non_blocking=True)
  • Initialize the process group and set the local rank.
dist.init_process_group(backend=dist_backend, init_method=dist_url, rank=int(sys.argv[1]), world_size=world_size)

# Establish Local Rank and set device on this node
local_rank = int(sys.argv[2])
dp_device_ids = [local_rank]
torch.cuda.set_device(local_rank)
  • Wrap the model in DistributedDataParallel.
model = torch.nn.parallel.DistributedDataParallel(model, device_ids=dp_device_ids, output_device=local_rank)
  • Sample the training dataset with a DistributedSampler.
train_sampler = torch.utils.data.distributed.DistributedSampler(trainset)

Reference: