top of page

Google Cloud IAP保護下Langfuseトークン動的更新の実装ガイド

  • Shogo Umeda
  • 5 日前
  • 読了時間: 12分

はじめに


LangfuseはSelf-Hosted可能で、過去のブログでもご紹介したとおり、Google Cloud上にも簡単にLangfuse環境の構築が可能です。



また、Google Cloud上でLangfuseを構築する場合、Identity-Aware Proxy(IAP)を利用すると、認証済みユーザーのみがLangfuseにアクセスできるようになり、Langfuseのセキュリティが強化されます。



IAPは非常に便利なサービスですが、IAPトークンの有効期限が約1時間という制約があるため、 長時間稼働するサーバー(RAGサーバー、チャットボットなど)では、サーバー起動時に取得したIAPトークンの有効期限が切れてしまうと、IAPの先にあるLangfuseにアクセスできなくなってしまいます。


本記事では、このIAPトークンの有効期限を考慮し、LLMアプリケーションがLangfuseにアクセスする際のIAPトークン更新方法をご紹介します。


本記事でわかること


  • IAP で保護された Langfuse に対して、LLM アプリケーションから安全にアクセスする方法

  • IAP トークンの有効期限(約 1 時間)をまたいで、トークンを自動更新する実装パターン

  • Langfuse Python SDK v3 系以下通信に対して、それぞれ IAP トークンを付与する方法

    • httpx を使う通常 API(プロンプト管理 / スコア登録 等)

    • requests.Session を使う OTEL Span Exporter(トレース送信)


前提・想定対象読者


前提及び想定対象読者は以下の通りです。

  • GCP / IAP / Cloud Run がある程度わかる人向け

  • Langfuse Python SDK v3.10.1 を前提

  • LangChain / Vertex AI Gemini を使う例ですが、他の LLM クライアントにも応用可能


IAP環境下でのLangfuseへのアクセス方法


LangfuseのIAP環境下での構成


LangfuseのIAP環境下での構成を下記に示します。


Langfuse IAP構成

認証ヘッダーの使い分け


IAP環境下のLangfuseでは、2種類のHTTP認証ヘッダーを使い分けます。

ヘッダー

用途

値の形式

Authorization

Langfuse API Key認証

Basic {base64(public_key:secret_key)}

Proxy-Authorization

IAPトークン

Bearer {iap_token}


注意点として、IAPトークンを Authorization ヘッダーに設定すると、LangfuseのAPI Key認証が上書きされてしまいます。IAP認証には、Proxy-Authorization ヘッダーを利用してください。


IAPトークン有効期限の問題


上記の通りIAPを通してLangfuseにアクセスするためには、クライアントはIAPトークンをHTTPヘッダーに含める必要があります。


Langfuseクライアントでは、`httpx_client` オプションを利用して独自のHTTPクライアントを指定できます。


以下のプログラム例では、まずIAPトークンを取得し、取得したトークンを `Proxy-Authorization` ヘッダーとして設定した `httpx.Client` を生成します。

生成したクライアントをLangfuseクライアントの `httpx_client` パラメータに渡すことで、Langfuseの各種API呼び出し時にIAPトークンが自動的に付与されるようになります。

この実装をベースに動的にIAPトークンを更新する実装を考えてみましょう。


import httpx
from google.oauth2 import id_token
from google.auth.transport.requests import Request

# IAPトークンを取得
iap_client_id = os.environ["IAP_CLIENT_ID"]
initial_token = id_token.fetch_id_token(Request(), iap_client_id)

# httpx.Clientにヘッダーを設定
httpx_client = httpx.Client(
    headers={"Proxy-Authorization": f"Bearer {initial_token}"}
)

# Langfuseのhttpx_clientオプションに渡す
langfuse = Langfuse(
    public_key=os.environ["LANGFUSE_PUBLIC_KEY"],
    secret_key=os.environ["LANGFUSE_SECRET_KEY"],
    host=os.environ["LANGFUSE_HOST"],
    httpx_client=httpx_client,
)


