Skip to content

02:线程与检查点

在上一篇文章中,我们介绍了什么是LangGraph,并带大家一起编写了第一个基于LangGraph的Agent程序。

本文将会介绍LangGraph中的持久化,并详细介绍使用持久化的核心组件线程(thread)检查点(checkpoint)

文中所有示例代码:https://github.com/wzycoding/langchain-study

一、什么是持久化

在LangGraph图执行过程中,每一个节点执行后会将图状态生成一份快照保存起来,这些快照在图执行结束后也可以访问,这种机制被称为LangGraph的持久化。

持久化机制类似于游戏的自动存档功能,不同玩家(对应不同线程)使用独立的账号ID(thread_id)进行存档。

通过LangGraph的持久化,可以实现人在环路、时间旅行、记忆组件等功能。

二、线程与检查点

2.1 什么是线程与检查点

理解LangGraph的持久化机制,需要掌握两个核心概念:检查点线程

在LangGraph中,持久化数据是由**检查点管理器(checkpointer)来实现的,当图中的节点执行完一个超级步骤(super-step)时,会保存一份图状态快照信息(StateSnapshot)线程(thread)**中。

每一个线程都有一个唯一标识thread ID,可以通过thread ID找到thread,再从thread读取中的快照数据,这些保存的图状态数据快照就被称为检查点(checkpoint),在检查点中就包含了图状态数据(State)等信息。

其中超级步骤(super-step)是指:

  • 对于顺序节点:一个顺序节点执行完后,就完成了一个超级步骤,此时会保存检查点。
  • 对于并行节点:所有并行节点都执行完后,才共同完成一个超级步骤,此时保存一个检查点。

以上各个术语之间关系的图如下:

image-20251221183827328

如果想使用持久化功能,首先要在创建图时指定检查点管理器(checkpointer),LangGraph的持久化功能才能生效。

python
checkpointer = InMemorySaver()
agent = graph.compile(checkpointer=checkpointer)

如果指定了检查点管理器,在执行图时,还需要指定thread_id。

python
config = {"configurable": {"thread_id": "1"}}
state = agent.invoke({"result": []}, config)

否则将抛出如下错误

python
ValueError: Checkpointer requires one or more of the following 'configurable' keys: thread_id, checkpoint_ns, checkpoint_id

在LangGraph中线程是用来存储检查点的容器,和我们熟悉的CPU执行的最小逻辑单元:线程(Thread),没有任何关系,大家千万不要混淆。

2.2 游戏存档类比

通过上面描述的,大家对线程和检查点应该有了一个初步的认识,但是可能总体还是感觉比较抽象,下面用打游戏这个例子来分析一下这些核心组件:

img

在游戏中,一般会有存档系统(checkpointer),在游戏的每一个关卡结束后(super-step),会根据不同的用户账号(threadID)进行游戏进度(checkpoint也就是图状态快照数据)自动保存到存档(thread)中。

每个用户都拥有自己的存档(thread),每一个存档都有一个用户账号(threadID)作为唯一标识。

存档(thread)保存着每一个关卡(super-step)的游戏进度(checkpoint),当我们在游戏失败,或者想跳回某个关卡时,就可以使用用户账号(thread ID)对存档(thread)中的游戏进度(checkpoint)进行加载,恢复到当时的游戏进度。

2.3 使用案例

检查点的使用非常简单,创建checkpointer对象,并在图编译的时候指定checkpointer,在执行图时传入线程id,这样就可以使用检查点相关功能了。

代码示例如下:

python

import operator
from typing import TypedDict, Annotated

from langgraph.checkpoint.memory import InMemorySaver
from langgraph.constants import END, START
from langgraph.graph import StateGraph


# 1.定义State
class State(TypedDict):
    result: Annotated[list[str], operator.add]


# 2.定义a节点
def a_node(state: State):
    """调用a节点"""
    print("执行a_node节点")
    return {"result": ["你好,我是a节点"]}


# 3.定义b节点
def b_node(state: State):
    """调用b节点"""
    print("执行b_node节点")
    return {"result": ["你好,我是b节点"]}


# 4.构建编译图
graph = StateGraph(State)

# 5.添加节点、边
graph.add_node("a_node", a_node)
graph.add_node("b_node", b_node)

graph.add_edge(START, "a_node")
graph.add_edge("a_node", "b_node")
graph.add_edge("b_node", END)

