「分散データパラレル(DDP)の包括的ガイド」

「美容とファッションの豊富な知識を持つ美容とファッションの専門家による、鮮やかで活気のある記事をしばしば執筆しています」

分散データパラレル(DDP)を使用してモデルのトレーニングを高速化する総合的なガイド

Image by Author

イントロダクション

みなさん、こんにちは!私はメタの研究科学者であるフランソワです。本新しいチュートリアルは「Awesome AI Tutorials」シリーズの一部です。

このチュートリアルでは、DDPというよく知られた技術について解説します。DDPを使用して複数のGPUでモデルをトレーニングする方法を理解していきましょう。

エンジニアリングスクールでの日々、私はGoogle ColabのGPUを使用してトレーニングを行っていました。しかし、企業の世界では状況が異なります。AIに重点を置いた組織に所属している場合、特にテックジャイアント内であれば、使用できるGPUクラスタの資源が豊富にあります。

このセッションでは、複数のGPUの力を活用し、素早く効率的なトレーニングを実現するための知識を提供します。しかも、思っている以上に簡単です!ただし、PyTorchやそのコアコンポーネント(データセット、データローダー、オプティマイザ、CUDA、トレーニングループなど)についての理解があることをおすすめします。

最初は、DDPを複雑でほぼ手に入れられないツールと見なし、必要なインフラストラクチャを設定するために大規模なチームが必要だと考えていました。しかし、私は保証します、DDPは直感的で簡潔であり、実装にはわずかな数行のコードしか必要ありません。さあ、この啓蒙的な旅に一緒に出発しましょう!

DDPの高レベルの理解

分散データパラレル(DDP)は、分解して考えれば簡単なコンセプトです。4つのGPUがあるクラスタを想像してください。DDPでは、同じモデルが各GPUにロードされ、オプティマイザも含まれています。主な違いは、データの分散方法です。

DDP, Image taken from PyTorch tutorial

深層学習に詳しい方なら、データローダーというツールを思い出すでしょう。データセットを異なるバッチに分割するツールです。通常、全データセットをこれらのバッチに分割し、各バッチの計算後にモデルを更新します。

さらに詳しく見ると、DDPはこのプロセスを洗練させ、各バッチを「サブバッチ」と呼ばれるものに分割します。基本的に、各モデルのレプリカは主バッチの一部を処理し、各GPUごとに異なる勾配計算が行われます。

DDPでは、このバッチを「DistributedSampler」というツールを使用してサブバッチに分割します。次の図に示すように:

DDP, Image taken from PyTorch tutorial

各サブバッチを個々のGPUに分配すると、各GPUが固有の勾配を計算します。

DDP, Image taken from PyTorch tutorial
  • ここで、DDPの魔法が登場します。モデルパラメータを更新する前に、各GPUで計算された勾配を集計し、全データバッチにわたって計算された平均勾配を各GPUに持つようにする必要があります。
  • これは、すべてのGPUから勾配を取得し、それらを平均化することで行われます。たとえば、4つのGPUがある場合、特定のモデルパラメータの平均勾配は、そのパラメータの各GPUにおける勾配の合計を4で割ったものです。
  • DDPは、効率的に通信し、GPU間で勾配を平均化するためのNCCLまたはGlooバックエンド(NCCLはNVIDIA GPUに最適化されており、Glooはより一般的なもの)を使用します。
DDP、PyTorchチュートリアルからの画像

用語、ノードとランクの用語集

コードに入る前に、頻繁に使用する語彙を理解することが重要です。以下の用語を解説しましょう:

  • ノード: ノードは、複数のGPUを備えた強力なマシンと考えてください。クラスターというと、単なるGPUの束ではありません。代わりに、これらはグループまたは「ノード」と呼ばれるものに編成されます。例えば、ノードには8つのGPUがあります。
  • マスターノード: マルチノード環境では、通常1つのノードが指揮を執ります。この「マスターノード」は、同期、モデルのコピーの開始、モデルのローディングの監視、ログエントリの管理などのタスクを担当します。マスターノードがないと、各GPUは独自のログを生成し、混乱が生じます。
  • ローカルランク: 「ランク」とはIDまたは位置と思ってください。ローカルランクは、その特定のノード(またはマシン)内のGPUの位置またはIDを指します。それは特定のマシンに制限されているため、「ローカル」という名前がついています。
  • グローバルランク: より広い視点で考えると、グローバルランクは利用可能なすべてのノードにまたがるGPUを識別します。これはマシンに関係なく一意の識別子です。
  • ワールドサイズ: 本質的には、これはすべてのノードの利用可能なGPUの数の合計です。単純に言えば、ノードの数と各ノードのGPUの数をかけたものです。

