カフカイベントストリーミングAIと自動化

カフカイベントストリーミングAIと自動化の魅力' (The allure of Kafka event streaming AI and automation)

Apache Kafkaは、データの移動において企業アーキテクチャで明確なリーダーとして台頭しています(DBトランザクションからイベントストリーミングへ)。Kafkaの動作原理やこのテクノロジースタックの拡張方法(オンプレミスまたはクラウド)について説明するプレゼンテーションが数多く存在します。このプロジェクトの次のフェーズでは、ChatGPTを使用してメッセージを消費し、メッセージを豊かにし、変換し、永続化するマイクロサービスを構築します。この例では、数秒ごとにJSONの温度読み取りを送信するIoTデバイス(RaspberryPi)からの入力を消費します。

メッセージを消費する

Kafkaの各イベントメッセージが生成されると(およびログに記録されると)、それぞれのメッセージを処理するためにKafkaのマイクロサービス消費者が準備されています。ChatGPTにPythonコードの生成を依頼し、特定の「トピック」からのポーリングと読み取りの基本を提供してもらいました。取得したコードは、トピック、キー、およびJSONペイロードを消費するためのスタートポイントとなります。ChatGPTが作成したコードは、SQLAlchemyを使用してこれをデータベースに永続化するためのものです。次に、JSONペイロードを変換し、ソースの温度がある範囲の外側かどうかに基づいて、検証、計算、および新しいメッセージのセットを生成するために、API Logic Server(GitHub上のオープンソースプロジェクト)のルールを使用したかったです。

注意: ChatGPTはConfluent Kafkaライブラリ(およびそれらのDocker Kafkaコンテナの使用)を選択しました-コードを変更して他のPython Kafkaライブラリを使用することもできます。

SQLAlchemyモデル

API Logic Server (ALS: Pythonのオープンソースプラットフォーム)を使用して、MySQLデータベースに接続します。ALSはテーブルを読み込み、SQLAlchemy ORMモデル、react-adminユーザーインターフェース、safrs-JSON Open API(Swagger)、およびそれぞれのORMエンドポイントの実行中のRESTウェブサービスを作成します。新しいTemperatureテーブルには、タイムスタンプ、IoTデバイスID、および温度読み取りが格納されます。ここでは、ALSコマンドラインユーティリティを使用してORMモデルを作成します:

API Logic Serverが生成したクラスは、Temperatureの値を保持するために使用されます。

変更

SQLデータベースにKafka JSON消費者メッセージを再保存する代わりに(およびルールを実行する代わりに)、JSONペイロードをアンラップ(util.row_to_entity)し、JSONペイロードを保存する代わりにTemperature テーブルに挿入します。温度読み取りごとに宣言的なルールが処理を担当します。

コンシューマーがメッセージを受信すると、セッションに追加され、commit_eventルール(以下)がトリガーされます。

宣言的なロジック:メッセージを生成する

API Logic Server(SQLAlchemy、Flask、およびLogicBankスプレッドシートのようなルールエンジン:式、合計、カウント、コピー、制約、イベントなどを使用した自動化フレームワーク)を使用して、ORMエンティティTemperatureに宣言的なcommit_eventルールを追加します。各メッセージがTemperatureテーブルに永続化されると、commit_eventルールが呼び出されます。温度読み取りがMAX_TEMPを超えるかMIN_TEMPより小さい場合、トピック“TempRangeAlert”にKafkaメッセージを送信します。正規の範囲(32132)内でデータを受け取ることを確認する制約も追加します。アラートメッセージを処理する別のイベントコンシューマーに処理を委ねます。

温度読み取りがMAX_TEMPより大きいかMIN_TEMPより小さい場合にのみアラートメッセージを生成します。ルールは常に順不同であり、仕様が変更されると導入される可能性があります。

TDD Behaveテスト

TDD(テスト駆動開発)を使用して、Behaveテストを使ってTemperatureテーブルにレコードを直接挿入し、その戻り値KafkaMessageSentをチェックすることができます。BehaveはFeature/Scenario.featureファイル)から始まります。各シナリオに対して、Behaveデコレータを使用して対応するPythonクラスを作成します。

機能の定義

TDD Pythonクラス

サマリー

KafkaのメッセージコードをConsumerとProducerの両方に生成するためにChatGPTを使用することは、良いスタート地点のようです。KafkaのためにConfluent Dockerをインストールします。API Logic Serverを使用して宣言的なロジックルールを追加し、SQLデータベースへのトランザクションの通常のフローに数式、制約、イベントを追加し、新しいKafkaメッセージを生成(および変換)することは素晴らしい組み合わせです。ChatGPTと宣言的なロジックは、”ペアプログラミング”の次のレベルです。

We will continue to update VoAGI; if you have any questions or suggestions, please contact us!

Share:

Was this article helpful?

93 out of 132 found this helpful

Discover more

データサイエンス

「モデルの解釈性のためのPFIに深く入り込む」

「モデルの評価方法を知っていることは、データサイエンティストとしての仕事において不可欠ですステークホルダーに完全に理...

機械学習

フィールドからフォークへ:スタートアップが食品業界にAIのスモーガスボードを提供

それは魔法のように機能しました。データセンターで実行されているコンピュータービジョンアルゴリズムが、インドの遠い小麦...

機械学習

中国における大量生産自動運転の課題

自律走行は、世界でも最も困難な運転の一つが既に存在する中国では、特に難しい課題です主に3つの要因が関係しています:動的...

人工知能

5つの最高のChatGPT SEOプラグイン

SEOの専門家たちは、ChatGPTプラグインがGoogleのランキングを上げるのを助けるすばらしいツールであることに気づき始めています

機械学習

「EコマースにおけるLLMSを使用したカスタマイズされたマーケティングコピーライティング」

紹介 技術革新と急速なデジタル化によって定義される時代において、Eコマースは現代のビジネスの基盤となっています。グロー...

機械学習

「ビジュアルAIがカナダ最大かつ最も賑やかな空港で飛躍する」

カナダのオンタリオ州にあるトロントピアソン国際空港は、年間約5000万人の旅客にサービスを提供する国内最大かつ最も混雑し...