ChatGPT APIへの適切な方法での通話の作り方

ChatGPT APIへの通話作り方

信頼性のある呼び出しを行い、堅牢なアプリケーションを構築するためのChatGPT APIの使い方

LLM(Language Model)は今やどこにでも存在しており、特にChatGPTは非常に人気です。多くのアプリケーションがこれをベースに開発されており、もしあなたがまだ試していないのであれば、ぜひ試してみてください。

Created with Midjourney.

ChatGPTをベースにアプリケーションを構築する場合、複数の並列呼び出しを行う必要があるでしょう。ただ、あなただけではありません。1日に何百万ものリクエストを処理する多くのアプリケーション(エンジニアリングチームに感謝します)があるため、APIはしばしば「リクエストが多すぎる」というエラーを返します。そのため、複数の並列呼び出しを行う際に、このようなエラーにうまく対処する方法が必要です。

この小さなPythonチュートリアルでは、次の2つの重要なトピックについて説明します。ChatGPT APIへの効率的な呼び出しを行うためには、以下の2つの重要なトピックをカバーします:

  1. 複数の呼び出しを並列で実行する
  2. 失敗した場合に呼び出しを再試行する

1. 複数の呼び出しを並列で実行する

呼び出しを行う最も簡単な方法は、同期的に行うことです。つまり、リクエストを送信し、レスポンスが到着するのを待ってからプログラムを続行します。以下のように簡単に行うことができます:

import requestsheaders = {    "Content-Type": "application/json",    "Authorization": f"Bearer {OPENAI_API_KEY}"}response_json = requests.post("https://api.openai.com/v1/chat/completions", headers=headers, json={    "model": "gpt-3.5-turbo",    "messages": [{"role": "user", "content": "ping"}],    "temperature": 0}).json()print(response_json["choices"][0]["message"]["content"])

Pong!

シンプルなシステムで作業している場合は、この方法で問題ありません。ただし、APIやデータベースなどのリソースに対して複数の呼び出しを並列で実行したい場合は、非同期で行うことでより高速なレスポンスを得ることができます。

非同期タスクを実行すると、すべてのアクションがトリガされ、それらが並列で完了するのを待ちます。これにより、待ち時間が短縮されます。

これを行うための基本的な方法は、異なるスレッドを作成して各リクエストを処理することですが、非同期呼び出しを使用することでより良い方法があります。

非同期呼び出しを行うと、アプリケーションが待機すべき場所を正確に指定することができるため、従来のスレッド処理ではシステムが自動的にスレッドを待機状態にするため、効率が低下する可能性があります。

以下に、同期呼び出しと非同期呼び出しの違いを示す例を示します。

# 同期呼び出しimport timedef delay_print(msg):    print(msg, end=" ")    time.sleep(1)def sync_print():    for i in range(10):        delay_print(i)start_time = time.time()sync_print()print("\n", time.time() - start_time, "seconds.")

0 1 2 3 4 5 6 7 8 9  10.019574642181396 seconds.

# 非同期呼び出しimport asyncioasync def delay_print_async(msg):    print(msg, end=" ")    await asyncio.sleep(1)async def async_print():    asyncio.gather(*[delay_print_async(i) for i in range(10)])start_time = time.time()await async_print()print("\n", time.time() - start_time, "seconds.")

0.0002448558807373047 seconds.0 1 2 3 4 5 6 7 8 9 

asyncio.gatherメソッドは、それに渡されたすべての非同期呼び出しをトリガし、結果が利用可能になったら返します。

残念ながら、requestsライブラリを使用して非同期呼び出しを行うことはできません。代わりに、aiohttpライブラリを使用することができます。以下に、aiohttpを使用して非同期呼び出しを行う方法の例を示します。

import aiohttpasync def get_completion(content):    async with aiohttp.ClientSession() as session:        async with session.post("https://api.openai.com/v1/chat/completions", headers=headers, json={            "model": "gpt-3.5-turbo",            "messages": [{"role": "user", "content": content}],            "temperature": 0        }) as resp:            response_json = await resp.json()            return response_json["choices"][0]['message']["content"]await get_completion("Ping")

Pong!

