『Amazon SageMaker を使用して、Talent.com の ETL データ処理を効率化する』

『Amazon SageMakerを活用し、Talent.comのETLデータ処理を効率化する方法』

この投稿は、機械学習エンジニアのAnatoly Khomenkoと、Talent.comのCTOであるAbdenour Bezzouhによって共同執筆されました。

2011年に設立されたTalent.comは、クライアントと公開求人リストからの有償求人リストを集約し、統一された検索が容易なプラットフォームを作成しました。Talent.comは、言語、業界、配信チャネルを横断し、75カ国以上にわたる3000万件以上の求人リストをカバーし、求職者の多様なニーズに応え、何百万人もの求職者と求人を効果的につなげています。

Talent.comのミッションは、グローバルな労働力のつながりを促進することです。そのために、Talent.comはWeb上のさまざまなソースから求人リストを集約し、求職者に彼らのスキルと経験に適した3000万件以上の求人機会へのアクセスを提供します。この取り組みと一緒に、Talent.comはAWSと協力して、深層学習により駆動される最先端の求人推薦エンジンを開発し、ユーザーが自分のキャリアを進めるのをサポートしています。

この求人推薦エンジンの効果的な運用を保証するためには、Talent.comの集約求人リストからの特徴の抽出と洗練に責任を持つ大規模なデータ処理パイプラインを実装することが重要です。このパイプラインは、1時間以内に1日当たり500万件のレコードを処理でき、複数の日のレコードを並列で処理できます。さらに、このソリューションは迅速な本番展開が可能です。このパイプラインへのデータの主要なソースは、JSON Lines形式で、日付別に分割されたAmazon Simple Storage Service(Amazon S3)に格納されています。したがって、毎日数万件のJSON Linesファイルが生成され、毎日増分更新が行われます。

このデータ処理パイプラインの主な目的は、Talent.comの求人推薦エンジンをトレーニングおよび展開するために必要な特徴の作成を支援することです。このパイプラインは、インクリメンタルな更新をサポートし、求人推薦システムのトレーニングおよび展開モジュールに必要な複雑な特徴抽出要件に対応する必要があります。当社のパイプラインは、複数のソースからのデータを中央のリポジトリに統合する一般的なETL(抽出、変換、ロード)プロセスに属しています。

Talent.comとAWSが協力して、Amazon SageMakerを利用してセットアップした最先端の自然言語処理と深層学習モデルトレーニング技術の構築方法に関する詳細な情報は、Amazon SageMakerを使用してNLPベースの求人推薦システムを構築する:テキストから夢の仕事へを参照してください。このシステムには、特徴エンジニアリング、深層学習モデルアーキテクチャの設計、ハイパーパラメータの最適化、およびモデル評価が含まれており、すべてのモジュールはPythonを使用して実行されます。

この投稿では、私たちがSageMakerを使用して、Talent.comの求人推薦エンジンのための特徴を準備するための大規模なデータ処理パイプラインを構築した方法を紹介します。このソリューションにより、データサイエンティストはPythonライブラリ(Scikit-LearnPyTorchなど)を使用したSageMakerノートブックで特徴抽出のアイデアを考え、同じコードをデータ処理パイプラインに迅速にデプロイして大規模な特徴抽出を行うことができます。このソリューションでは、AWS GlueをETLソリューションとして使用する場合に必要なPySparkへの特徴抽出コードの移植は必要ありません。私たちのソリューションは、SageMakerのみを使用し、他のETLソリューション(AWS Batchなど)の知識は必要ありません。これにより、機械学習(ML)パイプラインを本番に展開するために必要な時間を大幅に短縮することができます。このパイプラインはPythonで操作され、特徴抽出ワークフローとシームレスに統合されており、さまざまなデータ分析アプリケーションに適応可能です。

ソリューション概要

SageMaker Processingを使用したETLパイプラインの概要

