「MLOPsを使用した不正取引検出の実装」

「美とファッションの世界を彩る魔法のトレンド」

イントロダクション

現代のデジタル世界では、人々は便利さのために現金ではなくオンライン取引とデジタル決済にますます移行しています。移行の増加に伴い、詐欺も増加しています。詐欺トランザクションは、偽の身元や虚偽の情報を使用してお金を要求することが含まれるため、個人や金融機関にとって重大な問題です。このプロジェクトでは、クレジットカードのデータセットを使用して、ライブトランザクションを監視し、それらが本物か詐欺かを予測するためのMLOPsモデルを設計するために、Airflowツールを使用します。

目標

  • 詐欺トランザクションの検出の重要性。
  • データのクリーニング、データセットの変換、データの前処理。
  • データセットの視覚的な分析から洞察を得る。
  • データサイエンスにおける詐欺トランザクション検出モデルの現実世界での応用。
  • Pythonプログラミング言語を使用した詐欺トランザクションデータの分析。
  • MS AzureとAirflowを使用したエンドツーエンドの詐欺検出の構築。

この記事はデータサイエンスブログマラソンの一環として公開されました。

詐欺トランザクション推定モデルとは何ですか?

詐欺トランザクションのデータセットには、トランザクションの時間、名前、金額、性別、カテゴリなどの列が含まれています。詐欺トランザクション推定モデルは、偽のトランザクションを予測するために開発された機械学習モデルで、大規模な有効なトランザクションと詐欺トランザクションのデータセットでトレーニングされています。

詐欺トランザクション分析とは何ですか?

詐欺トランザクション分析は、過去のデータセットを分析するプロセスです。データセットの分析は、データの不規則性を見つけ、データのパターンを見つけることを目指しています。詐欺トランザクション分析は、顧客を保護し、財務的な損失を減らすためにビジネスにおいて重要な役割を果たします。ルールベースの分析や異常検知など、さまざまな種類の詐欺トランザクション分析があります。

  • ルールベースの分析:ルールベースの分析では、無効なトランザクションをフラグ付けするためのルールを作成します。例えば、地理的な地域に基づいたルールが作成されることがあります。
  • 異常検知:異常検知では、異常または異常なトランザクションを見つけることを目指します。例えば、新しいIPアドレスから行われたトランザクションなどです。

詐欺トランザクションの検出の重要性

詐欺トランザクションの検出は、ビジネスや金融機関が顧客を詐欺から保護し、彼らのお金を守るために重要です。詐欺トランザクションを検出することの重要な理由をいくつか挙げます。

  • 財務的な損失の削減:詐欺トランザクションは企業に莫大な損失をもたらし、利益を減少させます。したがって、企業が詐欺トランザクションを検出することは重要です。
  • 評判の保護:評判の維持は、ビジネスにとって重要な要素であり、潜在的なクライアントや顧客の喪失につながります。
  • 顧客とビジネスの保護:詐欺トランザクションは顧客に財務的な損失や感情的な影響を与えることがあります。詐欺を検出することで、ビジネスは顧客と自社を守ることができます。

データの収集と前処理

データの収集と前処理は、詐欺検出モデルの開発において重要な部分です。データが収集されたら、データセットに対していくつかの手順を実行する必要があります。

  • データクリーニング:データクリーニングでは、重複するデータなどの不要なデータを削除し、欠損データを埋める作業が含まれます。
  • データ変換:データ変換では、データの列を分析に使用できる必要なデータ型に変換します。このステップにより、データの品質が維持されます。
  • データ探索:データ探索では、データセットを理解し、データ間の関係やパターンを見つける作業が含まれます。
  • 不均衡データの処理:詐欺検出データセットは多くの有効なトランザクションとごく少数の詐欺トランザクションが存在するため、モデルが過学習になる可能性が高いです。この問題は、過サンプリングや欠損サンプリング技術を使用することで対処できます。これらの技術を使用することで、バランスの取れたデータセットを作成できます。

ライブラリを使用して不正検知データセットを可視化する

数字だけを見ても関係性が分かりません。Pythonのライブラリを使用してデータセットから洞察を得るためにグラフやチャートを作成します。

  • Matplotlib:Pythonで使用される基本的なツールで、棒グラフや折れ線グラフなどさまざまなタイプのグラフやチャートを作成するために使用されます。
  • Seaborn:Pythonの別の可視化ツールで、ヒートマップやバイオリンプロットなど、より詳細な可視化画像を作成するのに役立ちます。

