多 Agent 摘要机制: langmem.summarize_messages

一、问题:为什么需要摘要?

多 Agent 系统的一个核心挑战:对话历史无限增长,但 LLM 上下文窗口有限

请求1: 用户: "帮我规划成都3天"       → messages: 2条
请求2: 用户: "酒店换便宜的"           → messages: 4条
请求3: 用户: "再加个都江堰"           → messages: 6条
...
请求N: 用户: "第二天改徒步"           → messages: 40条,token > 8000

当对话积累到几十轮后,原始消息的 token 数会超出 DeepSeek 的上下文限制。

解决方案的核心原则:

Checkpoint 存完整历史(永不丢失)
    +
发给 LLM 前压缩旧消息为摘要
    =
既保留完整上下文,又不超出 token 限制

二、官方

2.1 第一版:手动实现 SummarizationMiddleware

langmem 早期版本提供 SummarizationMiddleware,但只支持 create_react_agent 架构——那种"单 Agent + 工具调用循环"的模式。我们的系统是自定义 StateGraph + Send() 并行多 Agent 架构,无法直接使用。

第一版手动实现了一个 100 行的 summarization_middleware 函数,核心逻辑:

def summarization_middleware(messages, existing_summary, agent):
    # 1. 用 trim_messages 检查 token 数
    check_trimmer = trim_messages(max_tokens=4000, strategy="last", ...)
    checked = check_trimmer.invoke(messages)
    
    # 2. 没超限 → 不处理
    if len(checked) >= len(messages):
        return messages, existing_summary
    
    # 3. 超限 → 分离旧消息和最近消息
    recent = messages[-6:]
    old = messages[:-6]
    
    # 4. 调 LLM 生成摘要
    new_summary = agent.invoke([HumanMessage(content=summary_prompt)])
    
    # 5. 返回 [SystemMessage(摘要)] + 最近消息
    return [SystemMessage(content=summary)] + recent, new_summary

痛点:

问题 详情
token 计数不准 trim_messages 基于 OpenAI tokenizer,与 DeepSeek tokenizer 有差异
摘要不追踪进度 每次都用"旧摘要 + 全部旧消息"生成新摘要,已摘要的消息被重复处理
手动维护 100 行逻辑需自行维护边界情况(空消息、摘要失败兜底等)
无类型安全 tuple[list, str] 返回值,调用方需记住顺序

2.2 第二版(当前):接入 langmem.summarize_messages

langmem 0.0.30 重构了 API,废弃 SummarizationMiddleware,改为两个正交组件:

  • summarize_messages():纯函数,输入消息 + 摘要状态,输出处理后的消息 + 更新后的状态
  • RunningSummary:dataclass,跟踪已摘要的消息 ID,实现真正的增量摘要

改造后的调用:

from langmem.short_term import summarize_messages, RunningSummary

# 从 checkpoint 恢复摘要状态
running_summary = _deserialize_running_summary(state.get("running_summary"))

# 调用官方 API(一行搞定,替代原来 100 行)
result = summarize_messages(
    messages=messages,
    running_summary=running_summary,
    model=supervisor_agent,
    max_tokens=4000,                  # 输出消息总 token 上限
    max_tokens_before_summary=4000,   # 触发摘要的阈值
    max_summary_tokens=256,           # 摘要本身最大 token 数
)

processed_messages = result.messages       # 可直接发给 LLM
updated_summary = result.running_summary   # 持久化到 checkpoint

对比:

手动版(旧):
  summarization_middleware(messages, existing_summary, agent) → (list, str)
  ├─ 自己检查 token
  ├─ 自己分离旧/新消息
  ├─ 自己拼摘要 prompt
  ├─ 自己调 LLM
  └─ 返回 tuple,约定俗成

langmem 版(新):
  summarize_messages(messages, running_summary, model, ...) → SummarizationResult
  ├─ 内置 token 计数(支持自定义 counter)
  ├─ 内置消息分离逻辑
  ├─ 内置摘要 prompt 模板(可自定义)
  ├─ 自动调 LLM
  └─ 返回具名 dataclass(.messages / .running_summary)

