pokemon agent runtime 系列(七):LangGraph 工程化实战

April 23, 2026

pokemon agent runtime 系列(七):LangGraph 工程化实战

pokemon agent runtime · Runtime 7

这篇文章是系列进阶篇:不再只讲“有 supervisor + workers”,而是专门讲清楚一个 LangGraph 项目在工程层面最关键的五件事:工具路由、运行时、结构化输出、记忆分层与上下文工程。

系列导航:上一篇见 pokemon agent runtime 系列(六):Docker Compose 架构与服务分层。本文是 pokemon agent runtime 系列最后一篇,负责回答:一套 LangGraph 系统如何从“能跑”走到“可解释、可控、可维护”。

很多人第一次做 LangGraph,会在“跑通流程图”后停下来,觉得任务已经完成。
但真正决定系统质量的,往往不是 StateGraph(...) 这行代码,而是下面这些工程问题:

  • 工具路由到底谁说了算,规则和 LLM 怎么分工?
  • 运行时状态怎么传,怎么持久化,怎么流式输出?
  • 结构化 JSON 输出怎么保证,不会一会儿是文本一会儿是半截 JSON?
  • memory 是单层还是分层,短期/长期/缓存怎么协同?
  • 上下文怎么拼接,既不丢证据,也不把上下文窗口撑爆?

pokemon agent 的价值就在于,它把这些问题都做成了可读的工程实现,而不是停留在抽象概念。

TL;DR

问题在项目中的工程答案
tools 路由怎么做?supervisor 先走确定性规则(意图分类、rule route、parallel rule),最后才用 bind_tools(..., tool_choice="required") 兜底。
runtime 怎么落地?workflow.compile(checkpointer=..., store=...) + thread_id 配置 + astream_events 事件流,前后端通过 NDJSON 协议对接。
JSON 输出怎么保证?在需要结构化输出的链路上使用 with_structured_output(PydanticModel),并提供异常 fallback,避免 brittle JSON parsing。
memory 怎么设计?AgentState 消息态 + checkpointer 线程态 + semantic cache 结果复用 + 可选长期记忆 middleware,形成分层记忆。
上下文工程怎么做?Retriever 统一聚合 KB/Graph/Web/MCP 证据,worker 内做 query decomposition/HyDE/并行召回,finalizer 做“只改写不扩写”的强约束收口。

一张图先看全局

FrontenduseChat.ts/chat/agentchat_router.pySupervisor Runtimesupervisor_agent.pythread_id + astream_eventscheckpointer + storeWorkflowworkflow.py + state.pysupervisor / workers / finalizerconditional edges + Send()Tools Routingintent fast-pathrule_route / parallel_rulebind_tools(tool_choice=required)allowed_workers constraintsforward_directly guardrailStructured Outputwith_structured_output(...)GuardrailDecision / RouteResponseAgentResponse JSON modefallback to ErrorResponse/textNDJSON != semantic schemaMemory + ContextAgentState / checkpointerSemanticCache namespace isolationRetriever multi-source aggregationRagWorker: decompose + HyDE + parallelfinalizer: constrained rewrite

这张图对应到你的项目代码,可以直接对齐到:

  • 前端入口:web/src/composables/useChat.ts
  • 接口分发:server/routers/chat_router.py
  • Agent 运行时:src/agents/supervisor_agent.py
  • 图与状态:src/graph/workflow.pysrc/graph/state.py
  • 路由节点:src/graph/nodes/supervisor.py
  • 收口节点:src/graph/nodes/finalizer.py
  • 检索与缓存:src/knowledge/core/retriever.pysrc/knowledge/cache/cache.py

模块一:Tools 路由(Supervisor 决策栈)

很多教程会直接上 ReAct,把工具都丢给模型,然后期待模型自动做最优路由。
这在 Demo 阶段可行,但在工程系统里通常会出现两个问题:

  1. 成本不稳定(每次都先调 LLM 决定路由)
  2. 路由可控性差(简单问题也被过度 agent 化)

pokemon agentsrc/graph/nodes/supervisor.py 里用了一个更工程化的分层决策:

# 1) 高置信意图:确定性快路由
decision = classify_intent(last_human or "")
if decision.intent == Intent.POKEDEX_FACTS and decision.confidence >= 0.8:
    if "stats_worker" in allowed:
        return {"next": "stats_worker", "forward_directly": True}
 
