understand-Graph-by-example
LangGraph 节点和 Graph 详解
🎯 核心概念理解
LangGraph 是一个用于构建 有状态的多步骤 AI 工作流 的框架。你可以把它比作:
传统程序 → if/else 逻辑控制流
LangGraph → 状态机 + DAG (有向无环图)
📦 三个核心组件
1. 节点(Node)- 工作单元
一个节点就是一个 Python 函数,接收状态,处理,返回更新后的状态:
def generate_query(state: OverallState, config: RunnableConfig) -> QueryGenerationState:
"""这就是一个节点"""
# 接收状态
research_topic = get_research_topic(state["messages"])
# 处理逻辑
llm = ChatOpenAI(...)
result = llm.invoke(formatted_prompt)
# 返回状态更新
return {"search_query": result.query}
节点的特点:
- 是一个 纯函数 (给定输入,总是相同输出)
- 接收
state(整个工作流的状态) - 返回
字典(状态的部分更新) - 可以调用 LLM、数据库、API 等任何外部服务
2. 边(Edge)- 连接
边定义节点之间的 执行顺序 和 条件:
# 固定边:总是执行这个路径
builder.add_edge(START, "generate_query")
builder.add_edge("web_research", "reflection")
# 条件边:根据条件决定下一步
builder.add_conditional_edges(
"reflection", # 从哪个节点出发
evaluate_research, # 决策函数(返回下一个节点名)
["web_research", "finalize_answer"] # 可能的目标节点
)
两种边:
add_edge()- 顺序边,无条件跳转add_conditional_edges()- 条件边,根据状态决定下一步
3. 状态(State)- 数据流
状态是 整个工作流共享的数据:
class OverallState(TypedDict):
messages: Annotated[list, add_messages] # 消息列表
search_query: Annotated[list, operator.add] # 搜索查询
web_research_result: Annotated[list, operator.add] # 研究结果
sources_gathered: Annotated[list, operator.add] # 收集的源
initial_search_query_count: int # 初始查询数
max_research_loops: int # 最大循环数
research_loop_count: int # 当前循环数
reasoning_model: str # 推理模型
Annotated[list, operator.add] 的含义:
- 当多个节点返回相同的 key 时,自动合并 (而不是覆盖)
- 例如,多个
web_research节点都返回web_research_result,它们会被 拼接 起来
🔄 完整执行流程
用户输入
↓
START
↓
[generate_query] 节点执行
├─ 输入状态:{messages: [用户问题]}
├─ 输出:{search_query: ["query1", "query2", "query3"]}
├─ 状态更新:messages + search_query
↓
[continue_to_web_research] 条件路由
├─ 对每个 search_query,创建一个独立的 web_research 任务
├─ 创建 3 个并行的 Send("web_research", ...) 任务
↓
[web_research] 节点执行(3 个并行)
├─ 第 1 个:{search_query: "query1", id: 0} → {web_research_result: [...], sources_gathered: [...]}
├─ 第 2 个:{search_query: "query2", id: 1} → {web_research_result: [...], sources_gathered: [...]}
├─ 第 3 个:{search_query: "query3", id: 2} → {web_research_result: [...], sources_gathered: [...]}
│
├─ 状态合并:3 个结果都通过 operator.add 拼接到 web_research_result 和 sources_gathered
↓
[reflection] 节点执行
├─ 输入:{web_research_result: [所有研究结果], ...}
├─ 输出:{is_sufficient: bool, knowledge_gap: str, follow_up_queries: []}
├─ 状态更新:research_loop_count += 1
↓
[evaluate_research] 条件决策
├─ IF is_sufficient OR research_loop_count >= max_research_loops
│ └─ 返回 "finalize_answer"
├─ ELSE
│ └─ 返回 [Send("web_research", follow_up_queries), ...] 继续循环
↓
条件 1:发现知识缺口,继续研究
└─ 回到 [web_research] (循环)
条件 2:信息充分,完成研究
↓
[finalize_answer] 节点执行
├─ 输入:所有研究结果 + 源
├─ 输出:{messages: [最终答案]}
↓
END
↓
返回最终答案给用户
💡 节点的四种类型
1. 处理节点(大多数节点)
def web_research(state: WebSearchState, config: RunnableConfig) -> OverallState:
# 接收一个状态,返回更新
response = llm.invoke(prompt)
return {
"web_research_result": [response.content],
"sources_gathered": [source],
}
2. 路由节点(决策节点)
def continue_to_web_research(state: QueryGenerationState):
# 根据状态,生成多个任务
return [
Send("web_research", {"search_query": q, "id": i})
for i, q in enumerate(state["search_query"])
]
3. 条件决策节点
def evaluate_research(state: ReflectionState, config: RunnableConfig):
# 根据状态,返回下一个节点名
if state["is_sufficient"] or state["research_loop_count"] >= max_loops:
return "finalize_answer"
else:
return "web_research" # 或 ["web_research", ...] 并行
4. 入口和出口
builder.add_edge(START, "generate_query") # 入口
builder.add_edge("finalize_answer", END) # 出口
📊 状态的三种合并策略
class OverallState(TypedDict):
# 策略 1:覆盖(默认)
reasoning_model: str
# 新值覆盖旧值
# 策略 2:追加(add_messages)
messages: Annotated[list, add_messages]
# 智能合并消息(避免重复)
# 策略 3:拼接(operator.add)
search_query: Annotated[list, operator.add]
# list1 + list2 + list3 = [所有元素]
🎬 实际执行示例
当你运行 graph.invoke({...}):
state = {
"messages": [HumanMessage(content="Who won Euro 2024")],
"initial_search_query_count": 3,
"max_research_loops": 2,
"reasoning_model": "gpt-4o",
}
result = graph.invoke(state)
状态变化过程:
Step 1: generate_query
输入:{messages: [HumanMessage]}
输出:{search_query: ["Euro 2024 winner", "Spain Euro 2024", ...]}
状态:messages + search_query
Step 2: continue_to_web_research (路由)
创建 3 个 web_research 任务
Step 3: web_research (×3 并行)
输入:{search_query: "Euro 2024 winner", id: 0}
输出:{web_research_result: ["Spain won..."], sources_gathered: [...]}
输入:{search_query: "Spain Euro 2024", id: 1}
输出:{web_research_result: ["Spain beat England..."], sources_gathered: [...]}
输入:{search_query: "Euro 2024 final", id: 2}
输出:{web_research_result: ["Final was in Berlin..."], sources_gathered: [...]}
状态合并:
web_research_result: ["Spain won...", "Spain beat England...", "Final was in Berlin..."]
sources_gathered: [source1, source2, source3]
Step 4: reflection
输入:{web_research_result: [所有3个结果], messages: [...]}
分析:信息充分
输出:{is_sufficient: true, research_loop_count: 1, follow_up_queries: []}
Step 5: evaluate_research (条件)
判断:is_sufficient == true
决策:返回 "finalize_answer"
Step 6: finalize_answer
输入:所有研究结果 + 源
输出:{messages: [AIMessage("Spain won Euro 2024...")]}
最终返回给用户:
"Spain won Euro 2024, defeating England in the final..."
🔍 Graph 的可视化
START
↓
[generate_query]
↓
[continue_to_web_research] ← 路由,生成 3 个并行任务
↙ ↓ ↘
[web_research] [web_research] [web_research] ← 并行执行
↖ ↓ ↙
[reflection]
↓
[evaluate_research] ← 条件决策
↙ ↘
"web_research" "finalize_answer" ← 根据情况选择
(继续循环) (完成)
↓
[finalize_answer]
↓
END
💪 LangGraph 的强大之处
- 状态管理 - 所有数据在一个地方,节点间共享
- 条件路由 - 根据 LLM 的决策动态改变流程
- 并行处理 - 多个节点并行执行(web_research ×3)
- 循环控制 - 简单实现循环(research loop)
- 可追踪 - 每步执行都可被追踪和调试
- 持久化 - 状态可以保存到数据库,支持中断和恢复
🎓 一句话总结
节点 = 工作单元,边 = 连接逻辑,状态 = 共享数据,Graph = 这三者的编排