「Amazon SageMaker Pipelinesを使用した機械学習ワークフローの構築のためのベストプラクティスとデザインパターン」

Best practices and design patterns for building machine learning workflows using Amazon SageMaker Pipelines

Amazon SageMakerパイプラインは、機械学習(ML)ワークフローの構築とオーケストレーションを行うための完全に管理されたAWSサービスです。SageMakerパイプラインは、データのロード、データの変換、トレーニング、調整、展開など、MLワークフローのさまざまなステップをオーケストレーションする機能をMLアプリケーション開発者に提供します。SageMakerパイプラインを使用すると、SageMakerでMLジョブをオーケストレーションできます。また、AWS Lambda関数、Amazon EMRジョブなどのリソースを使用することもできます。これにより、MLワークフローの特定の要件に合わせたカスタマイズ可能で再現可能なパイプラインを構築することができます。

この記事では、SageMakerパイプラインの価値を最大化し、開発体験をシームレスにするためのベストプラクティスをいくつか紹介します。また、SageMakerパイプラインの構築時に一般的なデザインシナリオとパターンについても説明し、それらを解決するための例も提供します。

SageMakerパイプラインのベストプラクティス

このセクションでは、SageMakerパイプラインを使用してワークフローを設計する際に遵守できるいくつかのベストプラクティスについて説明します。これらを採用することで、開発プロセスが改善され、SageMakerパイプラインの操作管理が効率化されます。

パイプラインの遅延読み込みにはパイプラインセッションを使用する

パイプラインセッションを使用すると、パイプラインリソースの遅延初期化(パイプラインランタイムまでジョブは開始されません)が可能になります。 PipelineSessionコンテキストはSageMakerセッションを継承し、トレーニングジョブ、エンドポイント、Amazon Simple Storage Service(Amazon S3)の入力データセットなど、他のSageMakerエンティティとリソースと対話するための便利なメソッドを実装しています。SageMakerパイプラインを定義する際には、通常のSageMakerセッションではなくPipelineSessionを使用する必要があります:

from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.sklearn.processing import SKLearnProcessor
role = sagemaker.get_execution_role()
pipeline_session = PipelineSession()
sklearn_processor = SKLearnProcessor(
    framework_version=’0.20.0’,
    instance_type=’ml.m5.xlarge’,
    instance_count=1,
    base_job_name="sklearn-abalone-process",
    role=role,
    sagemaker_session=pipeline_session,
)

開発中のコスト効果的で迅速なイテレーションのためにローカルモードでパイプラインを実行する

LocalPipelineSessionコンテキストを使用してパイプラインをローカルモードで実行することができます。このモードでは、パイプラインとジョブはSageMakerの管理リソースではなく、ローカルマシンのリソースを使用してローカルで実行されます。ローカルモードは、データのサブセットを使用してパイプラインコードのイテレーションを行うための費用効果の高い方法を提供します。パイプラインがローカルでテストされた後、PipelineSessionコンテキストを使用してスケーリングして実行することができます。

from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.workflow.pipeline_context import LocalPipelineSession
local_pipeline_session = LocalPipelineSession()
role = sagemaker.get_execution_role()
sklearn_processor = SKLearnProcessor(
    framework_version=’0.20.0’,
    instance_type=’ml.m5.xlarge',
    instance_count=1,
    base_job_name="sklearn-abalone-process",
    role=role,
    sagemaker_session=local_pipeline_session,
)

バージョニングを使用してSageMakerパイプラインを管理する

アーティファクトとパイプライン定義のバージョニングは、開発ライフサイクルで一般的な要件です。パイプラインオブジェクトに一意の接頭辞や接尾辞(タイムスタンプなど)を使用してパイプラインの複数のバージョンを作成することができます。以下に示すコードを参照してください:

from sagemaker.workflow.pipeline_context import PipelineSession
import time

