「SageMakerエンドポイントとしてカスタムMLモデルを展開する」

「カスタムMLモデルをSageMakerエンドポイントに展開する方法」

Ricardo Gomez Angelによる写真(出典:Unsplash)

SageMaker エンドポイントのデプロイ

AWS SageMaker エンドポイントを作成するための簡単なガイド

機械学習(ML)モデルの開発には、データ収集からモデルのデプロイまで、重要なステップがあります。アルゴリズムの改良とテストによるパフォーマンスの確認を経て、最後の重要なステップはデプロイです。このフェーズでは、イノベーションが実用性に変わり、他の人々がモデルの予測能力を利用できるようになります。デプロイされたMLモデルは、開発と現実世界への影響との間のギャップを埋め、ユーザーや関係者に具体的な利益をもたらします。

このガイドでは、SageMaker エンドポイントとしてカスタムMLを開発するために必要な基本的な手順について説明します。この時点で、既に動作するモデルがあることを前提とし、それをエンドポイント経由で世界に公開したいと考えています。このガイドでは、PyTorchベースのモデルのデプロイを行います。このモデル、別名AI VADは、 「正確かつ解釈可能なビデオ異常検出のための属性ベースの表現」という論文に基づいており、その実装はanomalib GitHub リポジトリ(作成者: OpenVINO)で見つけることができます。この興味深いアプローチについて詳しく読むには、このブログの最後の付録セクションまでスクロールしてください。

この時点で、まず重要なことを強調したいと思います。この場合、PyTorchモデルをデプロイするために特に構築されたPyTorchModel抽象化を使用することはできません。その理由は、事前にビルドされたPyTorch Sagemakerイメージに含まれていない追加の依存関係であるanomalibパッケージを持っているためです。また、モデルには、トレーニング時に学習された追加情報が必要であり、これはPyTorchモデルの重みの一部ではないためです。

以下にこの目標を達成するための手順を示します:

  1. Sagemakerモデルのサービングスクリプトを書く
  2. モデルをS3にアップロードする
  3. AWS ECRにカスタムDockerイメージをアップロードする
  4. SageMakerでモデルを作成する
  5. エンドポイント構成を作成する
  6. エンドポイントを作成する
  7. エンドポイントを呼び出す

Sagemakerモデルのサービングスクリプトを書く

Sagemakerモデルのサービングスクリプト(inference.py)は、Sagemakerモデルを作成する際に重要なコンポーネントです。このスクリプトは、機械学習モデルと実世界のデータの間を結びつけます。具体的には、入力リクエストを処理し、モデルの予測を実行し、結果を返します。したがって、アプリケーションの意思決定プロセスに影響を与えます。

inference.pyスクリプトは、いくつかの重要なメソッドで構成されており、それぞれが独自の目的を果たし、モデルのサービングプロセスを支援しています。以下に、主要な4つのメソッドをリストアップしました。

  1. model_fnメソッドは、トレーニングされたモデルを読み込む役割を担っています。保存されたモデルのアーティファクトを読み込み、予測に使用できるモデルオブジェクトを返します。このメソッドは、SageMakerモデルサーバーが起動されるときにのみ呼び出されます。
  2. input_fnメソッドは、リクエストデータを受け取り、予測に適した形式にフォーマットします。例えば、以下のコードでは、この関数はデータのソース(イメージバイトまたはS3 URIのリスト)およびフレームのリストが1つのビデオクリップとして扱われるかどうかに基づいてデータを異なる形式でフォーマットします。
  3. predict_fnメソッドは、フォーマットされたリクエストデータを受け取り、読み込まれたモデルに対して推論を実行します。
  4. 最後に、output_fnメソッドが使用されます。このメソッドは、予測結果を受け取り、応答メッセージの形式にフォーマットします。例えば、JSONオブジェクトとしてパックします。

以下にSagemakerモデルのサービングスクリプトのコードがあります。

import os
import json
import joblib
import torch
from PIL import Image
import numpy as np
import io
import boto3
from enum import Enum
from urllib.parse import urlsplit
from omegaconf import OmegaConf
from anomalib.data.utils import read_image, InputNormalizationMethod, get_transforms
from anomalib.models.ai_vad.torch_model import AiVadModel

device = "cuda"

class PredictMode(Enum):
    frame = 1
    batch = 2
    clip = 3

