カフカイベントストリーミングAIと自動化

カフカイベントストリーミングAIと自動化の魅力' (The allure of Kafka event streaming AI and automation)

Apache Kafkaは、データの移動において企業アーキテクチャで明確なリーダーとして台頭しています(DBトランザクションからイベントストリーミングへ)。Kafkaの動作原理やこのテクノロジースタックの拡張方法(オンプレミスまたはクラウド)について説明するプレゼンテーションが数多く存在します。このプロジェクトの次のフェーズでは、ChatGPTを使用してメッセージを消費し、メッセージを豊かにし、変換し、永続化するマイクロサービスを構築します。この例では、数秒ごとにJSONの温度読み取りを送信するIoTデバイス(RaspberryPi)からの入力を消費します。

メッセージを消費する

Kafkaの各イベントメッセージが生成されると(およびログに記録されると)、それぞれのメッセージを処理するためにKafkaのマイクロサービス消費者が準備されています。ChatGPTにPythonコードの生成を依頼し、特定の「トピック」からのポーリングと読み取りの基本を提供してもらいました。取得したコードは、トピック、キー、およびJSONペイロードを消費するためのスタートポイントとなります。ChatGPTが作成したコードは、SQLAlchemyを使用してこれをデータベースに永続化するためのものです。次に、JSONペイロードを変換し、ソースの温度がある範囲の外側かどうかに基づいて、検証、計算、および新しいメッセージのセットを生成するために、API Logic Server(GitHub上のオープンソースプロジェクト)のルールを使用したかったです。

注意: ChatGPTはConfluent Kafkaライブラリ(およびそれらのDocker Kafkaコンテナの使用)を選択しました-コードを変更して他のPython Kafkaライブラリを使用することもできます。

SQLAlchemyモデル

API Logic Server (ALS: Pythonのオープンソースプラットフォーム)を使用して、MySQLデータベースに接続します。ALSはテーブルを読み込み、SQLAlchemy ORMモデル、react-adminユーザーインターフェース、safrs-JSON Open API(Swagger)、およびそれぞれのORMエンドポイントの実行中のRESTウェブサービスを作成します。新しいTemperatureテーブルには、タイムスタンプ、IoTデバイスID、および温度読み取りが格納されます。ここでは、ALSコマンドラインユーティリティを使用してORMモデルを作成します:

API Logic Serverが生成したクラスは、Temperatureの値を保持するために使用されます。

変更

SQLデータベースにKafka JSON消費者メッセージを再保存する代わりに(およびルールを実行する代わりに)、JSONペイロードをアンラップ(util.row_to_entity)し、JSONペイロードを保存する代わりにTemperature テーブルに挿入します。温度読み取りごとに宣言的なルールが処理を担当します。

コンシューマーがメッセージを受信すると、セッションに追加され、commit_eventルール(以下)がトリガーされます。

宣言的なロジック:メッセージを生成する

API Logic Server(SQLAlchemy、Flask、およびLogicBankスプレッドシートのようなルールエンジン:式、合計、カウント、コピー、制約、イベントなどを使用した自動化フレームワーク)を使用して、ORMエンティティTemperatureに宣言的なcommit_eventルールを追加します。各メッセージがTemperatureテーブルに永続化されると、commit_eventルールが呼び出されます。温度読み取りがMAX_TEMPを超えるかMIN_TEMPより小さい場合、トピック“TempRangeAlert”にKafkaメッセージを送信します。正規の範囲(32132)内でデータを受け取ることを確認する制約も追加します。アラートメッセージを処理する別のイベントコンシューマーに処理を委ねます。

温度読み取りがMAX_TEMPより大きいかMIN_TEMPより小さい場合にのみアラートメッセージを生成します。ルールは常に順不同であり、仕様が変更されると導入される可能性があります。

TDD Behaveテスト

TDD(テスト駆動開発)を使用して、Behaveテストを使ってTemperatureテーブルにレコードを直接挿入し、その戻り値KafkaMessageSentをチェックすることができます。BehaveはFeature/Scenario.featureファイル)から始まります。各シナリオに対して、Behaveデコレータを使用して対応するPythonクラスを作成します。

機能の定義

TDD Pythonクラス

サマリー

KafkaのメッセージコードをConsumerとProducerの両方に生成するためにChatGPTを使用することは、良いスタート地点のようです。KafkaのためにConfluent Dockerをインストールします。API Logic Serverを使用して宣言的なロジックルールを追加し、SQLデータベースへのトランザクションの通常のフローに数式、制約、イベントを追加し、新しいKafkaメッセージを生成(および変換)することは素晴らしい組み合わせです。ChatGPTと宣言的なロジックは、”ペアプログラミング”の次のレベルです。

We will continue to update VoAGI; if you have any questions or suggestions, please contact us!

Share:

Was this article helpful?

93 out of 132 found this helpful

Discover more

データサイエンス

クラウドセキュリティの未来:トレンドと予測

この記事では、AIによる脅威検出、ゼロトラストアーキテクチャ、進化するサイバー脅威を含む、クラウドセキュリティのトレン...

データサイエンス

なぜAIチップの将来がニューロモーフィックコンピューティングにおいて重要なのか?

神経形態計算はAIとIoTを変革する可能性がありますより正確で多様性に富み、信頼性の高いアクセスしやすいAIの波を引き起こす...

人工知能

エッセンシャルコンプレクシティは、開発者のユニークセリングポイントです

AIは、私たちが本質的な複雑さを理解するのを助けることができます私たちがそれをオートパイロットで最も偶発的な複雑さを処...

機械学習

「リトリーバル増強生成によるジェネラティブAIの最適化:アーキテクチャ、アルゴリズム、およびアプリケーションの概要」

この記事はAIの専門家を対象にし、AIのアーキテクチャー、トレーニング、そして応用に焦点を当てて検討します

人工知能

会話の魔法を解き放つ:ChatGPTをReact.jsとNode.jsと統合する

この包括的なガイドでは、ChatGPTのフロントエンドにはReact.js、バックエンドにはNode.jsを組み合わせた強力なデュオの統合...

データサイエンス

エグゼクティブアーキテクトのFinOpsへのアプローチ:AIと自動化がデータ管理を効率化する方法

フィンオプスは進化するクラウド金融管理の学問と文化的実践であり、組織が最大のビジネス価値を得ることを可能にします