current_time = time.strftime("%Y-%m-%d-%H-%M-%S", time.gmtime())
pipeline_name = "pipeline_" + current_time
pipeline_session = PipelineSession()
pipeline = Pipeline(
    name=pipeline_name,
    steps=[step_process, step_train, step_eval, step_cond],
    sagemaker_session=pipeline_session,
)

SageMakerパイプラインの実行をSageMaker Experimentsと統合して整理・追跡する

SageMakerパイプラインは、SageMaker Experimentsと簡単に統合してパイプラインの実行を整理・追跡することができます。これは、パイプラインオブジェクトの作成時にPipelineExperimentConfigを指定することで実現します。この構成オブジェクトでは、実験名とトライアル名を指定することができます。SageMakerパイプラインの実行の詳細は、指定された実験名とトライアルの下に整理されます。実験名を明示的に指定しない場合、実験名にはパイプライン名が使用されます。同様に、トライアル名を明示的に指定しない場合、トライアルまたはラングループ名にはパイプラインの実行IDが使用されます。以下のコードを参照してください:

Pipeline(
    name="マイパイプライン",
    parameters=[...],
    pipeline_experiment_config=PipelineExperimentConfig(
        experiment_name=ExecutionVariables.PIPELINE_NAME,
        trial_name=ExecutionVariables.PIPELINE_EXECUTION_ID
        ),
    steps=[...]
)

プライベートVPC内でSageMakerパイプラインを安全に実行する

MLワークロードを保護するために、SageMakerパイプラインによってオーケストレーションされるジョブを、プライベートVPC、プライベートサブネット、セキュリティグループ内の安全なネットワーク構成で展開することがベストプラクティスです。この安全な環境の使用を確実にし、強制するために、SageMaker実行ロール(パイプラインが実行中に仮定するロール)に対して以下のAWS Identity and Access Management(IAM)ポリシーを実装できます。また、SageMakerパイプラインによってオーケストレーションされるジョブをネットワーク分離モードで実行するためのポリシーも追加できます。

# プライベートVPC内での実行を強制するIAMポリシー

{
    "Action": [
        "sagemaker:CreateProcessingJob",
        "sagemaker:CreateTrainingJob",
        "sagemaker:CreateModel"
    ],
    "Resource": "*",
    "Effect": "Deny",
    "Condition": {
        "Null": {
            "sagemaker:VpcSubnets": "true"
        }
    }
}

# ネットワーク分離モードでの実行を強制するIAMポリシー
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Deny",
            "Action": [
                "sagemaker:Create*"
            ],
            "Resource": "*",
            "Condition": {
                "StringNotEqualsIfExists": {
                    "sagemaker:NetworkIsolation": "true"
                }
            }
        }
    ]
}

これらのセキュリティ制御を備えたパイプラインの実装例については、「Amazon SageMakerでのジョブのオーケストレーション、モデルの登録、および継続的デプロイメント」を参照してください。

タグを使用してパイプライン実行のコストを監視する

SageMakerパイプラインの単体使用は無料です。ただし、処理、トレーニング、バッチ推論などの個々のパイプラインステップの一部として展開するコンピューティングリソースとストレージリソースに対して料金が発生します。パイプライン実行ごとのコストを集計するために、リソースを作成する各パイプラインステップにタグを含めることができます。これらのタグは、コストエクスプローラで参照し、パイプライン実行の総コストをフィルタリングおよび集計するために使用できます。次の例を参照してください。

sklearn_processor = SKLearnProcessor(
    framework_version='0.20.0',
    instance_type='ml.m5.xlarge',
    instance_count=1,
    base_job_name="sklearn-abalone-process",
    role=role,
    tags=[{'Key':'pipeline-cost-tag', 'Value':'<<tag_parameter>>'}]
)

step_process = ProcessingStep(
    name="AbaloneProcess",
    processor=sklearn_processor,
    ...
)

コストエクスプローラから、タグでフィルタリングされたコストを取得できます。

