Azure OpenAI ServiceのGPTとStreamlitを使用したWebアプリケーションにおいて、ストリーミング応答を実装する方法をまとめます。
サンプルコード
早速ですが、以下のコードで実装できました。
import os
import streamlit as st
import asyncio
from openai import AsyncAzureOpenAI
from dotenv import load_dotenv
from tenacity import retry, stop_after_attempt, wait_fixed
# .envファイルの読み込み
load_dotenv(".env")
AZURE_OPENAI_ENDPOINT = os.getenv('AZURE_OPENAI_ENDPOINT')
AZURE_OPENAI_API_VERSION = os.getenv('AZURE_OPENAI_API_VERSION')
AZURE_OPENAI_API_KEY = os.getenv('AZURE_OPENAI_API_KEY')
AZURE_DEPLOYMENT = os.getenv('AZURE_DEPLOYMENT')
llm = AsyncAzureOpenAI(
api_key=AZURE_OPENAI_API_KEY,
api_version=AZURE_OPENAI_API_VERSION,
azure_endpoint=AZURE_OPENAI_ENDPOINT
)
api_wait_sec = 5
api_stop_after_attempt = 15
@retry(wait=wait_fixed(api_wait_sec), stop=stop_after_attempt(api_stop_after_attempt))
async def stream_chat_response(prompt):
"""Azure OpenAIからの非同期ストリーミング応答を取得"""
chat_completion = await llm.chat.completions.create(
model=AZURE_DEPLOYMENT,
messages=[{"role": "user", "content": prompt}],
temperature=0.0,
stream=True
)
async for chunk in chat_completion:
if chunk.choices and chunk.choices[0].delta and chunk.choices[0].delta.content:
yield chunk.choices[0].delta.content
st.title("Azure OpenAI ストリーミング")
prompt = st.text_input("プロンプトを入力してください", value="こんにちは")
if st.button("送信"):
if prompt.strip():
st.subheader("応答")
response_placeholder = st.empty() # 応答をリアルタイムで表示するためのプレースホルダー
async def process_streaming():
content = ""
async for chunk in stream_chat_response(prompt):
content += chunk
response_placeholder.text(content) # プレースホルダーにリアルタイムで追記
asyncio.run(process_streaming()) # 非同期関数を実行
st.success("応答完了!")
else:
st.warning("プロンプトを入力してください。")
コードの内容
上記のコードでは、Streamlitを使用してAzure OpenAIのAPIと連携し、非同期ストリーミング処理で応答をリアルタイムに表示するアプリケーションを構築しています。
それぞれの部分を見ていきます。
全体の構成
ライブラリのインポート
必要なライブラリをインポートしています。
asyncio
非同期処理の基盤となるライブラリ。streamlit
ウェブアプリケーションを簡単に構築するためのフレームワーク。openai.AsyncAzureOpenAI
Azure OpenAIの非同期APIクライアント。dotenv
環境変数を.env
ファイルから読み込むためのライブラリ。tenacity
リトライ処理を簡単に実装するためのライブラリ。os
環境変数を操作するための標準ライブラリ。
環境変数の読込み
.env
ファイルからAzure OpenAIのエンドポイントやAPIキーを取得します。
ストリーミング応答の処理
Azure OpenAIのストリーミングAPIを呼び出して、応答をリアルタイムで処理します。
StreamlitによるUI構築
ユーザーがプロンプトを入力して送信すると、応答がリアルタイムで表示されます。
詳細なコード解説
環境変数の読み込み
load_dotenv(".env")
AZURE_OPENAI_ENDPOINT = os.getenv('GPT4O_AZURE_OPENAI_ENDPOINT')
AZURE_OPENAI_API_VERSION = os.getenv('GPT4O_OPENAI_API_VERSION')
AZURE_OPENAI_API_KEY = os.getenv('GPT4O_AZURE_OPENAI_API_KEY')
AZURE_DEPLOYMENT = os.getenv('GPT4O_AZURE_DEPLOYMENT'
.env
ファイルを読み込んで環境変数を取得しています。
Azure OpenAIのAPIを利用するために必要な情報(エンドポイント、APIキーなど)を取得します。
非同期クライアントの初期化
llm = AsyncAzureOpenAI(
api_key=AZURE_OPENAI_API_KEY,
api_version=AZURE_OPENAI_API_VERSION,
azure_endpoint=AZURE_OPENAI_ENDPOINT
)
AsyncAzureOpenAI
クラスを用いてAzure OpenAIの非同期APIクライアントを初期化します。
api_key
やazure_endpoint
などの設定を引数として渡します。
リトライ処理の設定
@retry(wait=wait_fixed(api_wait_sec), stop=stop_after_attempt(api_stop_after_attempt))
async def stream_chat_response(prompt):
@retry
デコレーターを使用して、API呼び出しが失敗した場合のリトライ処理を設定しています。
wait_fixed(api_wait_sec)
リトライ間隔を固定(5秒)に設定。stop_after_attempt(api_stop_after_attempt)
最大試行回数を15回に設定。
非同期ストリーミング処理
chat_completion = await llm.chat.completions.create(
model=AZURE_DEPLOYMENT,
messages=[{"role": "user", "content": prompt}],
temperature=0.0,
stream=True
)
async for chunk in chat_completion:
if chunk.choices and chunk.choices[0].delta and chunk.choices[0].delta.content:
yield chunk.choices[0].delta.content
Azure OpenAI APIを非同期で呼び出し、stream=True
を指定することで応答をストリーミングで受け取ります。
応答の各部分(チャンク)を順次処理して、生成されたテキストをyield
で返します。
StreamlitによるUIの構築
st.title("Azure OpenAI ストリーミング")
prompt = st.text_input("プロンプトを入力してください", value="こんにちは")
ユーザーインターフェースを構築しています。
st.title
アプリのタイトルを表示。st.text_input
プロンプト入力欄を作成。
ボタン操作と応答のリアルタイム表示
if st.button("送信"):
if prompt.strip():
st.subheader("応答")
response_placeholder = st.empty()
async def process_streaming():
content = ""
async for chunk in stream_chat_response(prompt):
content += chunk
response_placeholder.text(content)
asyncio.run(process_streaming())
st.success("応答完了!")
「送信」ボタンを押すと以下の処理を実行します。
- プロンプトが空でないことを確認。
- 応答を表示するためのプレースホルダーを用意(
st.empty()
)。 - 非同期処理を
asyncio.run()
で実行し、ストリーミングされた応答をリアルタイムでプレースホルダーに表示。 - 処理が完了したら、
st.success
で通知。
全体の流れまとめ
- ユーザーがプロンプトを入力して「送信」ボタンをクリック。
stream_chat_response
がAzure OpenAI APIにリクエストを送信。- 応答がストリーミングで受信され、
async for
ループでリアルタイムに処理。 - Streamlitのプレースホルダーに応答を逐次表示。
おわりに
以上でAzure OpenAI ServiceのGPTのストリーミング応答を実装できました。結構簡単にできました。