PyTorch DDPからAccelerateへ、そしてTrainerへ簡単に分散トレーニングをマスターしましょう

Let's master distributed training easily with PyTorch DDP to Accelerate and Trainer.

全般的な概要

このチュートリアルでは、PyTorchと単純なモデルのトレーニング方法について基本的な理解があることを前提としています。分散データ並列処理(DDP)というプロセスを通じて複数のGPUでのトレーニングを紹介します。以下の3つの異なる抽象化レベルを通じて行います:

  • pytorch.distributedモジュールを使用したネイティブなPyTorch DDP
  • pytorch.distributedをラップした🤗 Accelerateの軽量なラッパーを利用し、コードの変更なしに単一のGPUおよびTPUで実行できるようにする方法
  • 🤗 Transformerの高レベルのTrainer APIを利用し、ボイラープレートコードを抽象化し、さまざまなデバイスと分散シナリオをサポートする方法

「分散」トレーニングとは何か、なぜ重要なのか?

まず、公式のMNISTの例に基づいて、以下の非常に基本的なPyTorchのトレーニングコードを見てみましょう。

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets, transforms

class BasicNet(nn.Module):
    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(1, 32, 3, 1)
        self.conv2 = nn.Conv2d(32, 64, 3, 1)
        self.dropout1 = nn.Dropout(0.25)
        self.dropout2 = nn.Dropout(0.5)
        self.fc1 = nn.Linear(9216, 128)
        self.fc2 = nn.Linear(128, 10)
        self.act = F.relu

    def forward(self, x):
        x = self.act(self.conv1(x))
        x = self.act(self.conv2(x))
        x = F.max_pool2d(x, 2)
        x = self.dropout1(x)
        x = torch.flatten(x, 1)
        x = self.act(self.fc1(x))
        x = self.dropout2(x)
        x = self.fc2(x)
        output = F.log_softmax(x, dim=1)
        return output

トレーニングデバイス(cuda )を定義します:

device = "cuda"

いくつかのPyTorchのDataLoaderを構築します:

transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.1307), (0.3081))
])

train_dset = datasets.MNIST('data', train=True, download=True, transform=transform)
test_dset = datasets.MNIST('data', train=False, transform=transform)

train_loader = torch.utils.data.DataLoader(train_dset, shuffle=True, batch_size=64)
test_loader = torch.utils.data.DataLoader(test_dset, shuffle=False, batch_size=64)

モデルをCUDAデバイスに移動します:

model = BasicNet().to(device)

PyTorchのオプティマイザを構築します:

optimizer = optim.AdamW(model.parameters(), lr=1e-3)

最後に、データセット全体に対して1つの完全なイテレーションを実行し、テストの正解率を計算するシンプルなトレーニングと評価ループを作成します:

model.train()
for batch_idx, (data, target) in enumerate(train_loader):
    data, target = data.to(device), target.to(device)
    output = model(data)
    loss = F.nll_loss(output, target)
    loss.backward()
    optimizer.step()
    optimizer.zero_grad()

model.eval()
correct = 0
with torch.no_grad():
    for data, target in test_loader:
        output = model(data)
        pred = output.argmax(dim=1, keepdim=True)
        correct += pred.eq(target.view_as(pred)).sum().item()
print(f'Accuracy: {100. * correct / len(test_loader.dataset)}')

通常、ここからは、これらのコードをPythonスクリプトにまとめるか、Jupyter Notebookで実行することができます。

ただし、これらのリソースが利用可能な場合、例えば2つのGPUまたは複数のマシンでスクリプトを実行するにはどうすればよいでしょうか?分散トレーニングを通じてトレーニング速度を向上させることができます。単にpython myscript.pyとするだけでは、スクリプトは単一のGPUを使用して実行されるだけです。ここで、torch.distributedが重要な役割を果たします

PyTorch分散データ並列処理

名前の通り、torch.distributedは分散環境での動作を意図しています。これには、各々が単一のGPUを持つ複数のマシンがあるマルチノード、または単一のシステムに複数のGPUがあるマルチGPU、またはその両方の組み合わせなどが含まれます。

上記のコードを分散環境で動作するように変換するには、最初にいくつかのセットアップ構成を定義する必要があります。詳細については、Getting Started with DDP Tutorialを参照してください。