Langfuse SDK V3のHTTPクライアントの構造


先ず、Langfuseクライアントの構成と IAPトークン更新の解決策の概要を構成図で示します。


ree

Langfuse SDK V3では、内部的に2つの異なるHTTPクライアントが使用されています。

クライアント種類

通信ライブラリ

用途例

本記事プログラム例でのIAPトークン更新の実装

Langfuseクライアント

httpx.Client

プロンプトAPIなど トレース送信以外のAPI呼び出し

event_hooksで動的に更新

OTEL Span Exporter

requests.Session

トレース送信

exportメソッドのラップで動的に更新


IAPトークンの更新方法が異なることに注意してください。以下で詳しく解説します。


1. Langfuseクライアント(httpx使用)


Langfuseクライアントは、プロンプト管理やスコア登録などのAPI呼び出しに使用されます。先に説明した通り、`httpx_client`パラメータでhttpxのカスタムクライアントを注入できます。

そのため`httpx`の`event_hooks`機能を使えば、リクエスト直前にIAPトークンを動的に更新できます。




2. OTEL Span Exporter(requests.Session使用)


OTEL Span Exporterは、トレース送信に使用されます。

Langfuseクライアントとは独立したHTTPクライアントを持っており、`httpx_client`の`event_hooks`は適用されません。

そのため、OTEL Span Exporterの`export`メソッドをラップして、エクスポート直前にIAPトークンを更新する必要があります。


注意: 本記事で記載している内容は公式にサポートされた方法ではなく、SDKの内部構造を利用した回避策のためLangfuse SDKのバージョンによって動作しない可能性があります。 Langfuse Python SDK V3.10.1でのみ動作することが確認されています。

実装例


以上を踏まえて、IAP保護下のLangfuseと連携するための実装例を以下に示します。


1. Google Cloud認証の準備


IAP(OAuth)クライアントの作成

下記のドキュメントを参考にIAPで利用するOAuth( IAP)クライアントを作成します。



クライアント作成後に表示されるクライアントIDとクライアントシークレットは必要になるのでメモしてください。

OAuthクライアント作成画面

また、クライアントの「承認済みのリダイレクトURI」に以下の設定をします。

https://iap.googleapis.com/v1/oauth/clientIds/YOUR_CLIENT_ID:handleRedirect
※`YOUR_CLIENT_ID`は上記でメモしたクライアントIDに置き換えてください。
リダイレクト設定

サービスアカウントの作成

下記のドキュメントを参考にLLMアプリケーションが利用するサービスアカウントを作成します。


サービスアカウントには以下のロールを割り当てる。

  • IAP で保護されたウェブアプリ ユーザー : IAP認証配下のLangfuseサーバーにアクセスするのに必要

  • Vertex AI ユーザー:Vertex AIのGeminiを利用するのに必要


サービスアカウント作成画面


また、以下のドキュメントを参考にサービスアカウントキーJSONファイルとしてダウンロードします。

ダウンロードしたキーファイルはサンプルプログラムのルートディレクトリに保存してください。


Langfuse側の環境変数設定


IAP構成でLangfuseを運用する場合、Langfuse Webの環境変数にIAPクライアントIDとシークレットを設定しLangfuse Webをリスタートします。


# IAP認証(Langfuse Web UIログイン用)
AUTH_GOOGLE_CLIENT_ID=your-iap-client-id
AUTH_GOOGLE_CLIENT_SECRET=your-iap-client-secret


2. 実装コード


コードの概要


実装は2つのファイルで構成されています。

ファイル

役割

iap_token_provider.py

IAPトークンの動的更新を担当するユーティリティモジュール

server.py

LangfuseとLangChainを使用したサンプルアプリケーション

ディレクトリ構成

以下の様なディレクトリ構成になっています。

.
├── .env                          # 環境変数設定
├── pyproject.toml                # 依存関係(UV用)
├── service-account-key.json      # サービスアカウントキー(※gitignore推奨)
├── iap_token_provider.py         # IAPトークン管理モジュール
└── server.py                     # サンプルアプリケーション