ホテル予約データセットを可視化するために使用される手法。

  • Countplot:Countplotはカテゴリ値のヒストグラムをプロットするために使用されます。これにより、異なるカテゴリ値に対する詐欺のプロットが可能となり、関係性を理解するのに役立ちます。
  • Distplot:Distplotを使用すると、時間にわたる分布を確認することができます。これにより、歪度のチェックに役立ちます。

不正検知MLOPsモデルのユースケースと応用

不正検知MLOPsモデルは、さまざまな業界で複数の用途があります。以下はその応用事例です:

  1. 銀行および金融機関:銀行や金融機関は、クレジットカードや保険詐欺などの不正取引を検知するためにMLOPツールを使用します。これらのツールを使用することで、詐欺を減らすことができます。
  2. 電子商取引および小売業:商品の購入中に詐欺取引を特定することは、顧客データと会社のビジネスを保護するのに役立ちます。
  3. 医療とホスピタリティ:医療界は、偽の医療請求や請求手続きを検出するためにMLOPsを使用します。検知モデルを使用することで、これらの誤った取引を減らすことができます。
  4. 通信とEチケティング:偽のSIMスワップ、定期購読詐欺、および偽の予約を検知することができます。MLOPsモデルを使用してこの問題に対処することができます。

不正検知MLOPsモデルの課題とベストプラクティス

不正検知モデルの構築には、さまざまな理由でいくつかの課題があります:

  • データ品質:不正検知モデルは、過去のデータに基づいて訓練されるため、データ品質が重要な役割を果たします。データセットが良ければ良いほど、モデルの精度が高くなります。不正データセットはバランスが崩れているため、有効な取引よりも無効な取引の方が多いことがあります。これはモデルのトレーニングフェーズで課題となります。
  • データプライバシー:不正取引は、不正な手段で顧客や企業のデータアクセスを侵害することが多く発生します。モデルを設計する際に、顧客データのプライバシーが保たれるようにすることが重要です。
  • モデルのドリフト:データ品質の変化により、モデルのドリフトが生じることがあります。モデルがドリフトしないようにするには、データ品質を維持し、モデルを監視する必要があります。
  • リアルタイム処理:リアルタイム処理は複雑さを増すため、さらなる課題があります。

不正検知モデルを構築する際のベストプラクティス

不正検知モデルを作成する際のベストプラクティスについては以下で説明します。

  • 特徴エンジニアリング: データ収集が完了した後、データの前処理と特徴エンジニアリングを行い、良質なデータを得るためにします。
  • 不均衡データの処理:不正検知データセットは、不正取引に比べて有効な取引が多いため、不均衡な性質を持ちます。この不均衡はモデルにバイアスをもたらすことがあります。この問題に対処するために、アンダーサンプリングやオーバーサンプリングの技術を使用します。
  • モデルの構築:アンサンブル技術を使用してモデルを訓練し、結果として高い精度を持つモデルを作成します。この結果のモデルは詐欺や有効な取引の両方を予測することができます。
  • MLOPs: トレーニングから展開、監視までソリューションのライフサイクル全体を構築するためにMLOPsフレームワークを使用します。このフレームワークはモデルの構築のルールを設定し、モデルの精度と信頼性を確保します。

デジタル化の進展とインターネットの普及に伴い、ますます多くの人々がデジタル決済やオンライン予約サービスを利用するようになります。技術の発展により、簡単で高速な支払いツールが作られるようになります。したがって、詐欺を防止し、顧客の信頼を得るためのツールの開発も重要となります。企業は信頼性のある、アクセスしやすい、費用効果の高いソリューションを求めることがよくあります。テクノロジーはそれに重要な役割を果たすことができます。金融商品を中心にツールやサービスを構築することで、事業者は顧客に幅広いサービスを提供することができます。このパーソナライズされた金融商品は、顧客と事業者の信頼関係を向上させることができます。

Pythonを使用した詐欺検出データ分析

Pythonを使った基本的なデータ分析を行いましょう。このデータはKaggleからのデータセットを使います。データセットをダウンロードするには、ここをクリックしてください

