リアルタイムなSlackボットを生成的AIで構築する
生成的AIを使用してリアルタイムなSlackボットを構築する
この記事では、Cloudera DataFlow(Powered by Apache NiFi)を使用して、IBM WatsonX.AI Foundationの大規模な言語モデルとリアルタイムでやり取りする方法を紹介します。Google FLAN T5 XXLやIBM Graniteモデルなど、Foundationのモデルを使用できます。
IBM Cloudで実行されている安全なWatsonX.AIモデルに直接質問をSlackのようなモバイルアプリケーションにリアルタイムで送信するデータパイプラインを作成するのがどれほど簡単かをご紹介します。Cloudera Data Flowを使用して、セキュリティ、管理、系譜、ガバナンスのすべてを処理します。意思決定の一環として、プロンプトの種類に応じて、フライ(fly)中に異なるWatsonX.AIモデルを選択できます。たとえば、文を続けたい場合と質問に答えたい場合では、異なるモデルを選択できます。Google FLAN T5 XXLを使って質問に答える場合にはうまく機能します。文を続けたい場合にはIBM Graniteモデルの一つを使用します。
WatsonX.AIモデルが返す結果がどれだけ素晴らしく速いかに気付くでしょう。迅速にエンリッチメントと変換を行い、それからそれらをCloudera Apache Kafkaに送信し、連続的な分析と多くの他のアプリケーション、システム、プラットフォーム、およびダウンストリームの消費者への配布に使用します。また、オリジナルのリクエスターへの回答も出力します。これはSlackチャネルの誰かでも、アプリケーションの誰かでもあるかもしれません。すべてがリアルタイムで行われ、コード不要、任意のスケールとプラットフォームでフルガバナンス、系統、データ管理、セキュリティが行われます。
リアルタイムなデータとAIのために、IBMとClouderaのパワーをプライベート、パブリック、ハイブリッドクラウド環境で組み合わせることは、まだ始まったばかりです。今日試してみてください。
- 責任あるAI:AIウォッチドッグの重要な役割-選挙のディスインフォメーションに対抗する
- 11月20日から11月26日までの週の重要なLLM論文トップ
- 「AWSとNVIDIAは新たな戦略的なパートナーシップを発表」
ステップバイステップのリアルタイムフロー
まず、Slackに質問を入力します:
“Q: ジェネレーティブAIとApache NiFiを統合する良い方法はありますか?”
NiFi Flow Top
その質問が入力されたら、Slackサーバーは登録されたサービスにこれらのイベントを送信します。これは公に面するどこでもホストすることができます。
- (Slack APIリンクはこちら)
Slack API
有効にすると、サーバーは各Slack投稿のJSONイベントを受信し始めます。これはNiFiで簡単に受信して解析できます。Cloudera DataFlowは、デザイナーモードでも、パブリッククラウドホスト版で安全なHTTPS REST呼び出しを受信できます。
NiFi Top Flow 2
フローの最初の部分では、以下のREST JSON Postを受信しました。
Slackbot 1.0 (+https://api.slack.com/robots)application/jsonPOSTHTTP/1.1{ "token" : "qHvJe59yetAp1bao6wmQzH0C", "team_id" : "T1SD6MZMF", "context_team_id" : "T1SD6MZMF", "context_enterprise_id" : null, "api_app_id" : "A04U64MN9HS", "event" : { "type" : "message", "subtype" : "bot_message", "text" : "==== NiFi to IBM <http://WatsonX.AI|WatsonX.AI> LLM Answers\n\nOn Date: Wed, 15 Nov 20
これは非常に詳細なJSONファイルで、直ちにApache Iceberg Open Cloud Lakehouse、Kafkaトピック、またはJSONドキュメントとしてオブジェクトストアに生データとしてプッシュできます(強化オプション)。必要なデータのみを解析します。
EvaluateJSONPath
投稿のチャネルIDとプレーンテキストをパースします。私はgeneral(“C1SD6N197”)からのメッセージのみを取得したいです。それからテキストをinputsフィールドにコピーして、Hugging Faceに必要な形式にします。
入力を確認し、それが株式や天気(これからさらに増えます)である場合はLLMの呼び出しを回避します。
SELECT * FROM FLOWFILEWHERE upper(inputs) like '%WEATHER%'AND not upper(inputs) like '%LLM SKIPPED%'SELECT * FROM FLOWFILEWHERE upper(inputs) like '%STOCK%'AND not upper(inputs) like '%LLM SKIPPED%'SELECT * FROM FLOWFILEWHERE (upper(inputs) like 'QUESTION:%'OR upper(inputs) like 'Q:%') and not upper(inputs) like '%WEATHER%'and not upper(inputs) like '%STOCK%'
ストック処理について:
必要な株式を解析するために、私は自分のOpen NLPプロセッサを使用しています。
そのためにプロセッサとエンティティ抽出モデルをダウンロードする必要があります。
その後、私たちは会社名をAlphaVantageのHTTP RESTエンドポイントに渡し、会社名を株式シンボルに変換します。無料アカウントでは、1日に数回しか呼び出すことができません。したがって、失敗した場合はこのステップをバイパスし、渡されたものを使用しようとします。
RouteOnContentを使用して、エラーメッセージをフィルタリングします。
次に、QueryRecordプロセッサを使用してCSVからJSONに変換し、フィルタリングします。
SELECT name as companyName, symbol FROM FLOWFILE ORDER BY matchScore DESC LIMIT 1
SplitRecordを行い、レコードが1つだけであることを確認します。その後、EvaluateJsonPathを実行して属性としてフィールドを取得します。
UpdateAttributeでシンボルをトリムします。
${stockSymbol:trim()}
その株式シンボルをTwelve Dataに渡し、InvokeHTTPを使用して株式データを取得します。
そして、たくさんの株式データを受け取ります。
{ "meta" : { "symbol" : "IBM", "interval" : "1min", "currency" : "USD", "exchange_timezone" : "America/New_York", "exchange" : "NYSE", "mic_code" : "XNYS", "type" : "Common Stock" }, "values" : [ { "datetime" : "2023-11-15 10:37:00", "open" : "152.07001", "high" : "152.08000", "low" : "151.99500", "close" : "152.00999", "volume" : "8525" }, { "datetime" : "2023-11-15 10:36:00", "open" : "152.08501", "high" : "152.12250", "low" : "152.08000", "close" : "152.08501", "volume" : "15204" } ...
EvaluateJSONPathを実行して取引所情報を取得します。
レコードを分岐させ、1つのレコードのみを取得します。これはSlackに返すためのものです。株式データを他の値で豊かにします。そして、QueryRecordを実行して1レコードに制限してSlackに送信します。
SELECT * FROM FLOWFILE ORDER BY 'datetime' DESC LIMIT 1
EvaluateJsonPathを実行して表示するために最も価値のあるフィールドを取得します。
そのメッセージをSlackに送信します。
LLM スキップ。${date}の${companyName} [${nlp_org_1}/${stockSymbol}]の株価は${closeStockValue}です。株式日時は${stockdateTime}です。株式取引所は${exchange}です。
会社名から別のフローが分割されていることもあります。
最初のステップでは、ヤフーファイナンスに電子送信の見出しを取得するために呼び出しを行います。
https://feeds.finance.yahoo.com/rss/2.0/headline?s=${stockSymbol:trim()}®ion=US&lang=en-US
QueryRecordを使用してRSS/XMLレコードをJSONに変換します。
SplitJSONを実行してニュースアイテムをブレイクアウトします。
SplitRecordを実行して1つのレコードに制限します。必要なフィールドをSlackメッセージのために取得するためにEvaluateJSONPathを使用します。
最後に、UpdateRecordを実行してJSONを完成させます。
それからこのメッセージをSlackに送信します。
LLM スキップ。${date}の${companyName} [${nlp_org_1}/${stockSymbol}]の株式ニュース情報は${title}:${description}です。${guid} 記事日時は${pubdate}です。
天気を選んだ方々へ、私たちは株と同様のルートをたどります(Redis @ Aivenを使ってキャッシュを追加するべきです)。OpenNLPプロセッサを使用して、天気情報が必要な場所を抽出します。
次のステップは、プロセッサの出力を取り出して、Geoencoderに送信する値を構築することです。
weatherlocation = ${nlp_location_1:notNull():ifElse(${nlp_location_1}, "ニューヨーク市")}
もし有効な場所が見つからない場合、「ニューヨーク市」と言います。他の検索を使用することもできます。すべての場所をロードする作業を行っており、それに対して高度なPostgreSQLの検索や、OpenSearchやベクトル化されたデータストアで検索することもできます。
その場所をOpen Meteoに渡して、ジオを見つけます。InvokeHTTPを使用します。
https://geocoding-api.open-meteo.com/v1/search?name=${weatherlocation:trim():urlEncode()}&count=1&language=en&format=json
その後、結果から必要な値を解析します。
{ "results" : [ { "id" : 5128581, "name" : "ニューヨーク", "latitude" : 40.71427, "longitude" : -74.00597, "elevation" : 10.0, "feature_code" : "PPL", "country_code" : "US", "admin1_id" : 5128638, "timezone" : "America/New_York", "population" : 8175133, "postcodes" : [ "10001", "10002", "10003", "10004", "10005", "10006", "10007", "10008", "10009", "10010", "10011", "10012", "10013", "10014", "10016", "10017", "10018", "10019", "10020", "10021", "10022", "10023", "10024", "10025", "10026", "10027", "10028", "10029", "10030", "10031", "10032", "10033", "10034", "10035", "10036", "10037", "10038", "10039", "10040", "10041", "10043", "10044", "10045", "10055", "10060", "10065", "10069", "10080", "10081", "10087", "10090", "10101", "10102", "10103", "10104", "10105", "10106", "10107", "10108", "10109", "10110", "10111", "10112", "10113", "10114", "10115", "10116", "10117", "10118", "10119", "10120", "10121", "10122", "10123", "10124", "10125", "10126", "10128", "10129", "10130", "10131", "10132", "10133", "10138", "10150", "10151", "10152", "10153", "10154", "10155", "10156", "10157", "10158", "10159", "10160", "10161", "10162", "10163", "10164", "10165", "10166", "10167", "10168", "10169", "10170", "10171", "10172", "10173", "10174", "10175", "10176", "10177", "10178", "10179", "10185", "10199", "10203", "10211", "10212", "10213", "10242", "10249", "10256", "10258", "10259", "10260", "10261", "10265", "10268", "10269", "10270", "10271", "10272", "10273", "10274", "10275", "10276", "10277", "10278", "10279", "10280", "10281", "10282", "10285", "10286" ], "country_id" : 6252001, "country" : "アメリカ合衆国", "admin1" : "ニューヨーク" } ], "generationtime_ms" : 0.92196465}
その後、結果をパースし、InvokeHTTPを使用して指定した緯度と経度に対応する現在の天候を取得するための別のAPIを呼び出します。
https://api.weather.gov/points/${latitude:trim()},${longitude:trim()}
結果はジオJSONです。
{ "@context": [ "https://geojson.org/geojson-ld/geojson-context.jsonld", { "@version": "1.1", "wx": "https://api.weather.gov/ontology#", "s": "https://schema.org/", "geo": "http://www.opengis.net/ont/geosparql#", "unit": "http://codes.wmo.int/common/unit/", "@vocab": "https://api.weather.gov/ontology#", "geometry": { "@id": "s:GeoCoordinates", "@type": "geo:wktLiteral" }, "city": "s:addressLocality", "state": "s:addressRegion", "distance": { "@id": "s:Distance", "@type": "s:QuantitativeValue" }, "bearing": { "@type": "s:QuantitativeValue" }, "value": { "@id": "s:value" }, "unitCode": { "@id": "s:unitCode", "@type": "@id" }, "forecastOffice": { "@type": "@id" }, "forecastGridData": { "@type": "@id" }, "publicZone": { "@type": "@id" }, "county": { "@type": "@id" } } ], "id": "https://api.weather.gov/points/40.7143,-74.006", "type": "Feature", "geometry": { "type": "Point", "coordinates": [ -74.006, 40.714300000000001 ] }, "properties": { "@id": "https://api.weather.gov/points/40.7143,-74.006", "@type": "wx:Point", "cwa": "OKX", "forecastOffice": "https://api.weather.gov/offices/OKX", "gridId": "OKX", "gridX": 33, "gridY": 35, "forecast": "https://api.weather.gov/gridpoints/OKX/33,35/forecast", "forecastHourly": "https://api.weather.gov/gridpoints/OKX/33,35/forecast/hourly", "forecastGridData": "https://api.weather.gov/gridpoints/OKX/33,35", "observationStations": "https://api.weather.gov/gridpoints/OKX/33,35/stations", "relativeLocation": { "type": "Feature", "geometry": { "type": "Point", "coordinates": [ -74.0279259, 40.745251000000003 ] }, "properties": { "city": "Hoboken", "state": "NJ", "distance": { "unitCode": "wmoUnit:m", "value": 3906.1522008034999 }, "bearing": { "unitCode": "wmoUnit:degree_(angle)", "value": 151 } } }, "forecastZone": "https://api.weather.gov/zones/forecast/NYZ072", "county": "https://api.weather.gov/zones/county/NYC061", "fireWeatherZone": "https://api.weather.gov/zones/fire/NYZ212", "timeZone": "America/New_York", "radarStation": "KDIX" }}
予報URLを取得するためにEvaluateJSONPathを使用します。
その後、invokeHTTPを使用して予報URLを呼び出します。
これにより、返すための結果を抽出するために、より大きなJSON出力をパースします。
天候の主要なフィールドを取得するために、データをEvaluateJSONPathでパースします。
そして、それらのフィールドをPutSlackにフォーマットします。
LLM スキップ。${date}における${weatherlocation}の予報を読み込みました @ ${latitude},${longitude}使用したURL: ${forecasturl} ${icon} Temp: ${temperature} ${temperatureunit} - ${temperaturetrend}風があります。風向きは${winddirection}、風速は${windspeed}です。${detailedforecast}
Slackの出力です
LLMの質問がある場合は、1つのレコードであることを確認しましょう。
私たちはIBM Cloud上のIBM WatsonX.AIで利用可能ないくつかのモデルを使用して、RESTプロンプトで簡単にアクセスできるようにしています。
私は最初にIBMのPrompt Labでプロンプトをテストして構築し、その後、初期のcurlステートメントをそこからコピーしました。
IBM watsonx.aiで利用可能なサポートされている基礎モデルについては、こちらをクリックしてください。
ibm/mpt-7b-instruct2meta-llama/llama-2–70b-chatibm/granite-13b-chat-v1
私たちはIBMに独自の安全なキーを送信し、次回の呼び出しで使用するトークンを受け取ります。
私たちは質問を解析し、それをREST API経由でWatsonXに送信します。
次のようにIBMに送信するプロンプトを作成します。
{ "model_id": "meta-llama/llama-2-70b-chat", "input": "${inputs:urlEncode()}", "parameters": { "decoding_method": "greedy", "max_new_tokens": 200, "min_new_tokens": 50, "stop_sequences": [], "repetition_penalty": 1 }, "project_id": "0ead8ec4-d137-4f9c-8956-50b0da4a7068" }
生成されたテキストを解析し、それは私たちの生成的AIの結果に加え、タイミングに関する便利なメタデータです。
Slackに投稿される結果は次の通りです:
「Apache NiFiを使用して生成的AIモデルをいくつかの方法で統合できます:
- データ前処理: NiFiを使用して、生成的AIモデルにフィードする前にデータを前処理できます。これにはデータのクリーニング、変換、特徴エンジニアリングなどが含まれます。
- モデルのトレーニング: NiFiを使用して、生成的AIモデルのトレーニングプロセスを自動化できます。NiFiのPutFileやPutFile_SFTPプロセッサを使用してトレーニングデータをファイルに書き込み、ExecuteScriptなどのプロセッサを使用してトレーニングスクリプトを実行できます。
- モデルの展開: 生成的AIモデルのトレーニングが完了したら、NiFiを使用して展開できます。NiFiフローを作成して、入力データを受け取り、生成的AIモデルを実行し、生成されたデータを出力します。
- リアルタイム推論: NiFiのStreamingJobsを使用できます」
Slackbotが結果を投稿した後、メトリクスとデバッグ情報がチャットチャンネルに投稿されました。
すべてのメタデータは管理者モニタリング用の別のSlackチャンネルに投稿されます。
==== NiFi to IBM WatsonX.AI LLM AnswersOn 日付: Wed, 15 Nov 2023 15:43:29 GMT 作成日: 2023-11-15T15:43:29.248Z プロンプト: 質問: 生成的AIとApache NiFiを統合する良い方法は何ですか?回答: )Apache NiFiを使用して、生成的AIモデルをいくつかの方法で統合できます: 1. データ前処理: NiFiを使用して、生成的AIモデルにフィードする前にデータを前処理できます。これにはデータのクリーニング、変換、特徴エンジニアリングなどが含まれます。2. モデルのトレーニング: NiFiを使用して、生成的AIモデルのトレーニングプロセスを自動化できます。NiFiのPutFileやPutFile_SFTPプロセッサを使用してトレーニングデータをファイルに書き込み、ExecuteScriptなどのプロセッサを使用してトレーニングスクリプトを実行できます。3. モデルの展開: 生成的AIモデルのトレーニングが完了したら、NiFiを使用して展開できます。NiFiフローを作成して、入力データを受け取り、生成的AIモデルを実行し、生成されたデータを出力します。4. リアルタイム推論: NiFiのStreamingJobsを使用できます。トークン: 200リクエストの所要時間: 8153HTTP TX ID: 89d71099-da23-4e7e-89f9-4e8f5620c0fbIBMメッセージ: このモデルは、使用制限やその他の義務を課すサードパーティのライセンスによって管理されている非IBM製品です。このモデルを使用することで、次のURLで特定される条件に同意するものとします。 URL: https://dataplatform.cloud.ibm.com/docs/content/wsj/analyze-data/fm-models.html?context=wxIBMメッセージID: 免責事項ワーニングモデルID: meta-llama/llama-2-70b-chat停止理由: max_tokensトークン数: 38TX ID: NGp0djg-c05f740f84f84b7c80f93f9da05aa756UUID: da0806cb-6133-4bf4-808e-1fbf419c09e3Corr ID: NGp0djg-c05f740f84f84b7c80f93f9da05aa756Global TX ID: 20c3a9cf276c38bcdaf26e3c27d0479bサービスの所要時間: 478リクエストID: 03c2726a-dcb6-407f-96f1-f83f20fe9c9cファイル名: 1a3c4386-86d2-4969-805b-37649c16addbリクエストの所要時間: 8153リクエストURL: https://us-south.ml.cloud.ibm.com/ml/v1-beta/generation/text?version=2023-05-29cf-ray: 82689bfd28e48ce2-EWR=====
自分自身のSlackbotを作成する
Slackの出力
Kafkaの配信
Apache Flink SQLテーブル作成DDL
CREATE TABLE `ssb`.`Meetups`.`watsonairesults` ( `date` VARCHAR(2147483647), `x_global_transaction_id` VARCHAR(2147483647), `x_request_id` VARCHAR(2147483647), `cf_ray` VARCHAR(2147483647), `inputs` VARCHAR(2147483647), `created_at` VARCHAR(2147483647), `stop_reason` VARCHAR(2147483647), `x_correlation_id` VARCHAR(2147483647), `x_proxy_upstream_service_time` VARCHAR(2147483647), `message_id` VARCHAR(2147483647), `model_id` VARCHAR(2147483647), `invokehttp_request_duration` VARCHAR(2147483647), `message` VARCHAR(2147483647), `uuid` VARCHAR(2147483647), `generated_text` VARCHAR(2147483647), `transaction_id` VARCHAR(2147483647), `tokencount` VARCHAR(2147483647), `generated_token` VARCHAR(2147483647), `ts` VARCHAR(2147483647), `advisoryId` VARCHAR(2147483647), `eventTimeStamp` TIMESTAMP(3) WITH LOCAL TIME ZONE METADATA FROM 'timestamp', WATERMARK FOR `eventTimeStamp` AS `eventTimeStamp` - INTERVAL '3' SECOND) WITH ( 'deserialization.failure.policy' = 'ignore_and_log', 'properties.request.timeout.ms' = '120000', 'format' = 'json', 'properties.bootstrap.servers' = 'kafka:9092', 'connector' = 'kafka', 'properties.transaction.timeout.ms' = '900000', 'topic' = 'watsonxaillmanswers', 'scan.startup.mode' = 'group-offsets', 'properties.auto.offset.reset' = 'earliest', 'properties.group.id' = 'watsonxaillmconsumer')CREATE TABLE `ssb`.`Meetups`.`watsonxresults` ( `date` VARCHAR(2147483647), `x_global_transaction_id` VARCHAR(2147483647), `x_request_id` VARCHAR(2147483647), `cf_ray` VARCHAR(2147483647), `inputs` VARCHAR(2147483647), `created_at` VARCHAR(2147483647), `stop_reason` VARCHAR(2147483647), `x_correlation_id` VARCHAR(2147483647), `x_proxy_upstream_service_time` VARCHAR(2147483647), `message_id` VARCHAR(2147483647), `model_id` VARCHAR(2147483647), `invokehttp_request_duration` VARCHAR(2147483647), `message` VARCHAR(2147483647), `uuid` VARCHAR(2147483647), `generated_text` VARCHAR(2147483647), `transaction_id` VARCHAR(2147483647), `tokencount` VARCHAR(2147483647), `generated_token` VARCHAR(2147483647), `ts` VARCHAR(2147483647), `eventTimeStamp` TIMESTAMP(3) WITH LOCAL TIME ZONE METADATA FROM 'timestamp', WATERMARK FOR `eventTimeStamp` AS `eventTimeStamp` - INTERVAL '3' SECOND) WITH ( 'deserialization.failure.policy' = 'ignore_and_log', 'properties.request.timeout.ms' = '120000', 'format' = 'json', 'properties.bootstrap.servers' = 'kafka:9092', 'connector' = 'kafka', 'properties.transaction.timeout.ms' = '900000', 'topic' = 'watsonxaillm', 'scan.startup.mode' = 'group-offsets', 'properties.auto.offset.reset' = 'earliest', 'properties.group.id' = 'allwatsonx1')
例のプロンプト
{"inputs":"以下の質問に答えてください。アメリカ合衆国の首都は何ですか?"}
IBM DB2 SQL
alter table "DB2INST1"."TRAVELADVISORY"add column "summary" VARCHAR(2048);-- DB2INST1.TRAVELADVISORYの定義CREATE TABLE "DB2INST1"."TRAVELADVISORY" ( "TITLE" VARCHAR(250 OCTETS) , "PUBDATE" VARCHAR(250 OCTETS) , "LINK" VARCHAR(250 OCTETS) , "GUID" VARCHAR(250 OCTETS) , "ADVISORYID" VARCHAR(250 OCTETS) , "DOMAIN" VARCHAR(250 OCTETS) , "CATEGORY" VARCHAR(4096 OCTETS) , "DESCRIPTION" VARCHAR(4096 OCTETS) , "UUID" VARCHAR(250 OCTETS) NOT NULL , "TS" BIGINT NOT NULL , "summary" VARCHAR(2048 OCTETS) ) IN "IBMDB2SAMPLEREL" ORGANIZE BY ROW;ALTER TABLE "DB2INST1"."TRAVELADVISORY" ADD PRIMARY KEY ("UUID") ENFORCED;GRANT CONTROL ON TABLE "DB2INST1"."TRAVELADVISORY" TO USER "DB2INST1";GRANT CONTROL ON INDEX "SYSIBM "."SQL230620142604860" TO USER "DB2INST1";SELECT "summary", TITLE , ADVISORYID , TS, PUBDATE FROM DB2INST1.TRAVELADVISORY t WHERE "summary" IS NOT NULL ORDER BY ts DESC
Example Output Email
ビデオ
ソースコード
We will continue to update VoAGI; if you have any questions or suggestions, please contact us!
Was this article helpful?
93 out of 132 found this helpful
Related articles