「Pandasを使用したSpark上のPythonの並列化 並行性のオプション」

「スパーク上でのPythonの並列化と並行性オプションの実現にPandasを活用する方法」

Pandasと一緒にSparkのメリットを活用する

Florian Steciukによる写真

以前の職務では、Managed Servicesのお客様の数千のディスクに対して将来のディスクストレージの使用量を予測するための内部プロジェクトに取り組んでいました。各ディスクは個別の使用パターンに基づいており、これはディスクごとに将来の使用量を予測するために個別の機械学習モデルが必要であることを意味します。この予測を実行し、適切なアルゴリズムを選択すること自体が課題ですが、それをスケールで実行するには別の問題があります。

より高度なインフラストラクチャを活用するために、連続的な予測から離れて予測の処理を並列化することで操作を高速化することができます。このブログ記事では、Pandas UDFと「concurrent.futures」モジュールという2つの並行処理のアプローチを比較し、それぞれの使用ケースを特定することを目指しています。

課題

Pandasは、データ分析領域でPythonでデータセットを扱うためのゲートウェイパッケージです。DataFrameを通じて、データのプロファイリングやデータ品質の評価、探索的データ分析の実行、データの記述的な可視化、将来のトレンドの予測などを行うことができます。

これは確かに素晴らしいツールですが、Pythonの単一スレッドの特性は、大きなデータセットで作業する場合や複数のデータのサブセットで同じ分析を実行する必要がある場合にスケーリングが悪くなる可能性があります。

ビッグデータの世界では、優れたパフォーマンスを維持するためにスケーラビリティに重点を置いたより洗練されたアプローチが期待されます。Sparkは、他の言語と共に、分散処理の利点を取り入れることで、より大規模で複雑なデータ構造を処理するのに役立ちます。

この具体的な例に入る前に、データ処理における並行性の必要性を要約するいくつかの使用ケースを一般化することができます:

  • 複数のデータファイルに一様な変換を適用する
  • 複数のデータのサブセットに対して将来の値を予測する
  • 機械学習モデルのハイパーパラメータを調整し、最も効率的な設定を選択する

上記のような作業負荷を実行する要件をエスカレートさせると、PythonとPandasではこのデータを逐次的に処理するのが最も直接的なアプローチです。この例では、1つのディスクごとに上記のフローを実行します。

データ

この例では、数千のディスクのデータがあり、時間にわたって記録された空き容量が表示されており、各ディスクの将来の空き容量値を予測したいとします。

より明確にするために、1,000のディスクを持つcsvファイルを提供します。各ディスクにはGB単位で記録された1か月間分の履歴データが含まれています。これは、スケールで予測するさまざまなアプローチの影響を見るための十分なサイズです。

Image by Author: Example DataFrame

このような時系列の問題では、過去のデータを使用して将来のトレンドを予測し、各ディスクに対してどの機械学習(ML)アルゴリズムが最適であるかを理解したいと考えています。AutoMLなどのツールは、1つのデータセットに対して適切なモデルを見つける際には非常に便利ですが、ここでは1,000のデータセットに対処しているため、これは過剰です。

この場合、比較したいアルゴリズムの数を2つに制限し、各ディスクに対して最適なモデルを使用して予測するための最適なアルゴリズムをRoot Mean Squared Error(RMSE)を使用して評価します。RMSEに関する詳細な情報はこちらで確認できます。これらのアルゴリズムは次のとおりです:

  • 線形回帰
  • Fbprophet(より複雑な線にデータをフィットさせる)
  • Facebookの時系列予測モデル。
  • 季節性のためのハイパーパラメータを持つより複雑な予測に対応しています。

1つのディスクの将来の空き容量を予測する場合、すべてのコンポーネントが準備できています。以下のフローに従います:

画像:作者によるデータライフサイクル

これをスケールアウトし、複数のディスク(例:1,000個)でこのフローを実行したいと思います。

レビューの一環として、さまざまなスケールで異なるアルゴリズムのRMSE値の計算パフォーマンスを比較します。そのため、このシミュレーションのために最初の100個のディスクのサブセットを作成しました。

これにより、データセットのサイズによるパフォーマンスの興味深い洞察が得られます。さまざまな複雑さの操作を実行します。

並行性の導入

Pythonは有名なシングルスレッドであり、一度に利用可能なすべての計算リソースを使用しません。

その結果、3つのオプションがあります:

  1. 予測を順次計算するためにforループを実装し、シングルスレッドのアプローチを取る。
  2. Pythonのfuturesモジュールを使用して、複数のプロセスを同時に実行する。
  3. Pandas UDF(ユーザー定義関数)を使用して、Pandasの構文と互換性のあるパッケージを維持しながら、PySparkで分散コンピューティングを活用する。

私はさまざまな環境条件で比較的詳細な比較を行いたかったので、単一ノードのDatabricksクラスタと4つのワーカーノードを持つ別のDatabricksクラスタを使用して、Pandas UDFアプローチにおけるSparkの利用性を活用しています。

