If you have any further questions or need assistance with anything else, feel free to ask!

「もしご質問等ございましたら、お気軽にお聞きください!」

データ管理

バッチデータ処理を行うためのVDKの使い方チュートリアル

Mika Baumeister氏の写真、Unsplashから

Versatile Data Kit(VDK)は、データ管理の複雑さを簡素化するために設計されたオープンソースのデータ取り込みおよび処理フレームワークです。 VDKはリアルタイムストリーミングを含むさまざまなデータ統合タスクを処理できますが、この記事ではバッチデータ処理での使用方法に焦点を当てています。

この記事では以下の内容をカバーします:

  • バッチデータ処理の紹介
  • VDKでのバッチ処理パイプラインの作成と管理
  • VDKでのバッチデータ処理のモニタリング

1 バッチデータ処理の紹介

バッチデータ処理は、指定された間隔で大量のデータを処理する方法です。バッチデータは以下のような特徴を持ちます:

  • 時間に依存しない:データは即時の処理を必要とせず、通常はリアルタイムの要件に敏感ではありません。ストリーミングデータとは異なり、バッチデータはスケジュールされた間隔で処理されるか、リソースが利用可能になった際に処理されます。
  • チャンク毎に分割可能:リソースを多く消費する単一の操作ではなく、バッチデータはより小さな管理しやすいセグメントに分割することができます。これらのセグメントは、データ処理システムの能力に応じて順次または並列に処理できます。

また、バッチデータはオフラインで処理することもできます。つまり、データソースや外部サービスへの一定の接続を必要としません。データソースが間欠的または一時的に利用できない場合には貴重な特徴です。

ELT(抽出、読み込み、変換)はバッチデータ処理の典型的なユースケースです。ELTには次の3つの主なフェーズがあります:

  • 抽出(E):データは異なる形式で複数のソースから抽出されます。これらのソースは構造化または非構造化の両方です。
  • 読み込み(L):データはデータウェアハウスなどのターゲット先に読み込まれます。
  • 変換(T):抽出されたデータは通常、クリーニング、調和、および共通の形式への変換などの前処理が必要です。

バッチデータ処理の概要を学んだので、次のステップに進みましょう:VDKでのバッチ処理パイプラインの作成と管理

2 VDKでのバッチ処理パイプラインの作成と管理

VDKでは、コンポーネントベースのアプローチを採用しており、データ処理パイプラインの迅速な構築が可能です。VDKの紹介については、以前の記事Versatile Data Kitの概要を参照してください。この記事では、既にVDKをコンピュータにインストール済みであることを前提としています。

VDKでのバッチ処理パイプラインの動作原理を説明するために、ELTタスクを実行する必要があるシナリオを考えてみましょう。

VDKで、ヨーロッパにある文化遺産の有名な集積地であるヨーロッパナから、フィンセント・ファン・ゴッホの絵画を取り込んで処理する必要があるとします。ヨーロッパナは、すべての文化遺産オブジェクトをパブリックなREST APIを通じて提供しています。ヨーロッパナでは、フィンセント・ファン・ゴッホに関して700点以上の作品を提供しています。

以下の図は、このシナリオでのバッチデータ処理のステップを示しています。

画像の著者: 著者

各ポイントを個別に調査してみましょう。このシナリオを実装するための完全なコードは、VDK GitHubリポジトリで入手できます。

2.1 抽出とロード

このフェーズでは、VDKジョブがEuropeana REST APIを呼び出して生データを抽出します。具体的には、次の3つのジョブを定義します:

  • job1 — 既存のテーブルを削除する(存在する場合)
  • job2 — 新しいテーブルを作成する
  • job3 — REST APIから直接テーブルの値を収集する

この例では、Europeana REST APIにアクセスするために正常に動作するためにアクティブなインターネット接続が必要です。この操作はバッチ処理です。データは一度だけダウンロードされ、統合が必要ありません。

抽出したデータはテーブルに保存されます。このタスクの難しさは、REST APIとのマッピングを構築することです。これは、job3で行われます。

job3の作成には、単純にこのマッピングを実行するためのPythonコードを記述するだけです。ただし、抽出したファイルをローカルファイルに保存する代わりに、VDKの関数(job_input.send_tabular_data_for_ingestion)を呼び出してファイルをVDKに保存します。次のコードの断片に示されています:

import inspectimport loggingimport osimport pandas as pdimport requestsfrom vdk.api.job_input import IJobInputdef run(job_input: IJobInput):    """    Download datasets required by the scenario and put them in the data lake.    """    log.info(f"Starting job step {__name__}")    api_key = job_input.get_property("api_key")    start = 1    rows = 100    basic_url = f"https://api.europeana.eu/record/v2/search.json?wskey={api_key}&query=who:%22Vincent%20Van%20Gogh%22"    url = f"{basic_url}&rows={rows}&start={start}"    response = requests.get(url)    response.raise_for_status()    payload = response.json()    n_items = int(payload["totalResults"])    while start < n_items:        if start > n_items - rows:            rows = n_items - start + 1        url = f"{basic_url}&rows={rows}&start={start}"        response = requests.get(url)        response.raise_for_status()        payload = response.json()["items"]        df = pd.DataFrame(payload)        job_input.send_tabular_data_for_ingestion(            df.itertuples(index=False),            destination_table="assets",            column_names=df.columns.tolist(),        )        start = start + rows

完全なコードについては、GitHubの例を参照してください。Europeanaからデータをダウンロードするには、無料のAPIキーが必要です。

抽出フェーズで生成される出力は、生の値を含むテーブルです。

2.2 変換

このフェーズでは、データのクリーニングと関連情報のみの抽出が行われます。VDKで関連するジョブを実装するために、2つのジョブを使用することができます:

  • job4 — 既存のテーブルを削除する(存在する場合)
  • job5 — クリーニングされたテーブルを作成する

job5では、次のコードの断片に示されているように、単純にSQLクエリを記述するだけです:

CREATE TABLE cleaned_assets AS (    SELECT        SUBSTRING(country, 3, LENGTH(country)-4) AS country,        SUBSTRING(edmPreview, 3, LENGTH(edmPreview)-4) AS edmPreview,        SUBSTRING(provider, 3, LENGTH(provider)-4) AS provider,        SUBSTRING(title, 3, LENGTH(title)-4) AS title,        SUBSTRING(rights, 3, LENGTH(rights)-4) AS rights    FROM assets)

VDKでこのジョブを実行すると、処理された値が含まれるcleaned_assetという名前の別のテーブルが生成されます。最後に、クリーニングされたデータをどこかで使用する準備が整いました。この場合、抽出した絵画を表示するWebアプリを作成できます。このタスクを実行するための完全なコードは、VDK GitHubリポジトリで見つけることができます。

3 VDKでのバッチデータ処理のモニタリング

VDKは、データジョブを監視するためのグラフィカルユーザーインターフェースであるVDK UIを提供しています。VDK UIをインストールするには、公式VDKビデオのこのリンクに従ってください。以下の図は、VDK UIのスナップショットを示しています。

Image by Author

メインページは2つあります:

  • 探索:このページでは、ジョブの実行成功率、過去24時間の実行が失敗したジョブ、過去24時間で最も失敗したジョブなど、データジョブを探索することができます。
  • 管理:このページではジョブの詳細を表示できます。列による並べ替え、複数のパラメータでの検索、一部の列でのフィルタリング、特定のジョブのソースの表示、他の列の追加などが可能です。

以下の公式VDKビデオをご覧いただき、VDK UIの使用方法を学んでください。

概要

おめでとうございます!VDKでバッチデータ処理を実装する方法を学びました!生データを取り込み、操作し、最終的に目的に応じて使用するだけです!他にも多くの例がVDK GitHubリポジトリにあります。

VDKで最新のデータ処理の開発とベストプラクティスについて最新情報を得てください。引き続き探求し、専門知識を磨いてください!

Versatile Data Kitの概要

データエンジニアの作業を効率化するフレームワーク、Versatile Data Kitの始め方

towardsdatascience.com

Versatile Data Kitで欠損値を処理する方法

欠損値を処理するためにVDKを使用してデータパイプラインを構築するチュートリアル

towardsdatascience.com

生データからクリーンなデータベースへ:Versatile Data Kitの詳細

VMwareが最近リリースしたVersatile Data Kit(フレームワーク)とTrino DBを使用した完全な例

towardsdatascience.com

We will continue to update VoAGI; if you have any questions or suggestions, please contact us!

Share:

Was this article helpful?

93 out of 132 found this helpful

Discover more