SAPシステムとのデータ統合のマスタリングと迅速なエンジニアリング

SAPシステムとのデータ統合 マスタリングと迅速なエンジニアリング

建設エンジニアが作業を調査している — 安定した拡散

イントロダクション

前回の記事で、データ準備タスクを解決するためにChatGPTをどのように活用するかを示しました。良いフィードバックをいただいた一方で、一つの重要なポイントが上がってきました。Prompt Engineeringは単純なタスクには役立つかもしれませんが、より困難な環境では本当に役立つのでしょうか?これは公平な指摘です。近年、データアーキテクチャはますます多様で複雑になっています。この複雑さの結果、データエンジニアは自身が必ずしも精通していない様々なデータソースを統合する必要があります。Prompt Engineeringはこのような状況でも役立つのでしょうか?

本記事では、人事管理の実際のユースケースに基づいてこの問題を検証します。我々は、SAP HCMデータモデルをChatGPTに導入し、収集した情報をApache Sparkで分析するためにフューショットラーニングを適用します。これにより、Prompt Engineeringが高度なデータエンジニアリングの設定でも価値を提供する方法を説明します。

ビジネスケースについて

VoAGIから大企業まで、ある時点における従業員の数とその組織的な割り当てを特定するという一般的なタスクは、達成する必要があります。我々のシナリオで関連するデータは、エンタープライズ環境における人材管理の主要なアプリケーションであるSAP HCMシステムに格納されています。

このような目的を解決するためには、どのデータエンジニアも基盤となるデータモデルに密接に関連したビジネスに関する知識を構築する必要があります。

本記事では、データモデルを構築し、それに基づくレポートソリューションの基礎を作成するために使用できるPySparkコードを作成する手順を提供します。

人員数を示すPowerBiの例のレポート

ステップ1:必要な情報を特定する

データサイエンスにおいて最も大きな課題の一つは、ビジネスユースケースに応じて必要な情報を選択し、それがソースシステムでどこから来るのかを特定することです。このために、ChatGPTにいくつかのビジネス知識を持ち込む必要があります。このために、ChatGPTにはSAP HCMの基本テーブルに関する情報を教え込みます。これらの情報はSAPリファレンスマニュアルで見つけることができます:人事 | SAPヘルプポータル。「csv-Sampleレコード」も含めて、テーブルごとに共に組み合わせることで、ChatGPTに関連情報を教授します。

最初のシナリオでは、特定の時点でのすべての稼働中の従業員を報告することを目的とします。結果には、従業員の個人番号、名前、ステータス、および組織的な割り当ても含まれるべきです。

必要な情報を収集するには、ChatGPTにデータベースのスキーマ、例データセット、フィールドの説明などを含めて、フューショットプロンプティングを使用してデータベースのスキーマを伝搬します。まずは、ChatGPTに基本テーブル情報を提供することから始めましょう。

Prompt:次の表を使用してください。1. 表 PA0000 - 従業員アクションフィールド;キー;データ要素;ドメイン;データ型;長さ;小数点;短い説明;チェックテーブルMANDT;;MANDT;MANDT;CLNT;3;0;クライアント;T000PERNR;;PERSNO;PERSNO;NUMC;8 ;0;個人番号;PA0003SUBTY;;SUBTY;SUBTY;CHAR;4;0;サブタイプ;OBJPS;;OBJPS;OBJPS;CHAR;2;0;オブジェクト識別...次のサンプルデータセットを含みます:100;99992228;;;;9999-12-31T00:00:00.0000000;2023-07-01T00:00:00.0000000;0;2023-01-10T00:00:00.0000000;TRP0001;;;;;;;;;;;;;;10;3;;0;0100;99992229;;;;9999-12-31T00:00:00.0000000;2023-07-01T00:00:00.0000000;0;2023-03-29T00:00:00.0000000;TRP0001;;;;;;;;;;;;;;10;6;;0;02. テーブル T529U - ステータス値フィールド;キー;データ要素;ドメイン;データ型;長さ;小数点;短い説明;チェックテーブルMANDT;;MANDT;MANDT;CLNT;3;0;クライアント;*SPRSL;;SPRAS;SPRAS;LANG;1;0;言語キー;T002STATN;;STATN;STATN;NUMC;1;0;ステータス番号;...次のサンプルデータセットを含みます:MANDT,SPRSL,STATN,STATV,TEXT1100,D,2,3,アクティブ100,D,2,0,辞職....

