PyTorch 分佈式訓練簡明教程

點擊上方“視學算法”,馬上關注


神經網絡訓練加速的最簡單方法是使用GPU,對弈神經網絡中常規操作(矩陣乘法和加法)GPU運算速度要倍超於CPU。隨著模型或數據集越來越大,一個GPU很快就會變得不足。例如,BERT和GPT-2等大型語言模型是在數百個GPU上訓練的。對於多GPU訓練,需要一種在不同GPU之間對模型和數據進行切分和調度的方法。

PyTorch是非常流行的深度學習框架,它在主流框架中對於靈活性和易用性的平衡最好。Pytorch有兩種方法可以在多個GPU上切分模型和數據:nn.DataParallel和nn.distributedataparallel。DataParallel更易於使用(只需簡單包裝單GPU模型)。然而,由於它使用一個進程來計算模型權重,然後在每個批處理期間將分發到每個GPU,因此通信很快成為一個瓶頸,GPU利用率通常很低。而且,nn.DataParallel要求所有的GPU都在同一個節點上(不支持分佈式),而且不能使用Apex進行混合精度訓練。nn.DataParallel和nn.distributedataparallel的主要差異可以總結為以下幾點(譯者注):

  1. DistributedDataParallel支持模型並行,而DataParallel並不支持,這意味如果模型太大單卡顯存不足時只能使用前者;
  2. DataParallel是單進程多線程的,只用於單卡情況,而DistributedDataParallel是多進程的,適用於單機和多機情況,真正實現分佈式訓練;
  3. DistributedDataParallel的訓練更高效,因為每個進程都是獨立的Python解釋器,避免GIL問題,而且通信成本低其訓練速度更快,基本上DataParallel已經被棄用;
  4. 必須要說明的是DistributedDataParallel中每個進程都有獨立的優化器,執行自己的更新過程,但是梯度通過通信傳遞到每個進程,所有執行的內容是相同的;

總的來說,Pytorch文檔是相當完備和清晰的,尤其是在1.0x版本後。但是關於DistributedDataParallel的介紹卻較少,主要的文檔有以下三個:

  1. Writing Distributed Applications with PyTorch:主要介紹分佈式API,分佈式配置,不同通信機制以及內部機制,但是說實話大部分人不太同意看懂,而且很少會直接用這些;
  2. Getting Started with Distributed Data Parallel:簡單介紹瞭如何使用DistributedDataParallel,但是用例並不清晰完整;
  3. ImageNet training in PyTorch:比較完整的使用實例,但是僅有代碼,缺少詳細說明;(apex也提供了一個類似的訓練用例Mixed Precision ImageNet Training in PyTorch)
  4. (advanced) PyTorch 1.0 Distributed Trainer with Amazon AWS:如何在亞馬遜雲上進行分佈式訓練,但是估計很多人用不到。

這篇教程將通過一個MNISI例子講述如何使用PyTorch的分佈式訓練,這裡將一段段代碼進行解釋,而且也包括任何使用apex進行混合精度訓練。

DistributedDataParallel內部機制

DistributedDataParallel通過多進程在多個GPUs間複製模型,每個GPU都由一個進程控制(當然可以讓每個進程控制多個GPU,但這顯然比每個進程有一個GPU要慢;也可以多個進程在一個GPU上運行)。GPU可以都在同一個節點上,也可以分佈在多個節點上。每個進程都執行相同的任務,並且每個進程都與所有其他進程通信。進程或者說GPU之間只傳遞梯度,這樣網絡通信就不再是瓶頸。

PyTorch 分佈式訓練簡明教程

在訓練過程中,每個進程從磁盤加載batch數據,並將它們傳遞到其GPU。每一個GPU都有自己的前向過程,然後梯度在各個GPUs間進行All-Reduce。每一層的梯度不依賴於前一層,所以梯度的All-Reduce和後向過程同時計算,以進一步緩解網絡瓶頸。在後向過程的最後,每個節點都得到了平均梯度,這樣模型參數保持同步。

