「LLM SaaSのためのFastAPIテンプレートPart 2 — CeleryとPg-vector」

「LLM SaaSのためのFastAPIテンプレートPart 2:CeleryとPg-vectorの活用方法」

このブログ投稿は、LLM SaaSシリーズのFastAPI + Supabaseテンプレートの一部です。Part 1(認証とファイルのアップロード)で紹介されたコンセプトに基づいて構築しています。

LLM SaaS用のFastAPIテンプレート Part 1 — 認証とファイルのアップロード

Python開発者の間で、FastAPIの人気が上昇しており、シンプlicityとネイティブのSwagger UIサポートが特筆されています…

pub.towardsai.net

コードの多くは、Quivrから参照されています。

長時間実行されるプロセスのためのCeleryワーカーとメッセージキュー

以下の図は、FastAPIエコシステムにおけるCeleryワーカーとメッセージキューの連携を示しています。プロセスは、FastAPIが指定されたブローカー(この場合はRedis)にタスクを送信することから始まります。その後、Celeryワーカーがこれらのタスクを分散タスクキュー内で取得し処理し、結果をResultバックエンド(またはRedis)に保存します。同時に、FastAPIはタスクの状態と結果を監視することができます。この例では、ブローカーとResultバックエンドに単一のRedisインスタンスを使用していますが、必要に応じて別々のインスタンスを使用することもできます。

Source: Author’s sketch

開発プロセスを開始するには、次のDockerコマンドを使用してRedisインスタンスを起動する必要があります:

# 最新のRedisイメージを取得docker pull redis:latest# Redisインスタンスを実行docker run --name redis -d -p 6379:6379 redis:latest

FastAPIプロジェクト内で、ブローカーとResultバックエンドの環境変数を設定します:

# ブローカーインスタンス - RedisCELERY_BROKER_URL=redis://localhost:6379/0# Resultバックエンド - RedisCELERY_RESULT_BACKEND=redis://localhost:6379/0

テスト用のmain.py内にダミータスクを作成します:

from celery import Celeryimport time

celery = Celery(    __name__,    broker=os.getenv("CELERY_BROKER_URL"),    backend=os.getenv("CELERY_RESULT_BACKEND"),)@Celery.taskdef test():    import time    time.sleep(5)    return "Hello, I like eating celery!"

ターミナルに戻り、celeryコマンドを入力します(すでに環境にceleryがインストールされている場合はpip installを使用してください)

celery --app=main.celery worker --concurrency=1 --loglevel=DEBUG

注意:Windowsマシンでスクリプトをテストしている場合、ローカル環境で動作させるためにコマンドに‘-P solo’を追加する必要があるかもしれません。 これは本番環境では必要ありません。

以下のような表示が表示されます。

-------------- celery@xxxx v5.2.7 (dawn-chorus)--- ***** ----- -- ******* ---- Windows-10-10.0.22621-SP0 2023-11-20 07:03:38- *** --- * --- - ** ---------- [config]- ** ---------- .> app:         main:0x22d23e16d70- ** ---------- .> transport:   redis://localhost:6379/0- ** ---------- .> results:     redis://localhost:6379/0- *** --- * --- .> concurrency: 4 (prefork)-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)--- ***** -----  -------------- [queues]                .> celery           exchange=celery(direct) key=celery

これで、作業ディレクトリを基準に別のターミナルを開き、Python REPLを使用してクイックテストを行うことができます。

(.venv) PS C:\Users\xxx\backend> pythonPython 3.10.11 (tags/v3.10.11:7d4cc5a, Apr  5 2023, 00:38:17) [MSC v.1929 64 bit (AMD64)] on win32Type "help", "copyright", "credits" or "license" for more information.>>> from main import app, celery, test>>> test.delay()<AsyncResult: 2ba428c1-5d82-4f37-aa89-5cef76b7a6eb>

他のターミナルに戻り、Celeryワーカーが実行されているターミナルでタスクの実行を観察する必要があります。

[2023-11-20 12:21:46,743: INFO/MainProcess] Task main.test[2ba428c1-5d82-4f37-aa89-5cef76b7a6eb] received[2023-11-20 12:21:51,754: INFO/MainProcess] Task main.test[2ba428c1-5d82-4f37-aa89-5cef76b7a6eb] succeeded in 5.014999999999418s: 'こんにちは、セロリが好きです!'

ファイルのアップロードとベクトルデータストア(pg-vectorプラグイン)

Celeryのテストをベースに、実際の使用例ではバックグラウンドでCeleryのタスクを実行してPDFドキュメントを埋め込み、ベクトルデータストアに保存します。このプロセスでは、ファイルをSupabaseストレージバケットにアップロードし、Celeryのタスクをトリガーしてベクトルデータストア用にダウンロードして処理します。

Source: Author’s diagram

全体のプロセスは少し複雑です。まず、ファイルはSupabaseストレージバケットにアップロードされます。その後、このファイルをダウンロードしてベクトルデータストア用に処理するためにCeleryのタスクをトリガーします。このプロセスには、元のファイル形式から生のテキストに変換するためのDocument Loaderと、テキストをチャンクに分割するためのText Splitterが必要です(ベクトルデータストアの単一のベクトルのサイズ制限による)。また、特定のテキストチャンクにメタデータを追加します。最後に、テキストチャンクはベクトルに埋め込まれ、Supabaseベクトルデータストア(postgresのpg-vectorプラグイン)にアップロードされます。

Supabase上のSQLテーブル