ステップ2:必要な基本テーブルを結合し、アクティブな従業員のみをフィルタリングします

まず、基本テーブルを結合するコードを作成しましょう:

Prompt: "Employee Actions"を制限して、対応する「Status Values」に基づいてアクティブとして示されている従業員のみを含めるためのpysparkコードを教えてください。テーブルPA0000のソースとして「structureVoAGI.PA0000」を使用し、「structureVoAGI.T529U」をテーブルT529Uのソースとして使用してください。意味のある列別名を使用してください。

これにより、かなり優れた形式で整形されたPySparkコードが生成されます:

from pyspark.sql import SparkSessionfrom pyspark.sql.functions import col# Sparkセッションの初期化spark = SparkSession.builder.appName("EmployeeActionsFilter").getOrCreate()# テーブルをDataFramesに読み込むdf_pa0000 = spark.read.table("structureVoAGI.PA0000")df_t529u = spark.read.table("structureVoAGI.T529U")# 関連するステータスの説明と言語に基づいてT529Uをフィルタリングするdf_active_status = df_t529u.filter(    (col("TEXT1") == "Active") &    (col("SPRSL") == "E"))# ステータスフィールドSTAT2に基づいてPA0000をフィルタリングし、フィルタリングされたT529Uと結合するdf_active_employees = df_pa0000.join(    df_active_status,    (df_pa0000.STAT2 == df_active_status.STATV) &    (df_active_status.STATN == 2)  # 与えられたデータに基づいて、STATN=2は雇用状態を示すと仮定しています。).select(    df_pa0000["PERNR"].alias("従業員番号"),    df_pa0000["MASSN"].alias("行動タイプ"),    df_pa0000["MASSG"].alias("行動理由"),    df_active_status["TEXT1"].alias("ステータスの説明"))# 結果を表示df_active_employees.show()# 必要に応じて、このDataFrameを出力先に書き込むか、さらなる処理を行うこともできます。

GPT-4は両テーブルの結合条件を認識しました。これはテーブルPA0000の列STAT2とテーブルT529Uの列STATVに基づいています。さらに、作成されたコードにはビジネスの説明が列の別名として含まれているため、可読性が向上しています。

ステップ3:企業の従業員履歴を反映するタイムラインを作成する

次に、2020年1月1日から2024年1月1日までの期間の日付値を含むDataFrameを作成し、従業員を参加日(BEGDA)および可能な退社日(ENDDA)に基づいてすべて結合する必要があります。従業員を結合するために、人工的なタイムラインを作成する必要があります。

PromptIn Pyspark: 2020-01-01から2024-01-01までの各日付ごとに1行を含むデータフレームを作成してください。日付列は日付としてフォーマットされる必要があります。その後、開始日と終了日に基づいてすべての有効な従業員を結合してください。出力には、雇用状態に基づいてアクティブな従業員のみを含める必要があります。

from pyspark.sql import SparkSessionfrom pyspark.sql.functions import explode, sequence, to_date, lit, col# Sparkセッションを開始するspark = SparkSession.builder.appName("EmployeeTimeline").getOrCreate()# 開始日と終了日を定義するstart_date = to_date(lit("2020-01-01"))end_date = to_date(lit("2024-01-01"))# 各日付ごとに1行を含むDataFrameを作成するdate_df = (spark.range(1)   .select(explode(sequence(start_date, end_date)).alias("date")))# PA0000およびT529Uテーブルをロードするpa0000_df = spark.read.table("structureVoAGI.PA0000")t529u_df = spark.read.table("structureVoAGI.T529U")# 言語キー=Eで「Active」の従業員のためにT529Uをフィルタリングするactive_t529u_df = t529u_df.filter((col("SPRSL") == "E") & (col("TEXT1") == "Active"))# フィルタリングされたT529Uと結合してアクティブな従業員を取得するためにPA0000と結合するactive_pa0000_df = pa0000_df.join(active_t529u_df, pa0000_df.STAT2 == active_t529u_df.STATV)# 日付範囲DataFrameと結合してアクティブな従業員のタイムラインを取得するactive_employee_timeline_df = date_df.join(active_pa0000_df, (date_df.date >= active_pa0000_df.BEGDA) & (date_df.date <= active_pa0000_df.ENDDA))# DataFrameを表示するactive_employee_timeline_df.show()