パイプラインは、主な3つのフェーズで構成されています:

  1. 指定された日に関連する生のJSONLファイルを処理するために、Amazon SageMaker Processing ジョブを利用します。複数日のデータは、別々の処理ジョブで同時に処理できます。
  2. 複数日のデータを処理した後、データのクローリングにAWS Glue を使用します。
  3. 指定された日付範囲の処理済みフィーチャーを、Amazon Athena テーブルからのSQLを用いてロードし、ジョブリコメンダーモデルをトレーニングおよび展開します。

生のJSONLファイルの処理

SageMaker Processingジョブを使用して、指定された日の生のJSONLファイルを処理します。このジョブでは、フィーチャー抽出とデータ圧縮を実装し、処理済みのフィーチャーを1つのファイルに100万レコードのParquetファイルに保存します。CPU並列処理を利用して、各生のJSONLファイルのフィーチャー抽出を並列に実行します。各JSONLファイルの処理結果は一時ディレクトリ内の別々のParquetファイルに保存されます。すべてのJSONLファイルが処理された後、数千の小さなParquetファイルを100万レコードのファイルに圧縮します。圧縮されたParquetファイルは、処理ジョブの出力としてAmazon S3にアップロードされます。データの圧縮により、パイプラインの次のステージでのクローリングとSQLクエリが効率的に行われます。

