19 SQL Agent
"帮我查一下上个月销售额最高的产品是哪个"
"哪个地区的客户退货率最高?"
这些是业务中最常见的数据查询需求。传统做法是:业务人员提需求 → 数据分析师写SQL → 出报告。一来一回可能要等半天。
如果Agent能直接理解自然语言,自己写SQL查数据库,当场给答案呢?
SQL Agent就是干这个的——让Agent自己读数据库结构、写SQL、执行查询、返回结果。
打个比方:SQL Agent就像一个新来的数据分析实习生。你把数据库交给他,告诉他"有什么表、什么结构",他就能根据你的问题写SQL查数据。不过他毕竟是新人,写的SQL偶尔会出错,所以你得有个机制让他检查自己的SQL,或者在执行前让你过目。
一、整体流程
SQL Agent的工作流程:
用户提问 → 列出数据库表 → 找到相关表 → 查看表结构 → 生成SQL → 检查SQL → 执行查询
↑ |
└──── 如果报错,修正后重试 ─────┘
→ 用自然语言回答和人类写SQL的思路一样:先看看有什么表,找到相关的表,看看字段结构,写SQL,检查一下,执行。
二、准备数据库工具
Agent需要四个工具来和数据库交互:
import sqlite3
from langchain.tools import tool
# 数据库连接
DB_PATH = "chinook.db"
conn = sqlite3.connect(DB_PATH)
@tool
def list_tables() -> str:
"""列出数据库中所有表名"""
cursor = conn.execute(
"SELECT name FROM sqlite_master WHERE type='table' ORDER BY name"
)
tables = [row[0] for row in cursor.fetchall()]
return ", ".join(tables)
@tool
def get_schema(table_names: str) -> str:
"""获取指定表的结构和示例数据。输入是逗号分隔的表名。"""
tables = [t.strip() for t in table_names.split(",")]
result = []
for table in tables:
# 获取建表语句
cursor = conn.execute(
"SELECT sql FROM sqlite_master WHERE type='table' AND name=?",
(table,),
)
create_sql = cursor.fetchone()
if create_sql:
result.append(create_sql[0])
# 获取3条示例数据
cursor = conn.execute(f"SELECT * FROM [{table}] LIMIT 3")
rows = cursor.fetchall()
if rows:
columns = [desc[0] for desc in cursor.description]
result.append(f"/* 示例数据 ({table}): */")
result.append("\t".join(columns))
for row in rows:
result.append("\t".join(str(v) for v in row))
result.append("")
return "\n".join(result)
@tool
def check_query(query: str) -> str:
"""在执行前检查SQL是否正确。先用这个工具检查,再用execute_query执行。"""
# SQLite的EXPLAIN可以检查语法而不实际执行
try:
cursor = conn.execute(f"EXPLAIN {query}")
return "SQL语法正确。"
except sqlite3.Error as e:
return f"SQL有误: {e}。请修改后重试。"
@tool
def execute_query(query: str) -> str:
"""执行SQL查询并返回结果。只用于SELECT查询,不要执行DELETE/UPDATE/DROP。"""
# 安全检查:只允许SELECT
query_upper = query.strip().upper()
if not query_upper.startswith("SELECT"):
return "错误:只允许执行SELECT查询。"
try:
cursor = conn.execute(query)
columns = [desc[0] for desc in cursor.description]
rows = cursor.fetchall()
if not rows:
return "查询结果为空。"
# 格式化输出
result = [", ".join(columns)]
for row in rows[:20]: # 最多返回20行
result.append(", ".join(str(v) for v in row))
total = len(rows)
output = "\n".join(result)
if total > 20:
output += f"\n... 共 {total} 条记录,已显示前20条"
return output
except sqlite3.Error as e:
return f"执行出错: {e}。请检查SQL语句。"四个工具各司其职:
| 工具 | 作用 | 类比 |
|---|---|---|
list_tables | 列出所有表 | 看看数据库里有什么 |
get_schema | 查看表结构和示例数据 | 看看表长什么样 |
check_query | 检查SQL语法 | 写完SQL先检查一遍 |
execute_query | 执行SQL查询 | 跑SQL拿结果 |
三、创建Agent
工具准备好了,接下来创建Agent。关键是系统提示词——要告诉Agent查数据库的正确工作流程:
from langchain.agents import create_agent
system_prompt = """你是一个数据分析助手,帮用户查询数据库回答问题。
工作流程:
1. 先用list_tables查看有哪些表
2. 根据问题判断需要哪些表,用get_schema查看它们的结构
3. 写SQL查询,用check_query检查语法
4. 确认无误后,用execute_query执行查询
5. 根据查询结果用自然语言回答用户
注意:
- 优先使用check_query检查SQL,再执行
- 如果执行出错,根据错误信息修正SQL重试
- 如果结果为空,告知用户,不要编造数据
- 只执行SELECT查询,拒绝任何修改数据的请求"""
agent = create_agent(
model="deepseek-v4-flash",
tools=[list_tables, get_schema, check_query, execute_query],
system_prompt=system_prompt,
)四、使用示例
来看一个完整的查询过程:
result = agent.invoke({
"messages": [{"role": "user", "content": "哪种音乐风格的歌曲平均时长最长?"}]
})
print(result["messages"][-1].content)Agent的执行过程(简化):
1. 调用 list_tables → "Album, Artist, Customer, Genre, Invoice, Track, ..."
2. 调用 get_schema("Track, Genre") → 看到Track表有Milliseconds和GenreId,Genre表有Name
3. 写SQL: SELECT Genre.Name, AVG(Track.Milliseconds) ...
4. 调用 check_query → "SQL语法正确"
5. 调用 execute_query → "[('Sci Fi & Fantasy', 2911783), ...]"
6. 回答:"平均时长最长的音乐风格是 Sci Fi & Fantasy,平均约48.5分钟。"Agent自己完成了:理解问题 → 找表 → 看结构 → 写SQL → 检查 → 执行 → 回答。全程不需要你写一行SQL。
五、安全防护
让Agent自己写SQL,你肯定会有顾虑:万一它写了DELETE呢?万一它查了不该查的数据呢?
5.1 工具层防护
最基础的防护是在工具里做限制——上面的execute_query已经做了:
# 只允许SELECT
if not query_upper.startswith("SELECT"):
return "错误:只允许执行SELECT查询。"5.2 人在环路
对于生产环境,更稳妥的做法是在SQL执行前暂停,让人工审核:
from langchain.agents import create_agent
from langchain.agents.middleware import HumanInTheLoopMiddleware
from langgraph.checkpoint.memory import InMemorySaver
agent = create_agent(
model="deepseek-v4-flash",
tools=[list_tables, get_schema, check_query, execute_query],
system_prompt=system_prompt,
middleware=[
HumanInTheLoopMiddleware(
interrupt_on={
"list_tables": False, # 查看表名是安全的
"get_schema": False, # 查看结构是安全的
"check_query": False, # 检查SQL是安全的
"execute_query": True, # 执行SQL需要确认
},
),
],
checkpointer=InMemorySaver(),
)这样Agent写好SQL后会暂停,你看到SQL内容,觉得没问题再批准执行:
from langgraph.types import Command
config = {"configurable": {"thread_id": "data_analysis"}}
# 第一步:Agent会走到execute_query然后暂停
result = agent.invoke(
{"messages": [{"role": "user", "content": "查一下上月销售额最高的产品"}]},
config=config,
)
# 查看Agent生成的SQL
print(result.interrupts)
# 第二步:确认SQL没问题后批准执行
result = agent.invoke(
Command(resume={"decisions": [{"type": "approve"}]}),
config=config,
)
print(result["messages"][-1].content)你也可以拒绝并给出修改建议:
result = agent.invoke(
Command(resume={
"decisions": [{
"type": "reject",
"message": "这个查询没有加LIMIT,数据量太大了。请加上LIMIT 100。",
}]
}),
config=config,
)5.3 数据库权限
最根本的安全措施是限制数据库连接本身的权限:
# 用只读账号连接数据库
conn = sqlite3.connect("file:chinook.db?mode=ro", uri=True)在生产环境中,给Agent专用的只读数据库账号,即使Agent绕过了代码层面的防护,数据库层面也不允许修改操作。
六、错误修正
LLM写的SQL不可能100%正确。好消息是,Agent有一个天然的优势:它能看到错误信息并自我修正。
Agent写了SQL → check_query检查 → "列名不存在"
Agent看到错误 → 查看get_schema确认正确的列名 → 修正SQL → 重新检查 → 执行成功这个"写 → 检查 → 修正"的循环,就是ReAct模式的威力。错误信息本身就是有价值的上下文,Agent利用它来改进自己的输出。
七、连接不同数据库
上面的例子用的是SQLite,实际项目中你可能会用MySQL、PostgreSQL等。只需要把工具里的连接和查询逻辑换一下:
import pymysql
# MySQL连接
mysql_conn = pymysql.connect(
host="localhost",
user="readonly_user",
password="password",
database="my_database",
)
@tool
def execute_query(query: str) -> str:
"""执行SQL查询"""
if not query.strip().upper().startswith("SELECT"):
return "错误:只允许SELECT查询。"
try:
cursor = mysql_conn.cursor()
cursor.execute(query)
columns = [desc[0] for desc in cursor.description]
rows = cursor.fetchall()
# ... 格式化输出
except Exception as e:
return f"执行出错: {e}"Agent的逻辑和用什么数据库无关——它只关心表结构和SQL语法。换数据库只需要改工具的底层实现。
八、总结
SQL Agent把自然语言查询和数据库连接起来:
- 四个工具:list_tables、get_schema、check_query、execute_query
- 自然循环:写SQL → 检查 → 执行 → 有错就修正
- 安全防护:工具层限制 + 人在环路审批 + 数据库只读权限
- 数据库无关:换数据库只需要改工具底层实现
有了SQL Agent,业务人员可以直接问数据问题,不需要等数据分析师写SQL。Agent不会取代数据分析师,但能把简单的查询工作自动化,让分析师专注于更复杂的分析任务。