リアルタイムでデータを理解する

Understanding data in real-time

ハンズオンチュートリアル

bytewaxとydata-profilingを使った

このブログ投稿では、オープンソースのストリーミングソリューションであるbytewaxydata-profilingを組み合わせて活用する方法について説明します。ストリーミングフローの品質を向上させるためにどのように組み合わせることができるかを紹介します。準備はいいですか?

ストリーム処理は、データがストレージに保存される前や途中でのリアルタイムデータ分析を可能にし、ステートフルまたはステートレスのいずれかで行われます。

ステートフルなストリーム処理は、リアルタイムの推奨、パターン検出、複雑なイベント処理など、処理に過去の履歴が必要な場合に使用されます(ウィンドウ、キーによる結合など)。

ステートレスなストリーム処理は、メールのマスキングやタイプの変換など、ストリーム内の他のデータポイントの知識を必要としないインライン変換に使用されます。

Photo by Markus Spiske on Unsplash

全体的に、データストリームは産業界で広く使用され、不正検出、患者モニタリング、イベント予測保守などのユースケースに適用されています。

すべてのデータストリームが考慮しなければならない重要な要素はデータの品質です

伝統的なモデルとは異なり、データの品質は通常、データウェアハウスやダッシュボードソリューションの作成時に評価されますが、ストリーミングデータでは継続的なモニタリングが必要です。

データ品質を収集から下流アプリケーションへのフィードまで維持することは非常に重要です。なぜなら、データ品質の悪さは組織にとって高いコストとなるからです:

「ほとんどの企業にとって、データの品質の悪さは収益の15%から25%にもなります。(…) データ品質に立ち向かうことで、これらのコストの2/3を削減することができます。」

— Thomas C. Redman, “Getting in Front on Data Quality”の著者

本記事では、bytewaxydata-profilingを組み合わせて、ストリーミングフローのプロファイリングと品質向上をどのように行うかを紹介します!

Bytewaxを使用したデータ専門家向けのストリーム処理

Bytewaxは、Python開発者向けに特別に設計されたOSSストリーム処理フレームワークです。

これにより、Flink、Spark、Kafka Streamsと同様の機能を持つストリーミングデータパイプラインとリアルタイムアプリケーションを構築できますが、使いやすくなじみのあるインターフェースとPythonエコシステムとの100%の互換性を提供します。

組み込みコネクタまたは既存のPythonライブラリを使用することで、リアルタイムおよびストリーミングデータソース(Kafka、RedPanda、WebSocketなど)に接続し、変換されたデータをさまざまな下流システム(Kafka、パーケットファイル、データレイクなど)に書き出すことができます。

変換には、Bytewaxがmap、ウィンドウ処理、集計メソッドなどを備えているため、ステートフルおよびステートレスな変換を容易に行えますし、復元やスケーラビリティなどの使い慣れた機能も提供します。

Bytewaxは、Pythonを最初に考えたデータ指向のデータストリーム体験を提供し、データエンジニアやデータサイエンティスト向けに特に設計されています。ユーザーは、SparkやFlinkのようなJVMベースのストリーミングプラットフォームを学ぶ必要なく、自分のニーズを満たすために必要なカスタマイズを作成できます。

Bytewaxは、生成的AIの埋め込みパイプライン、データストリーム内の欠損値の処理、ストリーミングコンテキストでの言語モデルの使用による金融市場の理解など、多くのユースケースに適しています。ユースケースのインスピレーションやドキュメント、チュートリアル、ガイドなどの詳細情報については、bytewaxのウェブサイトをご覧ください。

データストリームのためのデータプロファイリングの重要性

データプロファイリングは、いかなる機械学習タスクの成功への鍵であり、データの構造、振る舞い、品質を徹底的に理解するというステップを指します。

データプロファイリングは、データの形式や基本的な記述子(サンプルの数、特徴の数/種類、重複する値など)に関連する側面を分析することを意味します。また、欠損データやバランスの取れていない特徴の存在など、データ収集や処理中に生じる他の複雑な要因を分析することも含まれます。

高いデータ品質基準を確保することは、すべてのドメインや組織にとって重要ですが、特に連続的なデータを出力するドメインにとっては特に重要です。このようなドメインでは状況が急速に変化する場合があり、即座の対応が必要となる場合があります(例:医療モニタリング、株価、大気品質ポリシー)。

多くのドメインでは、データプロファイリングは、データベースに格納された過去のデータを考慮した探索的データ分析のレンズから使用されます。一方、データストリームの場合、データプロファイリングは、ストリームに沿って継続的に検証と品質管理が行われるため、重要な役割を果たします。データは異なる時間枠やプロセスの段階でチェックする必要があります。

自動プロファイリングをデータフローに組み込むことで、データの現在の状態に関するフィードバックをすぐに得ることができ、データの一貫性と整合性(破損した値や変更された形式など)または短期間に発生するイベント(データのドリフト、ビジネスルールや結果からの逸脱など)に関連する潜在的な重大な問題に対して警告を受けることができます。

「実世界のドメインでは、マーフィーの法則が必ず発生し、「すべてが確実にうまくいく」と言える状況では、自動プロファイリングは多くの問題解決を助け、システムを運用から外す必要がなくなるかもしれません!」

データプロファイリングに関しては、ydata-profilingは、表形式または時系列データのどちらにも一貫して人気があります。その理由は明らかです – 分析と洞察のためのコードの一行で広範な分析と洞察が得られるからです。

複雑で時間のかかる操作は裏で行われます:ydata-profilingは、データに含まれる特徴のタイプを自動的に検出し、数値またはカテゴリカルの特徴タイプに応じて、プロファイリングレポートに表示されるサマリー統計と可視化を調整します。

