Reducer 深度解析

一、Reducer 是什么?

Reducer 是 LangGraph 中定义"状态如何合并"的规则。

当一个节点返回 {"destination": "成都"} 时,LangGraph 需要知道:

  • 替换原来的 destination?
  • 还是追加到一个列表里?
  • 还是合并两个字典?

这个"怎么合并"的规则,就是 Reducer。

二、默认行为:替换(Replace)

如果一个字段没有声明 Reducer,LangGraph 默认用替换策略:

class AgentState(TypedDict):
    destination: str   # 无 Reducer → 默认替换
    days: int          # 无 Reducer → 默认替换
    budget: str        # 无 Reducer → 默认替换

执行流程:

state = {"destination": "", "days": 0, "budget": ""}

supervisor_node 返回 {"destination": "成都", "days": 3, "budget": "5000"}
    │
    ▼
state = {"destination": "成都", "days": 3, "budget": "5000"}
    │
    │  (route_agent 读取 destination="成都",不会修改它)
    │  (hotel_agent 读取 budget="5000",不会修改它)
    ▼
state = {"destination": "成都", "days": 3, "budget": "5000"}  ← 不变

替换策略的含义:最后一次写入生效(Last Write Wins)。

三、自定义 Reducer:add_messages

from langgraph.graph.message import add_messages
from typing import Annotated

class AgentState(TypedDict):
    messages: Annotated[list, add_messages]  # ← 自定义 Reducer
    # 其他字段全部是默认替换

3.1 Annotated 语法解析

messages: Annotated[list, add_messages]
#         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
#         Annotated[类型, Reducer函数]
  • list:messages 的类型是列表
  • add_messages:LangGraph 内置的 Reducer 函数,定义了"新消息怎么合并到旧消息"

LangGraph 看到 Annotated[list, add_messages] 后,会:

  1. 在合并 state 时,调用 add_messages(old_messages, new_messages)
  2. 而不是直接用 new_messages 替换 old_messages

3.2 add_messages 的合并规则

add_messages 的核心逻辑:

def add_messages(old_messages, new_messages):
    # 1. 建立旧消息的索引(按 message.id)
    existing = {msg.id: msg for msg in old_messages}

    # 2. 遍历新消息
    for msg in new_messages:
        if msg.id in existing:
            # 已存在 → 替换(更新内容)
            existing[msg.id] = msg
        else:
            # 不存在 → 追加
            existing[msg.id] = msg

    # 3. 返回合并后的列表(保持顺序)
    return list(existing.values())

关键特性:

  • 按 message.id 去重:如果新消息和旧消息有相同的 id,替换旧的
  • 无 id 的消息:每次生成新的 UUID,所以总是追加
  • 保持顺序:旧消息在前,新消息在后

3.3 实际效果

第一次请求(无历史):
  checkpoint 加载: messages = []
  supervisor 返回: {"messages": [AIMessage("好的,成都3天...")]}
  合并后: messages = [AIMessage("好的,成都3天...")]

第二次请求(有历史):
  checkpoint 加载: messages = [AIMessage("好的,成都3天...")]
  supervisor 返回: {"messages": [AIMessage("酒店换便宜的...")]}
  合并后: messages = [AIMessage("好的,成都3天..."), AIMessage("酒店换便宜的...")]
  ← 新消息追加到末尾,旧消息保留

四、并行分支中的 Reducer 行为

图有并行分支(Send):

supervisor
    │
    ├─ Send("route")   → route_agent
    ├─ Send("hotel")   → hotel_agent
    └─ Send("food")    → food_agent
    │
    └─ fan-in → aggregator

4.1 Fan-in 时的 Reducer 合并

三个分支执行完毕后,各自返回不同的字段:

分支1 (route) 返回: {"route_result": "{...}"}
分支2 (hotel) 返回: {"hotel_result": "{...}"}
分支3 (food)  返回: {"food_result": "{...}"}

LangGraph 在 fan-in 时合并这些更新:

合并规则(对于每个字段):
  route_result: 分支1有值,分支2/3没有 → 用分支1的值
  hotel_result: 分支2有值,分支1/3没有 → 用分支2的值
  food_result:  分支3有值,分支1/2没有 → 用分支3的值
  messages:     所有分支都没有新消息 → 不变

关键点:因为每个 Agent 只写自己的字段,其他分支对该字段返回 None(没有更新), 所以默认的"替换"行为不会造成冲突——None 不会替换已有值。

4.3 如果多个分支写了同一个字段会怎样?

假设 route_agent 和 hotel_agent 都返回了 {"destination": "新值"}

分支1 (route) 返回: {"destination": "A"}
分支2 (hotel) 返回: {"destination": "B"}

合并(默认替换): destination = ???

LangGraph 的处理方式是:最后一个完成的分支生效。 具体哪个"最后"取决于线程调度顺序,是不确定的

这就是为什么当前设计要求每个 Agent 只写自己的字段—— 如果多个分支写同一个字段,结果是不可预测的。

4.4 messages 字段的并行合并

如果多个分支都返回了 messages:

分支1 (route) 返回: {"messages": [AIMessage("路线规划完成")]}
分支2 (hotel) 返回: {"messages": [AIMessage("酒店推荐完成")]}
分支3 (food)  返回: {"messages": [AIMessage("美食推荐完成")]}

合并(add_messages):
  messages = 原有messages
           + [AIMessage("路线规划完成")]
           + [AIMessage("酒店推荐完成")]
           + [AIMessage("美食推荐完成")]

