エアライン事業で情報とモデルを明らかにして、明らかにスムースに動作するようにモニターする

エアライン事業での情報とモデルの明示およびスムーズな運営のためのモニタリング

イントロダクション

訓練と評価でパフォーマンスの良いモデルが、本番環境で悪化するという挫折感を経験したことがありますか?これは本番フェーズでよくある課題ですが、そこでEvidently.aiという素晴らしいオープンソースのツールが登場し、私たちのMLモデルを観察可能にして監視しやすくします。このガイドでは、本番環境でのデータとモデルのパフォーマンスの変化の背後にある理由と、実装するために必要なアクションについて取り上げます。また、このツールをStreamlit予測アプリと統合する方法も学びます。素晴らしい旅を始めましょう。

この記事はデータサイエンスブログマラソンの一環として公開されました。

必要な前提条件

1) リポジトリのクローン

git clone "https://github.com/VishalKumar-S/Flight-Delay-Prediction-and-live-Monitoring-with-Azure-Evidently-and-Streamlit-with-MVC-Architecture.git"

2) 仮想環境の作成とアクティベート

# 仮想環境を作成するpython3 -m venv venv# プロジェクトフォルダで仮想環境をアクティベートするsource venv/bin/activate

# このコマンドはrequirements.txtファイルにリストされているPythonパッケージをインストールします。pip install -r requirements.txt

4) StreamlitとEvidentlyのインストール

pip install streamlitpip install evidently

プロジェクト構造:

project_root/│├── assets/│├── data/│   └── Monitoring_data.csv│├── models/│   ├── best_model.pkl│   ├── lightgml_model.pkl│   ├── linear_regression_model.pkl│   ├── random_forest_model.pkl│   ├── ridge_regression.pkl│   ├── svm_model.pkl│   └── xgboost_model.pkl│├── notebooks/│   └── EDA.ipynb│├── src/│   ├── controller/│   │   └── flight_delay_controller.py│   ││   ├── model/│   │   └── flight_delay_model.py│   ││   ├── view/│   │   └── flight_delay_view.py│   ││   ├── data_preprocessing.py│   ├── model_evaluation.py│   └── modeling.py│├── .gitignore├── Dockerfile├── LICENSE├── Readme.md├── app.py└── requirements.txt

データの前処理

クラウドからの取得

このプロジェクトでは、Azureからデータセットを取得します。まず、Azureでストレージコンテナを作成し、ブロブストレージを作成し、生のデータセットをそこにアップロードして、パブリックにアクセス可能にし、将来のデータ前処理のために使用します。

データセットのリンク: https://www.kaggle.com/datasets/giovamata/airlinedelaycauses

以下は、クラウドから取得するためのコードの一部です。

class DataPreprocessorTemplate:    """    カスタマイズ可能なステップを持つデータ前処理のためのテンプレートメソッドパターン。    """    def __init__(self, data_url):        """        SASトークンを含むデータURLでDataPreprocessorテンプレートを初期化します。        Args:            data_url (str): SASトークンを含むデータへのURL。        """        self.data_url = data_url    def fetch_data(self):        """        Azure Blob Storageからデータを取得します。        Returns:            pd.DataFrame: パンダのデータフレーム形式で取得されたデータセット。        """        try:            # 提供されたURLを使用してデータセットを取得する            print("クラウドからデータを取得中...")            data = pd.read_csv(self.data_url)            return data        except Exception as e:            raise Exception("データの取得中にエラーが発生しました: " + str(e))def main():  # SASトークンを含むデータのURL  data_url = "https://flightdelay.blob.core.windows.net/flight-delayed-dataset/DelayedFlights.csv"  output_path = "../data/cleaned_flight_delays.csv"  data_preprocessor = DataPreprocessorTemplate(data_url)  data = data_preprocessor.fetch_data()  cleaned_data=data_preprocessor.clean_data(data)  data_preprocessor.save_cleaned_data(cleaned_data, output_path)    

アズールのイメージ:

データのクリーニングと変換

データの世界では、データの変換は生のダイヤモンドを磨き上げたものに変えます。ここでは、不要なフィーチャーの削除、欠損値の補完、カテゴリカルカラムのエンコーディング、そして外れ値の除去など、データの前処理ステップを進めていきます。ここではダミーエンコーディングを実装しました。その後、Zスコアテストを使用して外れ値を除去します。

データクリーニングのコードスニペット:

def clean_data(self,df):        """        入力データをクリーニングおよび前処理します。        Args:            df (pd.DataFrame): 入力データセット。        Returns:            pd.DataFrame: クリーニングおよび前処理されたデータセット。        """        print("データをクリーニングしています...")        df=self.remove_features(df)        df=self.impute_missing_values(df)        df=self.encode_categorical_features(df)        df=self.remove_outliers(df)        return df    def remove_features(self,df):        """        データセットから不要な列を削除します。        Args:            df (pd.DataFrame): 入力データセット。        Returns:            pd.DataFrame: 不要な列が削除されたデータセット。        """        print("不要な列を削除しています...")        df=df.drop(['Unnamed: 0','Year','CancellationCode','TailNum','Diverted','Cancelled','ArrTime','ActualElapsedTime'],axis=1)        return df    def impute_missing_values(self,df):        """        データセット内の欠損値を補完します。        Args:            df (pd.DataFrame): 入力データセット。        Returns:            pd.DataFrame: 欠損値が補完されたデータセット。        """        print("欠損値を補完しています...")        delay_colns=['CarrierDelay', 'WeatherDelay', 'NASDelay', 'SecurityDelay', 'LateAircraftDelay']        # これらの列の欠損値を0で補完します        df[delay_colns]=df[delay_colns].fillna(0)        # これらの列の欠損値を中央値で補完します        columns_to_impute = ['AirTime', 'ArrDelay', 'TaxiIn','CRSElapsedTime']        df[columns_to_impute]=df[columns_to_impute].fillna(df[columns_to_impute].median())        return df    def encode_categorical_features(self,df):        """        データセット内のカテゴリカルフィーチャーをエンコードします。        Args:            df (pd.DataFrame): 入力データセット。        Returns:            pd.DataFrame: カテゴリカルフィーチャーがエンコードされたデータセット。        """        print("カテゴリカルフィーチャーをエンコードしています...")        df=pd.get_dummies(df,columns=['UniqueCarrier', 'Origin', 'Dest'], drop_first=True)        return df    def remove_outliers(self,df):        """        データセットから外れ値を除去します。        Args:            df (pd.DataFrame): 入力データセット。        Returns:            pd.DataFrame: 外れ値が除去されたデータセット。        """        print("外れ値を除去しています...")        z_threshold=3        z_scores=np.abs(stats.zscore(df[self.numerical_columns]))        outliers=np.where(z_scores>z_threshold)        df_no_outliers=df[(z_scores<=z_threshold).all(axis=1)]        print("データクリーニング後の形状:", df_no_outliers.shape)        return df_no_outliers

その後、クリーニングされたデータセットを将来のモデルのトレーニングと評価の目的で保存します。クリーニングされたデータセットを保存するために、joblibを使用します。

以下はコードスニペットです:

def save_cleaned_data(self,cleaned_data, output_path):    """    クリーニングされたデータをCSVファイルに保存します。    Args:        cleaned_data (pd.DataFrame): クリーニングされたデータセット。        output_path (str): クリーニングされたデータを保存するパス。    """    print("クリーニングされたデータを保存しています...")                cleaned_data.to_csv(output_path,index=False)

トレーニングと評価

データのクリーニング後、データセットをトレーニングセットとテストセットの2つに分割します。そして、線形回帰、ランダムフォレスト回帰、xgboost、リッジ回帰、lightgbmモデルなどの複数の回帰モデルをトレーニングします。その後、joblibを使用してすべてのファイルを.pklファイルとして保存します。これにより、時間が短縮され、リソースの追加使用が最適化されます。

これでトレーニングされたモデルファイルを評価および予測の目的で使用できます。その後、R2スコア、MAE(平均絶対誤差)値、およびRMSE(平方根平均二乗誤差)値などの評価指標に基づいてモデルを評価します。最もパフォーマンスの良いモデルを保存して展開に使用します。

モデルトレーニングのためのコードスニペット:

# マシンラーニングモデルを作成するための関数def create_model(model_name):    if model_name == "random_forest":        return RandomForestRegressor(n_estimators=50, random_state=42)    elif model_name == "linear_regression":        return LinearRegression()    elif model_name == "xgboost":        return xgb.XGBRegressor()    elif model_name == "ridge_regression":        return Ridge(alpha=1.0)  # 必要に応じてalphaを調整    elif model_name == "lightgbm":        return lgb.LGBMRegressor()# クリーニングされたデータセットを読み込むprint("モデルの読み込みが開始されました...")cleaned_data = pd.read_csv("../data/cleaned_flight_delays.csv")print("モデルの読み込みが完了しました")# 目的変数(ArrDelay)と特徴量(X)を定義target_variable = "ArrDelay"X = cleaned_data.drop(columns=[target_variable])y = cleaned_data[target_variable]# データをトレーニングセットとテストセットに分割print("データをトレーニングセットとテストセットに分割しています...")X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)print("データの分割が完了しました。")

# モデルをトレーニングして保存するための関数def train_and_save_model(model_name, X_train, y_train):    model = create_model(model_name)    print(f"{model_name}モデルのトレーニング中...")    start_time = time.time()    model.fit(X_train, y_train)    print("モデルのトレーニングが完了しました...")    end_time = time.time()    elapsed_time = end_time - start_time    print(f"完了までのトレーニング時間:{elapsed_time:.2f}秒")    # トレーニング済みモデルを保存する    joblib.dump(model, f"../models/{model_name}_model.pkl")    print(f"{model_name}モデルが{model_name}_model.pklとして保存されました")# ランダムフォレストモデルを作成してトレーニングするtrain_and_save_model("random_forest", X_train, y_train)# 線形回帰モデルをトレーニングするtrain_and_save_model("linear_regression", X_train, y_train)# XGBoostモデルを作成してトレーニングするtrain_and_save_model("xgboost", X_train, y_train)# Ridge回帰モデルを作成してトレーニングするtrain_and_save_model("ridge_regression", X_train, y_train)# LightGBMモデルを作成してトレーニングするtrain_and_save_model("lightgbm", X_train, y_train)

モデル評価のためのコードスニペット