# 2) 规则并行路由
parallel_workers = rule_route_parallel(last_human or "", allowed)
if len(parallel_workers) > 1:
    return {
        "next": "__PARALLEL__",
        "parallel_workers": parallel_workers,
        "parallel_count": len(parallel_workers),
        "parallel_done": 0,
        "forward_directly": True,
    }
 
# 3) 规则单路由
route = rule_route(last_human or "", allowed)
if route:
    return {"next": route, "forward_directly": True}
 
# 4) 最后才交给 LLM tool-calling
handoff_tools = _build_handoff_tools(allowed, support_parallel=True)
llm_with_tools = self.llm.bind_tools(handoff_tools, tool_choice="required")

路由栈图

输入:last_human + allowed_workers + state flagsLayer 1classify_intent 快路由Layer 2rule_route_parallelLayer 3rule_route 单路由Layer 4LLM bind_toolsSingle workernext = stats/rag/graph/web/mcpParallel workersnext=PARALLEL + Send()Finishfinalizer / END

这个顺序非常关键:能规则化的先规则化,把 LLM 路由留给真正不确定的场景
它不是降低智能,而是把智能放在最该用的地方。

模块二:Runtime(图执行 + 事件流 + 线程态)

很多人把 LangGraph 当“流程控制库”,但在工程里它更像一个 runtime:

  • 接收输入状态
  • 按图调度节点
  • 保存会话线程状态
  • 产生流式执行事件

src/agents/supervisor_agent.py 里入口非常典型:

input_state: dict[str, Any] = {"messages": [HumanMessage(content=query)]}
if allowed_workers is not None:
    input_state["allowed_workers"] = allowed_workers
if isinstance(db_id, str) and db_id.strip():
    input_state["db_id"] = db_id.strip()
 
config = {"configurable": {"thread_id": meta.get("thread_id", "default")}}
 
async for event in self.graph.astream_events(input_state, config, version="v1"):
    ...

这里至少说明了三层 runtime 设计:

  1. 输入态建模:不是只传 query,而是传 messages + constraints + db_id 的执行态。
  2. 线程态隔离:通过 thread_id 把状态绑定到会话线程。
  3. 事件态输出:通过 astream_events 提供 token、tool start/end、节点输出等事件。

在图编译层,SupervisorAgent._build_graph() 会注入 checkpointer 与 store:

return workflow.compile(
    checkpointer=self._checkpointer,
    store=_get_memory_store(),
)

workflow.py 里则明确了图结构与路由函数:

workflow.add_conditional_edges(worker_name, _post_worker_route, _worker_route_map)
workflow.add_conditional_edges("supervisor", _supervisor_route, {...})
workflow.add_edge("finalizer", END)
workflow.set_entry_point("supervisor")

这部分和前端的 web/src/composables/useChat.ts 正好闭环:前端把 thread_idagent_constraints 发给后端,后端把执行结果按 NDJSON chunk 流回前端。

模块三:JSON 输出保证(结构化语义层 vs 传输层)

很多项目会写一句 prompt:请严格输出 JSON
这在生产里非常脆,因为任何轻微偏差都会导致解析失败。

pokemon agent 里,可以看到更可靠的做法:用 schema 驱动 structured output

例如 src/agents/chat_agent.py

class GuardrailDecision(BaseModel):
    status: str
    reason: str | None = None
 
chain = guardrail_prompt | self.base_llm.with_structured_output(GuardrailDecision)

路由决策也用约束 schema:

RouteResponse = _route_schema(options)
chain = prompt | self.base_llm.with_structured_output(RouteResponse)

业务输出模式为 json 时:

structured_base_llm = self.base_llm.with_structured_output(AgentResponse)
model_response = wrapped_structured_invoke(messages)

异常路径会 fallback(ErrorResponse 或 text),避免“全链路因为一次 parse error 崩掉”。

关键区分:NDJSON 不是结构化语义输出

server/routers/chat_router.py 与前端 readNdjsonStream(...)application/x-ndjson,它解决的是传输层流式协议
with_structured_output(...) 解决的是语义层 schema 保证

两层都做,系统才稳:

  • NDJSON:每一行是可增量消费 chunk
  • structured output:字段满足 schema,减少后处理脆弱性

