カフカイベントストリーミングAIと自動化

カフカイベントストリーミングAIと自動化の魅力' (The allure of Kafka event streaming AI and automation)

Apache Kafkaは、データの移動において企業アーキテクチャで明確なリーダーとして台頭しています(DBトランザクションからイベントストリーミングへ)。Kafkaの動作原理やこのテクノロジースタックの拡張方法(オンプレミスまたはクラウド)について説明するプレゼンテーションが数多く存在します。このプロジェクトの次のフェーズでは、ChatGPTを使用してメッセージを消費し、メッセージを豊かにし、変換し、永続化するマイクロサービスを構築します。この例では、数秒ごとにJSONの温度読み取りを送信するIoTデバイス(RaspberryPi)からの入力を消費します。

メッセージを消費する

Kafkaの各イベントメッセージが生成されると(およびログに記録されると)、それぞれのメッセージを処理するためにKafkaのマイクロサービス消費者が準備されています。ChatGPTにPythonコードの生成を依頼し、特定の「トピック」からのポーリングと読み取りの基本を提供してもらいました。取得したコードは、トピック、キー、およびJSONペイロードを消費するためのスタートポイントとなります。ChatGPTが作成したコードは、SQLAlchemyを使用してこれをデータベースに永続化するためのものです。次に、JSONペイロードを変換し、ソースの温度がある範囲の外側かどうかに基づいて、検証、計算、および新しいメッセージのセットを生成するために、API Logic Server(GitHub上のオープンソースプロジェクト)のルールを使用したかったです。

注意: ChatGPTはConfluent Kafkaライブラリ(およびそれらのDocker Kafkaコンテナの使用)を選択しました-コードを変更して他のPython Kafkaライブラリを使用することもできます。

SQLAlchemyモデル

API Logic Server (ALS: Pythonのオープンソースプラットフォーム)を使用して、MySQLデータベースに接続します。ALSはテーブルを読み込み、SQLAlchemy ORMモデル、react-adminユーザーインターフェース、safrs-JSON Open API(Swagger)、およびそれぞれのORMエンドポイントの実行中のRESTウェブサービスを作成します。新しいTemperatureテーブルには、タイムスタンプ、IoTデバイスID、および温度読み取りが格納されます。ここでは、ALSコマンドラインユーティリティを使用してORMモデルを作成します:

API Logic Serverが生成したクラスは、Temperatureの値を保持するために使用されます。

変更

SQLデータベースにKafka JSON消費者メッセージを再保存する代わりに(およびルールを実行する代わりに)、JSONペイロードをアンラップ(util.row_to_entity)し、JSONペイロードを保存する代わりにTemperature テーブルに挿入します。温度読み取りごとに宣言的なルールが処理を担当します。

コンシューマーがメッセージを受信すると、セッションに追加され、commit_eventルール(以下)がトリガーされます。

宣言的なロジック:メッセージを生成する

API Logic Server(SQLAlchemy、Flask、およびLogicBankスプレッドシートのようなルールエンジン:式、合計、カウント、コピー、制約、イベントなどを使用した自動化フレームワーク)を使用して、ORMエンティティTemperatureに宣言的なcommit_eventルールを追加します。各メッセージがTemperatureテーブルに永続化されると、commit_eventルールが呼び出されます。温度読み取りがMAX_TEMPを超えるかMIN_TEMPより小さい場合、トピック“TempRangeAlert”にKafkaメッセージを送信します。正規の範囲(32132)内でデータを受け取ることを確認する制約も追加します。アラートメッセージを処理する別のイベントコンシューマーに処理を委ねます。

温度読み取りがMAX_TEMPより大きいかMIN_TEMPより小さい場合にのみアラートメッセージを生成します。ルールは常に順不同であり、仕様が変更されると導入される可能性があります。

TDD Behaveテスト

TDD(テスト駆動開発)を使用して、Behaveテストを使ってTemperatureテーブルにレコードを直接挿入し、その戻り値KafkaMessageSentをチェックすることができます。BehaveはFeature/Scenario.featureファイル)から始まります。各シナリオに対して、Behaveデコレータを使用して対応するPythonクラスを作成します。

機能の定義

TDD Pythonクラス

サマリー

KafkaのメッセージコードをConsumerとProducerの両方に生成するためにChatGPTを使用することは、良いスタート地点のようです。KafkaのためにConfluent Dockerをインストールします。API Logic Serverを使用して宣言的なロジックルールを追加し、SQLデータベースへのトランザクションの通常のフローに数式、制約、イベントを追加し、新しいKafkaメッセージを生成(および変換)することは素晴らしい組み合わせです。ChatGPTと宣言的なロジックは、”ペアプログラミング”の次のレベルです。

We will continue to update VoAGI; if you have any questions or suggestions, please contact us!

Share:

Was this article helpful?

93 out of 132 found this helpful

Discover more

データサイエンス

「拡散を通じた適応学習:先進のパラダイム」

イントロダクション 教育と機械学習のダイナミックな風景において、適応学習を通じた拡散はパラダイムシフトを示しています。...

機械学習

「大規模な言語モデルの探索-パート3」

「この記事は主に自己学習のために書かれていますしたがって、広く深く展開されています興味のあるセクションをスキップした...

データサイエンス

リトリーバル オーグメンテッド ジェネレーション(RAG)推論エンジンは、CPU上でLangChainを使用しています

「リトリーバル増強生成(RAG)は広範にカバーされており、特にチャットベースのLLMへの応用については詳しく語られています...

AIニュース

「チャンドラヤーン3の着陸:AIとセンサーがISROの壮大な月探査を支援」

宇宙探査の魅惑的な広がりの中で、すべてのミッションは未知へのサイコロのような賭けです。インドの国立宇宙機関であるイン...

機械学習

AIによる生産性向上 生成AIが様々な産業において効率の新たな時代を開く

2022年11月22日、ほとんど仮想的な瞬間が訪れ、それは地球上のほぼすべての産業の基盤を揺るがしました。 その日、OpenAIは史...

機械学習

「LLaMA-v2-Chat対アルパカ:どのAIモデルを使用するべきですか?」

この記事は以下の質問に答えます:LLaMA-v2-Chat vs アルパカ、どちらを使うべきですか?両方のAIモデルの利点と欠点は何です...