# クリーニングされたデータセットを読み込むcleaned_data = pd.read_csv("../data/cleaned_flight_delays.csv")# トレーニング済みのマシンラーニングモデルを読み込むrandom_forest_model = joblib.load("../models/random_forest_model.pkl")linear_regression_model = joblib.load("../models/linear_regression_model.pkl")xgboost_model = joblib.load("../models/xgboost_model.pkl")ridge_regression_model = joblib.load("../models/ridge_regression_model.pkl")lightgbm_model = joblib.load("../models/lightgbm_model.pkl")# 目的変数(ArrDelay)と特徴量(X)を定義target_variable = "ArrDelay"X = cleaned_data.drop(columns=[target_variable])y = cleaned_data[target_variable]# データをトレーニングセットとテストセットに分割X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)# モデルを評価するための関数を定義するdef evaluate_model(model, X_test, y_test):    y_pred = model.predict(X_test)    mae = mean_absolute_error(y_test, y_pred)    mse = mean_squared_error(y_test, y_pred)    r2 = r2_score(y_test, y_pred)    return mae, mse, r2# モデルを評価し、結果を保存するための辞書を作成するmodels = {    "ランダムフォレスト": random_forest_model,    "線形回帰": linear_regression_model,    "XGBoost": xgboost_model,    "Ridge回帰": ridge_regression_model,    "LightGBM": lightgbm_model,}# 各モデルを評価し、メトリックを保存するmetrics = {}for model_name, model in models.items():    mae, mse, r2 = evaluate_model(model, X_test, y_test)    metrics[model_name] = {"MAE": mae, "MSE": mse, "R2": r2}# 全モデルの評価メトリックを表示するfor model_name, model_metrics in metrics.items():    print(f"{model_name}の評価メトリック:")    print(f"平均絶対誤差(MAE):{model_metrics['MAE']:.2f}")    print(f"平均二乗誤差(MSE):{model_metrics['MSE']:.2f}")    print(f"決定係数(R2)スコア:{model_metrics['R2']:.2f}")    print()

評価の後、最適な展開モデルを選択します。

最良のモデルを保存して印刷するためのコードスニペット:

# R2スコアに基づいて最良のモデルを見つける
best_model = max(metrics, key=lambda model: metrics[model]["R2"])

# 結果を印刷する
print(f"トレーニング済みモデルの中で最良のモデルは {best_model} で、次のメトリックを持っています:")
print(f"平均絶対誤差(MAE): {metrics[best_model]['MAE']}")
print(f"平均二乗誤差(MSE): {metrics[best_model]['MSE']}")
print(f"R二乗(R2)スコア: {metrics[best_model]['R2']}")

# 最良のモデルを保存する
joblib.dump(models[best_model], "../models/best_model.pkl")
print("最良のモデルは best_model.pkl として保存されました")

出力:

明らかに:データとモデルのモニタリングのために

本番環境では、モデルの正常な機能にさまざまな問題が生じる可能性があります。いくつかの主な考慮事項は次のとおりです:

  1. トレーニングと提供のずれ:トレーニングと実験用のデータに大きな違いがある場合に発生します。
  2. データ品質と整合性の問題:パイプラインの断裂、インフラの問題、データスキーマの変更、またはソースからのその他のデータの問題など、データ処理に関連する問題が発生する可能性があります。
  3. アップストリームモデルのエラー:本番環境では、モデルはしばしばチェーンを形成し、1つのモデルの入力が他のモデルの出力に依存しています。そのため、1つのモデルの出力に問題があると、全体的なモデルの予測結果に影響を及ぼします。

4. 徐々の概念の変化:時間の経過とともに、取り組んでいる概念や目標変数に変化が生じることがあります。そのため、モニタリングが必要です。また、モデルの予測が誤る可能性のある予期しない状況も発生する可能性があります。たとえば、災害や自然災害など。

データとモデルの品質を評価するため、参照データセットと現在のデータセットの2つのデータセットを考慮します。

参照データセット:このデータセットは、現在のデータセットの品質メトリックのベンチマークです。Evidentlyによってデフォルトの閾値値が選択されますが、参照データセットに基づいて特定のニーズに応じたカスタマイズされたメトリックの閾値値を提供することもできます。

現在のデータセット:評価のために取得される未知のライブデータを表します。

これらの2つのデータセットでデータドリフト、ターゲットドリフト、データの品質、およびモデルのパフォーマンスレポートを計算します。

レポートには、データドリフト、ターゲットドリフト、データ品質、およびモデルパフォーマンスのメトリックが含まれます。通常、モニタリングは特定の間隔でバッチで行われます。

モデルを再トレーニングする前に考慮すべき事項:

1. データドリフトのチェック:データドリフトが検出された場合は、まずデータの品質を確認し、パンデミックや自然災害などの外部要因がドリフトに影響を与えていないかを確認することが推奨されます。

2. モデルのパフォーマンスの評価:データドリフトの問題に対処した後のモデルのパフォーマンスを考慮します。データとモデルの報告がドリフトを示している場合は、モデルを再トレーニングすることが良い考えかもしれません。この決定を下す前に、次の点を考慮してください。

3. 再トレーニングの検討:再トレーニングは常に解決策ではありません。多くの場合、モデルを再トレーニングするための十分な新しいデータが得られません。特定の理由により、データが不安定で間違っている可能性もあるため、新しいデータでトレーニングする際は注意が必要です。

4. データドリフトの警告の設定:データドリフトの警告を設定する前に、その特定のデータ/特徴の予測に対する重要性を分析してください。すべてのデータドリフトが重要であり、対策が必要なわけではありません。各特徴が予測に与える影響の重要性を常に分析してください。

