04:持久执行
在上一篇文章中,我们详细介绍了支持跨线程存储的持久化工具Memory Store的基础用法,以及如何在LangGraph使用它。
本文将主要介绍什么是持久执行(Durable Execution),以及在使用持久执行时有哪些注意点。
一、什么是持久执行
**持久执行(Durable execution)**是一种在执行流程中会在关键点保存执行进度的技术,通过持久执行可以实现在执行流程中暂停或者恢复,而无需重复之前已经执行的步骤。
在执行中出现错误,可以在中断处恢复,持久执行也同样非常适合人工干预的场景,比如在执行下一流程之前对操作进行人工审核,人工审核通过后才能继续执行。
二、LangGraph的持久执行模式
在前文中,我们介绍的LangGraph内置的线程和检查点持久化机制为LangGraph工作流提供了持久执行功能。
通过线程和检查点可以确保每个步骤的执行状态和数据都被持久化存储,如果执行某个步骤时程序被中断,可以从最后一次保存的检查点恢复执行。
2.1 如何使用持久执行
在LangGraph中使用持久执行只需要在图编译时,指定检查点管理器(checkpointer),并且在执行图时需要指定线程thread_id,后续可以通过thread_id来获取工作流的历史执行信息。
checkpointer = InMemorySaver()
agent = graph.compile(checkpointer=checkpointer)
config = {"configurable": {"thread_id": "1"}}
state = agent.invoke({"result": []}, config)2.2 如何保证一致性
在LangGraph中使用持久执行时,某些步骤可能会重新执行,而某些步骤重复执行会出现错误,比如在某个节点中生成UUID、调用API、新增数据库记录等操作,这些不可重复执行的步骤,我们需要将这些操作包装到task中,这样在工作流重新执行这些节点时,不会重新执行task中的逻辑,而是从之前保存的持久化数据中获取执行结果。
示例如下,将写入日志这一步单独抽离出来,并且用@task包装,这样即使图恢复执行,也不会重新执行@task中的代码。
@task
def write_log_task():
# todo: 写入日志
print("=========写入日志========")
return
# 3.定义存储日志节点
def save_log_node(state: State):
"""存储日志节点"""
uuid = write_log_task()
return2.3 LangGraph持久化模式
针对检查点的持久化时机不同,LangGraph支持设置三种持久化模式,可以在调用invoke/stream时指定:
exit: 只有在图退出时,才会将退出前最后一个超级步骤生成的检查点进行持久化,这种模式保证长时间运行图的最佳性能,但是这种模式无法进行中断恢复等操作。
async:执行每个超级步骤之后,异步对检查点进行持久化,该方式性能很高,但在异步持久化过程中,若发生进程崩溃等情况,可能因检查点尚未完成写入而导致数据丢失。
sync:执行每个超级步骤之后,同步对检查点进行持久化,该方式牺牲了一定性能来换取数据一致性。
state = agent.invoke({"result": []}, config, durability="sync")三、总结
本文重点介绍了 LangGraph 的持久执行(Durable Execution) 机制。
首先,介绍了持久执行的概念,及其在 异常中断恢复和人在环路(Human-in-the-loop) 等场景中的实际应用。随后介绍了在 LangGraph 中,持久执行功能是基于线程(Thread)与检查点(Checkpoint)机制实现的。
接着,对如何保证持久执行一致性进行分析,关键用法是将 非确定性操作和具有副作用的逻辑封装到 task 中,同时,也对 LangGraph 提供的三种持久化模式进行了对比分析,可以根据实际的业务场景对性能和一致性进行权衡,选择合适的持久化模式。
通过本文,相信你已经对持久执行的概念、LangGraph 中持久执行的适用场景及其提供的持久化模式有了相对完整的理解。