データ中心の分析を促進するため、このパッケージはまた、特徴間の既存の関係に焦点を当て、それらのペア間の相互作用と相関を徹底的に評価し、重複または一定値からスキューおよびバランスの取れていない特徴まで、データ品質の警告を提供します。

これは、最小限の努力でデータの品質の360ºビューを提供します。

プロファイリングレポート:潜在的なデータ品質の問題を強調。著者による画像。

すべてをまとめる:bytewaxとydata-profiling

プロジェクトを開始する前に、まずPythonの依存関係を設定し、データソースを構成する必要があります。

まず、bytewaxydata-profilingパッケージをインストールします(必要に応じて仮想環境を使用することをお勧めします – 追加のガイダンスが必要な場合は、この手順を確認してください!)

次に、さまざまなIoTデバイスからの温度、湿度、一酸化炭素、液体石油ガス、煙、光、および動きの複数の測定値を含む環境センサーテレメトリデータセット(ライセンス – CC0:パブリックドメイン)をアップロードします。

本番環境では、これらの測定値は各デバイスによって連続的に生成されるため、入力はKafkaなどのストリーミングプラットフォームで期待される形式と同様になります。この記事では、ストリーミングデータでのみ見られるコンテキストをシミュレートするために、データをCSVファイルから1行ずつ読み取り、bytewaxを使用してデータフローを作成します。

(ちなみに、データフローは、有向非巡回グラフ(DAG)として記述できるデータパイプラインのことです)

まず、必要なインポートを行います

次に、データフローオブジェクトを定義します。その後、文字列を日時オブジェクトに変換し、データを(デバイスID、データ)の形式に再構築する関数を渡す、状態を持たないマップメソッドを使用します。

mapメソッドは、各データポイントに対して状態を持たない方法で変更を行います。データの形状を変更した理由は、次のステップでデータをグループ化し、すべてのデバイスではなく各デバイスごとにデータをプロファイリングするためです。

次に、bytewaxの状態保持機能を活用して、定義した期間内の各デバイスのデータを収集します。 ydata-profilingでは、時間の経過に伴うデータのスナップショットが必要であり、そのためにウィンドウ演算子を使用するのが最適な方法です。

ydata-profilingでは、特定のコンテキストに指定されたデータフレームに対して要約統計情報を生成することができます。例えば、この例では、各IoTデバイスや特定の時間枠に言及するデータのスナップショットを生成することができます。

スナップショットが定義されたら、ydata-profilingを利用するのは、分析したい各データフレームに対してProfileReportを呼び出すだけです。

この例では、画像をマップメソッド内の関数の一部としてローカルファイルに書き出しています。これらはメッセージングツールを介して報告することもできますし、将来的にはリモートストレージに保存することもできます。プロファイルが完了すると、データフローはいくつかの出力を期待するため、プロファイル関数からマップステップで渡されたプロファイルされたデバイスとプロファイルされた時間を印刷するために組み込みのStdOutputを使用できます。

Bytewaxデータフローの実行方法は複数あります。この例では同じローカルマシンを使用していますが、Bytewaxは複数のPythonプロセス、複数のホスト、Dockerコンテナ、Kubernetesクラスタなどで実行することもできます。

この記事ではローカルセットアップを続けますが、パイプラインが本番環境に移行する準備が整ったら、Kubernetesデータフローデプロイメントを管理する補助ツールwaxctlをチェックすることをお勧めします。

データ品質を検証し、スキーマやデータ形式の変更をチェックし、異なるデバイスや時間枠間でデータの特性を比較するために、プロファイリングレポートを使用できます。

実際には、2つのデータプロファイル間の違いを明示的に示す比較レポート機能を活用することができます。これにより、重要なパターンを検出したり、調査する必要がある問題を容易に検出したりすることができます。

自分自身のデータストリームを探索する準備ができましたか?

データストリームの検証は、データ品質の問題を継続的に特定し、異なる時間のデータの状態を比較するために重要です。

医療、エネルギー、製造、エンターテイメントの組織にとって、連続したデータストリームを扱うことは、データの品質評価からデータプライバシーまでのデータガバナンスのベストプラクティスを確立するための重要な要素です。

これには、データのスナップショットの分析が必要であり、この記事で紹介したように、bytewaxydata-profilingを組み合わせることでシームレスに実現できます。

Bytewaxは、データストリームをスナップショットに処理し、構造化するために必要なすべてのプロセスを処理します。その後、ydata-profilingを使用してデータの特性に関する包括的なレポートを通じて要約し、比較することができます。

受信したデータを適切に処理し、プロファイリングすることで、データのスキーマや形式のエラーの修正から、実世界の活動から派生する追加の問題のハイライトや緩和(例:不正行為や侵入/脅威の検出)や機器の故障、ビジネスルールとの不一致など、重要なイベントの検出(データドリフトやアライメントのズレ)まで、さまざまなドメインでの使用事例が可能になります。

これで、データストリームの探索を始める準備が整いました!他にも見つけた使用事例があれば教えてください。また、ご質問や提案があれば、コメントやData-Centric AI Communityでお気軽にご連絡ください!では、そちらでお会いしましょう!

謝辞

この記事は、Fabiana Clemente(ydata-profilingの開発)、Zander MathesonとOli Makhasoeva(bytewaxの開発)のサポートを受けて書かれました。 OSSパッケージの詳細については、それぞれのドキュメントであるydata-profiling docsとbytewax docsをご覧ください。

We will continue to update VoAGI; if you have any questions or suggestions, please contact us!

Share:

Was this article helpful?

93 out of 132 found this helpful

Discover more