Skip to content

19 SQL Agent

"帮我查一下上个月销售额最高的产品是哪个"

"哪个地区的客户退货率最高?"

这些是业务中最常见的数据查询需求。传统做法是:业务人员提需求 → 数据分析师写SQL → 出报告。一来一回可能要等半天。

如果Agent能直接理解自然语言,自己写SQL查数据库,当场给答案呢?

SQL Agent就是干这个的——让Agent自己读数据库结构、写SQL、执行查询、返回结果

打个比方:SQL Agent就像一个新来的数据分析实习生。你把数据库交给他,告诉他"有什么表、什么结构",他就能根据你的问题写SQL查数据。不过他毕竟是新人,写的SQL偶尔会出错,所以你得有个机制让他检查自己的SQL,或者在执行前让你过目。

一、整体流程

SQL Agent的工作流程:

用户提问 → 列出数据库表 → 找到相关表 → 查看表结构 → 生成SQL → 检查SQL → 执行查询
                                        ↑                              |
                                        └──── 如果报错,修正后重试 ─────┘
                                                                              → 用自然语言回答

和人类写SQL的思路一样:先看看有什么表,找到相关的表,看看字段结构,写SQL,检查一下,执行。

二、准备数据库工具

Agent需要四个工具来和数据库交互:

python
import sqlite3
from langchain.tools import tool


# 数据库连接
DB_PATH = "chinook.db"
conn = sqlite3.connect(DB_PATH)


@tool
def list_tables() -> str:
    """列出数据库中所有表名"""
    cursor = conn.execute(
        "SELECT name FROM sqlite_master WHERE type='table' ORDER BY name"
    )
    tables = [row[0] for row in cursor.fetchall()]
    return ", ".join(tables)


@tool
def get_schema(table_names: str) -> str:
    """获取指定表的结构和示例数据。输入是逗号分隔的表名。"""
    tables = [t.strip() for t in table_names.split(",")]
    result = []
    for table in tables:
        # 获取建表语句
        cursor = conn.execute(
            "SELECT sql FROM sqlite_master WHERE type='table' AND name=?",
            (table,),
        )
        create_sql = cursor.fetchone()
        if create_sql:
            result.append(create_sql[0])

        # 获取3条示例数据
        cursor = conn.execute(f"SELECT * FROM [{table}] LIMIT 3")
        rows = cursor.fetchall()
        if rows:
            columns = [desc[0] for desc in cursor.description]
            result.append(f"/* 示例数据 ({table}): */")
            result.append("\t".join(columns))
            for row in rows:
                result.append("\t".join(str(v) for v in row))

        result.append("")

    return "\n".join(result)


@tool
def check_query(query: str) -> str:
    """在执行前检查SQL是否正确。先用这个工具检查,再用execute_query执行。"""
    # SQLite的EXPLAIN可以检查语法而不实际执行
    try:
        cursor = conn.execute(f"EXPLAIN {query}")
        return "SQL语法正确。"
    except sqlite3.Error as e:
        return f"SQL有误: {e}。请修改后重试。"


@tool
def execute_query(query: str) -> str:
    """执行SQL查询并返回结果。只用于SELECT查询,不要执行DELETE/UPDATE/DROP。"""
    # 安全检查:只允许SELECT
    query_upper = query.strip().upper()
    if not query_upper.startswith("SELECT"):
        return "错误:只允许执行SELECT查询。"

    try:
        cursor = conn.execute(query)
        columns = [desc[0] for desc in cursor.description]
        rows = cursor.fetchall()

        if not rows:
            return "查询结果为空。"

        # 格式化输出
        result = [", ".join(columns)]
        for row in rows[:20]:  # 最多返回20行
            result.append(", ".join(str(v) for v in row))

        total = len(rows)
        output = "\n".join(result)
        if total > 20:
            output += f"\n... 共 {total} 条记录,已显示前20条"

        return output
    except sqlite3.Error as e:
        return f"执行出错: {e}。请检查SQL语句。"

四个工具各司其职:

工具作用类比
list_tables列出所有表看看数据库里有什么
get_schema查看表结构和示例数据看看表长什么样
check_query检查SQL语法写完SQL先检查一遍
execute_query执行SQL查询跑SQL拿结果

三、创建Agent

工具准备好了,接下来创建Agent。关键是系统提示词——要告诉Agent查数据库的正确工作流程:

python
from langchain.agents import create_agent


system_prompt = """你是一个数据分析助手,帮用户查询数据库回答问题。

工作流程:
1. 先用list_tables查看有哪些表
2. 根据问题判断需要哪些表,用get_schema查看它们的结构
3. 写SQL查询,用check_query检查语法
4. 确认无误后,用execute_query执行查询
5. 根据查询结果用自然语言回答用户

注意:
- 优先使用check_query检查SQL,再执行
- 如果执行出错,根据错误信息修正SQL重试
- 如果结果为空,告知用户,不要编造数据
- 只执行SELECT查询,拒绝任何修改数据的请求"""


agent = create_agent(
    model="deepseek-v4-flash",
    tools=[list_tables, get_schema, check_query, execute_query],
    system_prompt=system_prompt,
)

四、使用示例

来看一个完整的查询过程:

python
result = agent.invoke({
    "messages": [{"role": "user", "content": "哪种音乐风格的歌曲平均时长最长?"}]
})
print(result["messages"][-1].content)

Agent的执行过程(简化):

1. 调用 list_tables → "Album, Artist, Customer, Genre, Invoice, Track, ..."
2. 调用 get_schema("Track, Genre") → 看到Track表有Milliseconds和GenreId,Genre表有Name
3. 写SQL: SELECT Genre.Name, AVG(Track.Milliseconds) ...
4. 调用 check_query → "SQL语法正确"
5. 调用 execute_query → "[('Sci Fi & Fantasy', 2911783), ...]"
6. 回答:"平均时长最长的音乐风格是 Sci Fi & Fantasy,平均约48.5分钟。"

Agent自己完成了:理解问题 → 找表 → 看结构 → 写SQL → 检查 → 执行 → 回答。全程不需要你写一行SQL。

五、安全防护

让Agent自己写SQL,你肯定会有顾虑:万一它写了DELETE呢?万一它查了不该查的数据呢?

5.1 工具层防护

最基础的防护是在工具里做限制——上面的execute_query已经做了:

python
# 只允许SELECT
if not query_upper.startswith("SELECT"):
    return "错误:只允许执行SELECT查询。"

5.2 人在环路

对于生产环境,更稳妥的做法是在SQL执行前暂停,让人工审核:

python
from langchain.agents import create_agent
from langchain.agents.middleware import HumanInTheLoopMiddleware
from langgraph.checkpoint.memory import InMemorySaver


agent = create_agent(
    model="deepseek-v4-flash",
    tools=[list_tables, get_schema, check_query, execute_query],
    system_prompt=system_prompt,
    middleware=[
        HumanInTheLoopMiddleware(
            interrupt_on={
                "list_tables": False,    # 查看表名是安全的
                "get_schema": False,     # 查看结构是安全的
                "check_query": False,    # 检查SQL是安全的
                "execute_query": True,   # 执行SQL需要确认
            },
        ),
    ],
    checkpointer=InMemorySaver(),
)

这样Agent写好SQL后会暂停,你看到SQL内容,觉得没问题再批准执行:

python
from langgraph.types import Command

config = {"configurable": {"thread_id": "data_analysis"}}

# 第一步:Agent会走到execute_query然后暂停
result = agent.invoke(
    {"messages": [{"role": "user", "content": "查一下上月销售额最高的产品"}]},
    config=config,
)

# 查看Agent生成的SQL
print(result.interrupts)

# 第二步:确认SQL没问题后批准执行
result = agent.invoke(
    Command(resume={"decisions": [{"type": "approve"}]}),
    config=config,
)
print(result["messages"][-1].content)

你也可以拒绝并给出修改建议:

python
result = agent.invoke(
    Command(resume={
        "decisions": [{
            "type": "reject",
            "message": "这个查询没有加LIMIT,数据量太大了。请加上LIMIT 100。",
        }]
    }),
    config=config,
)

5.3 数据库权限

最根本的安全措施是限制数据库连接本身的权限

python
# 用只读账号连接数据库
conn = sqlite3.connect("file:chinook.db?mode=ro", uri=True)

在生产环境中,给Agent专用的只读数据库账号,即使Agent绕过了代码层面的防护,数据库层面也不允许修改操作。

六、错误修正

LLM写的SQL不可能100%正确。好消息是,Agent有一个天然的优势:它能看到错误信息并自我修正

Agent写了SQL → check_query检查 → "列名不存在"
Agent看到错误 → 查看get_schema确认正确的列名 → 修正SQL → 重新检查 → 执行成功

这个"写 → 检查 → 修正"的循环,就是ReAct模式的威力。错误信息本身就是有价值的上下文,Agent利用它来改进自己的输出。

七、连接不同数据库

上面的例子用的是SQLite,实际项目中你可能会用MySQL、PostgreSQL等。只需要把工具里的连接和查询逻辑换一下:

python
import pymysql


# MySQL连接
mysql_conn = pymysql.connect(
    host="localhost",
    user="readonly_user",
    password="password",
    database="my_database",
)


@tool
def execute_query(query: str) -> str:
    """执行SQL查询"""
    if not query.strip().upper().startswith("SELECT"):
        return "错误:只允许SELECT查询。"

    try:
        cursor = mysql_conn.cursor()
        cursor.execute(query)
        columns = [desc[0] for desc in cursor.description]
        rows = cursor.fetchall()
        # ... 格式化输出
    except Exception as e:
        return f"执行出错: {e}"

Agent的逻辑和用什么数据库无关——它只关心表结构和SQL语法。换数据库只需要改工具的底层实现。

八、总结

SQL Agent把自然语言查询和数据库连接起来:

  • 四个工具:list_tables、get_schema、check_query、execute_query
  • 自然循环:写SQL → 检查 → 执行 → 有错就修正
  • 安全防护:工具层限制 + 人在环路审批 + 数据库只读权限
  • 数据库无关:换数据库只需要改工具底层实现

有了SQL Agent,业务人员可以直接问数据问题,不需要等数据分析师写SQL。Agent不会取代数据分析师,但能把简单的查询工作自动化,让分析师专注于更复杂的分析任务。