「世界最小のデータパイプラインフレームワーク」

「世界最小のデータパイプラインフレームワーク」を大幅に進化させる

シンプルで高速なデータパイプラインの基盤に洗練された機能を持たせる。

アナ・ルシア・コットネによる写真、Unsplash

データの整理は、データサイエンティストにとって最も時間がかかる仕事の一つです。データの整理には、クリーニング、変換、一般的なデータの生形式から有用なものに変換するための操作が含まれます。多くのアクティビティと同様に、整理プロセスは時間の経過に応じて洗練される必要があります。そのため、データセットの整理方法を追跡し、チームが時間の経過にわたってプロセスを管理および再現できるようにすることが重要です。データの整理は、常に楽しいとは言えませんが、どのような現代の企業においても最も重要な活動かもしれません。

データパイプラインに特化した企業もありますが、それらは複雑で非常に洗練されたものです。しかし、今回の探索では、テキストファイルを単語や「トークン」のセットに変換し、私たちにとって役に立たないテキストを捨てるというタスクを考えましょう。シンプルに始めて、徐々に進んでいきましょう。

最初に、テキスト内の単語に対して整理の機能を実行するための一連のステップを定義しましょう。Pythonのtext.translate()関数を使用して作業の一部を行います。以下の4つの関数を考えてみましょう:

import stringdef step1(word):    trans = str.maketrans("", "", string.punctuation)    return word.replace("\n", " ").translate(trans)def step2(word):    return word.lower()def step3(word):    trans = str.maketrans("", "", "0123456789")    return word.replace("\n", " ").translate(trans)def step4(word):    return (all([char in string.ascii_letters for char in word]) and             len(word) > 0)

step1は、単語から句読点と改行をすべて削除する関数です。 step2は単語を小文字に変換します。 step3は再びtext.translate()を使用して数字を削除します。そして、step4は非ASCII文字を含む単語をフィルタリングするためのフィルタとして使用されます。さらに、ステミングなどの追加のステップを想像することもできます。

これらはシンプルな関数ですので、step1を単語に適用すると、次のようになります:

>>> step1("Testing---123;")'Testing123'

確かに、句読点がテキストから取り除かれました。これらの3つの関数をロシアのマトリョーシカのように単語に包み込むことで、すべての関数を適用することができます:

>>> step3(step2(step1("Testing---123;")))'testing'

ここでは、step1step2step3が適用され、文字列”testing”だけが残りました。各ステップが特定の順序で動作するように関数を定義することに注意してください。つまり、step1step2の前に行う必要があります。

この関数ベースのプロセスは作成しやすく使いやすいです。もちろん、すべての関数を一度に実行することもできます。しかし、関数の「パイプライン」が長く複雑になるにつれて、プロセスを個別のステップに分割することで、プロセスをより管理しやすくすることができます。実際、各ステップは異なるチームで作業するほど複雑になるかもしれません。

では、これまでのところは順調です。しかし、もちろん、各単語に関数のパイプラインを手動で適用したくはありません。代わりに、リスト内のすべての単語にそれを適用したいと思います。これを行うために、非常にシンプルなapply()関数を作成します:

def apply(step, values):    return [step(value) for value in values]

これで、同じ関数を単語のリスト全体に使用することができます:

>>> apply(step3,           apply(step2,                 apply(step1,                       ["Testing---123;", "456---", "Hello!"])))['testing', '', 'hello']

ああ、そうですね、空の単語を削除する必要があります。それにはstep4がちょうどそれ用に設計されていますが、少し複雑な使い方です。次のようになります:

>>> list(filter(step4,             apply(step3,                   apply(step2,                         apply(step1,                               ["Testing---123;", "456---", "Hello!"])))))['testing', 'hello']

つまり、step4はTrueを返して保持し、Falseを返して削除するフィルタ関数なので、それは次のような形で適用されます:filter(step4, data)

この単純なアプローチにはいくつかの問題があります:

  1. ステップは内側から外側に適用されます。つまり、最初のステップであるstep1は最も内側の関数であり、一方でstep3は外側にあります。非常に直感的ではありません。
  2. 各ステップ関数ごとにapply()関数を繰り返し使用する必要があるため、非常に冗長です。
  3. フィルタ(step4のような)は他の関数と同様に使用することはできません。

これらの問題を考慮して、主な機能を一般化されたパイプラインとして抽象化することはできますか?2つのステップの方法を想像しています:

# まず、パイプライン関数を作成します:p = my_pipeline(step1, step2, step3)# それからデータセットに適用します:p(["Testing---123;", "456---", "Hello!"])

どのようにしてmy_pipelineを定義できるでしょうか?実際にはかなり簡単です:

def my_pipeline(*steps):    def wrapper(inputs):        for step in steps:            inputs = apply(step, inputs)        return inputs    return wrapper

つまり、my_pipelineはステップ関数のシリーズを受け取り、各ステップを適用し、処理された単語のリストを返す関数を返す関数です。

試してみましょう:

>>> p = my_pipeline(step1, step2, step3)>>> p(["Testing---123;", "456---", "Hello!"])['testing', '', 'hello']

うまくいきます – 以前とまったく同じ結果が得られました!では、step4フィルタ関数はどうでしょうか?一旦それを残しておいて、このシステムを「実際の」データ上で試してみましょう。実際には虚偽のデータになります。これらの実験では、10,000のドキュメントを作成し、それぞれが10の段落で構成されます。Pythonパッケージessential_generatorsDocumentGenerator()を使用します。