add_messages 保证所有分支的消息都被保留,不会互相覆盖。

五、当前设计的痛点

5.1 痛点一:只有 messages 有 Reducer,其他字段"靠约定"

class AgentState(TypedDict):
    messages: Annotated[list, add_messages]  # 有 Reducer,安全
    destination: str    # 无 Reducer,靠"只有一个 Agent 写"的约定
    route_result: str   # 无 Reducer,靠"只有 route_agent 写"的约定
    hotel_result: str   # 无 Reducer,靠"只有 hotel_agent 写"的约定
    # ...

问题:

  • 没有类型级保证:如果两个 Agent 错误地写了同一个字段,LangGraph 不会报错,只是结果不确定
  • 代码审查困难:看 AgentState 定义,无法知道哪些字段是"谁写的",需要逐个读 node 函数
  • 重构风险:如果未来给一个 Agent 加了新字段,不小心和另一个 Agent 重名,会静默出 bug

5.2 痛点二:没有"只读/只写"约束

当前设计中,任何节点都能读写任何字段:

def route_agent_node(state):
    # 可以读 destination ✓(应该的)
    # 可以读 hotel_result ✓(不应该,但技术上可以)
    # 可以写 hotel_result ✓(不应该,但技术上会覆盖)

没有机制强制"route_agent 只能写 route_result"。

5.3 痛点三:Reducer 与 Checkpoint 的交互不直观

Checkpoint 在每个节点执行后保存完整 state。当使用 add_messages 时:

节点执行顺序:
  supervisor → checkpoint_1 {messages: [msg1]}
  route      → checkpoint_2 {messages: [msg1]}          ← route 不写 messages
  hotel      → checkpoint_3 {messages: [msg1]}          ← hotel 不写 messages
  food       → checkpoint_4 {messages: [msg1]}          ← food 不写 messages
  aggregator → checkpoint_5 {messages: [msg1, msg2]}    ← aggregator 写了 msg2

每次 checkpoint 都存了完整的 messages 列表。 如果 messages 很长(比如 100 条),每个 checkpoint 都存 100 条, 数据库会快速膨胀。

5.4 痛点四:trim_messages 不是 Reducer

trim_messages 在 supervisor_node 中被调用:

trimmer = trim_messages(max_tokens=4000, strategy="last", ...)
trimmed_history = trimmer.invoke(messages)

这是读取时裁剪,不是存储时裁剪。 checkpoint 中存的仍然是完整的历史消息。

checkpoint 存储: [msg1, msg2, ..., msg100]  ← 100条,全部保存
supervisor 读取: [msg95, ..., msg100]        ← 只用最近6条

好处:完整历史可追溯。 坏处:存储空间持续增长,没有自动清理机制。

六、改进方向

6.1 方案一:用 operator.add 替代 add_messages(适用于非消息列表)

如果有一个字段需要"列表追加"但不是消息类型:

import operator

class AgentState(TypedDict):
    visited_pois: Annotated[list[str], operator.add]  # 景点列表,追加合并
    # route_agent 返回 {"visited_pois": ["宽窄巷子"]}
    # hotel_agent 返回 {"visited_pois": ["锦里"]}
    # 合并后: visited_pois = ["宽窄巷子", "锦里"]

6.2 方案二:用自定义 Reducer 实现安全合并

def merge_results(old: str, new: str) -> str:
    """只接受非空新值,防止意外覆盖。"""
    return new if new else old

class AgentState(TypedDict):
    route_result: Annotated[str, merge_results]  # 只有非空值才能替换
    hotel_result: Annotated[str, merge_results]
    food_result: Annotated[str, merge_results]

这样即使两个分支意外写了同一个字段,空值不会覆盖已有值。

6.3 方案三:拆分子图,用 Input/Output Schema 隔离

# 每个 Agent 有独立的输入输出 Schema
class RouteInput(TypedDict):
    destination: str
    days: int

class RouteOutput(TypedDict):
    route_result: str

# route_agent 只能读 RouteInput,只写 RouteOutput

LangGraph 的子图(Subgraph)机制支持这种隔离,但增加了复杂度。

6.4 方案四:定期清理 Checkpoint

# 添加定时任务,清理 30 天前的 checkpoint
import sqlite3

def cleanup_checkpoints(db_path, days=30):
    conn = sqlite3.connect(db_path)
    conn.execute(
        "DELETE FROM checkpoints WHERE created_at < ?",
        (time.time() - days * 86400,)
    )
    conn.commit()

七、总结

字段 Reducer 并行安全 当前风险
messages add_messages 安全(追加) 存储膨胀
destination 替换 安全(只有 supervisor 写) 靠约定
route_result 替换 安全(只有 route 写) 靠约定
hotel_result 替换 安全(只有 hotel 写) 靠约定
food_result 替换 安全(只有 food 写) 靠约定
info_result 替换 安全(只有 info 写) 靠约定
is_casual 替换 安全(只有 supervisor 写) 靠约定
agents_to_call 替换 安全(只有 supervisor 写) 靠约定

核心结论:

  • add_messages 是唯一有类型级保证的 Reducer
  • 其他字段的并行安全靠"每个 Agent 只写自己的字段"的约定
  • 这种约定在当前规模下有效,但随着 Agent 增多,风险会增加

导航

← 上一篇:多 Agent 状态同步、Checkpoint 记忆与重试机制 → 下一篇:好用的多 Agent 摘要机制:langmem.summarize_messages