三、RunningSummary:增量摘要的核心

这是新方案相比手动的最大提升。

3.1 数据结构

@dataclass
class RunningSummary:
    summary: str                          # 最新摘要文本
    summarized_message_ids: set[str]      # 已摘要的消息 ID 集合
    last_summarized_message_id: str | None  # 最后被摘要的消息 ID

3.2 为什么需要追踪消息 ID?

手动版的问题:每次触发摘要时,把"旧摘要 + 全部旧消息"发给 LLM 重新摘要。如果 100 条消息中有 90 条已经被摘要过,这 90 条会被重复处理——浪费 token 且摘要质量下降(LLM 对重复内容敏感)。

RunningSummary 的做法:

请求1 (messages: msg1..msg10, 不超限):
  summarize_messages → result.running_summary = None(没触发)
  
请求5 (messages: msg1..msg22, 超限!):
  summarize_messages 内部:
    1. 检查 summarized_message_ids
    2. 发现 msg1..msg22 都未被摘要
    3. 把 msg1..msg18 压缩为摘要 S1
    4. 返回 messages = [SystemMessage(S1), msg19..msg22]
    5. 返回 running_summary = RunningSummary(
         summary="用户计划成都3天游...",
         summarized_message_ids={"msg1",...,"msg18"},
         last_summarized_message_id="msg18"
       )

请求8 (messages: msg1..msg30, 再次超限!):
  summarize_messages 内部:
    1. 检查 summarized_message_ids = {"msg1",...,"msg18"}
    2. 发现只有 msg19..msg26 是新的(msg1..msg18 已摘要过)
    3. 基于旧摘要 S1 + 新消息 msg19..msg26 → 新摘要 S2
    4. 返回 running_summary = RunningSummary(
         summary="用户计划成都3天游,已规划路线,加过都江堰...",
         summarized_message_ids={"msg1",...,"msg26"},
         last_summarized_message_id="msg26"
       )

关键点:已摘要的消息不会被重复发给 LLM。只处理增量。

3.3 状态持久化

RunningSummary 通过 checkpoint 在请求间持久化:

# AgentState 中的定义
class AgentState(TypedDict):
    messages: Annotated[list, add_messages]
    running_summary: dict  # RunningSummary 序列化为 dict 存储
    # ...

# 每次 supervisor_node 返回时,checkpoint 自动保存
return {
    "messages": [AIMessage(content=reply)],
    "running_summary": _serialize_running_summary(updated_running_summary),
}

序列化/反序列化辅助函数(set 类型不直接支持 JSON):

def _serialize_running_summary(rs: RunningSummary) -> dict:
    return {
        "summary": rs.summary,
        "summarized_message_ids": list(rs.summarized_message_ids),  # set → list
        "last_summarized_message_id": rs.last_summarized_message_id,
    }

def _deserialize_running_summary(data: dict | None) -> RunningSummary | None:
    if not data:
        return None
    return RunningSummary(
        summary=data["summary"],
        summarized_message_ids=set(data["summarized_message_ids"]),  # list → set
        last_summarized_message_id=data["last_summarized_message_id"],
    )

四、在 supervisor_node 中的集成

4.1 为什么只在 supervisor 做摘要?

supervisor_node:  需要对话上下文("上次说的预算不够,换便宜的")
  └─ 使用 summarize_messages ✓

route_agent_node:  只关心 destination + days
  └─ 不需要摘要 ✗

hotel_agent_node:  只关心 destination + budget
  └─ 不需要摘要 ✗

food_agent_node:   只关心 destination + days
  └─ 不需要摘要 ✗

info_agent_node:   只关心 destination
  └─ 不需要摘要 ✗

子 Agent 都是"单次任务"——给定参数,返回结果。只有 supervisor 需要跨请求的对话上下文。

4.2 完整调用链