# 6.编译并运行图,并指定检查点管理器
checkpointer = InMemorySaver()
agent = graph.compile(checkpointer=checkpointer)

# 7.调用图传入线程id
config = {"configurable": {"thread_id": "1"}}
state = agent.invoke({"result": []}, config)

三、检查点的使用

3.1 检查点结构

检查点作为图状态数据快照,它包含以下关键属性:

  • config:Config配置信息
  • metadata:元数据信息
  • values: 图状态数据
  • tasks:有关下一步要执行的任务信息PregelTask对象元组(在执行完成的最终检查点中,此字段通常为空)

3.2 获取最新检查点

在图完成编译和执行之后,可以调用get_state()方法获取最新的检查点,用法如下,需要传入配置信息,配置信息中需要包含thread_id

python
config = {"configurable": {"thread_id": "1"}}
print("==============获取最新的检查点===============")
latest_state = agent.get_state(config)
print(latest_state)

执行结果:

python
StateSnapshot(values={'result': ['你好,我是a节点', '你好,我是b节点']}, next=(), config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1f0de5ff-1c74-6c92-8002-404168dee320'}}, metadata={'source': 'loop', 'step': 2, 'parents': {}}, created_at='2025-12-21T11:26:50.811499+00:00', parent_config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1f0de5ff-1c73-6626-8001-38a35205281b'}}, tasks=(), interrupts=())

3.3 获取历史检查点列表

除了通过get_state()获取最新检查点外,还可使用get_state_history()获取历史检查点列表:

python
config = {"configurable": {"thread_id": "1"}}
history_states = agent.get_state_history(config)
print("==============获取历史检查点===============")
for state in history_states:
    print(state)

执行结果如下,包含了四个检查点信息,并且顺序是从最新的检查点开始排列:

python
StateSnapshot(values={'result': ['你好,我是a节点', '你好,我是b节点']}, next=(), config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1f0de616-1321-6746-8002-815157a260db'}}, metadata={'source': 'loop', 'step': 2, 'parents': {}}, created_at='2025-12-21T11:37:07.235196+00:00', parent_config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1f0de616-131f-6e5a-8001-c6358e92cf59'}}, tasks=(), interrupts=())

StateSnapshot(values={'result': ['你好,我是a节点']}, next=('b_node',), config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1f0de616-131f-6e5a-8001-c6358e92cf59'}}, metadata={'source': 'loop', 'step': 1, 'parents': {}}, created_at='2025-12-21T11:37:07.234558+00:00', parent_config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1f0de616-131e-619a-8000-f27299d6ddfa'}}, tasks=(PregelTask(id='92242553-9ce1-aacd-36ab-2ab709806fab', name='b_node', path=('__pregel_pull', 'b_node'), error=None, interrupts=(), state=None, result={'result': ['你好,我是b节点']}),), interrupts=())

StateSnapshot(values={'result': []}, next=('a_node',), config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1f0de616-131e-619a-8000-f27299d6ddfa'}}, metadata={'source': 'loop', 'step': 0, 'parents': {}}, created_at='2025-12-21T11:37:07.233818+00:00', parent_config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1f0de616-131a-6b9e-bfff-e9103701fc9d'}}, tasks=(PregelTask(id='5d3cb606-b8fa-89ee-7191-ebdd6af2a0c9', name='a_node', path=('__pregel_pull', 'a_node'), error=None, interrupts=(), state=None, result={'result': ['你好,我是a节点']}),), interrupts=())