データの詳細

詐欺検出データセットには、モデルの学習に使用される100万件以上のレコードが含まれています。以下はデータセットの詳細です:

ステップ1 ライブラリのインポート

import random
import calendar
import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
from scipy.stats import norm, skew, ttest_ind
import warnings
warnings.filterwarnings('ignore')

ステップ2 データセットのインポートとデータの検査

# データをトレインとテストのファイルから読み込み、データフレームに変換する
df = pd.read_csv('C:\Decodr\G3\B\FRAUD_DETECTION_IN_IMBALANCED_DATA\data2\Train.csv')
T_df = pd.read_csv('C:\Decodr\G3\B\FRAUD_DETECTION_IN_IMBALANCED_DATA\data2\Test.csv')

# データフレームの形状を確認する
df.shape, T_df.shape

# トレインとテストのデータフレームの情報をチェックする
df.info(), T_df.info()

# トレインとテストのデータフレームの欠損値をチェックする
df.isna().sum(), T_df.isna().sum()

OUTPUT

ステップ3 データセットの可視化

# カテゴリ別の詐欺データ数の可視化
sns.countplot(data=df[df['is_fraud_cat'] == "T"], x='category')
plt.xticks(rotation=45)
plt.show()

OUTPUT

Insightショッピングフラグメントと食料品POSで最も多くの詐欺が発生しています。

# 性別別の詐欺データ数の可視化
sns.countplot(data=df[df['is_fraud_cat']=="T"],x='gender')
plt.show()

OUTPUT

Insight女性の顧客により多くの詐欺事例が発生していますが、男性と女性の数はほぼ同じです。

# 州別の詐欺データ数の可視化
fig, ax = plt.subplots(figsize=(120,60))
plt.rcParams.update({'font.size': 60})
sns.countplot(data=df[df['is_fraud_cat']=="T"],x='state')
plt.xticks(rotation=45)
for p, label in zip(ax.patches, df["state"].value_counts().index):
    ax.annotate(label, (p.get_x(), p.get_height()+0.15))
plt.title("Number of Credit Card Frauds by State")
plt.show()

OUTPUT

InsightOH、TX、LAの州が最も多くのクレジットカード詐欺を報告しています。

# 都市別の詐欺データ数の可視化
def randomcolor():
    r = random.random()
    b = random.random()
    g = random.random()
    rgb = [r,g,b]
    return rgb

plt.rcParams.update({'font.size': 20})
df[df['is_fraud_cat']=="T"]["city"].value_counts(sort=True,ascending=False).head(10).plot(kind="bar",color=randomcolor())
plt.title("Number of Credit Card Frauds by City")
plt.show()

出力

見解ダラス、ヒューストン、バーミンガムは、市ごとに最も多くの詐欺を報告しています。

#Jobに基づく詐欺df[df['is_fraud_cat']=="T"]["job"].value_counts(sort=True,ascending=False).head(10).plot(kind="bar",color=randomcolor())plt.title("職業別のクレジットカード詐欺の回数")plt.show()

出力

見解数量調査士の仕事が最も多くの詐欺が発生し、その後に造船技師と材料エンジニアが続きます。

#詐欺対非詐欺plt.figure(figsize=(8,5))ax = sns.countplot(x="is_fraud", data=df,color=randomcolor())for p in ax.patches:     ax.annotate('{:.1f}'.format(p.get_height()), (p.get_x()+0.25, p.get_height()+0.01))plt.show()

出力

見解約60,0006件のエントリーが詐欺取引を表しており、約100万件のエントリーのうちわずかに詐欺取引が存在しているため、私たちは不均衡なデータセットを見ています。

ステップ4. 前処理と特徴エンジニアリング