response = client.get_cost_and_usage(
    TimePeriod={
        'Start': '2023-07-01',
        'End': '2023-07-15'
        },
    Metrics=['BLENDED_COST','USAGE_QUANTITY','UNBLENDED_COST'],
    Granularity='MONTHLY',
    Filter={
        'Dimensions': {
            'Key':'USAGE_TYPE',
            'Values': [
                'SageMaker:Pipeline'
            ]
        },
        'Tags': {
            'Key': 'keyName',
            'Values': [
                'keyValue',
                ]
        }
    }
)

一部の一般的なシナリオのためのデザインパターン

このセクションでは、SageMakerパイプラインの一部の一般的なユースケースに対するデザインパターンについて説明します。

ラムダステップを使用して軽量なPython関数を実行する

Python関数は、MLワークフローで頻繁に使用されます。前処理、後処理、評価などに使用されます。Lambdaは、サーバーレスのコンピュートサービスで、サーバーのプロビジョニングや管理なしでコードを実行できます。Lambdaを使用すると、Pythonを含む任意の言語でコードを実行できます。これを使用して、パイプラインの一部としてカスタムのPythonコードを実行できます。Lambdaステップを使用すると、SageMakerパイプラインの一部としてLambda関数を実行できます。以下のコードから始めてください。

%%writefile lambdafunc.py

import json

def lambda_handler(event, context):
    str1 = event["str1"]
    str2 = event["str2"]
    str3 = str1 + str2
    return {
        "str3": str3
    }

以下のHTMLコードを日本語に翻訳します(HTMLコードは翻訳結果に含めます):

<p>SageMakerのPython SDKのLambdaヘルパーを使用して、Lambda関数を作成します:</p><pre><code>from sagemaker.lambda_helper import Lambda

def create_lambda(function_name, script, handler):
    response = Lambda(
        function_name=function_name,
        execution_role_arn=role,
        script= script,
        handler=handler,
        timeout=600,
        memory_size=10240,
    ).upsert()

    function_arn = response['FunctionArn']
    return function_arn

fn_arn = create_Lambda("func", "lambdafunc.py", handler = "lambdafunc.lambda_handler")</code></pre><p>Lambdaステップを呼び出します:</p><pre><code>from sagemaker.lambda_helper import Lambda
from sagemaker.workflow.lambda_step import (
    LambdaStep,
    LambdaOutput,
    LambdaOutputTypeEnum
)

str3 = LambdaOutput(output_name="str3", output_type=LambdaOutputTypeEnum.String)

# Lambdaステップ
step_lambda1 = LambdaStep(
    name="LambdaStep1",
    lambda_func=Lambda(
        function_arn=fn_arn
    ),
    inputs={
        "str1": "Hello",
        "str2": " World"
    },
    outputs=[str3],
)</code></pre><h3 id="pass-data-between-steps">ステップ間でデータを渡す</h3><p>パイプラインステップの入力データは、アクセス可能なデータの場所またはパイプライン内の前のステップで生成されたデータです。これらの情報をProcessingInputパラメータとして提供できます。ProcessingInputの使用方法についていくつかのシナリオを見てみましょう。</p><h4 id="scenario-1-pass-the-output-primitive-data-types-of-a-lambda-step-to-a-processing-step">シナリオ1:Lambdaステップの出力(プリミティブデータ型)を処理ステップに渡す</h4><p>プリミティブデータ型とは、文字列、整数、ブール値、浮動小数点などのスカラーデータ型を指します。</p><p>次のコードスニペットでは、プリミティブデータ型の変数のディクショナリを返すLambda関数を定義しています。SageMakerパイプライン内のLambdaステップから呼び出された場合、Lambda関数コードはキーと値のペアのJSONを返します。</p><pre><code>def handler(event, context):
    ...
    return {
        "output1": "string_value",
        "output2": 1,
        "output3": True,
        "output4": 2.0,
    }</code></pre><p>パイプライン定義では、特定のデータ型のSageMakerパイプラインパラメータを定義し、変数をLambda関数の出力に設定できます:</p><pre><code>from sagemaker.workflow.lambda_step import (
    LambdaStep,
    LambdaOutput,
    LambdaOutputTypeEnum
)
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.sklearn.processing import SKLearnProcessor

role = sagemaker.get_execution_role()
pipeline_session = PipelineSession()

# 1. Lambdaステップの出力パラメータを定義する

str_outputParam = LambdaOutput(output_name="output1", output_type=LambdaOutputTypeEnum.String)
int_outputParam = LambdaOutput(output_name"output2", output_type=LambdaOutputTypeEnum.Integer)
bool_outputParam = LambdaOutput(output_name"output3", output_type=LambdaOutputTypeEnum.Boolean)
float_outputParam = LambdaOutput(output_name"output4", output_type=LambdaOutputTypeEnum.Float)

# 2. Lambdaステップを実行し、出力を返す

step_lambda = LambdaStep(
    name="MyLambdaStep",
    lambda_func=Lambda(
        function_arn="arn:aws:lambda:us-west-2:123456789012:function:sagemaker_test_lambda",
        session=PipelineSession(),
        ),
    inputs={"arg1": "foo", "arg2": "foo1"},
    outputs=[
        str_outputParam, int_outputParam, bool_outputParam, float_outputParam
        ],
)

# 3. Lambdaの出力を抽出する

str_outputParam = step_lambda.properties.Outputs["output1"]

# 4. 後続のステップで使用する。例えばProcessingステップ

sklearn_processor = SKLearnProcessor(
    framework_version="0.23-1",
    instance_type="ml.m5.xlarge",
    instance_count=1,
    sagemaker_session=pipeline_session,
    role=role
)

processor_args = sklearn_processor.run(
    code="code/preprocess.py", #実行するPythonスクリプト
    arguments=["--input-args", str_outputParam]
)

step_process = ProcessingStep(
    name="processstep1",
    step_args=processor_args,
)</code></pre><h4 id="scenario-2-pass-the-output-non-primitive-data-types-of-a-lambda-step-to-a-processing-step">シナリオ2:Lambdaステップの出力(非プリミティブデータ型)を処理ステップに渡す</h4>

非プリミティブデータ型は、非スカラーデータ型(例:NamedTuple)を指します。Lambda関数から非プリミティブデータ型を返す必要がある場合、非プリミティブデータ型を文字列に変換する必要があります:

# 非プリミティブデータ型を返すLambda関数のコード

from collections import namedtuple

def lambda_handler(event, context):
    Outputs = namedtuple("Outputs", "sample_output")
    named_tuple = Outputs(
                    [
                        {'output1': 1, 'output2': 2},
                        {'output3': 'foo', 'output4': 'foo1'}
                    ]
                )
return{
    "named_tuple_string": str(named_tuple)
}

# “Parameter Input”としてLambdaの出力を使用するパイプラインのステップ

output_ref = step_lambda.properties.Outputs["named_tuple_string"]

その後、この文字列をパイプライン内の後続のステップの入力として使用することができます。コード内で名前付きタプルを使用するには、eval()を使用して文字列内のPython式を解析します:

# 処理ロジックコードで文字列を解読する

import argparse
from collections import namedtuple

Outputs = namedtuple("Outputs", "sample_output")

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--named_tuple_string", type=str, required=True)
    args = parser.parse_args()
    # evalを使用して文字列から名前付きタプルを取得する
    named_tuple = eval(args.named_tuple_string)

シナリオ3: ステップの出力をプロパティファイルを介して渡す

処理ステップの出力をプロパティJSONファイルに格納して、ConditionStepまたは別のProcessingStepで後続の処理に使用することもできます。プロパティファイルをクエリするためには、JSONGet関数を使用することができます。以下のコードを参照してください:

# 1. 処理出力を持つProcessorを定義する
sklearn_processor = SKLearnProcessor(
    framework_version="0.23-1",
    instance_type="ml.m5.xlarge",
    instance_count=1,
    base_job_name="sklearn-abalone-preprocess",
    sagemaker_session=session,
    role=sagemaker.get_execution_role(),
)

