「PyTorchにおける複数GPUトレーニングとそれに代わる勾配蓄積」
PyTorchの複数GPUトレーニングと勾配蓄積
コードと理論
この記事では、まず「データ並列化(DP)」と「分散データ並列化(DDP)」のアルゴリズムの違いを見ていきます。次に、「勾配蓄積(GA)」とは何かを説明し、最後にDDPとGAがPyTorchでどのように実装され、同じ結果につながるのかを示します。
はじめに
ディープニューラルネットワーク(DNN)を訓練する際、重要なハイパーパラメータの1つはバッチサイズです。通常、バッチサイズは大きすぎないようにする必要があります。なぜなら、ネットワークが過学習する可能性があるからです。しかし、収束が遅くなるため、小さすぎることも避ける必要があります。高解像度の画像や他の多くのメモリを使用するデータを扱う場合、現在は大部分の大規模DNNモデルの訓練がGPU上で行われることを前提としています。利用可能なGPUのメモリによっては、小さなバッチサイズを適合させることが問題になることがあります。先述のように、小さなバッチサイズは収束が遅くなるため、効果的なバッチサイズを増やすためには3つの主要な方法があります。
- 複数の小さなGPUを使用してモデルを並列に実行する – DPまたはDDPアルゴリズム
- 大きなGPUを使用する(高価)
- 複数のステップで勾配を蓄積する
次に、1.と3.について詳しく見てみましょう。大きなGPUを使用できる幸運がある場合は、残りの部分をスキップして、PyTorchでDDPがどのように実装されているかを確認することができます。
効果的なバッチサイズとして30が必要であり、各GPUには10のデータポイント(ミニバッチサイズ)しか収まらないとします。次の2つの選択肢があります:データ並列化または分散データ並列化:
- 「夢の彫刻:DreamTimeは、テキストから3Dコンテンツ生成の最適化戦略を改善するAIモデルです」
- 「大規模言語モデルのランドスケープをナビゲートする」
- 「2023年の機械学習のアンラーニング:現在の状況と将来の方向性」
データ並列化(DP)
まず、マスターGPUを定義します。その後、以下の手順を実行します:
- 10のデータポイント(ミニバッチ)とモデルのレプリカをマスターGPUから他の2つのGPUに移動します
- 各GPUで順方向のパスを実行し、出力をマスターGPUに渡します
- マスターGPUで総合損失を計算し、その後、各GPUに損失を送信してパラメータの勾配を計算します
- 勾配をマスターGPUに戻します(これはすべての訓練例の勾配の平均です)、その後、それらを合計して30のバッチ全体の平均勾配を取得します
- マスターGPU上でパラメータを更新し、次のイテレーションのためにこれらの更新を他の2つのGPUに送信します
このプロセスにはいくつかの問題点と非効率性があります:
- データは他のGPUに分割される前にマスターGPUから渡されます。また、総合損失の計算とパラメータの更新はマスターGPUで行われるため、マスターGPUは他のGPUよりも多く利用されます
- 各イテレーションで他のGPU上のモデルを同期する必要があるため、トレーニングが遅くなる可能性があります
分散データ並列(DDP)
分散データ並列は、データ並列アルゴリズムの非効率性を改善するために導入されました。前と同じ設定(バッチごとに30のデータポイントと3つのGPU)を持っていますが、以下の違いがあります:
- マスターGPUを持っていません
- マスターGPUがないため、データをディスク/ RAMから直接非重複方式で各GPUにロードします。これはDistributedSamplerが行います。内部では、ローカルランク(GPU ID)を使用してデータをGPU間に分散します。つまり、30のデータポイントがある場合、最初のGPUは[0、3、6、…、27]のポイントを使用し、2番目のGPUは[1、4、7、..、28]、3番目のGPUは[2、5、8、..、29]を使用します
n_gpu = 3for i in range(n_gpu): print(np.arange(30)[i:30:n_gpu])
3. 順方向のパス、損失計算、および逆方向のパスはそれぞれのGPUで独立に実行され、勾配は非同期に縮小され、平均が計算され、その後、すべてのGPUにわたって更新が行われます
DDPがDPよりも優れている利点のため、現在はDDPの使用が好まれており、したがってDDPの実装のみを表示します。
勾配の蓄積
1つのGPUしか持っていない場合でも、より大きなバッチサイズを使用したい場合、勾配をある数のステップで蓄積するという代替オプションがあります。これにより、効果的なバッチサイズを増やすために、一定数のミニバッチの勾配を蓄積することができます。上記の例では、10個のデータポイントの勾配を3回の反復で蓄積することで、効果的なバッチサイズが30であるDDPトレーニングと同じ結果を得ることができます。
DDPプロセスコード
次に、異なるプロセス間で通信を可能にするプロセスグループを初期化する必要があります。int(os.environ[“LOCAL_RANK”])を使用して、特定のプロセスで使用されているGPUを取得します。
init_process_group(backend="nccl")device = int(os.environ["LOCAL_RANK"])torch.cuda.set_device(device)
次に、マルチGPUトレーニングを可能にするDistributedDataParallelでモデルをラップする必要があります。
model = NeuralNetwork(args.data_size) model = model.to(device) if args.distributed: model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[device])
最後の部分は、DDPセクションで説明したDistributedSamplerを定義することです。
sampler = torch.utils.data.DistributedSampler(dataset)
トレーニングの残りの部分は同じままです – この記事の最後に完全なコードを含めます。
勾配の蓄積コード
バックプロパゲーションが発生すると、loss.backward()を呼び出した後、勾配はそれぞれのテンソルに格納されます。実際の更新は、optimizer.step()が呼び出されたときに行われ、その後にoptimizer.zero_grad()が呼び出され、次の反復のバックプロパゲーションとパラメータの更新を実行するために、テンソルに格納された勾配がゼロに設定されます。したがって、勾配を蓄積するためには、loss.backward()を必要な勾配蓄積回数だけ呼び出す必要がありますが、勾配をゼロに設定せずに複数の反復で蓄積されるようにします。そして、それらを平均して蓄積された勾配反復の平均勾配を得ます (loss = loss/ACC_STEPS)。その後、optimizer.step()を呼び出し、勾配をゼロに設定して次の勾配の蓄積を開始します。コードでは次のようになります:
ACC_STEPS = dist.get_world_size() # == number of GPUs# データを反復処理するfor i, (idxs, row) in enumerate(loader): loss = model(row) # accumulation stepsに応じてlossをスケーリングする loss = loss/ACC_STEPS loss.backward() # ACC_STEPSに対して勾配を蓄積し続ける if ((i + 1) % ACC_STEPS == 0): optimizer.step() optimizer.zero_grad()
完全なコード
import osos.environ["CUDA_VISIBLE_DEVICES"] = "0,1"print(os.environ["CUDA_VISIBLE_DEVICES"])import torchimport torch.nn as nnfrom torch.utils.data import DataLoader, Dataset, Samplerimport argparseimport torch.optim as optim import numpy as npimport randomimport torch.backends.cudnn as cudnnimport torch.nn.functional as Ffrom torch.distributed import init_process_groupimport torch.distributed as distclass data_set(Dataset): def __init__(self, df): self.df = df def __len__(self): return len(self.df) def __getitem__(self, index): sample = self.df[index] return index, sample class NeuralNetwork(nn.Module): def __init__(self, dsize): super().__init__() self.linear = nn.Linear(dsize, 1, bias=False) self.linear.weight.data[:] = 1. def forward(self, x): x = self.linear(x) loss = x.sum() return loss class DummySampler(Sampler): def __init__(self, data, batch_size, n_gpus=2): self.num_samples = len(data) self.b_size = batch_size self.n_gpus = n_gpus def __iter__(self): ids = [] for i in range(0, self.num_samples, self.b_size * self.n_gpus): ids.append(np.arange(self.num_samples)[i: i + self.b_size*self.n_gpus :self.n_gpus]) ids.append(np.arange(self.num_samples)[i+1: (i+1) + self.b_size*self.n_gpus :self.n_gpus]) return iter(np.concatenate(ids)) def __len__(self): # print ('\tcalling Sampler:__len__') return self.num_samples def main(args=None): d_size = args.data_size if args.distributed: init_process_group(backend="nccl") device = int(os.environ["LOCAL_RANK"]) torch.cuda.set_device(device) else: device = "cuda:0" # fix the seed for reproducibility seed = args.seed torch.manual_seed(seed) np.random.seed(seed) random.seed(seed) cudnn.benchmark = True # generate data data = torch.rand(d_size, d_size) model = NeuralNetwork(args.data_size) model = model.to(device) if args.distributed: model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[device]) optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.9) dataset = data_set(data) if args.distributed: sampler = torch.utils.data.DistributedSampler(dataset, shuffle=False) else: # we define `DummySampler` for exact reproducibility with `DistributedSampler` # which splits the data as described in the article. sampler = DummySampler(dataset, args.batch_size) loader = DataLoader( dataset, batch_size=args.batch_size, num_workers=0, pin_memory=True, sampler=sampler, shuffle=False, collate_fn=None, ) if not args.distributed: grads = [] # ACC_STEPSはGPUと同じで、損失をこの数で除算して複数のGPUから得られる勾配と同じ勾配を得る必要がある ACC_STEPS = args.acc_steps optimizer.zero_grad() for epoch in range(args.epochs): if args.distributed: loader.sampler.set_epoch(epoch) for i, (idxs, row) in enumerate(loader): if args.distributed: optimizer.zero_grad() row = row.to(device, non_blocking=True) if args.distributed: rank = dist.get_rank() == 0 else: rank = True loss = model(row) if args.distributed: # `DistributedDataParallel`により、自動的に勾配を平均化する loss.backward() else: # accumulation stepsに応じてlossをスケーリングする loss = loss/ACC_STEPS loss.backward() if i == 0 and rank: print(f"Epoch {epoch} {100 * '='}") if not args.distributed: if (i + 1) % ACC_STEPS == 0: # ACC_STEPSが終わった時点でのみステップする # エポック全体のgradsを蓄積する optimizer.step() optimizer.zero_grad() else: optimizer.step() if not args.distributed and args.verbose: print(100 * "=") print("Model weights : ", model.linear.weight) print(100 * "=") elif args.distributed and args.verbose and rank: print(100 * "=") print("Model weights : ", model.module.linear.weight) print(100 * "=") if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument('--distributed', actionこれで、これらの2つのスクリプトを実行すると、次のようになります:
- python3 ddp.py — epochs 2 — batch_size 4 — data_size 8 — verbose — acc_steps 2
- torchrun — standalone — nproc_per_node=2 ddp.py — epochs 2 — distributed — batch_size 4 — data_size 8 — verbose
私たちは正確に同じ最終モデルパラメータを得ることがわかります:
# Gradient AccumulatorModel weights から: Parameter containing:tensor([[0.9472, 0.9440, 0.9527, 0.9687, 0.9570, 0.9343, 0.9411, 0.9186]], device='cuda:0', requires_grad=True)# DDP から:Model weights は次のようになります: Parameter containing:tensor([[0.9472, 0.9440, 0.9527, 0.9687, 0.9570, 0.9343, 0.9411, 0.9186]], device='cuda:0', requires_grad=True)
結論
この記事では、DP、DDPアルゴリズム、および勾配蓄積の背後にある直感を簡単に紹介し、効果的なバッチサイズを複数のGPUを使用せずに増やす方法を示しました。重要なことは、最終的な結果が同じであっても、複数のGPUを使用したトレーニングの方が勾配蓄積よりもはるかに速いため、トレーニングのスピードが重要な場合は、複数のGPUを使用するしかないということです。
We will continue to update VoAGI; if you have any questions or suggestions, please contact us!
Was this article helpful?
93 out of 132 found this helpful
Related articles