data['trans_date_trans_time'] = pd.to_datetime(data['trans_date_trans_time'], format='%d-%m-%Y %H:%M')data['trans_date']=data['trans_date_trans_time'].dt.strftime('%Y-%m-%d')data['trans_date']=pd.to_datetime(data['trans_date'])data['dob']=pd.to_datetime(data['dob'],format='%d-%m-%Y')data["age"] = data["trans_date"]-data["dob"]data["age"] = data["age"].astype('int64')data['trans_month'] = pd.DatetimeIndex(data['trans_date']).monthdata['trans_year'] = pd.DatetimeIndex(data['trans_date']).yeardata['Month_name'] = data['trans_month'].apply(lambda x: calendar.month_abbr[x])data['latitudinal_distance'] = abs(round(data['merch_lat']-data['lat'],3))data['longitudinal_distance'] = abs(round(data['merch_long']-data['long'],3))data.gender=data.gender.apply(lambda x: 1 if x=="M" else 0)data = data.drop(['cc_num','merchant','first','last','street','zip','trans_num','unix_time','trans_date_trans_time','city','lat','long','job','dob','merch_lat','merch_long','trans_date','state','Month_name'],axis=1)data =pd.get_dummies(data,columns=['category'],drop_first=True)#アンダーサンプリングの実行normal = data[data['is_fraud']==0]fraud = data[data['is_fraud']==1]normal_sample=normal.sample(n=len(fraud),random_state=42)new_data = pd.concat([normal_sample,fraud],ignore_index=True)

MS AzureとAirflowを使用したエンドツーエンドモデル構築

上記のステップでは、視覚化のためにデータファイルをローカルで読み込んでいましたが、実装部分では、MS Azureなどのクラウドサービスを使用します。データの取り込みとモデルの構築にAirflowツールを統合した方法を示します。MS Azureでは、まず、ストレージアカウントを作成し、その中にコンテナを作成します。このコンテナにファイルを保存します。Airflowパイプラインを作成し、コンテナからデータを取得して必要な場所に保存します。その後、エンドツーエンドのモデルを構築し、パブリックにするためにストリームクラウドにデプロイします。

ストレージアカウントを作成するためには、Azureアカウントを作成する必要があります。以下の手順に従ってください:

  • MS Azureアカウントを作成します
  • ストレージアカウントを作成します
  • ストレージアカウント内にコンテナを作成します
  • コンテナが作成されたら、ファイルを手動でアップロードするか、Airflow DAGを使用してアップロードします

エアフローとは何ですか?

エアフローは、モデルの構築と監視をサポートするオープンソースのワークフロー管理プラットフォームです。ディレクテッドアクリックグラフ(DAG)を使用してワークフローを定義します。以下にエアフローのいくつかの利点を説明します。

  • 動的に定義されたワークフロー:エアフローでは、Pythonを使用してカスタムのワークフローを定義できます。このワークフローは簡単に作成や変更ができます。これによりワークフローの柔軟性が向上します。
  • スケーラブル:分散アーキテクチャを使用することで、エアフローを素早くスケールさせることができ、複数のワークフローを同時に処理できます。
  • 監視とログ:エアフローは使いやすいウェブインターフェースを提供しています。このウェブインターフェースを使用すると、ユーザーは監視やログの表示ができます。これにより問題のトラブルシューティングを迅速に行うことができます。
  • 並列実行:エアフローはワークフローを並列実行する機能を提供しており、実行時間を大幅に短縮し、モデルのパフォーマンスを向上させます。

現実世界では、モデルの構築だけでは不十分です。モデルを本番環境に展開し、時間とともにモデルのパフォーマンスと実際のデータとの相互作用を監視する必要があります。エアフローを使用して、エンドツーエンドの機械学習を構築し、監視することができます。エアフローでは、ワークフローを作成し、実行の依存関係を設定することができます。エアフローでワークフローの状態を確認することもできます。ワークフローが正常に完了したか、失敗したか、再開したかなどをエアフローで確認できます。ワークフローの実行後には、ログをエアフローで監視できます。これにより、本番に適したモデルのトラッキングが可能です。詳細については、エアフローのドキュメントを参照することを強くお勧めします。

ワークフロー

ワークフローは以下のステップで構成されています:

  • data_upload_operator:このオペレータは、ローカルストレージからファイルを取得してAzure Blobコンテナにアップロードします。
  • data_download_operator:このオペレータは、Azureからファイルをダウンロードしてローカルストレージに保存します。
  • data_preprocessing_operator:このオペレータは、Azureからダウンロードしたデータセットに対して前処理を行います。
  • data_split_operator:このオペレータは、データセットを2つの部分に分割します。最初の部分ではモデルをトレーニングし、2番目の部分ではモデルをテストします。
  • model_training_operator:このオペレータは、データセット上でモデルのトレーニングを行います。
  • model_evaluation_operator:このオペレータは、モデルのパフォーマンスを評価するために使用されます。
  • model_prediction_operator:このオペレータは、新しい未知のデータセット上でモデルの予測を行います。