まず、setupcleanup関数を宣言する必要があります。これにより、すべての計算プロセスが通信できる処理グループが開かれます。

注意:このチュートリアルのこのセクションでは、これらはPythonスクリプトファイルとして送信されていると仮定する必要があります。後で、これを不要にするためのAccelerateを使用したランチャーについて説明します。

import os
import torch.distributed as dist

def setup(rank, world_size):
    "PyTorch Distributed Data Parallelismのためのプロセスグループと設定をセットアップします"
    os.environ["MASTER_ADDR"] = 'localhost'
    os.environ["MASTER_PORT"] = "12355"

    # プロセスグループを初期化します
    dist.init_process_group("gloo", rank=rank, world_size=world_size)

def cleanup():
    "分散環境をクリーンアップします"
    dist.destroy_process_group()

最後のパズルのピースは、データとモデルを別のGPUにどのように送信するかです。

これはDistributedDataParallelモジュールが登場する場所です。それはモデルを各GPUにコピーし、loss.backward()が呼び出されるとバックプロパゲーションが実行され、モデルのこれらのコピー全体で得られる勾配が平均/縮小されます。これにより、各デバイスがオプティマイザのステップ後に同じ重みを持つことが保証されます。

以下は、この機能を持つトレーニングセットアップの例です。

注意:ここでのrankは、現在のGPUの全体的なランクを他のすべての利用可能なGPUと比較したもので、つまりランクは0 -> n-1です

from torch.nn.parallel import DistributedDataParallel as DDP

def train(model, rank, world_size):
    setup(rank, world_size)
    model = model.to(rank)
    ddp_model = DDP(model, device_ids=[rank])
    optimizer = optim.AdamW(ddp_model.parameters(), lr=1e-3)
    # 1エポックのトレーニング
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(device), target.to(device)
        output = model(data)
        loss = F.nll_loss(output, target)
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()
    cleanup()

勾配を正しく計算するために、オプティマイザは特定のデバイス上のモデルに基づいて宣言する必要があります(つまり、modelではなくddp_model)。

最後に、PyTorchには便利なtorchrunコマンドラインモジュールがあり、スクリプトを実行する際に役立ちます。使用するノードの数と実行するスクリプトを渡すだけで設定できます。

torchrun --nproc_per_nodes=2 --nnodes=1 example_script.py

上記のコマンドは、単一のマシン上に存在する2つのGPUでトレーニングスクリプトを実行し、これはPyTorchで分散トレーニングのみを行うための基本的な手法です。

さて、Accelerateについて話してみましょう。これは、このプロセスを大幅に変更することなく、上記で行ったことを実行できるように設計されたライブラリです。さらに、Accelerateに固有のデータパイプラインもコードのパフォーマンスを向上させることができます。

🤗 Accelerate

Accelerateは、上記で実行したすべてのコードを大きく変更することなく実行できるようにするためのライブラリです。さらに、Accelerateに組み込まれたデータパイプラインは、コードのパフォーマンスも向上させることができます。

まず、上記で実行したすべてのコードを1つの関数にまとめて、違いを視覚化するのに役立てましょう:

def train_ddp(rank, world_size):
    setup(rank, world_size)
    # データローダーを構築する
    transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.1307), (0.3081))
    ])

    train_dset = datasets.MNIST('data', train=True, download=True, transform=transform)
    test_dset = datasets.MNIST('data', train=False, transform=transform)

    train_loader = torch.utils.data.DataLoader(train_dset, shuffle=True, batch_size=64)
    test_loader = torch.utils.data.DataLoader(test_dset, shuffle=False, batch_size=64)

    # モデルを構築する
    model = model.to(rank)
    ddp_model = DDP(model, device_ids=[rank])

    # オプティマイザを構築する
    optimizer = optim.AdamW(ddp_model.parameters(), lr=1e-3)

    # 1エポックのトレーニング
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(device), target.to(device)
        output = model(data)
        loss = F.nll_loss(output, target)
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()
    
    # 評価する
    model.eval()
    correct = 0
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            pred = output.argmax(dim=1, keepdim=True)
            correct += pred.eq(target.view_as(pred)).sum().item()
    print(f'Accuracy: {100. * correct / len(test_loader.dataset)}')