StateSnapshot(values={'result': []}, next=('__start__',), config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1f0de616-131a-6b9e-bfff-e9103701fc9d'}}, metadata={'source': 'input', 'step': -1, 'parents': {}}, created_at='2025-12-21T11:37:07.232441+00:00', parent_config=None, tasks=(PregelTask(id='784c4f84-d5c4-168a-c5bf-ff9ef6b8af73', name='__start__', path=('__pregel_pull', '__start__'), error=None, interrupts=(), state=None, result={'result': []}),), interrupts=())

这四个检查点分别在四个super-step进行保存:

1、第一个检查点是一个空的检查点,下一个要执行的节点是START节点。

2、第二个检查点是执行完START节点,下一个要执行的是a_node节点,并且图状态数据values还是初始化状态。

3、第三个检查点是执行完a_node节点,下一个要执行的是b_node节点,并且图状态数据为'result': ['你好,我是a节点']

4、第四个检查点是执行完b_node节点,没有下一个要执行的节点,并且图状态数据为{'result': ['你好,我是a节点', '你好,我是b节点']}

3.4 回放检查点

在图执行完成之后,还可以对检查点回放(replay),检查点回放是指:通过传递的thread_idcheckpoint_id,对指定检查点之前的步骤进行重放,不会重新执行,在检查点之后的步骤全部都会重新执行。

还是以最开始的图为例,我们使用回放功能,回放到 a_node 执行完毕后的状态。那么,此状态对应的检查点之前的所有步骤都不会重新执行,系统将从此状态开始,继续执行后续的节点(b_nodeEND。这里的END是一个特殊节点,表示图执行结束。

image-20251221224624703

在进行回放之前,首先从历史检查点信息中读取到要回放的检查点id,之后,重新调用图,将该检查点id和线程id作为配置传递给invoke()方法,并且初始化的图状态数据为None

python
# 8.获取历史图状态信息
history_states = list(agent.get_state_history(config))
# 例如,要回放到 a_node 执行后的状态(历史列表中索引为1的检查点)
target_checkpoint_index = 1
target_state = history_states[target_checkpoint_index]

# 9.回放检查点
target_checkpoint_id = target_state.config["configurable"]["checkpoint_id"]
replay_config = {"configurable": {"thread_id": "1", "checkpoint_id": target_checkpoint_id}}
replay_state = agent.invoke(None, replay_config)

print("================回放检查点结果=================")
print(replay_state["result"])

执行结果如下,我们可以发现图执行结束之后,对检查点进行回放,节点b重新执行,并且最后输出了正确的图状态数据。

python
执行a_node节点
执行b_node节点
================图执行结束=================
执行b_node节点
================回放检查点结果=================
['你好,我是a节点', '你好,我是b节点']

3.5 更新图状态

除了可以对检查点进行重放之外,还可以使用update_state()更新图状态,执行更新后,系统会创建一个新的检查点,并添加到在当前thread中,该方法可以传入三个参数:

  • config:配置信息,在配置中需要传递thread_id,可以选择是否传递checkpoint_id,如果只传递thread_idupdate_state方法会根据最新的检查点的图状态信息进行更新,并在thread中添加一个新的检查点,如果同时传递了thread_idcheckpoint_id,则会根据指定的checkpoint_id对应的检查点的图状态数据进行更新,同样在thread中添加一个新的检查点。
  • values:图状态数据
  • as_node:数据是由哪个节点更新的,如果不传递默认则被设置为最后一个更新状态的节点。

update_state()方法的代码示例如下,config中只传入了thread_id,将会根据最新的检查点的图状态数据为基础进行更新,并插入新的检查点。

python
# 7.调用图传入线程id
config = {"configurable": {"thread_id": "1"}}
state = agent.invoke({"result": []}, config)

# 8.更新检查点
agent.update_state(config, {"result": ["你好我是c节点"]})

history_states = agent.get_state_history(config)
print("==============获取历史检查点===============")
for state in history_states:
    print(state)

执行结果如下,新增了一个检查点,并且图状态数据也是我们期望的更新后的数据。

python
执行a_node节点
执行b_node节点
==============获取历史检查点===============
StateSnapshot(values={'result': ['你好,我是a节点', '你好,我是b节点', '你好我是c节点']}, next=(), config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1f0de8a8-9943-6312-8003-3303b4bb330b'}}, metadata={'source': 'update', 'step': 3, 'parents': {}}, created_at='2025-12-21T16:31:44.352930+00:00', parent_config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1f0de8a8-9941-64f4-8002-96227f544e8a'}}, tasks=(), interrupts=())
后面四个检查点省略...

四、总结

本文围绕 LangGraph 的持久化机制 展开,剖析了持久化在 LangGraph 中两个核心概念:线程(Thread)检查点(Checkpoint)

首先,我们介绍了什么是 LangGraph 的持久化机制,并且重点解释了 线程与检查点的关系。之后通过完整代码示例展示了

  • 如何启用检查点管理器和如何指定线程id
  • 如何获取最新检查点和历史检查点列表
  • 如何基于指定检查点进行 回放(Replay)
  • 如何 更新图状态(update_state)