具体的に説明すると、1台のマシンで作業している場合、ローカルランクはグローバルランクと同じです。

画像でこれを明確に説明するために:

ローカルランク、チュートリアルからの画像
ローカルランク、チュートリアルからの画像

DDPの制約を理解する

Distributed Data Parallel (DDP) は多くの深層学習ワークフローで革新的な役割を果たしていますが、その境界を理解することが重要です。

DDPの制約の肝はメモリ消費にあります。DDPでは、各GPUはモデルの複製、オプティマイザ、およびそれに対応するデータのバッチを読み込みます。 GPUのメモリは通常、数GBから高い端のGPUでは80GBに及びます。

小さなモデルの場合、これは問題ありません。ただし、大規模な言語モデル(LLM)やGPTに類似したアーキテクチャなどの領域に進出する場合、単一のGPUのメモリの制約が不十分かもしれません。

コンピュータビジョンでは、軽量モデルがたくさんありますが、3D画像や物体検出のタスクなど、バッチサイズを増やす場合に課題が発生します。

Fully Sharded Data Parallel (FSDP)が登場します。この方法は、データを配布するだけでなく、モデルとオプティマイザの状態をGPUメモリに分散させることで、DDPの利点を拡張します。この手法は有利に聞こえますが、FSDPはGPU間の通信を増加させ、トレーニングを遅くする可能性もあります。

まとめ:

  • モデルとそれに対応するバッチがGPUのメモリに十分収まる場合、スピードを考えるとDDPが最適です。
  • より大きなメモリを必要とする巨大なモデルの場合、速度を犠牲にしてでもメモリを増やすためにFSDPが適しています。

なぜDPよりもDDPを選ぶべきか?

PyTorchのウェブサイトを見ると、実際にはオプションがあります:DPとDDP。ただ、迷ったり混乱したりしないようにこれだけ言っておきます:ただ、DDPを使用してください。より速く、単一のノードに限定されません。

Pytorchのチュートリアルからの比較

コードの解説:

分散深層学習の実装は思ったよりも簡単です。魅力は、GPUの設定や勾配分配の微妙な点に悩まされることがないことにあります。

テンプレートとスクリプト全体はこちらにあります:

GitHub – FrancoisPorcher/awesome-ai-tutorials: データサイエンスのボスにしてくれるAIチュートリアルのベストコレクション!

データサイエンスのボスにしてくれるAIチュートリアルのベストコレクション! – GitHub …

github.com

これから実施する手順の概要をご紹介します:

  1. プロセスの初期化:マスターノードの指定、ポートの指定、world_sizeの設定が含まれます。
  2. 分散データローダーのセットアップ:このステップでは、各バッチを利用可能なGPUに均等に分割することが重要です。データが重ならないように確実に広がるようにします。
  3. モデルのトレーニング/テスト:基本的に、このステップは単一のGPUプロセスとほとんど変わりません。

1つのGPUと1つのノードでのトレーニング(ベースライン)

まず、データセットをロードし、モデルを作成して、単一のGPU上でエンドツーエンドでトレーニングするバニラのコードを定義しましょう。これが私たちの出発点となります:

import torchimport torch.nn.functional as Ffrom torch.utils.data import Dataset, DataLoaderfrom sklearn.datasets import load_winefrom sklearn.model_selection import train_test_splitfrom sklearn.preprocessing import StandardScalerimport numpy as npclass WineDataset(Dataset):    def __init__(self, data, targets):        self.data = data        self.targets = targets    def __len__(self):        return len(self.data)    def __getitem__(self, idx):        return torch.tensor(self.data[idx], dtype=torch.float), torch.tensor(self.targets[idx], dtype=torch.long)class SimpleNN(torch.nn.Module):    def __init__(self):        super(SimpleNN, self).__init__()        self.fc1 = torch.nn.Linear(13, 64)        self.fc2 = torch.nn.Linear(64, 3)    def forward(self, x):        x = F.relu(self.fc1(x))        x = self.fc2(x)        return xclass Trainer():    def __init__(self, model, train_data, optimizer, gpu_id, save_every):        self.model = model        self.train_data = train_data        self.optimizer = optimizer        self.gpu_id = gpu_id        self.save_every = save_every        self.losses = []    def _run_batch(self, source, targets):        self.optimizer.zero_grad()        output = self.model(source)        loss = F.cross_entropy(output, targets)        loss.backward()        self.optimizer.step()        return loss.item()    def _run_epoch(self, epoch):        total_loss = 0.0        num_batches = len(self.train_data)        for source, targets in self.train_data:            source = source.to(self.gpu_id)            targets = targets.to(self.gpu_id)            loss = self._run_batch(source, targets)            total_loss += loss        avg_loss = total_loss / num_batches        self.losses.append(avg_loss)        print(f"Epoch {epoch}, Loss: {avg_loss:.4f}")    def _save_checkpoint(self, epoch):        checkpoint = self.model.state_dict()        PATH = f"model_{epoch}.pt"        torch.save(checkpoint, PATH)        print(f"Epoch {epoch} | Model saved to {PATH}")    def train(self, max_epochs):        self.model.train()        for epoch in range(max_epochs):            self._run_epoch(epoch)            if epoch % self.save_every == 0:                self._save_checkpoint(epoch)def load_train_objs():    wine_data = load_wine()    X = wine_data.data    y = wine_data.target    # Normalize and split    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)    scaler = StandardScaler().fit(X_train)    X_train = scaler.transform(X_train)    X_test = scaler.transform(X_test)    train_set = WineDataset(X_train, y_train)    test_set = WineDataset(X_test, y_test)    print("データセットのサンプル:")    sample_data, sample_target = train_set[0]    print(f"データ:{sample_data}")    print(f"ターゲット:{sample_target}")    model = SimpleNN()    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)    return train_set, model, optimizerdef prepare_dataloader(dataset, batch_size):    return DataLoader(dataset, batch_size=batch_size, pin_memory=True, shuffle=True)def main(device, total_epochs, save_every, batch_size):    dataset, model, optimizer = load_train_objs()    train_data = prepare_dataloader(dataset, batch_size)    trainer = Trainer(model, train_data, optimizer, device, save_every)    trainer.train(total_epochs)main(device=torch.device("cuda:0" if torch.cuda.is_available() else "cpu"), total_epochs=100, save_every=50, batch_size=32)

1ノードでの複数GPUトレーニング