モデル開発

先に述べたエアフローオペレータを使用しています。それでは、コーディングの部分に移りましょう。

data_upload_operator

from azure.storage.blob import BlobServiceClientfrom config.constant import storage_account_key, storage_account_name, connection_string, container_name, file_path_up, file_namedef uploadToBlobStorage():    try:        blob_service_client = BlobServiceClient.from_connection_string        (connection_string)        blob_client = blob_service_client.get_blob_client        (container = container_name, blob = file_name)        with open(file_path_up,"rb") as data:            blob_client.upload_blob(data)        print("Upload " + file_name + " from local to container " + container_name)    except Exception as e:        print(f"An error occurred: {str(e)}")uploadToBlobStorage()

上記では、ローカルストレージからファイルを取得してAzureストレージアカウントにアップロードするuploadToBlobStorage()メソッドを定義しています。

data_download_operator

from azure.storage.blob import BlobServiceClientfrom config.constant import storage_account_key, storage_account_name, connection_string, container_name, blob_name, file_path_downdef downloadFromBlobStorage():    try:        # Initialize a BlobServiceClient using the connection string        blob_service_client = BlobServiceClient.from_connection_string        (connection_string)        # Get a BlobClient for the target blob        blob_client = blob_service_client.get_blob_client        (container=container_name, blob=blob_name)        # Download the blob to a local file        with open(file_path_down, "wb") as data:            data.write(blob_client.download_blob().readall())        print(f"Downloaded {blob_name} from {container_name} to {file_path_down}")    except Exception as e:        print(f"An error occurred: {str(e)}")downloadFromBlobStorage()

downloadFromBlobStorage()メソッドがここで定義されています。これはストレージアカウントに接続し、ファイルをダウンロードします。その後、ファイルはローカルパスに保存されます。

data_preprocessing_operator

from airflow.models import BaseOperatorfrom airflow.utils.decorators import apply_defaultsimport pandas as pdimport calendarclass DataPreprocessingOperator(BaseOperator):    @apply_defaults    def __init__(self, preprocessed_data, *args, **kwargs):         super(DataPreprocessingOperator, self).__init__(*args, **kwargs)        self.preprocessed_data = preprocessed_data    def execute(self, context):        try:            # データ前処理のロジックをここに記述してください            # たとえば、取り込んだデータの特徴をクリーンアップ、変換、またはエンジニアリングすることができます            data = pd.read_csv('data/processed/ingested_data.csv')            data['trans_date_trans_time'] = pd.to_datetime            (data['trans_date_trans_time'], format='%d-%m-%Y %H:%M')            data['trans_date']=data['trans_date_trans_time'].dt.strftime('%Y-%m-%d')            data['trans_date']=pd.to_datetime(data['trans_date'])            data['dob']=pd.to_datetime(data['dob'],format='%d-%m-%Y')            data["age"] = data["trans_date"]-data["dob"]            data["age"] = data["age"].astype('int64')            data['trans_month'] = pd.DatetimeIndex(data['trans_date']).month            data['trans_year'] = pd.DatetimeIndex(data['trans_date']).year            data['Month_name'] = data['trans_month'].            apply(lambda x: calendar.month_abbr[x])            data['latitudinal_distance'] = abs(round(data['merch_lat']-data['lat'],3))            data['longitudinal_distance'] = abs(round(data['merch_long']-data['long'],3))            data.gender=data.gender.apply(lambda x: 1 if x=="M" else 0)            data = data.drop(['cc_num','merchant','first','last','street','zip',            'trans_num','unix_time','trans_date_trans_time','city','lat','long',            'job','dob','merch_lat','merch_long','trans_date','state','Month_name'],            axis=1)            data =pd.get_dummies(data,columns=['category'],drop_first=True)            # アンダーサンプリングを実行します            normal = data[data['is_fraud']==0]            fraud = data[data['is_fraud']==1]            normal_sample=normal.sample(n=len(fraud),random_state=42)            new_data = pd.concat([normal_sample,fraud],ignore_index=True)            # オーバーサンプリングを実行します            # normal = data[data['is_fraud']==0]            # fraud = data[data['is_fraud']==1]            # fraud_sample=fraud.sample(n=len(normal),replace=True,random_state=42)            # new_data = pd.concat([normal,fraud_sample],ignore_index=True)                        # 前処理されたデータを出力ファイル(例:CSVファイル)に保存します            new_data.to_csv(self.preprocessed_data, index=False)        except Exception as e:            self.log.error(f'Data preprocessing failed: {str(e)}')            raise e
  • 上記ではデータ型を変更し、列を削除しました
  • データセットが不均衡であるため、アンダーサンプリングを実行しました。また、オーバーサンプリングのためのコードも用意しました。