前述の通り、非同期リクエストを実行するには、asyncio.gatherメソッドを使用する必要があります。

async def get_completion_list(content_list):    return await asyncio.gather(*[get_completion(content) for content in content_list])await get_completion_list(["ping", "pong"]*5)

['Pong!', 'Ping!', 'Pong!', 'Ping!', 'Pong!', 'Ping!', 'Pong!', 'Ping!', 'Pong!', 'Ping!']

これは動作しますが、この方法で呼び出しを行うことは理想的ではありません。なぜなら、すべての呼び出しのたびにセッションオブジェクトを再作成しているため、リソースと時間を節約することができます。次のように同じセッションオブジェクトを再利用することができます:

async def get_completion(content, session):    async with session.post("https://api.openai.com/v1/chat/completions", headers=headers, json={        "model": "gpt-3.5-turbo",        "messages": [{"role": "user", "content": content}],        "temperature": 0    }) as resp:        response_json = await resp.json()        return response_json["choices"][0]['message']["content"]async def get_completion_list(content_list):    async with aiohttp.ClientSession() as session:        return await asyncio.gather(*[get_completion(content, session) for content in content_list])await get_completion_list(["ping", "pong"]*5)

簡単ですね。これにより、簡単に複数の呼び出しを行うことができます。ただし、この方法で無制限に呼び出しを行うことは良い慣例ではありません。システムを過負荷にさせ、一定時間の間追加のリクエストを行えなくなる可能性があるためです(信じてください、そうなります)。したがって、同時に実行できる呼び出しの数を制限することは良い考えです。これは、asyncio.Semaphoreクラスを使用して簡単に行うことができます。

Semaphoreクラスは、現在そのコンテキスト内で実行されている非同期呼び出しの数を管理するコンテキストマネージャを作成します。最大数に達すると、呼び出しが完了するまでブロックされます。

async def get_completion(content, session, semaphore):    async with semaphore:        await asyncio.sleep(1)        async with session.post("https://api.openai.com/v1/chat/completions", headers=headers, json={            "model": "gpt-3.5-turbo",            "messages": [{"role": "user", "content": content}],            "temperature": 0        }) as resp:            response_json = await resp.json()            return response_json["choices"][0]['message']["content"]async def get_completion_list(content_list, max_parallel_calls):    semaphore = asyncio.Semaphore(value=max_parallel_calls)    async with aiohttp.ClientSession() as session:        return await asyncio.gather(*[get_completion(content, session, semaphore) for content in content_list])start_time = time.perf_counter()completion_list = await get_completion_list(["ping", "pong"]*5, 100)print("経過時間:", time.perf_counter() - start_time, "秒")print(completion_list)

経過時間: 1.8094507199984946 秒['Pong!', 'Ping!', 'Pong!', 'Ping!', 'Pong!', 'Ping!', 'Pong!', 'Ping!', 'Pong!', 'Ping!']

ここでオプションのこととして、呼び出しの進行状況を報告することもできます。これは、進行状況を保持し、すべての呼び出しで共有される小さなクラスを作成することによって行うことができます。次のように行うことができます:

class ProgressLog:    def __init__(self, total):        self.total = total        self.done = 0    def increment(self):        self.done = self.done + 1    def __repr__(self):        return f"実行完了 {self.done}/{self.total}回"async def get_completion(content, session, semaphore, progress_log):    async with semaphore:        await asyncio.sleep(1)        async with session.post("https://api.openai.com/v1/chat/completions", headers=headers, json={            "model": "gpt-3.5-turbo",            "messages": [{"role": "user", "content": content}],            "temperature": 0        }) as resp:            response_json = await resp.json()            progress_log.increment()            print(progress_log)            return response_json["choices"][0]['message']["content"]async def get_completion_list(content_list, max_parallel_calls):    semaphore = asyncio.Semaphore(value=max_parallel_calls)    progress_log = ProgressLog(len(content_list))    async with aiohttp.ClientSession() as session:        return await asyncio.gather(*[get_completion(content, session, semaphore, progress_log) for content in content_list])start_time = time.perf_counter()completion_list = await get_completion_list(["ping", "pong"]*5, 100)print("経過時間:", time.perf_counter() - start_time, "秒")print(completion_list)

