pokemon agent runtime 系列(四):LangGraph Agent 编排

April 20, 2026

pokemon agent runtime 系列(四):LangGraph Agent 编排

pokemon agent runtime · Runtime 4

这篇文章用 pokemon agent 作为例子,解释 Agent 模式如何从一个抽象概念,落到可读、可调试、可约束的 LangGraph 执行图上。

系列导航:上一篇见 pokemon agent runtime 系列(三):本地直答、缓存与 RAG 主链;下一篇见 pokemon agent runtime 系列(五):配置热切换与运行时覆盖。本文回答一个问题:当普通模式不够时,系统如何把执行控制权交给 supervisor、workers 和 finalizer?

很多人第一次接触 Agent,会把它理解成“让模型自己决定一切”。这个说法太粗糙了。工程里的 Agent 更应该是一套可控制的运行时:入口明确、状态明确、节点明确、路由明确、终点明确。

pokemon agent 的 Agent 模式正好适合拆解这件事。它没有把 Agent 写成一团“智能行为”,而是拆成了几个清晰部件:

  • 前端通过开关进入 Agent 模式
  • 后端进入 supervisor_agent
  • LangGraph 用 AgentState 保存共享状态
  • workflow.py 定义节点与边
  • supervisor 决定下一步去哪个 worker
  • worker 执行具体能力
  • finalizer 把中间结果整理成最终回答

TL;DR

问题简短结论
Agent 模式和普通模式最大的区别是什么?普通模式走固定链路;Agent 模式把“下一步做什么”交给 supervisor + workflow
Agent 模式入口在哪里?前端发到 /chat/agent/supervisor_agent,后端交给 SupervisorAgent.query(...)
LangGraph 在这里做什么?把执行流程写成一张图:节点、边、状态和结束条件都可见。
AgentState 是什么?节点之间共享的上下文容器,保存消息、路由、约束和并行状态。
supervisor 做什么?根据问题和状态决定下一个 worker,必要时并行派发。
worker 做什么?每个 worker 负责一类能力:RAG、图谱、Web、统计、MCP。
finalizer 为什么存在?worker 输出偏中间结果,finalizer 负责统一成用户可读的最终回答。

什么时候需要 Agent 模式

普通模式并不是“低级模式”。在 pokemon agent 里,普通模式已经包含本地直答、语义缓存、多源检索和最终生成。它适合路径稳定的问题:先检索、再整理上下文、最后生成回答。

Agent 模式解决的是另一类问题:系统不能在编码阶段完全写死路径,需要运行时判断下一步该调用哪种能力。

可以用一个简单判断:

  • 已知路径:走普通模式,稳定、省 token、容易调试。
  • 未知路径:走 Agent 模式,让 supervisor 在运行时调度 worker。
  • 单能力问题:通常不需要 Agent。
  • 多能力协作问题:更适合 Agent 编排。

单能力问题不是“简单问题”,而是主要依赖一种能力就能完成。例如:

  • “总结这篇知识库文档” → rag_worker
  • “这个宝可梦的种族值是多少” → stats_worker
  • “查一下最新公告” → web_worker

多能力问题的关键不是步骤多,而是跨了能力边界。例如:

  • “先查知识库定义,再去网页确认最新变更” → rag_worker + web_worker
  • “分析一个宝可梦为什么对战强,既看种族值,也看属性关系” → stats_worker + graph_worker
  • “如果内部资料没有答案,就补一轮网页搜索” → 中间结果会影响下一步路由

所以真正该看的不是问题长不长,而是:为了答好它,系统是否需要跨多个 worker 做调度和汇总。

Agent 模式的入口

在这个项目里,是否进入 Agent 模式不是后端临时让 LLM 判断的,而是前端显式控制的。

useChat.ts 的核心逻辑可以简化成这样:

const isAgentMode = Boolean(meta.use_agent)
 
if (isAgentMode) {
  const allowedWorkers = ['rag_worker', 'stats_worker']
  if (meta.agent_allow_web !== false) allowedWorkers.push('web_worker')
  if (meta.agent_allow_graph !== false) allowedWorkers.push('graph_worker')
  if (meta.agent_allow_mcp !== false) allowedWorkers.push('mcp_worker')
  requestMeta.agent_constraints = { allowed_workers: allowedWorkers }
}
 