環境変数の設定


`.env`ファイルを作成し、以下の環境変数を設定します。

# Langfuse設定
LANGFUSE_PUBLIC_KEY=pk-lf-...  # Langfuse Project Settings → API Keysで発行・確認
LANGFUSE_SECRET_KEY=sk-lf-...  # Langfuse Project Settings → API Keysで発行・確認
LANGFUSE_HOST=https://langfuse.your-domain.com  # LangfuseのURL

# IAP設定
IAP_CLIENT_ID=123456789-abcdefg.apps.googleusercontent.com  # 先にメモしたクライアントID

# サービスアカウント認証
GOOGLE_APPLICATION_CREDENTIALS=./service-account-key.json  # サービスアカウントキーのパス

# GCP設定
GCP_PROJECT_ID=your-project-id             # Google CloudプロジェクトID
GCP_LOCATION=asia-northeast1               # リージョン


仮想環境作成・依存関係のインストール


本記事では、高速なPythonパッケージマネージャー UVを使用して仮想環境を構成します。

# UVのインストール(未インストールの場合)
curl -LsSf https://astral.sh/uv/install.sh | sh

# 仮想環境を作成
uv venv

# 仮想環境を有効化
source .venv/bin/activate 

# プロジェクトの初期化
uv init

# 依存関係のインストール(本記事執筆時のバージョン)
uv add "langfuse>=3.10.1" \
    "langchain>=1.1.0" \
    "langchain-core>=1.1.0" \
    "langchain-google-vertexai>=3.1.0" \
    "httpx>=0.28.1" \
    "google-auth>=2.43.0" \
    "python-dotenv>=1.2.1" \
    "fastapi>=0.122.0" \
    "uvicorn>=0.38.0"

上記コマンドを実行すると、`pyproject.toml`に依存関係が記録され、`.venv`ディレクトリに仮想環境が作成されます。


iap_token_provider.py


`iap_token_provider.py`では、以下の3つの機能を提供します


  1. IAPTokenProvider:サービスアカウント認証でIAPトークンを取得・キャッシュ・更新

  2. create_httpx_client_with_iap: `event_hooks`でリクエスト直前にトークンを更新するhttpxクライアントを作成

  3. wrap_otel_exporter_with_iap_token: OTEL Exporterの`export`メソッドをラップしてトークンを動的に更新


具体的なコードは下記の通りです。

iap_token_provider.py


import logging
import os
from typing import Optional
import httpx
from google.auth.transport.requests import Request as GoogleAuthRequest
from google.oauth2 import service_account

logger = logging.getLogger(__name__)


class IAPTokenProvider:
    """
    Google Cloud IAP用のIDトークンを提供するクラス
    
    サービスアカウントの認証情報を使用してIAP用のIDトークンを取得します。
    トークンは自動的にキャッシュされ、期限切れ時に自動更新されます。
    """
    
    def __init__(self, client_id: str, service_account_file: Optional[str] = None):
        """
        Args:
            client_id: IAPで保護されたリソースのOAuth 2.0クライアントID
                      (Google Cloud Consoleで確認可能)
            service_account_file: サービスアカウントキーファイルのパス
                                  指定しない場合はGOOGLE_APPLICATION_CREDENTIALS環境変数を使用
        """
        self.client_id = client_id
        self._request = GoogleAuthRequest()
        
        sa_file = service_account_file or os.environ.get("GOOGLE_APPLICATION_CREDENTIALS")
        
        if not sa_file:
            raise ValueError(
                "サービスアカウントキーファイルが指定されていません。"
                "service_account_file引数またはGOOGLE_APPLICATION_CREDENTIALS環境変数を設定してください。"
            )
        
        # IAP用のIDトークンを取得するための認証情報を作成
        self._credentials = service_account.IDTokenCredentials.from_service_account_file(
            sa_file,
            target_audience=client_id,
        )
    
    def get_token(self) -> str:
        """
        最新のIDトークンを取得する
        
        認証情報が期限切れの場合は自動的に更新されます。
        
        Returns:
            有効なIDトークン文字列
        """
        if not self._credentials.valid:
            self._credentials.refresh(self._request)
        return self._credentials.token