実行完了 1/10回実行完了 2/10回実行完了 3/10回実行完了 4/10回実行完了 5/10回実行完了 6/10回実行完了 7/10回実行完了 8/10回実行完了 9/10回実行完了 10/10回経過時間: 1.755018908999773 秒['Pong!', 'Ping!', 'Pong!', 'Ping!', 'Pong!', 'Ping!', 'Pong!', 'Ping!', 'Pong!', 'Ping!']

このセクションでは、複数の非同期リクエストを実行する方法について説明しました。これにより、複数の非同期呼び出しを行い、一度に行う呼び出しの数を制限し、進捗を報告することができます。ただし、まだいくつかの問題があります。

行われるリクエストは、サーバーの過負荷、接続の中断、不正なリクエストなど、さまざまな理由で失敗する可能性があります。これらは例外を生成したり、予測不可能な応答を返すことがあるため、これらのケースを処理し、失敗した呼び出しを自動的に再試行する必要があります。

2. 失敗した場合に呼び出しを再試行する

失敗した呼び出しを処理するために、tenacityライブラリを使用します。 Tenacityは、例外が発生した場合に関数呼び出しを自動的に再試行するための関数デコレータを提供します。

from tenacity import (    retry,    stop_after_attempt,    wait_random_exponential,)

呼び出しに再試行機能を提供するには、@retryデコレータを配置する必要があります。追加のパラメータなしで使用すると、関数は失敗した場合に即座に無期限に再試行されます。これはいくつかの理由で良くありません。

1つは、関数呼び出しがサーバーの過負荷によって失敗する可能性があるため、再試行する前にある程度の時間を待つことが合理的であるということです。待機時間を示すために、wait=wait_random_exponential(min=min_value, max=max_value)というパラメータを使用します。これにより、関数が失敗するほど待機時間が増加します。

オプションとして、リトライが発生した場合にメッセージを記録することもできます。これを行うには、パラメータbefore_sleepに関数を指定します。ここではprint関数を使用しますが、より良い方法はloggingモジュールを使用して、このパラメータにlogging.errorまたはlogging.debug関数を渡すことです。

デモンストレーションのために、ランダムな例外を生成します。

import randomclass ProgressLog:    def __init__(self, total):        self.total = total        self.done = 0    def increment(self):        self.done = self.done + 1    def __repr__(self):        return f"完了した実行 {self.done}/{self.total}."@retry(wait=wait_random_exponential(min=1, max=60), before_sleep=print)async def get_completion(content, session, semaphore, progress_log):    async with semaphore:        #await asyncio.sleep(1)        if random.random() < 0.2:            raise Exception("ランダムな例外")        async with session.post("https://api.openai.com/v1/chat/completions", headers=headers, json={            "model": "gpt-3.5-turbo",            "messages": [{"role": "user", "content": content}],            "temperature": 0        }) as resp:            response_json = await resp.json()            progress_log.increment()            print(progress_log)            return response_json["choices"][0]['message']["content"]async def get_completion_list(content_list, max_parallel_calls):    semaphore = asyncio.Semaphore(value=max_parallel_calls)    progress_log = ProgressLog(len(content_list))    async with aiohttp.ClientSession() as session:        return await asyncio.gather(*[get_completion(content, session, semaphore, progress_log) for content in content_list])start_time = time.perf_counter()completion_list = await get_completion_list(["ping", "pong"]*5, 100)print("経過時間:", time.perf_counter() - start_time, "秒。")print(completion_list)

<RetryCallState 133364377433616: 試行回数 #1; 0.74秒待機; 最後の結果: 失敗 (例外: ランダムな例外)><RetryCallState 133364377424496: 試行回数 #1; 0.79秒待機; 最後の結果: 失敗 (例外: ランダムな例外)>完了した実行 1/10.完了した実行 2/10.完了した実行 3/10.完了した実行 4/10.完了した実行 5/10.完了した実行 6/10.完了した実行 7/10.完了した実行 8/10.完了した実行 9/10.完了した実行 10/10.経過時間: 1.1305301820011664秒。['ポン!', 'ピン!', 'ポン!', 'ピン!', 'ポン!', 'ピン!', 'ポン!', 'ピン!', 'ポン!', 'ピン!']