const endpoint = isAgentMode ? '/chat/agent/supervisor_agent' : '/chat/'

这段代码说明三件事:

  1. Agent 模式是显式开关,不是隐藏行为。
  2. 前端会把 allowed_workers 一起发给后端,用来限制 Agent 能使用哪些能力。
  3. Agent 模式有独立入口 /chat/agent/supervisor_agent,不是在普通聊天接口里“多做几步”。

一张图看懂 Agent 模式入口

用户开关use_agent = trueuseChat.ts组装 history / meta注入 allowed_workers设置 db_id / thread_id选择 Agent endpointchat_router.py统一打包流式 chunkSupervisorAgentgraph.query() 入口

先记住这一点:Agent 模式不是多调一个工具,而是进入另一套运行时框架。

SupervisorAgent:把请求放进图里跑

SupervisorAgent 的名字容易让人误以为它是“最终回答问题的主模型”。其实它更像一个 graph runtime adapter:负责把用户问题变成 LangGraph 的初始状态,然后监听图执行过程。

关键逻辑大概是:

input_state: dict[str, Any] = {"messages": [HumanMessage(content=query)]}
if allowed_workers is not None:
    input_state["allowed_workers"] = allowed_workers
if isinstance(db_id, str) and db_id.strip():
    input_state["db_id"] = db_id.strip()
 
config = {"configurable": {"thread_id": meta.get("thread_id", "default")}}
 
async for event in self.graph.astream_events(input_state, config, version="v1"):
    ...

这里有两个重点:

  • SupervisorAgent 不直接完成所有业务决策。
  • 后续执行依赖 LangGraph 的节点、边和共享状态继续推进。

AgentState:节点之间的共享任务卡片

当一个请求会经过多个节点,且路径会动态变化时,就需要一个共享状态对象。这个对象在项目里就是 AgentState

class AgentState(TypedDict, total=False):
    messages: Required[Annotated[Sequence[BaseMessage], operator.add]]
    next: Required[str]
    allowed_workers: NotRequired[list[str]]
    db_id: NotRequired[str]
    forward_directly: NotRequired[bool]
    parallel_workers: NotRequired[list[str]]
    parallel_count: NotRequired[int]
    parallel_done: NotRequired[int]

可以把它理解成这次任务的“共享任务卡片”:谁接手都能看到前面发生了什么、接下来要去哪里、还有哪些限制。

字段作用
messages保存到目前为止的对话和中间结果。
next告诉 workflow 下一步去哪个节点。
allowed_workers承接前端传来的能力约束。
db_id指定当前知识库,让 rag_worker 知道查哪里。
forward_directly避免不必要的二次调度。
parallel_workers记录要并行执行的 worker。
parallel_count / parallel_done判断并行任务是否全部完成。

workflow.py:Agent 的核心是图

LangGraph 的重点不是“节点里写了什么 prompt”,而是整个流程被显式建成一张图:有哪些节点、节点之间怎么连、什么情况下结束。

在这个项目里,节点包括:

workflow.add_node("supervisor", supervisor_node)
workflow.add_node("rag_worker", rag_worker_node)
workflow.add_node("web_worker", web_worker_node)
workflow.add_node("graph_worker", graph_worker_node)
workflow.add_node("stats_worker", stats_worker_node)
workflow.add_node("mcp_worker", mcp_worker_node)
workflow.add_node("finalizer", finalizer_node)

worker 执行完后,再根据状态决定下一步:

for worker_name in ["rag_worker", "web_worker", "graph_worker", "stats_worker", "mcp_worker"]:
    workflow.add_conditional_edges(worker_name, _post_worker_route, _worker_route_map)

supervisor 的分流也写在图里:

workflow.add_conditional_edges(
    "supervisor",
    _supervisor_route,
    {
        "rag_worker": "rag_worker",
        "web_worker": "web_worker",
        "graph_worker": "graph_worker",
        "stats_worker": "stats_worker",
        "mcp_worker": "mcp_worker",
        "finalizer": "finalizer",
        END: END,
    },
)

