現代のデータエンジニアリング

ビューティー&ファッションの現代データエンジニアリング

プラットフォーム固有のツールと高度な技術

写真:クリストファー・バーンズによるUnsplash

現代のデータエコシステムは絶えず進化し、新しいデータツールが現れます。この記事では、データエンジニアに影響を与える重要な要素について話したいと思います。これらの知識を使用して、高度な分析パイプラインと運用の優位性を引き出す方法について考えてみましょう。

  • 現代のデータエンジニアリング(DE)。それは何ですか?
  • DEは、高度なデータパイプラインやビジネスインテリジェンス(BI)に充分な機能を持っていますか?
  • データパイプラインは効率的ですか?
  • 運用の優位性を実現するためには、技術的な観点から何が必要ですか?

今年の10月に、データエンジニアの役割や課題、責任、日常業務、成功するための方法について述べたことがあります。データエンジニアリングの状況は常に変化していますが、主要なトレンドは変わらないようです。

データエンジニアになる方法

2024年の初心者のためのショートカット

towardsdatascience.com

私たちデータエンジニアは、ほぼ毎日効率的なデータ処理を設計するように求められます。以下に考慮すべきいくつかのポイントを挙げました。これらは上記の質問に答えるのに役立ちます。

  • ETL vs ELT
  • 簡略化されたデータコネクタとAPIの統合
  • ETLフレームワークの急増
  • データインフラストラクチャのコーディング
  • Data Meshおよび分散型データ管理
  • AIを使用したビジネスインテリジェンスパイプラインの民主化
  • データリテラシーに重点を置く

ELT vs ETL

人気のあるSQLデータ変換ツールであるDataformDBTは、ELTアプローチの普及に大きな貢献をしました [1]。データのクレンジング、エンリッチメント、抽出などの必要なデータ変換を、データが保存されている場所で行うのは合理的です。よくあるのはデータウェアハウスソリューション(DWH)です。クラウドプラットフォームのリーダー企業は、DWH(Snowflake、BigQuery、Redshift、Firebolt)のインフラ管理を非常に簡素化しており、多くのシナリオで費用効果と速度の面で、専門の社内インフラ管理チームを上回ることがあります。

データウェアハウスの例。著者による画像

また、データプラットフォームのタイプによっては、データレイクが中心になることもあります。この場合、プログラムに詳しくないユーザーがデータをクエリすることが困難になり、SQLはオプションではなくなります。Databricks、Tabular、Galaxyなどのツールは、この問題を解決しようとしています。データレイクは、非構造化データを含むあらゆるタイプのデータを保存できるため、これらのデータセットを分析できる能力が必要です。

データレイクの例。著者による画像

トランザクション的に整合性のあるデータレイクテーブルと、時系列のスナップショット分離。

先に筆者の一つの記事でこのことについて書きました [2]。

Apache Icebergテーブルの紹介

データレイク向けにApache Icebergを選ぶ数々の魅力的な理由

towardsdatascience.com

データ統合が簡素化されました

Managed solutions like Fivetran and Stitch were built to manage third-party API integrations with ease. These days many companies choose this approach to simplify data interactions with their external data sources. This would be the right way to go for data analyst teams that are not familiar with coding.

Indeed, why would we build a data connector from scratch if it already exists and is being managed in the cloud?

The downside of this approach is it’s pricing model though.

Very often it is row-based and might become quite expensive on an enterprise level of data ingestion, i.e. big data pipelines. This is where open-source alternatives come into play. Frameworks like Airbyte and Meltano might be an easy and quick solution to deploy a data source integration microservice.

If you don’t have time to learn a new ETL framework you can create a simple data connector yourself. If you know a bit of Python it would be a trivial task. In one of my previous articles I wrote how easy it is to create a microservice that pulls data from NASA API [3]:

Python for Data Engineers

初心者向けの高度なETLテクニック

towardsdatascience.com

Consider this code snippet for app.py

import requestssession = requests.Session()url="https://api.nasa.gov/neo/rest/v1/feed"apiKey="your_api_key"requestParams = {    'api_key': apiKey,    'start_date': '2023-04-20',    'end_date': '2023-04-21'}response = session.get(url, params = requestParams, stream=True)print(response.status_code)

