top of page

Observation Types で mask オプション内での再起呼び出しを回避する

  • 執筆者の写真: Hiromi Kuwa
    Hiromi Kuwa
  • 10月28日
  • 読了時間: 4分

以前、Langfuse の mask オプションを利用する際のトレース保存方法について解説しました。(該当記事

当時、 mask オプションに設定した関数内でトレースを保存しようとすると、該当の関数が再帰的に呼び出されてしまう問題がありました。該当記事内では、グローバル変数を用いて制御しましたがあまりスマートな方法とは言えません。


しかし、この課題をよりスマートに解決出来そうなアップデートが行われました。

実際にどのように解決していけるか、試してみたいと思います。


以前のコードと課題


以前のコードでは、masking_function が再帰的に呼び出されないよう、グローバル変数で制御していました。


require_mask = True

def masking_function(data: any, kwargs) -> any:
    global require_mask

    if require_mask and isinstance(data, str):
        try:
            require_mask = False

            # PII フィルター適用時のトレース保存
            with langfuse.start_as_current_generation(
                name="pii_filter_prompt",
                prompt=[使用するプロンプト]
            ) as generation:
                # LLM呼び出し(マスキング処理)
                generation.update(
                    output=[フィルター済みデータ],
                )
        finally:
            require_mask = True
        return [フィルター済みデータ]

langfuse = Langfuse(
    public_key="**************",
    secret_key="****************",
    host="*****************",
    mask=masking_function,
)

# LLM呼び出しのトレース
with langfuse.start_as_current_generation(
    name="llm_called_trace",
    input=[ユーザの入力],
) as generation:
    # LLM呼び出し(入力に対する回答取得用)
    generation.update(
        output=[LLMの回答],
    )

このアプローチでは、上記のような簡単なコードであればさほど難しくはありませんが、規模が大きくなるにつれ、コードや管理が複雑になってしまいます。


Langfuse アップデート:Observation Typesの追加


2025年8月末頃のアップデートで、Observation Types が追加されました。(該当記事

これまでも、 span や generation はありましたが、 guardrail や evaluator など、多くの種類が追加されました。

こちらのアップデート記事を読んだ際に、Observation Types = "guardrail" としてトレースを保存すれば、Langfuse で良い感じに処理して再帰的に mask オプションの関数が呼ばれることは無くなるのでは?と期待しました。


そこで、実際に以前の実験時に利用した、上記のコードを基に再起呼び出しを防ぐ対応が不要になるか試してみました。


1.Observation Types = "guardrail" の利用


事前準備


Observation Types の追加は Python SDK 3.3.1 以上でなければ対応していないため、利用するSDKをアップグレードします。

また、トレースの保存方法が start_as_current_generation から start_as_current_observation (as_type="generation") と少々変更されているため、合わせて調整します。


# 念のための制御用
loop = 0 

def masking_function(data: any, kwargs) -> any:
    global loop

    if loop > 5:
        return data
    if isinstance(data, str):
        loop += 1 
        # PII フィルター適用時のトレース保存
        with langfuse.start_as_current_observation(
            as_type="guardrail",
            name=f"in masking function: {loop}",
            prompt=[使用するプロンプト]
        ) as guardrail:
            # マスキング処理
            sanitized_data = f"sanitized data:{data}"

            guardrail.update(output=sanitized_data)
            return sanitized_data
    else: return data

# Langfuseの初期化 

langfuse = Langfuse(
    public_key="**************",
    secret_key="****************",
    host="*****************",
    mask=masking_function,
)

# LLM呼び出しのトレース
with langfuse.start_as_current_observation(
    as_type="generation",
    name="test_type_guardrails",
    input=[ユーザの入力],
) as generation:
    # LLM呼び出し(入力に対する回答取得用)
    generation.update(
        output=[LLMの回答],
    )

無限ループに陥らないよう、念のため複数回再帰的に呼び出されたタイミングで終了するようにしました。

こちらで実行してみます。


結果


ree

残念ながら、 observation type = "guardrail" の指定だけでは、再帰的に実行されてしまう状態は変わらないようです。


2.OpenTelemetry の利用


Langfuseは内部でOpenTelemetryを利用しており、これを使うことでmasking_functionの呼び出し元のトレース情報を取得できることがわかりました 。

トレース情報からObservationTypesを取得し、判断に利用すれば、グローバル変数を利用する形よりはいくらかスマートに対応できます。


修正


from opentelemetry import trace as otel_trace_api
from langfuse._client.attributes import LangfuseOtelSpanAttributes

// 中略 

def masking_function(data: any, kwargs) -> any:
    current_observation_type = None
    try:
        current_span = otel_trace_api.get_current_span()

        if current_span and current_span != otel_trace_api.INVALID_SPAN:
            current_observation_type = current_span.attributes.get( 
                LangfuseOtelSpanAttributes.OBSERVATION_TYPE,
                "unknown"
            )
    except Exception as e:
        print(f"Error getting current observation info: {e}") 
        return data

    # guardrailの場合はマスキングを行わない
    if current_observation_type == "guardrail":
        return data
    if isinstance(data, str):
        with langfuse.start_as_current_observation(
            as_type="guardrail",
            name="in masking function",
            prompt=[使用するプロンプト]
        ) as guardrail:
            # マスキング処理
            sanitized_data = f"sanitized data:{data}" 

            guardrail.update(output=sanitized_data)
            return sanitized_data
    else:
        return data 

opentelemetry.trace.get_current_span() を利用して、現在のトレースの情報を取得します。

今回は、マスク用の関数内以外では as_type = "guardrail" の指定を利用しないこと、マスク用の関数内で別のObservation Typeを設定しないことを前提としています。そのため、単純に取得した observation type が guardrail の場合は、masking_function内での呼びだしとみなし、処理をせずに入力値をそのまま返す形にしています。


結果


ree

無事、Observation Typeを取得し、制御することに成功しました!

これにより、 mask オプションを利用する際も、グローバル変数を利用せずトレースの保存を制御できることが確認出来ました。


まとめ


Observation Types の追加によって、mask関数の再帰呼び出しが自動的に回避されるといった変更はありませんでした。しかし、期待した方法とは異なりますが、Observation TypesとOpenTelemetryを組み合わせることで、これまでよりもはるかにスマートな方法で、トレース保存を制御できることが確認できました 。

コメント


bottom of page