模块四:Memory 分层(状态记忆、线程记忆、缓存记忆、长期记忆)

在这个项目里,memory 不是单点功能,而是分层设计。不同层解决的问题不同,生命周期也不同。

1. 图内短期状态:AgentState

src/graph/state.pyAgentState 负责一次 LangGraph 执行过程中的共享状态。

class AgentState(TypedDict, total=False):
    messages: Required[Annotated[Sequence[BaseMessage], operator.add]]
    next: Required[str]
    allowed_workers: NotRequired[list[str]]
    db_id: NotRequired[str]
    forward_directly: NotRequired[bool]
    parallel_workers: NotRequired[list[str]]
    parallel_count: NotRequired[int]
    parallel_done: NotRequired[int]

这里的 messages 不只是聊天历史,也会承载 worker 产生的中间结果;next 决定下一步走向哪个节点;allowed_workers 把前端传来的能力约束带进图里;parallel_* 字段则用于记录并行派发的 worker 数量和完成状态。

这一层 memory 的作用范围最短:只服务于一次图执行。它解决的是“当前这次任务走到哪里了、下一步该去哪”的问题。

2. 会话线程状态:checkpointer + thread_id

SupervisorAgent 在编译 workflow 时可以挂载 checkpointer,并在执行时传入 thread_id

config = {"configurable": {"thread_id": meta.get("thread_id", "default")}}
 
async for event in self.graph.astream_events(input_state, config, version="v1"):
    ...

thread_id 的意义是给 LangGraph 一个会话线程标识。只要后续请求继续使用同一个 thread_id,checkpointer 就能把这条线程上的状态连续起来。

工程上通常会有两种选择:

checkpointer = MemorySaver()
# or
checkpointer = SqliteSaver.from_conn_string("checkpoints.sqlite")
 
workflow.compile(checkpointer=checkpointer, store=store)

MemorySaver 更适合本地开发或临时会话;SqliteSaver 更适合需要进程重启后仍能恢复的场景。这一层解决的是:同一个会话线程中,跨请求如何保持连续性

3. 语义缓存:Semantic Cache

src/knowledge/cache/cache.py 的语义缓存不是保存“用户记忆”,而是复用相似问题的回答。

典型流程可以理解成:

query_embedding = embedding_model.embed_query(query)
cache_hit = cache.search(
    query_embedding,
    model_provider=model_provider,
    model_name=model_name,
    system_prompt_sha=system_prompt_sha,
)
 
if cache_hit and cache_hit.score >= threshold:
    return cache_hit.answer

这里最关键的不是 embedding 相似度本身,而是命名空间隔离:

namespace = f"{model_provider}:{model_name}:{system_prompt_sha}"

如果模型、provider 或 system prompt 变了,即使用户问题很像,也不应该直接复用旧答案。否则就可能出现“相似问题命中缓存,但回答风格或约束来自另一套配置”的错配问题。

所以这一层 memory 的本质是:跨会话复用计算结果,但必须防止跨模型、跨 prompt 的错误复用

4. 长期记忆:可选 middleware

src/agents/chat_agent.py 中的 LongTermMemoryMiddleware 是可选能力,通常按 feature flag 注入。

它和 checkpointer 不同:checkpointer 维护的是会话线程状态,长期记忆维护的是用户偏好、历史事实或可复用背景。

可以把它理解成两个阶段:

# before model
memories = memory_store.search(user_id=user_id, query=query)
messages = inject_memories(messages, memories)
 
# after agent
memory_store.save_async(user_id=user_id, messages=messages, answer=answer)

模型调用前,它检索相关长期记忆并注入上下文;模型调用后,它再异步回写新的可记忆信息。

这一层的关键是“可选”和“异步”:不是每次对话都必须启用长期记忆,也不应该让长期记忆写入阻塞主回答链路。它解决的是:跨会话、跨任务的长期偏好和背景复用

Memory 分层图

同一会话中的“记住”不是一层能力,而是多层协同Layer 1AgentStatemessages / nextallowed_workersparallel_count/donescope: 单次图执行Layer 2CheckpointerMemorySaver / SqliteSaverkey: thread_idscope: 会话线程跨请求连续性Layer 3Semantic Cacheembedding similaritymodel/provider/prompt_shanamespace isolationscope: 跨会话复用Layer 4Long-term MemoryLongTermMemoryMiddlewarebefore_model injectafter_agent async savescope: 用户长期偏好

