从自建记忆到 LangGraph Checkpoint
自建记忆的问题
最初设计了一套三层记忆架构:瞬时记忆(LangGraph State)、短期记忆(SQLite 对话历史 + 摘要)、长期记忆(用户画像)。其中"短期记忆"是最复杂的部分——自己实现了消息存储、摘要压缩、上下文窗口管理,写了约 200 行代码。
这套方案能工作,但有几个问题:
-
对话历史与图执行状态分离:消息存在 SQLite 的 conversations 表里,图状态在内存中。两者各自独立,无法利用 LangGraph 的断点恢复能力。
-
摘要逻辑粗糙:每 6 条消息触发一次 LLM 摘要调用,增加延迟和成本。摘要质量取决于 LLM,有时会丢失关键信息。
-
Go 网关的隐藏 Bug:Go 网关转发请求时只传了
message字段,丢弃了前端发来的user_id和session_id。这意味着自建记忆系统的 user_id 永远是 "guest",session_id 永远是 "default"——多用户隔离形同虚设。
Checkpoint 替换
自学还是不成体系,偶然发现了LangGraph 提供了 Checkpoint 机制:在每个节点执行后自动保存完整的图状态,下次同一 thread_id 的请求进来时自动恢复。这正好替代了自建的 conversations + summaries 逻辑。
替换内容
| 自建方案 | Checkpoint 方案 |
|---|---|
conversations 表存消息 |
Checkpoint 自动保存完整 state(含 messages) |
_maybe_summarize() 每6条压缩 |
trim_messages() 每次调用时动态裁剪 |
build_context() 手动拼上下文 |
直接读 state["messages"] |
save_message() 手动写入 |
节点返回时自动持久化 |
什么保留了
用户画像(user_profiles 表)保留为自建方案。原因:Checkpoint 以 thread_id(即 session_id)为单位存储,但用户画像是跨 session 的——用户今天聊成都,明天开新 session,系统仍需知道 last_destination=成都。这需要以 user_id 为单位的存储,Checkpoint 做不到。
实现细节
1. AgentState 增加 messages 字段
from langgraph.graph.message import add_messages
from typing import Annotated
class AgentState(TypedDict):
messages: Annotated[list, add_messages] # 新增
user_input: str
destination: str
# ... 其他字段不变
Annotated[list, add_messages] 是 LangGraph 的消息管理机制。add_messages 是一个 reducer(归约器),含义是:当节点返回 {"messages": [new_msg]} 时,追加到现有列表末尾,而不是覆盖。这样 Checkpoint 能完整保存所有对话历史。
2. 创建 Checkpoint 存储
from langgraph.checkpoint.sqlite import SqliteSaver
CHECKPOINT_DB = os.path.join(os.path.dirname(__file__), "..", "data", "checkpoints.db")
checkpointer = SqliteSaver.from_conn_string(CHECKPOINT_DB)
SqliteSaver 把 Checkpoint 数据存在 SQLite 文件中。与用户画像的 memory.db 分开,职责清晰。
3. 图编译时绑定 Checkpoint
# 之前:每次请求都重新编译
def build_graph():
graph = StateGraph(AgentState)
# ... 添加节点和边 ...
return graph.compile()
# 现在:编译一次,全局复用
def build_graph(checkpointer=None):
graph = StateGraph(AgentState)
# ... 添加节点和边(不变) ...
return graph.compile(checkpointer=checkpointer)
graph = build_graph(checkpointer=checkpointer) # 模块级初始化
4. 请求时传入 thread_id
config = {"configurable": {"thread_id": sid}}
final = graph.invoke(
{"user_input": user_message, "messages": [], ...},
config=config,
)
thread_id 是 Checkpoint 的会话标识。同一个 thread_id 的请求共享状态。传入 messages: [] 是因为 add_messages reducer 会追加——Checkpoint 自动加载历史,新消息追加到末尾。
5. Supervisor 用 trim_messages 替代 build_context
from langchain_core.messages import trim_messages
messages = state.get("messages", [])
trimmer = trim_messages(
max_tokens=4000,
strategy="last",
token_counter=llm,
include_system=True,
start_on="human",
)
trimmed = trimmer.invoke(messages)
resp = llm.invoke([SystemMessage(content=prompt)] + trimmed + [HumanMessage(content=user_input)])
trim_messages 的参数:
max_tokens=4000:保留最多 4000 token(给系统提示和当前消息留空间)strategy="last":保留最新的消息,丢弃最旧的token_counter=llm:用 LLM 的 tokenizer 计算 token 数include_system=True:保留 SystemMessagestart_on="human":裁剪后第一条必须是 HumanMessage(避免格式错误)
关键区别:自建方案在存储时压缩(删旧消息 + 存摘要),Checkpoint 方案在读取时裁剪(全量存储 + 动态截断)。后者更简单,且保留了完整历史——需要时可以回溯。
架构对比
改造前
用户消息 → Go网关(丢失user_id/session_id) → Python后端
│
├─ save_message(user) ← 写SQLite conversations表
├─ build_context() ← 读SQLite(画像+摘要+最近消息)
├─ Supervisor(注入上下文字符串)
├─ Agents(并行)
├─ Aggregator
├─ save_message(assistant) ← 写SQLite conversations表
└─ _maybe_summarize() ← 可能调LLM生成摘要
改造后
用户消息 → Go网关(转发user_id/session_id) → Python后端
│
├─ Checkpoint自动加载历史state(含messages)
├─ trim_messages()动态裁剪
├─ Supervisor(读裁剪后的messages)
├─ Agents(并行)
├─ Aggregator
├─ Checkpoint自动保存state(含messages)
└─ update_user_profile() ← 写SQLite user_profiles表
收益
| 维度 | 改造前 | 改造后 |
|---|---|---|
| 记忆代码量 | ~200行(memory.py) | ~120行(memory.py) |
| 消息存储 | 手动 save_message() | Checkpoint 自动 |
| 摘要机制 | 每6条调LLM压缩 | trim_messages 动态裁剪 |
| 断点恢复 | 不支持 | 支持(同一thread_id恢复state) |
| 多用户隔离 | 形同虚设(Go网关丢字段) | 正常工作(已修复) |
| LLM调用次数 | 可能多一次(摘要) | 无额外调用 |
遗留问题
-
Checkpoint 数据库会增长:每次图执行产生多个 checkpoint 条目(每个节点一个)。长期运行需考虑清理旧数据。
-
trim_messages 的 token 计数:DeepSeek 的 tokenizer 与 OpenAI 不完全一致,
token_counter=llm可能有偏差。暂时使用保守的max_tokens=4000留出余量。 -
图编译一次 vs 每次编译:Checkpoint 要求图是全局单例(共享 checkpointer)。
thread_id能不能保证不同会话隔离。
导航
← 上一篇:从单 Agent 到多 Agent:一次真实的架构改造 → 下一篇:多 Agent 状态同步、Checkpoint 记忆与重试机制