さまざまな形式でレポートを生成することができます。HTML、JPEG、JSONなどがあります。レポートを生成するためのコードスニペットを探索しましょう。

モデルのパフォーマンスレポートコードスニペット

# 回帰性能レポートオブジェクトを作成する
regression_performance_report = Report(metrics=[RegressionPreset()])
# 回帰性能レポートを実行する
regression_performance_report.run(
    reference_data=reference,  # 比較用の参照データセット
    current_data=current.loc[CUR_WEEK_START:CUR_WEEK_END],  # 解析用の現在のデータセット
    column_mapping=column_mapping  # 参照と現在のデータセットの列のマッピング
)
# モデルのパフォーマンスレポートのHTMLファイルを保存するパスを指定する
model_performance_report_path = reports_dir / 'model_performance.html'
# 回帰性能レポートをHTMLファイルとして保存する
regression_performance_report.save_html(model_performance_report_path)

ターゲットドリフトコードスニペット

# ターゲットドリフトレポートオブジェクトを作成する
target_drift_report = Report(metrics=[TargetDriftPreset()])
# ターゲットドリフトレポートを実行する
target_drift_report.run(
    reference_data=reference,  # 比較用の参照データセット
    current_data=current.loc[CUR_WEEK_START:CUR_WEEK_END],  # 解析用の現在のデータセット
    column_mapping=column_mapping  # 参照と現在のデータセットの列のマッピング
)
# ターゲットドリフトレポートのHTMLファイルを保存するパスを指定する
target_drift_report_path = reports_dir / 'target_drift.html'
# ターゲットドリフトレポートをHTMLファイルとして保存する
target_drift_report.save_html(target_drift_report_path)

データドリフト

# 列のマッピングオブジェクトを作成する
column_mapping = ColumnMapping()
column_mapping.numerical_features = numerical_features  # マッピングのための数値特徴量を定義する
# データドリフトレポートオブジェクトを作成する
data_drift_report = Report(metrics=[DataDriftPreset()])
# データドリフトレポートを実行する
data_drift_report.run(
    reference_data=reference,  # 比較用の参照データセット
    current_data=current.loc[CUR_WEEK_START:CUR_WEEK_END],  # 解析用の現在のデータセット
    column_mapping=column_mapping  # 参照と現在のデータセットの数値特徴量のマッピング
)
# データドリフトレポートのHTMLファイルを保存するパスを指定する
data_drift_report_path = reports_dir / 'data_drift.html'
# データドリフトレポートをHTMLファイルとして保存する
data_drift_report.save_html(data_drift_report_path)
column_mapping = ColumnMapping()
column_mapping.numerical_features = numerical_features
data_drift_report = Report(metrics=[DataDriftPreset()])
data_drift_report.run(
    reference_data=reference,
    current_data=current.loc[CUR_WEEK_START:CUR_WEEK_END],
    column_mapping=column_mapping
)
data_drift_report_path = reports_dir / 'data_drift.html'
data_drift_report.save_html(data_drift_report_path)

データ品質コードスニペット

# 列のマッピングオブジェクトを作成する
column_mapping = ColumnMapping()
column_mapping.numerical_features = numerical_features  # マッピングのための数値特徴量を定義する
# データ品質レポートオブジェクトを作成する
data_quality_report = Report(metrics=[DataQualityPreset()])
# データ品質レポートを実行する
data_quality_report.run(
    reference_data=reference,  # 比較用の参照データセット
    current_data=current.loc[CUR_WEEK_START:CUR_WEEK_END],  # 解析用の現在のデータセット
    column_mapping=column_mapping  # 参照と現在のデータセットの数値特徴量のマッピング
)
# データ品質レポートのHTMLファイルを保存するパスを指定する
data_quality_report_path = reports_dir / 'data_quality.html'
# データ品質レポートをHTMLファイルとして保存する
data_quality_report.save_html(data_quality_report_path)

モデル-ビュー-コントローラー(MVC)アーキテクチャの探索

このセクションでは、MVC(モデル-ビュー-コントローラー)アーキテクチャについて学びましょう。

MVCアーキテクチャは、Webアプリで使用されるデザインパターンです。コードの複雑さを簡素化し、コードの可読性を向上させるために使用されます。そのコンポーネントを見てみましょう。

モデル: ロジック

モデルコンポーネントは、アプリケーションのコアロジックを表します。データ処理や機械学習モデルの操作を担当します。モデルスクリプトはsrc/model/ディレクトリにあります。

ビュー: ユーザーインターフェースの構築

ビューコンポーネントは、ユーザーインターフェースの作成に責任を持ちます。ユーザーとの対話や情報の表示を行います。ビュースクリプトはsrc/view/ディレクトリにあります。

コントローラー: 組織的な橋梁

コントローラーコンポーネントは、モデルとビューの間の仲介的な橋梁として機能します。ユーザーの入力やリクエストを処理します。コントローラースクリプトはsrc/controller/ディレクトリにあります。

MVCダイアグラム:

以下はMVCアーキテクチャの視覚表現です:

MVCアーキテクチャでの遅延予測のコードスニペットを見てみましょう:

1) モデル (flight_delay_model.py):

#必要なライブラリやモジュールのインポート
import pandas as pd
import joblib
import xgboost as xgb
from typing import Dict

