MLにおけるETLデータパイプラインの構築方法

'ETLデータパイプラインの構築方法' in English is 'Building ETL Data Pipelines in ML.'

データ処理から即座の洞察力まで、頑健なパイプラインはどんなMLシステムにも必須です。データチームは、データとMLエンジニアで構成されることが多く、このインフラストラクチャを構築する必要がありますが、この経験は苦痛になることがあります。しかし、MLでETLパイプラインを効率的に使用することで、彼らの生活をはるかに簡単にすることができます。

この記事では、機械学習におけるETLパイプラインの重要性、人気のあるツールを使用したETLパイプラインの構築の実践的な例、およびデータエンジニアがパイプラインを強化し維持するための最良の方法を提案します。また、MLユースケースにおける異なるタイプのETLパイプラインについて説明し、データエンジニアが適切なパイプラインを選択するための実世界の例を提供します。技術的な詳細に入る前に、基本的な概念を見直しましょう。

機械学習におけるETLデータパイプラインとは何ですか?

ETLデータパイプラインは、必要なデータの抽出(E)、変換(T)、ロード(L)を実行するためのツールと活動の集合です。

ETLパイプライン | 出典:著者

これらの活動には、データを1つのシステムから抽出し、変換し、別のターゲットシステムに処理して保存および管理するということが含まれます。

MLは、モデルの精度と効果にトレーニングデータの品質が直接影響するため、ETLパイプラインに大きく依存しています。これらのパイプラインは、データサイエンティストが時間と労力を節約するのに役立ちます。データがクリーンで適切な形式になり、機械学習タスクで使用する準備ができていることを保証することによって、データサイエンティストがモデルの作成や継続的な改善に集中できるようにします。

さらに、ETLパイプラインはデータの隔離を解消し、単一の真実の情報源を確立するのに重要な役割を果たしています。ETLパイプラインの重要性について詳しく見てみましょう。

なぜ機械学習でETLパイプラインが必要なのですか?

ETLパイプラインの重要性は、組織が大規模で複雑なデータセットから価値ある洞察を導き出すことを可能にすることにあります。以下に、なぜそれらが重要なのかの具体的な理由をいくつか示します:

  • データ統合:組織は、ETLパイプラインを使用してさまざまなソースからデータを統合することができます。これにより、データサイエンティストはデータの統一されたビューを得ることができ、モデルのトレーニング方法、ハイパーパラメータの値などを決定することができます。
  • データ品質チェック:データが統合ステップを通過すると、ETLパイプラインはデータの品質を向上させるのに役立ちます。データの標準化、クリーニング、検証を行うことで、MLに使用されるデータが正確で信頼性があり、一貫性があることを保証します。
  • 時間の節約:ETLパイプラインは、抽出、変換、ロードの3つの主要なステップのプロセスを自動化するため、多くの時間を節約し、ヒューマンエラーの可能性を減らすのに役立ちます。これにより、データサイエンティストはモデルの作成や継続的な改善に集中することができます。
  • スケーラブル:現代のETLパイプラインはスケーラブルです。つまり、処理するデータの量に応じてスケールアップまたはスケールダウンすることができます。基本的には、ビジネスのニーズに基づいて任意の変更を行う柔軟性と機敏性が備わっています。

ETLとデータパイプラインの違いは何ですか?

データパイプラインは、異なるシステム間でデータを移動するカテゴリの総称であり、ETLデータパイプラインはデータパイプラインの一種です。

— Xoriant

ETLデータパイプラインとデータパイプラインは、同じ意味で使用されることが一般的です。これらの用語は、さまざまなソースからデータを単一のリポジトリに渡す機能とプロセスを参照しているにもかかわらず、同じではありません。なぜこれらを同義語として使用すべきではないかについて調査してみましょう。

比較

ETLパイプライン

データパイプライン

用語

略語が示すように、ETLはデータの抽出、変換、最後にターゲットソースへのロードという一連のプロセスを含みます。

データパイプラインもデータを1つのソースから別のソースに移動することを含みますが、必ずしもデータの変換を経る必要はありません。

焦点領域