这四层的价值在于:
不同类型的信息放在不同生命周期里,不让“上下文历史”承担所有记忆职责。

模块五:上下文工程(Retriever + Worker + Finalizer)

一个常见误区是:检索做完就把结果拼到 prompt 尾部。
pokemon agent 的实现更接近可维护工程链:

1) 多源检索聚合器(Retriever)

src/knowledge/core/retriever.pyretrieval(...) 会并行聚合:

  • Knowledge Base
  • Graph
  • Web
  • MCP

当同时启用多个能力时用 ThreadPoolExecutor 并发,降低尾延迟。

2) context 构造器(construct_query)

construct_query(...) 不是盲拼,而是分区块组织:

  • 图谱回答块
  • 知识库证据块
  • Web 结果块
  • MCP 结果块

再套入 knowbase_qa_template,形成可消费增强输入。

3) worker 内上下文优化(RagWorker)

src/graph/nodes/rag_worker.py 里有几层实用策略:

  • 复杂问题先 query decomposition
  • 按复杂度自适应 top_k
  • 条件启用 HyDE(不是默认全开)
  • 多子问题并行召回
  • CRAG 评估检索质量后修正路径

4) finalizer 强约束收口

src/graph/nodes/finalizer.py 的系统提示词明确要求:

  • 只能基于草稿改写,不新增事实
  • 不泄露工具/数据库/图谱/链路等内部细节
  • 信息不足时友好说明并给一个澄清问题

上下文流转图

User Query+ history + metaRetriever.retrievalKB/Graph/Web/MCP 并行construct_query证据分块 + 模板组装RagWorkerdecompose/hyde/cragEvidence Sourcesknowledge_base.resultsgraph_base.answer/resultsweb_search.resultsmysql_mcp.answerclean_kb_text / _web_item_to_textknowbase_qa_templateWorker DraftAIMessage(content=draft)可能来自并行 workersmessages reducer 合并进入 finalizer_nodeFinalizer Output只改写不扩写不泄露内部链路与工具细节失败回退到 draft最终 NDJSON 流返回前端

安全与可控:几条可直接复用的 guardrails

从代码里能提炼出一组很实用的控制策略:

  1. deterministic fast path:高置信简单问题直接走确定性路径(减少无谓 LLM 调度)。
  2. forward_directly:worker 结果可直接收口,避免回 supervisor 二次改写。
  3. allowed_workers:前端通过 agent_constraints 显式约束可用能力边界。
  4. parallel_count/parallel_done:并行任务有显式完成条件,不靠隐式猜测。
  5. safe cache 条件:缓存只在单轮且非检索场景生效,避免上下文污染。

如果把这些策略抽象成一句话,就是:
先做“可控与确定”的部分,再把不确定性交给模型。

给自己的实践清单:如何判断你真的学会了 LangGraph 工程化

你可以用下面这张表自测:

检查项达标标准
图结构能清楚说出入口节点、条件边、终止条件、并行分发点。
路由策略能解释规则路由与 LLM 路由的优先级,以及为什么这么排。
运行时能解释 thread_id、checkpointer、astream_events 各自解决什么问题。
结构化输出能区分“prompt 要 JSON”和“schema 强制输出”的可靠性差异。
memory 分层能区分状态记忆、线程记忆、缓存记忆、长期记忆的职责边界。
上下文工程能解释检索证据是如何组织、压缩、拼接并最终进入生成的。
失败回退能指出至少 3 个 fallback 路径(例如 finalizer fallback、cache miss、vector store 不可用时的本地兜底)。

如果这些点你都能从代码里讲清楚,并且能自己改一条路由、加一个 worker、扩一个 schema,你对 LangGraph 的理解就已经进入工程层了。

如果你已经读到这里,下一步最值得做的不是继续加术语,而是亲手做三件小改动:

  • supervisor 新增一个可控 worker,并补 allowed_workers 约束
  • 给一个节点增加 with_structured_output(...),把脆弱解析改成 schema 约束
  • 给一次检索链加观测点(路由决策、召回条数、最终上下文长度),把“感觉变好”改成“可测量变好”