Reducer 深度解析
一、Reducer 是什么?
Reducer 是 LangGraph 中定义"状态如何合并"的规则。
当一个节点返回 {"destination": "成都"} 时,LangGraph 需要知道:
- 是替换原来的 destination?
- 还是追加到一个列表里?
- 还是合并两个字典?
这个"怎么合并"的规则,就是 Reducer。
二、默认行为:替换(Replace)
如果一个字段没有声明 Reducer,LangGraph 默认用替换策略:
class AgentState(TypedDict):
destination: str # 无 Reducer → 默认替换
days: int # 无 Reducer → 默认替换
budget: str # 无 Reducer → 默认替换
执行流程:
state = {"destination": "", "days": 0, "budget": ""}
supervisor_node 返回 {"destination": "成都", "days": 3, "budget": "5000"}
│
▼
state = {"destination": "成都", "days": 3, "budget": "5000"}
│
│ (route_agent 读取 destination="成都",不会修改它)
│ (hotel_agent 读取 budget="5000",不会修改它)
▼
state = {"destination": "成都", "days": 3, "budget": "5000"} ← 不变
替换策略的含义:最后一次写入生效(Last Write Wins)。
三、自定义 Reducer:add_messages
from langgraph.graph.message import add_messages
from typing import Annotated
class AgentState(TypedDict):
messages: Annotated[list, add_messages] # ← 自定义 Reducer
# 其他字段全部是默认替换
3.1 Annotated 语法解析
messages: Annotated[list, add_messages]
# ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
# Annotated[类型, Reducer函数]
list:messages 的类型是列表add_messages:LangGraph 内置的 Reducer 函数,定义了"新消息怎么合并到旧消息"
LangGraph 看到 Annotated[list, add_messages] 后,会:
- 在合并 state 时,调用
add_messages(old_messages, new_messages) - 而不是直接用
new_messages替换old_messages
3.2 add_messages 的合并规则
add_messages 的核心逻辑:
def add_messages(old_messages, new_messages):
# 1. 建立旧消息的索引(按 message.id)
existing = {msg.id: msg for msg in old_messages}
# 2. 遍历新消息
for msg in new_messages:
if msg.id in existing:
# 已存在 → 替换(更新内容)
existing[msg.id] = msg
else:
# 不存在 → 追加
existing[msg.id] = msg
# 3. 返回合并后的列表(保持顺序)
return list(existing.values())
关键特性:
- 按 message.id 去重:如果新消息和旧消息有相同的 id,替换旧的
- 无 id 的消息:每次生成新的 UUID,所以总是追加
- 保持顺序:旧消息在前,新消息在后
3.3 实际效果
第一次请求(无历史):
checkpoint 加载: messages = []
supervisor 返回: {"messages": [AIMessage("好的,成都3天...")]}
合并后: messages = [AIMessage("好的,成都3天...")]
第二次请求(有历史):
checkpoint 加载: messages = [AIMessage("好的,成都3天...")]
supervisor 返回: {"messages": [AIMessage("酒店换便宜的...")]}
合并后: messages = [AIMessage("好的,成都3天..."), AIMessage("酒店换便宜的...")]
← 新消息追加到末尾,旧消息保留
四、并行分支中的 Reducer 行为
图有并行分支(Send):
supervisor
│
├─ Send("route") → route_agent
├─ Send("hotel") → hotel_agent
└─ Send("food") → food_agent
│
└─ fan-in → aggregator
4.1 Fan-in 时的 Reducer 合并
三个分支执行完毕后,各自返回不同的字段:
分支1 (route) 返回: {"route_result": "{...}"}
分支2 (hotel) 返回: {"hotel_result": "{...}"}
分支3 (food) 返回: {"food_result": "{...}"}
LangGraph 在 fan-in 时合并这些更新:
合并规则(对于每个字段):
route_result: 分支1有值,分支2/3没有 → 用分支1的值
hotel_result: 分支2有值,分支1/3没有 → 用分支2的值
food_result: 分支3有值,分支1/2没有 → 用分支3的值
messages: 所有分支都没有新消息 → 不变
关键点:因为每个 Agent 只写自己的字段,其他分支对该字段返回 None(没有更新),
所以默认的"替换"行为不会造成冲突——None 不会替换已有值。
4.3 如果多个分支写了同一个字段会怎样?
假设 route_agent 和 hotel_agent 都返回了 {"destination": "新值"}:
分支1 (route) 返回: {"destination": "A"}
分支2 (hotel) 返回: {"destination": "B"}
合并(默认替换): destination = ???
LangGraph 的处理方式是:最后一个完成的分支生效。 具体哪个"最后"取决于线程调度顺序,是不确定的。
这就是为什么当前设计要求每个 Agent 只写自己的字段—— 如果多个分支写同一个字段,结果是不可预测的。
4.4 messages 字段的并行合并
如果多个分支都返回了 messages:
分支1 (route) 返回: {"messages": [AIMessage("路线规划完成")]}
分支2 (hotel) 返回: {"messages": [AIMessage("酒店推荐完成")]}
分支3 (food) 返回: {"messages": [AIMessage("美食推荐完成")]}
合并(add_messages):
messages = 原有messages
+ [AIMessage("路线规划完成")]
+ [AIMessage("酒店推荐完成")]
+ [AIMessage("美食推荐完成")]
add_messages 保证所有分支的消息都被保留,不会互相覆盖。
五、当前设计的痛点
5.1 痛点一:只有 messages 有 Reducer,其他字段"靠约定"
class AgentState(TypedDict):
messages: Annotated[list, add_messages] # 有 Reducer,安全
destination: str # 无 Reducer,靠"只有一个 Agent 写"的约定
route_result: str # 无 Reducer,靠"只有 route_agent 写"的约定
hotel_result: str # 无 Reducer,靠"只有 hotel_agent 写"的约定
# ...
问题:
- 没有类型级保证:如果两个 Agent 错误地写了同一个字段,LangGraph 不会报错,只是结果不确定
- 代码审查困难:看 AgentState 定义,无法知道哪些字段是"谁写的",需要逐个读 node 函数
- 重构风险:如果未来给一个 Agent 加了新字段,不小心和另一个 Agent 重名,会静默出 bug
5.2 痛点二:没有"只读/只写"约束
当前设计中,任何节点都能读写任何字段:
def route_agent_node(state):
# 可以读 destination ✓(应该的)
# 可以读 hotel_result ✓(不应该,但技术上可以)
# 可以写 hotel_result ✓(不应该,但技术上会覆盖)
没有机制强制"route_agent 只能写 route_result"。
5.3 痛点三:Reducer 与 Checkpoint 的交互不直观
Checkpoint 在每个节点执行后保存完整 state。当使用 add_messages 时:
节点执行顺序:
supervisor → checkpoint_1 {messages: [msg1]}
route → checkpoint_2 {messages: [msg1]} ← route 不写 messages
hotel → checkpoint_3 {messages: [msg1]} ← hotel 不写 messages
food → checkpoint_4 {messages: [msg1]} ← food 不写 messages
aggregator → checkpoint_5 {messages: [msg1, msg2]} ← aggregator 写了 msg2
每次 checkpoint 都存了完整的 messages 列表。 如果 messages 很长(比如 100 条),每个 checkpoint 都存 100 条, 数据库会快速膨胀。
5.4 痛点四:trim_messages 不是 Reducer
trim_messages 在 supervisor_node 中被调用:
trimmer = trim_messages(max_tokens=4000, strategy="last", ...)
trimmed_history = trimmer.invoke(messages)
这是读取时裁剪,不是存储时裁剪。 checkpoint 中存的仍然是完整的历史消息。
checkpoint 存储: [msg1, msg2, ..., msg100] ← 100条,全部保存
supervisor 读取: [msg95, ..., msg100] ← 只用最近6条
好处:完整历史可追溯。 坏处:存储空间持续增长,没有自动清理机制。
六、改进方向
6.1 方案一:用 operator.add 替代 add_messages(适用于非消息列表)
如果有一个字段需要"列表追加"但不是消息类型:
import operator
class AgentState(TypedDict):
visited_pois: Annotated[list[str], operator.add] # 景点列表,追加合并
# route_agent 返回 {"visited_pois": ["宽窄巷子"]}
# hotel_agent 返回 {"visited_pois": ["锦里"]}
# 合并后: visited_pois = ["宽窄巷子", "锦里"]
6.2 方案二:用自定义 Reducer 实现安全合并
def merge_results(old: str, new: str) -> str:
"""只接受非空新值,防止意外覆盖。"""
return new if new else old
class AgentState(TypedDict):
route_result: Annotated[str, merge_results] # 只有非空值才能替换
hotel_result: Annotated[str, merge_results]
food_result: Annotated[str, merge_results]
这样即使两个分支意外写了同一个字段,空值不会覆盖已有值。
6.3 方案三:拆分子图,用 Input/Output Schema 隔离
# 每个 Agent 有独立的输入输出 Schema
class RouteInput(TypedDict):
destination: str
days: int
class RouteOutput(TypedDict):
route_result: str
# route_agent 只能读 RouteInput,只写 RouteOutput
LangGraph 的子图(Subgraph)机制支持这种隔离,但增加了复杂度。
6.4 方案四:定期清理 Checkpoint
# 添加定时任务,清理 30 天前的 checkpoint
import sqlite3
def cleanup_checkpoints(db_path, days=30):
conn = sqlite3.connect(db_path)
conn.execute(
"DELETE FROM checkpoints WHERE created_at < ?",
(time.time() - days * 86400,)
)
conn.commit()
七、总结
| 字段 | Reducer | 并行安全 | 当前风险 |
|---|---|---|---|
| messages | add_messages | 安全(追加) | 存储膨胀 |
| destination | 替换 | 安全(只有 supervisor 写) | 靠约定 |
| route_result | 替换 | 安全(只有 route 写) | 靠约定 |
| hotel_result | 替换 | 安全(只有 hotel 写) | 靠约定 |
| food_result | 替换 | 安全(只有 food 写) | 靠约定 |
| info_result | 替换 | 安全(只有 info 写) | 靠约定 |
| is_casual | 替换 | 安全(只有 supervisor 写) | 靠约定 |
| agents_to_call | 替换 | 安全(只有 supervisor 写) | 靠约定 |
核心结论:
- add_messages 是唯一有类型级保证的 Reducer
- 其他字段的并行安全靠"每个 Agent 只写自己的字段"的约定
- 这种约定在当前规模下有效,但随着 Agent 增多,风险会增加
导航
← 上一篇:多 Agent 状态同步、Checkpoint 记忆与重试机制 → 下一篇:好用的多 Agent 摘要机制:langmem.summarize_messages