ステップ4:最高レベルの組織オブジェクトを特定して親子関係テーブルを解消する

この最後のステップでは、会社の組織構造を表すDataFrameを構築し、各オブジェクトの組織への所属を特定したいと思います。具体的には、各子オブジェクトが割り当てられている最高レベルの組織単位(例:エリアまたは部門)を特定したいと思います。

組織構造は、従業員のタイムラインに結合して、後のステップで各従業員の組織への所属に関する詳細情報を取得するために使用できます。これを実現するために、SAPのHRP1001テーブルを利用する必要があります:

ヒント:以下のテーブルには、各従業員の組織への所属が含まれています:テーブルHRP1001-組織構造のフィールド;データ要素;ドメイン;データ型;長さ;小数点;短い説明;チェックテーブルMANDT;MANDT;MANDT;CLNT;3;0;クライアント;T000OTYPE;OTYPE;OTYPE;CHAR;2;0;オブジェクトタイプ;T778OOBJID;HROBJID;HROBJID;NUMC;8;0;オブジェクトID PLOGI...次のサンプルデータセットを含んでいるMANDT,OTYPE,OBJID,PLVAR,RSIGN,RELAT,ISTAT,PRIOX,BEGDA,ENDDA,VARYF,SEQNR,INFTY,OTJID,SUBTY,AEDTM,UNAME,REASN,HISTO,ITXNR,SCLAS,SOBID,PROZT,ADATANR100,P,00998833,A,209,1,,2018-01-01T00:00:00.0000000,9999-12-31T00:00:00.0000000,CP50035300,000,1001,P 00100769,A209,2020-12-11T00:00:00.0000000,TRP1000,,,00000000,CP,60095300,0.00,

ヒント:HRP1001テーブルには、会社の組織構造に関する親子関係情報が含まれています。列OBJIDにはオブジェクトIDが含まれ、SOBIDには関連する親オブジェクトIDが含まれています。列SCLASには、オブジェクトIDがポジション('S')、組織単位('O')、または役割('K')であるかどうかの情報が含まれています。Pysparkで:階層を再帰的に解消して、各オブジェクトの最も高い利用可能な組織単位のオブジェクトIDを含むデータフレームを作成するコードを教えてください。意味のある列エイリアスを使用してください。

言語モデルは、各オブジェクトの最も高いレベルの組織単位(「O」)を探索する再帰関数を生成します:

from pyspark.sql import SparkSessionfrom pyspark.sql.functions import col# Sparkセッションを初期化spark = SparkSession.builder.appName("HighestAvailableOrgUnit").getOrCreate()HRP1001_df = spark.read.table("structureVoAGI.HRP1001")# 最も高い利用可能な組織単位を見つけるための再帰関数def find_highest_org_unit(df, obj_id):    # 現在のオブジェクトID用に行をフィルタリング    object_df = df.filter(col("OBJID") == obj_id)        if object_df.count() == 0:        return None        # 現在のオブジェクトの親オブジェクトID(SOBID)とSCLASを取得    parent_obj_id = object_df.select("SOBID").first()[0]    sclas = object_df.select("SCLAS").first()[0]        # 再帰的に最も高い利用可能な組織単位を見つける    highest_org_unit = find_highest_org_unit(df, parent_obj_id)        # 現在のオブジェクトが組織単位('O')であるか、highest_org_unitがNoneである場合、結果を更新    if sclas == 'O' or highest_org_unit is None:        highest_org_unit = obj_id        return highest_org_unit# 再帰関数を適用するためのUDF(ユーザー定義関数)を作成from pyspark.sql.functions import udffrom pyspark.sql.types import StringTypefind_highest_org_unit_udf = udf(find_highest_org_unit, StringType())# 各オブジェクトの最も高い利用可能な組織単位を見つけるためにUDFを適用highest_org_unit_df = df.withColumn("HighestAvailableOrgUnit", find_highest_org_unit_udf(df, col("OBJID")))# 意味のある列とエイリアスを選択result_df = highest_org_unit_df.select(    col("OBJID").alias("オブジェクトID"),    col("HighestAvailableOrgUnit").alias("最も高い利用可能な組織単位"))# 結果のデータフレームを表示result_df.show()# Sparkセッションを停止spark.stop()

作成されたコードは最適化されていないかもしれませんが、階層を解消する再帰関数を含んでいます。共通テーブル式(CTE)を好むユーザーは、より読みやすく理解しやすいPySparkステートメントを作成するために入力プロンプトでヒント(共通テーブル式を使用)を与えるべきです。