これにより、関数が再試行する前に一定の時間待機します。ただし、失敗の理由は、サーバーダウンタイムや不正なペイロードなどのシステマティックな要因による場合もあります。この場合、再試行回数を制限したいとします。これはパラメータstop=stop_after_attempt(n)を使用して行うことができます。

import random

class ProgressLog:
    def __init__(self, total):
        self.total = total
        self.done = 0

    def increment(self):
        self.done = self.done + 1

    def __repr__(self):
        return f"完了した実行数 {self.done}/{self.total}."

@retry(wait=wait_random_exponential(min=1, max=60), stop=stop_after_attempt(2), before_sleep=print)
async def get_completion(content, session, semaphore, progress_log):
    async with semaphore:
        #await asyncio.sleep(1)
        if random.random() < 0.9:
            raise Exception("ランダムな例外")
        async with session.post("https://api.openai.com/v1/chat/completions", headers=headers, json={
            "model": "gpt-3.5-turbo",
            "messages": [{"role": "user", "content": content}],
            "temperature": 0
        }) as resp:
            response_json = await resp.json()
            progress_log.increment()
            print(progress_log)
            return response_json["choices"][0]['message']["content"]

async def get_completion_list(content_list, max_parallel_calls):
    semaphore = asyncio.Semaphore(value=max_parallel_calls)
    progress_log = ProgressLog(len(content_list))
    async with aiohttp.ClientSession() as session:
        return await asyncio.gather(*[get_completion(content, session, semaphore, progress_log) for content in content_list])

start_time = time.perf_counter()
completion_list = await get_completion_list(["ping", "pong"]*5, 100)
print("経過時間: ", time.perf_counter() - start_time, "秒.")
print(completion_list)

<RetryCallState 133364608660048: attempt #1; slept for 0.1; last result: failed (例外 ランダムな例外)><RetryCallState 133364377435680: attempt #1; slept for 0.71; last result: failed (例外 ランダムな例外)><RetryCallState 133364377421472: attempt #1; slept for 0.17; last result: failed (例外 ランダムな例外)><RetryCallState 133364377424256: attempt #1; slept for 0.37; last result: failed (例外 ランダムな例外)><RetryCallState 133364377430928: attempt #1; slept for 0.87; last result: failed (例外 ランダムな例外)><RetryCallState 133364377420752: attempt #1; slept for 0.42; last result: failed (例外 ランダムな例外)><RetryCallState 133364377422576: attempt #1; slept for 0.47; last result: failed (例外 ランダムな例外)><RetryCallState 133364377431312: attempt #1; slept for 0.11; last result: failed (例外 ランダムな例外)><RetryCallState 133364377425840: attempt #1; slept for 0.69; last result: failed (例外 ランダムな例外)><RetryCallState 133364377424592: attempt #1; slept for 0.89; last result: failed (例外 ランダムな例外)>---------------------------------------------------------------------------
Exception                                 Traceback (most recent call last)
/usr/local/lib/python3.10/dist-packages/tenacity/_asyncio.py in __call__(self, fn, *args, **kwargs)
     49                 try:
---> 50                     result = await fn(*args, **kwargs)
     51                 except BaseException:  # noqa: B9025 frames

Exception: ランダムな例外

The above exception was the direct cause of the following exception:

RetryError                                Traceback (most recent call last)
/usr/local/lib/python3.10/dist-packages/tenacity/__init__.py in iter(self, retry_state)
    324             if self.reraise:
    325                 raise retry_exc.reraise()
--> 326             raise retry_exc from fut.exception()
    327 
    328         if self.wait:

RetryError: RetryError[<Future at 0x794b5057a590 state=finished raised Exception>]

このパラメータが設定されている場合、RetryErrorは試行回数が最大値に達した時点で発生します。ただし、例外を生成せずに実行を継続し、後で処理するために、呼び出しの返り値にNone値を保存することもあります。これを行うには、コールバック関数retry_error_callbackを使用して、RetryErrorエラーが発生した場合にNone値を返すだけです:

import random

