从自建记忆到 LangGraph Checkpoint

自建记忆的问题

最初设计了一套三层记忆架构:瞬时记忆(LangGraph State)、短期记忆(SQLite 对话历史 + 摘要)、长期记忆(用户画像)。其中"短期记忆"是最复杂的部分——自己实现了消息存储、摘要压缩、上下文窗口管理,写了约 200 行代码。

这套方案能工作,但有几个问题:

  1. 对话历史与图执行状态分离:消息存在 SQLite 的 conversations 表里,图状态在内存中。两者各自独立,无法利用 LangGraph 的断点恢复能力。

  2. 摘要逻辑粗糙:每 6 条消息触发一次 LLM 摘要调用,增加延迟和成本。摘要质量取决于 LLM,有时会丢失关键信息。

  3. Go 网关的隐藏 Bug:Go 网关转发请求时只传了 message 字段,丢弃了前端发来的 user_idsession_id。这意味着自建记忆系统的 user_id 永远是 "guest",session_id 永远是 "default"——多用户隔离形同虚设。

Checkpoint 替换

自学还是不成体系,偶然发现了LangGraph 提供了 Checkpoint 机制:在每个节点执行后自动保存完整的图状态,下次同一 thread_id 的请求进来时自动恢复。这正好替代了自建的 conversations + summaries 逻辑。

替换内容

自建方案 Checkpoint 方案
conversations 表存消息 Checkpoint 自动保存完整 state(含 messages)
_maybe_summarize() 每6条压缩 trim_messages() 每次调用时动态裁剪
build_context() 手动拼上下文 直接读 state["messages"]
save_message() 手动写入 节点返回时自动持久化

什么保留了

用户画像(user_profiles 表)保留为自建方案。原因:Checkpoint 以 thread_id(即 session_id)为单位存储,但用户画像是跨 session 的——用户今天聊成都,明天开新 session,系统仍需知道 last_destination=成都。这需要以 user_id 为单位的存储,Checkpoint 做不到。

实现细节

1. AgentState 增加 messages 字段

from langgraph.graph.message import add_messages
from typing import Annotated

class AgentState(TypedDict):
    messages: Annotated[list, add_messages]  # 新增
    user_input: str
    destination: str
    # ... 其他字段不变

Annotated[list, add_messages] 是 LangGraph 的消息管理机制。add_messages 是一个 reducer(归约器),含义是:当节点返回 {"messages": [new_msg]} 时,追加到现有列表末尾,而不是覆盖。这样 Checkpoint 能完整保存所有对话历史。

2. 创建 Checkpoint 存储

from langgraph.checkpoint.sqlite import SqliteSaver

CHECKPOINT_DB = os.path.join(os.path.dirname(__file__), "..", "data", "checkpoints.db")
checkpointer = SqliteSaver.from_conn_string(CHECKPOINT_DB)

SqliteSaver 把 Checkpoint 数据存在 SQLite 文件中。与用户画像的 memory.db 分开,职责清晰。

3. 图编译时绑定 Checkpoint

# 之前:每次请求都重新编译
def build_graph():
    graph = StateGraph(AgentState)
    # ... 添加节点和边 ...
    return graph.compile()

# 现在:编译一次,全局复用
def build_graph(checkpointer=None):
    graph = StateGraph(AgentState)
    # ... 添加节点和边(不变) ...
    return graph.compile(checkpointer=checkpointer)

graph = build_graph(checkpointer=checkpointer)  # 模块级初始化

4. 请求时传入 thread_id

config = {"configurable": {"thread_id": sid}}
final = graph.invoke(
    {"user_input": user_message, "messages": [], ...},
    config=config,
)

thread_id 是 Checkpoint 的会话标识。同一个 thread_id 的请求共享状态。传入 messages: [] 是因为 add_messages reducer 会追加——Checkpoint 自动加载历史,新消息追加到末尾。

5. Supervisor 用 trim_messages 替代 build_context

from langchain_core.messages import trim_messages

messages = state.get("messages", [])
trimmer = trim_messages(
    max_tokens=4000,
    strategy="last",
    token_counter=llm,
    include_system=True,
    start_on="human",
)
trimmed = trimmer.invoke(messages)

resp = llm.invoke([SystemMessage(content=prompt)] + trimmed + [HumanMessage(content=user_input)])

trim_messages 的参数:

  • max_tokens=4000:保留最多 4000 token(给系统提示和当前消息留空间)
  • strategy="last":保留最新的消息,丢弃最旧的
  • token_counter=llm:用 LLM 的 tokenizer 计算 token 数
  • include_system=True:保留 SystemMessage
  • start_on="human":裁剪后第一条必须是 HumanMessage(避免格式错误)

关键区别:自建方案在存储时压缩(删旧消息 + 存摘要),Checkpoint 方案在读取时裁剪(全量存储 + 动态截断)。后者更简单,且保留了完整历史——需要时可以回溯。

架构对比

改造前

用户消息 → Go网关(丢失user_id/session_id) → Python后端
    │
    ├─ save_message(user) ← 写SQLite conversations表
    ├─ build_context() ← 读SQLite(画像+摘要+最近消息)
    ├─ Supervisor(注入上下文字符串)
    ├─ Agents(并行)
    ├─ Aggregator
    ├─ save_message(assistant) ← 写SQLite conversations表
    └─ _maybe_summarize() ← 可能调LLM生成摘要

改造后

用户消息 → Go网关(转发user_id/session_id) → Python后端
    │
    ├─ Checkpoint自动加载历史state(含messages)
    ├─ trim_messages()动态裁剪
    ├─ Supervisor(读裁剪后的messages)
    ├─ Agents(并行)
    ├─ Aggregator
    ├─ Checkpoint自动保存state(含messages)
    └─ update_user_profile() ← 写SQLite user_profiles表

收益

维度 改造前 改造后
记忆代码量 ~200行(memory.py) ~120行(memory.py)
消息存储 手动 save_message() Checkpoint 自动
摘要机制 每6条调LLM压缩 trim_messages 动态裁剪
断点恢复 不支持 支持(同一thread_id恢复state)
多用户隔离 形同虚设(Go网关丢字段) 正常工作(已修复)
LLM调用次数 可能多一次(摘要) 无额外调用

遗留问题

  1. Checkpoint 数据库会增长:每次图执行产生多个 checkpoint 条目(每个节点一个)。长期运行需考虑清理旧数据。

  2. trim_messages 的 token 计数:DeepSeek 的 tokenizer 与 OpenAI 不完全一致,token_counter=llm 可能有偏差。暂时使用保守的 max_tokens=4000 留出余量。

  3. 图编译一次 vs 每次编译:Checkpoint 要求图是全局单例(共享 checkpointer)。thread_id 能不能保证不同会话隔离。

导航

← 上一篇:从单 Agent 到多 Agent:一次真实的架构改造 → 下一篇:多 Agent 状态同步、Checkpoint 记忆与重试机制