カフカイベントストリーミングAIと自動化

カフカイベントストリーミングAIと自動化の魅力' (The allure of Kafka event streaming AI and automation)

Apache Kafkaは、データの移動において企業アーキテクチャで明確なリーダーとして台頭しています(DBトランザクションからイベントストリーミングへ)。Kafkaの動作原理やこのテクノロジースタックの拡張方法(オンプレミスまたはクラウド)について説明するプレゼンテーションが数多く存在します。このプロジェクトの次のフェーズでは、ChatGPTを使用してメッセージを消費し、メッセージを豊かにし、変換し、永続化するマイクロサービスを構築します。この例では、数秒ごとにJSONの温度読み取りを送信するIoTデバイス(RaspberryPi)からの入力を消費します。

メッセージを消費する

Kafkaの各イベントメッセージが生成されると(およびログに記録されると)、それぞれのメッセージを処理するためにKafkaのマイクロサービス消費者が準備されています。ChatGPTにPythonコードの生成を依頼し、特定の「トピック」からのポーリングと読み取りの基本を提供してもらいました。取得したコードは、トピック、キー、およびJSONペイロードを消費するためのスタートポイントとなります。ChatGPTが作成したコードは、SQLAlchemyを使用してこれをデータベースに永続化するためのものです。次に、JSONペイロードを変換し、ソースの温度がある範囲の外側かどうかに基づいて、検証、計算、および新しいメッセージのセットを生成するために、API Logic Server(GitHub上のオープンソースプロジェクト)のルールを使用したかったです。

注意: ChatGPTはConfluent Kafkaライブラリ(およびそれらのDocker Kafkaコンテナの使用)を選択しました-コードを変更して他のPython Kafkaライブラリを使用することもできます。

SQLAlchemyモデル

API Logic Server (ALS: Pythonのオープンソースプラットフォーム)を使用して、MySQLデータベースに接続します。ALSはテーブルを読み込み、SQLAlchemy ORMモデル、react-adminユーザーインターフェース、safrs-JSON Open API(Swagger)、およびそれぞれのORMエンドポイントの実行中のRESTウェブサービスを作成します。新しいTemperatureテーブルには、タイムスタンプ、IoTデバイスID、および温度読み取りが格納されます。ここでは、ALSコマンドラインユーティリティを使用してORMモデルを作成します:

API Logic Serverが生成したクラスは、Temperatureの値を保持するために使用されます。

変更

SQLデータベースにKafka JSON消費者メッセージを再保存する代わりに(およびルールを実行する代わりに)、JSONペイロードをアンラップ(util.row_to_entity)し、JSONペイロードを保存する代わりにTemperature テーブルに挿入します。温度読み取りごとに宣言的なルールが処理を担当します。

コンシューマーがメッセージを受信すると、セッションに追加され、commit_eventルール(以下)がトリガーされます。

宣言的なロジック:メッセージを生成する

API Logic Server(SQLAlchemy、Flask、およびLogicBankスプレッドシートのようなルールエンジン:式、合計、カウント、コピー、制約、イベントなどを使用した自動化フレームワーク)を使用して、ORMエンティティTemperatureに宣言的なcommit_eventルールを追加します。各メッセージがTemperatureテーブルに永続化されると、commit_eventルールが呼び出されます。温度読み取りがMAX_TEMPを超えるかMIN_TEMPより小さい場合、トピック“TempRangeAlert”にKafkaメッセージを送信します。正規の範囲(32132)内でデータを受け取ることを確認する制約も追加します。アラートメッセージを処理する別のイベントコンシューマーに処理を委ねます。

温度読み取りがMAX_TEMPより大きいかMIN_TEMPより小さい場合にのみアラートメッセージを生成します。ルールは常に順不同であり、仕様が変更されると導入される可能性があります。

TDD Behaveテスト

TDD(テスト駆動開発)を使用して、Behaveテストを使ってTemperatureテーブルにレコードを直接挿入し、その戻り値KafkaMessageSentをチェックすることができます。BehaveはFeature/Scenario.featureファイル)から始まります。各シナリオに対して、Behaveデコレータを使用して対応するPythonクラスを作成します。

機能の定義

TDD Pythonクラス

サマリー

KafkaのメッセージコードをConsumerとProducerの両方に生成するためにChatGPTを使用することは、良いスタート地点のようです。KafkaのためにConfluent Dockerをインストールします。API Logic Serverを使用して宣言的なロジックルールを追加し、SQLデータベースへのトランザクションの通常のフローに数式、制約、イベントを追加し、新しいKafkaメッセージを生成(および変換)することは素晴らしい組み合わせです。ChatGPTと宣言的なロジックは、”ペアプログラミング”の次のレベルです。

We will continue to update VoAGI; if you have any questions or suggestions, please contact us!

Share:

Was this article helpful?

93 out of 132 found this helpful

Discover more

データサイエンス

「トランスフォーマーはNFLプレーを生成できます:QB-GPTの紹介」

初めて「ストラトフォーマー」についての記事を書いて以来、多くのフィードバックとアイデアをいただいている(まず、ありが...

AIニュース

「イギリスの全ての人に無料のAIトレーニングを提供しています」

「ジョニー・コットムは、一人でスタートアップを運営する際に必要なジャグリングの技術を知っています昨年、エコフレンドリ...

機械学習

科学者たちは、AIと迅速な応答EEGを用いて、せん妄の検出を改善しました

うつ病を検出することは容易ではありませんが、それには大きな報酬があります。患者に必要な治療を迅速かつ確実に行うことで...

人工知能

「キャリアを将来に備えるための最高の無料AIコース」

今日から受講できる最高の無料AIコースのうち、8つをご紹介します

データサイエンス

「PandasAIを用いたデータ分析における生成型AIの活用」

「生成モデルを適用することで、PandasAIは人間のようなクエリを理解し、応答することができ、複雑なデータの操作を実行し、...

人工知能

BScの後に何をすべきか?トップ10のキャリアオプションを探索する

イントロダクション 科学はしばしば無限の可能性の源であり、さまざまな分野でのさらなる研究や雇用の広大な機会を提供します...