スポンサーリンク

【Azure OpenAI Service】Streamlitでストリーミング応答を実装!

記事内に広告が含まれています。

Azure OpenAI ServiceのGPTとStreamlitを使用したWebアプリケーションにおいて、ストリーミング応答を実装する方法をまとめます。

スポンサーリンク

サンプルコード

早速ですが、以下のコードで実装できました。

import os
import streamlit as st
import asyncio
from openai import AsyncAzureOpenAI
from dotenv import load_dotenv
from tenacity import retry, stop_after_attempt, wait_fixed


# .envファイルの読み込み
load_dotenv(".env")

AZURE_OPENAI_ENDPOINT = os.getenv('AZURE_OPENAI_ENDPOINT')
AZURE_OPENAI_API_VERSION = os.getenv('AZURE_OPENAI_API_VERSION')
AZURE_OPENAI_API_KEY = os.getenv('AZURE_OPENAI_API_KEY')
AZURE_DEPLOYMENT = os.getenv('AZURE_DEPLOYMENT')

llm = AsyncAzureOpenAI(
    api_key=AZURE_OPENAI_API_KEY,
    api_version=AZURE_OPENAI_API_VERSION,
    azure_endpoint=AZURE_OPENAI_ENDPOINT
)

api_wait_sec = 5
api_stop_after_attempt = 15


@retry(wait=wait_fixed(api_wait_sec), stop=stop_after_attempt(api_stop_after_attempt))
async def stream_chat_response(prompt):
    """Azure OpenAIからの非同期ストリーミング応答を取得"""
    chat_completion = await llm.chat.completions.create(
        model=AZURE_DEPLOYMENT,
        messages=[{"role": "user", "content": prompt}],
        temperature=0.0,
        stream=True
    )
    async for chunk in chat_completion:
        if chunk.choices and chunk.choices[0].delta and chunk.choices[0].delta.content:
            yield chunk.choices[0].delta.content

st.title("Azure OpenAI ストリーミング")

prompt = st.text_input("プロンプトを入力してください", value="こんにちは")

if st.button("送信"):
    if prompt.strip():
        st.subheader("応答")
        response_placeholder = st.empty()  # 応答をリアルタイムで表示するためのプレースホルダー

        async def process_streaming():
            content = ""
            async for chunk in stream_chat_response(prompt):
                content += chunk
                response_placeholder.text(content)  # プレースホルダーにリアルタイムで追記

        asyncio.run(process_streaming())  # 非同期関数を実行
        st.success("応答完了!")
    else:
        st.warning("プロンプトを入力してください。")
スポンサーリンク

コードの内容

上記のコードでは、Streamlitを使用してAzure OpenAIのAPIと連携し、非同期ストリーミング処理で応答をリアルタイムに表示するアプリケーションを構築しています。

それぞれの部分を見ていきます。

全体の構成

ライブラリのインポート

必要なライブラリをインポートしています。

  • asyncio
    非同期処理の基盤となるライブラリ。
  • streamlit
    ウェブアプリケーションを簡単に構築するためのフレームワーク。
  • openai.AsyncAzureOpenAI
    Azure OpenAIの非同期APIクライアント。
  • dotenv
    環境変数を.envファイルから読み込むためのライブラリ。
  • tenacity
    リトライ処理を簡単に実装するためのライブラリ。
  • os
    環境変数を操作するための標準ライブラリ。

環境変数の読込み

.envファイルからAzure OpenAIのエンドポイントやAPIキーを取得します。

ストリーミング応答の処理

Azure OpenAIのストリーミングAPIを呼び出して、応答をリアルタイムで処理します。

StreamlitによるUI構築

ユーザーがプロンプトを入力して送信すると、応答がリアルタイムで表示されます。

詳細なコード解説

環境変数の読み込み

load_dotenv(".env")
AZURE_OPENAI_ENDPOINT = os.getenv('GPT4O_AZURE_OPENAI_ENDPOINT')
AZURE_OPENAI_API_VERSION = os.getenv('GPT4O_OPENAI_API_VERSION')
AZURE_OPENAI_API_KEY = os.getenv('GPT4O_AZURE_OPENAI_API_KEY')
AZURE_DEPLOYMENT = os.getenv('GPT4O_AZURE_DEPLOYMENT'

.envファイルを読み込んで環境変数を取得しています。

Azure OpenAIのAPIを利用するために必要な情報(エンドポイント、APIキーなど)を取得します。

非同期クライアントの初期化

llm = AsyncAzureOpenAI(
    api_key=AZURE_OPENAI_API_KEY,
    api_version=AZURE_OPENAI_API_VERSION,
    azure_endpoint=AZURE_OPENAI_ENDPOINT
)

AsyncAzureOpenAIクラスを用いてAzure OpenAIの非同期APIクライアントを初期化します。

api_keyazure_endpointなどの設定を引数として渡します。

リトライ処理の設定

@retry(wait=wait_fixed(api_wait_sec), stop=stop_after_attempt(api_stop_after_attempt))
async def stream_chat_response(prompt):

@retryデコレーターを使用して、API呼び出しが失敗した場合のリトライ処理を設定しています。

  • wait_fixed(api_wait_sec)
    リトライ間隔を固定(5秒)に設定。
  • stop_after_attempt(api_stop_after_attempt)
    最大試行回数を15回に設定。

非同期ストリーミング処理

chat_completion = await llm.chat.completions.create(
    model=AZURE_DEPLOYMENT,
    messages=[{"role": "user", "content": prompt}],
    temperature=0.0,
    stream=True
)
async for chunk in chat_completion:
    if chunk.choices and chunk.choices[0].delta and chunk.choices[0].delta.content:
        yield chunk.choices[0].delta.content

Azure OpenAI APIを非同期で呼び出し、stream=Trueを指定することで応答をストリーミングで受け取ります。

応答の各部分(チャンク)を順次処理して、生成されたテキストをyieldで返します。

StreamlitによるUIの構築

st.title("Azure OpenAI ストリーミング")
prompt = st.text_input("プロンプトを入力してください", value="こんにちは")

ユーザーインターフェースを構築しています。

  • st.title
    アプリのタイトルを表示。
  • st.text_input
    プロンプト入力欄を作成。

ボタン操作と応答のリアルタイム表示

if st.button("送信"):
    if prompt.strip():
        st.subheader("応答")
        response_placeholder = st.empty()

        async def process_streaming():
            content = ""
            async for chunk in stream_chat_response(prompt):
                content += chunk
                response_placeholder.text(content)

        asyncio.run(process_streaming())
        st.success("応答完了!")

「送信」ボタンを押すと以下の処理を実行します。

  • プロンプトが空でないことを確認。
  • 応答を表示するためのプレースホルダーを用意(st.empty())。
  • 非同期処理をasyncio.run()で実行し、ストリーミングされた応答をリアルタイムでプレースホルダーに表示。
  • 処理が完了したら、st.successで通知。

全体の流れまとめ

  1. ユーザーがプロンプトを入力して「送信」ボタンをクリック。
  2. stream_chat_responseがAzure OpenAI APIにリクエストを送信。
  3. 応答がストリーミングで受信され、async forループでリアルタイムに処理。
  4. Streamlitのプレースホルダーに応答を逐次表示。

おわりに

以上でAzure OpenAI ServiceのGPTのストリーミング応答を実装できました。結構簡単にできました。

タイトルとURLをコピーしました