這都要求多個進程(可能在多個節點上)同步並通信。Pytorch通過distributed.init_process_group函數來實現這一點。他需要知道進程0位置以便所有進程都可以同步,以及預期的進程總數。每個進程都需要知道進程總數及其在進程中的順序,以及使用哪個GPU。通常將進程總數稱為world_size.Pytorch提供了nn.utils.data.DistributedSampler來為各個進程切分數據,以保證訓練數據不重疊。

實例講解

這裡通過一個MNIST實例來講解,我們先將其改成分佈式訓練,然後增加混合精度訓練。

普通單卡訓練

首先,導入所需要的庫:

<code>import os
from datetime import datetime
import argparse
import torch.multiprocessing as mp
import torchvision

import torchvision.transforms as transforms
import torch
import torch.nn as nn
import torch.distributed as dist
from apex.parallel import DistributedDataParallel as DDP
from apex import amp
/<code>

然後我們定義一個簡單的CNN模型處理MNIST數據:

<code>class ConvNet(nn.Module):
def __init__(self, num_classes=10):
super(ConvNet, self).__init__()
self.layer1 = nn.Sequential(
nn.Conv2d(1, 16, kernel_size=5, stride=1, padding=2),
nn.BatchNorm2d(16),
nn.ReLU(),
nn.MaxPool2d(kernel_size=2, stride=2))
self.layer2 = nn.Sequential(
nn.Conv2d(16, 32, kernel_size=5, stride=1, padding=2),
nn.BatchNorm2d(32),
nn.ReLU(),
nn.MaxPool2d(kernel_size=2, stride=2))
self.fc = nn.Linear(7*7*32, num_classes)
def forward(self, x):
out = self.layer1(x)
out = self.layer2(out)
out = out.reshape(out.size(0), -1)
out = self.fc(out)
return out
/<code>

主函數main()接受參數,執行訓練:

<code>def main():
parser = argparse.ArgumentParser()
parser.add_argument('-n', '--nodes', default=1, type=int, metavar='N')
parser.add_argument('-g', '--gpus', default=1, type=int,
help='number of gpus per node')
parser.add_argument('-nr', '--nr', default=0, type=int,
help='ranking within the nodes')
parser.add_argument('--epochs', default=2, type=int, metavar='N',
help='number of total epochs to run')
args = parser.parse_args()
train(0, args)
/<code>

其中訓練部分主函數為:

<code>def train(gpu, args):
\ttorch.manual_seed(0)
model = ConvNet()
torch.cuda.set_device(gpu)
model.cuda(gpu)
batch_size = 100
# define loss function (criterion) and optimizer
criterion = nn.CrossEntropyLoss().cuda(gpu)
optimizer = torch.optim.SGD(model.parameters(), 1e-4)
# Data loading code
train_dataset = torchvision.datasets.MNIST(root='./data',
train=True,
transform=transforms.ToTensor(),
download=True)
train_loader = torch.utils.data.DataLoader(dataset=train_dataset,
batch_size=batch_size,
shuffle=True,
num_workers=0,
pin_memory=True)
start = datetime.now()
total_step = len(train_loader)
for epoch in range(args.epochs):
for i, (images, labels) in enumerate(train_loader):
images = images.cuda(non_blocking=True)
labels = labels.cuda(non_blocking=True)
# Forward pass
outputs = model(images)
loss = criterion(outputs, labels)
# Backward and optimize
optimizer.zero_grad()
loss.backward()
optimizer.step()
if (i + 1) % 100 == 0 and gpu == 0:
print('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}'.format(
epoch + 1,
args.epochs,
i + 1,
total_step,
loss.item())
)
if gpu == 0:
print("Training complete in: " + str(datetime.now() - start))
/<code>

通過啟動主函數來開始訓練:

<code>if __name__ == '__main__':
main()
/<code>

你可能注意到有些參數是多餘的,但是對後面的分佈式訓練是有用的。我們通過執行以下語句就可以在單機單卡上訓練:

<code>python src/mnist.py -n 1 -g 1 -nr 0
/<code>

分佈式訓練

使用多進程進行分佈式訓練,我們需要為每個GPU啟動一個進程。每個進程需要知道自己運行在哪個GPU上,以及自身在所有進程中的序號。對於多節點,我們需要在每個節點啟動腳本。

首先,我們要配置基本的參數:

<code>def main():
parser = argparse.ArgumentParser()
parser.add_argument('-n', '--nodes', default=1,
type=int, metavar='N')
parser.add_argument('-g', '--gpus', default=1, type=int,
help='number of gpus per node')
parser.add_argument('-nr', '--nr', default=0, type=int,
help='ranking within the nodes')
parser.add_argument('--epochs', default=2, type=int,
metavar='N',
help='number of total epochs to run')
args = parser.parse_args()
#########################################################
args.world_size = args.gpus * args.nodes #
os.environ['MASTER_ADDR'] = '10.57.23.164' #
os.environ['MASTER_PORT'] = '8888' #
mp.spawn(train, nprocs=args.gpus, args=(args,)) #
#########################################################
/<code>

其中args.nodes是節點總數,而args.gpus是每個節點的GPU總數(每個節點GPU數是一樣的),而args.nr 是當前節點在所有節點的序號。節點總數乘以每個節點的GPU數可以得到world_size,也即進程總數。所有的進程需要知道進程0的IP地址以及端口,這樣所有進程可以在開始時同步,一般情況下稱進程0是master進程,比如我們會在進程0中打印信息或者保存模型。PyTorch提供了mp.spawn來在一個節點啟動該節點所有進程,每個進程運行train(i, args),其中i從0到args.gpus - 1。

同樣,我們要修改訓練函數:

<code>def train(gpu, args):
############################################################
rank = args.nr * args.gpus + gpu\t
dist.init_process_group(
\tbackend='nccl',
\t\tinit_method='env://',
\tworld_size=args.world_size,
\trank=rank
)
############################################################

torch.manual_seed(0)
model = ConvNet()
torch.cuda.set_device(gpu)
model.cuda(gpu)
batch_size = 100
# define loss function (criterion) and optimizer
criterion = nn.CrossEntropyLoss().cuda(gpu)
optimizer = torch.optim.SGD(model.parameters(), 1e-4)

###############################################################
# Wrap the model
model = nn.parallel.DistributedDataParallel(model,
device_ids=[gpu])
###############################################################
# Data loading code
train_dataset = torchvision.datasets.MNIST(
root='./data',
train=True,

transform=transforms.ToTensor(),
download=True
)
################################################################
train_sampler = torch.utils.data.distributed.DistributedSampler(
\ttrain_dataset,
\tnum_replicas=args.world_size,
\trank=rank
)
################################################################
train_loader = torch.utils.data.DataLoader(
\tdataset=train_dataset,
batch_size=batch_size,
##############################
shuffle=False, #
##############################
num_workers=0,
pin_memory=True,
#############################
sampler=train_sampler) #
#############################
...
/<code>

這裡我們首先計算出當前進程序號:rank = args.nr * args.gpus + gpu,然後就是通過dist.init_process_group初始化分佈式環境,其中backend參數指定通信後端,包括mpi, gloo, nccl,這裡選擇nccl,這是Nvidia提供的官方多卡通信框架,相對比較高效。mpi也是高性能計算常用的通信協議,不過你需要自己安裝MPI實現框架,比如OpenMPI。gloo倒是內置通信後端,但是不夠高效。init_method指的是如何初始化,以完成剛開始的進程同步;這裡我們設置的是env://,指的是環境變量初始化方式,需要在環境變量中配置4個參數:MASTER_PORT,MASTER_ADDR,WORLD_SIZE,RANK,前面兩個參數我們已經配置,後面兩個參數也可以通過dist.init_process_group函數中world_size和rank參數配置。其它的初始化方式還包括共享文件系統以及TCP,比如init_method='tcp://10.1.1.20:23456',其實也是要提供master的IP地址和端口。注意這個調用是阻塞的,必須等待所有進程來同步,如果任何一個進程出錯,就會失敗。

對於模型側,我們只需要用DistributedDataParallel包裝一下原來的model即可,在背後它會支持梯度的All-Reduce操作。對於數據側,我們nn.utils.data.DistributedSampler來給各個進程切分數據,只需要在dataloader中使用這個sampler就好,值得注意的一點是你要訓練循環過程的每個epoch開始時調用train_sampler.set_epoch(epoch),(主要是為了保證每個epoch的劃分是不同的)其它的訓練代碼都保持不變。