class FlightDelayModel:
    def __init__(self, model_file="models/best_model.pkl"):
        """
        FlightDelayModelを初期化します。
        引数:
            model_file(str): トレーニング済みモデルファイルへのパス。
        """
        # 提供されたファイルから事前訓練済みモデルをロードします。
        self.model = joblib.load(model_file)
        
    def predict_delay(self, input_data: Dict):
        """
        入力データに基づいてフライトの遅延を予測します。
        引数:
            input_data(Dict): 予測のための入力特徴量を含む辞書。
        戻り値:
            float: 予測されたフライトの遅延。
        """
        # 入力データの辞書を予測用のDataFrameに変換します。
        input_df = pd.DataFrame([input_data])
        # 事前訓練済みモデルを使用して予測を行います。
        prediction = self.model.predict(input_df)
        return prediction

2) ビュー (flight_delay_view.py):

import streamlit as st
from typing import Dict

class FlightDelayView:
    def display_input_form(self):
        """
        ユーザーがデータを入力するためのStreamlitサイドバーで入力フォームを表示します。
        """
        st.sidebar.write("入力値:")
        # 入力フォーム要素の作成のためのコーディング部分はここになります
        
    def display_selected_inputs(self, selected_data: Dict):
        """
        ユーザーの入力に基づいて選択された入力値を表示します。
        引数:
            selected_data(Dict): 選択された入力値を含む辞書。
        戻り値:
            Dict: 選択された値付きの同じ辞書。
        """
        # 選択された入力値を表示するためのコーディング部分はここになります
        # スライダー、数値入力、セレクトボックスなど、選択された入力値の表示
        return selected_data
    
    def display_predicted_delay(self, flight_delay):
        """
        予測されたフライトの遅延をユーザーに表示します。
        引数:
            flight_delay: 予測されたフライトの遅延値。
        """
        # 予測された遅延を表示するためのコーディング部分はここになります
        # ユーザーに予測されたフライトの遅延値を表示

3) コントローラー (flight_delay_controller.py):

import streamlit as st
from src.view.flight_delay_view import FlightDelayView
from src.model.flight_delay_model import FlightDelayModel
from typing import Dict

class FlightDelayController:
    def __init__(self):
        self.model = FlightDelayModel()
        self.view = FlightDelayView()
        self.selected_data = self.model.selected_data()
        
    def run_prediction(self):
        # 入力フォームを表示し、ユーザーの入力を収集し、選択された入力を表示します
        self.view.display_input_form()
        input_data = self.get_user_inputs()
        self.view.display_selected_inputs(input_data)
        if st.button("フライト遅延予測"):
            # 予測ボタンがクリックされたとき、フライトの遅延を予測し結果を表示します
            flight_delay = self.model.predict_delay(input_data)
            self.view.display_predicted_delay(flight_delay)
            
    def get_user_inputs(self):
        # Streamlitサイドバーからユーザーの入力を収集するためのコーディング部分はここになります
        user_inputs = {}
        # ユーザーの入力を保存するための空の辞書を作成します
        # Streamlitのサイドバーウィジェットを使用してユーザーの入力を収集できます。例:st.sidebar.slider、st.sidebar.selectboxなど
        # ここにユーザーの入力を収集し、'user_inputs'辞書を埋めるためのコードを追加できます
        # 例:
        # user_inputs['selected_feature'] = st.sidebar.slider("特徴量を選択", 最小値, 最大値, デフォルト値)
        # 各ユーザーの入力を収集するためにこれを繰り返します
        return user_inputs
        # ユーザーの入力が含まれる辞書を返します

Streamlitでの予測とモニタリングの統合

ここでは、Streamlitの予測とEvidentlyのモニタリングを統合し、ユーザーが予測やデータやモデルのモニタリングを行うことができるようにします。

このアプローチにより、ユーザーはMVCアーキテクチャのデザインパターンに従って、シームレスに予測、モニタリング、またはその両方を行うことができます。

コード:

src/flight_delay_Controller.pyに実装された参照データセットと現在のデータセットの選択のコードスニペット

# 必要なライブラリのインポート
import streamlit as st
import pandas as pd
import time

class FlightDelayController:
    def run_monitoring(self):
        # Streamlitアプリケーションのタイトルと紹介
        st.title("データ&モデルモニタリングアプリ")
        st.write("データ&モデルモニタリングアプリにおいています。サイドバーから日付範囲を選択し、'Submit'をクリックしてモデルのトレーニングとモニタリングを開始してください。")
        # ユーザーが好きな日付範囲を選択できるようにします
        new_start_month = st.sidebar.selectbox("開始月", range(1, 12), 1)
        new_end_month = st.sidebar.selectbox("終了月", range(1, 12), 1)
        new_start_day = st.sidebar.selectbox("開始日", range(1, 32), 1)
        new_end_day = st.sidebar.selectbox("終了日", range(1, 32), 30)
        # もしユーザーが「Submit」ボタンをクリックした場合
        if st.button("Submit"):
            st.write("現在のデータを取得しています...")
            # データの取得にかかる時間を計測します
            data_start = time.time()
            df = pd.read_csv("data/Monitoring_data.csv")
            data_end = time.time()
            time_taken = data_end - data_start
            st.write(f"データの取得には {time_taken:.2f} 秒かかりました")
            # 選択された日付範囲に基づいてデータをフィルタリングします
            date_range = (
                (df['Month'] >= new_start_month) & (df['DayofMonth'] >= new_start_day) &
                (df['Month'] <= new_end_month) & (df['DayofMonth'] <= new_end_day)
            )
            # 参照データセットと現在のデータセットを作成します
            reference_data = df[~date_range]
            current_data = df[date_range]

