Panderaを使用したPySparkアプリケーションのデータ検証

Panderaを使用したPySparkアプリケーションのデータ検証

 

データプラクティショナーであれば、データの検証が正確性と一貫性を確保する上で非常に重要であることを理解しているでしょう。特に大規模なデータセットやさまざまなソースからのデータを扱う場合には、これが特に重要です。しかし、Pandera Pythonライブラリを使用すると、データの検証プロセスを効率化し自動化することができます。Panderaは、スキーマとデータの検証のタスクをシンプルにするために緻密に作成されたオープンソースライブラリです。Pandasの堅牢性と汎用性を基にしており、データの検証を目的とした直感的で表現力豊かなAPIを導入しています。

本記事では、Panderaの主な機能を簡単に紹介し、最新のリリース(Pandera 0.16.0)以降、ネイティブのPySpark SQLと統合されたデータ処理ワークフローとの統合方法を説明します。

Panderaは、Pandas、pyspark.pandas、Daskなどの他の人気のあるPythonライブラリと連携するように設計されています。これにより、既存のデータ処理ワークフローに簡単にデータの検証を組み込むことができます。最近まで、PanderaはPySpark SQLのネイティブサポートが不足していましたが、QuantumBlackのチーム(AI by McKinsey)であるIsmail Negm-PARI、Neeraj Malhotra、Jaskaran Singh Sidana、Kasper Janehag、Oleksandr Lazarchuk、およびPanderaの創設者であるNiels Bantilanからなるチームが、ネイティブのPySpark SQLサポートを開発し、Panderaに貢献しました。本記事のテキストも、チームによって準備され、以下のように彼らの言葉で書かれています。

 

Panderaの主な機能

 

Panderaを使用してデータを検証する方法に慣れていない場合は、Khuyen Tranの「Panderaを使用してpandas DataFrameを検証する」を参照することをおすすめします。ここでは、シンプルで直感的なAPI、組み込みの検証関数、およびカスタマイズの主な機能と利点を簡単に説明します。

 

シンプルで直感的なAPI

 

Panderaの特徴の一つは、シンプルで直感的なAPIです。読みやすく理解しやすい宣言的な構文を使用してデータスキーマを定義することができます。これにより、効率的かつ効果的なデータ検証コードを簡単に記述することができます。

以下は、Panderaでスキーマ定義の例です:

class InputSchema(pa.DataFrameModel):
   year: Series[int] = pa.Field()
   month: Series[int] = pa.Field()
   day: Series[int] = pa.Field()

 

組み込みの検証関数

 

Panderaは、データの検証を行うための一連の組み込み関数(一般的にはチェックと呼ばれることが多い)を提供しています。Panderaスキーマに対してvalidate()を呼び出すと、スキーマとデータの両方の検証が行われます。データの検証は、裏側でcheck関数を呼び出します。

以下は、Panderaを使用してデータフレームオブジェクトに対してデータのcheckを実行する簡単な例です。

class InputSchema(pa.DataFrameModel):
   year: Series[int] = pa.Field(gt=2000, coerce=True)
   month: Series[int] = pa.Field(ge=1, le=12, coerce=True)
   day: Series[int] = pa.Field(ge=0, le=365, coerce=True)

InputSchema.validate(df)

 

上記の例では、yearフィールドにgt=2000というチェックが定義されており、このフィールドのすべての値が2000より大きい必要があります。そうでない場合、Panderaによって検証の失敗が発生します。

以下は、Panderaでデフォルトで利用可能なすべての組み込みチェックのリストです:

