「データサイエンスのベストプラクティス、パート1 – クエリをテストする」

Data Science Best Practices Part 1 - Testing Queries

クエリが期待通りに動作することを確認する方法と他の将来の利点

Midjourneyで生成

データサイエンスの分野は、数学と統計学、そしてコンピュータサイエンスに根ざしています。過去数十年間でかなりの発展を遂げてきましたが、組織内で確立された役割として、またテック業界における独立した分野として注目されるようになったのは、過去10〜15年間のことです。

比較的若い職業であるため、データサイエンスのベストプラクティスは十分な時間をかけて結集し、十分に文書化されていません。これは、ソフトウェアエンジニアリングの周辺分野とは対照的です。ソフトウェアエンジニアリングははるかに成熟しており、時間の経過によって有益なノウハウガイド、構造、および方法論が豊富に備わっています。

データサイエンティストは、特に実践に関連する場合に、ソフトウェアエンジニアとの重複と緊密な協力から恩恵を受けることが期待されるでしょう。残念ながら、実際には、多くのデータサイエンティストはこれらの方法論を知らないか、学ぶ意欲がなく、それらが関連性がないと主張したり、自分の責任範囲に含まれていないと主張したりします。

このブログシリーズでは、データサイエンティストの作業で使用できるヒント、トリック、および体系的なアプローチを共有したいと思います。これにより、コードの正確性と安定性を向上させ、モデルの管理を改善し、チームワークを促進することを目指します。

前提条件

まず、ビッグデータを使用する作業に直面する人々がいつかは直面するシナリオから始めましょう。私たちの頭の中に現在抱えているプロセスと一致するかどうか、そしてそれが私たちが求めている結果を正確に反映しているかどうかを保証するものは誰ですか?

これがテストが私たちの助けになる場所です。

テスト?

はい。我々が行うことは以下の通りです:

  1. 手作りの小さなデータセットを作成します。
  2. 手計算でクエリで得たい結果を計算します。
  3. 作成したクエリをその小さなデータセットに適用します。
  4. クエリの結果を自分の計算結果と照合します。

もしも一致しない場合、何か修正する必要があります。手計算が間違っていたか、クエリが期待通りに動作していなかった可能性があります。一方、結果が一致した場合、次のステップに進むことができます。

これから、これらのテストを書く際に使用する構造について、ステップバイステップで説明します。

環境のセットアップ

まず、PySparkを使用するために必要な環境(フィクスチャ)を作成しましょう。複数のテストケースを実行する場合があるため、PySparkセッションをモジュールレベルで設定します。そうしないと、各テストごとにセッションを開始および停止する必要があり、これには無視できないオーバーヘッドがかかります。

私はPythonの組み込みのunittestを使用していますが、あなたやあなたのチームの他のメンバーがpytestnoseまたは他のテストフレームワークを使用している場合は、これらのアクションを実行する方法を見つけることができると信じています。

unittestには、テストの前後に実行される2つのフックsetUpModuletearDownModuleがあります。これらを使用してPySparkセッションを開始および停止します。

# test_pyspark.py

import pyspark
import unittest

spark: pyspark.sql.SparkSession | None = None

def setUpModule():
    global spark
    spark = get_spark_session('local')

def tearDownModule():
    global spark
    if spark is None:
        return
    try:
        spark.stop()
    finally:
        spark = None

再利用可能なセッション作成関数が好きなので、ここにあります(非ローカルのオプションは後で埋めます):

# query_pyspark.py

import pyspark

def get_spark_session(scope='local'):
    if scope == 'local':
        return (
            pyspark.sql.SparkSession.builder
            .appName('unit-tests')
            .master('local[4]')
        ).getOrCreate()
    else:
        ...  # TODO

プロジェクトが大きくなった場合は、この関数をPySpark固有のユーティリティファイルに配置しますが、現時点ではプロジェクトをフラットかつ小さく保ちましょう。

最初のテスト

今実際にこのテストを実行したときにセッションが取得できるかどうかをテストします。以下がテストです:

# test_pyspark.py

class TestPysparkQueries(unittest.TestCase):
    def test_session_created(self):
        self.assertIsNotNone(spark)

そして、テストを実行すると(PyCharmではコードから直接実行できるため、コードの横にある緑の「再生」ボタンで確認できます)、OKメッセージが表示されます:

データの作成とテスト

この時点でデータについて話し始めることができます。手元には、異なるケースをカバーしていてまだ対応できる小さなデータセットがあるはずです。実際のサイズについては、ドメインとクエリの複雑さに応じて通常は20〜50行を推奨します。グループ化が含まれる場合は、5〜10の異なるグループを選択してください。

教育目的で、名前と誕生日のデータセットを作成しました。同じ姓の個人は兄弟とみなすことにします。また、行の順序にランダム性を導入して、順序に依存するクエリが直接順序に対処せずに正しい答えを得るのを防ぎます。データは次のようになります:

さあ、データをPySparkセッションにロードする時間です。ただし、まずはサニティーテストを作成しましょう。ちなみに、テストを作成してからテストをパスするコードを書くのは、テスト駆動開発(TDD)の一部ですが、データサイエンティストにはテスト部分だけを推奨します。

サニティーテストでは、列名のテスト、データセットのサイズのテスト、または両方をテストすることができます。さらに深いテストを行うこともできます。CSVファイルをDataFrameと行ごとに一致させるテストを作成することさえできます。

テストを作成する際に厳密にするほど、コードが正しいことを後で確信することができますが、将来の変更がより困難になる可能性もあります。たとえば、特定のエッジケースをテストするためにデータセットの1行を追加/変更したい場合はどうなりますか?

これらの速度と正確さの要素をバランスさせることは、私たちの仕事の中で科学よりも芸術の一部であり、時間と実践によって自然に身につけることができます。

# test_pyspark.py

    def test_load_data(self):
        df = get_family_data()
        self.assertEqual(25, df.count())

次に、データを読み込むための関数を書きましょう:

# query_pyspark.py

def get_family_data():    return (        get_spark_session(scope='local')        .read.csv(os.path.join(os.path.dirname(__file__),                   '../assets/data_sample.csv'))    )

そしてテストを実行すると…失敗しますか?でもどうしてでしょうか?

再度行数をカウントして確認してみると、25行であることがわかり、コードにheader=Trueを追加することでテストが成功します(次の例ではフェイクドラマを省略しますのでご安心ください):

# query_pyspark.py

def get_family_data():    return (        get_spark_session(scope='local')        .read.csv(os.path.join(os.path.dirname(__file__),                   '../assets/data_sample.csv'), header=True)    )

クエリのテスト

今度はクエリに特化したテストの時間です。例えば、各家族の年長の子供を取得したいとします。データセットを目で確認するか(またはソートされたスプレッドシートを使用するか)して、取得する予定の名前の正確なセットを見つけて、テストに固定します:

# test_pyspark.py

    def test_elder_child_query(self):        df = get_elder_child(get_family_data())        elders = {_.elder_child for _ in df.toLocalIterator()}        self.assertEqual(elders, {'Gus', 'Rita', 'Sam', 'Trent', 'Ursula'})

そして、テストを成功させるためのコード:

# query_pyspark.py

def get_elder_child(family_df: pyspark.sql.DataFrame):    return (        family_df        .orderBy(f.col('date_born').desc())        .groupby('last_name')        .agg(f.first('first_name').alias('elder_child'))    )

フェイクドラマは省略しますが、テストが成功するまで何度もクエリを修正する必要がありました。例えば、first_nameでグループ化し、last_nameの値を集約し、ソートを降順にすることを忘れていました。

私の仕事では、テストが私を数多くのピンチから救ってくれました

終わりですか?まったくそんなことはありません。

双子の場合はどうなるか、子供のいない家族は存在するか、データが信頼できない場合はnull値はどうなるか、など、エッジケースを考えるべきです。

これらのオプションごとに、データセットに移動してそのようなケースを作り出し、テストとコードを更新します。

これらの特殊なケースに遭遇した場合、後から現れるバグを通じて(自分自身ではなく)それらを考え出した場合でも、同じようにデータセットを変更して、そこから続けることがあります。

他のクエリにもテストを書く必要がありますし、さまざまな種類のテストに出会います。前述のテストでは、結果のセットに注意を払いましたが、単純な1対1の変換をテストしたい場合、つまりf(row) = yのような場合はどうでしょうか。Sparkの非決定性を考慮する必要があります。

例えば、データセットの名前のイニシャルを取得したいとしましょう。

1つのオプションは、DataFrameをソートし、手作りのリストと等しいことをアサートする際にこの順序を信頼することです:

# query_pyspark.py

def get_initials_col():    return (        f.concat(            f.substring('first_name', 0, 1),            f.lit('. '),            f.substring('last_name', 0, 1),            f.lit('.'),        )    ).alias('initials')

# test_pyspark.py

    def test_get_initials_col_1_by_1(self):        df = (            get_family_data()            .withColumn('initials', get_initials_col())            .orderBy('date_born')        )        expected_list = ['V. A.', 'W. W.', 'X. M.', 'Y. T.', 'Z. C.', 'I. M.', 'J. T.', 'K. C.', 'L. A.', 'M. W.',                         'N. M.', 'O. T.', 'P. C.', 'Q. A.', 'A. A.', 'B. W.', 'C. M.', 'E. T.', 'F. C.', 'G. A.',                         'H. W.', 'R. W.', 'S. M.', 'T. T.', 'U. C.']        for expected, actual in zip(expected_list, [_.initials for _ in df.toLocalIterator()]):            self.assertEqual(expected, actual)

別のオプションは、同じ作業を行うネイティブな関数を作成し、十分にテストすることです。その後、結果をメモリにロードした後に入力に適用し、各行について等価性をアサートするテストを書くことができます。以下に例を示します:

# query_pyspark.py

def get_initials(first_name, last_name):    return f'{first_name[:1]}. {last_name[:1]}.'

# test_pyspark.py

    def test_get_initials(self):        self.assertEqual('B. H.', get_initials('Bob', 'Hope'))        self.assertEqual('C. C.', get_initials('Charlie', 'Chaplin'))        self.assertEqual('J. L.', get_initials('Jonathan', 'Livingstone'))    def test_get_initials_col_support_function(self):        df = (            get_family_data()            .withColumn('initials', get_initials_col())        )        for row in df.toLocalIterator():            self.assertEqual(get_initials(row.first_name, row.last_name), row.initials)

2つのオプションのうち、私は後者を明らかに好むでしょう。なぜなら、それは直接データに依存せず、サポート関数のプロキシを介してテストされるため、より柔軟性があるからです。

もちろん、クエリに対して関数があまりにも重い場合は、コードの複雑さを低く保つためにUDFとして使用することもできます。

待ってください、これ以上ありますか?

もちろんです。結合やウィンドウ関数の結果など、さまざまなケースがありますが、上記の例でクエリを書く際にテストが重要なツールであり、有効な方法論の選択であることを信じています。これは、私のようなデータサイエンティストにとっても同様です。

注意しておきたいのは、PySparkで作業する際にこの方法がどのように機能するかを示したかったということです。なぜなら、これは一般的なビッグデータツールですが、このパターンは単にPySparkに制限されず、ビッグデータベースにも制限されません。実際、どのデータベースでも動作するはずです。また、pandasのようなインメモリデータベースツールでもこの方法論を使用することができます。

データソースに接続できること。

データソース内のデータをロード/モックできること。

クエリを実行できること。

クエリの結果を取得し処理できること。

この4つのステップを実行できる場合は、問題ありません。使用しているツールがこれらのステップのいずれかを実行できない場合は、そのツールの使用を再考する必要があるかもしれません。

そして、テストコードをテストするという隠れた利点があることに気付くでしょう。例えば、関数の実行時間やメモリ消費量の面でパフォーマンスが低いことがわかり、最適化やリファクタリングを試みることを決めた場合、既存のテストを使用して新しいコードが前のコードと同じ出力を生成することを保証することができます。これらのテストがなければ、一行のコードさえ変更することを恐れるでしょう。下流の依存関係を壊してしまう可能性があります。

まとめ

テストは、コードの正確性を確認し、リファクタリングを容易にするための強力で重要な手段です。

将来の投稿では、データサイエンスにおいて良い実践と考える例を他にも紹介します。お互いのつま先を踏まないで同じモデルで作業する方法、データセットのバージョン管理方法、本番環境でコードのパフォーマンスを観察する方法など、さまざまなトピックを取り上げます。

お楽しみに。

FAQ

Q: え、なに?

A: このブログシリーズで紹介された概念について、ここや他の場所で会話を始めても構いません。

Q: クエリに数千行のテストが必要な場合はどうすればいいですか?

A: クエリのパラメータ化バージョンを作成し、例えばdef get_n_smalles_children(family_df, n): …というようにして、パラメータを十分に小さくします。別のオプションは、データをプログラムでシミュレートすることですが、これには新たな質問や課題も生じます。

Q: クエリを繰り返し変更する場合はどうすればよいですか?テストも変更する必要がありますか?

A: 理想的には、時間の経過とともにクエリを変更しないことが望ましいですが、私たちのフィールドの探索的な性質は認識しています。そのため、答えは「はい」です。これは、テストの記述により、作業速度が低下するように感じる理由の一つです。ただし、速度は正確性/正確さに対して重み付けされます。クエリの構造がより固まってきた段階でテストを書くようにすることもできます。

質問:PyCharmを使用しない場合、テストをどのように実行しますか?

回答:テストファイルの末尾に以下のマジックラインを追加し、python test_pyspark.pyで実行します。インポートが機能するために、コードのルートがPYTHONPATHに含まれていることを確認してください(PyCharmは自動的に行います)。

if __name__ == '__main__':    unittest.main()

質問:.csvにデータを保持しない場合、どうすればよいですか?

回答:データを保存および読み込むために機能する方法は何でも構いませんが、整理された形式に保つようにしてください。非常に小さなデータセットの場合は、dict-to-DataFrame(またはjson-to-DataFrame)を使用し、より大きなデータセットの場合はHadoopに永続的に保存されたテーブルを使用しました。

質問:上記の例で提供された関数は、非常にシンプルなのではないですか?

回答:はい、そうです。私の教え方は、単純な例を与えて徐々に複雑にしていくことです。残念ながら、この投稿の余白は後半を含むには小さすぎます。

質問:上記のコードを参照として使用できるリポジトリはありますか?

回答:はい、はい、あります。

We will continue to update VoAGI; if you have any questions or suggestions, please contact us!

Share:

Was this article helpful?

93 out of 132 found this helpful

Discover more