top of page

Strands Agents と ADK でリモートMCPサーバーを使ったAgentを作り、その処理をLangfuseで可視化する

  • 執筆者の写真: KAMON Nobuchika
    KAMON Nobuchika
  • 16 時間前
  • 読了時間: 5分

はじめに

本記事では、Strands Agents とADK の二つのフレームワークを使用したシンプルなエージェント (両方ともLangfuse のMCPサーバーを使う) が、Langfuseを使ってどのように可視化されるのかをクイックに紹介する記事です。最近のアップデートで標準でグラフも出たりして便利です。タイトルが長すぎて、単にリフレーズしただけの冒頭文になってしまいました。


利用するMCPサーバー

今回はLangfuse のドキュメントを参照してくれるMCPサーバーを利用します。

開発の時にMCPクライアント (Cursor など) に入れておくと開発が捗りますが、本題ではないので今回はこれ自体の説明は割愛します。詳細はこちらのリンクをご覧ください。


Strands Agents のサンプルコード

こちらはターミナル上で実行することを想定しているものです。Strands Agents はこのようなコードを大変シンプルに短く書けるのが非常に魅力的だと思います。


サンプルコード

#!/usr/bin/env -S uv run --script

# /// script

# requires-python = ">=3.10"

# dependencies = [

#   "strands-agents[openai]",

#   "mcp",

#   "langfuse",

# ]

# ///

import os, sys

from strands import Agent

from strands.models.openai import OpenAIModel

from mcp.client.streamable_http import streamablehttp_client

from strands.tools.mcp.mcp_client import MCPClient

from langfuse import observe, get_client


SYSTEM_PROMPT = (

    "You are a helpful assistant for Langfuse developers.\n"

    "- For Langfuse questions, proactively use MCP tools from the docs server: "

    "call `searchLangfuseDocs` first, then `getLangfuseDocsPage` for details.\n"

    "- Prefer up-to-date docs. Keep answers concise; include code when useful."

)


# モデルとMCPはプロセスで1回だけ初期化

model = OpenAIModel(

    client_args={"api_key": os.environ["OPENAI_API_KEY"]},

    model_id=os.environ.get("OPENAI_MODEL", "gpt-4o-mini"),

    params={"temperature": 0},

)

mcp = MCPClient(lambda: streamablehttp_client("https://langfuse.com/api/mcp"))


@observe(name="agent-run", as_type="generation") 

def run_agent(prompt: str) -> str:

    with mcp:  # MCPセッション(with内で有効)

        agent = Agent(model=model, tools=mcp.list_tools_sync(), system_prompt=SYSTEM_PROMPT)

        res = agent(prompt)

        return getattr(res, "message", str(res))


def main():

    prompt = " ".join(sys.argv[1:]) or input("> ")

    out = run_agent(prompt)          

    print(out)

    get_client().flush()             


if name == "__main__":

    main()


そして特に何もせずとも Langfuse にはこんな感じでTraceとして可視化されます。

OutputはTrace の中に Content として入っています。


Strands Agents の Trace
Strands Agents の Trace

いい感じでグラフも自動生成されます。処理の部分をクリックすると、その詳細が表示されます。例えば、 execute_tool searchLangfuseDocs というツール実行をクリックすると、実際にはユーザーの質問 (Datasetの作り方を知りたい) がどんな query になってて ("Langfuse dataset creation") 飛んで、どんな情報が取れて ... みたいな内容を確認して、デバックすることができます。


ree


ADK (Agent Deployment Kit) のサンプルコード

こちらも同様にCLIで動作して、同じ動作をします。こちらは OpenAI ではなくて、VertexAI上のGeminiを利用しています。OpenAI を使おうとすると LiteLLMが必要ですが、本題ではないのでおとなしくGeminiを使います。2.5-flashは速くて安くて便利です。


サンプルコード

#!/usr/bin/env -S uv run --script

# /// script

