pokemon agent runtime · Runtime 7
这篇文章是系列进阶篇:不再只讲“有 supervisor + workers”,而是专门讲清楚一个 LangGraph 项目在工程层面最关键的五件事:工具路由、运行时、结构化输出、记忆分层与上下文工程。
系列导航:上一篇见 pokemon agent runtime 系列(六):Docker Compose 架构与服务分层。本文是
pokemon agent runtime系列最后一篇,负责回答:一套 LangGraph 系统如何从“能跑”走到“可解释、可控、可维护”。
很多人第一次做 LangGraph,会在“跑通流程图”后停下来,觉得任务已经完成。
但真正决定系统质量的,往往不是 StateGraph(...) 这行代码,而是下面这些工程问题:
- 工具路由到底谁说了算,规则和 LLM 怎么分工?
- 运行时状态怎么传,怎么持久化,怎么流式输出?
- 结构化 JSON 输出怎么保证,不会一会儿是文本一会儿是半截 JSON?
- memory 是单层还是分层,短期/长期/缓存怎么协同?
- 上下文怎么拼接,既不丢证据,也不把上下文窗口撑爆?
pokemon agent 的价值就在于,它把这些问题都做成了可读的工程实现,而不是停留在抽象概念。
TL;DR
| 问题 | 在项目中的工程答案 |
|---|---|
| tools 路由怎么做? | supervisor 先走确定性规则(意图分类、rule route、parallel rule),最后才用 bind_tools(..., tool_choice="required") 兜底。 |
| runtime 怎么落地? | workflow.compile(checkpointer=..., store=...) + thread_id 配置 + astream_events 事件流,前后端通过 NDJSON 协议对接。 |
| JSON 输出怎么保证? | 在需要结构化输出的链路上使用 with_structured_output(PydanticModel),并提供异常 fallback,避免 brittle JSON parsing。 |
| memory 怎么设计? | AgentState 消息态 + checkpointer 线程态 + semantic cache 结果复用 + 可选长期记忆 middleware,形成分层记忆。 |
| 上下文工程怎么做? | Retriever 统一聚合 KB/Graph/Web/MCP 证据,worker 内做 query decomposition/HyDE/并行召回,finalizer 做“只改写不扩写”的强约束收口。 |
一张图先看全局
这张图对应到你的项目代码,可以直接对齐到:
- 前端入口:
web/src/composables/useChat.ts - 接口分发:
server/routers/chat_router.py - Agent 运行时:
src/agents/supervisor_agent.py - 图与状态:
src/graph/workflow.py、src/graph/state.py - 路由节点:
src/graph/nodes/supervisor.py - 收口节点:
src/graph/nodes/finalizer.py - 检索与缓存:
src/knowledge/core/retriever.py、src/knowledge/cache/cache.py
模块一:Tools 路由(Supervisor 决策栈)
很多教程会直接上 ReAct,把工具都丢给模型,然后期待模型自动做最优路由。
这在 Demo 阶段可行,但在工程系统里通常会出现两个问题:
- 成本不稳定(每次都先调 LLM 决定路由)
- 路由可控性差(简单问题也被过度 agent 化)
pokemon agent 在 src/graph/nodes/supervisor.py 里用了一个更工程化的分层决策:
# 1) 高置信意图:确定性快路由
decision = classify_intent(last_human or "")
if decision.intent == Intent.POKEDEX_FACTS and decision.confidence >= 0.8:
if "stats_worker" in allowed:
return {"next": "stats_worker", "forward_directly": True}
# 2) 规则并行路由
parallel_workers = rule_route_parallel(last_human or "", allowed)
if len(parallel_workers) > 1:
return {
"next": "__PARALLEL__",
"parallel_workers": parallel_workers,
"parallel_count": len(parallel_workers),
"parallel_done": 0,
"forward_directly": True,
}
# 3) 规则单路由
route = rule_route(last_human or "", allowed)
if route:
return {"next": route, "forward_directly": True}
# 4) 最后才交给 LLM tool-calling
handoff_tools = _build_handoff_tools(allowed, support_parallel=True)
llm_with_tools = self.llm.bind_tools(handoff_tools, tool_choice="required")路由栈图
这个顺序非常关键:能规则化的先规则化,把 LLM 路由留给真正不确定的场景。
它不是降低智能,而是把智能放在最该用的地方。
模块二:Runtime(图执行 + 事件流 + 线程态)
很多人把 LangGraph 当“流程控制库”,但在工程里它更像一个 runtime:
- 接收输入状态
- 按图调度节点
- 保存会话线程状态
- 产生流式执行事件
src/agents/supervisor_agent.py 里入口非常典型:
input_state: dict[str, Any] = {"messages": [HumanMessage(content=query)]}
if allowed_workers is not None:
input_state["allowed_workers"] = allowed_workers
if isinstance(db_id, str) and db_id.strip():
input_state["db_id"] = db_id.strip()
config = {"configurable": {"thread_id": meta.get("thread_id", "default")}}
async for event in self.graph.astream_events(input_state, config, version="v1"):
...这里至少说明了三层 runtime 设计:
- 输入态建模:不是只传
query,而是传messages + constraints + db_id的执行态。 - 线程态隔离:通过
thread_id把状态绑定到会话线程。 - 事件态输出:通过
astream_events提供 token、tool start/end、节点输出等事件。
在图编译层,SupervisorAgent._build_graph() 会注入 checkpointer 与 store:
return workflow.compile(
checkpointer=self._checkpointer,
store=_get_memory_store(),
)workflow.py 里则明确了图结构与路由函数:
workflow.add_conditional_edges(worker_name, _post_worker_route, _worker_route_map)
workflow.add_conditional_edges("supervisor", _supervisor_route, {...})
workflow.add_edge("finalizer", END)
workflow.set_entry_point("supervisor")这部分和前端的 web/src/composables/useChat.ts 正好闭环:前端把 thread_id、agent_constraints 发给后端,后端把执行结果按 NDJSON chunk 流回前端。
模块三:JSON 输出保证(结构化语义层 vs 传输层)
很多项目会写一句 prompt:请严格输出 JSON。
这在生产里非常脆,因为任何轻微偏差都会导致解析失败。
在 pokemon agent 里,可以看到更可靠的做法:用 schema 驱动 structured output。
例如 src/agents/chat_agent.py:
class GuardrailDecision(BaseModel):
status: str
reason: str | None = None
chain = guardrail_prompt | self.base_llm.with_structured_output(GuardrailDecision)路由决策也用约束 schema:
RouteResponse = _route_schema(options)
chain = prompt | self.base_llm.with_structured_output(RouteResponse)业务输出模式为 json 时:
structured_base_llm = self.base_llm.with_structured_output(AgentResponse)
model_response = wrapped_structured_invoke(messages)异常路径会 fallback(ErrorResponse 或 text),避免“全链路因为一次 parse error 崩掉”。
关键区分:NDJSON 不是结构化语义输出
server/routers/chat_router.py 与前端 readNdjsonStream(...) 用 application/x-ndjson,它解决的是传输层流式协议。
而 with_structured_output(...) 解决的是语义层 schema 保证。
两层都做,系统才稳:
- NDJSON:每一行是可增量消费 chunk
- structured output:字段满足 schema,减少后处理脆弱性
模块四:Memory 分层(状态记忆、线程记忆、缓存记忆、长期记忆)
在这个项目里,memory 不是单点功能,而是分层设计。不同层解决的问题不同,生命周期也不同。
1. 图内短期状态:AgentState
src/graph/state.py 的 AgentState 负责一次 LangGraph 执行过程中的共享状态。
class AgentState(TypedDict, total=False):
messages: Required[Annotated[Sequence[BaseMessage], operator.add]]
next: Required[str]
allowed_workers: NotRequired[list[str]]
db_id: NotRequired[str]
forward_directly: NotRequired[bool]
parallel_workers: NotRequired[list[str]]
parallel_count: NotRequired[int]
parallel_done: NotRequired[int]这里的 messages 不只是聊天历史,也会承载 worker 产生的中间结果;next 决定下一步走向哪个节点;allowed_workers 把前端传来的能力约束带进图里;parallel_* 字段则用于记录并行派发的 worker 数量和完成状态。
这一层 memory 的作用范围最短:只服务于一次图执行。它解决的是“当前这次任务走到哪里了、下一步该去哪”的问题。
2. 会话线程状态:checkpointer + thread_id
SupervisorAgent 在编译 workflow 时可以挂载 checkpointer,并在执行时传入 thread_id。
config = {"configurable": {"thread_id": meta.get("thread_id", "default")}}
async for event in self.graph.astream_events(input_state, config, version="v1"):
...thread_id 的意义是给 LangGraph 一个会话线程标识。只要后续请求继续使用同一个 thread_id,checkpointer 就能把这条线程上的状态连续起来。
工程上通常会有两种选择:
checkpointer = MemorySaver()
# or
checkpointer = SqliteSaver.from_conn_string("checkpoints.sqlite")
workflow.compile(checkpointer=checkpointer, store=store)MemorySaver 更适合本地开发或临时会话;SqliteSaver 更适合需要进程重启后仍能恢复的场景。这一层解决的是:同一个会话线程中,跨请求如何保持连续性。
3. 语义缓存:Semantic Cache
src/knowledge/cache/cache.py 的语义缓存不是保存“用户记忆”,而是复用相似问题的回答。
典型流程可以理解成:
query_embedding = embedding_model.embed_query(query)
cache_hit = cache.search(
query_embedding,
model_provider=model_provider,
model_name=model_name,
system_prompt_sha=system_prompt_sha,
)
if cache_hit and cache_hit.score >= threshold:
return cache_hit.answer这里最关键的不是 embedding 相似度本身,而是命名空间隔离:
namespace = f"{model_provider}:{model_name}:{system_prompt_sha}"如果模型、provider 或 system prompt 变了,即使用户问题很像,也不应该直接复用旧答案。否则就可能出现“相似问题命中缓存,但回答风格或约束来自另一套配置”的错配问题。
所以这一层 memory 的本质是:跨会话复用计算结果,但必须防止跨模型、跨 prompt 的错误复用。
4. 长期记忆:可选 middleware
src/agents/chat_agent.py 中的 LongTermMemoryMiddleware 是可选能力,通常按 feature flag 注入。
它和 checkpointer 不同:checkpointer 维护的是会话线程状态,长期记忆维护的是用户偏好、历史事实或可复用背景。
可以把它理解成两个阶段:
# before model
memories = memory_store.search(user_id=user_id, query=query)
messages = inject_memories(messages, memories)
# after agent
memory_store.save_async(user_id=user_id, messages=messages, answer=answer)模型调用前,它检索相关长期记忆并注入上下文;模型调用后,它再异步回写新的可记忆信息。
这一层的关键是“可选”和“异步”:不是每次对话都必须启用长期记忆,也不应该让长期记忆写入阻塞主回答链路。它解决的是:跨会话、跨任务的长期偏好和背景复用。
Memory 分层图
这四层的价值在于:
不同类型的信息放在不同生命周期里,不让“上下文历史”承担所有记忆职责。
模块五:上下文工程(Retriever + Worker + Finalizer)
一个常见误区是:检索做完就把结果拼到 prompt 尾部。
pokemon agent 的实现更接近可维护工程链:
1) 多源检索聚合器(Retriever)
src/knowledge/core/retriever.py 的 retrieval(...) 会并行聚合:
- Knowledge Base
- Graph
- Web
- MCP
当同时启用多个能力时用 ThreadPoolExecutor 并发,降低尾延迟。
2) context 构造器(construct_query)
construct_query(...) 不是盲拼,而是分区块组织:
- 图谱回答块
- 知识库证据块
- Web 结果块
- MCP 结果块
再套入 knowbase_qa_template,形成可消费增强输入。
3) worker 内上下文优化(RagWorker)
src/graph/nodes/rag_worker.py 里有几层实用策略:
- 复杂问题先
query decomposition - 按复杂度自适应
top_k - 条件启用 HyDE(不是默认全开)
- 多子问题并行召回
- CRAG 评估检索质量后修正路径
4) finalizer 强约束收口
src/graph/nodes/finalizer.py 的系统提示词明确要求:
- 只能基于草稿改写,不新增事实
- 不泄露工具/数据库/图谱/链路等内部细节
- 信息不足时友好说明并给一个澄清问题
上下文流转图
安全与可控:几条可直接复用的 guardrails
从代码里能提炼出一组很实用的控制策略:
- deterministic fast path:高置信简单问题直接走确定性路径(减少无谓 LLM 调度)。
- forward_directly:worker 结果可直接收口,避免回 supervisor 二次改写。
- allowed_workers:前端通过
agent_constraints显式约束可用能力边界。 - parallel_count/parallel_done:并行任务有显式完成条件,不靠隐式猜测。
- safe cache 条件:缓存只在单轮且非检索场景生效,避免上下文污染。
如果把这些策略抽象成一句话,就是:
先做“可控与确定”的部分,再把不确定性交给模型。
给自己的实践清单:如何判断你真的学会了 LangGraph 工程化
你可以用下面这张表自测:
| 检查项 | 达标标准 |
|---|---|
| 图结构 | 能清楚说出入口节点、条件边、终止条件、并行分发点。 |
| 路由策略 | 能解释规则路由与 LLM 路由的优先级,以及为什么这么排。 |
| 运行时 | 能解释 thread_id、checkpointer、astream_events 各自解决什么问题。 |
| 结构化输出 | 能区分“prompt 要 JSON”和“schema 强制输出”的可靠性差异。 |
| memory 分层 | 能区分状态记忆、线程记忆、缓存记忆、长期记忆的职责边界。 |
| 上下文工程 | 能解释检索证据是如何组织、压缩、拼接并最终进入生成的。 |
| 失败回退 | 能指出至少 3 个 fallback 路径(例如 finalizer fallback、cache miss、vector store 不可用时的本地兜底)。 |
如果这些点你都能从代码里讲清楚,并且能自己改一条路由、加一个 worker、扩一个 schema,你对 LangGraph 的理解就已经进入工程层了。
如果你已经读到这里,下一步最值得做的不是继续加术语,而是亲手做三件小改动:
- 给
supervisor新增一个可控 worker,并补allowed_workers约束 - 给一个节点增加
with_structured_output(...),把脆弱解析改成 schema 约束 - 给一次检索链加观测点(路由决策、召回条数、最终上下文长度),把“感觉变好”改成“可测量变好”