It can be deployed in any cloud vendor platform and scheduled to run with the required frequency. It’s always a good practice to use something like Terraform to deploy our data pipeline applications.

ETLフレームワークの爆発

データの抽出と変換のためのさまざまなETLフレームワークの「カンブリア爆発」を目の当たりにすることができます。驚くことではありませんが、その多くがオープンソースでPythonベースです。

Luigi [8] はその1つで、ETLパイプラインを作成するのに役立ちます。Spotifyによって作成され、巨大なデータ処理ワークロードを管理するために使用されました。コマンドラインインターフェースと優れた可視化機能を持っています。ただし、基本的なETLパイプラインにも一定のPythonプログラミングスキルが必要です。自分の経験から言えることは、厳密で直截的なパイプラインには非常に良いです。ただし、複雑な分岐ロジックを実装するのは特に難しいと感じますが、多くのシナリオで素晴らしい仕事をします。

Python ETL(PETL)[9] は、直感的なデータ変換用の広く使われているオープンソースETLフレームワークの1つです。テーブルでの作業、外部データソースからのデータの抽出、基本的なETLの実行に非常に役立ちます。多くの点で、それはPandasに似ていますが、後者はより高度な分析機能を持っています。PETLは集約と行レベルのETLに最適です。

Bonobo [10] は、バッチ処理データパイプラインの迅速な開発、自動化、並列実行に適したオープンソースの軽量データ処理ツールです。私が気に入っているのは、さまざまなデータファイルフォーマット(SQL、XML、XLS、CSV、JSONなど)での作業が本当に簡単になるという点です。Pythonの知識が少ない人々にとっては素晴らしいツールです。その他の利点の中で、セミ複雑なデータスキーマでうまく機能することが好きです。シンプルなETLに最適で、Dockerコンテナで実行できます(Docker拡張機能があります)。

Pandasはデータの世界で絶対的な存在であり、このストーリーではその機能をカバーする必要はありません。多くの現代のデータウェアハウスの基本的なデータローディングの方法の1つとして、そのデータフレーム変換が含まれていることを脚光に浴びています。以下はBigQueryデータウェアハウスソリューションへのデータローディングのサンプルです:

from google.cloud import bigquery
from google.oauth2 import service_account

# BigQueryクライアントの認証:service accountを使用
service_acount_str = config.get('BigQuery') # configから取得
credentials = service_account.Credentials.from_service_account_info(service_acount_str)
client = bigquery.Client(credentials=credentials, project=credentials.project_id)

def load_table_from_dataframe(table_schema, table_name, dataset_id):
    # ソースデータファイルの形式は外部配列JSONである必要があります:
    """
    [ 
        {"id":"1"},
        {"id":"2"} 
    ] 
    """
    blob = """ 
        [ 
            {"id":"1","first_name":"John","last_name":"Doe","dob":"1968-01-22","addresses":[{"status":"current","address":"123 First Avenue","city":"Seattle","state":"WA","zip":"11111","numberOfYears":"1"},{"status":"previous","address":"456 Main Street","city":"Portland","state":"OR","zip":"22222","numberOfYears":"5"}]}, 
            {"id":"2","first_name":"John","last_name":"Doe","dob":"1968-01-22","addresses":[{"status":"current","address":"123 First Avenue","city":"Seattle","state":"WA","zip":"11111","numberOfYears":"1"},{"status":"previous","address":"456 Main Street","city":"Portland","state":"OR","zip":"22222","numberOfYears":"5"}]} 
        ] 
    """
    body = json.loads(blob)
    print(pandas.__version__)
    table_id = client.dataset(dataset_id).table(table_name)
    job_config = bigquery.LoadJobConfig()
    schema = create_schema_from_yaml(table_schema)
    job_config.schema = schema
    df = pandas.DataFrame(
        body,
        # 読み込まれたテーブルでは、列の順序はDataFrameの列の順序と一致します。
        columns=["id", "first_name","last_name","dob","addresses"],
    )
    df['addresses'] = df.addresses.astype(str)
    df = df[['id','first_name','last_name','dob','addresses']]
    print(df)
    load_job = client.load_table_from_dataframe(
        df,
        table_id,
        job_config=job_config,
    )
    load_job.result()
    print("Job finished.")

例えば、Apache AirflowはETLツールそのものではありませんが、タスクの関係を表す依存グラフ(DAGs)を視覚化することで、ETLパイプラインを整理するのに役立ちます。一般的なAirflowアーキテクチャには、メタデータを基にしたスケジューラ、エグゼキュータ、ワーカー、タスクが含まれます。

例えば、データをクラウドストレージにエクスポート(bq_export_op)した後に、ml_engine_training_opを実行し、このワークフローを毎日または毎週実行することができます。

ML model training using Airflow. Image by author.

以下は、以下の例を考えてみてください。

これは、データをクラウドストレージバケットにエクスポートし、MLEngineTrainingOperatorを使用してMLモデルをトレーニングする単純なデータパイプライングラフを作成します。

"""recommendation_bespokeモデルトレーニングのDAG定義。"""
import airflow
from airflow import DAG
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from airflow.contrib.operators.bigquery_to_gcs import BigQueryToCloudStorageOperator
from airflow.hooks.base_hook import BaseHook
from airflow.operators.app_engine_admin_plugin import AppEngineVersionOperator
from airflow.operators.ml_engine_plugin import MLEngineTrainingOperator
import datetime

def _get_project_id():
    """デフォルトのGCP接続からプロジェクトIDを取得します。"""
    extras = BaseHook.get_connection('google_cloud_default').extra_dejson
    key = 'extra__google_cloud_platform__project'
    if key in extras:
        project_id = extras[key]
    else:
        raise ('Airflowコンソールからgoogle_cloud_default接続のproject_idを設定する必要があります')
    return project_id

PROJECT_ID = _get_project_id()

# BigQueryタスクで使用されるデータセットの定数たち。データに合わせてこれらを変更できます。
DATASET = 'staging'  #'analytics'
TABLE_NAME = 'recommendation_bespoke'

# GCSバケット名とリージョン、変更可能です。
BUCKET = 'gs://rec_wals_eu'
REGION = 'us-central1'  #'europe-west2' #'us-east1'
JOB_DIR = BUCKET + '/jobs'

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': airflow.utils.dates.days_ago(2),
    'email': ['[email protected]'],
 'email_on_failure': True,
 'email_on_retry': False,
    'retries': 5,
    'retry_delay': datetime.timedelta(minutes=5)
}

# cronjob構文を使用したデフォルトのスケジュール間隔です。ここかAirflowコンソールでカスタマイズできます。
schedule_interval = '00 21 * * *'
dag = DAG('recommendations_training_v6', default_args=default_args,
          schedule_interval=schedule_interval)
dag.doc_md = __doc__

### タスクの定義 ###

# BigQueryトレーニングデータのエクスポートをGCSに行います。
training_file = BUCKET + '/data/recommendations_small.csv'  # ステージングのための数レコードだけのファイル
t1 = BigQueryToCloudStorageOperator(
    task_id='bq_export_op',
    source_project_dataset_table='%s.recommendation_bespoke' % DATASET,
    destination_cloud_storage_uris=[training_file],
    export_format='CSV',
    dag=dag
)

# ML Engineトレーニングジョブ
training_file = BUCKET + '/data/recommendations_small.csv'
job_id = 'recserve_{0}'.format(datetime.datetime.now().strftime('%Y%m%d%H%M'))
job_dir = BUCKET + '/jobs/' + job_id
output_dir = BUCKET
delimiter=','
data_type='user_groups'
master_image_uri='gcr.io/my-project/recommendation_bespoke_container:tf_rec_latest'
training_args = ['--job-dir', job_dir,
                 '--train-file', training_file,
                 '--output-dir', output_dir,
                 '--data-type', data_type]
master_config = {"imageUri": master_image_uri}
t3 = MLEngineTrainingOperator(
    task_id='ml_engine_training_op',
    project_id=PROJECT_ID,
    job_id=job_id,
    training_args=training_args,
    region=REGION,
    scale_tier='CUSTOM',
    master_type='complex_model_m_gpu',
    master_config=master_config,
    dag=dag
)
t3.set_upstream(t1)

Bubbles [11] は Python の世界でのETLのためのオープンソースツールです。高速な開発が可能であり、データパイプラインを記述するためのメタデータの使用方法が気に入っています。Bubblesの作成者はそれを「抽象フレームワーク」と呼び、Pythonだけでなく、多くの他のプログラミング言語でも使用できると述べています。

PyQuery、BeautifulSoupなど、特定のアプリケーションに向けた他の多くのツールがあります。たとえば、webページからデータを抽出する(PyQuery、BeautifulSoupなど)や並列データ処理などです。これについては別の話題になりますが、以前にいくつかについて書いたことがあります、joblib ライブラリ [12]

データインフラストラクチャとしてのコード

コードとしてのインフラストラクチャ (IaC) は、データプラットフォームのリソースを管理するための人気のある、非常に機能的な手法です。データにおいても、現在はかなりの標準となっており、DevOpsの標準に精通していることを潜在的な雇用主に伝えるためには非常に良い方法です。Terraform(プラットフォームに依存しない)やCloudFormationのようなツールを使用すると、開発作業とデプロイメント(オペレーション)を簡単に統合することができます。

一般的には、データパイプラインのためにステージングや本番のデータ環境を持つことを望んでいます。これにより、パイプラインのテストやチーム間の協力が容易になります。

以下の図をご覧ください。データ環境の動作を説明しています。

Data environments. Image by author.

通常、テスト目的やETLサービスがCI/CDワークフローをトリガーした場合にデータ変換ユニットテストを実行するための追加のサンドボックスが必要になることがあります。以前はここについて書いたことがあります。

初心者のためのインフラストラクチャコード

これらのテンプレートを使用してデータパイプラインをプロのように展開してください

levelup.gitconnected.com

AWS CloudFormationテンプレートファイルを使用すると、必要なリソースとその依存関係を記述して、1つのスタックとしてまとめて起動して構成することができます。

あなたがデータの専門家であれば、このアプローチは異なるデータ環境での作業やデータプラットフォームのリソースの複製をより迅速かつ一貫してエラーなく行うのに役立つでしょう。

問題は、多くのデータの専門家がIaCについてよく知らないため、開発プロセス中に多くのエラーが発生することです。

データメッシュと分散データ管理

データスペースはこの10年間で大きく進化し、今ではたくさんのデータツールとフレームワークがあります。データメッシュは、異なるデータドメイン(企業の部門)がそれぞれのチームと共有のデータリソースを持つ状態を定義しています。各チームには個別の目標、KPI、データの役割と責任があります。

長い間、データのビューロクラシーは多くの企業にとって本当の痛みでした。

このデータプラットフォームタイプ [4] は少し混沌として見えるかもしれませんが、それは企業での成功と効率的な選択肢となることを意図しています。分散化により、異なるチームがクロスドメインのデータセットにアクセスし、独自の分析やETLタスクを実行できるようになります。

実際には、データエンジニアリングのヘルプなしでデータレイクデータを読み込みたい場合は、Sparkに詳しくない場合があります。このシナリオでは、データセットに関するメタデータレコードのバンチは非常に役に立ちます。それがData Meshが非常に成功している理由です。

これにより、ユーザーはデータに関する知識、その起源、他のチームが以前は認識していなかったそのデータセットを最大限に活用する方法を理解することができます。

データセットやデータソースの接続が非常に複雑になる場合があり、メタデータとデータセットの説明を持つ単一の真のデータシロやリポジトリを持つことは常に良いプラクティスです。

以前の私の記事の1つでは、SQLがチームとデータのための統一されたクエリ言語としての役割について書きました。実際、それは分析的で自己記述的であり、動的になることさえあるため、すべてのデータユーザーにとって完璧なツールです。

しばしば、すべてが大きな混乱になることがあります

この事実から、DBT、Jinja、DataformなどのSQLベースのテンプレートエンジンが非常に人気です。 ほとんどSQLのようなプラットフォームで、データセットとその変換が詳細に記述されていると想像してください [6]。

Dataformの依存関係グラフとメタデータ。画像は著者によるものです。

データチームがデータソースとスキーマとの関係を理解するのは大変なチャレンジかもしれません。しばしばデータセットの依存関係とETL変換のスパゲッティに絡まっています。データエンジニアリングは、最新のデータ処理技術やベストプラクティスで他の部署にデータリテラシーを向上させ、最先端のデータ処理手法を提供する上で重要な役割を果たしています。

AIを使用したビジネスインテリジェンスパイプラインの民主化

データのアクセシビリティの向上は常にデータ領域で人気のあるトピックであり、データに慣れていなかったチームにもデータパイプラインの設計プロセスがますますアクセス可能になっていることは興味深いです。現在、ほとんどの部門は、Google Big Query、Redshift、Snowflake、DatabricksなどのモダンなDWHソリューションに格納されたデータから洞察を得るために、ビルトインのAI機能を利用できます。

彼らが必要なのは、自分の言葉でBIの要件を記述することだけです。

たとえば、ThoughtspotのようなBIツールは、直感的な「Googleのような検索インターフェース」[7]を使用して、Google Big Query、Redshift、Snowflake、DatabricksなどのどのモダンなDWHソリューションに格納されたデータから洞察を得るためにAIを利用しています。

モダンデータスタックには、データモデリングと可視化を支援するBIツールが含まれています。これらの多くは、ユーザーの行動に基づいてデータ洞察をより速く得るためにビルトインのAI機能をすでに備えています。

私はGPTとBIを統合することはかなり簡単なタスクだと信じています。次の数年間で、この技術を使用した多くの新製品を見ることができます。

GPTはテキストデータを前処理し、意図を理解して質問に答えるSQLクエリを生成することができます。

結論

この記事では、最近のデータエンジニアリングの役割に影響を与える主要なデータトレンドの概要を説明しました。データメッシュとテンプレート化されたSQLおよび依存関係グラフは、データリテラシーを促進し、分析プロセス全体を民主化しています。複雑なETL技術と変換を備えた高度なデータパイプラインは、組織内の誰にとっても透明になりつつあります。データパイプラインは、他のチームにもますますアクセス可能になり、ETLの複雑さを理解するためにプログラミングを知る必要はありません。データメッシュとメタデータは、この問題を解決するのに役立ちます。私の経験から言うと、変換レイヤーに貢献するためにSQLを学ぶ人々が増えていることを確認しています。 “高度なデータ分析”時代に生まれた企業は、クラウドベンダーの製品とその管理サービスに簡単にアクセスする特権を持っています。これは、必要なデータスキルを習得し、競争力を高めるのに役立ちます。

[1] https://medium.com/towards-data-science/data-pipeline-design-patterns-100afa4b93e3

[2] https://towardsdatascience.com/introduction-to-apache-iceberg-tables-a791f1758009

[3] https://towardsdatascience.com/python-for-data-engineers-f3d5db59b6dd

[4] https://medium.com/towards-data-science/data-platform-architecture-types-f255ac6e0b7

[5] https://medium.com/towards-data-science/advanced-sql-techniques-for-beginners-211851a28488

[6] https://medium.com/towards-data-science/easy-way-to-create-live-and-staging-environments-for-your-data-e4f03eb73365

[7] https://docs.thoughtspot.com/cloud/latest/search-sage

[8] https://github.com/spotify/luigi

[9] https://petl.readthedocs.io/en/stable/

[10] https://www.bonobo-project.org

[11] http://bubbles.databrewery.org/

[12] https://medium.com/towards-data-science/how-to-become-a-data-engineer-c0319cb226c2

[8] https://github.com/spotify/luigi

[9] https://petl.readthedocs.io/en/stable/

[10] https://www.bonobo-project.org

[11] http://bubbles.databrewery.org/

[12] https://medium.com/towards-data-science/how-to-become-a-data-engineer-c0319cb226c2

We will continue to update VoAGI; if you have any questions or suggestions, please contact us!

Share:

Was this article helpful?

93 out of 132 found this helpful

Discover more