class ProgressLog:
    def __init__(self, total):
        self.total = total
        self.done = 0
    
    def increment(self):
        self.done = self.done + 1
    
    def __repr__(self):
        return f"完了した実行数 {self.done}/{self.total}."

@retry(wait=wait_random_exponential(min=1, max=60), stop=stop_after_attempt(2), before_sleep=print, retry_error_callback=lambda _: None)
async def get_completion(content, session, semaphore, progress_log):
    async with semaphore:
        #await asyncio.sleep(1)
        if random.random() < 0.7:
            raise Exception("ランダムな例外")
        async with session.post("https://api.openai.com/v1/chat/completions", headers=headers, json={
            "model": "gpt-3.5-turbo",
            "messages": [{"role": "user", "content": content}],
            "temperature": 0
        }) as resp:
            response_json = await resp.json()
            progress_log.increment()
            print(progress_log)
            return response_json["choices"][0]['message']["content"]

async def get_completion_list(content_list, max_parallel_calls):
    semaphore = asyncio.Semaphore(value=max_parallel_calls)
    progress_log = ProgressLog(len(content_list))
    async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(1)) as session:
        return await asyncio.gather(*[get_completion(content, session, semaphore, progress_log) for content in content_list])

start_time = time.perf_counter()
completion_list = await get_completion_list(["ピン", "ポン"]*5, 100)
print("経過時間:", time.perf_counter() - start_time, "秒。")
print(completion_list)

<RetryCallState 133364377805024: 試行 #1; 0.22秒スリープ; 最後の結果: 失敗 (例外 ランダムな例外)>
<RetryCallState 133364377799456: 試行 #1; 0.53秒スリープ; 最後の結果: 失敗 (例外 ランダムな例外)>
<RetryCallState 133364377801328: 試行 #1; 0.24秒スリープ; 最後の結果: 失敗 (例外 ランダムな例外)>
<RetryCallState 133364377810208: 試行 #1; 0.38秒スリープ; 最後の結果: 失敗 (例外 ランダムな例外)>
<RetryCallState 133364377801616: 試行 #1; 0.54秒スリープ; 最後の結果: 失敗 (例外 ランダムな例外)>
<RetryCallState 133364377422096: 試行 #1; 0.59秒スリープ; 最後の結果: 失敗 (例外 ランダムな例外)>
<RetryCallState 133364377430592: 試行 #1; 0.07秒スリープ; 最後の結果: 失敗 (例外 ランダムな例外)>
<RetryCallState 133364377425648: 試行 #1; 0.05秒スリープ; 最後の結果: 失敗 (例外 ランダムな例外)>
完了した実行数 1/10.
完了した実行数 2/10.
完了した実行数 3/10.
経過時間: 2.6409040250000544 秒。
['ポン!', 'ピン!', None, None, None, None, None, 'ピン!', None, None]

これにより、エラーが発生する代わりにNoneの値が返されます。

まだ処理されていない問題の1つは、接続が詰まる問題です。これは、リクエストを実行し、何らかの理由でホストが接続を保持しているが失敗せずに何も返さない場合に発生します。そのようなケースを処理するには、一定期間内に値が返らない場合にタイムアウトして返すためにaiohttpライブラリのtimeoutパラメータとaiohttp.ClientTimeoutクラスを使用する必要があります。ここでタイムアウトが発生すると、TimeoutErrorが発生し、それがtenacityretryデコレータで処理され、自動的に関数が再実行されます。

class ProgressLog:
    def __init__(self, total):
        self.total = total
        self.done = 0
    
    def increment(self):
        self.done = self.done + 1
    
    def __repr__(self):
        return f"完了した実行数 {self.done}/{self.total}."

@retry(wait=wait_random_exponential(min=1, max=60), stop=stop_after_attempt(20), before_sleep=print, retry_error_callback=lambda _: None)
async def get_completion(content, session, semaphore, progress_log):
    async with semaphore:
        async with session.post("https://api.openai.com/v1/chat/completions", headers=headers, json={
            "model": "gpt-3.5-turbo",
            "messages": [{"role": "user", "content": content}],
            "temperature": 0
        }) as resp:
            response_json = await resp.json()
            progress_log.increment()
            print(progress_log)
            return response_json["choices"][0]['message']["content"]