最後就可以執行代碼了,比如我們是4節點,每個節點是8卡,那麼需要在4個節點分別執行:

<code>python src/mnist-distributed.py -n 4 -g 8 -nr i
/<code>

要注意的是,此時的有效batch_size其實是batch_size_per_gpu * world_size,對於有BN的模型還可以採用同步BN獲取更好的效果:

<code>model = torch.nn.SyncBatchNorm.convert_sync_batchnorm(model)
/<code>

上述講述的是分佈式訓練過程,其實同樣適用於評估或者測試過程,比如我們把數據劃分到不同的進程中進行預測,這樣可以加速預測過程。實現代碼和上述過程完全一樣,不過我們想計算某個指標,那就需要從各個進程的統計結果進行All-Reduce,因為每個進程僅是計算的部分數據的內容。比如我們要計算分類準確度,我們可以統計每個進程的數據總數total和分類正確的數量count,然後進行聚合。這裡要提的一點,當用dist.init_process_group初始化分佈式環境時,其實就是建立一個默認的分佈式進程組(distributed process group),這個group同時會初始化Pytorch的torch.distributed包。這樣我們可以直接用torch.distributed的API就可以進行分佈式基本操作了,下面是具體實現:

<code># define tensor on GPU, count and total is the result at each GPU
t = torch.tensor([count, total], dtype=torch.float64, device='cuda')
dist.barrier() # synchronizes all processes
dist.all_reduce(t, op=torch.distributed.ReduceOp.SUM,) # Reduces the tensor data across all machines in such a way that all get the final result.
t = t.tolist()
all_count = int(t[0])
all_total = int(t[1])
acc = all_count / all_total
/<code>

混合精度訓練(採用apex)

混合精度訓練(混合FP32和FP16訓練)可以適用更大的batch_size,而且可以利用NVIDIA Tensor Cores加速計算。採用NVIDIA的apex進行混合精度訓練非常簡單,只需要修改部分代碼:

<code> rank = args.nr * args.gpus + gpu
dist.init_process_group(
backend='nccl',
init_method='env://',
world_size=args.world_size,
rank=rank)

\ttorch.manual_seed(0)
model = ConvNet()
torch.cuda.set_device(gpu)
model.cuda(gpu)
batch_size = 100
# define loss function (criterion) and optimizer
criterion = nn.CrossEntropyLoss().cuda(gpu)
optimizer = torch.optim.SGD(model.parameters(), 1e-4)
# Wrap the model
##############################################################
model, optimizer = amp.initialize(model, optimizer,
opt_level='O2')
model = DDP(model)
##############################################################
# Data loading code
\t...
start = datetime.now()
total_step = len(train_loader)
for epoch in range(args.epochs):
for i, (images, labels) in enumerate(train_loader):
images = images.cuda(non_blocking=True)
labels = labels.cuda(non_blocking=True)

# Forward pass
outputs = model(images)
loss = criterion(outputs, labels)
# Backward and optimize
optimizer.zero_grad()
##############################################################
with amp.scale_loss(loss, optimizer) as scaled_loss:
scaled_loss.backward()
##############################################################
optimizer.step()
...
/<code>

其實就兩處變化,首先是採用amp.initialize來包裝model和optimizer以支持混合精度訓練,其中opt_level指的是優化級別,如果為O0或者O3不是真正的混合精度,但是可以用來確定模型效果和速度的baseline,而O1和O2是混合精度的兩種設置,可以選擇某個進行混合精度訓練。另外一處是在進行根據梯度更新參數前,要先通過amp.scale_loss對梯度進行scale以防止梯度下溢(underflowing)。此外,你還可以用apex.parallel.DistributedDataParallel替換nn.DistributedDataParallel。

題外話

我覺得PyTorch官方的分佈式實現已經比較完善,而且性能和效果都不錯,可以替代的方案是horovod,不僅支持PyTorch還支持TensorFlow和MXNet框架,實現起來也是比較容易的,速度方面應該不相上下。

參考

  1. Distributed data parallel training in Pytorch https://yangkky.github.io/2019/07/08/distributed-pytorch-tutorial.html (大部分內容來自此處)
  2. torch.distributed https://pytorch.org/docs/stable/distributed.html


分享到:


相關文章: