LLM SaaSのためのFastAPIテンプレート パート1 — Authとファイルのアップロード
LLM SaaSのためのFastAPIテンプレート パート1 — Authとファイルのアップロードのための最速APIテンプレート
最近Pythonバックエンド開発者コミュニティでFastAPIが非常に注目されています。その理由は、シンプルさ、非同期性、およびネイティブのSwagger UIにあります。
GitHubで人気のあるLLMオープンソースプロジェクトの中でも、Quivrは最高の一つであり、多くのスター(執筆時点で24.2k)としっかりしたコードベースを持っています。まず第一に、このリポジトリとその貢献者に敬意を表し、Pythonコミュニティのための素晴らしい参考プロジェクトを作成してくれた彼らの素晴らしい仕事に感謝したいと思います。
GitHub – StanGirard/quivr: 🧠あなたのスーパーチャージされたセカンドブレイン 🧠あなたの個人の生産性…
🧠あなたのスーパーチャージされたセカンドブレイン 🧠あなたの個人の生産性アシスタントでファイル(PDF、CSV)とアプリとのチャット…
github.com
- スタートアップ企業向けの20の最高のChatGPTプロンプト
- 即座のマルチビジュアライゼーションダッシュボードのためにGPT-4を促す
- 「2023年最終的なLLMOpsガイド:初心者からエキスパートまでの無料トレーニング」
このリポジトリから多くの良い要素があるため、さらに将来のLLMユースケースに基づいたテンプレートを作成したいと考えています。そのため、この記事を2つに分けることにしました。この記事では、以下に焦点を当てます:
- 高レベルのアーキテクチャ
- FastAPIでのSupabase Auth
- Supabaseでのファイルアップロード
第2部では、以下をカバーします:
- 長時間実行されるプロセスのためのCeleryワーカーとメッセージキュー
- Postgresのpg-vectorプラグイン
第3部では、以下をカバーします:
- ストリーミングペイロードのChatGPTに対するFastAPI
- ストライプ決済
- APIテスト
- 将来のユースケースのためのテンプレート
高レベルのアーキテクチャ
バックエンドアーキテクチャは主に3つのパーツで構成されています:Supabase DB、FastAPIバックエンドサーバー、およびCeleryサーバー。Celeryは長時間実行されるバックグラウンドタスク(例:大きなPDF文書の埋め込み)に使用されます。FastAPIとCeleryサーバーの間では、Redisがメッセージブローカーとして使用されます。FastAPI/CeleryからSupabaseへの通信は、Supabaseクライアント(Python SDK)を介して行われます。
Supabase Auth
Supabaseは、オープンソースのFirebaseの代替です。実質的にはPostgresデータベースですが、他の組み込み機能(認証、エッジ関数、ブロブストレージ、pg-vectorなど)があり、ゼロからPostgres DBを使用する場合と比較して開発作業を効率化します。
supabase authでは、supabaseクライアントライブラリから簡単にsignUp()およびsignIn()関数を呼び出すことができます。以下はJavaScriptでの例です(ソース:https://supabase.com/docs/guides/auth/social-login)。
async function signUpNewUser() { const { data, error } = await supabase.auth.signUp({ email: '[email protected]', password: 'example-password', options: { redirectTo: 'https//example.com/welcome' } })}
async function signInWithEmail() { const { data, error } = await supabase.auth.signInWithPassword({ email: '[email protected]', password: 'example-password', options: { redirectTo: 'https//example.com/welcome' } })}
async function signOut() { const { error } = await supabase.auth.signOut()}
これはフロントエンドコードですが、バックエンドでは何をする必要があるのでしょうか?
良い質問です。フロントエンドとSupabaseの間のやり取りにより、Supabaseは実際にauth.usersというテーブルを作成します。このテーブルは、SupabaseダッシュボードのAuthenticationセクションの下にあります。
auth.users を参照する将来のテーブルが必要な場合、次のようにするだけです。
CREATE TABLE IF NOT EXISTS user_daily_usage( user_id UUID REFERENCES auth.users (id), email TEXT, date TEXT, daily_requests_count INT, PRIMARY KEY (user_id, date));
その後、バックエンドAPIの認証のためにユーザーを認証する必要があります。フロントエンドが直接 Supabase.auth を使用する場合、バックエンドは他のAPI呼び出しのユーザーリクエストをどのように認証しますか?
これを説明するために、JWT(JavaScript Web Token)の動作を理解する必要があります。
JWTのエンコードとデコードをテストできます。https://jwt.io/にアクセスしてください。ユーザーが認証サーバーでサインアップ/サインインした後、JWTを取得します。したがって、ユーザーがトークンの有効期限が切れる前の短い時間枠内にウェブサイトを再度読み込もうとした場合、再度パスワードを入力する必要はありません。
ユーザーのためにJWTを生成するには、”sub”(auth.usersから自動的に割り当てられるユーザーのUUID)とサインアップに使用したメールが必要です。
ですから、認証サーバーまたはバックエンドがJWTをデコードするには、256ビットのシークレットキーがどちらも必要です。Supabaseの認証を使用する場合、管理ダッシュボードから「Anon key」と呼ばれます。これはバックエンドがJWTをデコードするために使用する同じキーです。
FastAPIバックエンドのAuthモジュールは次のようになります:
import osfrom typing import Optionalfrom auth.jwt_token_handler import decode_access_token, verify_tokenfrom fastapi import Depends, HTTPException, Requestfrom fastapi.security import HTTPAuthorizationCredentials, HTTPBearerfrom models import UserIdentityclass AuthBearer(HTTPBearer): def __init__(self, auto_error: bool = True): super().__init__(auto_error=auto_error) async def __call__( self, request: Request, ): credentials: Optional[HTTPAuthorizationCredentials] = await super().__call__( request ) self.check_scheme(credentials) token = credentials.credentials # pyright: ignore reportPrivateUsage=none return await self.authenticate( token, ) def check_scheme(self, credentials): if credentials and credentials.scheme != "Bearer": raise HTTPException(status_code=401, detail="Token must be Bearer") elif not credentials: raise HTTPException( status_code=403, detail="Authentication credentials missing" ) async def authenticate( self, token: str, ) -> UserIdentity: if os.environ.get("AUTHENTICATE") == "false": return self.get_test_user() elif verify_token(token): return decode_access_token(token) else: raise HTTPException(status_code=401, detail="Invalid token or api key.") def get_test_user(self) -> UserIdentity: return UserIdentity( email="[email protected]", id="696dda89-d395-4601-af3d-e1c66de3df1a" # type: ignore ) # replace with test user informationdef get_current_user(user: UserIdentity = Depends(AuthBearer())) -> UserIdentity: return user
import osfrom datetime import datetime, timedeltafrom typing import Optionalfrom jose import jwtfrom jose.exceptions import JWTErrorfrom models import UserIdentitySECRET_KEY = os.environ.get("JWT_SECRET_KEY")ALGORITHM = "HS256"if not SECRET_KEY: raise ValueError("JWT_SECRET_KEY environment variable not set")def create_access_token(data: dict, expires_delta: Optional[timedelta] = None): to_encode = data.copy() if expires_delta: expire = datetime.utcnow() + expires_delta else: expire = datetime.utcnow() + timedelta(minutes=15) to_encode.update({"exp": expire}) encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) return encoded_jwtdef decode_access_token(token: str) -> UserIdentity: try: payload = jwt.decode( token, SECRET_KEY, algorithms=[ALGORITHM], options={"verify_aud": False} ) except JWTError: return None # pyright: ignore reportPrivateUsage=none return UserIdentity( email=payload.get("email"), id=payload.get("sub"), # pyright: ignore reportPrivateUsage=none )def verify_token(token: str): payload = decode_access_token(token) return payload is not None
Supabaseとのファイルアップロード
Supabaseのクライアントライブラリを直接呼び出してファイルをアップロードすることができます。以下のようなユーティリティ関数を作成することができます:
import jsonfrom multiprocessing import get_loggerfrom langchain.pydantic_v1 import Fieldfrom langchain.schema import Documentfrom supabase.client import Client, create_clientimport osfrom dotenv import load_dotenvload_dotenv()logger = get_logger()def get_supabase_client() -> Client: supabase_client: Client = create_client( os.getenv("SUPABASE_URL"), os.getenv("SUPABASE_SERVICE_KEY") ) return supabase_clientdef upload_file_storage(file, file_identifier: str): supabase_client: Client = get_supabase_client() # res = supabase_client.storage.create_bucket("quivr") response = None try: response = supabase_client.storage.from_(os.getenv("SUPABASE_BUCKET")).upload(file_identifier, file) return response except Exception as e: logger.error(e) raise e
以下はFastAPIのためのルートです:
import osfrom typing import Optionalfrom uuid import UUIDfrom auth import AuthBearer, get_current_userfrom fastapi import APIRouter, Depends, HTTPException, Query, Request, UploadFilefrom logger import get_loggerfrom models import UserIdentity, UserUsagefrom repository.files.upload_file import upload_file_storagefrom repository.user_identity import get_user_identitylogger = get_logger(__name__)upload_router = APIRouter()@upload_router.get("/upload/healthz", tags=["Health"])async def healthz(): return {"status": "ok"}@upload_router.post("/upload", dependencies=[Depends(AuthBearer())], tags=["Upload"])async def upload_file( request: Request, uploadFile: UploadFile, chat_id: Optional[UUID] = Query(None, description="The ID of the chat"), current_user: UserIdentity = Depends(get_current_user),): file_content = await uploadFile.read() filename_with_user_id = str(current_user.id) + "/" + str(uploadFile.filename) try: fileInStorage = upload_file_storage(file_content, filename_with_user_id) logger.info(f"File {fileInStorage} uploaded successfully") except Exception as e: if "The resource already exists" in str(e): raise HTTPException( status_code=403, detail=f"ファイル {uploadFile.filename} は既にストレージに存在しています。", ) else: raise HTTPException( status_code=500, detail="ファイルのアップロードに失敗しました。" ) return {"message": "ファイルの処理が開始されました。"}
続きはパート2でお届けします…
FastAPIに慣れていない方にとっては少し複雑かもしれませんが、パート3の最後にはGitHubリポジトリ全体を共有するので、より明確になるでしょう。お楽しみに。
We will continue to update VoAGI; if you have any questions or suggestions, please contact us!
Was this article helpful?
93 out of 132 found this helpful
Related articles