step_args = sklearn_processor.run(

                outputs=[
                    ProcessingOutput(
                        output_name="hyperparam",
                        source="/opt/ml/processing/evaluation"
                    ),
                ],
            code="./local/preprocess.py",
            arguments=["--input-data", "s3://my-input"],
)

# 2. プロセッサで使用するoutput_nameがProcessorで使用されているものと一致するようにPropertyFileを定義する

hyperparam_report = PropertyFile(
    name="AbaloneHyperparamReport",
    output_name="hyperparam",
    path="hyperparam.json",
)

プロパティファイルの内容が次のようであると仮定します:

{
    "hyperparam": {
        "eta": {
            "value": 0.6
        }
    }
}

この場合、JsonGet関数を使用して特定の値をクエリし、後続のステップで使用することができます:

# 3. プロパティファイルをクエリする
eta = JsonGet(
    step_name=step_process.name,
    property_file=hyperparam_report,
    json_path="hyperparam.eta.value",
)

パイプライン定義内の変数をパラメータ化する

実行時に使用できるように変数をパラメータ化することは、しばしば望ましいです。たとえば、S3 URIを構築するために使用することができます。Join関数を使用して、実行時に評価されるように文字列をパラメータ化することができます。次のコードスニペットは、Join関数を使用して変数を定義し、それを処理ステップの出力場所に設定する方法を示しています:

# s3 URIを格納する変数を定義する
s3_location = Join(
    on="/", 
    values=[
        "s3:/",
        ParameterString(
            name="MyBucket", 
            default_value=""
        ),
        "training",
        ExecutionVariables.PIPELINE_EXECUTION_ID
    ]
)

# 処理ステップを定義する
sklearn_processor = SKLearnProcessor(
    framework_version="1.2-1",
    instance_type="ml.m5.xlarge",
    instance_count=processing_instance_count,
    base_job_name=f"{base_job_prefix}/sklearn-abalone-preprocess",
    sagemaker_session=pipeline_session,
    role=role,
)

# 処理ステップでの出力場所としてs3uriを使用する
processor_run_args = sklearn_processor.run(
    outputs=[
        ProcessingOutput(
            output_name="train",
            source="/opt/ml/processing/train",
            destination=s3_location,
        ),
    ],
    code="code/preprocess.py"
)

step_process = ProcessingStep(
    name="PreprocessingJob”,
    step_args=processor_run_args,
)

イテラブル上で並列コードを実行する

一部の機械学習のワークフローでは、静的なアイテムのセット(イテラブル)上で並列のループを使用してコードを実行します。それは、異なるデータ上で実行される同じコードであるか、データの各アイテムごとに実行する必要がある異なるコードの片割れであることがあります。例えば、ファイル内の非常に多数の行を持ち、処理時間を短縮したい場合は、前者のパターンに頼ることができます。データの特定のサブグループに異なる変換を行いたい場合は、データの各サブグループごとに異なるコードを実行する必要があるかもしれません。以下の2つのシナリオでは、この目的のためにSageMakerパイプラインを設計する方法を説明します。

シナリオ1: データの異なる部分に処理ロジックを実装する

複数のインスタンスで処理ジョブを実行することができます(instance_countを1より大きい値に設定することで)。これにより、Amazon S3からの入力データがすべての処理インスタンスに分散されます。次に、インスタンス番号とアイテムリスト内の対応する要素に基づいて、スクリプト(process.py)を使用してデータの特定の部分で作業することができます。process.py内のプログラミングロジックは、処理するアイテムリストに応じて異なるモジュールまたはコードが実行されるように書くことができます。以下の例では、ProcessingStepで使用できるプロセッサを定義しています:

sklearn_processor = FrameworkProcessor(
    estimator_cls=sagemaker.sklearn.estimator.SKLearn,
    framework_version="0.23-1",
    instance_type='ml.m5.4xlarge',
    instance_count=4, #並列実行/インスタンスの数
    base_job_name="parallel-step",
    sagemaker_session=session,
    role=role,
)

step_args = sklearn_processor.run(
    code='process.py',
    arguments=[
        "--items", 
        list_of_items, #アイテムのリストを含むデータ構造
        inputs=[
            ProcessingInput(source="s3://sagemaker-us-east-1-xxxxxxxxxxxx/abalone/abalone-dataset.csv",
                    destination="/opt/ml/processing/input"
            )
        ],
    ]
)

シナリオ2: ステップのシーケンスを実行する

並列で実行する必要のあるステップのシーケンスがある場合は、各シーケンスを独立したSageMakerパイプラインとして定義することができます。これらのSageMakerパイプラインの実行は、親パイプラインのLambdaStepの一部であるLambda関数からトリガーすることができます。以下のコードは、2つの異なるSageMakerパイプラインの実行がトリガーされるシナリオを示しています:

import boto3
def lambda_handler(event, context):
    items = [1, 2]
    #sagemakerクライアント
    sm_client = boto3.client("sagemaker")
    
    #トリガーする必要のあるパイプラインの名前。
    #複数ある場合は、boto3 APIを使用して利用可能なパイプラインを取得し、ロジックに基づいて適切なものをトリガーすることができます。
    pipeline_name = 'child-pipeline-1'

    #各アイテムごとにパイプラインをトリガー
    response_ppl = sm_client.start_pipeline_execution(
                        PipelineName=pipeline_name,
                        PipelineExecutionDisplayName=pipeline_name+'-item-%d' %(s),
                    )
    pipeline_name = 'child-pipeline-2'
    response_ppl = sm_client.start_pipeline_execution(
                        PipelineName=pipeline_name,
                        PipelineExecutionDisplayName=pipeline_name+'-item-%d' %(s),
                    )
return

結論

この記事では、SageMakerパイプラインの効率的な使用とメンテナンスのためのベストプラクティスについて説明しました。また、SageMakerパイプラインを使用してワークフローを設計する際に採用できる特定のパターンも提供しました。これには、新しいパイプラインの作成や他のオーケストレーションツールからの機械学習ワークフローの移行など、さまざまなシナリオが含まれます。ワークフローオーケストレーションのためのSageMakerパイプラインの使用を開始するには、GitHubとAmazon SageMaker Model Building Pipelinesのコードサンプルを参照してください。

We will continue to update VoAGI; if you have any questions or suggestions, please contact us!

Share:

Was this article helpful?

93 out of 132 found this helpful

Discover more

データサイエンス

スタンフォードの研究者たちは、基礎流体力学のための初の大規模な機械学習データセットであるBLASTNetを紹介しました

スタンフォードの研究者たちは、BLASTNetという画期的な開発を紹介し、計算流体力学(CFD)の新たな時代の到来を予感させまし...

機械学習

このAIニュースレターは、あなたが必要とするすべてです#61

「最近の数ヶ月間、私たちは大規模な言語モデル(LLM)の進歩と新しい技術の徐々の導入を続けてきましたが、まだGPT-4を直接...

人工知能

「責任あるAIの推進のための新しいパートナーシップ」

「本日、Google、Microsoft、OpenAI、Anthropicが共同でフロンティアモデルフォーラムを設立することを発表しました」

データサイエンス

CDPとAIの交差点:人工知能が顧客データプラットフォームを革新する方法

「顧客データプラットフォーム(CDP)内のAI駆動の洞察が、パーソナライズされた顧客体験を革新する方法」

人工知能

画像をプロンプトに変換する方法:Img2Prompt AIモデルによるステップバイステップガイド

シンプルなAPIコールと少しのNode.jsで画像からプロンプトを収集する

AI研究

デジタルルネッサンス:NVIDIAのNeuralangelo研究が3Dシーンを再構築

NVIDIA Researchによる新しいAIモデル、Neuralangeloは、ニューラルネットワークを使用して3D再構築を行い、2Dビデオクリップ...