参照データセットと現在のデータセットを選択した後、データドリフト、データ品質、モデル品質、およびターゲットドリフトのレポートを生成します。レポート生成のコードは、MVCアーキテクチャデザインに従って3つのパートに分割されています。これら3つのファイルに含まれるコードを見てみましょう。

flight_delay_Controller.pyのコードスニペット

import streamlit as stfrom src.view.flight_delay_view import FlightDelayViewfrom src.model.flight_delay_model import FlightDelayModelimport numpy as npimport pandas as pdfrom scipy import statsimport time# Controllerclass FlightDelayController:    """    フライト遅延予測アプリのコントローラコンポーネントです。    このクラスは、モデルとビューコンポーネントの間の相互作用を取りまとめます。    """    def __init__(self):        # モデルとビューのインスタンスを作成してコントローラを初期化します。        self.model = FlightDelayModel()        # FlightDelayModelのインスタンスを作成します。        self.view = FlightDelayView()        # FlightDelayViewのインスタンスを作成します。        self.selected_data = self.model.selected_data()        # モデルから選択したデータを取得します。        self.categorical_options = self.model.categorical_features()        # モデルからカテゴリ特徴を取得します。    def run_monitoring(self):        # モニタリングアプリケーションを実行するための関数です。        # タイトルと簡単な説明を設定します。        st.title("データ&モデルモニタリングアプリ")        st.write("データ&モデルモニタリングアプリにいます。サイドバーから日付と月の範囲を選択し、「提出」をクリックしてモデルの学習とモニタリングを開始してください。")        # チェックボックスを使用して生成するレポートを選択します。        st.subheader("生成するレポートを選択してください")        generate_model_report = st.checkbox("モデルパフォーマンスレポートを生成する")        generate_target_drift = st.checkbox("ターゲットドリフトレポートを生成する")        generate_data_drift = st.checkbox("データドリフトレポートを生成する")        generate_data_quality = st.checkbox("データ品質レポートを生成する")        if st.button("提出"):            # 'date_range'と'df' はここでは表示されていませんが、すべてのコードスニペットで既に表示されています。            # 選択した日付範囲を除いた参照データ。            reference_data = df[~date_range]            # 選択した日付範囲の現在のデータ。            current_data = df[date_range]            self.view.display_monitoring(reference_data, current_data)  # モニタリングデータを表示します。            self.model.train_model(reference_data, current_data)  # モデルをトレーニングします。            # 選択されたレポートを生成して表示します。            if generate_model_report:                st.write("### モデルパフォーマンスレポート")                st.write("モデルパフォーマンスレポートを生成中...")                performance_report = self.model.performance_report(reference_data, current_data)                self.view.display_report(performance_report, "モデルパフォーマンスレポート")            if generate_target_drift:                st.write("### ターゲットドリフトレポート")                st.write("ターゲットドリフトレポートを生成中...")                target_report = self.model.target_report(reference_data, current_data)                self.view.display_report(target_report, "ターゲットドリフトレポート")            if generate_data_drift:                st.write("### データドリフトレポート")                st.write("データドリフトレポートを生成中...")                data_drift_report = self.model.data_drift_report(reference_data, current_data)                self.view.display_report(data_drift_report, "データドリフトレポート")            if generate_data_quality:                st.write("### データ品質レポート")                st.write("データ品質レポートを生成中...")                data_quality_report = self.model.data_quality_report(reference_data, current_data)                self.view.display_report(data_quality_report, "データ品質レポート")

Flight_delay_model.pyのコード

