「LLM SaaSのためのFastAPIテンプレートPart 2 — CeleryとPg-vector」
「LLM SaaSのためのFastAPIテンプレートPart 2:CeleryとPg-vectorの活用方法」
このブログ投稿は、LLM SaaSシリーズのFastAPI + Supabaseテンプレートの一部です。Part 1(認証とファイルのアップロード)で紹介されたコンセプトに基づいて構築しています。
LLM SaaS用のFastAPIテンプレート Part 1 — 認証とファイルのアップロード
Python開発者の間で、FastAPIの人気が上昇しており、シンプlicityとネイティブのSwagger UIサポートが特筆されています…
pub.towardsai.net
コードの多くは、Quivrから参照されています。
長時間実行されるプロセスのためのCeleryワーカーとメッセージキュー
以下の図は、FastAPIエコシステムにおけるCeleryワーカーとメッセージキューの連携を示しています。プロセスは、FastAPIが指定されたブローカー(この場合はRedis)にタスクを送信することから始まります。その後、Celeryワーカーがこれらのタスクを分散タスクキュー内で取得し処理し、結果をResultバックエンド(またはRedis)に保存します。同時に、FastAPIはタスクの状態と結果を監視することができます。この例では、ブローカーとResultバックエンドに単一のRedisインスタンスを使用していますが、必要に応じて別々のインスタンスを使用することもできます。
開発プロセスを開始するには、次のDockerコマンドを使用してRedisインスタンスを起動する必要があります:
# 最新のRedisイメージを取得docker pull redis:latest# Redisインスタンスを実行docker run --name redis -d -p 6379:6379 redis:latest
FastAPIプロジェクト内で、ブローカーとResultバックエンドの環境変数を設定します:
# ブローカーインスタンス - RedisCELERY_BROKER_URL=redis://localhost:6379/0# Resultバックエンド - RedisCELERY_RESULT_BACKEND=redis://localhost:6379/0
テスト用のmain.py
内にダミータスクを作成します:
from celery import Celeryimport time
celery = Celery( __name__, broker=os.getenv("CELERY_BROKER_URL"), backend=os.getenv("CELERY_RESULT_BACKEND"),)@Celery.taskdef test(): import time time.sleep(5) return "Hello, I like eating celery!"
ターミナルに戻り、celeryコマンドを入力します(すでに環境にceleryがインストールされている場合はpip installを使用してください)
celery --app=main.celery worker --concurrency=1 --loglevel=DEBUG
注意:Windowsマシンでスクリプトをテストしている場合、ローカル環境で動作させるためにコマンドに‘-P solo’を追加する必要があるかもしれません。 これは本番環境では必要ありません。
以下のような表示が表示されます。
-------------- celery@xxxx v5.2.7 (dawn-chorus)--- ***** ----- -- ******* ---- Windows-10-10.0.22621-SP0 2023-11-20 07:03:38- *** --- * --- - ** ---------- [config]- ** ---------- .> app: main:0x22d23e16d70- ** ---------- .> transport: redis://localhost:6379/0- ** ---------- .> results: redis://localhost:6379/0- *** --- * --- .> concurrency: 4 (prefork)-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)--- ***** ----- -------------- [queues] .> celery exchange=celery(direct) key=celery
これで、作業ディレクトリを基準に別のターミナルを開き、Python REPLを使用してクイックテストを行うことができます。
(.venv) PS C:\Users\xxx\backend> pythonPython 3.10.11 (tags/v3.10.11:7d4cc5a, Apr 5 2023, 00:38:17) [MSC v.1929 64 bit (AMD64)] on win32Type "help", "copyright", "credits" or "license" for more information.>>> from main import app, celery, test>>> test.delay()<AsyncResult: 2ba428c1-5d82-4f37-aa89-5cef76b7a6eb>
他のターミナルに戻り、Celeryワーカーが実行されているターミナルでタスクの実行を観察する必要があります。
[2023-11-20 12:21:46,743: INFO/MainProcess] Task main.test[2ba428c1-5d82-4f37-aa89-5cef76b7a6eb] received[2023-11-20 12:21:51,754: INFO/MainProcess] Task main.test[2ba428c1-5d82-4f37-aa89-5cef76b7a6eb] succeeded in 5.014999999999418s: 'こんにちは、セロリが好きです!'
ファイルのアップロードとベクトルデータストア(pg-vectorプラグイン)
Celeryのテストをベースに、実際の使用例ではバックグラウンドでCeleryのタスクを実行してPDFドキュメントを埋め込み、ベクトルデータストアに保存します。このプロセスでは、ファイルをSupabaseストレージバケットにアップロードし、Celeryのタスクをトリガーしてベクトルデータストア用にダウンロードして処理します。
全体のプロセスは少し複雑です。まず、ファイルはSupabaseストレージバケットにアップロードされます。その後、このファイルをダウンロードしてベクトルデータストア用に処理するためにCeleryのタスクをトリガーします。このプロセスには、元のファイル形式から生のテキストに変換するためのDocument Loaderと、テキストをチャンクに分割するためのText Splitterが必要です(ベクトルデータストアの単一のベクトルのサイズ制限による)。また、特定のテキストチャンクにメタデータを追加します。最後に、テキストチャンクはベクトルに埋め込まれ、Supabaseベクトルデータストア(postgresのpg-vectorプラグイン)にアップロードされます。
Supabase上のSQLテーブル
まず、このデモンストレーション用にSupabaseに2つのテーブルが作成されていることを確認してください:(より詳しいSQLスクリプトの例については、https://github.com/StanGirard/quivr/tree/main/scriptsを参照してください)
-- Create users X vectors tableCREATE TABLE IF NOT EXISTS user_vectors ( user_id UUID, vector_id UUID, PRIMARY KEY (user_id, vector_id), FOREIGN KEY (vector_id) REFERENCES vectors (id), FOREIGN KEY (user_id) REFERENCES auth.users (id));-- Create vector extensionCREATE EXTENSION IF NOT EXISTS vector;-- Create vectors tableCREATE TABLE IF NOT EXISTS vectors ( id UUID DEFAULT uuid_generate_v4() PRIMARY KEY, content TEXT, metadata JSONB, embedding VECTOR(1536));
ルートとエンドポイントの定義
main.pyに新しいルーター ‘upload_router’ を追加します。
from routes.upload_routes import upload_routerapp.include_router(upload_router)
‘routes’という名前の新しいディレクトリを作成し、’upload_routes.py’という名前のファイルを作成します。
from fastapi.responses import JSONResponsefrom auth import AuthBearer, get_current_userfrom celery_worker import process_filefrom celery.result import AsyncResultfrom fastapi import APIRouter, Depends, HTTPException, Request, UploadFilefrom repository.files.upload_file import upload_file_storagefrom logger import get_loggerfrom models import UserIdentitylogger = get_logger(__name__)upload_router = APIRouter()@upload_router.get("/upload/healthz", tags=["Health"])async def healthz(): return {"status": "ok"}@upload_router.post("/upload", dependencies=[Depends(AuthBearer())], tags=["Upload"])async def upload_file( request: Request, uploadFile: UploadFile, current_user: UserIdentity = Depends(get_current_user),): file_content = await uploadFile.read() filename_with_user_id = str(current_user.id) + "/" + str(uploadFile.filename) logger.info(f"ファイル名は: {filename_with_user_id}") try: fileInStorage = upload_file_storage(file_content, filename_with_user_id) logger.info(f"ファイル{fileInStorage}が正常にアップロードされました") except Exception as e: if "The resource already exists" in str(e): raise HTTPException( status_code=403, detail=f"ファイル{uploadFile.filename}はすでにストレージに存在しています。", ) else: raise HTTPException( status_code=500, detail="ファイルのアップロードに失敗しました。" ) task = process_file.delay( file_name=filename_with_user_id, file_original_name=uploadFile.filename, user_id=current_user.id, ) return JSONResponse({"task_id": task.id})@upload_router.get("/upload/{task_id}", dependencies=[Depends(AuthBearer())], tags=["Upload"])def get_status(task_id: str): task_result = AsyncResult(task_id) result = { "task_id": task_id, "task_status": task_result.status } return JSONResponse(result)
このスクリプトでは、’upload_routes.py’に2つのエンドポイントを定義して、ファイルのアップロードとタスクの状態をチェックします。
/uploadには ‘process_file’というCeleryタスクがあることがわかります。では、celeryでこのタスクを作成しましょう。
Celeryのワーカーとタスク
まず、メインディレクトリに ‘celery_worker.py’というファイルを作成します。
import osfrom celery import Celeryimport asynciofrom utils.process_file import get_supabase_client,file_handlercelery = Celery( __name__, broker="redis://127.0.0.1:6379/0", backend="redis://127.0.0.1:6379/0")@celery.task(name="process_file")def process_file( file_name: str, file_original_name: str, user_id: str,): supabase_client = get_supabase_client() tmp_file_name = "tmp-file-"+file_name tmp_file_name = tmp_file_name.replace("/", "_") with open(tmp_file_name, "wb+") as file: res = supabase_client.storage.from_("quivr").download(file_name) file.write(res) loop = asyncio.new_event_loop() message = loop.run_until_complete( file_handler( file=tmp_file_name, user_id=user_id, file_original_name=file_original_name ) ) file.close os.remove(tmp_file_name)
この ‘process_file’ タスクは(上記のプロセスダイアグラムを参照)ファイルをダウンロードし、 file_handlerを使用してファイルを処理し、完了後に一時ファイルを削除します。
ファイルの処理と埋め込み
簡単な処理のために、以下の file_handlerスクリプトを使用できます。このスクリプトには、全ての埋め込みを行うワーカーが含まれています。Quivrのコードベースも確認できます。ここには、複数のワーカーに埋め込みが割り当てられる別の共有タスクが含まれています。
# utils/process_file.pyを使用してアップロードされたファイルを処理import osimport timefrom logger import get_loggerfrom repository.files.upload_file import DocumentSerializablefrom langchain.document_loaders import UnstructuredPDFLoaderfrom models.databases.supabase.supabase import SupabaseDBfrom supabase.client import Client, create_clientfrom langchain.vectorstores import SupabaseVectorStorefrom langchain.embeddings.openai import OpenAIEmbeddingsfrom langchain.text_splitter import RecursiveCharacterTextSplitterfrom dotenv import load_dotenvload_dotenv()logger = get_logger(__name__)def get_supabase_client() -> Client: supabase_client: Client = create_client( os.getenv("SUPABASE_URL"), os.getenv("SUPABASE_SERVICE_KEY") ) return supabase_clientdef get_supabase_db() -> SupabaseDB: supabase_client = get_supabase_client() return SupabaseDB(supabase_client)def get_embeddings() -> OpenAIEmbeddings: embeddings = OpenAIEmbeddings( openai_api_key=os.getenv("OPENAI_API_KEY") ) # pyright: ignore reportPrivateUsage=none return embeddingsdef get_documents_vector_store() -> SupabaseVectorStore: # settings = BrainSettings() # pyright: ignore reportPrivateUsage=none embeddings = get_embeddings() supabase_client: Client = create_client( os.getenv("SUPABASE_URL"), os.getenv("SUPABASE_SERVICE_KEY") ) documents_vector_store = SupabaseVectorStore( supabase_client, embeddings, table_name="vectors" ) return documents_vector_storedef create_vector(doc): documents_vector_store = get_documents_vector_store() try: sids = documents_vector_store.add_documents([doc]) if sids and len(sids) > 0: return sids except Exception as e: logger.error(f"Error creating vector for document: {e}") def create_user_vector(user_id, vector_id): database = get_supabase_db() response = ( database.db.table("user_vectors") .insert( { "user_id": str(user_id), "vector_id": str(vector_id), } ) .execute() ) return response.data def create_embedding_for_document(user_id, doc_with_metadata): doc = DocumentSerializable.from_json(doc_with_metadata) created_vector = create_vector(doc) created_vector_id = created_vector[0] # pyright: ignore reportPrivateUsage=none create_user_vector(user_id, created_vector_id) def compute_documents_from_pdf(file,loader): loader = loader(file) documents=[] documents = loader.load() # split the documents into chunks text_splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder( chunk_size=500, chunk_overlap=0 ) documents = text_splitter.split_documents(documents) return documents async def file_handler( file: str, file_original_name: str, user_id, loader_class=UnstructuredPDFLoader, #Langchainのローダークラス): dateshort = time.strftime("%Y%m%d") documents = compute_documents_from_pdf(file,loader_class) for doc in documents: # pyright: ignore reportPrivateUsage=none metadata = { "file_name": file_original_name, "date": dateshort } doc_with_metadata = DocumentSerializable( page_content=doc.page_content, metadata=metadata ) create_embedding_for_document( user_id, doc_with_metadata.to_json() ) return "Hello, processing is done!"
デモンストレーション目的で、ここではpdfファイルのみがテストされています。より多くのファイル形式については、ファイルクラスを使用して広範なファイル形式を処理しているQuivrのコードベースを参照してください。
エンドツーエンドのテスト
テストするには、Uvicornサーバー(FastAPI用)とCeleryサーバーの両方をオンにします。
uvicorn main:app --reload
celery -A celery_worker worker --loglevel=info --logfile=celery.log --concurrency=1 -P solo
— logfile(オプション):作業ディレクトリにceleryログファイルを保存できます
— concurrency(オプション):同時にオンにしたいワーカーの数を設定します
— P solo:celeryをWindowsのノートパソコンで実行するために必要です。Mac/Dockerで実行する場合は、おそらくこれは不要です。
ここではエンドポイントのテストのスニペットがあります。
We will continue to update VoAGI; if you have any questions or suggestions, please contact us!
Was this article helpful?
93 out of 132 found this helpful
Related articles
- In this article, we will explore the fascinating world of NOIR, Stanford University’s mind-controlled AI robot.
- 「ChatGPT for Parents — 生産性を高めるために必要なプロンプト」
- 「物理的な制約が脳のようなAIの進化を促す」
- 「CNN(畳み込みニューラルネットワーク)におけるポイントワイズ畳み込みの探求:全結合層の置き換え」
- 「起業家にとって最も優れたChatGPTプロンプト20選」
- ゲームに飢える:GeForce NOWに参加する18の新しいゲーム
- Zephyr-7B:HuggingFaceのハイパーオプティマイズされたLLM、Mistral 7Bの上に構築