ETLは、生データを構造化された形式に変換し、データ駆動の意思決定のためにデータサイエンティストがモデルを作成し解釈できるようにします。

データパイプラインは、さまざまなソースからデータウェアハウスにデータを転送することを目的として作成されます。その後のプロセスやワークフローは、このデータを活用してビジネスインテリジェンスや分析ソリューションを作成することができます。

操作

ETLパイプラインはスケジュールに基づいて実行されます。たとえば、毎日、毎週、または毎月です。基本的なETLパイプラインはバッチ指向であり、データは指定されたスケジュールでチャンク単位で移動します。

データパイプラインはリアルタイム処理を行うことがよくあります。データは継続的に更新され、リアルタイムのレポートや分析をサポートします。

要約すると、ETLパイプラインは、複数のソースからデータを抽出し、共通の形式に変換し、データウェアハウスや他のストレージシステムにロードするために特別に設計されたデータパイプラインの一種です。データパイプラインにはさまざまなタイプのパイプラインが含まれる可能性がありますが、ETLパイプラインはデータパイプラインの特定のサブセットです。

ETLパイプラインの基本的なアーキテクチャを説明し、各ステップが異なる目的で実行される方法を見て、各ステップを完了するためのさまざまなツールから選択できることを確認しました。ELTアーキテクチャとそのタイプは、組織によって異なります。組織には異なる技術スタック、データソース、ビジネス要件があるためです。

MLにおける異なるタイプのETLパイプラインとは何ですか?

ETLパイプラインは、処理されるデータのタイプと処理方法に基づいてカテゴリ分けすることができます。以下にいくつかのタイプを示します:

  • バッチETLパイプライン:これは、バッチで大量のデータを一度に処理する従来のETLアプローチです。データは1つ以上のソースから抽出され、必要な形式に変換され、データウェアハウスなどのターゲットシステムにロードされます。バッチETLは、過去のデータでモデルをトレーニングしたり、定期的なバッチ処理ジョブを実行したりする場合に特に有用です。
  • リアルタイムETLパイプライン:これは、データがリアルタイムまたはほぼリアルタイムで到着するときにデータを処理するものです。データを連続的に処理することは、一度に必要な処理能力が少なくて済むことを意味し、使用量の急増を回避できます。ストリーム/リアルタイムETLは、リアルタイム処理が重要な詐欺検出などのアプリケーションに特に有用です。リアルタイムETLパイプラインには、ストリーム処理エンジンやメッセージングシステムなどのツールとテクノロジーが必要です。
  • 増分ETLパイプライン:これらのパイプラインは、前回の実行以降に変更されたデータのみを抽出して処理します。ソースデータが頻繁に変更されるが、ターゲットシステムは最新のデータのみが必要な場合に便利です。たとえば、データは頻繁に変更されますがリアルタイムではないレコメンデーションシステムなどのアプリケーションに適しています。
  • クラウドETLパイプライン:MLのクラウドETLパイプラインは、クラウドベースのサービスを使用してデータを抽出、変換、ロードし、MLシステムにトレーニングおよび展開するものです。AWS、Microsoft Azure、GCPなどのクラウドプロバイダは、これらのパイプラインを構築するために使用できるさまざまなツールとサービスを提供しています。たとえば、AWSは、ETLのためのAWS Glue、データストレージのためのAmazon S3、MLのトレーニングと展開のためのAmazon SageMakerなどのサービスを提供しています。
  • ハイブリッドETLパイプライン:これらのパイプラインは、バッチ処理とリアルタイム処理を組み合わせ、両方のアプローチの利点を活用します。ハイブリッドETLパイプラインは、大量のデータを予め決められた間隔で処理し、データが到着するたびにリアルタイムの更新もキャプチャします。ハイブリッドETLは、予測保守などのアプリケーションで特に有用であり、リアルタイムと過去のデータの両方がモデルのトレーニングに必要な場合に使用されます。

ETLパイプラインツール

最後のセクションで議論したように、ETLパイプラインを作成するには、以下の基本的なETLアーキテクチャステップの機能を提供できるツールが必要です。市場にはいくつかのツールがありますが、以下はいくつかの人気のあるものとその提供する機能です。