各ディスクについて、以下のアプローチを評価するために次の手順を実行します:

  • データをトレーニングセットとテストセットに分割する
  • トレーニングセットを入力として使用し、テストセットの日付に対して予測を行う
  • 予測値とテストセットの実際の値を比較して、Root Mean Squared Error(RMSE)スコアを取得する

予測を含む修正されたDataFrameと、各ディスクとアルゴリズムのRMSEスコアを含むDataFrameを出力することになります。

それらの関数は以下のようになります:

上記の3つのアプローチを比較します。いくつかの異なるシナリオがあるため、以下の表に結果を収集します:

次の組み合わせで:

手法

  • 順次
  • futures
  • Pandas UDF

アルゴリズム

  • 線形回帰
  • Fbprophet
  • 結合(それぞれのディスクごとの両方のアルゴリズム) – 比較を収集するための最も効率的な方法です。

クラスターモード

  • 単一ノードクラスター
  • 4つのワーカーを持つ標準クラスター

ディスクの数

  • 100
  • 1000

結果は、このブログの付録で次のような形式で提示されていますので、興味があればさらにご覧いただけます。

手法

手法1:順次

手法2:concurrent.futures

このモジュールの使用方法には2つのオプションがあります。メモリ集約型の操作の並列化(ThreadPoolExecutorを使用)またはCPU集約型の操作(ProcessPoolExecutorを使用)です。これについての説明は、以下のブログで見つけることができます。私たちが取り組むCPU集約型の問題に対しては、ProcessPoolExecutorが適しています。

手法3:Pandas UDF

これからギアを切り替えて、Sparkを使用して分散コンピューティングを利用して効率を向上させます。Databricksを使用しているため、Sparkのほとんどの設定は自動的に行われますが、データの一般的な処理方法にいくつかの調整があります。

まず、データをPySpark DataFrameにインポートします:

私たちはPandas grouped map UDF(PandasUDFType.GROUPED_MAP)を利用する予定です。DataFrameを渡し、DataFrameを返すためです。Apache Spark 3.0以降、このデコレータを明示的に宣言する必要はありません!

PySparkでは、Pandas UDFのfbprophet、回帰、およびRMSE関数をDataFrameの構造に合わせて分割する必要がありますが、これを達成するために大規模なコードのオーバーホールは必要ありません。

その後、applyInPandasを使用して結果を生成することができます。

注:上記の例は、可読性のために線形回帰を使用したプロセスのデモンストレーションだけを示しています。詳しいデモンストレーションについては、以下の完全なノートブックをご覧ください。

結果の解釈

異なる手法と異なる環境設定に対してプロットを作成し、データをアルゴリズムとディスクの数でグループ化して簡単に比較しました。

表形式の結果は、この投稿の付録にありますので、ご注意ください。

以下に私がまとめたこれらの結論のハイライトを示します:

  • 予測するディスクの数が1,000個と100個では、(一般的に)時間のかかるプロセスです。
  • 順次実行アプローチは一般的に最も遅く、効率的なリソースの利用を行うことができません。
  • Pandas UDFは、小さな単純なタスクではかなり効率が悪いです。データの変換のオーバーヘッドはより高価ですが、これを補うために並列化が役立ちます。
  • 順次およびconcurrent.futuresアプローチは、Databricksで利用できるクラスタリングを無視しており、追加の計算を見逃しています。

まとめ

コンテキストは、どのアプローチが最も成功するかに大きく関与しますが、DatabricksとSparkはしばしばビッグデータの問題に使用されるため、Pandas UDFを使用した大規模で複雑なデータセットにおける利益がわかります。

小さなデータセットに対してSpark環境を使用する場合は、より小さな(そして安価な!)コンピューティング構成でも同じ効率で行うことができます。これは、concurrent.futuresモジュールの使用によって示されていますので、ソリューションの設計時にこれを念頭に置いてください。

PythonとPandasに詳しい方であれば、初心者向けのチュートリアルで見られる順次のループアプローチからの転換は、負担にはならないはずです。

私たちはこれを投稿で調査していませんが、最新のpyspark.pandasモジュールには現在のバージョンとの互換性の問題が見つかっています。それは将来より一般的になることが予想され、注目すべきアプローチの1つです。このAPIは(Databricksチームによって開発されましたが、現在はリタイアしています)Pandasの利便性とSparkの潜在的な利点を活用しています。

ここで実現しようとしている効果を示すために、実際には予測はせずに、各ディスクに対して生成されたRMSE値を調べるだけです。ここで設定したフレームワークは、評価メトリック(およびディスクの物理的な制約など、各ケースで適切かどうかを判断するための他のロジック)が適切かどうかを決定し、可能な場合には予測するためのアルゴリズムを使用して将来の値を予測するためのロジックを適用するためにも同じように適用することができます。

いつものように、ノートブックは私のGitHubで見つけることができます。

付録

元の投稿はhttps://blog.coeo.comで公開され、この再掲用のために適応されています。

We will continue to update VoAGI; if you have any questions or suggestions, please contact us!

Share:

Was this article helpful?

93 out of 132 found this helpful

Discover more