def supervisor_node(state: AgentState) -> dict:
    messages = state.get("messages", [])  # checkpoint 自动加载完整历史

    # 1. 恢复摘要状态
    running_summary = _deserialize_running_summary(
        state.get("running_summary")
    )

    # 2. 调用 langmem 官方 API 进行摘要(可能不触发)
    try:
        result = summarize_messages(
            messages=messages,
            running_summary=running_summary,
            model=supervisor_agent,
            max_tokens=4000,
            max_tokens_before_summary=4000,
            max_summary_tokens=256,
        )
        processed_messages = result.messages
        updated_running_summary = result.running_summary
    except Exception:
        # 摘要失败不阻塞主流程,退化为全量消息
        processed_messages = messages
        updated_running_summary = running_summary

    # 3. 序列化供 checkpoint 持久化
    new_running_summary = (
        _serialize_running_summary(updated_running_summary)
        if updated_running_summary
        else state.get("running_summary")
    )

    # 4. 构建提示词 + 调用 LLM
    prompt = "你是TravelAgent-5旅行助手..."
    resp = supervisor_agent.invoke(
        [SystemMessage(content=prompt)]
        + processed_messages          # ← 摘要后的消息(可能含 SystemMessage(摘要))
        + [HumanMessage(content=state["user_input"])]
    )

    # 5. 返回结果,摘要状态随 checkpoint 持久化
    return {
        "messages": [AIMessage(content=reply)],
        "running_summary": new_running_summary,  # ← 下次请求自动恢复
    }

4.3 异常处理策略

summarize_messages 失败(网络波动 / LLM 暂时不可用)
  → 不阻塞主流程
  → 退化为使用全量消息(processed_messages = messages)
  → RunningSummary 保持旧值(丢失本次增量机会,但下次会重试)

supervisor_agent.invoke 失败
  → 默认走 ASK 追问流程
  → 给用户友好提示而非报错

设计原则:摘要功能是优化手段而非核心依赖。它失败了,系统仍然可用,只是 token 消耗增大。


五、架构适配:为什么不用 create_react_agent

5.1 官方推荐的用法

langmem 官方文档推荐的用法是将 SummarizationNodesummarize_messages 配合 create_react_agent 使用:

# 官方示例(单 Agent 架构)
agent = create_react_agent(
    model, tools=[...],
    # 新版 langgraph 可能支持 middleware 参数
)

5.2 我们的架构差异

官方 create_react_agent 的执行模型:
  ┌──────────────────────────────────┐
  │  Agent Loop                       │
  │  ┌──────┐    ┌──────┐    ┌─────┐ │
  │  │model │ →  │tools │ →  │loop │ │
  │  └──────┘    └──────┘    └─────┘ │
  │  ↑ 统一的 model 调用入口           │
  └──────────────────────────────────┘

我们的多 Agent StateGraph:
  supervisor → Send(route) ─┐
               Send(hotel) ──┤→ aggregator → END
               Send(food)  ──┤
               Send(info)  ──┘
  ↑ 每个节点是独立函数,Send() 实现并行分支

summarize_messages 是纯函数,可以在任意节点中调用——这正是自定义 StateGraph 所需要的。它不需要"统一的 model 调用入口",比 middleware 模式更灵活。

5.3 为什么这是更好的选择

维度 Middleware 模式 summarize_messages 模式
适用范围 仅 create_react_agent 任意 StateGraph 节点
控制粒度 自动拦截所有 model 调用 在需要的节点手动调用
多 Agent 支持 单 Agent 多 Agent(选择性地在 supervisor 做摘要)
异常处理 middleware 内部处理 调用方控制(可降级)
可测试性 需 mock middleware 链 纯函数,直接测试

六、摘要 prompt 的内部机制

summarize_messages 内部使用三段 prompt 策略:

6.1 initial_summary_prompt(首次摘要)

# langmem 内置默认值
"Create a summary of the conversation above:"

running_summary 为 None 时使用。LLM 只看到待摘要的消息。

6.2 existing_summary_prompt(增量摘要)

# langmem 内置默认值
"This is summary of the conversation so far: {existing_summary}\n\n"
"Extend this summary by taking into account the new messages above:"

running_summary 已存在时使用。LLM 看到旧摘要 + 新消息,只需要"扩展"摘要而非重新生成。