ツール

クラウドベース

事前構築されたコネクタ

サーバーレス

事前構築された変換オプション

APIサポート

完全に管理された

Hevo Data

AWS Glue

GCP Cloud Data Fusion

<img src="https://neptune.ai/wp-content

ML ETLパイプラインの構築方法

前のセクションでは、基本的なETLの概念とツールについて簡単に説明しましたが、このセクションでは、それらを活用してETLパイプラインを構築する方法について説明します。まず、アーキテクチャについて話しましょう。

ETLのアーキテクチャ

ETLのアーキテクチャの特徴は、データがデータウェアハウスに到達する前に、必要な準備手順をすべて経ることです。その結果、最終的なリポジトリにはきれいで完全で信頼性のあるデータが含まれており、修正する必要なくさらに利用することができます。— Coupler

ETLのアーキテクチャには、上記のようなフローを示す図がよく含まれています。データソースから最終的な宛先までのETLパイプライン内の情報の流れを示しています。それは次の3つの主要なエリアで構成されています:着陸エリア、ステージングエリア、データウェアハウスエリア。

  • 着陸エリアは、ソースの場所から抽出されたデータが最初に到着する場所です。ETLパイプラインを通過させる前に、複数のバッチのデータを保存することができます。
  • ステージングエリアは、ETLの変換を行うための中間の場所です。
  • データウェアハウスエリアは、ETLパイプライン内のデータの最終的な宛先です。データの分析を行い、有益な洞察を得てより良いビジネスの意思決定を行うために使用されます。

ETLデータパイプラインのアーキテクチャは、階層化されています。各サブシステムは重要であり、順次、各サブシステムが次のサブシステムにフィードされるまでデータが宛先に到達します。

ETLデータパイプラインのアーキテクチャ | 出典:著者
  1. データの発見:データは、データベース、ファイルシステム、API、ストリーミングソースなど、さまざまなタイプのシステムから取得できます。また、ETLに適したデータであるかどうかを理解するために、データプロファイリング(データの発見)が必要です。これには、データの構造、関係、および内容を調べることが含まれます。
  1. インジェスション:さまざまなデータソースからデータをステージングエリアまたはデータレイクに取り込むことができます。データの抽出は、API、直接的なデータベース接続、またはファイル転送などのさまざまな技術を使用して行うことができます。データは一度にすべて抽出することもできます(DBからの抽出)、または増分的に抽出することもできます(APIを使用した抽出)、または変更がある場合にのみ抽出することもできます(トリガーでクラウドストレージ(S3)からデータを抽出する)。
  1. 変換:このステージでは、データをクリーニングし、エンリッチし、ターゲットシステムの要件に合わせてデータを整形します。データは、フィルタリング、集約、結合、複雑なビジネスルールの適用など、さまざまな技術を使用して操作することができます。データを操作する前に、データをクリーニングする必要があります。これには、重複エントリの削除、関係のないデータの削除、誤ったデータの特定などが含まれます。これにより、MLアルゴリズムのデータの精度と信頼性が向上します。
  1. データの保存:変換されたデータをMLモデルで使用できる適切な形式で保存します。ストレージシステムはデータベース、データウェアハウス、またはクラウドベースのオブジェクトストアなど、システムの要件に応じて構成することができます。データは、構造化または非構造化形式で保存することができます。
  2. 特徴エンジニアリング:特徴エンジニアリングでは、生データを選択し、変換し、組み合わせて、MLモデルで使用できる意味のある特徴を作成します。これは、モデルの精度と解釈性に直接影響を与えます。効果的な特徴エンジニアリングには、ドメイン知識、創造性、反復的な実験が必要です。それにより、特定の問題に対して最適な特徴のセットを特定することができます。

では、話し合ったツールの1つを使用して、自分自身のETLパイプラインを構築しましょう!

AirFlowを使用したETLパイプラインの構築

想像してみてください。私たちは、花を3つの異なるカテゴリ(セトサ、バーシコロール、ヴァージニカ)に分類することができる機械学習分類モデルを作成したいとします。私たちは、データセットを使用することになりますが、例えば毎週更新されるとします。これはバッチETLデータパイプラインの仕事のように思えます。