import pandas as pdimport joblibimport xgboost as xgbfrom evidently.pipeline.column_mapping import ColumnMappingfrom evidently.report import Reportfrom evidently.metric_preset import DataDriftPresetfrom evidently.metric_preset import TargetDriftPresetfrom evidently.metric_preset import DataQualityPresetfrom evidently.metric_preset.regression_performance import RegressionPresetimport timeimport streamlit as stfrom typing import Dict# Modelclass FlightDelayModel:    """    フライト遅延予測アプリのモデルコンポーネントです。    このクラスは、データの読み込み、モデルの読み込み、および遅延予測を処理します。    """    def __init__(self, model_file="models/best_model.pkl"):        # FlightDelayModelクラスを初期化します        # データ分析のための列マッピングを定義します        self.column_mapping = ColumnMapping()        self.column_mapping.target = self.target        self.column_mapping.prediction = 'prediction'        self.column_mapping.numerical_features = self.numerical_features    # モデルパフォーマンスレポート    def performance_report(self, reference_data: pd.DataFrame, current_data: pd.DataFrame):        """        モデル予測のパフォーマンスレポートを生成します。        Args:            reference_data (pd.DataFrame): 比較用の参照データセット。            current_data (pd.DataFrame): 予測を含む現在のデータセット。        Returns:            Report: 回帰パフォーマンスメトリックを含むレポート。        """        regression_performance_report = Report(metrics=[RegressionPreset()])        regression_performance_report.run(            reference_data=reference_data,            current_data=current_data,            column_mapping=self.column_mapping        )        return regression_performance_report    def target_report(self, reference_data: pd.DataFrame, current_data: pd.DataFrame):        """        ターゲットドリフト分析のレポートを生成します。        Args:            reference_data (pd.DataFrame): 比較用の参照データセット。            current_data (pd.DataFrame): 予測を含む現在のデータセット。        Returns:            Report: ターゲットドリフトメトリックを含むレポート。        """        target_drift_report = Report(metrics=[TargetDriftPreset()])        target_drift_report.run(            reference_data=reference_data,            current_data=current_data,            column_mapping=self.column_mapping        )        return target_drift_report    def data_drift_report(self, reference_data: pd.DataFrame, current_data: pd.DataFrame):        """        データドリフト分析のレポートを生成します。        Args:            reference_data (pd.DataFrame): 比較用の参照データセット。            current_data (pd.DataFrame): 予測を含む現在のデータセット。        Returns:            Report: データドリフトメトリックを含むレポート。        """        data_drift_report = Report(metrics=[DataDriftPreset()])        data_drift_report.run(            reference_data=reference_data,            current_data=current_data,            column_mapping=self.column_mapping        )        return data_drift_report    def data_quality_report(self, reference_data: pd.DataFrame, current_data: pd.DataFrame):        """        データ品質レポートを生成します(注:十分な分析に時間がかかる場合があります)。        Args:            reference_data (pd.DataFrame): 比較用の参照データセット。            current_data (pd.DataFrame): 予測を含む現在のデータセット。        Returns:            Report: データ品質メトリックを含むレポート。        """        st.write("データ品質レポートの生成には、約10分程度かかる場合があります。時間がない場合は、待つか他のレポートをご覧ください。")        data_quality_report = Report(metrics=[DataQualityPreset()])        data_quality_report.run(            reference_data=reference_data,            current_data=current_data,            column_mapping=self.column_mapping        )        return data_quality_report

 フライト遅延ビュー.pyコード

# 必要なライブラリのインポート
import streamlit as st
import pandas as pd

# フライト遅延ビューのためのクラスを定義する
class FlightDelayView:
    """フライト遅延予測アプリのビューコンポーネントです。このクラスは、Streamlitウェブアプリケーションの表示を担当します。"""
    
    @staticmethod
    def display_input_form():
        """Streamlitアプリケーションで入力フォームを表示します。"""
        st.title("フライト遅延予測アプリ")
        st.write("このアプリは、フライトの遅延時間を予測します。")
        st.sidebar.header("ユーザー入力")
    
    @staticmethod
    def display_monitoring(reference_data, current_data):
        """モニタリング情報を表示します。
        
        Args:
            reference_data (DataFrame): 参照データセット。
            current_data (DataFrame): 現在のデータセット。
        """
        st.write("レポートを表示するには下にスクロールしてください")
        st.write("参照データセットの形状:", reference_data.shape)
        st.write("現在のデータセットの形状:", current_data.shape)
        # モデルトレーニング情報
        st.write("### モデルのトレーニング中...")
    
    @staticmethod
    def display_selected_inputs(selected_data):
        """選択されたユーザー入力を表示します。
        
        Args:
            selected_data (dict): ユーザーが入力したデータ。
        """
        input_data = pd.DataFrame([selected_data])
        st.write("選択された入力:")
        st.write(input_data)
        return input_data
    
    @staticmethod
    def display_predicted_delay(flight_delay):
        """予測されたフライト遅延時間を表示します。
        
        Args:
            flight_delay (float): 予測されたフライト遅延時間(分単位)。
        """
        st.write("予測されたフライト遅延時間(分):", round(flight_delay[0], 2))
    
    @staticmethod
    def display_report(report, report_name: str):
        """Evidentlyが生成したレポートを表示します。
        
        Args:
            report (Report): 表示するEvidentlyレポート。
            report_name (str): レポートの名前(例: "モデルパフォーマンスレポート")。
        """
        st.write(f"{report_name}")
        # スクロール可能なHTMLとしてEvidentlyレポートを表示する
        st.components.v1.html(report.get_html(), height=1000, scrolling=True)

そして、最終的なStreamlitアプリが到着しました。予測とモニタリングを統合しています。

app.py

import streamlit as st
import pandas as pd
from src.controller.flight_delay_controller import FlightDelayController

def main():
    st.set_page_config(page_title="フライト遅延予測アプリ", layout="wide")
    # コントローラーの初期化
    controller = FlightDelayController()
    # Streamlitアプリを作成する
    st.sidebar.title("フライト遅延予測とデータ・モデル監視アプリ")
    choice = st.sidebar.radio("オプションを選択:", ("予測を行う", "データとモデルを監視する"))
    if choice == "予測を行う":
        controller.run_prediction()
    elif choice == "データとモデルを監視する":
        controller.run_monitoring()

if __name__ == '__main__':
    main()

Streamlitアプリケーションを実行するには、次を実行します

# アプリケーションを実行するには、次を実行します:
streamlit run app.py

アプリを使用するには、Webブラウザでhttp://localhost:8501にアクセスしてください。

予測ダッシュボード:

モニタリングダッシュボード:

Evidentlyレポートでの洞察解析

データドリフトレポート:

ここでは、プロジェクトのデータドリフトレポートを見ることができます。5つの列がドリフトしていることがわかります。次のステップは、これらの特徴のドリフトの潜在的な原因を対応するドメインエキスパートと分析することです。