def create_httpx_client_with_iap(
    client_id: str,
    service_account_file: Optional[str] = None,
    timeout: float = 30.0,
) -> httpx.Client:
    """
    IAP認証付きのhttpxクライアントを作成する
    
    event_hooksを使用して、リクエスト直前にトークンを動的に更新します。
    これにより、長時間稼働するサーバーでもトークンの有効期限切れを防ぎます。
    
    Args:
        client_id: IAPで保護されたリソースのOAuth 2.0クライアントID
        service_account_file: サービスアカウントキーファイルのパス
        timeout: リクエストタイムアウト(秒)
    
    Returns:
        IAP認証ヘッダー付きのhttpx.Client
    """
    token_provider = IAPTokenProvider(client_id, service_account_file)
    
    def update_iap_token(request: httpx.Request):
        """リクエスト直前にIAPトークンを動的に更新"""
        token = token_provider.get_token()
        request.headers["Proxy-Authorization"] = f"Bearer {token}"
    
    return httpx.Client(
        event_hooks={"request": [update_iap_token]},
        timeout=timeout,
    )


# =============================================================================
# OTEL Exporter ラップ機能
# =============================================================================

_otel_exporter_wrapped = False


def wrap_otel_exporter_with_iap_token(iap_client_id: str) -> bool:
    """
    Langfuse SDK V3のOTEL Exporterをラップして、IAPトークンを動的に更新する
    
    この関数は、LangfuseResourceManagerの内部プロセッサーを探し、
    OTLPSpanExporterのexportメソッドをラップします。
    
    Args:
        iap_client_id: IAPで保護されたリソースのOAuth 2.0クライアントID
    
    Returns:
        ラップが成功した場合はTrue
    """
    global _otel_exporter_wrapped
    
    if _otel_exporter_wrapped:
        logger.debug("OTEL Exporterは既にラップ済みです")
        return True
    
    try:
        from langfuse._client.resource_manager import LangfuseResourceManager
    except ImportError as e:
        logger.warning(f"LangfuseResourceManagerをインポートできませんでした: {e}")
        return False
    
    if not hasattr(LangfuseResourceManager, '_instances'):
        logger.warning("LangfuseResourceManager._instancesが見つかりません")
        return False
    
    instances = LangfuseResourceManager._instances
    
    if not instances:
        logger.warning("LangfuseResourceManagerにインスタンスが登録されていません")
        return False
    
    # 各インスタンスのプロセッサーをラップ
    for public_key, instance in instances.items():
        _wrap_instance_processors(instance, iap_client_id)
    
    _otel_exporter_wrapped = True
    logger.info("OTEL Exporterのラップが完了しました")
    return True


def _wrap_instance_processors(instance, iap_client_id: str) -> bool:
    """LangfuseResourceManagerインスタンスのプロセッサーをラップする"""
    if not hasattr(instance, '_otel_tracer'):
        return False
    
    otel_tracer = instance._otel_tracer
    
    if not hasattr(otel_tracer, 'span_processor'):
        return False
    
    span_processor = otel_tracer.span_processor
    
    if hasattr(span_processor, '_span_processors'):
        processors = list(span_processor._span_processors)
        for processor in processors:
            _wrap_processor_exporter(processor, iap_client_id)
        return True
    else:
        return _wrap_processor_exporter(span_processor, iap_client_id)


