「MLOpsを活用した顧客離反予測プロジェクト」
「顧客離反予測プロジェクトにおけるMLOpsの活用」
イントロダクション
データサイエンスと聞くと、まず思い浮かぶのはノートブック上でモデルを構築しデータをトレーニングすることです。しかし、実際の世界のデータサイエンスでは、このような状況はありません。実際の世界では、データサイエンティストはモデルを構築し、それを本番環境に展開します。本番環境には、モデルの開発、展開、信頼性の間にギャップがあり、効率的でスケーラブルな運用を実現するために、データサイエンティストはMLOps(Machine Learning Operations)を使用します。MLOpsは本番環境でMLアプリケーションを構築し展開するための手法です。この記事では、MLOpsを使用して、顧客の離反予測プロジェクトを構築し展開します。
学習目標
この記事では、次のことを学びます:
- プロジェクトの概要
- ZenMLとMLOpsの基礎を紹介します
- 予測のためにモデルをローカルに展開する方法を学びます
- データの前処理とエンジニアリング、モデルのトレーニングと評価に入ります。
この記事はData Science Blogathonの一部として公開されました。
- 「AIの潜在能力解放:クラウドGPUの台頭」
- 「自然言語処理の技術比較:RNN、トランスフォーマー、BERT」
- 「深層学習による遺伝子制御の解明:オルタナティブスプライシングの理解に向けた新たなAIアプローチ」
プロジェクトの概要
まず、プロジェクトの内容を理解する必要があります。このプロジェクトでは、通信会社からのデータセットを使用します。このデータセットを使用して、ユーザーが会社のサービスを継続するかどうかを予測するモデルを構築します。このMLアプリケーションを構築するために、ZenmMLとMLFlowの助けを借ります。プロジェクトのワークフローは以下の通りです。
プロジェクトのワークフロー
- データ収集
- データの前処理
- モデルのトレーニング
- モデルの評価
- 展開
MLOpsとは?
MLOpsは、開発から展開、継続的なメンテナンスまでのエンドツーエンドの機械学習ライフサイクルです。MLOpsは、機械学習モデルのライフサイクル全体を効率的かつスケーラブルに自動化することで、拡張性、信頼性、効率性を確保します。
簡単な例を使って説明しましょう:
あなたが自分の街に摩天楼を建設していると想像してみてください。建物の建設は完了していますが、電気、水道、排水システムなどが不足しています。その摩天楼は機能せず、実用的ではありません。
機械学習モデルにも同じことが当てはまります。これらのモデルが展開、スケーラビリティ、長期メンテナンスを考慮せずに設計された場合、効果的で実用的ではありません。これは、データサイエンティストにとって、本番環境で使用するために機械学習モデルを構築する際に大きな障害となります。
MLOpsは、機械学習モデルの本番環境での製造、展開、長期メンテナンスをガイドするベストプラクティスと戦略のセットです。これにより、これらのモデルは正確な予測を提供するだけでなく、企業にとって堅牢でスケーラブルで価値のある資産となります。したがって、MLOpsなしでは、これらのタスクを効率的に実行することは困難であり、悪夢のようなものです。このプロジェクトでは、MLOpsの動作方法、異なるステージ、およびカスタマー離反予測モデルの構築までを説明します。
ZenMLの紹介
ZenMLは、ポータブルで本番環境に対応したパイプラインを構築するためのオープンソースのMLOPSフレームワークです。ZenMLフレームワークは、このプロジェクトをMLOpsを使用して実行するのに役立ちます。
⚠️ Windowsユーザーの場合は、PCにwslをインストールしてみてください。ZenmlはWindowsでサポートされていません。
次のプロジェクトに進む前に。
MLOPSの基本的な概念
- ステップ:ステップは、パイプラインやワークフローの単一のタスク単位です。各ステップは、機械学習ワークフローの開発に必要な特定のアクションや操作を表します。たとえば、データクリーニング、データの前処理、モデルのトレーニングなどは、機械学習モデルの開発における特定のステップです。
- パイプライン:複数のステップを結びつけて、機械学習タスクのための構造化された自動化プロセスを作成します。たとえば、データ処理パイプライン、モデル評価パイプライン、モデルトレーニングパイプラインなどがあります。
はじめに
プロジェクトのために仮想環境を作成します:
conda create -n churn_prediction python=3.9
次に、これらのライブラリをインストールします:
pip install numpy pandas matplotlib scikit-learn
これらをインストールした後、ZenMLをインストールします:
pip install zenml["server"]
そして、ZenMLリポジトリを初期化します。
zenml init
画面の表示がこのようになれば、続行するための緑のフラグが表示されます。初期化すると、ディレクトリに .zenml というフォルダが作成されます。
ディレクトリ内にデータ用のフォルダを作成します。データはこの リンク から取得してください:
この構造に従ってフォルダを作成します。
データ収集
このステップでは、csvファイルからデータをインポートします。このデータは、クリーニングとエンコード後にモデルのトレーニングに使用されます。
stepsフォルダ内にingest_data.pyというファイルを作成します。
import pandas as pdimport numpy as npimport loggingfrom zenml import stepclass IngestData: """ ワークフローへのデータインジェスト。 """ def __init__(self, path:str) -> None: """ Args: data_path(str): データファイルのパス """ self.path = path def get_data(self): df = pd.read_csv(self.path) logging.info("CSVファイルの読み込みが完了しました。") return df @step(enable_cache = False)def ingest_df(data_path:str) -> pd.DataFrame: """ CSVファイルからデータをインジェストするためのZenMLステップ。 """ try: # IngestDataクラスのインスタンスを作成してデータをインジェスト ingest_data = IngestData(data_path) df = ingest_data.get_data() logging.info("データのインジェストが完了しました") return df except Exception as e: # データインジェストに失敗した場合、エラーメッセージをログに記録して例外を発生させます logging.error("データのインジェスト中にエラーが発生しました") raise e
プロジェクトのリンクはこちらです。
このコードでは、まずデータインジェストのロジックをカプセル化するためにIngestDataクラスを作成しました。次に、ZenMLステップであるingest_dfを作成しました。これはデータ収集パイプラインの個々のユニットです。
pipelineフォルダ内にtraining_pipeline.pyというファイルを作成します。
コードの書き方
from zenml import pipelinefrom steps.ingest_data import ingest_df# training_pipelineというZenMLパイプラインを定義します.@pipeline(enable_cache=False)def train_pipeline(data_path:str): ''' モデルのトレーニングのためのデータパイプライン Args: data_path (str): インジェストするデータのパス ''' df = ingest_df(data_path=data_path)
ここで、一連のステップを使用して機械学習モデルのトレーニングパイプラインを作成しています。
次に、ベースディレクトリに名前がrun_pipeline.pyのファイルを作成してパイプラインを実行します。
from pipelines.training_pipeline import train_pipelineif __name__ == '__main__': # パイプラインを実行する train_pipeline(data_path="/mnt/e/Customer_churn/data/WA_Fn-UseC_-Telco-Customer-Churn.csv")
このコードはパイプラインを実行するためのものです。
これでデータインジェストパイプラインの作成が完了しました。実行しましょう。
ターミナルで次のコマンドを実行します:
python run_pipeline.py
次に、トレーニングパイプラインが正常に完了したことを示すコマンドが表示されます。
データの前処理
このステップでは、データのクリーニングのための異なる戦略を作成します。不要な列は削除され、カテゴリカルな列はラベルエンコーディングを使用してエンコードされます。最後に、データはトレーニングデータとテストデータに分割されます。
srcフォルダにclean_data.pyという名前のファイルを作成します。
このファイルでは、データのクリーニング戦略のクラスを作成します。
import pandas as pdimport numpy as npimport loggingfrom sklearn.model_selection import train_test_splitfrom abc import abstractmethod, ABCfrom typing import Unionfrom sklearn.preprocessing import LabelEncoderclass DataStrategy(ABC): @abstractmethod def handle_data(self, df:pd.DataFrame) -> Union[pd.DataFrame,pd.Series]: pass# データの前処理戦略class DataPreprocessing(DataStrategy): def handle_data(self, df: pd.DataFrame) -> Union[pd.DataFrame, pd.Series]: try: df['TotalCharges'] = df['TotalCharges'].replace(' ', 0).astype(float) df.drop('customerID', axis=1, inplace=True) df['Churn'] = df['Churn'].replace({'Yes': 1, 'No': 0}).astype(int) service = ['PhoneService', 'MultipleLines', 'InternetService', 'OnlineSecurity', 'OnlineBackup', 'DeviceProtection', 'TechSupport', 'StreamingTV', 'StreamingMovies'] for col in service: df[col] = df[col].replace({'No phone service': 'No', 'No internet service': 'No'}) logging.info("dfの長さ: ", len(df.columns)) return df except Exception as e: logging.error("前処理でエラーが発生しました", e) raise e# 特徴量のエンコーディング戦略class LabelEncoding(DataStrategy): def handle_data(self, df: pd.DataFrame) -> Union[pd.DataFrame, pd.Series]: try: df_cat = ['gender', 'Partner', 'Dependents', 'PhoneService', 'MultipleLines', 'InternetService', 'OnlineSecurity', 'OnlineBackup', 'DeviceProtection', 'TechSupport', 'StreamingTV', 'StreamingMovies', 'Contract', 'PaperlessBilling', 'PaymentMethod'] lencod = LabelEncoder() for col in df_cat: df[col] = lencod.fit_transform(df[col]) logging.info(df.head()) return df except Exception as e: logging.error(e) raise e# データ分割戦略class DataDivideStrategy(DataStrategy): def handle_data(self, df:pd.DataFrame) -> Union[pd.DataFrame, pd.Series]: try: X = df.drop('Churn', axis=1) y = df['Churn'] X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=1) return X_train, X_test, y_train, y_test except Exception as e: logging.error("データの分割でエラーが発生しました", e) raise e
このコードは、機械学習のためのモジュラーなデータの前処理パイプラインを実装しています。データの前処理、特徴量エンコーディング、データのエンコーディングのステップを含んでおり、予測モデリングのためのデータクリーニングを行います。
1. データの前処理: このクラスは、不要な列の削除やデータの欠損値(NA値)の処理を担当します。
2. ラベルエンコーディング: ラベルエンコーディングクラスは、カテゴリカル変数を数値フォーマットに変換し、機械学習アルゴリズムで効果的に使用できるようにします。テキストベースのカテゴリを数値に変換します。
3. データの分割戦略: このクラスはデータセットを独立変数(X)と従属変数(y)に分割します。そして、データをトレーニングセットとテストセットに分割します。
これらの戦略をステップバイステップで実装して、機械学習タスクのためのデータを準備します。
これらの戦略により、データがモデルのトレーニングと評価のために正しく構造化され、フォーマットされることが保証されます。
stepsフォルダにdata_cleaning.pyを作成します。
import pandas as pdimport numpy as npfrom src.clean_data import DataPreprocessing, DataDivideStrategy, LabelEncodingimport loggingfrom typing_extensions import Annotatedfrom typing import Tuplefrom zenml import step# データのクリーニングと前処理に関するZenMLステップを定義します@step(enable_cache=False)def cleaning_data(df: pd.DataFrame) -> Tuple[ Annotated[pd.DataFrame, "X_train"], Annotated[pd.DataFrame, "X_test"], Annotated[pd.Series, "y_train"], Annotated[pd.Series, "y_test"],]: try: # DataPreprocessing戦略をインスタンス化 data_preprocessing = DataPreprocessing() # インプットのDataFrameにデータの前処理を適用 data = data_preprocessing.handle_data(df) # LabelEncoding戦略をインスタンス化 feature_encode = LabelEncoding() # 前処理されたデータにラベルエンコーディングを適用 df_encoded = feature_encode.handle_data(data) # DataFrameの列に関する情報をログに記録 logging.info(df_encoded.columns) logging.info("Columns:", len(df_encoded)) # DataDivideStrategy戦略をインスタンス化 split_data = DataDivideStrategy() # エンコードされたデータをトレーニングセットとテストセットに分割 X_train, X_test, y_train, y_test = split_data.handle_data(df_encoded) # 分割されたデータをタプルとして返す return X_train, X_test, y_train, y_test except Exception as e: # データクリーニング中に発生したエラーを処理・ログに記録 logging.error("ステップでデータのクリーニング中にエラーが発生しました", e) raise e
このステップでは、clean_data.pyで作成した戦略を実装しました。
このステップをtraining_pipeline.pyで実装しましょう。
from zenml import pipeline#importing steps from steps.ingest_data import ingest_dffrom steps.data_cleaning import cleaning_dataimport logging#ZenMLパイプラインを定義します。@pipeline(enable_cache=False)def train_pipeline(data_path:str): ''' モデルのトレーニングのためのデータパイプライン。 ''' df = ingest_df(data_path=data_path) X_train, X_test, y_train, y_test = cleaning_data(df=df)
これで、トレーニングパイプラインのデータ前処理ステップが完了しました。
モデルのトレーニング
では、このプロジェクトのためにモデルを構築します。ここでは、バイナリ分類の問題を予測します。ロジスティック回帰を使用することができます。ロジスティック回帰については、こちらで読むことができます。データ前処理のステップで行った手順を実装します。まず、srcフォルダにtraining_model.pyというファイルを作成します。
import pandas as pdfrom sklearn.linear_model import LogisticRegressionfrom abc import ABC, abstractmethodimport logging#抽象モデルクラスclass Model(ABC): @abstractmethod def train(self,X_train:pd.DataFrame,y_train:pd.Series): """ データに基づいてモデルを訓練します。 """ pass class LogisticReg(Model): """ ロジスティック回帰モデルの実装 """ def train(self, X_train: pd.DataFrame, y_train: pd.Series): """ モデルの訓練 Args: X_train: pd.DataFrame, y_train: pd.Series """ logistic_reg = LogisticRegression() logistic_reg.fit(X_train,y_train) return logistic_reg
すべてのモデルが実装する必要がある
train
メソッドを持つ抽象のModelクラスを定義します。LogisticRegクラスは、ロジスティック回帰を使用した具体的な実装です。次のステップでは、ステップフォルダにconfig.pyというファイルを設定します。ステップフォルダ内にconfig.pyという名前のファイルを作成します。
モデルのパラメータの設定
from zenml.steps import BaseParameters"""このファイルは、機械学習モデルとトレーニングプロセスに関連するさまざまなパラメータを設定し指定するために使用されます。"""class ModelName(BaseParameters): """ モデルの設定 """ model_name: str = "logistic regression"
config.pyという名前のファイルで機械学習モデルに関連するパラメータを設定します。モデル名を指定するために、BaseParametersを継承したModelNameクラスを作成します。これにより、モデルのタイプを簡単に変更できます。
import logging import pandas as pdfrom src.training_model import LogisticRegfrom zenml import stepfrom .config import ModelName#train_modelという名前のステップを定義します。@step(enable_cache=False)def train_model(X_train:pd.DataFrame,y_train:pd.Series,config:ModelName): """ 設定されたモデルに基づいてデータをトレーニングします。 """ try: model = None if config == "logistic regression": model = LogisticReg() else: raise ValueError("Model name is not supported") trained_model = model.train(X_train=X_train,y_train=y_train) return trained_model except Exception as e: logging.error("Error in step training model",e) raise e
ステップフォルダのmodel_train.pyという名前のファイルで、train_modelという名前のステップをZenMLを使用して定義します。このステップの目的は、ModelName内のモデル名を基に機械学習モデルをトレーニングすることです。
プログラム内で
設定されたモデル名を確認します。それが「logistic regression」であれば、LogisticRegモデルのインスタンスを作成し、提供されたトレーニングデータ(X_trainとy_train)でトレーニングします。モデル名がサポートされていない場合、エラーが発生します。このプロセス中のエラーはログに記録され、エラーが発生します。
この後、training_pipeline.pyでこのステップを実装します。
from zenml import pipelinefrom steps.ingest_data import ingest_dffrom steps.data_cleaning import cleaning_datafrom steps.model_train import train_modelimport logging#ZenMLパイプラインを定義します。@pipeline(enable_cache=False)def train_pipeline(data_path:str): ''' モデルのトレーニングのためのデータパイプライン。 ''' #データを取り込むステップ:データを返します。 df = ingest_df(data_path=data_path) #データをクリーニングするステップ:データをクリーニングします。 X_train, X_test, y_train, y_test = cleaning_data(df=df) #モデルのトレーニング model = train_model(X_train=X_train,y_train=y_train)
さて、パイプラインに train_model のステップを実装しました。そのため、model_train.py のステップは完了しました。
モデルの評価
このステップでは、モデルの効率性を評価します。そのために、テストデータの予測精度を確認します。まず、パイプラインで使用する戦略を作成します。
src フォルダ内に evaluate_model.py という名前のファイルを作成します。
import loggingfrom sklearn.metrics import confusion_matrix, classification_report, accuracy_scorefrom abc import ABC, abstractmethodimport numpy as np# モデル評価のための抽象クラスclass Evaluate(ABC): @abstractmethod def evaluate_model(self, y_true: np.ndarray, y_pred: np.ndarray) -> float: """ 機械学習モデルのパフォーマンスを評価するための抽象メソッド。 Args: y_true (np.ndarray): 正解ラベル。 y_pred (np.ndarray): 予測ラベル。 Returns: float: 評価結果。 """ pass# 正解率を計算するクラスclass Accuracy_score(Evaluate): """ モデルの予測結果の正解率を計算して返します。 """ def evaluate_model(self, y_true: np.ndarray, y_pred: np.ndarray) -> float: try: accuracy_scr = accuracy_score(y_true=y_true, y_pred=y_pred) * 100 logging.info("Accuracy_score:", accuracy_scr) return accuracy_scr except Exception as e: logging.error("モデルの正確性の評価においてエラーが発生しました。",e) raise e# 適合率を計算するクラスclass Precision_Score(Evaluate): def evaluate_model(self, y_true: np.ndarray, y_pred: np.ndarray) -> float: """ モデルの予測結果の適合率を生成して返します。 """ try: precision = precision_score(y_true=y_true,y_pred=y_pred) logging.info("適合率: ",precision) return float(precision) except Exception as e: logging.error("適合率の計算中にエラーが発生しました。",e) raise eclass F1_Score(Evaluate): def evaluate_model(self, y_true: np.ndarray, y_pred: np.ndarray): """ モデルの予測結果のF1スコアを生成して返します。 """ try: f1_scr = f1_score(y_pred=y_pred, y_true=y_true) logging.info("F1スコア: ", f1_scr) return f1_scr except Exception as e: logging.error("F1スコアの計算中にエラーが発生しました。", e) raise e
評価戦略を作成したので、それを使用してモデルを評価します。ステップ evaluate_model.py のコードを steps フォルダに実装しましょう。ここでは、再現率スコア、正確度スコア、および適合率スコアは、モデルの評価指標として使用する戦略です。
これらをステップに実装しましょう。steps フォルダに evaluation.py という名前のファイルを作成します:
import loggingimport pandas as pdimport numpy as npfrom zenml import stepfrom src.evaluate_model import ClassificationReport, ConfusionMatrix, Accuracy_scorefrom typing import Tuplefrom typing_extensions import Annotatedfrom sklearn.base import ClassifierMixin@step(enable_cache=False)def evaluate_model( model: ClassifierMixin, X_test: pd.DataFrame, y_test: pd.Series) -> Tuple[ Annotated[np.ndarray,"confusion_matix"], Annotated[str,"classification_report"], Annotated[float,"accuracy_score"], Annotated[float,"precision_score"], Annotated[float,"recall_score"] ]: """ 一般的な指標を使用して、機械学習モデルのパフォーマンスを評価します。 """ try: y_pred = model.predict(X_test) precision_score_class = Precision_Score() precision_score = precision_score_class.evaluate_model(y_pred=y_pred,y_true=y_test) mlflow.log_metric("適合率",precision_score) accuracy_score_class = Accuracy_score() accuracy_score = accuracy_score_class.evaluate_model(y_true=y_test, y_pred=y_pred) logging.info("正確度スコア:",accuracy_score) return accuracy_score, precision_score except Exception as e: logging.error("モデルの評価中にエラーが発生しました。",e) raise e
さて、このステップをパイプラインに実装しましょう。training_pipeline.py を更新します:
このコードは、機械学習パイプライン内での evaluate_model ステップを定義します。トレーニング済みの分類モデル (model)、独立したテストデータ (X_test)、およびテストデータの正解ラベル (y_test) を入力として受け取ります。一般的な分類指標を使用してモデルのパフォーマンスを評価し、適合率スコア、および正確度スコアなどの結果を返します。
さて、パイプラインでこのステップを実装しましょう。 training_pipeline.pyを更新します:
from zenml import pipelinefrom steps.ingest_data import ingest_dffrom steps.data_cleaning import cleaning_datafrom steps.model_train import train_modelfrom steps.evaluation import evaluate_modelimport logging# ZenMLのパイプラインを定義します。@pipeline(enable_cache=False)def train_pipeline(data_path:str): ''' モデルのトレーニングのためのデータパイプライン Args: data_path (str): インジェストされるデータのパス ''' #データをインジェストするステップ df = ingest_df(data_path=data_path) #データのクリーニングステップ X_train, X_test, y_train, y_test = cleaning_data(df=df) #モデルのトレーニングステップ model = train_model(X_train=X_train,y_train=y_train) #評価メトリクスのステップ accuracy_score, precision_score = evaluate_model(model=model,X_test=X_test, y_test=y_test)
以上です。これでトレーニングパイプラインが完了しました。次に、以下のコマンドを実行します:
python run_pipeline.py
ターミナルで実行してください。正常に実行されれば、ローカルでのトレーニングパイプラインが次のようになります:
実験トラッカーとは何ですか?
実験トラッカーは、機械学習においてさまざまな実験を記録、監視、および管理するために使用されるツールです。
データサイエンティストは、最適な結果を得るために異なるモデルを試行錯誤します。したがって、彼らはデータを追跡し、異なるモデルを使用する必要があります。Excelシートを使用して手動で記録すると非常に困難になります。
MLflow
MLflowは、機械学習における実験の追跡と管理を効率化するための貴重なツールです。実験の追跡、モデルの逐次更新、および関連データを自動化します。これにより、モデル開発プロセスが合理化され、結果を視覚化するためのユーザーフレンドリーなインターフェースが提供されます。
MLflowをZenMLと統合することで、機械学習オペレーションフレームワーク内での実験の堅牢性と管理が向上します。
MLflowをZenMLにセットアップするには、次の手順に従ってください:
- MLflowのインストール:
- 次のコマンドを使用してMLflowをインストールします:
zenml integration install mlflow -y
2. MLflowの実験トラッカーを登録する:
次のコマンドを使用してMLflowに実験トラッカーを登録します:
zenml experiment-tracker register mlflow_tracker --flavor=mlflow
3. スタックを登録する:
ZenMLでは、スタックはMLワークフロー内のタスクを定義するコンポーネントのコレクションです。これにより、MLパイプラインのステップが効率的に組織化および管理されます。次のコマンドでスタックを登録します:
詳細はdocumentationを参照してください。
zenml model-deployer register mlflow --flavor=mlflowzenml stack register mlflow_stack -a default -o default -d mlflow -e mlflow_tracker --set
これにより、スタックは特定の設定とアーティファクトのストレージ、オーケストレータ、デプロイ先、実験トラッキングと関連付けられます。
4. スタックの詳細を表示する:
次のコマンドを使用して、スタックのコンポーネントを表示できます:
zenml stack describe
これにより、「mlflow_tracker」スタックに関連付けられたコンポーネントが表示されます。
さて、トレーニングモデルに実験トラッカーを実装し、モデルを評価しましょう:
コンポーネントの名前は、mlflow_trackerと表示されます。
ZenML実験トラッカーの設定
まず、train_model.pyを更新します:
import loggingimport mlflowimport pandas as pdfrom src.training_model import LogisticRegfrom sklearn.base import ClassifierMixinfrom zenml import stepfrom .config import ModelName# ZenML クライアントから現在のスタックの実験トラッカーを取得します。experiment_tracker = Client().active_stack.experiment_tracker# train_modelというステップを定義します。@step(experiment_tracker = experiment_tracker.name,enable_cache=False)def train_model( X_train:pd.DataFrame, y_train:pd.Series, config:ModelName ) -> ClassifierMixin: """ 設定されたモデルに基づいてデータをトレーニングする Args: X_train: pd.DataFrame = 独立したトレーニングデータ y_train: pd.Series = 依存するトレーニングデータ. """ try: model = None if config.model_name == "logistic regression": #スコア、モデルなどを自動的にログに記録 mlflow.sklearn.autolog() model = LogisticReg() else: raise ValueError("モデル名がサポートされていません") trained_model = model.train(X_train=X_train,y_train=y_train) logging.info("トレーニングモデルの完了.") return trained_model except Exception as e: logging.error("ステップトレーニングモデルでエラーが発生しました。",e) raise e
このコードでは、mlflow.sklearn.autolog()を使用して実験トラッカーを設定しました。これにより、モデルに関するすべての詳細が自動的にログに記録され、実験の追跡や分析がより簡単に行えるようになります。
evaluation.py内で以下のコードを使用します。
from zenml.client import Clientexperiment_tracker = Client().active_stack.experiment_tracker@step(experiment_tracker=experiment_tracker.name, enable_cache = False)
パイプラインの実行
run_pipeline.pyスクリプトを以下のように更新します。
from pipelines.training_pipeline import train_pipelinefrom zenml.client import Clientif __name__ == '__main__': #実験トラッキングURIを表示 print(Client().active_stack.experiment_tracker.get_tracking_uri()) #パイプラインの実行 train_pipeline(data_path="/mnt/e/Customer_churn/data/WA_Fn-UseC_-Telco-Customer-Churn.csv")
これをコピーして次のコマンドに貼り付けてください。
--backend-store-uri "--uri on the top of "file:/home/ "
実験の探索
上記のコマンドで生成されたリンクをクリックしてMLflow UIを開きます。ここでは、以下のような洞察を見つけることができます。
- パイプライン: 実行したすべてのパイプラインに簡単にアクセスできます。
- モデルの詳細: パイプラインをクリックしてモデルのすべての詳細を見ることができます。
- メトリクス: メトリクスセクションに移動してモデルのパフォーマンスを可視化することができます。
これで、ZenMLとMLflowを使用して機械学習の実験追跡を攻略することができます!
デプロイメント
次のセクションでは、このモデルをデプロイします。次の概念を知っておく必要があります。
a). 継続的なデプロイメントパイプライン
このパイプラインは、モデルのデプロイメントプロセスを自動化します。モデルが評価基準をクリアした場合、それは自動的に本番環境にデプロイされます。たとえば、データの前処理、データのクリーニング、データのトレーニング、モデルの評価などがあります。
b). 予測デプロイメントパイプライン
予測デプロイメントパイプラインは、リアルタイムまたはバッチ予測のための機械学習モデルのデプロイに焦点を当てています。予測デプロイメントパイプラインは、モデルを本番環境で予測するためのモデルのデプロイメントに特化しています。たとえば、テキストを送信できるAPIエンドポイントをセットアップします。モデルの可用性と拡張性を確保し、リアルタイムのパフォーマンスを監視します。これらのパイプラインは、機械学習システムの効率と効果を維持するために重要です。これで、継続的なパイプラインを実装します。
pipelinesフォルダにdeployment_pipeline.pyというファイルを作成します。
import numpy as npimport jsonimport loggingimport pandas as pdfrom zenml import pipeline, stepfrom zenml.config import DockerSettingsfrom zenml.constants import DEFAULT_SERVICE_START_STOP_TIMEOUTfrom zenml.integrations.constants import MLFLOWfrom zenml.integrations.mlflow.model_deployers.mlflow_model_deployer import ( MLFlowModelDeployer,)from zenml.integrations.mlflow.services import MLFlowDeploymentServicefrom zenml.integrations.mlflow.steps import mlflow_model_deployer_stepfrom zenml.steps import BaseParameters, Outputfrom src.clean_data import FeatureEncodingfrom .utils import get_data_for_testfrom steps.data_cleaning import cleaning_datafrom steps.evaluation import evaluate_modelfrom steps.ingest_data import ingest_df# MLflow統合を持つDocker設定を定義するdocker_settings = DockerSettings(required_integrations = {MLFLOW})#デプロイメントパイプラインの設定クラスを定義するclass DeploymentTriggerConfig(BaseParameters): min_accuracy:float = 0.92@step def deployment_trigger( accuracy: float, config: DeploymentTriggerConfig,): """ 精度が最小精度よりも大きい場合にのみデプロイメントをトリガーします。 Args: accuracy: モデルの精度。 config: 最小精度のしきい値。 """ try: return accuracy >= config.min_accuracy except Exception as e: logging.error("デプロイメントトリガーのエラー",e) raise e#継続的なパイプラインを定義する@pipeline(enable_cache=False,settings={"docker":docker_settings})def continuous_deployment_pipeline( data_path:str, min_accuracy:float = 0.92, workers: int = 1, timeout: int = DEFAULT_SERVICE_START_STOP_TIMEOUT): df = ingest_df(data_path=data_path) X_train, X_test, y_train, y_test = cleaning_data(df=df) model = train_model(X_train=X_train, y_train=y_train) accuracy_score, precision_score = evaluate_model(model=model, X_test=X_test, y_test=y_test) deployment_decision = deployment_trigger(accuracy=accuracy_score) mlflow_model_deployer_step( model=model, deploy_decision = deployment_decision, workers = workers, timeout = timeout )
機械学習プロジェクト用のZenMLフレームワーク
このコードは、ZenMLフレームワークを使用して機械学習プロジェクトの連続デプロイメントを定義します。
1. 必要なライブラリのインポート: モデルのデプロイメントに必要なライブラリのインポート。
2. Dockerの設定: MLflowと共に使用するために、Docker設定を構成することで、これらのモデルを一貫してパッケージ化して実行できます。
3. DeploymentTriggerConfig: モデルのデプロイに最小精度閾値が設定されているクラスです。
4. deployment_trigger: モデルの精度が最小精度を超えた場合、このステップはリターンされます。
5. continuous_deployment_pipeline: このパイプラインには、データの取り込み、データのクリーニング、モデルのトレーニング、モデルの評価の数多くのステップが含まれています。また、モデルは最小精度の閾値を満たしている場合にのみデプロイされます。
次に、deployment_pipeline.pyに推論パイプラインを実装します
import loggingimport pandas as pdfrom zenml.steps import BaseParameters, Outputfrom zenml.integrations.mlflow.model_deployers.mlflow_model_deployer import MLFlowModelDeployerfrom zenml.integrations.mlflow.services import MLFlowDeploymentServiceclass MLFlowDeploymentLoaderStepParameters(BaseParameters): pipeline_name: str step_name: str running: bool = True@step(enable_cache=False)def dynamic_importer() -> str: data = get_data_for_test() return data@step(enable_cache=False)def prediction_service_loader( pipeline_name: str, pipeline_step_name: str, running: bool = True, model_name: str = "model",) -> MLFlowDeploymentService: model_deployer = MLFlowModelDeployer.get_active_model_deployer() existing_services = model_deployer.find_model_server( pipeline_name=pipeline_name, pipeline_step_name=pipeline_step_name, model_name=model_name, running=running, ) if not existing_services: raise RuntimeError( f"No MLflow prediction service deployed by the " f"{pipeline_step_name} step in the {pipeline_name} " f"pipeline for the '{model_name}' model is currently " f"running." ) return existing_services[0]@stepdef predictor(service: MLFlowDeploymentService, data: str) -> np.ndarray: service.start(timeout=10) data = json.loads(data) prediction = service.predict(data) return prediction@pipeline(enable_cache=False, settings={"docker": docker_settings})def inference_pipeline(pipeline_name: str, pipeline_step_name: str): batch_data = dynamic_importer() model_deployment_service = prediction_service_loader( pipeline_name=pipeline_name, pipeline_step_name=pipeline_step_name, running=False, ) prediction = predictor(service=model_deployment_service, data=batch_data) return prediction
このコードは、MLflowを介してデプロイされた機械学習モデルを使用して予測を行うためのパイプラインを設定します。データをインポートし、デプロイされたモデルをロードして予測に使用します。
pipelinesフォルダのutils.pyにget_data_for_test()関数を作成する必要があります。これにより、コードをより効率的に管理できます。
import loggingimport pandas as pd from src.clean_data import DataPreprocessing, LabelEncoding# テスト目的のデータ取得関数def get_data_for_test(): try: df = pd.read_csv('./data/WA_Fn-UseC_-Telco-Customer-Churn.csv') df = df.sample(n=100) data_preprocessing = DataPreprocessing() data = data_preprocessing.handle_data(df) # FeatureEncoding戦略のインスタンス化 label_encode = LabelEncoding() df_encoded = label_encode.handle_data(data) df_encoded.drop(['Churn'],axis=1,inplace=True) logging.info(df_encoded.columns) result = df_encoded.to_json(orient="split") return result except Exception as e: logging.error("e") raise e
さて、作成したパイプラインをデプロイし、デプロイされたモデルで予測を行うためのファイルrun_deployment.pyをプロジェクトディレクトリに作成しましょう。
import click # コマンドライン引数の処理のためimport logging from typing import castfrom rich import print # コンソールの出力フォーマット# デプロイと推論用のパイプラインをインポートfrom pipelines.deployment_pipeline import (continuous_deployment_pipeline, inference_pipeline)# MLflowユーティリティとコンポーネントのインポートfrom zenml.integrations.mlflow.mlflow_utils import get_tracking_urifrom zenml.integrations.mlflow.model_deployers.mlflow_model_deployer import ( MLFlowModelDeployer)from zenml.integrations.mlflow.services import MLFlowDeploymentService# 異なる構成のための定数を定義: DEPLOY, PREDICT, DEPLOY_AND_PREDICTDEPLOY = "deploy"PREDICT = "predict"DEPLOY_AND_PREDICT = "deploy_and_predict"# Clickを使用してコマンドライン引数を処理するメイン関数を定義する@click.command()@click.option( "--config", "-c", type=click.Choice([DEPLOY, PREDICT, DEPLOY_AND_PREDICT]), default=DEPLOY_AND_PREDICT, help="オプションとしてモデルをトレーニングおよびデプロイするためにデプロイメントパイプラインのみを実行する (`deploy`)、デプロイされたモデルに対して予測のみを実行する (`predict`)、または両方を実行する (`deploy_and_predict`) を選択することができます。デフォルトでは両方が実行されます (`deploy_and_predict`)。",)@click.option( "--min-accuracy", default=0.92, help="モデルをデプロイするために必要な最小精度",)def run_main(config:str, min_accuracy:float ): # アクティブなMLFlowモデルデプロイヤーコンポーネントを取得する mlflow_model_deployer_component = MLFlowModelDeployer.get_active_model_deployer() # モデルをデプロイする必要があるか、予測を行うか、または両方を行うかを判断する deploy = config == DEPLOY or config == DEPLOY_AND_PREDICT predict = config == PREDICT or config == DEPLOY_AND_PREDICT # モデルのデプロイが要求された場合: if deploy: continuous_deployment_pipeline( data_path='/mnt/e/Customer_churn/data/WA_Fn-UseC_-Telco-Customer-Churn.csv', min_accuracy=min_accuracy, workers=3, timeout=60 ) # 予測が要求された場合: if predict: # 推論パイプラインの実行を初期化する inference_pipeline( pipeline_name="continuous_deployment_pipeline", pipeline_step_name="mlflow_model_deployer_step", ) # MLflow UIで実験ランを表示するための手順を出力する print( "以下のコマンドを実行すると、実験ランをMLflow UIで確認できます:\n " f"[italic green] mlflow ui --backend-store-uri '{get_tracking_uri()}" "[/italic green]\n MLflow UI内で実験ランを確認することができます。mlflow_example_pipeline実験内に実行が追跡されるため、2つ以上の実行を比較することも
このコードは、MLFlowとZenMlを使用して機械学習モデルの管理とデプロイを行うためのコマンドラインスクリプトです。
さて、モデルをデプロイしましょう。
ターミナルでこのコマンドを実行してください。
python run_deployment.py --config deploy
これで、モデルがデプロイされました。パイプラインが正常に実行され、zenmlのダッシュボードで確認することができます。
python run_deployment.py --config predict
予測プロセスの開始
今、MLFlowの予測サーバーが実行されています。
データを入力し、結果を表示するためのWebアプリが必要です。なぜ私たちがゼロからWebアプリを作成しなければならないか疑問に思うかもしれません。
実際には、機械学習モデルのための高速かつ簡単なフロントエンドWebアプリを構築するのに役立つオープンソースのフロントエンドフレームワークであるStreamlitを使用します。
ライブラリのインストール
pip install streamlit
プロジェクトディレクトリにstreamlit_app.pyという名前のファイルを作成します。
import jsonimport loggingimport numpy as npimport pandas as pdimport streamlit as stfrom PIL import Imagefrom pipelines.deployment_pipeline import prediction_service_loaderfrom run_deployment import maindef main(): st.title("End to End Customer Satisfaction Pipeline with ZenML") st.markdown( """ #### 問題の設定 ここでは、注文ステータス、価格、支払いなどの特徴に基づいて、次の注文または購入の顧客満足度スコアを予測することを目的としています。 [ZenML](https://zenml.io/)を使用して、プロダクションレディなパイプラインを構築し、顧客満足度スコアを予測します。 """ ) st.markdown( """ 上記はパイプライン全体の図で、まずデータを取り込み、クリーニングし、モデルをトレーニングし、モデルを評価し、データソースが変更されるか任意のハイパーパラメータの値が変更されると、デプロイがトリガーされ、(再)モデルがトレーニングされ、モデルが最小の精度要件を満たす場合、モデルがデプロイされます。 """ ) st.markdown( """ #### 特徴の説明 このアプリは、特定の顧客の顧客満足度スコアを予測するために設計されています。以下に示す製品の特徴を入力し、顧客満足度スコアを取得できます。 | モデル | 説明 | | ------------- | - | | SeniorCitizen | 顧客がシニア市民であるかどうかを示します。 | | tenure | 顧客が会社に所属していた月数。 | | MonthlyCharges | 顧客が発生した月額料金。 | | TotalCharges | 顧客が発生した合計料金。 | | gender | 顧客の性別(男性:1、女性:0)。 | | Partner | 顧客にパートナーがいるかどうか(はい:1、いいえ:0)。 | | Dependents | 顧客に扶養家族がいるかどうか(はい:1、いいえ:0)。 | | PhoneService | 顧客が電話サービスを利用しているかどうか(はい:1、いいえ:0)。 | | MultipleLines | 顧客が複数の回線を持っているかどうか(はい:1、いいえ:0)。 | | InternetService | インターネットサービスの種類(いいえ:1、その他:0)。 | | OnlineSecurity | 顧客がオンラインセキュリティサービスを利用しているかどうか(はい:1、いいえ:0)。 | | OnlineBackup | 顧客がオンラインバックアップサービスを利用しているかどうか(はい:1、いいえ:0)。 | | DeviceProtection | 顧客がデバイス保護サービスを利用しているかどうか(はい:1、いいえ:0)。 | | TechSupport | 顧客がテクニカルサポートサービスを利用しているかどうか(はい:1、いいえ:0)。 | | StreamingTV | 顧客がテレビストリーミングサービスを利用しているかどうか(はい:1、いいえ:0)。 | | StreamingMovies | 顧客が映画ストリーミングサービスを利用しているかどうか(はい:1、いいえ:0)。 | | Contract | 契約の種類(1年契約:1、その他:0)。 | | PaperlessBilling | 顧客がペーパーレス請求を利用しているかどうか(はい:1、いいえ:0)。 | | PaymentMethod | 支払方法(クレジットカード:1、その他:0)。 | | Churn | 顧客が離反しているかどうか(はい:1、いいえ:0)。 | """ ) payment_options = { 2: "電子チェック", 3: "郵送チェック", 1: "銀行振り込み(自動)", 0: "クレジットカード(自動)" } contract = { 0: "月間契約", 2: "2年契約", 1: "1年契約" } def format_func(PaymentMethod): return payment_options[PaymentMethod] def format_func_contract(Contract): return contract[Contract] display = ("男性", "女性") options = list(range(len(display))) # 各データ列の値を定義する SeniorCitizen = st.selectbox("シニア市民ですか?", options=[True, False],) tenure = st.number_input("在籍期間") MonthlyCharges = st.number_input("月額料金:") TotalCharges = st.number_input("合計料金
このコードは、顧客データと人口統計の詳細に基づいて、電気通信会社で顧客の離反を予測するためのStreamLitのフロントエンドを定義しています。
ユーザーは使いやすいインターフェースを通じて自分の情報を入力することができ、コードはトレーニングされた機械学習モデル(ZenMLとMLflowで展開)を使用して予測を行います。
予測結果はその後、ユーザーに表示されます。
今、このコマンドを実行してください:
⚠️ 予測モデルが実行されていることを確認してください
streamlit run streamlit_app.py
リンクをクリックしてください。
以上で、プロジェクトは完了しました。
以上で、私たちはエンドツーエンドの機械学習プロジェクトを成功裏に完了しました。専門家が全体プロセスにどのように取り組むかを紹介しました。
まとめ
この顧客離反予測モデルの開発と展開を通じた機械学習オペレーション(MLOps)の包括的な探求により、MLOpsの変革力が機械学習ライフサイクルの効率化においてどれだけ重要かを目撃しました。データ収集と前処理からモデルのトレーニング、評価、展開まで、私たちのプロジェクトは開発と製品間のギャップを埋めるためにMLOpsが果たす重要な役割を示しています。データ主導の意思決定にますます頼る組織にとって、ここで示される効率的でスケーラブルなプラクティスは、機械学習アプリケーションの成功を保証するためのMLOpsの重要性を強調しています。
主なポイント
- MLOps(Machine Learning Operations)は、効率的で信頼性の高くスケーラブルな運用を保証するために、エンドツーエンドの機械学習ライフサイクルを効率化するのに重要です。
- ZenMLとMLflowは、実世界のアプリケーションで機械学習モデルの開発、追跡、展開を容易にする強力なフレームワークです。
- クリーニング、エンコーディング、分割などの適切なデータ前処理は、堅牢な機械学習モデルの構築の基礎です。
- 精度、適合率、再現率、F1スコアなどの評価尺度は、モデルのパフォーマンスを包括的に理解するために重要です。
- MLflowなどの実験追跡ツールは、データサイエンスのプロジェクトでのコラボレーションと実験管理を向上させます。
- 連続的なインファレンス展開パイプラインは、製品環境でモデルの効率と可用性を維持するために重要です。
よくある質問
この記事に表示されているメディアはAnalytics Vidhyaの所有ではなく、著者の裁量により使用されています。
We will continue to update VoAGI; if you have any questions or suggestions, please contact us!
Was this article helpful?
93 out of 132 found this helpful
Related articles
- ‘LinkedInの仕事検索機能を支える埋め込みアーキテクチャの内部’
- Mistral-7B-v0.1をご紹介します:新しい大型言語モデルの登場’ (Misutoraru 7B v0.1 wo goshōkai shimasu Atarashii ōgata gengo moderu no tōjō)
- 「AWS上でクラウドネイティブなフェデレーテッドラーニングアーキテクチャを再発明する」
- 「Amazon SageMaker JumpStartで利用可能な自動音声認識のWhisperモデル」
- 新しい – Amazon SageMaker Canvasで利用可能なノーコード生成AI機能が追加されました
- 「メタのMusicGenを使用してColabで音楽を生成する」
- オリゴが警告を発しています:TorchServeの重大なセキュリティの問題により、ハッカーはサーバを乗っ取り、悪意のあるAIモデルを注入することができます