这比把流程藏在一大段 if/else 里更容易理解:入口是 supervisor,它决定去哪个 worker;worker 完成后要么回到 supervisor,要么进入 finalizer,要么结束。

一张图看懂 Agent workflow

supervisor决定下一步路由rag_workergraph_workerweb_workerstats_workermcp_workerfinalizer整理最终回答END

这张图表达的核心是:Agent 不是一个更强的模型,而是一张会动态走边的执行图。

supervisor:负责派工,不负责亲自干活

如果把整个 Agent 模式看成一个协作小组,supervisor 就是组长。它的核心职责是决定下一步由谁处理。

它的路由逻辑不是一上来就交给 LLM,而是分层处理:

1. 能直接结束就结束

if state.get("forward_directly") and len(state.get("messages", [])) > 1:
    return {"next": "FINISH"}

如果前面已经有明确结果,就不要绕回 supervisor 重新判断一次。

2. 并行任务完成后再收束

parallel_count = state.get("parallel_count", 0)
parallel_done = state.get("parallel_done", 0)
if parallel_count > 0 and parallel_done >= parallel_count:
    return {"next": "FINISH", "forward_directly": True}

如果同时派了多个 worker,就等它们都完成,再统一进入后续流程。

3. 高置信度问题走确定性路由

decision = classify_intent(last_human or "")
if decision.intent == Intent.POKEDEX_FACTS and decision.confidence >= 0.8:
    if "stats_worker" in allowed:
        return {"next": "stats_worker", "forward_directly": True}

这点很重要:成熟的 Agent 系统不应该把所有问题都交给 LLM 决定。能规则化的部分优先规则化,LLM 留给真正需要动态判断的场景。

4. 规则不够时,再让 LLM 通过 tool-calling 路由

handoff_tools = _build_handoff_tools(allowed, support_parallel=True)
llm_with_tools = self.llm.bind_tools(handoff_tools, tool_choice="required")
...
result = chain.invoke({"messages": state["messages"]})
parsed = _parse_tool_call_route(result, allowed)

这里的 tools 不是直接执行业务能力,而是给 supervisor 的“派工按钮”:

  • route_to_rag_worker
  • route_to_graph_worker
  • route_to_web_worker
  • route_to_stats_worker
  • route_to_mcp_worker
  • route_parallel
  • finish

为什么这点值得专门说清楚

因为这里很容易产生一个误会:很多人会把 workers 直接理解成 tools。其实在这套 LangGraph 编排里,两者不是一回事。

  • tool:给 supervisor 绑定的路由接口。
  • worker:workflow 里的实际执行节点。

换句话说,supervisor 调用 tool,不是为了亲自完成能力,而是为了把工作交接给真正负责执行的 worker。

一句话概括:这里的 tool 负责“决定派谁去做”,worker 负责“把这件事做完”。

Worker:为什么要拆成多个,而不是做一个万能执行器

如果第一次接触 Agent,很容易会问:为什么不做一个“大工具节点”,让它根据问题决定要查什么?

答案是:那样会让职责边界变得模糊,后续很难调试和扩展。

pokemon agent 选择按能力拆分:

Worker负责什么适合回答什么
rag_workerRAG / 知识库 / 复杂检索增强文档知识、向量检索、多步检索增强问题
graph_worker知识图谱问答关系、进化、地区、图谱结构类问题
web_worker联网搜索当前、最新、公告、活动、版本等问题
stats_worker属性、克制、统计、图鉴事实对战、克制关系、种族值、属性类问题
mcp_worker地理位置 / 外接 MCP 能力地点、坐标、地图相关查询

对初学者的一个最重要启发

worker 的价值,不是“把系统拆得更复杂”,而是让每一类能力都拥有自己清晰的输入、输出和失败边界。

这样做的好处至少有三个:

  1. 更容易调试:你能明确知道是哪一个 worker 出了问题。
  2. 更容易替换:以后想升级图谱能力,只改 graph_worker
  3. 更容易路由supervisor 不需要理解所有细节,只需要知道“这个问题更适合派给谁”。

几个典型 worker