async def get_completion_list(content_list, max_parallel_calls):
    semaphore = asyncio.Semaphore(value=max_parallel_calls)
    progress_log = ProgressLog(len(content_list))
    async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(10)) as session:
        return await asyncio.gather(*[get_completion(content, session, semaphore, progress_log) for content in content_list])

start_time = time.perf_counter()
completion_list = await get_completion_list(["ピン", "ポン"]*100, 100)
print("経過時間:", time.perf_counter() - start_time, "秒。")

すばらしい!これで、複数の並列リクエストを実行するための堅牢な方法ができました。いくつかのエラーが発生した場合には自動的に再試行され、エラーがシステム的なものである場合にはNoneの値が返されます。最終的なコードは以下のようになります:

import asyncio
import aiohttp
from tenacity import (
    retry,
    stop_after_attempt,
    wait_random_exponential,
)

headers = {
    "Content-Type": "application/json",
    "Authorization": f"Bearer {OPENAI_API_KEY}"
}

class ProgressLog:
    def __init__(self, total):
        self.total = total
        self.done = 0
    
    def increment(self):
        self.done = self.done + 1
    
    def __repr__(self):
        return f"完了した実行数 {self.done}/{self.total}."

@retry(wait=wait_random_exponential(min=1, max=60), stop=stop_after_attempt(20), before_sleep=print, retry_error_callback=lambda _: None)
async def get_completion(content, session, semaphore, progress_log):
    async with semaphore:
        async with session.post("https://api.openai.com/v1/chat/completions", headers=headers, json={
            "model": "gpt-3.5-turbo",
            "messages": [{"role": "user", "content": content}],
            "temperature": 0
        }) as resp:
            response_json = await resp.json()
            progress_log.increment()
            print(progress_log)
            return response_json["choices"][0]['message']["content"]

async def get_completion_list(content_list, max_parallel_calls, timeout):
    semaphore = asyncio.Semaphore(value=max_parallel_calls)
    progress_log = ProgressLog(len(content_list))
    async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(timeout)) as session:
        return await asyncio.gather(*[get_completion(content, session, semaphore, progress_log) for content in content_list])

要約すると、以下の機能を実装しました:

  1. 待ち時間を短縮するための非同期呼び出し。
  2. 非同期呼び出しの進捗をログに記録。
  3. 呼び出しが失敗した場合に自動的に再試行。
  4. 失敗がシステム的な場合にはNoneの値を返す。
  5. タイムアウトした場合には呼び出しを再試行し、何も返さない。

質問がある場合、エラーを見つけた場合、またはこの改善に関するアイデアがある場合は、コメントを残してください!

We will continue to update VoAGI; if you have any questions or suggestions, please contact us!

Share:

Was this article helpful?

93 out of 132 found this helpful

Discover more

機械学習

マシンラーニングのロードマップ:コミュニティの推奨事項2023

前回の記事で、このロードマップの第1部では、機械学習のための出発点と方向性について簡単に説明しました初心者が堅固な基盤...

データサイエンス

オープンAIによるこの動きは、AGIへの道を開くだろう

人工知能(AI)の能力向上を目指した画期的な取り組みの一環として、OpenAIはデータパートナーシップイニシアチブを発表しま...

機械学習

「AIアクトの解読」

AI法 [1]は、長く苦痛な過程を経て形成されましたこれは、ヨーロッパの立法プロセスにおける政治の影響と重要性を完璧に示す...

機械学習

音声合成:進化、倫理、そして法律

ロマン・ガーリン、シニアバイスプレジデント @イノベーション、スポートレーダー この記事では、音声合成の進化を辿り、それ...

データサイエンス

「AIはほとんどのパスワードを1分以内に解読できますAI攻撃からパスワードを保護する方法」

人工知能(AI)は、次の技術革新の波をもたらしています。AIの能力に魅了される一方で、その潜在的なリスクへの懸念も高まっ...

データサイエンス

「変革を受け入れる:AWSとNVIDIAが創発的なAIとクラウドイノベーションを進める」

Amazon Web ServicesとNVIDIAは、最新の生成AI技術を世界中の企業にもたらします。 AIとクラウドコンピューティングを結び付...