eq: 指定されたリテラルと値が等しいかどうかをチェックします
ne: 指定されたリテラルと値が等しくないかどうかをチェックします
gt: 指定されたリテラルより値が大きいかどうかをチェックします
ge: 指定されたリテラル以上の値かどうかをチェックします
lt: 指定されたリテラルより値が小さいかどうかをチェックします
le: 指定されたリテラル以下の値かどうかをチェックします
in_range: 指定された範囲に値があるかどうかをチェックします
isin: 指定されたリテラルのリストに値があるかどうかをチェックします
notin: 指定されたリテラルのリストに値がないかどうかをチェックします
str_contains: 値が指定された文字列を含んでいるかどうかをチェックします
str_endswith: 値が指定された文字列で終わっているかどうかをチェックします
str_length: 値の長さが一致しているかどうかをチェックします
str_matches: 値が指定された文字列と一致するかどうかをチェックします
str_startswith: 値が指定された文字列で始まっているかどうかをチェックします

 

カスタムバリデーション関数

 

Panderaでは、組み込みのバリデーションチェックに加えて、独自のカスタムバリデーション関数を定義することができます。これにより、ユースケースに基づいて独自のバリデーションルールを定義する柔軟性が得られます。

例えば、次のようにデータのバリデーションのためにラムダ関数を定義することができます:

schema = pa.DataFrameSchema({
   "column2": pa.Column(str, [
       pa.Check(lambda s: s.str.startswith("value")),
       pa.Check(lambda s: s.str.split("_", expand=True).shape[1] == 2)
   ]),
})

 

PanderaへのPySpark SQLデータフレームのサポートの追加

 

PySpark SQLへのサポート追加の過程で、私たちは2つの基本原則に従いました:

  • インタフェースとユーザーエクスペリエンスの一貫性
  • PySparkのパフォーマンス最適化

まず、一貫性のトピックについて詳しく見ていきましょう。ユーザーの視点からは、選択したフレームワークに関係なく、一貫したAPIセットとインタフェースを持つことが重要です。Panderaは複数のフレームワークを提供しているため、PySpark SQL APIでも一貫したユーザーエクスペリエンスを持つことはさらに重要でした。

これを考慮して、PySpark SQLを使用してPanderaスキーマを次のように定義することができます:

from pyspark.sql import DataFrame, SparkSession
import pyspark.sql.types as T
import pandera.pyspark as pa

spark = SparkSession.builder.getOrCreate()


class PanderaSchema(DataFrameModel):
       """テストスキーマ"""
       id: T.IntegerType() = Field(gt=5)
       product_name: T.StringType() = Field(str_startswith="B")
       price: T.DecimalType(20, 5) = Field()
       description: T.ArrayType(T.StringType()) = Field()
       meta: T.MapType(T.StringType(), T.StringType()) = Field()


data_fail = [
       (5, "Bread", 44.4, ["description of product"], {"product_category": "dairy"}),
       (15, "Butter", 99.0, ["more details here"], {"product_category": "bakery"}),
   ]

spark_schema = T.StructType(
       [
           T.StructField("id", T.IntegerType(), False),
           T.StructField("product", T.StringType(), False),
           T.StructField("price", T.DecimalType(20, 5), False),
           T.StructField("description", T.ArrayType(T.StringType(), False), False),
           T.StructField(
               "meta", T.MapType(T.StringType(), T.StringType(), False), False
           ),
       ],
   )
df_fail = spark_df(spark, data_fail, spark_schema)

 

上記のコードでは、PanderaSchemaは入力のPySparkデータフレームのスキーマを定義しています。異なるdtypesを持つ5つのフィールドがあり、idproduct_nameフィールドにデータチェックが強制されています。

class PanderaSchema(DataFrameModel):
       """テストスキーマ"""
       id: T.IntegerType() = Field(gt=5)
       product_name: T.StringType() = Field(str_startswith="B")
       price: T.DecimalType(20, 5) = Field()
       description: T.ArrayType(T.StringType()) = Field()
       meta: T.MapType(T.StringType(), T.StringType()) = Field()

 

次に、ダミーデータを作成し、spark_schemaで定義されたネイティブのPySpark SQLスキーマを強制しました。