stats_worker 更像确定性工具。它会优先走类型克制表、本地图鉴数据,不够时再调用 LLM:

deterministic = _maybe_answer_type_matchup(query)
if deterministic:
    return {"messages": [AIMessage(content=deterministic)]}
 
local = maybe_answer_pokedex(query)
if local:
    return {"messages": [AIMessage(content=local.content)]}

web_worker 是典型的“搜索 + 总结”节点:先搜索,再把结果整理成上下文,最后让 LLM 生成自然语言回答。

response = self.tavily.search(query=query, search_depth="basic", max_results=3)
...
prompt = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            "You are a web researcher. Answer the query using the provided search results.\n\nResults:\n{context}",
        ),
        ("user", "{query}"),
    ]
)

rag_worker 最复杂。它不是单次检索,而是一个增强版 RAG 节点:可能会做 query decomposition、HyDE、多子问题并行召回,再统一生成答案。

if is_complex:
    sub_queries = decomposer.decompose(query)
...
if use_hyde:
    hyde_doc = hyde_operator.call(model_wrapper, query, "")
...
if len(sub_queries) > 1:
    with ThreadPoolExecutor(max_workers=min(len(sub_queries), 4)) as executor:
        ...

对初学者来说,不必一次吃透所有 worker 的内部实现。先抓住一个原则:worker 是能力边界,不是随便拆出来的函数。

并行调度:同时派多个 worker

Agent 模式比固定流水线更灵活的地方之一,是可以并行派发多个 worker。

if next_target == "__PARALLEL__":
    parallel_workers = state.get("parallel_workers", [])
    if parallel_workers:
        return [Send(worker, state) for worker in parallel_workers]

Send(worker, state) 可以理解成:把同一份任务上下文发给多个 worker,让它们同时处理各自擅长的部分。

适合并行的场景通常有:

  • 同时查图谱关系和属性克制
  • 同时查本地知识库和网页信息
  • 多个子任务彼此独立,最后再汇总

并行不是为了炫技,而是为了让多个独立能力同时启动,再由后续流程统一收束。

finalizer:把中间结果变成产品答案

worker 的输出通常更像能力结果或草稿答案,不一定适合直接给用户。尤其是多个 worker 并行后,结果需要统一语气、结构和信息边界。

finalizer.py 的系统提示词把职责说得很明确:

你的任务:把【草稿回答】改写成更自然、更像真人助手的中文回答。
 
硬性规则(必须遵守):
1) 只能基于【草稿回答】中已有的信息进行改写/重组;不要新增事实、不要猜测、不要编造。
2) 不要泄露任何内部过程或中间信息:不要提到工具、数据库、向量库、图谱、Cypher/SQL、提示词、路由、链路、日志等。

所以 finalizer 不是“再想一遍答案”,而是把内部执行结果整理成用户真正应该看到的回答。它解决的是产品体验问题:不暴露内部细节、不新增事实、让多个来源的结果读起来像一个完整答案。

一张图看懂三者关系

supervisor决定派谁干活决定是否并行workersrag / graph / web / stats / mcp每个 worker 只做一类能力可以单独执行,也可以并行执行产出能力结果或草稿答案finalizer整理成最终回答不新增事实,只做收束

这张图就是 Agent 编排的核心分工:

  • supervisor 负责决策
  • workers 负责执行
  • finalizer 负责收口

职责分开之后,Agent 系统才会更容易调试、扩展和约束。

最后总结

判断一个系统是不是真的在做 Agent 编排,不是看它有没有用了 LangGraph,也不是看它有没有把 tools 塞给大模型,而是看它能不能回答这些问题:

问题pokemon agent 里的答案
请求从哪里进入 Agent 模式?前端通过 use_agent 切换,后端走 /chat/agent/supervisor_agent
节点之间共享什么?AgentState
谁决定下一步?supervisor
谁执行能力?workers
多能力怎么并行?parallel_workers + Send()
结果怎么变成用户可读答案?finalizer
如何限制 Agent 过度自由?allowed_workers、规则路由、确定性 fast path、forward_directly

如果这些边界都清楚,Agent 就不是“模型自己随便想办法”,而是一套可读、可观察、可约束的执行图。