# requires-python = ">=3.10"

# dependencies = [

#   "google-adk",

#   "google-genai",

#   "langfuse>=3",

#   "mcp",

# ]

# ///

import os, sys, asyncio

from typing import Optional, List, Literal

from datetime import timedelta


from google.adk.agents import Agent

from google.adk.runners import InMemoryRunner

from google.genai import types


from mcp import ClientSession

from mcp.client.streamable_http import streamablehttp_client


from langfuse import observe, get_client


SYSTEM_PROMPT = (

    "You are a helpful assistant for Langfuse developers.\n"

    "- For Langfuse questions, proactively use MCP tools from the docs server: "

    "call `lf_docs(mode=\"search\", query=...)` first, then "

    "`lf_docs(mode=\"get\", pathOrUrl=...)` for details.\n"

    "- Prefer up-to-date docs. Keep answers concise; include code when useful."

)


MCP_URL = (os.getenv("LF_MCP_URL") or "https://langfuse.com/api/mcp").strip()


async def mcpcall(tool_name: str, **arguments) -> str:

    for k, v in list(arguments.items()):

        if isinstance(v, str):

            arguments[k] = v.strip().replace("\r", "")

    async with streamablehttp_client(url=MCP_URL) as (r, w, _):

        async with ClientSession(r, w) as session:

            await session.initialize()

            resp = await session.call_tool(

                tool_name,

                arguments=arguments,

                read_timeout_seconds=timedelta(seconds=60),

            )

    texts: List[str] = []

    for c in getattr(resp, "content", []) or []:

        texts.append(getattr(c, "text", "") or "")

    return "\n".join(t for t in texts if t).strip()


async def lf_docs(

    mode: Literal["search", "get"],

    query: Optional[str] = None,

    pathOrUrl: Optional[str] = None,

) -> str:

    if mode == "search":

        if not query:

            return "Error: 'query' is required for mode='search'."

        return await mcpcall("searchLangfuseDocs", query=query)


    # mode == "get"

    if not pathOrUrl:

        return "Error: 'pathOrUrl' is required for mode='get'."

    return await mcpcall("getLangfuseDocsPage", pathOrUrl=pathOrUrl)


ROOT = Agent(

    name="adk_mcp_cli_vertex",

    model=os.getenv("ADK_MODEL", "gemini-2.5-flash"),

    description="CLI agent using Langfuse Docs MCP tools (Vertex AI Gemini 2.5 Flash)",

    instruction=SYSTEM_PROMPT,

    tools=[lf_docs],

)


@observe(name="adk-cli-turn", as_type="generation")

async def run_once(prompt: str) -> str:

    runner = InMemoryRunner(agent=ROOT, app_name=ROOT.name)

    session = await runner.session_service.create_session(app_name=ROOT.name, user_id="cli-user")


    user_msg = types.Content(role="user", parts=[types.Part(text=prompt)])


    final = "(no answer)"

    async for event in runner.run_async(user_id="cli-user", session_id=session.id, new_message=user_msg):

        if hasattr(event, "is_final_response") and event.is_final_response():

            if event.content and event.content.parts:

                final = event.content.parts[0].text or final

    return final


async def amain():

    prompt = " ".join(sys.argv[1:]) or input("> ")

    out = await run_once(prompt)

    print(out)

    get_client().flush() 


if name == "__main__":

    asyncio.run(amain())


こちらがTraceです。こちらも構造化されて、グラフ図も同様に出てきます。

Traceの中のデータの入り方などは異なりますが、基本的には同じです。


ADK での Trace
ADK での Trace

同様に図も出してくれてわかりやすい
同様に図も出してくれてわかりやすい

まとめ

今回のサンプルコードは @observe を利用して、Traceを作っていますがSpan の構成やUser, Session, 命名など細かく設定をすることも可能です。ぜひ色々と試して見てください(MCP サーバーの活用も便利だと思います!)。



コメント


bottom of page