spark_schema = T.StructType(
       [
           T.StructField("id", T.IntegerType(), False),
           T.StructField("product", T.StringType(), False),
           T.StructField("price", T.DecimalType(20, 5), False),
           T.StructField("description", T.ArrayType(T.StringType(), False), False),
           T.StructField(
               "meta", T.MapType(T.StringType(), T.StringType(), False), False
           ),
       ],
   )

df_fail = spark_df(spark, data_fail, spark_schema)

 

これは、スキーマとデータのバリデーションの失敗をシミュレートするために行われています。

以下はdf_failデータフレームの内容です:

df_fail.show()

   +---+-------+--------+--------------------+--------------------+
   | id|product|   price|         description|                meta|
   +---+-------+--------+--------------------+--------------------+
   |  5|  パン|44.40000|[製品の説明]|{製品カテゴリ: 乳製品}|
   | 15| バター|99.00000| [詳細はこちら]|{製品カテゴリ: 乳製品}|
   +---+-------+--------+--------------------+--------------------+

 

次に、Panderaのvalidate関数を呼び出して、次のようにスキーマとデータのレベルのバリデーションを実行できます:

df_out = PanderaSchema.validate(check_obj=df)

 

間もなく、df_outの内容を探索します。

 

PySparkのパフォーマンス最適化

 

私たちの貢献は、大規模なデータセットでの作業時に重要なPySparkのデータフレームとの最適なパフォーマンスを実現するために特別に設計されています。これは、PySparkの分散コンピューティング環境の固有の課題を処理するために必要です。

Panderaは、PySparkの分散コンピューティングアーキテクチャを使用して、大規模なデータセットを効率的に処理しながらデータの整合性と正確性を維持します。私たちは、大規模なデータセットのより迅速かつ効率的なバリデーションを可能にするために、PySparkのパフォーマンス向上のためにPanderaのカスタムバリデーション関数を書き直しました。これにより、高いボリュームでのデータエラーや整合性のリスクを低減することができます。

 

包括的なエラーレポート

 

Panderaには、Pythonの辞書オブジェクトの形式で詳細なエラーレポートを生成する機能を追加しました。これらのレポートは、validate関数から返されるデータフレームを介してアクセスできます。これらのレポートは、ユーザーの設定に応じて、スキーマとデータレベルのバリデーションの包括的な要約を提供します。

この機能は、開発者が迅速にデータ関連の問題を特定し、解決するために非常に有用です。生成されたエラーレポートを使用することで、チームはアプリケーション内のスキーマとデータの問題の包括的なリストを作成することができます。これにより、優先順位を付けて問題を解決することができます。

この機能は、現在PySpark SQL専用であり、Panderaでエラーレポートを使用する際にユーザーに向けた向上したエクスペリエンスを提供します。

上記のコード例では、sparkデータフレームにvalidate()を呼び出したことを思い出してください:

df_out = PanderaSchema.validate(check_obj=df)

 

それはデータフレームオブジェクトを返しました。アクセサを使用して、エラーレポートを次のように抽出することができます:

print(df_out.pandera.errors)

 

{
  "SCHEMA":{
     "COLUMN_NOT_IN_DATAFRAME":[
        {
           "schema":"PanderaSchema",
           "column":"PanderaSchema",
           "check":"column_in_dataframe",
           "error":"column 'product_name' not in dataframe Row(id=5, product='Bread', price=None, description=['description of product'], meta={'product_category': 'dairy'})"
        }
     ],
     "WRONG_DATATYPE":[
        {
           "schema":"PanderaSchema",
           "column":"description",
           "check":"dtype('ArrayType(StringType(), True)')",
           "error":"expected column 'description' to have type ArrayType(StringType(), True), got ArrayType(StringType(), False)"
        },
        {
           "schema":"PanderaSchema",
           "column":"meta",
           "check":"dtype('MapType(StringType(), StringType(), True)')",
           "error":"expected column 'meta' to have type MapType(StringType(), StringType(), True), got MapType(StringType(), StringType(), False)"
        }
     ]
  },
  "DATA":{
     "DATAFRAME_CHECK":[
        {
           "schema":"PanderaSchema",
           "column":"id",
           "check":"greater_than(5)",
           "error":"column 'id' with type IntegerType() failed validation greater_than(5)"
        }
     ]
  }
}

 

