カフカイベントストリーミングAIと自動化
カフカイベントストリーミングAIと自動化の魅力' (The allure of Kafka event streaming AI and automation)
Apache Kafkaは、データの移動において企業アーキテクチャで明確なリーダーとして台頭しています(DBトランザクションからイベントストリーミングへ)。Kafkaの動作原理やこのテクノロジースタックの拡張方法(オンプレミスまたはクラウド)について説明するプレゼンテーションが数多く存在します。このプロジェクトの次のフェーズでは、ChatGPTを使用してメッセージを消費し、メッセージを豊かにし、変換し、永続化するマイクロサービスを構築します。この例では、数秒ごとにJSONの温度読み取りを送信するIoTデバイス(RaspberryPi)からの入力を消費します。
メッセージを消費する
Kafkaの各イベントメッセージが生成されると(およびログに記録されると)、それぞれのメッセージを処理するためにKafkaのマイクロサービス消費者が準備されています。ChatGPTにPythonコードの生成を依頼し、特定の「トピック」からのポーリングと読み取りの基本を提供してもらいました。取得したコードは、トピック、キー、およびJSONペイロードを消費するためのスタートポイントとなります。ChatGPTが作成したコードは、SQLAlchemyを使用してこれをデータベースに永続化するためのものです。次に、JSONペイロードを変換し、ソースの温度がある範囲の外側かどうかに基づいて、検証、計算、および新しいメッセージのセットを生成するために、API Logic Server(GitHub上のオープンソースプロジェクト)のルールを使用したかったです。
注意: ChatGPTはConfluent Kafkaライブラリ(およびそれらのDocker Kafkaコンテナの使用)を選択しました-コードを変更して他のPython Kafkaライブラリを使用することもできます。
SQLAlchemyモデル
API Logic Server (ALS: Pythonのオープンソースプラットフォーム)を使用して、MySQLデータベースに接続します。ALSはテーブルを読み込み、SQLAlchemy ORMモデル、react-adminユーザーインターフェース、safrs-JSON Open API(Swagger)、およびそれぞれのORMエンドポイントの実行中のRESTウェブサービスを作成します。新しいTemperatureテーブルには、タイムスタンプ、IoTデバイスID、および温度読み取りが格納されます。ここでは、ALSコマンドラインユーティリティを使用してORMモデルを作成します:
- LLama Indexを使用してRAGパイプラインを構築する
- 「私たちのLLMモデルを強化するための素晴らしいプロンプトエンジニアリング技術」
- 「MozillaがFirefoxに偽レビューチェッカーAIツールを導入」
API Logic Serverが生成したクラスは、Temperature
の値を保持するために使用されます。
変更
SQLデータベースにKafka JSON消費者メッセージを再保存する代わりに(およびルールを実行する代わりに)、JSONペイロードをアンラップ(util.row_to_entity
)し、JSONペイロードを保存する代わりにTemperature テーブルに挿入します。温度読み取りごとに宣言的なルールが処理を担当します。
コンシューマーがメッセージを受信すると、セッションに追加され、commit_event
ルール(以下)がトリガーされます。
宣言的なロジック:メッセージを生成する
API Logic Server(SQLAlchemy、Flask、およびLogicBankスプレッドシートのようなルールエンジン:式、合計、カウント、コピー、制約、イベントなどを使用した自動化フレームワーク)を使用して、ORMエンティティTemperatureに宣言的なcommit_event
ルールを追加します。各メッセージがTemperatureテーブルに永続化されると、commit_event
ルールが呼び出されます。温度読み取りがMAX_TEMP
を超えるかMIN_TEMP
より小さい場合、トピック“TempRangeAlert”
にKafkaメッセージを送信します。正規の範囲(32
–132
)内でデータを受け取ることを確認する制約も追加します。アラートメッセージを処理する別のイベントコンシューマーに処理を委ねます。
温度読み取りがMAX_TEMP
より大きいかMIN_TEMP
より小さい場合にのみアラートメッセージを生成します。ルールは常に順不同であり、仕様が変更されると導入される可能性があります。
TDD Behaveテスト
TDD(テスト駆動開発)を使用して、Behaveテストを使ってTemperatureテーブルにレコードを直接挿入し、その戻り値KafkaMessageSent
をチェックすることができます。BehaveはFeature
/Scenario
(.featureファイル)から始まります。各シナリオに対して、Behave
デコレータを使用して対応するPythonクラスを作成します。
機能の定義
TDD Pythonクラス
サマリー
KafkaのメッセージコードをConsumerとProducerの両方に生成するためにChatGPTを使用することは、良いスタート地点のようです。KafkaのためにConfluent Dockerをインストールします。API Logic Serverを使用して宣言的なロジックルールを追加し、SQLデータベースへのトランザクションの通常のフローに数式、制約、イベントを追加し、新しいKafkaメッセージを生成(および変換)することは素晴らしい組み合わせです。ChatGPTと宣言的なロジックは、”ペアプログラミング”の次のレベルです。
We will continue to update VoAGI; if you have any questions or suggestions, please contact us!
Was this article helpful?
93 out of 132 found this helpful
Related articles
- K-平均クラスタリングのためのワンストップショップ
- 9/10から15/10までの週のトップ重要なコンピュータビジョン論文
- ユニバーサルシミュレータ(UniSim)をご紹介します:生成モデリングを通じたリアルワールドの対話をインタラクティブにシミュレートするシミュレータ
- ChatGPT vs. BARD’の比較
- 「切り分けて学ぶ」による機械学習におけるオブジェクトの状態合成の認識と生成
- ウェアラブルテックを革命:エッジインパルスの超効率的な心拍数アルゴリズムと拡大するヘルスケアスイート
- このAI論文は、言語エージェントのための自然言語とコードの調和を目指して、LemurとLemur Chatを紹介しています