pokemon agent runtime · Runtime 4
这篇文章用 pokemon agent 作为例子,解释 Agent 模式如何从一个抽象概念,落到可读、可调试、可约束的 LangGraph 执行图上。
系列导航:上一篇见 pokemon agent runtime 系列(三):本地直答、缓存与 RAG 主链;下一篇见 pokemon agent runtime 系列(五):配置热切换与运行时覆盖。本文回答一个问题:当普通模式不够时,系统如何把执行控制权交给 supervisor、workers 和 finalizer?
很多人第一次接触 Agent,会把它理解成“让模型自己决定一切”。这个说法太粗糙了。工程里的 Agent 更应该是一套可控制的运行时:入口明确、状态明确、节点明确、路由明确、终点明确。
pokemon agent 的 Agent 模式正好适合拆解这件事。它没有把 Agent 写成一团“智能行为”,而是拆成了几个清晰部件:
- 前端通过开关进入 Agent 模式
- 后端进入
supervisor_agent - LangGraph 用
AgentState保存共享状态 workflow.py定义节点与边supervisor决定下一步去哪个 workerworker执行具体能力finalizer把中间结果整理成最终回答
TL;DR
| 问题 | 简短结论 |
|---|---|
| Agent 模式和普通模式最大的区别是什么? | 普通模式走固定链路;Agent 模式把“下一步做什么”交给 supervisor + workflow。 |
| Agent 模式入口在哪里? | 前端发到 /chat/agent/supervisor_agent,后端交给 SupervisorAgent.query(...)。 |
| LangGraph 在这里做什么? | 把执行流程写成一张图:节点、边、状态和结束条件都可见。 |
AgentState 是什么? | 节点之间共享的上下文容器,保存消息、路由、约束和并行状态。 |
supervisor 做什么? | 根据问题和状态决定下一个 worker,必要时并行派发。 |
worker 做什么? | 每个 worker 负责一类能力:RAG、图谱、Web、统计、MCP。 |
finalizer 为什么存在? | worker 输出偏中间结果,finalizer 负责统一成用户可读的最终回答。 |
什么时候需要 Agent 模式
普通模式并不是“低级模式”。在 pokemon agent 里,普通模式已经包含本地直答、语义缓存、多源检索和最终生成。它适合路径稳定的问题:先检索、再整理上下文、最后生成回答。
Agent 模式解决的是另一类问题:系统不能在编码阶段完全写死路径,需要运行时判断下一步该调用哪种能力。
可以用一个简单判断:
- 已知路径:走普通模式,稳定、省 token、容易调试。
- 未知路径:走 Agent 模式,让 supervisor 在运行时调度 worker。
- 单能力问题:通常不需要 Agent。
- 多能力协作问题:更适合 Agent 编排。
单能力问题不是“简单问题”,而是主要依赖一种能力就能完成。例如:
- “总结这篇知识库文档” →
rag_worker - “这个宝可梦的种族值是多少” →
stats_worker - “查一下最新公告” →
web_worker
多能力问题的关键不是步骤多,而是跨了能力边界。例如:
- “先查知识库定义,再去网页确认最新变更” →
rag_worker + web_worker - “分析一个宝可梦为什么对战强,既看种族值,也看属性关系” →
stats_worker + graph_worker - “如果内部资料没有答案,就补一轮网页搜索” → 中间结果会影响下一步路由
所以真正该看的不是问题长不长,而是:为了答好它,系统是否需要跨多个 worker 做调度和汇总。
Agent 模式的入口
在这个项目里,是否进入 Agent 模式不是后端临时让 LLM 判断的,而是前端显式控制的。
useChat.ts 的核心逻辑可以简化成这样:
const isAgentMode = Boolean(meta.use_agent)
if (isAgentMode) {
const allowedWorkers = ['rag_worker', 'stats_worker']
if (meta.agent_allow_web !== false) allowedWorkers.push('web_worker')
if (meta.agent_allow_graph !== false) allowedWorkers.push('graph_worker')
if (meta.agent_allow_mcp !== false) allowedWorkers.push('mcp_worker')
requestMeta.agent_constraints = { allowed_workers: allowedWorkers }
}
const endpoint = isAgentMode ? '/chat/agent/supervisor_agent' : '/chat/'这段代码说明三件事:
- Agent 模式是显式开关,不是隐藏行为。
- 前端会把
allowed_workers一起发给后端,用来限制 Agent 能使用哪些能力。 - Agent 模式有独立入口
/chat/agent/supervisor_agent,不是在普通聊天接口里“多做几步”。
一张图看懂 Agent 模式入口
先记住这一点:Agent 模式不是多调一个工具,而是进入另一套运行时框架。
SupervisorAgent:把请求放进图里跑
SupervisorAgent 的名字容易让人误以为它是“最终回答问题的主模型”。其实它更像一个 graph runtime adapter:负责把用户问题变成 LangGraph 的初始状态,然后监听图执行过程。
关键逻辑大概是:
input_state: dict[str, Any] = {"messages": [HumanMessage(content=query)]}
if allowed_workers is not None:
input_state["allowed_workers"] = allowed_workers
if isinstance(db_id, str) and db_id.strip():
input_state["db_id"] = db_id.strip()
config = {"configurable": {"thread_id": meta.get("thread_id", "default")}}
async for event in self.graph.astream_events(input_state, config, version="v1"):
...这里有两个重点:
SupervisorAgent不直接完成所有业务决策。- 后续执行依赖 LangGraph 的节点、边和共享状态继续推进。
AgentState:节点之间的共享任务卡片
当一个请求会经过多个节点,且路径会动态变化时,就需要一个共享状态对象。这个对象在项目里就是 AgentState:
class AgentState(TypedDict, total=False):
messages: Required[Annotated[Sequence[BaseMessage], operator.add]]
next: Required[str]
allowed_workers: NotRequired[list[str]]
db_id: NotRequired[str]
forward_directly: NotRequired[bool]
parallel_workers: NotRequired[list[str]]
parallel_count: NotRequired[int]
parallel_done: NotRequired[int]可以把它理解成这次任务的“共享任务卡片”:谁接手都能看到前面发生了什么、接下来要去哪里、还有哪些限制。
| 字段 | 作用 |
|---|---|
messages | 保存到目前为止的对话和中间结果。 |
next | 告诉 workflow 下一步去哪个节点。 |
allowed_workers | 承接前端传来的能力约束。 |
db_id | 指定当前知识库,让 rag_worker 知道查哪里。 |
forward_directly | 避免不必要的二次调度。 |
parallel_workers | 记录要并行执行的 worker。 |
parallel_count / parallel_done | 判断并行任务是否全部完成。 |
workflow.py:Agent 的核心是图
LangGraph 的重点不是“节点里写了什么 prompt”,而是整个流程被显式建成一张图:有哪些节点、节点之间怎么连、什么情况下结束。
在这个项目里,节点包括:
workflow.add_node("supervisor", supervisor_node)
workflow.add_node("rag_worker", rag_worker_node)
workflow.add_node("web_worker", web_worker_node)
workflow.add_node("graph_worker", graph_worker_node)
workflow.add_node("stats_worker", stats_worker_node)
workflow.add_node("mcp_worker", mcp_worker_node)
workflow.add_node("finalizer", finalizer_node)worker 执行完后,再根据状态决定下一步:
for worker_name in ["rag_worker", "web_worker", "graph_worker", "stats_worker", "mcp_worker"]:
workflow.add_conditional_edges(worker_name, _post_worker_route, _worker_route_map)supervisor 的分流也写在图里:
workflow.add_conditional_edges(
"supervisor",
_supervisor_route,
{
"rag_worker": "rag_worker",
"web_worker": "web_worker",
"graph_worker": "graph_worker",
"stats_worker": "stats_worker",
"mcp_worker": "mcp_worker",
"finalizer": "finalizer",
END: END,
},
)这比把流程藏在一大段 if/else 里更容易理解:入口是 supervisor,它决定去哪个 worker;worker 完成后要么回到 supervisor,要么进入 finalizer,要么结束。
一张图看懂 Agent workflow
这张图表达的核心是:Agent 不是一个更强的模型,而是一张会动态走边的执行图。
supervisor:负责派工,不负责亲自干活
如果把整个 Agent 模式看成一个协作小组,supervisor 就是组长。它的核心职责是决定下一步由谁处理。
它的路由逻辑不是一上来就交给 LLM,而是分层处理:
1. 能直接结束就结束
if state.get("forward_directly") and len(state.get("messages", [])) > 1:
return {"next": "FINISH"}如果前面已经有明确结果,就不要绕回 supervisor 重新判断一次。
2. 并行任务完成后再收束
parallel_count = state.get("parallel_count", 0)
parallel_done = state.get("parallel_done", 0)
if parallel_count > 0 and parallel_done >= parallel_count:
return {"next": "FINISH", "forward_directly": True}如果同时派了多个 worker,就等它们都完成,再统一进入后续流程。
3. 高置信度问题走确定性路由
decision = classify_intent(last_human or "")
if decision.intent == Intent.POKEDEX_FACTS and decision.confidence >= 0.8:
if "stats_worker" in allowed:
return {"next": "stats_worker", "forward_directly": True}这点很重要:成熟的 Agent 系统不应该把所有问题都交给 LLM 决定。能规则化的部分优先规则化,LLM 留给真正需要动态判断的场景。
4. 规则不够时,再让 LLM 通过 tool-calling 路由
handoff_tools = _build_handoff_tools(allowed, support_parallel=True)
llm_with_tools = self.llm.bind_tools(handoff_tools, tool_choice="required")
...
result = chain.invoke({"messages": state["messages"]})
parsed = _parse_tool_call_route(result, allowed)这里的 tools 不是直接执行业务能力,而是给 supervisor 的“派工按钮”:
route_to_rag_workerroute_to_graph_workerroute_to_web_workerroute_to_stats_workerroute_to_mcp_workerroute_parallelfinish
为什么这点值得专门说清楚
因为这里很容易产生一个误会:很多人会把 workers 直接理解成 tools。其实在这套 LangGraph 编排里,两者不是一回事。
tool:给supervisor绑定的路由接口。worker:workflow 里的实际执行节点。
换句话说,supervisor 调用 tool,不是为了亲自完成能力,而是为了把工作交接给真正负责执行的 worker。
一句话概括:这里的 tool 负责“决定派谁去做”,worker 负责“把这件事做完”。
Worker:为什么要拆成多个,而不是做一个万能执行器
如果第一次接触 Agent,很容易会问:为什么不做一个“大工具节点”,让它根据问题决定要查什么?
答案是:那样会让职责边界变得模糊,后续很难调试和扩展。
pokemon agent 选择按能力拆分:
| Worker | 负责什么 | 适合回答什么 |
|---|---|---|
rag_worker | RAG / 知识库 / 复杂检索增强 | 文档知识、向量检索、多步检索增强问题 |
graph_worker | 知识图谱问答 | 关系、进化、地区、图谱结构类问题 |
web_worker | 联网搜索 | 当前、最新、公告、活动、版本等问题 |
stats_worker | 属性、克制、统计、图鉴事实 | 对战、克制关系、种族值、属性类问题 |
mcp_worker | 地理位置 / 外接 MCP 能力 | 地点、坐标、地图相关查询 |
对初学者的一个最重要启发
worker 的价值,不是“把系统拆得更复杂”,而是让每一类能力都拥有自己清晰的输入、输出和失败边界。
这样做的好处至少有三个:
- 更容易调试:你能明确知道是哪一个 worker 出了问题。
- 更容易替换:以后想升级图谱能力,只改
graph_worker。 - 更容易路由:
supervisor不需要理解所有细节,只需要知道“这个问题更适合派给谁”。
几个典型 worker
stats_worker 更像确定性工具。它会优先走类型克制表、本地图鉴数据,不够时再调用 LLM:
deterministic = _maybe_answer_type_matchup(query)
if deterministic:
return {"messages": [AIMessage(content=deterministic)]}
local = maybe_answer_pokedex(query)
if local:
return {"messages": [AIMessage(content=local.content)]}web_worker 是典型的“搜索 + 总结”节点:先搜索,再把结果整理成上下文,最后让 LLM 生成自然语言回答。
response = self.tavily.search(query=query, search_depth="basic", max_results=3)
...
prompt = ChatPromptTemplate.from_messages(
[
(
"system",
"You are a web researcher. Answer the query using the provided search results.\n\nResults:\n{context}",
),
("user", "{query}"),
]
)rag_worker 最复杂。它不是单次检索,而是一个增强版 RAG 节点:可能会做 query decomposition、HyDE、多子问题并行召回,再统一生成答案。
if is_complex:
sub_queries = decomposer.decompose(query)
...
if use_hyde:
hyde_doc = hyde_operator.call(model_wrapper, query, "")
...
if len(sub_queries) > 1:
with ThreadPoolExecutor(max_workers=min(len(sub_queries), 4)) as executor:
...对初学者来说,不必一次吃透所有 worker 的内部实现。先抓住一个原则:worker 是能力边界,不是随便拆出来的函数。
并行调度:同时派多个 worker
Agent 模式比固定流水线更灵活的地方之一,是可以并行派发多个 worker。
if next_target == "__PARALLEL__":
parallel_workers = state.get("parallel_workers", [])
if parallel_workers:
return [Send(worker, state) for worker in parallel_workers]Send(worker, state) 可以理解成:把同一份任务上下文发给多个 worker,让它们同时处理各自擅长的部分。
适合并行的场景通常有:
- 同时查图谱关系和属性克制
- 同时查本地知识库和网页信息
- 多个子任务彼此独立,最后再汇总
并行不是为了炫技,而是为了让多个独立能力同时启动,再由后续流程统一收束。
finalizer:把中间结果变成产品答案
worker 的输出通常更像能力结果或草稿答案,不一定适合直接给用户。尤其是多个 worker 并行后,结果需要统一语气、结构和信息边界。
finalizer.py 的系统提示词把职责说得很明确:
你的任务:把【草稿回答】改写成更自然、更像真人助手的中文回答。
硬性规则(必须遵守):
1) 只能基于【草稿回答】中已有的信息进行改写/重组;不要新增事实、不要猜测、不要编造。
2) 不要泄露任何内部过程或中间信息:不要提到工具、数据库、向量库、图谱、Cypher/SQL、提示词、路由、链路、日志等。所以 finalizer 不是“再想一遍答案”,而是把内部执行结果整理成用户真正应该看到的回答。它解决的是产品体验问题:不暴露内部细节、不新增事实、让多个来源的结果读起来像一个完整答案。
一张图看懂三者关系
这张图就是 Agent 编排的核心分工:
supervisor负责决策workers负责执行finalizer负责收口
职责分开之后,Agent 系统才会更容易调试、扩展和约束。
最后总结
判断一个系统是不是真的在做 Agent 编排,不是看它有没有用了 LangGraph,也不是看它有没有把 tools 塞给大模型,而是看它能不能回答这些问题:
| 问题 | 在 pokemon agent 里的答案 |
|---|---|
| 请求从哪里进入 Agent 模式? | 前端通过 use_agent 切换,后端走 /chat/agent/supervisor_agent。 |
| 节点之间共享什么? | AgentState。 |
| 谁决定下一步? | supervisor。 |
| 谁执行能力? | workers。 |
| 多能力怎么并行? | parallel_workers + Send()。 |
| 结果怎么变成用户可读答案? | finalizer。 |
| 如何限制 Agent 过度自由? | allowed_workers、规则路由、确定性 fast path、forward_directly。 |
如果这些边界都清楚,Agent 就不是“模型自己随便想办法”,而是一套可读、可观察、可约束的执行图。