model_training_operator

from airflow.models import BaseOperatorfrom airflow.utils.decorators import apply_defaultsimport pandas as pdfrom sklearn.ensemble import RandomForestClassifierimport joblibclass ModelTrainingRFCOperator(BaseOperator):    """    マシンラーニングモデルをトレーニングしてファイルに保存するためのカスタムApache Airflowオペレーター。    """    def __init__(self, X_train_file, y_train_file, model_file, *args, **kwargs):        """        オペレーターを初期化します。        :param X_train_file: トレーニングセット(X_train)の特徴のファイルパス。        :param y_train_file: トレーニングセット(y_train)のラベルのファイルパス。        :param model_file: トレーニング済みモデルを保存するファイルパス。        """        super(ModelTrainingRFCOperator, self).__init__(*args, **kwargs)        self.X_train_file = X_train_file        self.y_train_file = y_train_file        self.model_file = model_file    def execute(self, context):        self.log.info(f'データ {self.X_train_file, self.y_train_file} を使用してマシンラーニングモデルをトレーニングしています')        try:            X_train = pd.read_csv(self.X_train_file)            y_train = pd.read_csv(self.y_train_file)                        print(X_train.shape)            print(y_train.shape)                        # マシンラーニングモデルを初期化およびトレーニングします            #(モデルクラスで置き換えてください)            RFC = RandomForestClassifier(n_estimators=100, random_state=0)            RFC.fit(X_train, y_train)            # トレーニング済みモデルを提供されたmodel_fileに保存します            joblib.dump(RFC, self.model_file)        except Exception as e:            self.log.error(f'Model training failed: {str(e)}')            raise e

前処理とデータ分割の後、次のステップではモデルのトレーニングです。コードでは、モデルのトレーニングにRandomForestClassifierを使用しました。

model_evaluation_operator

from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
import pandas as pd
from sklearn.metrics import accuracy_score, classification_report
import joblib

class ModelEvaluationRFCOperator(BaseOperator):
    """カスタムApache Airflowオペレータで、機械学習モデルを評価し、評価結果をファイルに保存します。"""
    
    @apply_defaults
    def __init__(self, X_test_file, y_test_file, model_file, output_file, *args, **kwargs):
        """オペレータを初期化します。
        
        :param X_test_file: テストセットの特徴量(X_test)のファイルパス。
        :param y_test_file: テストセットのラベル(y_test)のファイルパス。
        :param model_file: トレーニング済みモデルを読み込むファイルパス。
        :param output_file: 評価結果を保存するファイルパス。
        """
        super(ModelEvaluationRFCOperator, self).__init__(*args, **kwargs)
        self.X_test_file = X_test_file
        self.y_test_file = y_test_file
        self.model_file = model_file
        self.output_file = output_file
    
    def execute(self, context):
        self.log.info(f'テストデータを使って機械学習モデルの評価を実行中: {self.X_test_file}, {self.y_test_file}')
        
        # XComを使って前のタスクからテストデータを取得する
        test_data = context['ti'].xcom_pull(task_ids='data_split_task', key='test_data')
        
        try:
            """機械学習モデルを評価し、評価結果をファイルに保存するためのオペレータを実行します。"""
            
            # 提供されたファイルからテストデータとトレーニング済みモデルを読み込む
            X_test = pd.read_csv(self.X_test_file)
            y_test = pd.read_csv(self.y_test_file)
            model = joblib.load(self.model_file)
            
            # トレーニング済みモデルを使って予測を行う
            y_pred = model.predict(X_test)
            
            # 評価指標を計算して表示する
            accuracy = accuracy_score(y_test, y_pred)
            classification_rep = classification_report(y_test, y_pred, target_names=['クラス0', 'クラス1'])  # 必要に応じてラベルをカスタマイズ
           
            # 評価結果を指定された出力ファイルに保存する
            with open(self.output_file, 'w') as f:
                f.write(f"正解率: {accuracy}\n\n分類レポート:\n{classification_rep}")
        
        except Exception as e:
            self.log.error(f'モデルの評価に失敗しました: {str(e)}')
            raise e