バッチETLデータパイプラインを設定するために、私たちはApache Airflowを使用します。Apache Airflowはオープンソースのワークフローマネージメントシステムであり、ETLワークフローの記述、スケジュール、監視を簡単に行うことができます。以下の手順に従って、独自のバッチETLパイプラインを設定してください。

以下は、AirFlowでETLワークフローを作成するために使用できる一般的な手順です:

  1. Airflow環境のセットアップ:システムにAirflowをインストールして設定します。インストール手順については、こちらを参照してください。
  2. DAGの定義とワークフローの設定:Airflowで、私たちのML分類器のETLパイプラインをオーケストレートするための有向非巡回グラフ(DAG)を定義します。DAGには、タスクのコレクションとそれらの間の依存関係が含まれます。この演習では、タスクを定義するためにPythonオペレータを使用し、DAGのスケジュールを「None」として設定します。パイプラインは手動で実行されるためです。

以下のコードでDAGファイル「airflow_classification_ml_pipeline.py」を作成します:

from datetime import timedelta
# DAGオブジェクト。これを使用してDAGをインスタンス化する必要があります。
from airflow import DAG

from airflow.operators.python import PythonOperator
from airflow.operators.empty import EmptyOperator
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago


from python_functions import download_dataset 
from python_functions import data_preprocessing
from python_functions import ml_training_classification

with DAG(
    dag_id='airflow_classification_ml_pipeline', ## DAGの名前
    default_args=args,
    description='Classification ML pipeline',
    schedule = None,  
 ) as dag:

# タスク1 - データセットのダウンロード
 task_download_dataset = PythonOperator(
 task_id='download_dataset',
 python_callable=download_dataset
 )

 # タスク2 - データの変換
 task_data_preprocessing = PythonOperator(
 task_id='data_preprocessing',
 python_callable=data_preprocessing
 )

 # タスク3 - MLモデルのトレーニング
 task_ml_training_classification = PythonOperator(
 task_id='ml_training_classification',
 python_callable=ml_training_classification
 )

# ワークフロープロセスを定義
task_download_dataset >> task_data_preprocessing >> task_ml_training_classification

  1. ETLタスクを実装する: DAGで定義された各タスクを実装します。これらのタスクには、scikit-learnデータセットパッケージからirisデータセットをロードし、データを変換し、洗練されたデータフレームを使用して機械学習モデルを作成するなどの処理が含まれます。

すべてのETLタスクを含むPython関数ファイル「etl_functions.py」を作成してください。

from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import confusion_matrix
from sklearn.metrics import accuracy_score

import pandas as pd
import numpy as np 

def download_dataset():

    iris = load_iris()
    iris_df = pd.DataFrame(
              data = np.c_[iris['data'], iris['target']],
              columns = iris['feature_names'] + ['target'])

    pd.DataFrame(iris_df).to_csv("iris_dataset.csv")


def data_preprocessing():

    iris_transform_df = pd.read_csv("iris_dataset.csv",index_col=0)
    cols = ["sepal length (cm)","sepal width (cm)","petal length (cm)","petal width (cm)"]
    iris_transform_df[cols] = iris_transform_df[cols].fillna(
                              iris_transform_df[cols].mean())
    iris_transform_df.to_csv("clean_iris_dataset.csv")

  1. パイプラインのモニタリングと管理: DAGとワークフローコードが準備できたので、AirflowサーバーでETL全体を監視できるようになります。

1. AirflowサーバーでDAGをリストアップします。

AirflowサーバーにリストアップされたDAG | 出典: 著者

2. ワークフローグラフを確認し、パイプラインを実行します(DAGをトリガーします):

ワークフローグラフ | 出典: 著者

3. モニタリングとログの確認:DAGをトリガーした後、UIでDAGの進行状況を監視できます。以下の画像は、すべての3つのステップが成功したことを示しています。

UIでDAGの進行状況を監視する | 出典: 著者

UIでガントチャートを使用して各タスクがかかった時間を確認する方法もあります:

ガントチャートを使用して各タスクの所要時間を確認する | 出典: 著者

この演習では、DAGを使用してETLワークフローを作成し、スケジュールを設定しなかったが、好きなスケジュールを設定してパイプラインを監視することができます。また、頻繁に更新されるデータセットを使用して、スケジュールを設定するかどうかを判断することもできます。

さらに、異なるオペレータやエグゼキュータを試して、Airflowのオーケストレーションをスケールアップすることもできます。リアルタイムのETLデータパイプラインを探索するには、このチュートリアルに従ってください。

MLにおけるETLパイプラインのベストプラクティス

データ駆動型の組織にとって、堅牢なETLパイプラインは不可欠です。これには以下が必要です:

  • 1
    データソースの効果的な管理
  • 2
    データ品質と精度の確保
  • 3
    効率的な処理のためのデータフローの最適化

データ分析と機械学習モデルの統合により、組織は高度な能力を持つようになり、精度を向上させて需要を予測することができます。

機械学習(ML)アプリケーションのETL(抽出、変換、ロード)パイプラインを構築するためのいくつかのベストプラクティスがあります。以下にいくつかの重要なポイントを示します:

  • 要件を明確に理解して開始します。機械学習モデルをサポートするために必要なデータソースを特定します。適切なデータ型を使用していることを確認します。これにより、データが正しくフォーマットされていることが確認され、MLアルゴリズムがデータを効率的に処理するために重要です。データのサブセットから始め、徐々にスケールアップすることで、さらなるタスクやプロセスを確認することができます。
  • データから不正確さや矛盾を修正または削除します。これは重要です、なぜならMLアルゴリズムはデータの矛盾や外れ値に敏感に反応する場合があるからです。
  • データへのアクセス制御を実装し、データへの役割ベースのアクセスを確保します。
  • 可能な場合は、分散ファイルシステム、並列処理、ステージングテーブルやキャッシング技術を活用します。これにより、データの処理が高速化され、パイプラインの最適化が支援されます。これは最終的にMLモデルのパフォーマンスを向上させるのに役立ちます。
  • データ駆動型のワークフローをスケジュール化または自動化して、データをさまざまなソース間で移動および変換します。
  • ETLデータを監視およびログすることで、機械学習モデルで使用されるデータを追跡できます。たとえば、MLモデルのパフォーマンスに影響を与える可能性のあるデータの変動を追跡することができます。
  • ETLコードベースのバージョン管理を維持します。これにより、変更を追跡し、他の開発者と協力し、パイプラインが予想どおりに実行され、モデルのパフォーマンスに影響を与えないように確認できます。
  • クラウドベースのサービスを使用している場合は、ゼロからすべてを作成するのに時間を節約するため、ETLテンプレートを利用します。

結論

この記事では、MLにおけるETLデータパイプラインのさまざまな側面について説明しました。

  • 1
    ETLパイプラインは良い機械学習モデルを作成するために重要です。
  • 2
    データと要件に応じて、どのようにETLアーキテクチャをセットアップし、さまざまなタイプのETLデータパイプラインを使用できるかを判断します。
  • 3
    Airflowを使用したバッチETLパイプラインの構築。ETLプロセスを自動化し、ワークフローをログおよび監視することもできます。
  • 4
    スケーラブルで効率的なETLデータパイプラインの作成方法。

この記事が役に立つことを願っています。この記事とバッチパイプラインのハンズオン演習を参考にして、自分自身でパイプラインを作成できるはずです。記事で言及されているツールを選んで、旅を始めましょう。

学びを楽しんでください!

参考文献

  • https://neptune.ai/blog/deployment-of-data-and-ml-pipelines-crypto-industry 
  • https://towardsdatascience.com/building-etl-pipelines-for-beginners-17c3492d29d2 
  • https://www.analyticsvidhya.com/blog/2022/06/a-complete-guide-on-building-an-etl-pipeline-for-beginners/ 
  • https://www.projectpro.io/article/how-to-build-etl-pipeline-example/526 
  • https://hevodata.com/learn/steps-to-build-etl-pipeline/ 

We will continue to update VoAGI; if you have any questions or suggestions, please contact us!

Share:

Was this article helpful?

93 out of 132 found this helpful

Discover more