次のステップで、単一ノードのすべてのGPUを使用します。

  1. 分散トレーニング用に必要なライブラリのインポート。
  2. 分散環境の初期化:(特にMASTER_ADDRMASTER_PORT
  3. DistributedDataParallelラッパーを使用してモデルをDDPでラップする。
  4. 分散方式でデータセットがGPU間で分割されるように、分散サンプラーを使用する。
  5. メイン関数をspawnして、マルチGPUトレーニングのために複数のプロセスを作成する。

必要なライブラリには以下が必要です:

import torch.multiprocessing as mpfrom torch.utils.data.distributed import DistributedSamplerfrom torch.nn.parallel import DistributedDataParallel as DDPfrom torch.distributed import init_process_group, destroy_process_groupimport os

次に、各プロセスを設定する必要があります。例えば、1ノードに8つのGPUがある場合、次の関数を8回呼び出します。各GPUごとに正しいlocal_rankで:

def ddp_setup(rank, world_size):    """    分散環境を設定します。  引数:  rank:現在のプロセスのランク。分散トレーニングの各プロセスの一意の識別子です。  world_size:分散トレーニングに参加するプロセスの総数。    """        # メインノードのアドレス。単一ノードトレーニングを行っているため、localhostに設定されています。    os.environ["MASTER_ADDR"] = "localhost"        # マスターノードがワーカーまたは他のプロセスからの通信を待機するポート。    os.environ["MASTER_PORT"] = "12355"        # プロセスグループの初期化。  # 'backend'は使用される通信バックエンドを指定し、"nccl"はGPUトレーニングに最適化されています。    init_process_group(backend="nccl", rank=rank, world_size=world_size)        # 現在のCUDAデバイスを指定されたデバイス(ランクで特定)に設定します。    # これにより、マルチGPUセットアップで各プロセスが異なるGPUを使用することが保証されます。    torch.cuda.set_device(rank)

関数の説明:

  • MASTER_ADDRは、マスター(またはランク0のプロセス)が実行されているマシンのホスト名です。ここではlocalhostです。
  • MASTER_PORT:マスターがワーカーまたは他のプロセスからの接続を待機するポートを指定します。12355は任意です。システムで他のサービスによって使用されていない未使用のポート番号を選択し、ファイアウォールのルールで許可されている限り、どの番号でも選ぶことができます。
  • torch.cuda.set_device(rank):各プロセスが対応するGPUを使用するようにします。

次に、Trainerクラスを少し変更する必要があります。単純にモデルをDDP関数でラップするだけです:

class Trainer():    def __init__(self, model, train_data, optimizer, gpu_id, save_every):        self.model = model.to(gpu_id)        self.train_data = train_data        self.optimizer = optimizer        self.gpu_id = gpu_id        self.save_every = save_every        self.losses = []                # これが変更点です        self.model = DDP(self.model, device_ids=[gpu_id])

残りのTrainerクラスは同じです、素晴らしいですね!

次に、データローダーを変更する必要があります。なぜなら、バッチを各GPUごとに分割する必要があるからです:

def prepare_dataloader(dataset: Dataset, batch_size: int):    return DataLoader(        dataset,        batch_size=batch_size,        pin_memory=True,        shuffle=False,        sampler=DistributedSampler(dataset)    )

これで、main関数を変更できます。 この関数は各プロセスに対して呼び出されるため(私たちの場合は8回)、次のようになります:

def main(rank: int, world_size: int, save_every: int, total_epochs: int, batch_size: int):    """    分散データパラレル(DDP)セットアップのメイントレーニング関数。      引数:    rank(int):現在のプロセスのランク(0 <= rank < world_size)。各プロセスにはユニークなランクが割り当てられます。    world_size(int):分散トレーニングに関与するプロセスの総数。    save_every(int):モデルチェックポイントの保存頻度(エポック単位)。    total_epochs(int):トレーニングの総エポック数。    batch_size(int):1回のイテレーションで処理されるサンプルの数(順方向および逆方向のパス)。    """        # マスターアドレス、ポート、およびバックエンドを含む分散環境の設定を行います。    ddp_setup(rank, world_size)        # 必要なトレーニングオブジェクト(データセット、モデル、オプティマイザ)をロードします。    dataset, model, optimizer = load_train_objs()        # 分散トレーニング用にデータローダーを準備します。データセットをプロセスごとに分割し、シャッフルを処理します。    train_data = prepare_dataloader(dataset, batch_size)        # ロードされたモデル、データ、およびその他の設定でトレーナーインスタンスを初期化します。    trainer = Trainer(model, train_data, optimizer, rank, save_every)        # 指定されたエポック数でモデルをトレーニングします。    trainer.train(total_epochs)        # トレーニングが完了したら、分散環境をクリーンアップします。    destroy_process_group()

最後に、スクリプトを実行する際には、8つのプロセスを起動する必要があります。これはmp.spawn()関数を使用して行われます:

if __name__ == "__main__":    import argparse    parser = argparse.ArgumentParser(description='simple distributed training job')    parser.add_argument('total_epochs', type=int, help='モデルのトレーニングに必要な総エポック数')    parser.add_argument('save_every', type=int, help='スナップショットを保存する頻度')    parser.add_argument('--batch_size', default=32, type=int, help='各デバイスの入力バッチサイズ(デフォルト:32)')    args = parser.parse_args()        world_size = torch.cuda.device_count()    mp.spawn(main, args=(world_size, args.save_every, args.total_epochs, args.batch_size), nprocs=world_size)

究極のステップ:複数のノードでのトレーニング

これまでお疲れ様でした!究極のステップは、異なるノードで利用可能なすべてのGPUを使用できるようにすることです。しかし、これまでの内容を理解しているなら、これは非常に簡単です。

複数のノードでスケーリングする場合の主な違いは、local_rankからglobal_rankへの切り替えです。これは、各プロセスが一意の識別子を必要とするため、重要です。たとえば、2つのノードで作業しており、それぞれ8つのGPUを持っている場合、プロセス0と9はどちらもlocal_rank 0になります。

global_rankは以下の直感的な式で与えられます:

global_rank = node_rank * world_size_per_node + local_rank

それでは、まずddp_setup関数を修正しましょう:

def ddp_setup(local_rank, world_size_per_node, node_rank):    os.environ["MASTER_ADDR"] = "MASTER_NODE_IP"  # <-- マスターノードのIPに置き換えてください    os.environ["MASTER_PORT"] = "12355"      global_rank = node_rank * world_size_per_node + local_rank    init_process_group(backend="nccl", rank=global_rank, world_size=world_size_per_node*torch.cuda.device_count())    torch.cuda.set_device(local_rank)

次に、main関数を調整する必要があります。引数にwold_size_per_nodeを追加しました:

def main(local_rank: int, world_size_per_node: int, save_every: int, total_epochs: int, batch_size: int, node_rank: int):    ddp_setup(local_rank, world_size_per_node, node_rank)    # ...(main関数の残りの部分)

最後に、mp.spawn()関数にworld_size_per_nodeを追加して調整します:

if __name__ == "__main__":    import argparse    parser = argparse.ArgumentParser(description='simple distributed training job')    parser.add_argument('total_epochs', type=int, help='モデルのトレーニングに必要な総エポック数')    parser.add_argument('save_every', type=int, help='スナップショットを保存する頻度')    parser.add_argument('--batch_size', default=32, type=int, help='各デバイスの入力バッチサイズ(デフォルト:32)')    parser.add_argument('--node_rank', default=0, type=int, help='マルチノードトレーニング時のノードのランク')    args = parser.parse_args()    world_size_per_node = torch.cuda.device_count()    mp.spawn(main, args=(world_size_per_node, args.save_every, args.total_epochs, args.batch_size, args.node_rank), nprocs=world_size_per_node)

クラスタの使用(SLURM)

トレーニングをクラスタに送信する準備が整いました。非常に簡単です。単に所望のノード数を指定するだけです。

以下はSLURMスクリプトのテンプレートです:

#!/bin/bash#SBATCH --job-name=DDPTraining       # ジョブの名前#SBATCH --nodes=$1                   # ユーザーが指定したノード数#SBATCH --ntasks-per-node=1          # ノードごとに1つのタスクのみを実行することを保証#SBATCH --cpus-per-task=1            # タスクごとのCPUコア数#SBATCH --gres=gpu:1                 # ノードごとのGPU数#SBATCH --time=01:00:00              # 時間制限(hours:minutes:seconds)(この例では1時間)#SBATCH --mem=4GB                    # GPUごとのメモリ制限#SBATCH --output=training_%j.log     # 出力とエラーログの名前(%jはジョブIDに展開されます)#SBATCH --partition=gpu              # パーティションまたはキューを指定するpython3 your_python_script.py --total_epochs 10 --save_every 2 --batch_size 32 --node_rank $SLURM_NODEID

そして、ターミナルから以下のコマンドでトレーニングを開始できます。

sbatch train_net.sh 2  # 2つのノードを使用する場合

おめでとうございます、成功しました!

読んでくれてありがとうございます!出かける前に:

より素晴らしいチュートリアルについては、私のAIチュートリアルのコンパイルをGitHubで確認してください。

GitHub – FrancoisPorcher/awesome-ai-tutorials: AIチュートリアルの最高のコレクションであなたを一流のプロにします

データサイエンスのボスにするためのAIチュートリアルの最高のコレクション!- GitHub …

github.com

私の記事をメールで受け取るべきです。ここで購読してください。

VoAGIのプレミアム記事にアクセスしたい場合は、月額$5の会員登録が必要です。私のリンクを使って登録すると、追加費用なしで料金の一部をサポートしてくれます。

この記事が有益で参考になった場合は、もっと詳細なコンテンツを提供するために私をフォローして拍手を残すことを検討してください!あなたのサポートは、私たちの集合的な理解を助けるためのコンテンツの制作を続けるのに役立ちます。

参考文献

We will continue to update VoAGI; if you have any questions or suggestions, please contact us!

Share:

Was this article helpful?

93 out of 132 found this helpful

Discover more