def _wrap_processor_exporter(processor, iap_client_id: str) -> bool:
    """プロセッサーのExporterをラップする"""
    if not hasattr(processor, 'span_exporter'):
        return False
    
    exporter = processor.span_exporter
    original_export = exporter.export
    
    # IAPTokenProviderを使用してトークンをキャッシュ・自動更新
    token_provider = IAPTokenProvider(iap_client_id)
    
    def export_with_iap_token(spans):
        """エクスポート前にIAPトークンを更新するラッパー"""
        try:
            token = token_provider.get_token()
            if hasattr(exporter, '_session'):
                exporter._session.headers["Proxy-Authorization"] = f"Bearer {token}"
        except Exception as e:
            logger.error(f"IAPトークンの更新に失敗しました: {e}")
        
        return original_export(spans)
    
    exporter.export = export_with_iap_token
    return True


server.py


`server.py` では、FastAPIを使用して、ユーザーからの質問に回答し、Langfuseにトレースを送信するサーバーを実装しています。

以下の機能を提供します。

  1. ユーザーからの質問についてVertexAI Geminiを使用して回答

  2. システムプロンプトはLangfuseから取得して使用

  3. Langfuseにトレースを送信


「2.」によりLangfuseクライアントのIAP認証を確認、「3.」によりOTEL Span Exporterの IAP認証を確認します。


具体的なコードは下記の通りです。

server.py

import logging
import os
from typing import Optional

from dotenv import load_dotenv
load_dotenv()  # .envファイルを読み込む

from fastapi import FastAPI, Query
from langfuse import Langfuse
from langfuse.langchain import CallbackHandler
from langchain_google_vertexai import ChatVertexAI
from langchain_core.prompts import ChatPromptTemplate

from iap_token_provider import (
    create_httpx_client_with_iap,
    wrap_otel_exporter_with_iap_token,
)

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app = FastAPI(title="IAP Token Expiry Test Server")

_langfuse: Optional[Langfuse] = None


def get_langfuse() -> Langfuse:
    """Langfuseクライアントのシングルトンを取得"""
    global _langfuse
    if _langfuse is None:
        iap_client_id = os.environ["IAP_CLIENT_ID"]
        
        # httpx_clientはプロンプト管理などのAPI呼び出しに使用
        # event_hooksで動的にIAPトークンを更新
        httpx_client = create_httpx_client_with_iap(iap_client_id)
        
        _langfuse = Langfuse(
            public_key=os.environ["LANGFUSE_PUBLIC_KEY"],
            secret_key=os.environ["LANGFUSE_SECRET_KEY"],
            host=os.environ["LANGFUSE_HOST"],
            httpx_client=httpx_client,
        )
        
        # OTEL Exporterをラップ(トレース送信用)
        # httpx_clientのevent_hooksはOTEL Exporterには適用されないため、
        # 別途ラップが必要
        wrap_otel_exporter_with_iap_token(iap_client_id)
    
    return _langfuse


@app.get("/qa")
async def qa(q: str = Query(default="日本の首都はどこですか?", description="質問")):
    """質問に回答し、Langfuseにトレースを送信する"""
    langfuse = get_langfuse()
    langfuse_handler = CallbackHandler()
    
    # LangfuseからPromptを取得
    try:
        langfuse_prompt = langfuse.get_prompt("qa-assistant")
        logger.info(f"Langfuseからプロンプトを取得しました: name={langfuse_prompt.name}, version={langfuse_prompt.version}")
    except Exception as e:
        logger.error(f"Langfuseからプロンプトの取得に失敗しました: {e}")
        raise
    
    # with_configでmetadataを設定してトレースとPromptを紐づける
    prompt = ChatPromptTemplate.from_messages(
        langfuse_prompt.get_langchain_prompt()
    ).with_config({
        "metadata": {"langfuse_prompt": langfuse_prompt}
    })
    
    # LLMを初期化
    llm = ChatVertexAI(
        model="gemini-2.5-flash",
        project=os.environ["GCP_PROJECT_ID"],
        location=os.environ["GCP_LOCATION"],
        temperature=0,
    )
    chain = prompt | llm
    
    # 質問に回答(LangChain invokeメソッドのcallbacksにLangfuseのCallbackHandlerを設定することで、トレースはLangfuseに送信される)
    response = chain.invoke(
        {"question": q},
        config={
            "callbacks": [langfuse_handler],
            "metadata": {
                "langfuse_user_id": "test-user",
                "langfuse_tags": ["iap-test"]
            }
        }
    )
    
    # トレースをLangfuseに送信
    try:
        langfuse.flush()
        logger.info("Langfuseへトレースを送信しました")
    except Exception as e:
        logger.error(f"Langfuseへのトレース送信に失敗しました: {e}")
    
    return {"query": q, "answer": response.content}


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8080)