上記のように、エラーレポートは2つのレベルで集約され、Pythonの辞書オブジェクトに格納され、Grafanaなどのツールを使用して時間とともにエラーの時系列可視化を行うなど、下流のアプリケーションで簡単に利用できます:

  1. バリデーションの種類 = SCHEMAまたはDATA
  2. エラーのカテゴリ = DATAFRAME_CHECKまたはWRONG_DATATYPEなど

このエラーレポートの再構築のための新しいフォーマットは、私たちの貢献の一環として0.16.0で導入されました。

 

ON/OFFスイッチ

 

PySparkに依存するアプリケーションでは、オン/オフスイッチは柔軟性とリスク管理の観点から重要な機能であり、大きな違いをもたらすことができます。具体的には、オン/オフスイッチにより、チームはプロダクションでデータの検証を無効にすることなく、コードの変更を必要とせずにデータの検証を無効にすることができます。

これは、パフォーマンスが重要なビッグデータパイプラインの場合に特に重要です。多くの場合、データの検証は処理時間のかなりの部分を占めるため、パイプライン全体のパフォーマンスに影響を与えることがあります。オン/オフスイッチを使用することで、チームは必要に応じて迅速かつ簡単にデータの検証を無効にすることができ、コードの変更に時間を費やす必要がありません。

私たちのチームは、Panderaにオン/オフスイッチを導入しました。これにより、ユーザーは単純に設定を変更するだけで、プロダクションでデータの検証を簡単に無効にすることができます。これにより、開発中のデータの品質や正確性を犠牲にすることなく、必要に応じてパフォーマンスを優先するための柔軟性が提供されます。

検証を有効にするには、環境変数で次の設定を行ってください:

export PANDERA_VALIDATION_ENABLED=False

 

これにより、Panderaはアプリケーション内のすべての検証を無効にします。デフォルトでは、検証は有効です。

現在、この機能は0.16.0からのPySpark SQLのみで利用可能です。これは、私たちの貢献によって導入された新しい概念です。

 

Panderaの実行の詳細な制御

 

オン/オフスイッチ機能に加えて、Panderaの検証フローの実行に対するより詳細な制御も導入しました。これは、設定可能な設定を導入することで実現されます。この設定により、ユーザーは3つの異なるレベルで実行を制御できます:

  1. SCHEMA_ONLY: この設定はスキーマの検証のみを行います。データがスキーマ定義に準拠していることを確認しますが、追加のデータレベルの検証は行いません。
  2. DATA_ONLY: この設定はデータレベルの検証のみを行います。データを定義された制約やルールと照合しますが、スキーマの検証は行いません。
  3. SCHEMA_AND_DATA: この設定はスキーマとデータレベルの両方の検証を行います。データをスキーマ定義と定義された制約やルールの両方に照合します。

この詳細な制御を提供することで、ユーザーは特定のユースケースに最適な検証レベルを選択することができます。例えば、データが定義されたスキーマに準拠していることを確認することが主な関心事である場合、SCHEMA_ONLY設定を使用して全体の処理時間を短縮することができます。また、データがスキーマに準拠していることが既知であり、データレベルの検証が重要な場合、DATA_ONLY設定を使用してデータレベルの検証を優先することができます。

Panderaの実行に対するこの強化された制御により、ユーザーは精度と効率のバランスをとることができ、よりターゲットと最適化された検証体験を実現することができます。

export PANDERA_VALIDATION_DEPTH=SCHEMA_ONLY

 

デフォルトでは、検証は有効であり、深さはSCHEMA_AND_DATAに設定されていますが、ユースケースに応じてSCHEMA_ONLYまたはDATA_ONLYに変更できます。

現在、この機能は0.16.0からのPySpark SQLのみで利用可能です。これは、私たちの貢献によって導入された新しい概念です。

 