モデルのトレーニング後、モデルを評価し、分類レポートを作成しました。ここでは、モデルの正解率、適合率、再現率、F1スコアを確認しています。

model_prediction_operator

from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
import pandas as pd
from sklearn.metrics import accuracy_score, classification_report
import joblib
import calendar

class ModelPredictionOperator(BaseOperator):
    """カスタムApache Airflowオペレータで、機械学習モデルを評価し、評価結果をファイルに保存します。"""
    
    @apply_defaults
    def __init__(self, input_file, model_file, output_file, *args, **kwargs):
        """オペレータを初期化します。
        
        :param X_test_file: テストセットの特徴量(X_test)のファイルパス。
        :param y_test_file: テストセットのラベル(y_test)のファイルパス。
        :param model_file: トレーニング済みモデルを読み込むファイルパス。
        :param output_file: 評価結果を保存するファイルパス。
        """
        super(ModelPredictionOperator, self).__init__(*args, **kwargs)
        self.input_file = input_file
        self.model_file = model_file
        self.output_file = output_file
    
    def execute(self, context):
        self.log.info(f'データ{self.input_file}を使って機械学習モデルの評価を実行中')
        
        try:
            """機械学習モデルを評価し、評価結果をファイルに保存するためのオペレータを実行します。"""
            
            # 提供されたファイルからテストデータとトレーニング済みモデルを読み込む
            new_data = pd.read_csv('data/raw/Test.csv')
            new_data['trans_date_trans_time'] = pd.to_datetime(new_data['trans_date_trans_time'], format='%d-%m-%Y %H:%M')
            new_data['trans_date'] = new_data['trans_date_trans_time'].dt.strftime('%Y-%m-%d')
            new_data['trans_date'] = pd.to_datetime(new_data['trans_date'])
            new_data['dob'] = pd.to_datetime(new_data['dob'], format='%d-%m-%Y')
            new_data["age"] = new_data["trans_date"] - new_data["dob"]
            new_data["age"] = new_data["age"].astype('int64')
            new_data['trans_month'] = pd.DatetimeIndex(new_data['trans_date']).month
            new_data['trans_year'] = pd.DatetimeIndex(new_data['trans_date']).year
            new_data['Month_name'] = new_data['trans_month'].apply(lambda x: calendar.month_abbr[x])
            new_data['latitudinal_distance'] = abs(round(new_data['merch_lat'] - new_data['lat'],3))
            new_data['longitudinal_distance'] = abs(round(new_data['merch_long'] - new_data['long'],3))
            new_data.gender = new_data.gender.apply(lambda x: 1 if x=="M" else 0)
            new_data = new_data.drop(['cc_num','merchant','first','last','street','zip','trans_num','unix_time','trans_date_trans_time','city','lat','long','job','dob','merch_lat','merch_long','trans_date','state','Month_name'], axis=1)
            new_data = pd.get_dummies(new_data, columns=['category'], drop_first=True)
            X_new = new_data.drop(["is_fraud"], axis=1)
            y_new = new_data["is_fraud"]
            
            model = joblib.load(self.model_file)
            
            # トレーニング済みモデルを使って予測を行う
            y_pred_new = model.predict(X_new)
            print('y_new:', y_new)
            print('y_pred_new:', y_pred_new)
            
            # 評価指標を計算して表示する
            accuracy = accuracy_score(y_new, y_pred_new)
            classification_rep = classification_report(y_new, y_pred_new, target_names=['クラス0', 'クラス1'])  # 必要に応じてラベルをカスタマイズ
            
            # 評価結果を指定された出力ファイルに保存する
            with open(self.output_file, 'w') as f:
                f.write(f"正解率: {accuracy}\n\n分類レポート:\n{classification_rep}")
        
        except Exception as e:
            self.log.error(f'モデルの評価に失敗しました: {str(e)}')
            raise e

予測演算子では、新しいデータセット、つまりテストデータファイルでモデルをテストしています。予測の後、分類レポートの準備をしています。

クラウドでの環境設定とモデル展開

pythonかanacondaを使用して、仮想環境を作成します。

#仮想環境を作成するコマンドpython3 -m venv <仮想環境名>

以下のコマンドを使用して、環境にいくつかのPythonパッケージをインストールする必要があります。

cd airflow-projects/fraud-predictionpip install -r requirements.txt

ワークフローを実行する前に、airflowをインストールし、データベースを設定する必要があります。

#airflowをインストールするコマンドpip install 'apache-airflow==2.7.1' \ --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.7.1/ constraints-3.8.txt"#ホームパスを設定するexport AIRFLOW_HOME=/c/Users/[YourUsername]/airflow#データベースを初期化するコマンド:airflow db init#Airflowユーザーを作成するコマンドairflow users create --username admin –password admin –firstname admin –lastname admin –role Admin –email [email protected]#作成されたユーザーを確認するコマンドairflow users list#Webサーバーを実行するコマンド#スケジューラを実行するコマンドairflow scheduler#デフォルトのポート8080が使用中の場合は、以下のコマンドでポートを変更する:airflow webserver –port <port number>

上記のように、実行するためのAirflow DAGで実行できるようにさまざまなairflowオペレーターを作成しました。ワンクリックでDAGをトリガーできます。

image.png

ワークフローが正常に完了する前に、異なるステータスが通過する場合があります。以下に示します:

上記で説明した異なるオペレーターは、実行されるとワークフローのステータスをリアルタイムで監視することもできます。

AirflowでトリガーされたDAGのログをモニターすることができます。以下はサンプルです。

クラウドでのモデル展開

最適なモデルを使用した後、streamlitコードを使用してモデルを展開しました。以下のコマンドを使用して、このStreamlitアプリをローカルシステムで実行します。

#ローカルでStreamlitアプリを実行するコマンドstreamlit run streamlit_app.py

アプリのクラウドバージョンは、以下のURLを使用してアクセスできます。パブリックにアクセスできます。

https://fraud-prediction-mlops-d8rcgc2prmv9xapx5ahhhn.streamlit.app/

エンドツーエンドの完全なML実装コードについては、こちらをクリックしてください。

結果

複数のアルゴリズムを試して、それぞれのモデルの性能を比較しました。結果は以下の通りです。

上記の結果から、不均衡なデータセットにアンサンブル学習テクニックを使用した後、4つのモデルがすべて90%以上の正確度で非常に良い結果を示しています。ランダムフォレスト分類器と決定木分類器はほぼ同じ正確度を持っており、ランダムフォレストの方がやや優れています。

  • 精度:精度は正しい予測の比率を総予測数で割ったものです。
  • 適合率:適合率は陽性クラスでの正しい予測の数です。
  • リコール:リコールはデータセットの実際の陽性サンプルから行われた正しい楽観的な予測の数です。
  • F1スコア:F1スコアはモデルの正確さを測るものです。適合率と精度の調和平均として定義されています。

デモアプリケーション

このプロジェクトのStreamlitを使用したライブデモアプリケーションです。製品のいくつかの入力特徴を受け取り、トレーニングされたモデルを使用して有効または不正なトランザクションを予測します。

結論

今日の世界はデジタルであり、テクノロジーは私たちの生活の一部になっています。本からスマートフォン、ラップトップまでオンラインサービスの増加があります。何でもオンラインで購入することができます。したがって、詐欺を防止し、詐欺検出モデルを実装することは、すべての企業にとって重要なことになります。機械学習は、ビジネスと顧客にとって重要な役割を果たすことができます。

  • 詐欺の金融取引を特定することにより、ビジネスの利益を増やします。
  • ビジネスの評判を維持し、顧客を増やすのに役立ちます。
  • 機械学習ツールは、利用しやすくより良いサービスを提供するのに役立ちます。
  • 顧客に良いサービスを提供し、信頼関係を築くのに役立ちます。

よくある質問

この記事に表示されているメディアはAnalytics Vidhyaが所有していません。著者の裁量で使用されています。

We will continue to update VoAGI; if you have any questions or suggestions, please contact us!

Share:

Was this article helpful?

93 out of 132 found this helpful

Discover more