3. 動作確認


実際に動作確認します。

Langfuse にプロンプトを登録


以下のスクリーンショットを参考に、LangfuseのUIからサンプルプログラムが利用するプロンプトを登録します。


プロンプト作成画面

サーバーを起動


ターミナルから以下コマンドで `server.py`を起動します。


uv run python server.py

以下のような表示が出たら正常に起動しています。

INFO:     Started server process [xxxxx]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://0.0.0.0:8080 (Press CTRL+C to quit)

サーバーにリクエスト

別のターミナルから以下のコマンドでサーバーに対して問い合わせを開始します。


curl -G "http://localhost:8080/qa" --data-urlencode "q=Langfuseってなに?"

数秒後に以下のように結果が返ってくるはずです。

{"query":"Langfuseってなに?","answer":"Langfuseは、**LLM(大規模言語モデル)アプリケーションのオブザーバビリティ(可観測性)と評価のためのオープンソースプラットフォーム**です。\n\n簡単に言うと、LLMを使ったアプリケーションを開発・運用する際に、以下のようなことを助けてくれるツールです。\n\n1.  **トレース(実行履歴の記録)**:\n    *   ユーザーからの入力がLLMにどのように渡され、どのようなプロンプトが生成され、どのような応答が返されたか、その過程を詳細に記録・可視化します。\n    *   複数のLLM呼び出しやツール利用を含む複雑なチェーンの動きも追跡できます。\n2.  **モニタリング**:\n    *   アプリケーションのパフォーマンス(応答速度、エラー率など)やコスト(API利用料など)をリアルタイムで監視し、異常を検知します。\n3.  **評価**:\n    *   LLMの出力品質を人間による評価や自動評価(評価モデルなど)で測定し、改善点を見つけ出します。\n    *   異なるプロンプトやモデルバージョンのA/Bテストにも利用できます。\n4.  **プロンプト管理**:\n    *   使用しているプロンプトのバージョン管理や、効果的なプロンプトの特定を支援します。\n\nこれにより、開発者はLLMアプリケーションのデバッグ、改善、最適化を効率的に行うことができます。"}

また、サーバー側のターミナルには以下のようなログが出力されていることを確認してください。

INFO:iap_token_provider:OTEL Exporterのラップが完了しました
INFO:__main__:Langfuseからプロンプトを取得しました: name=qa-assistant, version=1
INFO:__main__:Langfuseへトレースを送信しました
INFO:     127.0.0.1:49752 - "GET /qa?q=Langfuse%e3%81%a3%e3%81%a6%e3%81%aa%e3%81%ab%ef%bc%9f HTTP/1.1" 200 OK

Langfuse でトレースを確認


次にLangfuse Web UIにアクセスし、トレースが送信されていること、上記の回答生成プロンプトにLangfuseからフェッチしたプロンプトが利用されていることを確認します。


以下のようにトレースが飛んでいること、Prompt:qa-assistantが利用されたことが確認できます。

トレース画面

まとめ


IAP保護下でLangfuseを運用するには、以下の2点がポイントです。


1. トレース送信以外の機能(Langfuseクライアント): `httpx`の`event_hooks`でリクエスト直前にIAPトークンを動的に更新


2. トレース送信(OTEL Exporter): `export`メソッドをラップして、エクスポート直前にIAPトークンを動的に更新


これで、長時間稼働するようなLLMアプリケーションでもIAPトークンを更新しつつ継続的にLangfuse APIを利用できます。


コメント


bottom of page