列およびデータフレームレベルのメタデータ

 

私たちのチームは、Panderaに、FieldおよびSchema / Modelレベルで追加のメタデータを格納できる機能を追加しました。この機能は、スキーマ定義に文脈情報を埋め込むことができるように設計されており、他のアプリケーションで活用することができます。

例えば、データ型、フォーマット、単位など、特定の列に関する詳細を格納することで、開発者はダウンストリームのアプリケーションがデータを正しく解釈して使用できるようにすることができます。同様に、スキーマのどの列が特定のユースケースに必要かについての情報を格納することで、開発者はデータ処理パイプラインを最適化し、ストレージコストを削減し、クエリのパフォーマンスを向上させることができます。

スキーマレベルでは、ユーザーはアプリケーション全体で異なるスキーマを分類するのに役立つ情報を格納することができます。このメタデータには、スキーマの目的、データのソース、データの日付範囲などの詳細が含まれることがあります。これは、複雑なデータ処理ワークフローを管理する場合に特に役立ちます。異なる目的で複数のスキーマが使用され、効率的に追跡および管理する必要がある場合などです。

class PanderaSchema(DataFrameModel):
       """Pandera スキーマクラス"""
       id: T.IntegerType() = Field(
           gt=5,
           metadata={"usecase": ["RetailPricing", "ConsumerBehavior"],
              "category": "product_pricing"},
       )
       product_name: T.StringType() = Field(str_startswith="B")
       price: T.DecimalType(20, 5) = Field()


       class Config:
           """pandera クラスのコンフィグ"""
           name = "product_info"
           strict = True
           coerce = True
           metadata = {"category": "product-details"}

 

上記の例では、スキーマオブジェクト自体に関する追加情報を紹介しました。これはフィールドとスキーマの2つのレベルで許可されています。

スキーマレベルのメタデータ(それに含まれるすべてのフィールドを含む)を抽出するために、以下のヘルパー関数を提供しています:

PanderaSchema.get_metadata()
出力は以下のような辞書オブジェクトです:
{
       "product_info": {
           "columns": {
               "id": {"usecase": ["RetailPricing", "ConsumerBehavior"],
                      "category": "product_pricing"},
               "product_name": None,
               "price": None,
           },
           "dataframe": {"category": "product-details"},
       }
}

 

現在、この機能は0.16.0での新しいコンセプトであり、PySpark SQLとPandas用に追加されました。

 

サマリー

 

バリデーションを本番環境でコードの変更なしに無効化するためのオン/オフスイッチ、Panderaのバリデーションフローの細かな制御、列とデータフレームのレベルでの追加のメタデータの保存など、いくつかの新機能とコンセプトを紹介しました。詳細は、バージョン0.16.0のPanderaドキュメントでさらに詳細を確認できます。

Panderaの創設者であるNiels Bantilanは、最新バージョン0.16.0のリリースについての最近のブログ記事で次のように説明しています:

 

新しいスキーマ仕様とバックエンドAPIを使用したPanderaの拡張性を証明するために、私たちはQuantumBlackチームと協力してPyspark SQLのスキーマとバックエンドを実装しました… そして、わずか数か月でMVPを完成させました!

 

Panderaのオープンソースコードベースへの最新の貢献は、PySparkやその他のビッグデータ技術を使用しているチームに利益をもたらすでしょう。

この最新の貢献に関して、以下のQuantumBlack、AI by McKinseyのチームメンバーが責任を持っています: Ismail Negm-PARI、Neeraj Malhotra、Jaskaran Singh Sidana、Kasper Janehag、Oleksandr Lazarchuk。特にこの記事の公開の準備において助力してくれたNeerajに感謝します。Jo Stitchburyは経験豊富な技術ライターです。彼女はデータサイエンスと分析、AI、ソフトウェア業界について執筆しています。

We will continue to update VoAGI; if you have any questions or suggestions, please contact us!

Share:

Was this article helpful?

93 out of 132 found this helpful

Discover more