「Amazon SageMaker Pipelinesを使用した機械学習ワークフローの構築のためのベストプラクティスとデザインパターン」
Best practices and design patterns for building machine learning workflows using Amazon SageMaker Pipelines
Amazon SageMakerパイプラインは、機械学習(ML)ワークフローの構築とオーケストレーションを行うための完全に管理されたAWSサービスです。SageMakerパイプラインは、データのロード、データの変換、トレーニング、調整、展開など、MLワークフローのさまざまなステップをオーケストレーションする機能をMLアプリケーション開発者に提供します。SageMakerパイプラインを使用すると、SageMakerでMLジョブをオーケストレーションできます。また、AWS Lambda関数、Amazon EMRジョブなどのリソースを使用することもできます。これにより、MLワークフローの特定の要件に合わせたカスタマイズ可能で再現可能なパイプラインを構築することができます。
この記事では、SageMakerパイプラインの価値を最大化し、開発体験をシームレスにするためのベストプラクティスをいくつか紹介します。また、SageMakerパイプラインの構築時に一般的なデザインシナリオとパターンについても説明し、それらを解決するための例も提供します。
SageMakerパイプラインのベストプラクティス
このセクションでは、SageMakerパイプラインを使用してワークフローを設計する際に遵守できるいくつかのベストプラクティスについて説明します。これらを採用することで、開発プロセスが改善され、SageMakerパイプラインの操作管理が効率化されます。
パイプラインの遅延読み込みにはパイプラインセッションを使用する
パイプラインセッションを使用すると、パイプラインリソースの遅延初期化(パイプラインランタイムまでジョブは開始されません)が可能になります。 PipelineSession
コンテキストはSageMakerセッションを継承し、トレーニングジョブ、エンドポイント、Amazon Simple Storage Service(Amazon S3)の入力データセットなど、他のSageMakerエンティティとリソースと対話するための便利なメソッドを実装しています。SageMakerパイプラインを定義する際には、通常のSageMakerセッションではなくPipelineSession
を使用する必要があります:
- BYOL(Bootstrap Your Own Latent)— コントラスティブな自己教示学習の代替手段
- 「生成AIにおけるLLMエージェントのデコーディングの機会と課題」
- 「AIとMLが高い需要になる10の理由」 1. ビッグデータの増加による需要の増加:ビッグデータの処理と分析にはAIとMLが必要です 2. 自動化の需要の増加:AIとMLは、自動化されたプロセスとタスクの実行に不可欠です 3. 予測能力の向上:AIとMLは、予測分析において非常に効果的です 4. パーソナライズされたエクスペリエンスの需要:AIとMLは、ユーザーの行動と嗜好を理解し、パーソナライズされたエクスペリエンスを提供するのに役立ちます 5. 自動運転技術の需要の増加:自動運転技術の発展にはAIとMLが不可欠です 6. セキュリティの需要の増加:AIとMLは、セキュリティ分野で新たな挑戦に対処するために使用されます 7. ヘルスケアの需要の増加:AIとMLは、病気の早期検出や治療計画の最適化など、医療分野で重要な役割を果たします 8. クラウドコンピューティングの需要の増加:AIとMLは、クラウドコンピューティングのパフォーマンスと効率を向上させるのに役立ちます 9. ロボティクスの需要の増加:AIとMLは、ロボットの自律性と学習能力を高めるのに使用されます 10. インターネットオブシングス(IoT)の需要の増加:AIとMLは、IoTデバイスのデータ分析と制御に重要な役割を果たします
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.sklearn.processing import SKLearnProcessor
role = sagemaker.get_execution_role()
pipeline_session = PipelineSession()
sklearn_processor = SKLearnProcessor(
framework_version=’0.20.0’,
instance_type=’ml.m5.xlarge’,
instance_count=1,
base_job_name="sklearn-abalone-process",
role=role,
sagemaker_session=pipeline_session,
)
開発中のコスト効果的で迅速なイテレーションのためにローカルモードでパイプラインを実行する
LocalPipelineSession
コンテキストを使用してパイプラインをローカルモードで実行することができます。このモードでは、パイプラインとジョブはSageMakerの管理リソースではなく、ローカルマシンのリソースを使用してローカルで実行されます。ローカルモードは、データのサブセットを使用してパイプラインコードのイテレーションを行うための費用効果の高い方法を提供します。パイプラインがローカルでテストされた後、PipelineSessionコンテキストを使用してスケーリングして実行することができます。
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.workflow.pipeline_context import LocalPipelineSession
local_pipeline_session = LocalPipelineSession()
role = sagemaker.get_execution_role()
sklearn_processor = SKLearnProcessor(
framework_version=’0.20.0’,
instance_type=’ml.m5.xlarge',
instance_count=1,
base_job_name="sklearn-abalone-process",
role=role,
sagemaker_session=local_pipeline_session,
)
バージョニングを使用してSageMakerパイプラインを管理する
アーティファクトとパイプライン定義のバージョニングは、開発ライフサイクルで一般的な要件です。パイプラインオブジェクトに一意の接頭辞や接尾辞(タイムスタンプなど)を使用してパイプラインの複数のバージョンを作成することができます。以下に示すコードを参照してください:
from sagemaker.workflow.pipeline_context import PipelineSession
import time
current_time = time.strftime("%Y-%m-%d-%H-%M-%S", time.gmtime())
pipeline_name = "pipeline_" + current_time
pipeline_session = PipelineSession()
pipeline = Pipeline(
name=pipeline_name,
steps=[step_process, step_train, step_eval, step_cond],
sagemaker_session=pipeline_session,
)
SageMakerパイプラインの実行をSageMaker Experimentsと統合して整理・追跡する
SageMakerパイプラインは、SageMaker Experimentsと簡単に統合してパイプラインの実行を整理・追跡することができます。これは、パイプラインオブジェクトの作成時にPipelineExperimentConfigを指定することで実現します。この構成オブジェクトでは、実験名とトライアル名を指定することができます。SageMakerパイプラインの実行の詳細は、指定された実験名とトライアルの下に整理されます。実験名を明示的に指定しない場合、実験名にはパイプライン名が使用されます。同様に、トライアル名を明示的に指定しない場合、トライアルまたはラングループ名にはパイプラインの実行IDが使用されます。以下のコードを参照してください:
Pipeline(
name="マイパイプライン",
parameters=[...],
pipeline_experiment_config=PipelineExperimentConfig(
experiment_name=ExecutionVariables.PIPELINE_NAME,
trial_name=ExecutionVariables.PIPELINE_EXECUTION_ID
),
steps=[...]
)
プライベートVPC内でSageMakerパイプラインを安全に実行する
MLワークロードを保護するために、SageMakerパイプラインによってオーケストレーションされるジョブを、プライベートVPC、プライベートサブネット、セキュリティグループ内の安全なネットワーク構成で展開することがベストプラクティスです。この安全な環境の使用を確実にし、強制するために、SageMaker実行ロール(パイプラインが実行中に仮定するロール)に対して以下のAWS Identity and Access Management(IAM)ポリシーを実装できます。また、SageMakerパイプラインによってオーケストレーションされるジョブをネットワーク分離モードで実行するためのポリシーも追加できます。
# プライベートVPC内での実行を強制するIAMポリシー
{
"Action": [
"sagemaker:CreateProcessingJob",
"sagemaker:CreateTrainingJob",
"sagemaker:CreateModel"
],
"Resource": "*",
"Effect": "Deny",
"Condition": {
"Null": {
"sagemaker:VpcSubnets": "true"
}
}
}
# ネットワーク分離モードでの実行を強制するIAMポリシー
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Deny",
"Action": [
"sagemaker:Create*"
],
"Resource": "*",
"Condition": {
"StringNotEqualsIfExists": {
"sagemaker:NetworkIsolation": "true"
}
}
}
]
}
これらのセキュリティ制御を備えたパイプラインの実装例については、「Amazon SageMakerでのジョブのオーケストレーション、モデルの登録、および継続的デプロイメント」を参照してください。
タグを使用してパイプライン実行のコストを監視する
SageMakerパイプラインの単体使用は無料です。ただし、処理、トレーニング、バッチ推論などの個々のパイプラインステップの一部として展開するコンピューティングリソースとストレージリソースに対して料金が発生します。パイプライン実行ごとのコストを集計するために、リソースを作成する各パイプラインステップにタグを含めることができます。これらのタグは、コストエクスプローラで参照し、パイプライン実行の総コストをフィルタリングおよび集計するために使用できます。次の例を参照してください。
sklearn_processor = SKLearnProcessor(
framework_version='0.20.0',
instance_type='ml.m5.xlarge',
instance_count=1,
base_job_name="sklearn-abalone-process",
role=role,
tags=[{'Key':'pipeline-cost-tag', 'Value':'<<tag_parameter>>'}]
)
step_process = ProcessingStep(
name="AbaloneProcess",
processor=sklearn_processor,
...
)
コストエクスプローラから、タグでフィルタリングされたコストを取得できます。
response = client.get_cost_and_usage(
TimePeriod={
'Start': '2023-07-01',
'End': '2023-07-15'
},
Metrics=['BLENDED_COST','USAGE_QUANTITY','UNBLENDED_COST'],
Granularity='MONTHLY',
Filter={
'Dimensions': {
'Key':'USAGE_TYPE',
'Values': [
'SageMaker:Pipeline'
]
},
'Tags': {
'Key': 'keyName',
'Values': [
'keyValue',
]
}
}
)
一部の一般的なシナリオのためのデザインパターン
このセクションでは、SageMakerパイプラインの一部の一般的なユースケースに対するデザインパターンについて説明します。
ラムダステップを使用して軽量なPython関数を実行する
Python関数は、MLワークフローで頻繁に使用されます。前処理、後処理、評価などに使用されます。Lambdaは、サーバーレスのコンピュートサービスで、サーバーのプロビジョニングや管理なしでコードを実行できます。Lambdaを使用すると、Pythonを含む任意の言語でコードを実行できます。これを使用して、パイプラインの一部としてカスタムのPythonコードを実行できます。Lambdaステップを使用すると、SageMakerパイプラインの一部としてLambda関数を実行できます。以下のコードから始めてください。
%%writefile lambdafunc.py
import json
def lambda_handler(event, context):
str1 = event["str1"]
str2 = event["str2"]
str3 = str1 + str2
return {
"str3": str3
}
以下のHTMLコードを日本語に翻訳します(HTMLコードは翻訳結果に含めます):
<p>SageMakerのPython SDKのLambdaヘルパーを使用して、Lambda関数を作成します:</p><pre><code>from sagemaker.lambda_helper import Lambda
def create_lambda(function_name, script, handler):
response = Lambda(
function_name=function_name,
execution_role_arn=role,
script= script,
handler=handler,
timeout=600,
memory_size=10240,
).upsert()
function_arn = response['FunctionArn']
return function_arn
fn_arn = create_Lambda("func", "lambdafunc.py", handler = "lambdafunc.lambda_handler")</code></pre><p>Lambdaステップを呼び出します:</p><pre><code>from sagemaker.lambda_helper import Lambda
from sagemaker.workflow.lambda_step import (
LambdaStep,
LambdaOutput,
LambdaOutputTypeEnum
)
str3 = LambdaOutput(output_name="str3", output_type=LambdaOutputTypeEnum.String)
# Lambdaステップ
step_lambda1 = LambdaStep(
name="LambdaStep1",
lambda_func=Lambda(
function_arn=fn_arn
),
inputs={
"str1": "Hello",
"str2": " World"
},
outputs=[str3],
)</code></pre><h3 id="pass-data-between-steps">ステップ間でデータを渡す</h3><p>パイプラインステップの入力データは、アクセス可能なデータの場所またはパイプライン内の前のステップで生成されたデータです。これらの情報をProcessingInput
パラメータとして提供できます。ProcessingInputの使用方法についていくつかのシナリオを見てみましょう。</p><h4 id="scenario-1-pass-the-output-primitive-data-types-of-a-lambda-step-to-a-processing-step">シナリオ1:Lambdaステップの出力(プリミティブデータ型)を処理ステップに渡す</h4><p>プリミティブデータ型とは、文字列、整数、ブール値、浮動小数点などのスカラーデータ型を指します。</p><p>次のコードスニペットでは、プリミティブデータ型の変数のディクショナリを返すLambda関数を定義しています。SageMakerパイプライン内のLambdaステップから呼び出された場合、Lambda関数コードはキーと値のペアのJSONを返します。</p><pre><code>def handler(event, context):
...
return {
"output1": "string_value",
"output2": 1,
"output3": True,
"output4": 2.0,
}</code></pre><p>パイプライン定義では、特定のデータ型のSageMakerパイプラインパラメータを定義し、変数をLambda関数の出力に設定できます:</p><pre><code>from sagemaker.workflow.lambda_step import (
LambdaStep,
LambdaOutput,
LambdaOutputTypeEnum
)
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.sklearn.processing import SKLearnProcessor
role = sagemaker.get_execution_role()
pipeline_session = PipelineSession()
# 1. Lambdaステップの出力パラメータを定義する
str_outputParam = LambdaOutput(output_name="output1", output_type=LambdaOutputTypeEnum.String)
int_outputParam = LambdaOutput(output_name"output2", output_type=LambdaOutputTypeEnum.Integer)
bool_outputParam = LambdaOutput(output_name"output3", output_type=LambdaOutputTypeEnum.Boolean)
float_outputParam = LambdaOutput(output_name"output4", output_type=LambdaOutputTypeEnum.Float)
# 2. Lambdaステップを実行し、出力を返す
step_lambda = LambdaStep(
name="MyLambdaStep",
lambda_func=Lambda(
function_arn="arn:aws:lambda:us-west-2:123456789012:function:sagemaker_test_lambda",
session=PipelineSession(),
),
inputs={"arg1": "foo", "arg2": "foo1"},
outputs=[
str_outputParam, int_outputParam, bool_outputParam, float_outputParam
],
)
# 3. Lambdaの出力を抽出する
str_outputParam = step_lambda.properties.Outputs["output1"]
# 4. 後続のステップで使用する。例えばProcessingステップ
sklearn_processor = SKLearnProcessor(
framework_version="0.23-1",
instance_type="ml.m5.xlarge",
instance_count=1,
sagemaker_session=pipeline_session,
role=role
)
processor_args = sklearn_processor.run(
code="code/preprocess.py", #実行するPythonスクリプト
arguments=["--input-args", str_outputParam]
)
step_process = ProcessingStep(
name="processstep1",
step_args=processor_args,
)</code></pre><h4 id="scenario-2-pass-the-output-non-primitive-data-types-of-a-lambda-step-to-a-processing-step">シナリオ2:Lambdaステップの出力(非プリミティブデータ型)を処理ステップに渡す</h4>
非プリミティブデータ型は、非スカラーデータ型(例:NamedTuple
)を指します。Lambda関数から非プリミティブデータ型を返す必要がある場合、非プリミティブデータ型を文字列に変換する必要があります:
# 非プリミティブデータ型を返すLambda関数のコード
from collections import namedtuple
def lambda_handler(event, context):
Outputs = namedtuple("Outputs", "sample_output")
named_tuple = Outputs(
[
{'output1': 1, 'output2': 2},
{'output3': 'foo', 'output4': 'foo1'}
]
)
return{
"named_tuple_string": str(named_tuple)
}
# “Parameter Input”としてLambdaの出力を使用するパイプラインのステップ
output_ref = step_lambda.properties.Outputs["named_tuple_string"]
その後、この文字列をパイプライン内の後続のステップの入力として使用することができます。コード内で名前付きタプルを使用するには、eval()
を使用して文字列内のPython式を解析します:
# 処理ロジックコードで文字列を解読する
import argparse
from collections import namedtuple
Outputs = namedtuple("Outputs", "sample_output")
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--named_tuple_string", type=str, required=True)
args = parser.parse_args()
# evalを使用して文字列から名前付きタプルを取得する
named_tuple = eval(args.named_tuple_string)
シナリオ3: ステップの出力をプロパティファイルを介して渡す
処理ステップの出力をプロパティJSONファイルに格納して、ConditionStep
または別のProcessingStep
で後続の処理に使用することもできます。プロパティファイルをクエリするためには、JSONGet関数を使用することができます。以下のコードを参照してください:
# 1. 処理出力を持つProcessorを定義する
sklearn_processor = SKLearnProcessor(
framework_version="0.23-1",
instance_type="ml.m5.xlarge",
instance_count=1,
base_job_name="sklearn-abalone-preprocess",
sagemaker_session=session,
role=sagemaker.get_execution_role(),
)
step_args = sklearn_processor.run(
outputs=[
ProcessingOutput(
output_name="hyperparam",
source="/opt/ml/processing/evaluation"
),
],
code="./local/preprocess.py",
arguments=["--input-data", "s3://my-input"],
)
# 2. プロセッサで使用するoutput_nameがProcessorで使用されているものと一致するようにPropertyFileを定義する
hyperparam_report = PropertyFile(
name="AbaloneHyperparamReport",
output_name="hyperparam",
path="hyperparam.json",
)
プロパティファイルの内容が次のようであると仮定します:
{
"hyperparam": {
"eta": {
"value": 0.6
}
}
}
この場合、JsonGet関数を使用して特定の値をクエリし、後続のステップで使用することができます:
# 3. プロパティファイルをクエリする
eta = JsonGet(
step_name=step_process.name,
property_file=hyperparam_report,
json_path="hyperparam.eta.value",
)
パイプライン定義内の変数をパラメータ化する
実行時に使用できるように変数をパラメータ化することは、しばしば望ましいです。たとえば、S3 URIを構築するために使用することができます。Join
関数を使用して、実行時に評価されるように文字列をパラメータ化することができます。次のコードスニペットは、Join
関数を使用して変数を定義し、それを処理ステップの出力場所に設定する方法を示しています:
# s3 URIを格納する変数を定義する
s3_location = Join(
on="/",
values=[
"s3:/",
ParameterString(
name="MyBucket",
default_value=""
),
"training",
ExecutionVariables.PIPELINE_EXECUTION_ID
]
)
# 処理ステップを定義する
sklearn_processor = SKLearnProcessor(
framework_version="1.2-1",
instance_type="ml.m5.xlarge",
instance_count=processing_instance_count,
base_job_name=f"{base_job_prefix}/sklearn-abalone-preprocess",
sagemaker_session=pipeline_session,
role=role,
)
# 処理ステップでの出力場所としてs3uriを使用する
processor_run_args = sklearn_processor.run(
outputs=[
ProcessingOutput(
output_name="train",
source="/opt/ml/processing/train",
destination=s3_location,
),
],
code="code/preprocess.py"
)
step_process = ProcessingStep(
name="PreprocessingJob”,
step_args=processor_run_args,
)
イテラブル上で並列コードを実行する
一部の機械学習のワークフローでは、静的なアイテムのセット(イテラブル)上で並列のループを使用してコードを実行します。それは、異なるデータ上で実行される同じコードであるか、データの各アイテムごとに実行する必要がある異なるコードの片割れであることがあります。例えば、ファイル内の非常に多数の行を持ち、処理時間を短縮したい場合は、前者のパターンに頼ることができます。データの特定のサブグループに異なる変換を行いたい場合は、データの各サブグループごとに異なるコードを実行する必要があるかもしれません。以下の2つのシナリオでは、この目的のためにSageMakerパイプラインを設計する方法を説明します。
シナリオ1: データの異なる部分に処理ロジックを実装する
複数のインスタンスで処理ジョブを実行することができます(instance_count
を1より大きい値に設定することで)。これにより、Amazon S3からの入力データがすべての処理インスタンスに分散されます。次に、インスタンス番号とアイテムリスト内の対応する要素に基づいて、スクリプト(process.py)を使用してデータの特定の部分で作業することができます。process.py内のプログラミングロジックは、処理するアイテムリストに応じて異なるモジュールまたはコードが実行されるように書くことができます。以下の例では、ProcessingStepで使用できるプロセッサを定義しています:
sklearn_processor = FrameworkProcessor(
estimator_cls=sagemaker.sklearn.estimator.SKLearn,
framework_version="0.23-1",
instance_type='ml.m5.4xlarge',
instance_count=4, #並列実行/インスタンスの数
base_job_name="parallel-step",
sagemaker_session=session,
role=role,
)
step_args = sklearn_processor.run(
code='process.py',
arguments=[
"--items",
list_of_items, #アイテムのリストを含むデータ構造
inputs=[
ProcessingInput(source="s3://sagemaker-us-east-1-xxxxxxxxxxxx/abalone/abalone-dataset.csv",
destination="/opt/ml/processing/input"
)
],
]
)
シナリオ2: ステップのシーケンスを実行する
並列で実行する必要のあるステップのシーケンスがある場合は、各シーケンスを独立したSageMakerパイプラインとして定義することができます。これらのSageMakerパイプラインの実行は、親パイプラインのLambdaStep
の一部であるLambda関数からトリガーすることができます。以下のコードは、2つの異なるSageMakerパイプラインの実行がトリガーされるシナリオを示しています:
import boto3
def lambda_handler(event, context):
items = [1, 2]
#sagemakerクライアント
sm_client = boto3.client("sagemaker")
#トリガーする必要のあるパイプラインの名前。
#複数ある場合は、boto3 APIを使用して利用可能なパイプラインを取得し、ロジックに基づいて適切なものをトリガーすることができます。
pipeline_name = 'child-pipeline-1'
#各アイテムごとにパイプラインをトリガー
response_ppl = sm_client.start_pipeline_execution(
PipelineName=pipeline_name,
PipelineExecutionDisplayName=pipeline_name+'-item-%d' %(s),
)
pipeline_name = 'child-pipeline-2'
response_ppl = sm_client.start_pipeline_execution(
PipelineName=pipeline_name,
PipelineExecutionDisplayName=pipeline_name+'-item-%d' %(s),
)
return
結論
この記事では、SageMakerパイプラインの効率的な使用とメンテナンスのためのベストプラクティスについて説明しました。また、SageMakerパイプラインを使用してワークフローを設計する際に採用できる特定のパターンも提供しました。これには、新しいパイプラインの作成や他のオーケストレーションツールからの機械学習ワークフローの移行など、さまざまなシナリオが含まれます。ワークフローオーケストレーションのためのSageMakerパイプラインの使用を開始するには、GitHubとAmazon SageMaker Model Building Pipelinesのコードサンプルを参照してください。
We will continue to update VoAGI; if you have any questions or suggestions, please contact us!
Was this article helpful?
93 out of 132 found this helpful
Related articles