from essential_generators import DocumentGeneratorimport osgen = DocumentGenerator()def generate_documents(    count=10_000,     paragraphs=10,     output_folder="documents",     overwrite=False):    os.makedirs(output_folder, exist_ok=True)    for n in range(count):        filename = os.path.join(            output_folder,             "doc_%05d.txt" % (n + 1)        )        if overwrite or not os.path.exists(filename):            with open(filename, "w") as fp:                for p in range(paragraphs):                    fp.write(gen.paragraph() + "\n\n")generate_documents()

これにはデータの生成に約30秒かかります。簡単なコードを続けるために、もう1つのステップを導入する必要があります:

def step0(filename):    return open(filename).read().split(" ")

このステップでは、ファイル名を受け取り、ファイルを開いて文字列をスペースで分割します。また、apply()関数を若干調整して、単語のリストではなく単語のリストを処理するようにする必要があります:

def apply(step, outputs):    return (step(input) if not isinstance(input, list) else             [step(i) for i in input] for input in outputs)

また、applyにわずかな調整をもう1つ行いました:周囲の角括弧ではなく、括弧を使用してジェネレータ式を返すようにしました。これにより、処理が必要になるまで遅延されます(遅延評価と呼ばれることもあります)。

これでほぼ完成したパイプラインシステムを構築できます:

p = my_pipeline(step0, step1, step2, step3)list(p(["documents/doc_00001.txt"]))

入力としてファイル名のリストを使用することに注意してください。素敵でシンプルです。ただし、まだいくつかの点を改善したいと思っています:

  1. フィルタを簡単に処理できる能力
  2. データセットを高速に処理するためにパイプラインを並列実行できる能力
  3. パイプラインを可視化する能力

この3つの追加機能については、上記のアイデアに基づいて開発されたpicopipeプロジェクトを参照してください。以下のコマンドでインストールできます:

pip install picopipe

そして、同じステップ関数を使って以下のように実行できます:

from picopipe import pipeline, pfilter
p = pipeline(step0, step1, step2, step3, pfilter(step4))
list(p(["documents/doc_00001.txt"])[0])

ここで、pfilterはパイプラインフィルタを表し、単純にstep4関数の周りにラップします。デザインにはかなり満足していますが、実際にどれくらい速くなるか見てみましょう。

まず、すべての文書ファイル名を取得しましょう。簡単な方法はglobを使用することです:

import glob
dataset = glob.glob("documents/doc_*.txt")

そして、すべての文書を処理できます:

results = list(p(dataset))

これには、ラップトップで10,000のドキュメントをすべて処理するのに約21秒かかります。短くてシンプルです!それがより速く実行できるでしょうか?

はい!パイプにはn_jobsパラメータもあります。これは並列実行できるジョブの数を示します。次のコードは、データセットを複数回処理し、0から9のスレッドで実行します。9つのスレッドで実行すると、どれくらい速くなると思いますか?

import time
x = []
y = []
for i in range(10):
    start = time.time()
    results = list(p(dataset, n_jobs=i))
    total_time = time.time() - start
    x.append(i)
    y.append(total_time)

これには数分かかるでしょう。スレッドごとの実行時間とスレッド数の関係を示すグラフをプロットすると:

処理時間を並列実行ジョブの数に分割して表示したグラフ。著者による画像。

興味深いことに、このグラフは追加のスレッドでさらに減少するのではなく横ばいになっています。つまり、9つのスレッドを使用すると1つのスレッドを使用するのと比べて9倍速くなるわけではありません。なぜでしょうか?残念ながら、法律は破れません。そして、それは法律があります:アムダールの法則と呼ばれます。基本的には、削減できないオーバーヘッドコストがあるため、N倍の速度を得ることはできないということです。この場合、4つのスレッドを使用すると、処理時間を約21秒から8秒まで短縮できます。それでも悪くないですね!

最後に、パイプラインを可視化したいと思います。このプロジェクトの一部として、Mermaid Diagram形式を試してみることにしました。最近、githubのリポジトリを含めて多くのサポートを得ています。この形式は非常にシンプルで作成しやすいです。githubでレンダリングするには、単に拡張子を.mmdとしてファイルを名前付けします。以下のコードでpicopipeを使用してMermaidスクリプトを生成する方法を示します:

from picopipe import to_mermaid
with open("pipeline.mmd", "w") as fp:
    fp.write(to_mermaid(p))

githubでのレンダリングを示す以下の画像です:

Github.comはMermaidドキュメントファイルを直接サポートしています。著者による画像。

残念ながら、GitHubではマウスオーバー機能(CSSで定義)は表示されません。ただし、独自のCSSを設定できる場合、パイプラインを視覚化するだけでなく、ステップボックス上にマウスオーバーするとステップコードが表示されます:

著者によるCometのカスタムパネルに表示されたマーメイドダイアグラムのイメージ。

上記のマウスオーバーサポート付きのマーメイドチャートは、Cometのカスタムパネルシステム(すべてのユーザーに無料)を使用して作成されました。マーメイドファイルを表示するカスタムパネルを作成するのは非常に簡単でした。以下は上記のマーメイドチャートがライブでレンダリングされたデモです:comet.com/dsblank/picopipe/a4c044c1657b464087ec44f67ae22709

これで、世界最小のデータパイプラインフレームワークの開発とその並列化と視覚化の探求が終了しました。すべてのコードはこちらから見つけることができます:github.com/dsblank/picopipe ここで提示されたアイデアと最終モジュールが役立つことを願っています。

人工知能、機械学習、またはデータサイエンスに興味がありますか?拍手とフォローをご検討ください。Dougは、comet.comのリサーチヘッドです。comet.comは、MLの実験追跡とモデル監視の企業です。

We will continue to update VoAGI; if you have any questions or suggestions, please contact us!

Share:

Was this article helpful?

93 out of 132 found this helpful

Discover more