非常に大きなデータセットのランダム化

大きなデータセットのランダム化

メモリに収まらないほど大きなデータセットをランダム化する問題を考えてみましょう。この記事では、Pythonで簡単にかつ(比較的)高速に行う方法について説明します。

近年では、ギガバイトやテラバイトのサイズで測定されるデータセットは珍しくありません。このような大量のデータは、頑健な機械学習モデルを作成するためのトレーニングプロセスに非常に役立ちます。しかし、こんなに大きなデータセットをどのようにランダム化できるのでしょうか?

写真 by Jess Bailey on Unsplash

大量の項目がファイルの各行にある非常に大きなデータセットがあると想像してみてください。データの詳細はここでは関係ありません。データセットはCSV(カンマ区切り値)またはTSV(タブ区切り値)ファイルの行であるかもしれませんし、各行がJSONオブジェクトであるかもしれません。または大規模なポイントクラウドの点のX、Y、Z値のリストかもしれません。必要なのは、データセットが1行ごとに1つの項目でフォーマットされていることです。

小さなデータセットを含むファイルの場合、次のような単純なPython関数を使用してファイルをランダム化(「シャッフル」と呼ぶ)することができます:

import randomdef shuffle_in_memory(filename_in, filename_out):    # ファイルを行ごとにシャッフル    with open(filename_in) as fp:        lines = fp.readlines()    # ランダムに並び替える:    random.shuffle(lines)    # 新しい順序を書き出す:    with open(filename_out, "w") as fp:        fp.writelines(lines)

shuffle_in_memory() 関数は、入力ファイル名と出力ファイル名を取り、組み込みの random.shuffle() を使用してメモリ内で行をシャッフルし、ランダム化されたデータを書き出します。その名前が示すように、この関数はファイルのすべての行が一度にメモリにロードされていることを前提としています。

この関数をテストするために、いくつかのテストファイルを作成しましょう。関数 make_file() は、テストファイルに含める行数を指定します。関数はファイルを作成し、ファイル名を返します。

import osdef make_file(lines):    filename = "test-%s.txt" % lines    print("テストファイル '%s' を作成中..." % filename)    with open(filename, "w") as fp:        for i in range(lines):            fp.write(f"Line {i}\n")    print("完了!")    return filename

例えば、100行の「test-1000.txt」という名前のファイルを作成する場合は、次のようになります:

filename_in = make_file(1000)

この関数を実行すると、現在のディレクトリに「test-1000.txt」というファイルが作成され、次のように1,000行のテキストが含まれているはずです:

Line 0Line 1Line 2Line 3Line 4Line 5Line 6Line 7Line 8Line 9...

shuffle_in_memory() 関数をテストするために、出力ファイルの名前を指定し、その文字列を変数 filename_out に保存し、関数を呼び出します:

filename_out = "test-randomized-1000.txt"shuffle_in_memory(filename_in, filename_out)

これで、ディレクトリに「test-randomized-1000.txt」という2番目のファイルが作成されます。このファイルは「test-1000.txt」とまったく同じサイズで、まったく同じ行がランダムな順序になっているはずです:

Line 110Line 592Line 887Line 366Line 52Line 22Line 891Line 83Line 931Line 408...

では、大きなファイルの場合はどうすればいいのでしょうか?たとえば、10,000,000行の大きさのファイルを作成してみましょう。(ほとんどのコンピュータでは、これはまだメモリ内でランダム化するのに十分小さいですが、練習には十分に大きいです。)前と同様に、入力ファイルを make_file() で作成します:

filename_in_big = make_file(10_000_000)

数秒かかる場合があります。その後、ディレクトリに「test-10000000.txt」という名前のファイルが作成されます。それは以前と同じように始まりますが、1000万行あります。そのファイルのサイズは約128MBです。

どのようにランダム化しますか?すべてのRAMを使用したくない場合、または十分なRAMがない場合、代わりにハードディスクを使用することができます。以下の再帰的なアルゴリズムは、ソートという類似の問題に基づいています。次のshuffle()関数は、マージソートアルゴリズムに基づいています。

まず、ファイルがメモリ内でシャッフル可能なサイズかどうかをチェックします(再帰関数の基底ケース)。パラメータのmemory_limitはバイト単位で指定されます。ファイルサイズがmemory_limitより小さい場合、メモリ内でシャッフルされます。サイズが大きすぎる場合、ファイルはランダムに複数の小さなファイルに分割され、それぞれが再帰的にシャッフルされます。最後に、小さくシャッフルされたファイルの内容が再びマージされます。

以下がその関数です:

import tempfiledef shuffle(filename_in, filename_out, memory_limit, file_split_count,             depth=0, debug=False):    if os.path.getsize(filename_in) < memory_limit:        if debug: print(" " * depth, f"Level {depth + 1}",            "Shuffle in memory...")        shuffle_in_memory(filename_in, filename_out)    else:        if debug: print(            " " * depth, f"Level {depth + 1}",            f"{os.path.getsize(filename_in)} is too big;",            f"Split into {file_split_count} files..."        )        # Split the big file into smaller files        temp_files = [tempfile.NamedTemporaryFile('w+', delete=False)                      for i in range(file_split_count)]        for line in open(filename_in):            random_index = random.randint(0, len(temp_files) - 1)            temp_files[random_index].write(line)        # Now we shuffle each smaller file        for temp_file in temp_files:            temp_file.close()            shuffle(temp_file.name, temp_file.name, memory_limit,                     file_split_count, depth+1, debug)        # And merge back in place of the original        if debug: print(" " * depth, f"Level {depth + 1}",             "Merge files...")        merge_files(temp_files, filename_out)

もしこれがソートアルゴリズムなら、ファイルを慎重にマージしてソートされた順序を作成するでしょう。しかし、シャッフルの場合、特定の順序でマージする必要はありません。したがって、merge_files()関数は次のようになります:

def merge_files(temp_files, filename_out):    with open(filename_out, "w") as fp_out:        for temp_file in temp_files:            with open(temp_file.name) as fp:                line = fp.readline()                while line:                    fp_out.write(line)                    line = fp.readline()

注意すべきは、一度にすべての行をメモリに読み込まないように注意することです。ファイルのサイズと同じだけのメモリを必要とするメモリシャッフルの制限を設定して、テストしてみましょう。ファイルサイズが128,888,890より小さくないため、いくつかの小さなファイルに分割されます。この例では、メモリ内でシャッフルできるように、大きなファイルを2つに分割しましょう:

filename_out_big = "test-randomized-10000000.txt"shuffle(filename_in_big, filename_out_big, 128_888_890, 2, debug=True)

この呼び出しの結果は次のようになります:

 Level 1 128888890 is too big; Split into 2 files...  Level 2 Shuffle in memory...  Level 2 Shuffle in memory... Level 1 Merge files...

そして、結果の「test-randomized-10000000.txt」ファイルの内容は、すべてがランダム化された1000万行になります。より良いテストは、ランダム化するために必要なメモリをファイルよりもはるかに小さくし、大きすぎるファイルを2つ以上に分割することです。たとえば、約1MBのRAMしか使用せず、ファイルを20個の小さなファイルに分割しましょう:

shuffle(filename_in_big, filename_out_big, 1_000_000, 20, debug=True)

この例では、1 MB以上のRAMを使用せず、20個ずつ大きいサブファイルを再帰的に分解します。

このアルゴリズムは、任意のサイズのファイルに対して動作します(ディスク容量が十分にある必要があります!)。shuffle_in_memory() に割り当てるメモリが多ければ多いほど、実行速度が速くなります。小さいファイルの数が多すぎると、ファイルのオープンとクローズに時間がかかります。 memory_limit の値を変更してみることもできますが、私は20から200の間で良い結果を得ています。初期ファイルが大きいほど、サブファイルが必要になるでしょう。

他のアルゴリズムも使用できますが、SQLiteデータベースにすべての行を書き込んでランダムな順序でSELECTする方法には期待していましたが、上記のコードと同じ速度でした。

import sqlite3def shuffle_sql(filename_in, filename_out, memory_limit, depth=0, debug=False):    if os.path.getsize(filename_in) < memory_limit:        if debug: print(" " * depth, f"Level {depth + 1}",            "メモリ内でシャッフル...")        shuffle_in_memory(filename_in, filename_out)    else:        if debug: print(            " " * depth, f"Level {depth + 1}",            f"{os.path.getsize(filename_in)} は大きすぎます;",            f"SQLiteデータベースに書き込み中..."        )        temp_db = tempfile.NamedTemporaryFile(delete=False)        connection = sqlite3.connect(temp_db.name)        cursor = connection.cursor()        cursor.execute("""            CREATE TABLE IF NOT EXISTS lines (                line TEXT            );        """)        with open(filename_in) as fp:            line = fp.readline()            while line:                cursor.execute("INSERT INTO lines (line) VALUES (?);", [line])                line = fp.readline()            connection.commit()        with open(filename_out, "w") as fp:          for line in cursor.execute("""              SELECT line FROM lines ORDER BY random();              """):              fp.write(line[0])

shuffle_sql(filename_in_big, filename_out_big, 1_000_000, debug=True)

Pythonだけで再帰的なシャッフルアルゴリズムを上回ることはできますか?もしそうであれば、教えていただければ嬉しいです!

人工知能、機械学習、データサイエンスに興味がありますか?拍手とフォローをご検討ください。ご興味があることをお知らせください!

We will continue to update VoAGI; if you have any questions or suggestions, please contact us!

Share:

Was this article helpful?

93 out of 132 found this helpful

Discover more