< pre >< code >from pyspark.sql import SparkSession
# スパークセッションの作成
spark = SparkSession.builder.appName(“OrganizationalUnitHierarchy”).getOrCreate()

# HRP1001 DataFrameを定義する(既にロードされていると仮定)
# ‘hrp1001_df’を実際のDataFrameに置き換えてください
# 適切な列(’OBJID’、’SOBID’、’SCLAS’)があることを確認してください
# HRP1001 DataFrameを一時的なSQLテーブルとして登録する
hrp1001_df.createOrReplaceTempView(“hrp1001”)

# 階層を解消して最も高い組織単位を見つける再帰的なクエリを定義する
recursive_query = “””WITH RECURSIVE OrgHierarchy AS (
SELECT OBJID, SOBID, SCLAS
FROM hrp1001
WHERE SCLAS = ‘O’ — 組織単位で始める
UNION ALL
SELECT h.OBJID, p.SOBID, p.SCLAS
FROM OrgHierarchy h
JOIN hrp1001 p ON h.SOBID = p.OBJID
)
SELECT DISTINCT
h.OBJID AS EmployeeID,
h.SOBID AS HighestOrganizationalUnit
FROM OrgHierarchy h”””

# 再帰的なクエリを実行して結果のDataFrameを作成する
result_df = spark.sql(recursive_query)

# 結果を表示する
result_df.show()

# 終了時にSparkセッションを停止する
spark.stop()
< /code >< /pre >

< p > どちらのコードバージョンも、フラット化された階層的な組織構造のデータフレームを作成し、前に生成されたDataFrameに簡単に結合してさらなるデータ統合手順に使用することができます。< /p >

< figure >< img alt = “階層情報を含むデータセット” src = “https://ai.miximages.com/miro.medium.com/v2/resize:fit:640/format:webp/1*IM5ipLIqjwaV_Skr5LcTWA.png” / >< figcaption aria-hidden = “true” > データセットには階層情報が含まれています。 < /figcaption >< /figure >

< h2 id = “conclusion” > 結論 < /h2 >

< p > 私たちは、ChatGPTを使用してSAPシステムからデータを抽出し、統合するためのより高度なデータエンジニアリングのタスクに取り組む方法を示しました。大規模言語モデルはまだ完璧ではありませんが、データエンジニアリングのためにこれらの技術がどれだけ強力になる可能性があるかは、誰もがすでに想像できるでしょう。いくつかの重要なポイントは次のとおりです:< /p >

< ul >
< li > ChatGPTはデータモデルの基本的な原則を理解することができます。詳細な知識を提供するためのプロンプティング技術を利用して理解を深めることができます。< /li >
< li > 最初の試行で完璧なコードを生成することはないかもしれませんが、作成されたコードを個々のシナリオに合わせて簡単に調整することができます。< /li >
< li > オープンリファレンスドキュメントやSAPナレッジベースの広範な利用可能性により、このアプローチはリトリーバル補完生成(RAG)ソリューションに拡張することができます。< /li >
< /ul >

< p > プロンプトエンジニアリングのベストプラクティスに関しては、可能な限り正確になるように心がけ、Spark環境から返されるエラーコードを提供して生成されたコードのリファクタリングにLLMの機能を活用してください。コードを洗練させるために複数の試行が必要な場合もありますが、「precise」などのキーワードをプロンプトに追加することで、ChatGPTがより良い結果を出力することができるかもしれません。解決策アプローチの詳細な説明を求めることで、ChatGPTのトランスフォーマーモデルがより深く探求することを強制することができます。< /p >

< p > 注意:この記事の長さの制約のため、csvの例データセットを含むプロンプトは省略されています。< /p >

< h2 id = “about-the-authors” > 著者について < /h2 >

< p > Markus Stadiは、Dehn SEのシニアクラウドデータエンジニアであり、多年にわたりデータエンジニアリング、データサイエンス、データ分析の分野で活動しています。< /p >

< p > Christian Kochは、BWI GmbHのエンタープライズアーキテクトであり、Nuremberg Institute of Technology Georg Simon Ohmの講師です。< /p >

We will continue to update VoAGI; if you have any questions or suggestions, please contact us!

Share:

Was this article helpful?

93 out of 132 found this helpful

Discover more