以下は、SageMaker SDKを使用して指定された日(例:2020-01-01)のSageMaker Processingジョブをスケジュールするためのサンプルコードです。ジョブはAmazon S3から生のJSONLファイルを読み取り(たとえば、s3://bucket/raw-data/2020/01/01から)、圧縮されたParquetファイルをAmazon S3に保存します(たとえば、s3://bucket/processed/table-name/day_partition=2020-01-01/に)。

### 依存関係をインストール %pip install sagemaker pyarrow s3fs awswranglerimport sagemakerimport boto3from sagemaker.processing import FrameworkProcessorfrom sagemaker.sklearn.estimator import SKLearnfrom sagemaker import get_execution_rolefrom sagemaker.processing import ProcessingInput, ProcessingOutputregion = boto3.session.Session().region_namerole = get_execution_role()bucket = sagemaker.Session().default_bucket()### 16 CPUと128 GiBメモリを備えたインスタンスを使用します### スクリプトはコンパクション中にデータ全体をメモリに読み込みません### 個々のjsonlファイルのサイズに応じて、より大きなインスタンスが必要になる場合がありますインスタンス = "ml.r5.4xlarge"n_jobs = 8 ### 8つのプロセスワーカーを使用します日付= "2020-01-01" ### 1日のデータを処理するest_cls = SKLearnframework_version_str = "0.20.0"### 処理ジョブをスケジュールしますscript_processor = FrameworkProcessor(    role=role,    instance_count=1,    instance_type=instance,    estimator_cls=est_cls,    framework_version=framework_version_str,    volume_size_in_gb=500,)script_processor.run(    code="processing_script.py", ### メインの処理スクリプトの名前    source_dir="../src/etl/", ### ソースコードディレクトリの場所    ### 処理スクリプトはS3から生のjsonlファイルを直接ロードします    ### これにより、処理ジョブの起動時間が長くなることはありません    ### 生データをインスタンスにコピーする必要はありませんinputs=[], ### 処理ジョブの入力は空ですoutputs=[        ProcessingOutput(destination="s3://bucket/processed/table-name/",                         source="/opt/ml/processing/output"),    ],    arguments=[        ### ジョブの出力ディレクトリ        "--output", "/opt/ml/processing/output",        ### ジョブの一時ディレクトリ        "--tmp_output", "/opt/ml/tmp_output",        "--n_jobs", str(n_jobs), ### プロセスワーカーの数        "--date", date, ### 処理する日付        ### S3での生のjsonlファイルの場所        "--path", "s3://bucket/raw-data/",    ],    wait=False)

次のコードは、SageMaker Processingジョブを実行するメインスクリプト(processing_script.py)のコード概要です:

import concurrentimport pyarrow.dataset as dsimport osimport s3fsfrom pathlib import Path### 生のjsonlファイルを処理し、抽出したフィーチャーをparquetファイルに保存するための関数 from process_data import process_jsonl### コマンドライン引数を解析args = parse_args()### s3fsを使用してS3の入力パスの生のjsonlファイルをクロールしますfs = s3fs.S3FileSystem()### 生のjsonlファイルが日付でパーティションされたS3ディレクトリに保存されていると仮定します### 例:s3://bucket/raw-data/2020/01/01/jsons = fs.find(os.path.join(args.path, *args.date.split('-')))### プロセシングジョブインスタンス内の一時ディレクトリの場所tmp_out = os.path.join(args.tmp_output, f"day_partition={args.date}")### ジョブの出力ディレクトリの場所out_dir = os.path.join(args.output, f"day_partition={args.date}")### n_jobsプロセスワーカーを使用して、個々のjsonlファイルを並列に処理しますfutures=[]with concurrent.futures.ProcessPoolExecutor(max_workers=args.n_jobs) as executor:    for file in jsons:        inp_file = Path(file)        out_file = os.path.join(tmp_out, inp_file.stem + ".snappy.parquet")        ### process_jsonl関数はS3ロケーション(inp_file)から生のjsonlファイルを読み取り### 結果を一時ディレクトリ内のparquetファイル(out_file)に保存します        futures.append(executor.submit(process_jsonl, file, out_file))    ### すべてのjsonlファイルの処理が完了するまで待機    for future in concurrent.futures.as_completed(futures):        result = future.result()### parquetファイルを圧縮したataset = ds.dataset(tmp_out)if len(dataset.schema) > 0:    ### 1つのファイルに100万レコードで圧縮したparquetファイルを保存    ds.write_dataset(dataset, out_dir, format="parquet",                      max_rows_per_file=1024 * 1024)

スケーラビリティは私たちのパイプラインの重要な特徴です。まず、複数のSageMaker Processingジョブを使用して、複数日のデータを同時に処理することができます。第二に、処理されたデータまたは生データを一度にすべてメモリに読み込むことは避けます。代わりに、指定された各日のデータを処理する際にメモリにロードします。これにより、メモリ容量が一日分のデータを収容するのに十分でないインスタンスタイプを使用してデータを処理することができます。唯一の要件は、インスタンスタイプがN個の生JSONLファイルまたは処理済みのParquetファイルを同時にメモリに読み込むことができることです(Nは使用中の処理ワーカーの数です)。

AWS Glueを使用して処理済みデータをクロール

複数日の生データがすべて処理された後、AWS Glueのクローラを使用してAthenaテーブルを作成することができます。以下のスニペットを使用して、pandas用のAWS SDK(awswrangler)ライブラリを使用してテーブルを作成します:

import awswrangler as wr### S3内の処理済みテーブルのメタデータを保存しますres = wr.s3.store_parquet_metadata(    path='s3://bucket/processed/table-name/',    database="database_name",    table="table_name",    dataset=True,    mode="overwrite",    sampling=1.0,    path_suffix='.parquet',)### テーブルのスキーマを出力しますprint(res[0])

学習用の処理済み特徴量をロード

指定した日付範囲の処理済み特徴量は、AthenaテーブルからSQLを使用してロードすることができます。これらの特徴量を使用して、ジョブの推薦モデルのトレーニングを行うことができます。例えば、次のスニペットはawswranglerライブラリを使用して1ヶ月分の処理済み特徴量をDataFrameにロードします:

import awswrangler as wrquery = """    SELECT *     FROM table_name    WHERE day_partition BETWEN '2020-01-01' AND '2020-02-01' """### database_name.table_nameから1ヶ月分のデータをDataFrameにロードしますdf = wr.athena.read_sql_query(query, database='database_name')

さらに、トレーニング用の処理済み特徴量をロードするためにSQLを使用することは、さまざまな他のユースケースに対応するために拡張することができます。たとえば、ユーザーのインプレッションを格納するための別個のAthenaテーブルと、それらのインプレッション上でユーザーがクリックした結果としてのユーザーのクリックを格納するための別個のテーブルを維持するために、同様のパイプラインを適用することができます。SQLの結合ステートメントを使用して、ユーザーがクリックしたインプレッションまたはクリックしなかったインプレッションを取得し、これらのインプレッションをモデルトレーニングジョブに渡すことができます。

ソリューションの利点

提案されたソリューションを実装することで、以下のような利点をもたらします:

  • シンプルな実装 – このソリューションは、主要なMLライブラリを使用してPythonで特徴量抽出を実装することを可能にします。また、コードをPySparkに移植する必要はありません。これにより、データサイエンティストがノートブックで開発した同じコードがパイプラインによって実行されるため、特徴量抽出が簡略化されます。
  • 迅速な製品導入 – このソリューションは、データサイエンティストによって開発および展開され、このデータに対してML推薦モデルを開発することができます。同時に、MLエンジニアによって同じソリューションを少しの修正で本番環境に展開することも可能です。
  • 再利用性 – このソリューションは、スケールでの特徴量抽出のための再利用可能なパターンを提供し、他の推薦モデルの構築以外にも容易に適応することができます。
  • 効率性 – このソリューションはパフォーマンスが良く、Talent.comのデータを1日分処理するのに1時間未満の時間がかかりました。
  • 増分更新 – このソリューションは増分更新もサポートしています。新しい日次データはSageMaker Processingジョブで処理し、処理済みデータが含まれるS3の場所を再クロールしてAthenaテーブルを更新することができます。また、1日に数回(たとえば、3時間ごと)今日のデータを更新するためにcronジョブを使用することもできます。

私たちはこのETLパイプラインを使用して、Talent.comが1日に5万ファイル(計500万レコード)を処理し、Talent.comからの90日間の生データから抽出した特徴量を使用してトレーニングデータを作成しました。合計で9万ファイルにわたる4.5億レコードです。私たちのパイプラインは、わずか2週間でTalent.comが推薦システムを製品化するのに役立ちました。このソリューションはすべてのMLプロセスをAmazon SageMaker上で実行し、他のAWSサービスを利用しませんでした。ジョブ推奨システムは、以前のXGBoostベースのソリューションに対してオンラインA/Bテストでクリック率が8.6%増加し、数百万人のTalent.comユーザーをより良い求人につなげるのに役立ちました。

結論

この投稿では、Talent.comでのジョブ推奨モデルの学習と展開のために開発したETLパイプラインについて説明しています。当社のパイプラインはSageMaker Processingジョブを使用して効率的なデータ処理と大規模な特徴抽出を行っています。特徴抽出コードはPythonで実装されており、主要なMLライブラリを使用してスケーラブルな特徴抽出を行うことができます。さらに、コードをPySparkを使用するために変換する必要もありません。

読者には、スケーラブルな特徴抽出が必要なユースケースのテンプレートとして、このブログで紹介されているパイプラインの可能性を探求することをお勧めします。データサイエンティストはこのパイプラインを活用してMLモデルを構築し、同じパイプラインをMLエンジニアが本番環境で実行することができます。これにより、Talent.comの場合のようにMLソリューションを全体的に製品化するための時間を大幅に短縮することができます。読者は、SageMaker Processingジョブの設定と実行のためのチュートリアルを参照することができます。また、Amazon SageMakerを使用してTalent.comのジョブ推奨システムを構築する際のディープラーニングモデルのトレーニング手法については、この記事をご覧ください。Amazon SageMakerを使用して迅速にMLデータの準備を行う方法についても言及しています。

We will continue to update VoAGI; if you have any questions or suggestions, please contact us!

Share:

Was this article helpful?

93 out of 132 found this helpful

Discover more