目录

理解langgraph.graph.StateGraph中-State-的-Annotated-以函数作为元数据meta如何影响State传递

目录

理解langgraph.graph.StateGraph中 State 的 Annotated 以函数作为元数据(meta)如何影响State传递

LangChain 编程的一大创新是引入 管道化表达式(Pipeline Expressions) ,通过 链式组合 不同的模块来构建 LLM 任务流。

为了实现这种链式编程,LangChain 通过 Runnable 统一封装模块,使它们提供 标准化接口 (如 invoke()batch()stream()

通过运算符( | )实现模块间的数据传递,“|” 像一数据管道,本质上仍然是 invoke() 逐步执行。

langgraph.graph.StateGraph 更是引入图论、基于图论,通过 节点(node) 定义不同的任务,通过 边(Edge) 使数据在多个任务之间流转和更新。

V0.3 版的聊天机器人示例代码中,State就是通过Edge流转。里面的StateMessage通过Annotated中引用的add_message实现了自动叠加历史消息的逻辑。

为了理解add_message如何影响State,我们先用一个更直观的例子来理解。

以下代码展示的是一个有三个节点的工作流,数据从一个节点流入下一个节点的过程中,会被Annotated中元数据的函数修改,模拟“带历史消息”的聊天。

因为数据(state)是从一个节点流入另一个节点,连接节点的是边(edge),为了形象,我们用管道来描述,即edge是数据流通的管道。

为了展示数据是在管道中被修改的,我们分别把节点收到的state、节点输出的state、管道中的state分别打印出来。

为了方便演示,我们定义State的结构包含2个成员:foo 表示数据要流向的节点名称,bar 表示本环节大模型api接口返回的结果。 Annotated 的bar_meta_function 对bar添加的逻辑是自动把之前的结果拼接,使得下一个节点收到的bar值是已叠加了历史消息的bar。

from langgraph.graph import StateGraph, START, END
from typing import Annotated, TypedDict, Sequence


# 自定义累加函数
def foo_meta_function(existing_foo, new_foo):

    print(f"在连接{existing_foo} - {new_foo}的管道中,foo_meta_function被调用 - [上一节名称是]:{existing_foo} + [下一节名称是]:{new_foo}")
    return new_foo

def bar_meta_function(existing_bar, new_bar):

    print(f"在管道中,bar_meta_function被调用 - [上一节点接收到的state中的bar]:{existing_bar} + [上一节点return值中的bar]:{new_bar}\n")
    update_bar = existing_bar + new_bar
    return update_bar

# 定义 State 结构
class State(TypedDict):
    foo: Annotated[str, foo_meta_function]
    bar: Annotated[Sequence[str], bar_meta_function]  # 自动累加

# 节点 A
def node_a(state: State) -> State:
    print('node_a收到的state', state)
    new_state = {**state, "foo": "step_2", "bar": ["a"]}  # 先字典解包,然后指定的成员用新值覆盖。创建新的state而不是直接修改原state,这样可避免在多线并行等特殊场景state被污染。
    print('node_a输出的state', new_state, '\n')
    return new_state

# 节点 B
def node_b(state: State) -> State:
    print('node_b收到的state', state)
    new_state = {**state, "foo": "step_3", "bar": ["b"]}  #  返回新 state
    print('node_b输出的state', new_state, '\n')
    return new_state

# 节点 C
def node_c(state: State) -> State:
    print('node_c收到的state', state)
    new_state = {**state, "foo": "#end#", "bar": ["c"]}  #  返回新 state
    print('node_c输出的state', new_state, '\n')
    return new_state

# 构建 StateGraph
workflow = StateGraph(State)
workflow.add_node("node_a", node_a)
workflow.add_node("node_b", node_b)
workflow.add_node("node_c", node_c)

# 一共四条边(含连接START 和 连接END 的边)
workflow.set_entry_point("node_a")
workflow.add_edge("node_a", "node_b")
workflow.add_edge("node_b", "node_c")
workflow.add_edge("node_c", END)


graph = workflow.compile()

# 运行
initial_state = {"foo": "step_1", "bar": ['000']}
result = graph.invoke(initial_state)

print("最终结果:", result)

输出结果如下:

在连接 - step_1的管道中,foo_meta_function被调用 - [上一节名称是]: + [下一节名称是]:step_1
在管道中,bar_meta_function被调用 - [上一节点接收到的state中的bar]:[] + [上一节点return值中的bar]:['000']

node_a收到的state {'foo': 'step_1', 'bar': ['000']}
node_a输出的state {'foo': 'step_2', 'bar': ['a']} 

在连接step_1 - step_2的管道中,foo_meta_function被调用 - [上一节名称是]:step_1 + [下一节名称是]:step_2
在管道中,bar_meta_function被调用 - [上一节点接收到的state中的bar]:['000'] + [上一节点return值中的bar]:['a']

node_b收到的state {'foo': 'step_2', 'bar': ['000', 'a']}
node_b输出的state {'foo': 'step_3', 'bar': ['b']} 

在连接step_2 - step_3的管道中,foo_meta_function被调用 - [上一节名称是]:step_2 + [下一节名称是]:step_3
在管道中,bar_meta_function被调用 - [上一节点接收到的state中的bar]:['000', 'a'] + [上一节点return值中的bar]:['b']

node_c收到的state {'foo': 'step_3', 'bar': ['000', 'a', 'b']}
node_c输出的state {'foo': '#end#', 'bar': ['c']} 

在连接step_3 - #end#的管道中,foo_meta_function被调用 - [上一节名称是]:step_3 + [下一节名称是]:#end#
在管道中,bar_meta_function被调用 - [上一节点接收到的state中的bar]:['000', 'a', 'b'] + [上一节点return值中的bar]:['c']

最终结果: {'foo': '#end#', 'bar': ['000', 'a', 'b', 'c']}

https://i-blog.csdnimg.cn/direct/d0b2e2bbcd164bfc83a184ac26ac3bd4.png