ターゲットドリフト:

ここでは、ターゲットのドリフトを確認することができます。ここではドリフトは検出されません。

モデルパフォーマンスレポート

ここでは、新しいバッチデータセットの分析後にモデルのパフォーマンスメトリクスをすべて見ることができます。メトリクスに基づいて必要な決定を行うことができます。

データ品質レポート

通常、生の未処理データのデータ品質レポートを使用します。列数、相関、重複値など、すべてのデータに関連するメトリクスを確認することができます。また、これを探索的データ分析に使用することもできます。

デプロイメントのためのDockerの統合:

Dockerとその重要性の理解:

依存性の問題なく、任意の環境でプロジェクトを実行するために、プロジェクトをDocker化することは重要です。 Dockerはアプリケーションのパッケージ化と配布のための必須ツールです。このプロジェクトにDockerを設定して使用する手順は次のとおりです。

Dockerfile:

プロジェクトディレクトリにあるDockerfileには、Dockerイメージのビルドのための命令が含まれています。正しい構文でDockerfileを作成してください。

プロジェクト構造のDockerファイル:

効率的なDockerfileの作成

できるだけイメージサイズを最小限に抑えることは重要です。 Docker内でのマルチステージングなどのテクニックを使用して、イメージサイズを減らし、Pythonの非常に軽量なベースイメージを使用することができます。

コード:

# Use an official Python runtime as a parent imageFROM python:3.9-slim# Set the working directory to /appWORKDIR /app# Create the necessary directoriesRUN mkdir -p /app/data /app/models /app/src/controller /app/src/model /app/src/view# Copy the files from your host machine into the containerCOPY app.py /app/COPY src/controller/flight_delay_controller.py /app/src/controller/COPY src/model/flight_delay_model.py /app/src/model/COPY src/view/flight_delay_view.py /app/src/view/COPY requirements.txt /app/# Create and activate a virtual environmentRUN python -m venv venvRUN /bin/bash -c "source venv/bin/activate"# Install any needed packages specified in requirements.txtRUN pip install -r requirements.txt# Install wgetRUN apt-get update && apt-get install -y wget# Download the dataset file using wgetRUN wget -O /app/data/DelayedFlights.csv https://flightdelay.blob.core.windows.net/flight-delayed-dataset/DelayedFlights.csv# Download the best model file using wgetRUN wget -O /app/models/best_model.pkl https://flightdelay.blob.core.windows.net/flight-delayed-dataset/best_model.pkl# Make port 8501 available to the world outside this containerEXPOSE 8501# Define the default command to run your Streamlit applicationCMD ["streamlit", "run", "app.py"]

Dockerコンテナのビルドと実行

Dockerfileを作成した後、ターミナルでDockerイメージをビルドし、コンテナを実行するために以下の手順に従ってください。

docker build -t flight-delay-prediction .docker run -p 8501:8501 flight-delay-prediction

Streamlit SharingでのStreamlitアプリの共有:

他の人と共有するためにStreamlitアプリケーションをビルドした後、無料でアプリケーションをホストするためにStreamlit Sharingを使用することができます。以下の手順に従ってください:

1) プロジェクトの整理:ルートディレクトリにapp.pyがあることを確認してください。

2) 依存関係の定義:必要なパッケージをすべてインストールするために、requirements.txtファイルを作成してください。

3) GitHubの使用:GitHubでリポジトリをプッシュし、Streamlit Sharingとのシームレスな統合を行ってください。

次に、Streamlitの共有にサインアップし、GitHubリポジトリのURLを貼り付け、[デプロイ]をクリックします。すると、プロジェクトのURLが生成されます。これを他の人と共有することができます。

結論

このガイドでは、Streamlit、Azure、Docker、Evidentlyなどのツールを組み合わせて、素晴らしいデータ駆動型アプリケーションを構築する方法を学びました。このガイドを締めくくるにあたり、Streamlitを使用したWebアプリケーションの構築、Dockerを使用したポータブルアプリケーションの作成、Evidentlyを使用したデータとモデルの品質の確保に関する十分な知識が得られるでしょう。ただし、このガイドは読んで知識を理解し、今後のデータサイエンスプロジェクトに適用することに焦点を当てています。実験して、イノベーションを試し、さらなる改善を探求してください。ガイドの最後までご一緒いただきありがとうございました。学び続けて、継続して成長しましょう!

キーポイント

  • MLプロジェクトをDocker化することは、依存関係の問題なくどんな環境でも実行できるようにするために不可欠です。
  • Dockerイメージのサイズを縮小するために、マルチステージビルドを検討してください。
  • コード内でMVCアーキテクチャを実装することは、複雑なWebアプリケーションの可読性を向上させ、複雑さを軽減します。
  • 本番環境でデータとモデルの品質をモニタリングすることは、統合するのに役立ちます。
  • すべてのデータとモデルレポートを分析した後は、適切な対応を取る必要があります。

よくある質問

GitHubリポジトリ: https://github.com/VishalKumar-S/Flight-Delay-Prediction-and-live-Monitoring-with-Azure-Evidently-and-Streamlit-with-MVC-Architecture

We will continue to update VoAGI; if you have any questions or suggestions, please contact us!

Share:

Was this article helpful?

93 out of 132 found this helpful

Discover more