「Amazon SageMakerの非同期エンドポイントを使用して、Amazon SageMaker JumpStartの基礎モデルのデプロイコストを最適化する」
Optimizing deployment costs of Amazon SageMaker JumpStart basic models using Amazon SageMaker asynchronous endpoints.
幅広い産業での生成型AIアプリケーションの成功は、競争他社の成果を再現し超えるか、新たなエキサイティングなユースケースを解決するために、世界中の企業の注目と関心を集めています。これらの顧客は、TII Falcon、Stable Diffusion XL、またはOpenAIのGPT-3.5などの基礎モデルを、生成型AIのイノベーションのエンジンとして採用しています。
基礎モデルは、訓練された非構造化データの膨大な量のおかげで、人間のようなコンテンツの理解と生成が可能な生成型AIモデルのクラスです。これらのモデルは、画像生成、翻訳、質問応答などのさまざまなコンピュータビジョン(CV)および自然言語処理(NLP)のタスクを革新しました。これらは多くのAIアプリケーションの基盤となり、高度なインテリジェントシステムの開発において重要な要素となっています。
ただし、基礎モデルの展開には、コストとリソースの要件に関して重要な課題が伴うことがあります。これらのモデルは、数億から数千億のパラメータを持つことがあり、その大きさから、強力なハードウェアと大量のメモリ容量を必要とします。実際、基礎モデルの展開には、通常少なくとも1つ(より多くの場合もある)のGPUが必要です。例えば、TII Falcon-40B Instructモデルは、少なくともml.g5.12xlargeインスタンスがメモリに正常にロードされるために必要ですが、より大きなインスタンスでは最適なパフォーマンスを発揮します。そのため、長時間のセッションや一時的なワークロード中に、GPUパワードのインスタンスを持つランニングコストは、投資収益率(ROI)が低すぎてビジネス価値を証明するのに十分ではありません。
今年初めに、私たちはAmazon Bedrockを発表しました。これは、Amazonと私たちの生成型AIパートナーから基礎モデルにアクセスするためのサーバーレスAPIです。現在プライベートプレビュー中ですが、このサーバーレスAPIを使用すると、自分自身でエンドポイントを展開する必要なく、Amazon、Anthropic、Stability AI、AI21などの基礎モデルを使用することができます。ただし、Hugging Faceなどのコミュニティから提供されるオープンソースモデルは増え続けており、それらがすべてAmazon Bedrockを通じて利用可能になっているわけではありません。
- 「キャリアは、AWS GlueとAmazon SageMakerを使用してHVACの故障を予測する方法」
- 「Amazon SageMaker JumpStart上で、生成型AIベースのコンテンツモデレーションソリューションを構築する」
- 「メタヒューリスティクスの説明:アントコロニーオプティマイゼーション」
この記事では、これらの状況を対象にし、大規模な基礎モデルをAmazon SageMakerの非同期エンドポイントにAmazon SageMaker JumpStartから展開することで、高コストのリスクを解決します。これにより、エンドポイントのアーキテクチャのコストを削減し、リクエストがキューにあるときだけエンドポイントを実行し、対応するリクエストが待機していない場合はゼロにスケールダウンすることができます。これは、多くのユースケースにとって素晴らしいことです。ただし、ゼロにスケールダウンしたエンドポイントは、推論を提供する前に冷たい開始時間を導入します。
ソリューションの概要
次の図は、私たちのソリューションアーキテクチャを示しています。
私たちが展開するアーキテクチャは非常にシンプルです:
- ユーザーインターフェースはノートブックであり、Streamlitなどの技術を利用したWeb UIに置き換えることもできます。私たちの場合、ノートブックはAmazon SageMaker Studioノートブックであり、PyTorch 2.0 Python 3.10 CPUカーネルを実行しています。
- ノートブックは、SageMaker Python SDK、AWS SDK for Python(Boto3)、およびLangChainの3つの方法でエンドポイントにクエリを送信します。
- エンドポイントはSageMakerで非同期に実行され、そのエンドポイントにFalcon-40B Instructモデルを展開します。これは、現在のところ、Instructモデルの最先端であり、SageMaker JumpStartで利用可能です。単一のAPI呼び出しで、モデルをエンドポイントに展開できます。
SageMaker非同期推論とは
SageMaker非同期推論は、SageMakerの4つの展開オプションの1つであり、リアルタイムエンドポイント、バッチ推論、およびサーバーレス推論と共に、大量のペイロードサイズ(最大1 GB)、長い処理時間、およびほぼリアルタイムのレイテンシ要件を持つリクエストをキューに入れ、非同期に処理します。ただし、特に概念実証(POC)や開発中に大規模な基礎モデルを扱う場合に提供する主な利点は、非同期推論をゼロのインスタンス数にスケーリングできる能力であり、リクエストを処理する必要がない場合にコストを節約できます。SageMaker非同期推論についての詳細は、非同期推論を参照してください。次の図は、このアーキテクチャを示しています。
非同期推論エンドポイントを展開するには、AsyncInferenceConfig
オブジェクトを作成する必要があります。引数を指定せずにAsyncInferenceConfig
を作成する場合、デフォルトのS3OutputPath
はs3://sagemaker-{REGION}-{ACCOUNTID}/async-endpoint-outputs/{UNIQUE-JOB-NAME}
となり、S3FailurePath
はs3://sagemaker-{REGION}-{ACCOUNTID}/async-endpoint-failures/{UNIQUE-JOB-NAME}
となります。
SageMaker JumpStartとは何ですか
当社のモデルはSageMaker JumpStartから提供されています。SageMaker JumpStartは、事前学習済みモデル、ソリューションテンプレート、サンプルノートブックを提供するSageMakerの機能で、機械学習(ML)の旅を加速します。さまざまな問題タイプに対する事前学習済みモデルにアクセスできるため、堅牢な基盤でMLタスクを開始することができます。SageMaker JumpStartは、一般的なユースケースのためのソリューションテンプレートと学習用のサンプルノートブックも提供しています。SageMaker JumpStartを使用することで、ワンクリックのソリューション起動と実践的なML体験のための包括的なリソースで、MLプロジェクトを開始するために必要な時間と労力を削減できます。
次のスクリーンショットは、SageMaker JumpStart UIで利用可能なモデルの一部を示しています。
モデルを展開する
まず、モデルをSageMakerに展開します。これには、SageMaker JumpStartのUIまたはSageMaker Python SDKを使用できます。SageMaker Python SDKは、モデルを非同期エンドポイントに展開するために使用できるAPIを提供します:
%%time
from sagemaker.jumpstart.model import JumpStartModel, AsyncInferenceConfig
from sagemaker.serializers import JSONSerializer
from sagemaker.deserializers import JSONDeserializer
model_id, model_version = "huggingface-llm-falcon-40b-instruct-bf16", "*"
my_model = JumpStartModel(model_id=model_id)
predictor = my_model.deploy(
initial_instance_count=0,
instance_type="ml.g5.12xlarge",
async_inference_config=AsyncInferenceConfig()
)
この呼び出しは約10分かかる場合があります。この間に、エンドポイントが立ち上げられ、コンテナとモデルアーティファクトがエンドポイントにダウンロードされ、SageMaker JumpStartからモデルの設定がロードされ、非同期エンドポイントがDNSエンドポイントを介して公開されます。エンドポイントをゼロにスケーリングダウンできるようにするには、Application Auto Scalingを使用して非同期エンドポイントのオートスケーリングを設定する必要があります。エンドポイントのバリアントをまずApplication Auto Scalingに登録し、スケーリングポリシーを定義し、その後スケーリングポリシーを適用する必要があります。この設定では、CustomizedMetricSpecification
を使用したカスタムメトリックであるApproximateBacklogSizePerInstance
を使用しています。詳細なAmazon CloudWatchメトリックの一覧については、CloudWatchでのモニタリングを参照してください。
import boto3
client = boto3.client("application-autoscaling")
resource_id = "endpoint/" + my_model.endpoint_name + "/variant/" + "AllTraffic"
# ゼロインスタンスまでの非同期エンドポイントのオートスケーリングを設定する
response = client.register_scalable_target(
ServiceNamespace="sagemaker",
ResourceId=resource_id,
ScalableDimension="sagemaker:variant:DesiredInstanceCount",
MinCapacity=0, # ゼロにスケーリングダウンするための最小インスタンス数 - コストをかけずに停止するために0にスケーリングダウンする
MaxCapacity=1, # 最大インスタンス数 - 開発には最大1インスタンスで十分
)
response = client.put_scaling_policy(
PolicyName="Invocations-ScalingPolicy",
ServiceNamespace="sagemaker", # リソースを提供するAWSサービスの名前空間。
ResourceId=resource_id, # エンドポイント名
ScalableDimension="sagemaker:variant:DesiredInstanceCount", # SageMakerはインスタンス数のみサポート
PolicyType="TargetTrackingScaling", # 'StepScaling'|'TargetTrackingScaling'
TargetTrackingScalingPolicyConfiguration={
"TargetValue": 5.0, # メトリックのターゲット値。ここではメトリックはSageMakerVariantInvocationsPerInstanceです
"CustomizedMetricSpecification": {
"MetricName": "ApproximateBacklogSizePerInstance",
"Namespace": "AWS/SageMaker",
"Dimensions": [{"Name": "EndpointName", "Value": my_model.endpoint_name}],
"Statistic": "Average",
},
"ScaleInCooldown": 600, # スケールインアクティビティが完了してから次のスケールインアクティビティを開始するまでの時間(秒単位)。
"ScaleOutCooldown": 300, # スケールアウトアクティビティが完了してから次のスケールアウトアクティビティを開始するまでの時間(秒単位)。
# 'DisableScaleIn': True|False - ターゲットトラッキングポリシーによるスケールインが無効化されているかどうかを示します。
# 値がtrueの場合、スケールインは無効化され、ターゲットトラッキングポリシーはスケーラブルリソースから容量を削除しません。
},
)
このポリシーが正常に設定されていることを確認するには、SageMakerコンソールに移動し、ナビゲーションペインのInferenceの下のEndpointsを選択し、デプロイしたエンドポイントを探します。
非同期エンドポイントの呼び出し
エンドポイントを呼び出すには、リクエストペイロードをAmazon Simple Storage Service(Amazon S3)に配置し、このペイロードへのポインタをInvokeEndpointAsync
リクエストの一部として提供する必要があります。呼び出し時、SageMakerはリクエストを処理するためにキューに入れ、応答として識別子と出力場所を返します。処理が完了すると、SageMakerは結果をAmazon S3の場所に配置します。Amazon Simple Notification Service(Amazon SNS)を使用して成功またはエラーの通知を受け取ることも選択できます。
SageMaker Python SDK
デプロイが完了すると、AsyncPredictor
オブジェクトが返されます。非同期推論を実行するには、データをAmazon S3にアップロードし、predict_async()
メソッドを使用してS3 URIを入力として使用する必要があります。これによりAsyncInferenceResponse
オブジェクトが返され、get_response()
メソッドを使用して結果をチェックできます。
または、定期的に結果をチェックし、生成時に返すようにする場合は、predict()
メソッドを使用します。以下のコードではこの2番目の方法を使用しています:
import time
# SageMaker Python SDKを使用して非同期エンドポイントを呼び出す
def query_endpoint(payload):
"""エンドポイントをクエリし、レスポンスを表示する"""
response = predictor.predict_async(
data=payload,
input_path="s3://{}/{}".format(bucket, prefix),
)
while True:
try:
response = response.get_result()
break
except:
print("推論が準備されていません...")
time.sleep(5)
print(f"\033[1m 入力:\033[0m {payload['inputs']}")
print(f"\033[1m 出力:\033[0m {response[0]['generated_text']}")
query_endpoint(payload)
Boto3
次に、Boto3のsagemaker-runtime
クライアントからinvoke_endpoint_async
メソッドを探索しましょう。このメソッドを使用すると、開発者はSageMakerエンドポイントを非同期に呼び出し、進行状況の追跡トークンと応答の取得に使用することができます。ただし、Boto3にはSageMaker Python SDKのget_result()
操作のように非同期推論の完了を待つ方法はありません。したがって、Boto3は推論の出力をAmazon S3のresponse["OutputLocation"]
に保存します。次の関数を使用して、推論ファイルがAmazon S3に書き込まれるのを待つことができます:
import json
import time
import boto3
from botocore.exceptions import ClientError
s3_client = boto3.client("s3")
# 推論が生成されるまで待機
def wait_inference_file(bucket, prefix):
while True:
try:
response = s3_client.get_object(Bucket=bucket, Key=prefix)
break
except ClientError as ex:
if ex.response['Error']['Code'] == 'NoSuchKey':
print("ファイルの生成を待っています...")
time.sleep(5)
next
else:
raise
except Exception as e:
print(e.__dict__)
raise
return response
この関数を使用して、エンドポイントをクエリできます:
# Boto3 SDKを使用して非同期エンドポイントを呼び出す
import boto3
sagemaker_client = boto3.client("sagemaker-runtime")
# エンドポイントをクエリする関数
def query_endpoint_boto3(payload):
"""エンドポイントをクエリし、レスポンスを表示する"""
response = sagemaker_client.invoke_endpoint_async(
EndpointName=my_model.endpoint_name,
InputLocation="s3://{}/{}".format(bucket, prefix),
ContentType="application/json",
Accept="application/json"
)
output_url = response["OutputLocation"]
output_prefix = "/".join(output_url.split("/")[3:])
# Boto3を使用してoutput_urlのAmazon S3からファイルのバイトを読み取る
output = wait_inference_file(bucket, output_prefix)
output = json.loads(output['Body'].read())[0]['generated_text']
# 出力を表示
print(f"\033[1m 入力:\033[0m {payload['inputs']}")
print(f"\033[1m 出力:\033[0m {output}")
query_endpoint_boto3(payload)
LangChain
LangChainは、Harrison Chaseによって2022年10月にリリースされたオープンソースのフレームワークです。LangChainは、さまざまなシステムやデータソースとの統合を提供することで、大規模な言語モデル(LLM)を使用したアプリケーションの開発を簡素化します。LangChainは、ドキュメントの分析、要約、チャットボットの作成、コードの分析などを可能にします。数百人の開発者からの貢献とベンチャーファームからの大規模な資金援助を受けて、LangChainは人気を集めています。LangChainは、LLMと外部ソースを接続することで、動的でデータに応答するアプリケーションの作成が可能になります。開発プロセスを効率化するために、ライブラリ、API、ドキュメンテーションを提供しています。
LangChainは、フレームワークにSageMakerエンドポイントを使用するためのライブラリとサンプルを提供し、SageMaker上でホストされるMLモデルをチェーンの「脳」として使用することを容易にします。LangChainがSageMakerとどのように統合されるかの詳細については、LangChainのドキュメントのSageMakerエンドポイントを参照してください。
現在のLangChainの実装の制約の一つは、非同期エンドポイントをネイティブにサポートしていないことです。LangChainで非同期エンドポイントを使用するには、LangChainで既に利用可能なSagemakerEndpoint
クラスを拡張した新しいクラスSagemakerAsyncEndpoint
を定義する必要があります。さらに、以下の情報を提供します。
- 非同期推論が入力(および出力)を保存するS3バケットとプレフィックス
- タイムアウトする前の最大待機時間(秒)
invoke_endpoint()
ではなくinvoke_endpoint_async()
を使用してエンドポイントにクエリを実行するためのupdated _call()
関数- 非同期エンドポイントが冷始動状態(ゼロにスケールダウン)の場合に非同期エンドポイントを起動する方法
新しく作成されたSagemakerAsyncEndpoint
を確認するには、GitHubで利用可能なsagemaker_async_endpoint.py
ファイルをチェックしてください。
from typing import Dict
from langchain import PromptTemplate
from langchain.llms.sagemaker_endpoint import LLMContentHandler
from langchain.chains import LLMChain
from sagemaker_async_endpoint import SagemakerAsyncEndpoint
class ContentHandler(LLMContentHandler):
content_type:str = "application/json"
accepts:str = "application/json"
len_prompt:int = 0
def transform_input(self, prompt: str, model_kwargs: Dict) -> bytes:
self.len_prompt = len(prompt)
input_str = json.dumps({"inputs": prompt, "parameters": {"max_new_tokens": 100, "do_sample": False, "repetition_penalty": 1.1}})
return input_str.encode('utf-8')
def transform_output(self, output: bytes) -> str:
response_json = output.read()
res = json.loads(response_json)
ans = res[0]['generated_text']
return ans
chain = LLMChain(
llm=SagemakerAsyncEndpoint(
input_bucket=bucket,
input_prefix=prefix,
endpoint_name=my_model.endpoint_name,
region_name=sagemaker.Session().boto_region_name,
content_handler=ContentHandler(),
),
prompt=PromptTemplate(
input_variables=["query"],
template="{query}",
),
)
print(chain.run(payload['inputs']))
クリーンアップ
エンドポイントからの推論の生成のテストが完了したら、余分な料金を発生させないためにエンドポイントを削除することを忘れないでください。
predictor.delete_endpoint()
結論
TII Falconなどの大規模な基礎モデルを展開する際には、コストの最適化が重要です。これらのモデルには強力なハードウェアと大容量のメモリが必要であり、インフラストラクチャのコストが高くなります。SageMakerの非同期推論は、リクエストを非同期に処理する展開オプションであり、保留中のリクエストがない場合にはインスタンス数をゼロにスケールダウンすることで費用を削減します。この記事では、SageMaker JumpStartの基礎モデルをSageMakerの非同期エンドポイントに展開する方法を示しました。SageMaker Python SDK、Boto3、LangChainを使用したコード例を提供し、非同期エンドポイントを呼び出し結果を取得するためのさまざまな方法を説明しました。これらの技術により、開発者や研究者は、高度な言語理解システムの基礎モデルの機能を活用しながらコストを最適化することができます。
非同期推論やSageMaker JumpStartについて詳しくは、以下の記事をご覧ください。
- Amazon Kendra、LangChain、大規模な言語モデルを使用した高精度の生成型AIアプリケーションを迅速に構築する
- Amazon SageMakerの非同期エンドポイントで大規模な動画のコンピュータビジョン推論を実行する
We will continue to update VoAGI; if you have any questions or suggestions, please contact us!
Was this article helpful?
93 out of 132 found this helpful
Related articles