6.3 final_prompt(注入到输出消息)

# langmem 内置默认值
"Summary of the conversation so far: {summary}"

生成的摘要通过 SystemMessage 注入到输出消息列表的最前面,供后续 LLM 调用使用。

6.4 自定义 prompt

所有 prompt 参数都支持自定义。如果默认的英文 prompt 不适合中文场景:

from langchain_core.prompts import ChatPromptTemplate

custom_initial_prompt = ChatPromptTemplate.from_messages([
    ("system", "你是一个对话摘要助手。"),
    ("placeholder", "{messages}"),
    ("human", "将以上对话压缩为简洁摘要,保留目的地、预算、偏好等关键信息,用中文。"),
])

result = summarize_messages(
    ...,
    initial_summary_prompt=custom_initial_prompt,
)

七、token 计数策略

7.1 默认计数器

summarize_messages 默认使用 count_tokens_approximately

# 约 4 字符 ≈ 1 token(适用于英文)
# 对中文不太精确,但作为触发摘要的阈值足够

7.2 DeepSeek 场景的考虑

DeepSeek 使用自己的 tokenizer,与 OpenAI 的 tiktoken 计数有差异(中文场景差异更明显)。这导致:

  • 阈值触发可能偏早或偏晚(但不会超过真实上下文限制,因为我们设了保守的 max_tokens=4000
  • 摘要质量不受影响(摘要是语义压缩,不依赖精确 token 计数)

如果精度要求高,可以传入自定义 token_counter

def deepseek_counter(messages) -> int:
    """使用 DeepSeek tokenizer 精确计数"""
    # ...

result = summarize_messages(
    ...,
    token_counter=deepseek_counter,
)

八、对比总结

8.1 代码量变化

手动版:SUMMARY_MAX_TOKENS + SUMMARY_KEEP_RECENT + SUMMARY_MAX_LENGTH
        + summarization_middleware() ~100行
        + trim_messages 检查逻辑

langmem 版:SUMMARY_MAX_TOKENS + SUMMARY_MAX_SUMMARY_TOKENS
           + _serialize/deserialize 辅助 ~20行
           + summarize_messages() 调用 ~10行

8.2 能力对比

维度 手动版 langmem 版
摘要触发 trim_messages 裁剪检测 summarize_messages 内置 token 计数
摘要生成 手写 prompt + agent.invoke() 官方三段 prompt 模板
增量摘要 旧摘要 + 全部旧消息(重复处理) RunningSummary 追踪 ID(精准增量)
状态管理 手写 summary 字符串拼接 RunningSummary dataclass
类型安全 tuple[list, str] SummarizationResult (具名属性)
异常处理 手写 try/except 调用方控制(降级灵活)
维护成本 100 行业务逻辑 官方库维护

8.3 适用场景

summarize_messages(当前方案):
  ✓ 自定义 StateGraph + 多 Agent 并行
  ✓ 需要在特定节点(supervisor)而非全局做摘要
  ✓ 需要控制异常降级策略
  ✓ 想要官方维护的摘要质量

create_react_agent + middleware(未来方案):
  ✓ 单 Agent + 工具调用
  ✓ 全局自动拦截
  ✓ 零代码侵入
  ✗ 不支持自定义 StateGraph + Send() 并行

九、核心要点

  1. Checkpoint 存完整历史,发给 LLM 前压缩——这是所有摘要方案共同的核心原则
  2. summarize_messages 替代手动实现:从 100 行手写逻辑缩减为 10 行官方调用,且质量更高
  3. RunningSummary 实现真正的增量摘要:通过追踪已摘要消息 ID,避免重复处理
  4. 摘要失败不阻塞主流程:退化为全量消息,系统继续可用
  5. 只在 supervisor 做摘要:子 Agent 是无状态的单次任务,不需要对话上下文
  6. 纯函数优于 middleware:对于自定义 StateGraph,summarize_messages 比 middleware 模式更灵活、更可控

导航

← 上一篇:Reducer学习:从默认行为到自定义合并