PyTorch DDPからAccelerateへ、そしてTrainerへ簡単に分散トレーニングをマスターしましょう
Let's master distributed training easily with PyTorch DDP to Accelerate and Trainer.
全般的な概要
このチュートリアルでは、PyTorchと単純なモデルのトレーニング方法について基本的な理解があることを前提としています。分散データ並列処理(DDP)というプロセスを通じて複数のGPUでのトレーニングを紹介します。以下の3つの異なる抽象化レベルを通じて行います:
- pytorch.distributedモジュールを使用したネイティブなPyTorch DDP
- pytorch.distributedをラップした🤗 Accelerateの軽量なラッパーを利用し、コードの変更なしに単一のGPUおよびTPUで実行できるようにする方法
- 🤗 Transformerの高レベルのTrainer APIを利用し、ボイラープレートコードを抽象化し、さまざまなデバイスと分散シナリオをサポートする方法
「分散」トレーニングとは何か、なぜ重要なのか?
まず、公式のMNISTの例に基づいて、以下の非常に基本的なPyTorchのトレーニングコードを見てみましょう。
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets, transforms
class BasicNet(nn.Module):
def __init__(self):
super().__init__()
self.conv1 = nn.Conv2d(1, 32, 3, 1)
self.conv2 = nn.Conv2d(32, 64, 3, 1)
self.dropout1 = nn.Dropout(0.25)
self.dropout2 = nn.Dropout(0.5)
self.fc1 = nn.Linear(9216, 128)
self.fc2 = nn.Linear(128, 10)
self.act = F.relu
def forward(self, x):
x = self.act(self.conv1(x))
x = self.act(self.conv2(x))
x = F.max_pool2d(x, 2)
x = self.dropout1(x)
x = torch.flatten(x, 1)
x = self.act(self.fc1(x))
x = self.dropout2(x)
x = self.fc2(x)
output = F.log_softmax(x, dim=1)
return output
トレーニングデバイス(cuda
)を定義します:
device = "cuda"
いくつかのPyTorchのDataLoaderを構築します:
- 🤗 Optimum IntelとOpenVINOでモデルを高速化しましょう
- マルチリンガルASRのためのWhisperの調整を行います with 🤗 Transformers
- Diffusersを使用したDreamboothによる安定した拡散のトレーニング
transform = transforms.Compose([
transforms.ToTensor(),
transforms.Normalize((0.1307), (0.3081))
])
train_dset = datasets.MNIST('data', train=True, download=True, transform=transform)
test_dset = datasets.MNIST('data', train=False, transform=transform)
train_loader = torch.utils.data.DataLoader(train_dset, shuffle=True, batch_size=64)
test_loader = torch.utils.data.DataLoader(test_dset, shuffle=False, batch_size=64)
モデルをCUDAデバイスに移動します:
model = BasicNet().to(device)
PyTorchのオプティマイザを構築します:
optimizer = optim.AdamW(model.parameters(), lr=1e-3)
最後に、データセット全体に対して1つの完全なイテレーションを実行し、テストの正解率を計算するシンプルなトレーニングと評価ループを作成します:
model.train()
for batch_idx, (data, target) in enumerate(train_loader):
data, target = data.to(device), target.to(device)
output = model(data)
loss = F.nll_loss(output, target)
loss.backward()
optimizer.step()
optimizer.zero_grad()
model.eval()
correct = 0
with torch.no_grad():
for data, target in test_loader:
output = model(data)
pred = output.argmax(dim=1, keepdim=True)
correct += pred.eq(target.view_as(pred)).sum().item()
print(f'Accuracy: {100. * correct / len(test_loader.dataset)}')
通常、ここからは、これらのコードをPythonスクリプトにまとめるか、Jupyter Notebookで実行することができます。
ただし、これらのリソースが利用可能な場合、例えば2つのGPUまたは複数のマシンでスクリプトを実行するにはどうすればよいでしょうか?分散トレーニングを通じてトレーニング速度を向上させることができます。単にpython myscript.py
とするだけでは、スクリプトは単一のGPUを使用して実行されるだけです。ここで、torch.distributed
が重要な役割を果たします
PyTorch分散データ並列処理
名前の通り、torch.distributed
は分散環境での動作を意図しています。これには、各々が単一のGPUを持つ複数のマシンがあるマルチノード、または単一のシステムに複数のGPUがあるマルチGPU、またはその両方の組み合わせなどが含まれます。
上記のコードを分散環境で動作するように変換するには、最初にいくつかのセットアップ構成を定義する必要があります。詳細については、Getting Started with DDP Tutorialを参照してください。
まず、setup
とcleanup
関数を宣言する必要があります。これにより、すべての計算プロセスが通信できる処理グループが開かれます。
注意:このチュートリアルのこのセクションでは、これらはPythonスクリプトファイルとして送信されていると仮定する必要があります。後で、これを不要にするためのAccelerateを使用したランチャーについて説明します。
import os
import torch.distributed as dist
def setup(rank, world_size):
"PyTorch Distributed Data Parallelismのためのプロセスグループと設定をセットアップします"
os.environ["MASTER_ADDR"] = 'localhost'
os.environ["MASTER_PORT"] = "12355"
# プロセスグループを初期化します
dist.init_process_group("gloo", rank=rank, world_size=world_size)
def cleanup():
"分散環境をクリーンアップします"
dist.destroy_process_group()
最後のパズルのピースは、データとモデルを別のGPUにどのように送信するかです。
これはDistributedDataParallel
モジュールが登場する場所です。それはモデルを各GPUにコピーし、loss.backward()
が呼び出されるとバックプロパゲーションが実行され、モデルのこれらのコピー全体で得られる勾配が平均/縮小されます。これにより、各デバイスがオプティマイザのステップ後に同じ重みを持つことが保証されます。
以下は、この機能を持つトレーニングセットアップの例です。
注意:ここでのrankは、現在のGPUの全体的なランクを他のすべての利用可能なGPUと比較したもので、つまりランクは
0 -> n-1
です
from torch.nn.parallel import DistributedDataParallel as DDP
def train(model, rank, world_size):
setup(rank, world_size)
model = model.to(rank)
ddp_model = DDP(model, device_ids=[rank])
optimizer = optim.AdamW(ddp_model.parameters(), lr=1e-3)
# 1エポックのトレーニング
model.train()
for batch_idx, (data, target) in enumerate(train_loader):
data, target = data.to(device), target.to(device)
output = model(data)
loss = F.nll_loss(output, target)
loss.backward()
optimizer.step()
optimizer.zero_grad()
cleanup()
勾配を正しく計算するために、オプティマイザは特定のデバイス上のモデルに基づいて宣言する必要があります(つまり、model
ではなくddp_model
)。
最後に、PyTorchには便利なtorchrun
コマンドラインモジュールがあり、スクリプトを実行する際に役立ちます。使用するノードの数と実行するスクリプトを渡すだけで設定できます。
torchrun --nproc_per_nodes=2 --nnodes=1 example_script.py
上記のコマンドは、単一のマシン上に存在する2つのGPUでトレーニングスクリプトを実行し、これはPyTorchで分散トレーニングのみを行うための基本的な手法です。
さて、Accelerateについて話してみましょう。これは、このプロセスを大幅に変更することなく、上記で行ったことを実行できるように設計されたライブラリです。さらに、Accelerateに固有のデータパイプラインもコードのパフォーマンスを向上させることができます。
🤗 Accelerate
Accelerateは、上記で実行したすべてのコードを大きく変更することなく実行できるようにするためのライブラリです。さらに、Accelerateに組み込まれたデータパイプラインは、コードのパフォーマンスも向上させることができます。
まず、上記で実行したすべてのコードを1つの関数にまとめて、違いを視覚化するのに役立てましょう:
def train_ddp(rank, world_size):
setup(rank, world_size)
# データローダーを構築する
transform = transforms.Compose([
transforms.ToTensor(),
transforms.Normalize((0.1307), (0.3081))
])
train_dset = datasets.MNIST('data', train=True, download=True, transform=transform)
test_dset = datasets.MNIST('data', train=False, transform=transform)
train_loader = torch.utils.data.DataLoader(train_dset, shuffle=True, batch_size=64)
test_loader = torch.utils.data.DataLoader(test_dset, shuffle=False, batch_size=64)
# モデルを構築する
model = model.to(rank)
ddp_model = DDP(model, device_ids=[rank])
# オプティマイザを構築する
optimizer = optim.AdamW(ddp_model.parameters(), lr=1e-3)
# 1エポックのトレーニング
model.train()
for batch_idx, (data, target) in enumerate(train_loader):
data, target = data.to(device), target.to(device)
output = model(data)
loss = F.nll_loss(output, target)
loss.backward()
optimizer.step()
optimizer.zero_grad()
# 評価する
model.eval()
correct = 0
with torch.no_grad():
for data, target in test_loader:
data, target = data.to(device), target.to(device)
output = model(data)
pred = output.argmax(dim=1, keepdim=True)
correct += pred.eq(target.view_as(pred)).sum().item()
print(f'Accuracy: {100. * correct / len(test_loader.dataset)}')
次に、どのようにAccelerateが助けるかについて話しましょう。上記のコードにはいくつかの問題があります:
n
個のデータローダーが各デバイスに基づいて作成され、プッシュされるため、やや効率が悪いです。- このコードはマルチGPUのみで動作するため、単一ノードで再度実行するか、TPUで実行するために特別な注意が必要です。
Accelerateは、Accelerator
クラスを介してこれをサポートします。これにより、以下のように単一ノードとマルチノードを比較した場合、コードは3行のみが異なります:
def train_ddp_accelerate():
accelerator = Accelerator()
# データローダーの構築
transform = transforms.Compose([
transforms.ToTensor(),
transforms.Normalize((0.1307), (0.3081))
])
train_dset = datasets.MNIST('data', train=True, download=True, transform=transform)
test_dset = datasets.MNIST('data', train=False, transform=transform)
train_loader = torch.utils.data.DataLoader(train_dset, shuffle=True, batch_size=64)
test_loader = torch.utils.data.DataLoader(test_dset, shuffle=False, batch_size=64)
# モデルの構築
model = BasicModel()
# オプティマイザーの構築
optimizer = optim.AdamW(model.parameters(), lr=1e-3)
# `accelerator.prepare`を介してすべてを送信
train_loader, test_loader, model, optimizer = accelerator.prepare(
train_loader, test_loader, model, optimizer
)
# 単一エポックのトレーニング
model.train()
for batch_idx, (data, target) in enumerate(train_loader):
output = model(data)
loss = F.nll_loss(output, target)
accelerator.backward(loss)
optimizer.step()
optimizer.zero_grad()
# 評価
model.eval()
correct = 0
with torch.no_grad():
for data, target in test_loader:
data, target = data.to(device), target.to(device)
output = model(data)
pred = output.argmax(dim=1, keepdim=True)
correct += pred.eq(target.view_as(pred)).sum().item()
print(f'Accuracy: {100. * correct / len(test_loader.dataset)}')
これにより、PyTorchのトレーニングループは、Accelerator
オブジェクトのおかげで、どんな分散環境でも実行できるようになりました。このコードは、torchrun
CLIまたはAccelerateの独自のCLIインターフェースであるaccelerate launch
を介して引き続き起動できます。
その結果、Accelerateを使用して分散トレーニングを行うことは、できるだけ基本的なPyTorchコードを変更せずに簡単になりました。
以前にも述べたように、Accelerateはデータローダーを効率的にします。これは、トレーニング中にパートごとにバッチを自動的に異なるデバイスに送信するカスタムサンプラーを介して行われ、構成によっては一度にメモリに4つのコピーの代わりにデータの単一のコピーが既知になるようにします。また、元のデータセットの完全なコピーはメモリ全体に1つだけ存在します。トレーニングに使用されるすべてのノードの間でこのデータセットのサブセットが分割されるため、メモリ使用量が爆発的に増えずに、より大きなデータセットでトレーニングを行うことができます。
notebook_launcherの使用
以前にも述べたように、Jupyter Notebookから直接分散コードを開始できます。これは、Accelerateのnotebook_launcher
ユーティリティによって実現できます。これにより、Jupyter Notebook内のコードに基づいてマルチGPUトレーニングを開始できます。
使用方法は、ランチャーをインポートするだけです:
from accelerate import notebook_launcher
そして、先ほど宣言したトレーニング関数、渡す引数、および使用するプロセスの数(たとえばTPUの場合は8、2つのGPUの場合は2など)を指定します。上記の両方のトレーニング関数を実行できますが、注意点として、単一の起動後に別の起動を生成する前にインスタンスを再起動する必要があることです。
notebook_launcher(train_ddp, args=(), num_processes=2)
または:
notebook_launcher(train_accelerate_ddp, args=(), num_processes=2)
🤗 Trainerの使用
最後に、最も高レベルのAPIであるHugging Face Trainerに到着します。
これは、ユーザーが何もする必要がない状態で分散システムでトレーニングできるだけのトレーニングを含んでいます。
まず、トレーナーをインポートする必要があります:
from transformers import Trainer
次に、TrainingArguments
を定義して通常のハイパーパラメータを制御します。トレーナーは辞書形式でも動作するため、カスタムのcollate関数を作成する必要があります。
最後に、トレーナーをサブクラス化し、独自のcompute_loss
を書きます。
その後、このコードはトレーニングコードを一切書かずに分散環境でも動作します!
from transformers import Trainer, TrainingArguments
model = BasicNet()
training_args = TrainingArguments(
"basic-trainer",
per_device_train_batch_size=64,
per_device_eval_batch_size=64,
num_train_epochs=1,
evaluation_strategy="epoch",
remove_unused_columns=False
)
def collate_fn(examples):
pixel_values = torch.stack([example[0] for example in examples])
labels = torch.tensor([example[1] for example in examples])
return {"x":pixel_values, "labels":labels}
class MyTrainer(Trainer):
def compute_loss(self, model, inputs, return_outputs=False):
outputs = model(inputs["x"])
target = inputs["labels"]
loss = F.nll_loss(outputs, target)
return (loss, outputs) if return_outputs else loss
trainer = MyTrainer(
model,
training_args,
train_dataset=train_dset,
eval_dataset=test_dset,
data_collator=collate_fn,
)
trainer.train()
***** トレーニングを実行 *****
例の数 = 60000
エポック数 = 1
デバイスごとのバッチサイズ = 64
並列、分散、蓄積の合計トレーニングバッチサイズ = 64
勾配蓄積ステップ数 = 1
最適化ステップの合計数 = 938
notebook_launcher
の上記の例と同様に、すべてをトレーニング関数にまとめて再度実行できます:
def train_trainer_ddp():
model = BasicNet()
training_args = TrainingArguments(
"basic-trainer",
per_device_train_batch_size=64,
per_device_eval_batch_size=64,
num_train_epochs=1,
evaluation_strategy="epoch",
remove_unused_columns=False
)
def collate_fn(examples):
pixel_values = torch.stack([example[0] for example in examples])
labels = torch.tensor([example[1] for example in examples])
return {"x":pixel_values, "labels":labels}
class MyTrainer(Trainer):
def compute_loss(self, model, inputs, return_outputs=False):
outputs = model(inputs["x"])
target = inputs["labels"]
loss = F.nll_loss(outputs, target)
return (loss, outputs) if return_outputs else loss
trainer = MyTrainer(
model,
training_args,
train_dataset=train_dset,
eval_dataset=test_dset,
data_collator=collate_fn,
)
trainer.train()
notebook_launcher(train_trainer_ddp, args=(), num_processes=2)
リソース
PyTorch Distributed Data Parallelismについて詳しくは、こちらのドキュメントを参照してください
🤗 Accelerateについて詳しくは、こちらのドキュメントを参照してください
🤗 Transformersについて詳しくは、こちらのドキュメントを参照してください
We will continue to update VoAGI; if you have any questions or suggestions, please contact us!
Was this article helpful?
93 out of 132 found this helpful
Related articles