def model_fn(model_dir):
    """
    この関数は予測リクエストごとに最初に実行され、モデルをディスクから読み込んで後続の推論に使用されるモデルオブジェクトを返します。
    """
    # 設定ファイルをロードする
    config = OmegaConf.load(os.path.join(model_dir, "ai_vad_config.yaml"))
    config_model = config.model

    # モデルを読み込む
    model = AiVadModel(
        box_score_thresh=config_model.box_score_thresh,
        persons_only=config_model.persons_only,
        min_bbox_area=config_model.min_bbox_area,
        max_bbox_overlap=config_model.max_bbox_overlap,
        enable_foreground_detections=config_model.enable_foreground_detections,
        foreground_kernel_size=config_model.foreground_kernel_size,
        foreground_binary_threshold=config_model.foreground_binary_threshold,
        n_velocity_bins=config_model.n_velocity_bins,
        use_velocity_features=config_model.use_velocity_features,
        use_pose_features=config_model.use_pose_features,
        use_deep_features=config_model.use_deep_features,
        n_components_velocity=config_model.n_components_velocity,
        n_neighbors_pose=config_model.n_neighbors_pose,
        n_neighbors_deep=config_model.n_neighbors_deep,
    )

    # モデルの重みを読み込む
    model.load_state_dict(torch.load(os.path.join(model_dir, "ai_vad_weights.pth"), map_location=device), strict=False)

    # メモリバンクを読み込む
    velocity_estimator_memory_bank, pose_estimator_memory_bank, appearance_estimator_memory_bank = joblib.load(
        os.path.join(model_dir, "ai_vad_banks.joblib")
    )

    if velocity_estimator_memory_bank is not None:
        model.density_estimator.velocity_estimator.memory_bank = velocity_estimator_memory_bank
    if pose_estimator_memory_bank is not None:
        model.density_estimator.pose_estimator.memory_bank = pose_estimator_memory_bank
    if appearance_estimator_memory_bank is not None:
        model.density_estimator.appearance_estimator.memory_bank = appearance_estimator_memory_bank

    model.density_estimator.fit()

    # モデルをデバイスに移動する
    model = model.to(device)

    # 変換を取得する
    transform_config = config.dataset.transform_config.eval if "transform_config" in config.dataset.keys() else None
    image_size = (config.dataset.image_size[0], config.dataset.image_size[1])
    center_crop = config.dataset.get("center_crop")
    center_crop = tuple(center_crop) if center_crop is not None else None
    normalization = InputNormalizationMethod(config.dataset.normalization)
    transform = get_transforms(
        config=transform_config, image_size=image_size, center_crop=center_crop, normalization=normalization
    )

    return model, transform

def input_fn(request_body, request_content_type):
    """
    request_bodyはSageMakerによって渡され、コンテンツタイプはクライアント(または呼び出し元)がHTTPヘッダーで渡されます。
    """
    print("input_fn-----------------------")
    if request_content_type in ("application/x-image", "image/x-image"):
        image = Image.open(io.BytesIO(request_body)).convert("RGB")
        numpy_array = np.array(image)
        print("numpy_array.shape", numpy_array.shape)
        print("input_fn-----------------------")
        return [numpy_array], PredictMode.frame
    elif request_content_type == "application/json":
        request_body_json = json.loads(request_body)
        s3_uris = request_body_json.get("images", [])
        if len(s3_uris) == 0:
            raise ValueError(f"Imagesは必須のキーであり、少なくとも1つのS3 URIのリストを含んでいる必要があります")
        s3 = boto3.client("s3")
        frame_paths = []
        for s3_uri in s3_uris:
            parsed_url = urlsplit(s3_uri)
            bucket_name = parsed_url.netloc
            object_key = parsed_url.path.lstrip("/")
            local_frame_path = f"/tmp/{s3_uri.replace('/', '_')}"
            # S3からフレームをダウンロードする
            s3.download_file(bucket_name, object_key, local_frame_path)
            frame_paths.append(local_frame_path)
        frames = np.stack([torch.Tensor(read_image(frame_path)) for frame_path in frame_paths], axis=0)

        predict_mode = PredictMode.clip if request_body_json.get("clip", False) else PredictMode.batch

        print("frames.shape", frames.shape)
        print("predict_mode", predict_mode)
        print("input_fn-----------------------")

        return frames, predict_mode

    # もしもrequest_content_typeが予想される値でなければエラーを発生させる
    raise ValueError(f"サポートされていないコンテンツタイプ{request_content_type}です")

def predict_fn(input_data, model):
    """
    この関数は入力データとmodel_fnによって返されたモデルを受け取ります。
    model_fnの後に実行され、その出力がAPIのレスポンスとして返されます。
    """
    print("predict_fn-----------------------")
    model, transform = model

    frames, predict_mode = input_data

    processed_data = {}
    processed_data["image"] = [transform(image=frame)["image"] for frame in frames]
    processed_data["image"] = torch.stack(processed_data["image"])
    image = processed_data["image"].to(device)

    # バッチサイズ1のクリップに対してバッチの次元を追加する
    if predict_mode == PredictMode.clip:
        image = image.unsqueeze(0)

    print("image.shape", image.shape)

    model.eval()

    with torch.no_grad():
        boxes, anomaly_scores, image_scores = model(image)

    print("boxes_len", [len(b) for b in boxes])

    processed_data["pred_boxes"] = [box.int() for box in boxes]
    processed_data["box_scores"] = [score.to(device) for score in anomaly_scores]
    processed_data["pred_scores"] = torch.Tensor(image_scores).to(device)

    print("predict_fn-----------------------")

    return processed_data

def output_fn(prediction, accept):
    """
    モデル予測のための後処理関数です。predict_fnの後に実行されます。
    """
    print("output_fn-----------------------")

    # acceptタイプがJSONかどうかチェックする
    if accept != "application/json":
        raise ValueError(f"Acceptタイプ{accept}はサポートされていません")

    # PyTorchテンソルをリストに変換してJSONシリアライズ可能にする
    for key in prediction:
        # torch.Tensorの場合はリストに変換する
        if isinstance(prediction[key], torch.Tensor):
            prediction[key] = prediction[key].tolist()
        # リストの場合はリスト内のテンソルをすべて変換する
        elif isinstance(prediction[key], list):
            prediction[key] = [
                tensor.tolist() if isinstance(tensor, torch.Tensor) else tensor for tensor in prediction[key]
            ]

    print("output_fn-----------------------")

    return json.dumps(prediction), accept

P.S. 次のステップに進む前に、モデルのサービングスクリプトをテストすることを強くお勧めします。以下のコードで示されるように、呼び出しパイプラインをシミュレーションして簡単に行うことができます。

import json
from inference import model_fn, predict_fn, input_fn, output_fn

response, accept = output_fn(
    predict_fn(
        input_fn(payload, "application/x-image"),
        model_fn("../")
    ),
    "application/json"
)

json.loads(response).keys()

モデルをS3にアップロードする

AI VAD PyTorchモデルを完全に同じ状態で読み込むSageMakerエンドポイントを作成するために、次のファイルが必要です:

  • AI VAD PyTorchモデルの重み(state_dict)
  • 密度推定器のメモリバンク(モデルの重みの一部ではない)
  • PyTorchモデルのハイパーパラメータを含む設定ファイル
  • Sagemakerモデルサービングスクリプト(inference.py

以下のコードは、すべての必要なファイルを1つのディレクトリに整理する方法を示しています。

P.S.、組み込みのPyTorch ModelCheckpointコールバックをオーバーライドして、これらのメモリバンクがチェックポイントの保存の一部として保存されることを保証しています(実装はこちらで見つけることができます)。

import torch
import joblib
import shutil

checkpoint = "results/ai_vad/ucsd/run/weights/lightning/model.ckpt"
config_path = "results/ai_vad/ucsd/run/config.yaml"
model_weights = torch.load(checkpoint)
model_state_dict = model_weights["state_dict"]
torch.save(model_state_dict, "../ai_vad_weights.pth")
velocity_estimator_memory_bank = None
pose_estimator_memory_bank = None
appearance_estimator_memory_bank = None

if "velocity_estimator_memory_bank" in model_weights:
    velocity_estimator_memory_bank = model_weights["velocity_estimator_memory_bank"]
if "pose_estimator_memory_bank" in model_weights:
    pose_estimator_memory_bank = model_weights["pose_estimator_memory_bank"]
if "appearance_estimator_memory_bank" in model_weights:
    appearance_estimator_memory_bank = model_weights["appearance_estimator_memory_bank"]

banks = (velocity_estimator_memory_bank, pose_estimator_memory_bank, appearance_estimator_memory_bank)
joblib.dump(banks, "../ai_vad_banks.joblib")
shutil.copyfile(config_path, "../ai_vad_config.yaml")

その後、4つのファイルをまとめてtar.gzにするために以下のコマンドを使用しました。

tar -czvf ../ai_vad_model.tar.gz -C ../ ai_vad_weights.pth ai_vad_banks.joblib ai_vad_config.yaml inference.py

最後に、ファイルをboto3を使用してS3にアップロードしました。

import boto3
from datetime import datetime

current_datetime = datetime.now().strftime('%Y-%m-%d-%H-%M-%S')
s3 = boto3.resource('s3')
s3.meta.client.upload_file("../ai_vad_model.tar.gz", "ai-vad", f"{current_datetime}/ai_vad_model.tar.gz")

カスタムDockerイメージをAWS ECRにアップロードする

前述したように、事前にビルドされたPyTorch Sagemakerイメージに含まれていない追加の依存関係(つまり、anomalibパッケージ)があるため、その目的のために新しいDockerイメージを作成しました。カスタムDockerイメージをビルドする前に、Amazon ECRリポジトリへの認証が必要です。

REGION=<my_aws_region>
ACCOUNT=<my_aws_account>
# DockerをAmazon ECRレジストリに認証する
aws ecr get-login-password --region $REGION | docker login --username AWS --password-stdin <docker_registry_url>.dkr.ecr.$REGION.amazonaws.com
# プライベートなAmazon ECRレジストリにログイン
aws ecr get-login-password --region $REGION | docker login --username AWS --password-stdin $ACCOUNT.dkr.ecr.$REGION.amazonaws.com

以下のDockerfileを使用してカスタムDockerイメージをビルドし、異なるDockerレジストリパスはこちらで見つけることができます。モデルの要件(CPU / GPU、Pythonバージョンなど)およびAWSリージョンに基づいて、適切なレジストリパスを選択するようにしてください。たとえば、リージョンがus-east-1の場合、完全なDockerレジストリパスは次のようになります:763104351884.dkr.ecr.us-east-1.amazonaws.com/pytorch-inference:2.0.0-gpu-py310

# SageMaker PyTorchイメージをベースイメージとして使用します
FROM <docker_registry_url>.dkr.ecr.<my_aws_region>.amazonaws.com/pytorch-inference:2.0.0-gpu-py310
# 追加の依存関係をインストールします
RUN pip install "git+https://github.com/hairozen/anomalib.git@ai-vad-inference-improvements"

これで、カスタムイメージをビルドするためにクラシックなDockerビルドコマンドを実行できます。

docker build -t ai-vad-image .

次のステップは、構築した新しいイメージのAWS ECRリポジトリを作成し、タグ付けし、イメージをAWS ECRリポジトリにプッシュすることです。

# AWS ECRリポジトリを作成します
aws ecr create-repository --repository-name ai-vad-image
# イメージにタグを付けます
docker tag ai-vad-image:latest $ACCOUNT.dkr.ecr.$REGION.amazonaws.com/ai-vad-image:latest
# タグ付きのイメージをAWS ECRリポジトリにプッシュします
docker push $ACCOUNT.dkr.ecr.$REGION.amazonaws.com/ai-vad-image:latest

SageMakerでモデルを作成する

このステップは非常に簡単です。以下のコードを使用します。

import boto3
import sagemaker

sagemaker_client = boto3.client(service_name="sagemaker")
role = sagemaker.get_execution_role()
model_name = f"ai-vad-model-{current_datetime}"

primary_container = {
    "Image": f"{my_aws_account}.dkr.ecr.{my_aws_region}.amazonaws.com/ai-vad-image:latest",
    "ModelDataUrl": f"s3://ai-vad/{current_datetime}/ai_vad_model.tar.gz"
}

create_model_response = sagemaker_client.create_model(
    ModelName=model_name,
    ExecutionRoleArn=role,
    PrimaryContainer=primary_container)

エンドポイント構成の作成

次のステップは、エンドポイント構成を作成することです。以下に基本的なものがあります。

endpoint_config_name = f"ai-vad-model-config-{current_datetime}"
sagemaker_client.create_endpoint_config(
    EndpointConfigName=endpoint_config_name,
    ProductionVariants=[{
        "InstanceType": "ml.g5.xlarge",
        "InitialVariantWeight": 1,
        "InitialInstanceCount": 1,
        "ModelName": model_name,
        "VariantName": "AllTraffic"
    }]
)

エンドポイントの作成

さあ、いよいよエンドポイントを作成する準備が整いました。

endpoint_name = f"ai-vad-model-endpoint-{current_datetime}"
sagemaker_client.create_endpoint(
    EndpointName=endpoint_name,
    EndpointConfigName=endpoint_config_name
)

エンドポイントのステータスが “Creating” から “InService” に変わるまで数分かかる場合があるため、注意してください。ステータスを確認するには、以下のようにします。

response = sagemaker_client.describe_endpoint(EndpointName=endpoint_name)
response["EndpointStatus"]

エンドポイントを呼び出す

お待ちかねの時がやってきました。エンドポイントを呼び出して、すべてが正常に動作するかテストしましょう。

with open(file_name, "rb") as f:
    payload = f.read()

predictor = sagemaker.predictor.Predictor(endpoint_name=endpoint_name)
predictor.serializer = DataSerializer(content_type="image/x-image")
predictor.predict(payload)

これは素敵なチェックですが、predictor.predict関数はSageMakerのサービングスクリプトからの完全な呼び出しパイプラインを実行しません。呼び出しパイプラインには次のような処理が含まれます:output_fn(predict_fn(input_fn(input_data, model_fn(model_dir)),accept)

また、APIコールを使用してモデルを呼び出すことでもテストできます。

with open(file_name, "rb") as f:
    payload = f.read()

sagemaker_runtime = boto3.client("runtime.sagemaker")
response = sagemaker_runtime.invoke_endpoint(
    EndpointName=endpoint_name,
    ContentType="image/x-image",
    Body=payload
)

response = json.loads(response["Body"].read().decode())

「anomalib」が提供する素晴らしい可視化ツールを使用して、UCSDped2データセットの特定のフレームのボックスとラベルを描画できます。

画像は著者によって生成されました。UCSD Anomaly Detection Datasetを基にしたanomalibパッケージを使用して生成されました。緑のボックスは歩行者の歩き方に異常がないことを示していますが、バイカーの赤いボックスはAI VADモデルの速度とポーズの特徴による異常を示しています。

結論

では、ここでカバーした内容をまとめましょう。SageMakerモデルのデプロイには、数つの手順が必要です。

まず、Sagemakerモデルサービングスクリプトを作成し、モデルの機能と振る舞いを定義する必要があります。

次に、モデルはAmazon S3にアップロードされ、保存と取得に使用されます。さらに、モデルとその依存関係をコンテナ化するために、カスタムのDockerイメージがAWS Elastic Container Registry(ECR)にアップロードされます。次のステップでは、SageMakerでモデルを作成し、S3に保存されたモデルアーティファクトとECRに保存されたDockerイメージを関連付けます。

次に、モデルをホストするために使用するインスタンスの数とタイプを定義するエンドポイント設定が作成されます。

最後に、デプロイされたモデルとクライアントアプリケーションとの間のライブ接続を確立するためにエンドポイントが作成され、エンドポイントを呼び出してリアルタイムの予測を行うことができます。

これらの手順により、SageMakerモデルのデプロイは効率的かつ信頼性のあるモデルサービングが実現されます。

付録

Reiss et al.によって2023年に発表されたAttribute-based Representations for Accurate and Interpretable Video Anomaly Detection論文は、属性ベースの表現を使用したシンプルで非常に効果的なビデオ異常検出(VAD)の方法を提案しています。

この論文では、従来のVAD方法が深層学習に頼ることが多く、システムが特定のフレームやオブジェクトを異常としてフラッグを立てる理由がユーザーに理解しにくいという課題に取り組んでいます。

この問題に対処するため、著者らは各オブジェクトをその速度、ポーズ、深度で表現する方法を提案しています。これらの属性は理解しやすく解釈しやすいものであり、密度ベースのアプローチを使用して異常スコアを計算するために使用することができます。

このシンプルな表現が、ShanghaiTechなどのいくつかの難解なVADデータセットで最先端のパフォーマンスを実現するために十分であることを論文は示しています。

正確性に加えて、著者らは自身の手法が解釈可能であることも示しています。たとえば、ユーザーに対してビデオの異常スコアに最も寄与しているオブジェクトのリストを提供し、それらのオブジェクトの速度、ポーズ、深度情報と共に提示することができます。これにより、システムがなぜビデオを異常としてフラッグを立てているのかをユーザーが理解するのに役立ちます。

全体として、この論文はVADの分野において重要な貢献です。さまざまなアプリケーションで使用できるシンプルで正確で解釈可能なVADの手法を提案しています。

We will continue to update VoAGI; if you have any questions or suggestions, please contact us!

Share:

Was this article helpful?

93 out of 132 found this helpful

Discover more