次に、どのようにAccelerateが助けるかについて話しましょう。上記のコードにはいくつかの問題があります:

  1. n個のデータローダーが各デバイスに基づいて作成され、プッシュされるため、やや効率が悪いです。
  2. このコードはマルチGPUのみで動作するため、単一ノードで再度実行するか、TPUで実行するために特別な注意が必要です。

Accelerateは、Acceleratorクラスを介してこれをサポートします。これにより、以下のように単一ノードとマルチノードを比較した場合、コードは3行のみが異なります:

def train_ddp_accelerate():
    accelerator = Accelerator()
    # データローダーの構築
    transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.1307), (0.3081))
    ])

    train_dset = datasets.MNIST('data', train=True, download=True, transform=transform)
    test_dset = datasets.MNIST('data', train=False, transform=transform)

    train_loader = torch.utils.data.DataLoader(train_dset, shuffle=True, batch_size=64)
    test_loader = torch.utils.data.DataLoader(test_dset, shuffle=False, batch_size=64)

    # モデルの構築
    model = BasicModel()

    # オプティマイザーの構築
    optimizer = optim.AdamW(model.parameters(), lr=1e-3)

    # `accelerator.prepare`を介してすべてを送信
    train_loader, test_loader, model, optimizer = accelerator.prepare(
        train_loader, test_loader, model, optimizer
    )

    # 単一エポックのトレーニング
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        output = model(data)
        loss = F.nll_loss(output, target)
        accelerator.backward(loss)
        optimizer.step()
        optimizer.zero_grad()
    
    # 評価
    model.eval()
    correct = 0
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            pred = output.argmax(dim=1, keepdim=True)
            correct += pred.eq(target.view_as(pred)).sum().item()
    print(f'Accuracy: {100. * correct / len(test_loader.dataset)}')

これにより、PyTorchのトレーニングループは、Acceleratorオブジェクトのおかげで、どんな分散環境でも実行できるようになりました。このコードは、torchrun CLIまたはAccelerateの独自のCLIインターフェースであるaccelerate launchを介して引き続き起動できます。

その結果、Accelerateを使用して分散トレーニングを行うことは、できるだけ基本的なPyTorchコードを変更せずに簡単になりました。

以前にも述べたように、Accelerateはデータローダーを効率的にします。これは、トレーニング中にパートごとにバッチを自動的に異なるデバイスに送信するカスタムサンプラーを介して行われ、構成によっては一度にメモリに4つのコピーの代わりにデータの単一のコピーが既知になるようにします。また、元のデータセットの完全なコピーはメモリ全体に1つだけ存在します。トレーニングに使用されるすべてのノードの間でこのデータセットのサブセットが分割されるため、メモリ使用量が爆発的に増えずに、より大きなデータセットでトレーニングを行うことができます。

notebook_launcherの使用

以前にも述べたように、Jupyter Notebookから直接分散コードを開始できます。これは、Accelerateのnotebook_launcherユーティリティによって実現できます。これにより、Jupyter Notebook内のコードに基づいてマルチGPUトレーニングを開始できます。

使用方法は、ランチャーをインポートするだけです:

from accelerate import notebook_launcher

そして、先ほど宣言したトレーニング関数、渡す引数、および使用するプロセスの数(たとえばTPUの場合は8、2つのGPUの場合は2など)を指定します。上記の両方のトレーニング関数を実行できますが、注意点として、単一の起動後に別の起動を生成する前にインスタンスを再起動する必要があることです。

notebook_launcher(train_ddp, args=(), num_processes=2)

または:

notebook_launcher(train_accelerate_ddp, args=(), num_processes=2)

🤗 Trainerの使用

最後に、最も高レベルのAPIであるHugging Face Trainerに到着します。

これは、ユーザーが何もする必要がない状態で分散システムでトレーニングできるだけのトレーニングを含んでいます。

まず、トレーナーをインポートする必要があります:

from transformers import Trainer

次に、TrainingArgumentsを定義して通常のハイパーパラメータを制御します。トレーナーは辞書形式でも動作するため、カスタムのcollate関数を作成する必要があります。

最後に、トレーナーをサブクラス化し、独自のcompute_lossを書きます。

その後、このコードはトレーニングコードを一切書かずに分散環境でも動作します!

from transformers import Trainer, TrainingArguments

