Langfuseのサーバーサイドマスキングを試してみた
- Hiromi Kuwa
- 3 日前
- 読了時間: 4分
過去、Langfuseでのマスキングについて触れてきましたが、これまではクライアントサイドで対応するしかありませんでした。しかし、ついに先日のリリース(v3.152.0)で、サーバーサイドでのマスキングが設定可能になりました。
※注意:現時点ではEE(Enterprise Edition)ライセンス専用機能となっています。
今回は公式ドキュメントを参考に、実際に設定してみました。
基本設定
事前準備
マスク処理を行う、Langfuseからのコールバック先が必要になります。
今回は設定の挙動を確認したいため、公式の実装例をそのまま利用し、Cloud Runにデプロイしました。
設定
設定自体は、Langfuse Workerのコンテナに環境変数 LANGFUSE_INGESTION_MASKING_CALLBACK_URL を設定すれば利用できるようになります。
※ 注意:Langfuse Worker コンテナにEEライセンスが適用されていない場合、コールバックURLを設定しても、そのURLへリクエストが送られることはありません。
実際にEEライセンスが適用されていない状態でトレースを送信すると、手元の環境では以下のエラーログが出力されました。
warn: Ingestion masking callback URL is configured but enterprise license is not available. Masking will be disabled.
実験
サーバーサイドマスキングは、OpenTelemetryエンドポイント(/api/public/otel)経由のイベントに適用されます。Python SDK v3.x系はこれに対応しているため、SDKを利用した簡易的なトレース送信コードを作成し、テストしました。
テストコード
# ※環境に合わせてクライアントを初期化してください
langfuse = get_client()
with langfuse.start_as_current_observation(
as_type="span",
name="test-trace",
input="山田太郎様からの問い合わせ",
) as root_span:
trace_id = langfuse.get_current_trace_id()
with langfuse.start_as_current_observation(
as_type="span",
name="test-span",
input="山田太郎です。登録しているメールアドレスは何ですか?",
) as span:
span.update(
output="山田太郎様のメールアドレスはtest-user@example.comです。",
metadata={
"user_email": "test-user@example.com",
"support_phone": "123-456-7890"
},
)
root_span.update_trace(
output="問い合わせが完了しました。",
)
langfuse.flush()検証1:公式の実装例
まずは、サンプル実装に対してリクエストを送信した場合にどうなるか確認してみます。
1階層目のトレース(test-trace)には、今回置換対象となる項目が含まれていないため、2階層目、3階層目のトレース情報を確認してみます。
結果
2階層目

3階層目

置換対象となる、メールアドレス・電話番号が含まれている箇所について、Output、Metadata ともにマスキング(置換)されていることが確認出来ました。
検証2:組織・プロジェクト単位でのマスク
mask_trace 関数内で x_langfuse_org_id や x_langfuse_project_id を利用することで、組織単位・プロジェクト単位で異なるマスク処理を適用することができます。
ここで、コールバック先に設定しているアプリケーションの mask_trace を以下の通り変更します。特定の組織のみ、マスク処理の異なるmask_pii2が適用されるようにしました。
def mask_pii2(data):
print(f"mask_pii2: {data}")
if isinstance(data, str):
data = f"[REDACTED] {data}"
return data
elif isinstance(data, dict):
return {k: mask_pii2(v) for k, v in data.items()}
elif isinstance(data, list):
return [mask_pii2(item) for item in data]
return data
async def mask_trace(
request: Request,
x_langfuse_org_id: Optional[str] = Header(None),
x_langfuse_project_id: Optional[str] = Header(None)
):
body = await request.json()
if x_langfuse_org_id == "xxxxxxxxxxxxxxxxxxxxxxx":
masked_body = mask_pii2(body)
else:
masked_body = mask_pii(body)
return masked_bodyコード上の引数 x_langfuse_org_id、x_langfuse_project_id は、Langfuseから送られてくるHTTPヘッダー(x-langfuse-org-id、x-langfuse-project-id)から取得されています。そのため、ヘッダーが付与されていない際には値が取得できない可能性があります。なお、今回利用したPython SDKによる送信方法では、問題なく付与されていました。
※ 上記コードのように条件分岐を行う場合、指定するのは name ではなく、id であることに注意して下さい
結果
指定した組織には mask_pii2 が、それ以外の組織には mask_pii が適応されることを確認出来ました。
しかし、mask_pii2は以下の通り、想定される形にはなりませんでした。

サーバサイドで行う場合は、リクエストボディ全体が処理対象となります。そのため、今回のように再帰的に処理をかけると、トレースの構造を定義する resource.attributes などのキーまで書き換わってしまい、Langfuse側で正しくInput/Outputとして認識されなくなってしまいました。
クライアントサイドで利用していた処理を移植したい場合は、メタデータを破壊しないよう、対象となるデータ部分だけを加工するなどの工夫が必要となりそうです。

補足:Dataset Run への影響について
UIから Dataset Runを実行した場合にはマスク処理が適応されないことを確認しました。
マスキングされることによるEvaluator への影響や、結果の目視がしにくいと言った問題は起こらなさそうです。
まとめ
実際に試してみた結果、細かい制御の手軽さという点では、クライアントサイドでマスク処理を行う方に分がある印象です。
しかし、「いざという時のセーフティーネット」として活用できたり、既にOpenTelemetryエンドポイント経由で動作しているアプリケーションに対して、「アプリ側のコード改修無しでマスク処理を適用できる」など、サーバサイドならではのメリットも多く存在します。
また、今回は検証していませんが、環境変数 LANGFUSE_INGESTION_MASKING_PROPAGATED_HEADERS を設定することで、オリジナルのリクエストから任意のカスタムヘッダーをコールバック先へ伝播させることも可能なようです。これを活用すれば、要件に合わせてさらに柔軟なマスキング制御を実現していくこともできそうです。



コメント