AI-генерация SQL из текстовых запросов (Text-to-SQL)
Text-to-SQL позволяет бизнес-аналитикам и менеджерам напрямую задавать вопросы к базе данных без знания SQL. Ключевой технический вызов — не генерация SQL синтаксиса (LLM справляется легко), а передача точного контекста схемы: какие таблицы существуют, как они связаны, какие значения допустимы в enum-полях.
Архитектура Text-to-SQL системы
from anthropic import Anthropic
import psycopg2
import json
from typing import Optional
from dataclasses import dataclass
client = Anthropic()
@dataclass
class QueryResult:
sql: str
explanation: str
rows: list[dict]
error: Optional[str] = None
class TextToSQLEngine:
def __init__(self, connection_string: str):
self.conn = psycopg2.connect(connection_string)
self.schema_cache: dict = {}
def get_schema(self, tables: list[str] = None) -> str:
"""Получает DDL схемы из PostgreSQL"""
query = """
SELECT
t.table_name,
c.column_name,
c.data_type,
c.is_nullable,
c.column_default,
tc.constraint_type,
kcu.column_name as fk_column,
ccu.table_name as fk_table
FROM information_schema.tables t
JOIN information_schema.columns c ON t.table_name = c.table_name
LEFT JOIN information_schema.key_column_usage kcu
ON c.table_name = kcu.table_name AND c.column_name = kcu.column_name
LEFT JOIN information_schema.table_constraints tc
ON kcu.constraint_name = tc.constraint_name
LEFT JOIN information_schema.constraint_column_usage ccu
ON tc.constraint_name = ccu.constraint_name
WHERE t.table_schema = 'public'
"""
if tables:
placeholders = ",".join(["%s"] * len(tables))
query += f" AND t.table_name IN ({placeholders})"
with self.conn.cursor() as cur:
cur.execute(query, tables or [])
rows = cur.fetchall()
# Форматируем как DDL
tables_dict = {}
for row in rows:
table_name = row[0]
if table_name not in tables_dict:
tables_dict[table_name] = {"columns": [], "foreign_keys": []}
col_def = f" {row[1]} {row[2].upper()}"
if row[3] == "NO":
col_def += " NOT NULL"
if row[4]:
col_def += f" DEFAULT {row[4]}"
if row[5] == "PRIMARY KEY":
col_def += " PRIMARY KEY"
tables_dict[table_name]["columns"].append(col_def)
if row[5] == "FOREIGN KEY" and row[7]:
tables_dict[table_name]["foreign_keys"].append(
f" FOREIGN KEY ({row[6]}) REFERENCES {row[7]}"
)
ddl_parts = []
for table, info in tables_dict.items():
ddl = f"CREATE TABLE {table} (\n"
ddl += ",\n".join(info["columns"])
if info["foreign_keys"]:
ddl += ",\n" + ",\n".join(info["foreign_keys"])
ddl += "\n);"
ddl_parts.append(ddl)
return "\n\n".join(ddl_parts)
def get_sample_values(self, important_columns: dict[str, list[str]]) -> str:
"""Получает примеры значений для enum/category полей"""
samples = []
with self.conn.cursor() as cur:
for table_col, _ in important_columns.items():
table, col = table_col.split(".")
try:
cur.execute(
f"SELECT DISTINCT {col} FROM {table} LIMIT 10"
)
values = [str(row[0]) for row in cur.fetchall()]
samples.append(f"-- {table}.{col}: {', '.join(values)}")
except Exception:
pass
return "\n".join(samples)
def generate_sql(self, question: str, context_tables: list[str] = None) -> QueryResult:
"""Генерирует SQL из текстового вопроса"""
schema = self.get_schema(context_tables)
# Дополнительный контекст: примеры значений для строковых полей
sample_values = self._get_relevant_samples(question)
response = client.messages.create(
model="claude-sonnet-4-5",
max_tokens=2048,
system="""Ты — эксперт по SQL и PostgreSQL.
Генерируй точные, оптимизированные SQL запросы на основе схемы БД.
Правила:
- Используй только существующие таблицы и колонки из схемы
- Предпочитай JOIN вместо подзапросов где возможно
- Добавляй LIMIT 1000 для запросов без агрегации
- Для дат используй PostgreSQL функции: DATE_TRUNC, NOW(), EXTRACT
- Всегда добавляй ORDER BY для предсказуемости результатов
- Если вопрос неоднозначен — выбирай наиболее вероятную интерпретацию
Верни JSON:
{
"sql": "<SQL запрос>",
"explanation": "<объяснение что делает запрос, 1-2 предложения>",
"assumptions": ["<допущение 1 если были>"]
}""",
messages=[{
"role": "user",
"content": f"""Вопрос: {question}
Схема базы данных:
```sql
{schema}
{f"Примеры значений:{chr(10)}{sample_values}" if sample_values else ""}""" }] )
text = response.content[0].text
try:
# Парсим JSON ответ
start = text.find("{")
end = text.rfind("}") + 1
data = json.loads(text[start:end])
sql = data["sql"]
explanation = data.get("explanation", "")
# Выполняем запрос
rows = self._execute_safe(sql)
return QueryResult(sql=sql, explanation=explanation, rows=rows)
except Exception as e:
return QueryResult(sql="", explanation="", rows=[], error=str(e))
def _execute_safe(self, sql: str) -> list[dict]:
"""Выполняет только SELECT запросы"""
sql_upper = sql.strip().upper()
if not sql_upper.startswith("SELECT") and not sql_upper.startswith("WITH"):
raise ValueError("Only SELECT queries are allowed")
with self.conn.cursor() as cur:
cur.execute(sql)
columns = [desc[0] for desc in cur.description]
rows = cur.fetchall()
return [dict(zip(columns, row)) for row in rows]
def _get_relevant_samples(self, question: str) -> str:
"""Простая эвристика для определения релевантных enum полей"""
# В реальной системе — LLM определяет нужные поля
return ""
### Самокорректирующийся генератор
```python
class SelfCorrectingTextToSQL:
"""Итеративно исправляет SQL при ошибках выполнения"""
def __init__(self, engine: TextToSQLEngine):
self.engine = engine
def query(self, question: str, max_attempts: int = 3) -> QueryResult:
"""Генерирует SQL с автоматическим исправлением ошибок"""
result = self.engine.generate_sql(question)
if not result.error:
return result
# Итеративно исправляем
messages = [{
"role": "user",
"content": f"Вопрос: {question}\n\nСгенерировал запрос:\n```sql\n{result.sql}\n```\n\nОшибка: {result.error}\n\nИсправь запрос."
}]
for attempt in range(max_attempts - 1):
response = client.messages.create(
model="claude-sonnet-4-5",
max_tokens=1024,
system="Ты — SQL эксперт. Исправляй SQL запросы по ошибкам выполнения. Верни только исправленный SQL.",
messages=messages,
)
fixed_sql = response.content[0].text.strip()
if "```sql" in fixed_sql:
fixed_sql = fixed_sql.split("```sql")[1].split("```")[0].strip()
try:
rows = self.engine._execute_safe(fixed_sql)
return QueryResult(sql=fixed_sql, explanation="Auto-corrected", rows=rows)
except Exception as e:
messages.append({"role": "assistant", "content": response.content[0].text})
messages.append({"role": "user", "content": f"Всё ещё ошибка: {e}"})
return QueryResult(sql=result.sql, rows=[], error="Max attempts reached", explanation="")
NL интерфейс с историей
class ConversationalDataAnalyst:
"""Диалоговый интерфейс для работы с данными"""
def __init__(self, connection_string: str):
self.engine = TextToSQLEngine(connection_string)
self.history: list[dict] = []
self.last_sql: str = ""
def ask(self, question: str) -> str:
"""Отвечает на вопрос с учётом истории диалога"""
# Добавляем контекст предыдущего запроса
context = ""
if self.last_sql:
context = f"\nПредыдущий запрос:\n```sql\n{self.last_sql}\n```"
# Поддержка уточняющих вопросов
if any(word in question.lower() for word in ["и ещё", "а теперь", "добавь", "также"]):
enhanced = f"На основе предыдущего запроса, {question}"
else:
enhanced = question
result = self.engine.generate_sql(enhanced + context)
if result.error:
return f"Ошибка выполнения запроса: {result.error}"
self.last_sql = result.sql
self.history.append({"question": question, "sql": result.sql})
# Форматируем результат
if not result.rows:
return "Запрос выполнен успешно, но данных не найдено."
response_text = f"{result.explanation}\n\n"
response_text += f"SQL: `{result.sql}`\n\n"
response_text += f"Результаты ({len(result.rows)} строк):\n"
# Таблица результатов
if result.rows:
headers = list(result.rows[0].keys())
response_text += " | ".join(headers) + "\n"
response_text += " | ".join(["---"] * len(headers)) + "\n"
for row in result.rows[:10]:
response_text += " | ".join(str(v) for v in row.values()) + "\n"
if len(result.rows) > 10:
response_text += f"... и ещё {len(result.rows) - 10} строк"
return response_text
Практический кейс: аналитика e-commerce
Задача: продакт-менеджеры формировали задачи аналитикам (2–5 дней ожидания), т.к. не знали SQL. База данных: PostgreSQL, 23 таблицы, ~50M записей.
Внедрение:
- Text-to-SQL интерфейс в Slack:
/data <вопрос> - White-list разрешённых таблиц для продактов (без личных данных)
- Кэширование часто задаваемых вопросов
Метрики:
- ad-hoc запросы от продактов без участия аналитиков: 0 → 23 в неделю
- Время получения ответа на простой вопрос: 2 дня → 30 секунд
- Точность генерируемого SQL: 89% (не требуют правки)
- 11% запросов требовали итеративного уточнения через диалог
Типичные вопросы:
- "Сколько заказов отменено за последние 7 дней по каждой категории?"
- "Топ-10 клиентов по выручке за текущий квартал"
- "Средний чек по городам в сравнении с прошлым годом"
Сроки
- Базовый Text-to-SQL с одной БД: 3–5 дней
- Self-correcting + история диалога: 1 неделя
- Slack/Telegram интеграция + white-list таблиц: 1 неделя
- Production с кэшированием и мониторингом: 2–3 недели