まず、このデモンストレーション用にSupabaseに2つのテーブルが作成されていることを確認してください:(より詳しいSQLスクリプトの例については、https://github.com/StanGirard/quivr/tree/main/scriptsを参照してください)

-- Create users X vectors tableCREATE TABLE IF NOT EXISTS user_vectors (  user_id UUID,  vector_id UUID,  PRIMARY KEY (user_id, vector_id),  FOREIGN KEY (vector_id) REFERENCES vectors (id),  FOREIGN KEY (user_id) REFERENCES auth.users (id));-- Create vector extensionCREATE EXTENSION IF NOT EXISTS vector;-- Create vectors tableCREATE TABLE IF NOT EXISTS vectors (    id UUID DEFAULT uuid_generate_v4() PRIMARY KEY,    content TEXT,    metadata JSONB,    embedding VECTOR(1536));

ルートとエンドポイントの定義

main.pyに新しいルーター ‘upload_router’ を追加します。

from routes.upload_routes import upload_routerapp.include_router(upload_router)

‘routes’という名前の新しいディレクトリを作成し、’upload_routes.py’という名前のファイルを作成します。

from fastapi.responses import JSONResponsefrom auth import AuthBearer, get_current_userfrom celery_worker import process_filefrom celery.result import AsyncResultfrom fastapi import APIRouter, Depends, HTTPException, Request, UploadFilefrom repository.files.upload_file import upload_file_storagefrom logger import get_loggerfrom models import UserIdentitylogger = get_logger(__name__)upload_router = APIRouter()@upload_router.get("/upload/healthz", tags=["Health"])async def healthz():    return {"status": "ok"}@upload_router.post("/upload", dependencies=[Depends(AuthBearer())], tags=["Upload"])async def upload_file(    request: Request,    uploadFile: UploadFile,    current_user: UserIdentity = Depends(get_current_user),):    file_content = await uploadFile.read()    filename_with_user_id = str(current_user.id) + "/" + str(uploadFile.filename)    logger.info(f"ファイル名は: {filename_with_user_id}")    try:        fileInStorage = upload_file_storage(file_content, filename_with_user_id)        logger.info(f"ファイル{fileInStorage}が正常にアップロードされました")          except Exception as e:        if "The resource already exists" in str(e):            raise HTTPException(                status_code=403,                detail=f"ファイル{uploadFile.filename}はすでにストレージに存在しています。",            )        else:            raise HTTPException(                status_code=500, detail="ファイルのアップロードに失敗しました。"            )    task = process_file.delay(        file_name=filename_with_user_id,        file_original_name=uploadFile.filename,        user_id=current_user.id,    )    return JSONResponse({"task_id": task.id})@upload_router.get("/upload/{task_id}", dependencies=[Depends(AuthBearer())], tags=["Upload"])def get_status(task_id: str):    task_result = AsyncResult(task_id)    result = {        "task_id": task_id,        "task_status": task_result.status    }    return JSONResponse(result)

このスクリプトでは、’upload_routes.py’に2つのエンドポイントを定義して、ファイルのアップロードとタスクの状態をチェックします。

ソース:著者のスクリーンショット

/uploadには ‘process_file’というCeleryタスクがあることがわかります。では、celeryでこのタスクを作成しましょう。

Celeryのワーカーとタスク

まず、メインディレクトリに ‘celery_worker.py’というファイルを作成します。

import osfrom celery import Celeryimport asynciofrom utils.process_file import get_supabase_client,file_handlercelery = Celery(    __name__,    broker="redis://127.0.0.1:6379/0",    backend="redis://127.0.0.1:6379/0")@celery.task(name="process_file")def process_file(    file_name: str,    file_original_name: str,    user_id: str,):    supabase_client = get_supabase_client()    tmp_file_name = "tmp-file-"+file_name    tmp_file_name = tmp_file_name.replace("/", "_")        with open(tmp_file_name, "wb+") as file:        res = supabase_client.storage.from_("quivr").download(file_name)        file.write(res)        loop = asyncio.new_event_loop()        message = loop.run_until_complete(            file_handler(                file=tmp_file_name,                user_id=user_id,                file_original_name=file_original_name            )        )                file.close    os.remove(tmp_file_name)

この ‘process_file’ タスクは(上記のプロセスダイアグラムを参照)ファイルをダウンロードし、 file_handlerを使用してファイルを処理し、完了後に一時ファイルを削除します。

ファイルの処理と埋め込み

簡単な処理のために、以下の file_handlerスクリプトを使用できます。このスクリプトには、全ての埋め込みを行うワーカーが含まれています。Quivrのコードベースも確認できます。ここには、複数のワーカーに埋め込みが割り当てられる別の共有タスクが含まれています。

# utils/process_file.pyを使用してアップロードされたファイルを処理import osimport timefrom logger import get_loggerfrom repository.files.upload_file import DocumentSerializablefrom langchain.document_loaders import UnstructuredPDFLoaderfrom models.databases.supabase.supabase import SupabaseDBfrom supabase.client import Client, create_clientfrom langchain.vectorstores import SupabaseVectorStorefrom langchain.embeddings.openai import OpenAIEmbeddingsfrom langchain.text_splitter import RecursiveCharacterTextSplitterfrom dotenv import load_dotenvload_dotenv()logger = get_logger(__name__)def get_supabase_client() -> Client:    supabase_client: Client = create_client(        os.getenv("SUPABASE_URL"), os.getenv("SUPABASE_SERVICE_KEY")    )    return supabase_clientdef get_supabase_db() -> SupabaseDB:    supabase_client = get_supabase_client()    return SupabaseDB(supabase_client)def get_embeddings() -> OpenAIEmbeddings:    embeddings = OpenAIEmbeddings(        openai_api_key=os.getenv("OPENAI_API_KEY")    )  # pyright: ignore reportPrivateUsage=none    return embeddingsdef get_documents_vector_store() -> SupabaseVectorStore:    # settings = BrainSettings()  # pyright: ignore reportPrivateUsage=none    embeddings = get_embeddings()    supabase_client: Client = create_client(        os.getenv("SUPABASE_URL"), os.getenv("SUPABASE_SERVICE_KEY")    )    documents_vector_store = SupabaseVectorStore(        supabase_client, embeddings, table_name="vectors"    )    return documents_vector_storedef create_vector(doc):    documents_vector_store = get_documents_vector_store()    try:         sids = documents_vector_store.add_documents([doc])        if sids and len(sids) > 0:            return sids            except Exception as e:        logger.error(f"Error creating vector for document: {e}")        def create_user_vector(user_id, vector_id):    database = get_supabase_db()    response = (        database.db.table("user_vectors")        .insert(            {                "user_id": str(user_id),                "vector_id": str(vector_id),            }        )        .execute()    )    return response.data    def create_embedding_for_document(user_id, doc_with_metadata):    doc = DocumentSerializable.from_json(doc_with_metadata)    created_vector = create_vector(doc)    created_vector_id = created_vector[0]  # pyright: ignore reportPrivateUsage=none        create_user_vector(user_id, created_vector_id)    def compute_documents_from_pdf(file,loader):    loader = loader(file)    documents=[]    documents = loader.load()    # split the documents into chunks    text_splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(        chunk_size=500, chunk_overlap=0    )    documents = text_splitter.split_documents(documents)    return documents        async def file_handler(    file: str,    file_original_name: str,    user_id,    loader_class=UnstructuredPDFLoader,   #Langchainのローダークラス):    dateshort = time.strftime("%Y%m%d")        documents = compute_documents_from_pdf(file,loader_class)    for doc in documents:  # pyright: ignore reportPrivateUsage=none        metadata = {            "file_name": file_original_name,            "date": dateshort        }        doc_with_metadata = DocumentSerializable(            page_content=doc.page_content, metadata=metadata        )        create_embedding_for_document(            user_id, doc_with_metadata.to_json()        )    return "Hello, processing is done!"

デモンストレーション目的で、ここではpdfファイルのみがテストされています。より多くのファイル形式については、ファイルクラスを使用して広範なファイル形式を処理しているQuivrのコードベースを参照してください。

エンドツーエンドのテスト

テストするには、Uvicornサーバー(FastAPI用)とCeleryサーバーの両方をオンにします。

uvicorn main:app --reload

celery -A celery_worker worker --loglevel=info --logfile=celery.log --concurrency=1 -P solo

— logfile(オプション):作業ディレクトリにceleryログファイルを保存できます

— concurrency(オプション):同時にオンにしたいワーカーの数を設定します

— P solo:celeryをWindowsのノートパソコンで実行するために必要です。Mac/Dockerで実行する場合は、おそらくこれは不要です。

ここではエンドポイントのテストのスニペットがあります。

Source: Author’s screenshot
Source: Author’s screenshot

We will continue to update VoAGI; if you have any questions or suggestions, please contact us!

Share:

Was this article helpful?

93 out of 132 found this helpful

Discover more

人工知能

「UVeyeの共同設立者兼CEO、アミール・ヘヴェルについてのインタビューシリーズ」

アミール・ヘヴァーは、UVeyeのCEO兼共同創設者であり、高速かつ正確な異常検出により、自動車およびセキュリティ産業に直面...

人工知能

「ゲイリー・ヒュースティス、パワーハウスフォレンジクスのオーナー兼ディレクター- インタビューシリーズ」

ゲイリー・ヒュースティス氏は、パワーハウスフォレンジックスのオーナー兼ディレクターであり、ライセンスを持つ私立探偵、...

人工知能

「LeanTaaSの創設者兼CEO、モハン・ギリダラダスによるインタビューシリーズ」

モーハン・ギリダラダスは、AIを活用したSaaSベースのキャパシティ管理、スタッフ配置、患者フローのソフトウェアを提供する...

人工知能

『DeepHowのCEO兼共同創業者、サム・ジェン氏によるインタビューシリーズ』

ディープハウのCEO兼共同創設者であるサム・ジェンは、著名な投資家から支持される急速に進化するスタートアップを率いていま...

人工知能

「クリス・サレンス氏、CentralReachのCEO - インタビューシリーズ」

クリス・サレンズはCentralReachの最高経営責任者であり、同社を率いて、自閉症や関連する障害を持つ人々のために優れたクラ...

人工知能

ジョシュ・フィースト、CogitoのCEO兼共同創業者 - インタビューシリーズ

ジョシュ・フィーストは、CogitoのCEO兼共同創業者であり、感情と会話AIを組み合わせた革新的なプラットフォームを提供するエ...