多 Agent 摘要机制: langmem.summarize_messages
一、问题:为什么需要摘要?
多 Agent 系统的一个核心挑战:对话历史无限增长,但 LLM 上下文窗口有限。
请求1: 用户: "帮我规划成都3天" → messages: 2条
请求2: 用户: "酒店换便宜的" → messages: 4条
请求3: 用户: "再加个都江堰" → messages: 6条
...
请求N: 用户: "第二天改徒步" → messages: 40条,token > 8000
当对话积累到几十轮后,原始消息的 token 数会超出 DeepSeek 的上下文限制。
解决方案的核心原则:
Checkpoint 存完整历史(永不丢失)
+
发给 LLM 前压缩旧消息为摘要
=
既保留完整上下文,又不超出 token 限制
二、官方
2.1 第一版:手动实现 SummarizationMiddleware
langmem 早期版本提供 SummarizationMiddleware,但只支持 create_react_agent 架构——那种"单 Agent + 工具调用循环"的模式。我们的系统是自定义 StateGraph + Send() 并行多 Agent 架构,无法直接使用。
第一版手动实现了一个 100 行的 summarization_middleware 函数,核心逻辑:
def summarization_middleware(messages, existing_summary, agent):
# 1. 用 trim_messages 检查 token 数
check_trimmer = trim_messages(max_tokens=4000, strategy="last", ...)
checked = check_trimmer.invoke(messages)
# 2. 没超限 → 不处理
if len(checked) >= len(messages):
return messages, existing_summary
# 3. 超限 → 分离旧消息和最近消息
recent = messages[-6:]
old = messages[:-6]
# 4. 调 LLM 生成摘要
new_summary = agent.invoke([HumanMessage(content=summary_prompt)])
# 5. 返回 [SystemMessage(摘要)] + 最近消息
return [SystemMessage(content=summary)] + recent, new_summary
痛点:
| 问题 | 详情 |
|---|---|
| token 计数不准 | trim_messages 基于 OpenAI tokenizer,与 DeepSeek tokenizer 有差异 |
| 摘要不追踪进度 | 每次都用"旧摘要 + 全部旧消息"生成新摘要,已摘要的消息被重复处理 |
| 手动维护 | 100 行逻辑需自行维护边界情况(空消息、摘要失败兜底等) |
| 无类型安全 | tuple[list, str] 返回值,调用方需记住顺序 |
2.2 第二版(当前):接入 langmem.summarize_messages
langmem 0.0.30 重构了 API,废弃 SummarizationMiddleware,改为两个正交组件:
summarize_messages():纯函数,输入消息 + 摘要状态,输出处理后的消息 + 更新后的状态RunningSummary:dataclass,跟踪已摘要的消息 ID,实现真正的增量摘要
改造后的调用:
from langmem.short_term import summarize_messages, RunningSummary
# 从 checkpoint 恢复摘要状态
running_summary = _deserialize_running_summary(state.get("running_summary"))
# 调用官方 API(一行搞定,替代原来 100 行)
result = summarize_messages(
messages=messages,
running_summary=running_summary,
model=supervisor_agent,
max_tokens=4000, # 输出消息总 token 上限
max_tokens_before_summary=4000, # 触发摘要的阈值
max_summary_tokens=256, # 摘要本身最大 token 数
)
processed_messages = result.messages # 可直接发给 LLM
updated_summary = result.running_summary # 持久化到 checkpoint
对比:
手动版(旧):
summarization_middleware(messages, existing_summary, agent) → (list, str)
├─ 自己检查 token
├─ 自己分离旧/新消息
├─ 自己拼摘要 prompt
├─ 自己调 LLM
└─ 返回 tuple,约定俗成
langmem 版(新):
summarize_messages(messages, running_summary, model, ...) → SummarizationResult
├─ 内置 token 计数(支持自定义 counter)
├─ 内置消息分离逻辑
├─ 内置摘要 prompt 模板(可自定义)
├─ 自动调 LLM
└─ 返回具名 dataclass(.messages / .running_summary)
三、RunningSummary:增量摘要的核心
这是新方案相比手动的最大提升。
3.1 数据结构
@dataclass
class RunningSummary:
summary: str # 最新摘要文本
summarized_message_ids: set[str] # 已摘要的消息 ID 集合
last_summarized_message_id: str | None # 最后被摘要的消息 ID
3.2 为什么需要追踪消息 ID?
手动版的问题:每次触发摘要时,把"旧摘要 + 全部旧消息"发给 LLM 重新摘要。如果 100 条消息中有 90 条已经被摘要过,这 90 条会被重复处理——浪费 token 且摘要质量下降(LLM 对重复内容敏感)。
RunningSummary 的做法:
请求1 (messages: msg1..msg10, 不超限):
summarize_messages → result.running_summary = None(没触发)
请求5 (messages: msg1..msg22, 超限!):
summarize_messages 内部:
1. 检查 summarized_message_ids
2. 发现 msg1..msg22 都未被摘要
3. 把 msg1..msg18 压缩为摘要 S1
4. 返回 messages = [SystemMessage(S1), msg19..msg22]
5. 返回 running_summary = RunningSummary(
summary="用户计划成都3天游...",
summarized_message_ids={"msg1",...,"msg18"},
last_summarized_message_id="msg18"
)
请求8 (messages: msg1..msg30, 再次超限!):
summarize_messages 内部:
1. 检查 summarized_message_ids = {"msg1",...,"msg18"}
2. 发现只有 msg19..msg26 是新的(msg1..msg18 已摘要过)
3. 基于旧摘要 S1 + 新消息 msg19..msg26 → 新摘要 S2
4. 返回 running_summary = RunningSummary(
summary="用户计划成都3天游,已规划路线,加过都江堰...",
summarized_message_ids={"msg1",...,"msg26"},
last_summarized_message_id="msg26"
)
关键点:已摘要的消息不会被重复发给 LLM。只处理增量。
3.3 状态持久化
RunningSummary 通过 checkpoint 在请求间持久化:
# AgentState 中的定义
class AgentState(TypedDict):
messages: Annotated[list, add_messages]
running_summary: dict # RunningSummary 序列化为 dict 存储
# ...
# 每次 supervisor_node 返回时,checkpoint 自动保存
return {
"messages": [AIMessage(content=reply)],
"running_summary": _serialize_running_summary(updated_running_summary),
}
序列化/反序列化辅助函数(set 类型不直接支持 JSON):
def _serialize_running_summary(rs: RunningSummary) -> dict:
return {
"summary": rs.summary,
"summarized_message_ids": list(rs.summarized_message_ids), # set → list
"last_summarized_message_id": rs.last_summarized_message_id,
}
def _deserialize_running_summary(data: dict | None) -> RunningSummary | None:
if not data:
return None
return RunningSummary(
summary=data["summary"],
summarized_message_ids=set(data["summarized_message_ids"]), # list → set
last_summarized_message_id=data["last_summarized_message_id"],
)
四、在 supervisor_node 中的集成
4.1 为什么只在 supervisor 做摘要?
supervisor_node: 需要对话上下文("上次说的预算不够,换便宜的")
└─ 使用 summarize_messages ✓
route_agent_node: 只关心 destination + days
└─ 不需要摘要 ✗
hotel_agent_node: 只关心 destination + budget
└─ 不需要摘要 ✗
food_agent_node: 只关心 destination + days
└─ 不需要摘要 ✗
info_agent_node: 只关心 destination
└─ 不需要摘要 ✗
子 Agent 都是"单次任务"——给定参数,返回结果。只有 supervisor 需要跨请求的对话上下文。
4.2 完整调用链
def supervisor_node(state: AgentState) -> dict:
messages = state.get("messages", []) # checkpoint 自动加载完整历史
# 1. 恢复摘要状态
running_summary = _deserialize_running_summary(
state.get("running_summary")
)
# 2. 调用 langmem 官方 API 进行摘要(可能不触发)
try:
result = summarize_messages(
messages=messages,
running_summary=running_summary,
model=supervisor_agent,
max_tokens=4000,
max_tokens_before_summary=4000,
max_summary_tokens=256,
)
processed_messages = result.messages
updated_running_summary = result.running_summary
except Exception:
# 摘要失败不阻塞主流程,退化为全量消息
processed_messages = messages
updated_running_summary = running_summary
# 3. 序列化供 checkpoint 持久化
new_running_summary = (
_serialize_running_summary(updated_running_summary)
if updated_running_summary
else state.get("running_summary")
)
# 4. 构建提示词 + 调用 LLM
prompt = "你是TravelAgent-5旅行助手..."
resp = supervisor_agent.invoke(
[SystemMessage(content=prompt)]
+ processed_messages # ← 摘要后的消息(可能含 SystemMessage(摘要))
+ [HumanMessage(content=state["user_input"])]
)
# 5. 返回结果,摘要状态随 checkpoint 持久化
return {
"messages": [AIMessage(content=reply)],
"running_summary": new_running_summary, # ← 下次请求自动恢复
}
4.3 异常处理策略
summarize_messages 失败(网络波动 / LLM 暂时不可用)
→ 不阻塞主流程
→ 退化为使用全量消息(processed_messages = messages)
→ RunningSummary 保持旧值(丢失本次增量机会,但下次会重试)
supervisor_agent.invoke 失败
→ 默认走 ASK 追问流程
→ 给用户友好提示而非报错
设计原则:摘要功能是优化手段而非核心依赖。它失败了,系统仍然可用,只是 token 消耗增大。
五、架构适配:为什么不用 create_react_agent
5.1 官方推荐的用法
langmem 官方文档推荐的用法是将 SummarizationNode 或 summarize_messages 配合 create_react_agent 使用:
# 官方示例(单 Agent 架构)
agent = create_react_agent(
model, tools=[...],
# 新版 langgraph 可能支持 middleware 参数
)
5.2 我们的架构差异
官方 create_react_agent 的执行模型:
┌──────────────────────────────────┐
│ Agent Loop │
│ ┌──────┐ ┌──────┐ ┌─────┐ │
│ │model │ → │tools │ → │loop │ │
│ └──────┘ └──────┘ └─────┘ │
│ ↑ 统一的 model 调用入口 │
└──────────────────────────────────┘
我们的多 Agent StateGraph:
supervisor → Send(route) ─┐
Send(hotel) ──┤→ aggregator → END
Send(food) ──┤
Send(info) ──┘
↑ 每个节点是独立函数,Send() 实现并行分支
summarize_messages 是纯函数,可以在任意节点中调用——这正是自定义 StateGraph 所需要的。它不需要"统一的 model 调用入口",比 middleware 模式更灵活。
5.3 为什么这是更好的选择
| 维度 | Middleware 模式 | summarize_messages 模式 |
|---|---|---|
| 适用范围 | 仅 create_react_agent | 任意 StateGraph 节点 |
| 控制粒度 | 自动拦截所有 model 调用 | 在需要的节点手动调用 |
| 多 Agent 支持 | 单 Agent | 多 Agent(选择性地在 supervisor 做摘要) |
| 异常处理 | middleware 内部处理 | 调用方控制(可降级) |
| 可测试性 | 需 mock middleware 链 | 纯函数,直接测试 |
六、摘要 prompt 的内部机制
summarize_messages 内部使用三段 prompt 策略:
6.1 initial_summary_prompt(首次摘要)
# langmem 内置默认值
"Create a summary of the conversation above:"
当 running_summary 为 None 时使用。LLM 只看到待摘要的消息。
6.2 existing_summary_prompt(增量摘要)
# langmem 内置默认值
"This is summary of the conversation so far: {existing_summary}\n\n"
"Extend this summary by taking into account the new messages above:"
当 running_summary 已存在时使用。LLM 看到旧摘要 + 新消息,只需要"扩展"摘要而非重新生成。
6.3 final_prompt(注入到输出消息)
# langmem 内置默认值
"Summary of the conversation so far: {summary}"
生成的摘要通过 SystemMessage 注入到输出消息列表的最前面,供后续 LLM 调用使用。
6.4 自定义 prompt
所有 prompt 参数都支持自定义。如果默认的英文 prompt 不适合中文场景:
from langchain_core.prompts import ChatPromptTemplate
custom_initial_prompt = ChatPromptTemplate.from_messages([
("system", "你是一个对话摘要助手。"),
("placeholder", "{messages}"),
("human", "将以上对话压缩为简洁摘要,保留目的地、预算、偏好等关键信息,用中文。"),
])
result = summarize_messages(
...,
initial_summary_prompt=custom_initial_prompt,
)
七、token 计数策略
7.1 默认计数器
summarize_messages 默认使用 count_tokens_approximately:
# 约 4 字符 ≈ 1 token(适用于英文)
# 对中文不太精确,但作为触发摘要的阈值足够
7.2 DeepSeek 场景的考虑
DeepSeek 使用自己的 tokenizer,与 OpenAI 的 tiktoken 计数有差异(中文场景差异更明显)。这导致:
- 阈值触发可能偏早或偏晚(但不会超过真实上下文限制,因为我们设了保守的
max_tokens=4000) - 摘要质量不受影响(摘要是语义压缩,不依赖精确 token 计数)
如果精度要求高,可以传入自定义 token_counter:
def deepseek_counter(messages) -> int:
"""使用 DeepSeek tokenizer 精确计数"""
# ...
result = summarize_messages(
...,
token_counter=deepseek_counter,
)
八、对比总结
8.1 代码量变化
手动版:SUMMARY_MAX_TOKENS + SUMMARY_KEEP_RECENT + SUMMARY_MAX_LENGTH
+ summarization_middleware() ~100行
+ trim_messages 检查逻辑
langmem 版:SUMMARY_MAX_TOKENS + SUMMARY_MAX_SUMMARY_TOKENS
+ _serialize/deserialize 辅助 ~20行
+ summarize_messages() 调用 ~10行
8.2 能力对比
| 维度 | 手动版 | langmem 版 |
|---|---|---|
| 摘要触发 | trim_messages 裁剪检测 | summarize_messages 内置 token 计数 |
| 摘要生成 | 手写 prompt + agent.invoke() | 官方三段 prompt 模板 |
| 增量摘要 | 旧摘要 + 全部旧消息(重复处理) | RunningSummary 追踪 ID(精准增量) |
| 状态管理 | 手写 summary 字符串拼接 | RunningSummary dataclass |
| 类型安全 | tuple[list, str] | SummarizationResult (具名属性) |
| 异常处理 | 手写 try/except | 调用方控制(降级灵活) |
| 维护成本 | 100 行业务逻辑 | 官方库维护 |
8.3 适用场景
summarize_messages(当前方案):
✓ 自定义 StateGraph + 多 Agent 并行
✓ 需要在特定节点(supervisor)而非全局做摘要
✓ 需要控制异常降级策略
✓ 想要官方维护的摘要质量
create_react_agent + middleware(未来方案):
✓ 单 Agent + 工具调用
✓ 全局自动拦截
✓ 零代码侵入
✗ 不支持自定义 StateGraph + Send() 并行
九、核心要点
- Checkpoint 存完整历史,发给 LLM 前压缩——这是所有摘要方案共同的核心原则
summarize_messages替代手动实现:从 100 行手写逻辑缩减为 10 行官方调用,且质量更高RunningSummary实现真正的增量摘要:通过追踪已摘要消息 ID,避免重复处理- 摘要失败不阻塞主流程:退化为全量消息,系统继续可用
- 只在 supervisor 做摘要:子 Agent 是无状态的单次任务,不需要对话上下文
- 纯函数优于 middleware:对于自定义 StateGraph,
summarize_messages比 middleware 模式更灵活、更可控
导航
← 上一篇:Reducer学习:从默认行为到自定义合并