model = BasicNet()

training_args = TrainingArguments(
    "basic-trainer",
    per_device_train_batch_size=64,
    per_device_eval_batch_size=64,
    num_train_epochs=1,
    evaluation_strategy="epoch",
    remove_unused_columns=False
)

def collate_fn(examples):
    pixel_values = torch.stack([example[0] for example in examples])
    labels = torch.tensor([example[1] for example in examples])
    return {"x":pixel_values, "labels":labels}

class MyTrainer(Trainer):
    def compute_loss(self, model, inputs, return_outputs=False):
        outputs = model(inputs["x"])
        target = inputs["labels"]
        loss = F.nll_loss(outputs, target)
        return (loss, outputs) if return_outputs else loss

trainer = MyTrainer(
    model,
    training_args,
    train_dataset=train_dset,
    eval_dataset=test_dset,
    data_collator=collate_fn,
)

trainer.train()

    ***** トレーニングを実行 *****
      例の数 = 60000
      エポック数 = 1
      デバイスごとのバッチサイズ = 64
      並列、分散、蓄積の合計トレーニングバッチサイズ = 64
      勾配蓄積ステップ数 = 1
      最適化ステップの合計数 = 938

notebook_launcherの上記の例と同様に、すべてをトレーニング関数にまとめて再度実行できます:

def train_trainer_ddp():
    model = BasicNet()

    training_args = TrainingArguments(
        "basic-trainer",
        per_device_train_batch_size=64,
        per_device_eval_batch_size=64,
        num_train_epochs=1,
        evaluation_strategy="epoch",
        remove_unused_columns=False
    )

    def collate_fn(examples):
        pixel_values = torch.stack([example[0] for example in examples])
        labels = torch.tensor([example[1] for example in examples])
        return {"x":pixel_values, "labels":labels}

    class MyTrainer(Trainer):
        def compute_loss(self, model, inputs, return_outputs=False):
            outputs = model(inputs["x"])
            target = inputs["labels"]
            loss = F.nll_loss(outputs, target)
            return (loss, outputs) if return_outputs else loss

    trainer = MyTrainer(
        model,
        training_args,
        train_dataset=train_dset,
        eval_dataset=test_dset,
        data_collator=collate_fn,
    )

    trainer.train()

notebook_launcher(train_trainer_ddp, args=(), num_processes=2)

リソース

PyTorch Distributed Data Parallelismについて詳しくは、こちらのドキュメントを参照してください

🤗 Accelerateについて詳しくは、こちらのドキュメントを参照してください

🤗 Transformersについて詳しくは、こちらのドキュメントを参照してください

We will continue to update VoAGI; if you have any questions or suggestions, please contact us!

Share:

Was this article helpful?

93 out of 132 found this helpful

Discover more

人工知能

エンテラソリューションズの創設者兼CEO、スティーブン・デアンジェリス- インタビューシリーズ

スティーブン・デアンジェリスは、エンタラソリューションズの創設者兼CEOであり、自律的な意思決定科学(ADS®)技術を用いて...

人工知能

「クリス・サレンス氏、CentralReachのCEO - インタビューシリーズ」

クリス・サレンズはCentralReachの最高経営責任者であり、同社を率いて、自閉症や関連する障害を持つ人々のために優れたクラ...

機械学習

3つの質問:大規模言語モデルについて、Jacob Andreasに聞く

CSAILの科学者は、最新の機械学習モデルを通じた自然言語処理の研究と、言語が他の種類の人工知能をどのように高めるかの調査...

データサイエンス

「3つの質問:ロボットの認識とマッピングの研磨」

MIT LIDSのLuca CarloneさんとJonathan Howさんは、将来のロボットが環境をどのように知覚し、相互作用するかについて議論し...

人工知能

「トリントの創設者兼CEO、ジェフ・コフマンへのインタビューシリーズ」

ジェフ・コーフマンは、ABC、CBS、CBCニュースで30年のキャリアを持った後、Trintの創設者兼CEOとなりましたジェフは手作業の...

人工知能

Aaron Lee、Smith.aiの共同設立者兼CEO - インタビューシリーズ

アーロン・リーさんは、Smith.aiの共同創業者兼CEOであり、AIと人間の知性を組み合わせて、24時間365日の顧客エンゲージメン...