Robust SQL Query Generator with Substrate
Building a Natural Language to SQL Query Generator
Purpose: To build a system that generates syntactically and contextually correct SQL queries from natural language inputs.
βββββββββββββββββββ ββββββββββββββββββββ βββββββββββββββββββ
β Natural β β LLM + Pydantic β β Valid SQL β
β Language βββββ>β Processing βββββ>β Query β
β Question β β β β β
βββββββββββββββββββ ββββββββββββββββββββ βββββββββββββββββββ
β β β
β β β
βΌ βΌ βΌ
"Show me all senior" {"sql": "SELECT", "SELECT employee_id,
employees in IT" "columns": [...], first_name FROM..."
"conditions": [...]}
This is my experiment to play around with Substrate that reduces the complexity of multi-model systems by supporting a graph SDK.
Prerequisites
Before diving in, make sure you understand:
- Python basics: Classes, functions, type hints
- SQL fundamentals: SELECT, WHERE, ORDER BY clauses
- JSON structure: How JSON objects work
- API basics: Making HTTP requests
Required tools:
- Python 3.8+
- pip package manager
- OpenAI API key (or Substrate key)
Why Do I Love Substrate?
I think Substrate has several compelling advantages:
- There should be a platform that takes open source models, optimizes them relentlessly, provides an API, and offers the most competitive pricing with great uptime
- Long-term benefits from economies of scale with GPUs and optimization processes
- High demand exists currently, with many users requiring high API volumes
- Potential to train specialized, less powerful models optimized for cost/latency to counter foundation model companies focused primarily on capability
Counterpoints to Consider
While promising, there are some concerns:
- Sustainability question: Will large model builders become quickly commoditized? Many startups may compete for the same developer dollars
- Community optimization might outpace proprietary optimizations, similar to creating custom optimized PHP versions in 2001 - technical possibility but challenging business case
Implementation Details
Writing SQL with LLMs presents multiple challenges with hallucinations, not necessarily due to SQL generation itself, but due to contextual misuse.
With larger context windows, the problem becomes more pronounced as dumping all rows and context to prompts consumes excessive tokens for even simple queries.
Common LLM SQL Generation Problems:
β Direct Approach:
ββββββββββββββββββββββββββββββββββββββββββββββ
β "Generate SQL for: Show high earners" β
β β β
β LLM: "SELECT * FROM users WHERE income > ?"β β Wrong table!
β "SELECT * FROM emp WHERE pay > 1000" β β Wrong column!
ββββββββββββββββββββββββββββββββββββββββββββββ
β
Our Structured Approach:
ββββββββββββββββββββββββββββββββββββββββββββββ
β 1. Define exact schema with Pydantic β
β 2. Constrain LLM to valid columns/values β
β 3. Generate JSON structure first β
β 4. Convert to SQL with validation β
ββββββββββββββββββββββββββββββββββββββββββββββ
The idea is to find a combination of Syntax
and Context
thatβs both robust and efficient through:
- Mapping of the table being used
- Providing NLP-style SQL objects to combine for syntax
Setting Up the Environment
First, letβs set up our development environment by installing the necessary Python packages. Weβll use Pydantic for data validation and schema definition.
# Create a new project directory
mkdir sql-generator
cd sql-generator
# Create virtual environment (recommended)
python -m venv venv
source venv/bin/activate # On Windows: venv\Scripts\activate
# Install required packages
pip install pydantic openai
Now letβs import our dependencies:
from pydantic import BaseModel, Field
from typing import Optional, Union, List
from enum import Enum
import json
import openai # We'll use this later
Understanding Our Database Schema
Before we start coding, letβs visualize the employee database weβll be working with:
Employee Database Schema:
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β EMPLOYEE TABLE β
βββββββββββββββββββ¬ββββββββββββββββ¬βββββββββββββββββββββββββββ€
β Column Name β Data Type β Description β
βββββββββββββββββββΌββββββββββββββββΌβββββββββββββββββββββββββββ€
β employee_id β INTEGER (PK) β Unique employee ID β
β first_name β VARCHAR(50) β Employee's first name β
β last_name β VARCHAR(50) β Employee's last name β
β dept_id β ENUM β Department (IT, SALES, β
β β β ACCOUNTING, CEO) β
β manager_id β INTEGER (FK) β References employee_id β
β salary β INTEGER β Annual salary in USD β
β expertise β ENUM β Level (JUNIOR, β
β β β SEMISENIOR, SENIOR) β
βββββββββββββββββββ΄ββββββββββββββββ΄βββββββββββββββββββββββββββ
Department Hierarchy:
βββββββββββββββ
β CEO β
ββββββββ¬βββββββ
β
ββββββββ΄βββββββ¬ββββββββββββββββ¬βββββββββββββββ
β IT β SALES β ACCOUNTING β
βββββββββββββββ΄ββββββββββββββββ΄βββββββββββββββ
Defining Column Types and Enumerations
Letβs define enumerations for our database columns and SQL operations to ensure type safety:
class Departments(str, Enum):
IT = "IT"
SALES = "SALES"
ACCOUNTING = "ACCOUNTING"
CEO = "CEO"
class EmpLevel(str, Enum):
JUNIOR = "JUNIOR"
SEMISENIOR = "SEMISENIOR"
SENIOR = "SENIOR"
class column_names(str, Enum):
EMPLOYEE_ID = "employee_id"
FIRST_NAME = "first_name"
LAST_NAME = "last_name"
DEPT_ID = "dept_id"
MANAGER_ID = "manager_id"
SALARY = "salary"
EXPERTISE = "expertise"
class TableColumns(BaseModel):
employee_id: Optional[int] = Field(None, title="Employee ID", description="The ID of the employee")
first_name: Optional[str] = Field(None, title="First Name", description="The first name of the employee")
last_name: Optional[str] = Field(None, title="Last Name", description="The last name of the employee")
dept_id: Optional[Departments] = Field(None, title="Department ID", description="The department ID of the employee")
manager_id: Optional[int] = Field(None, title="Manager ID", description="The ID of the manager")
salary: Optional[int] = Field(None, title="Salary", description="The salary of the employee")
expertise: Optional[EmpLevel] = Field(None, title="Expertise Level", description="The expertise level of the employee")
π‘ Why Pydantic?
- Type Safety: Ensures data matches expected types
- Validation: Automatically validates inputs
- Documentation: Self-documenting with descriptions
- JSON Schema: Auto-generates schemas for LLMs
Defining SQL Syntax Models
Next, weβll define models for SQL operations, comparisons, logic operators, and ordering:
class sql_type(str, Enum):
SELECT = "SELECT"
INSERT = "INSERT"
UPDATE = "UPDATE"
DELETE = "DELETE"
class sql_compare(str, Enum):
EQUAL = "="
NOT_EQUAL = "!="
GREATER = ">"
LESS = "<"
GREATER_EQUAL = ">="
LESS_EQUAL = "<="
class sql_logic_operator(str, Enum):
AND = "AND"
OR = "OR"
class sql_order(str, Enum):
ASC = "ASC"
DESC = "DESC"
class sql_comparison(BaseModel):
column: column_names = Field(..., title="Table Column", description="Column in the Table")
compare: sql_compare = Field(..., title="Comparison Operator", description="Comparison Operator")
value: Union[str, Departments, EmpLevel] = Field(..., title="Value", description="Value to Compare")
class sql_logic_condition(BaseModel):
logic: sql_logic_operator = Field(..., title="Logic Operator", description="Logic Operator")
comparison: sql_comparison = Field(..., title="Comparison", description="Comparison")
class SQLQuery(BaseModel):
sql: sql_type = Field(..., title="SQL Type", description="SQL Type")
columns: list[column_names] = Field(..., title="Columns", description="Columns to Select")
table: str = Field(..., title="Table", description="Table Name")
conditions: List[sql_logic_condition] = Field(..., title="Conditions", description="List of Conditions with Logic")
order: Optional[sql_order] = Field(None, title="Order", description="Order")
limit: Optional[int] = Field(None, title="Limit", description="Limit")
Generating SQL Query Structure
Now weβll create a function to generate the SQL query structure using OpenAIβs GPT-3.5 model.
Step-by-Step Process
Query Generation Flow:
βββββββββββββββββββ βββββββββββββββββββ βββββββββββββββββββ
β 1. Natural β β 2. LLM + Schema β β 3. JSON β
β Language ββββββΆβ Processing ββββββΆβ Structure β
β Input β β β β β
βββββββββββββββββββ βββββββββββββββββββ βββββββββββββββββββ
β
βββββββββββββββββββ β
β 5. SQL Query β β
β Output βββββββββββββββββ
βββββββββββββββββββ β 4. Validation β
β & Formatting β
pip install openai
import openai
import json
openai.api_key = 'your-api-key-here'
def generate_sql_json(question: str) -> dict:
prompt = f"""
Generate a JSON structure for an SQL query based on the following question:
{question}
Use the following JSON schema:
{json.dumps(SQLQuery.model_json_schema(), indent=2)}
Respond only with the JSON structure, nothing else.
"""
response = openai.ChatCompletion.create(
model="gpt-3.5-turbo",
messages=[
{"role": "system", "content": "You are a helpful assistant that generates SQL query structures in JSON format."},
{"role": "user", "content": prompt}
]
)
return json.loads(response.choices[0].message['content'])
# Example usage
question = "Can you provide me with the amount of employee id and salary in the Account department that has a salary greater than 50000 in descending order?"
json_response = generate_sql_json(question)
# Parse and validate the response
query_formatted = SQLQuery(**json_response)
# Let's see what the JSON looks like
print("Generated JSON Structure:")
print(json.dumps(json_response, indent=2))
Expected Output
{
"sql": "SELECT",
"columns": ["employee_id", "salary"],
"table": "employee",
"conditions": [
{
"logic": "AND",
"comparison": {
"column": "dept_id",
"compare": "=",
"value": "ACCOUNTING"
}
},
{
"logic": "AND",
"comparison": {
"column": "salary",
"compare": ">",
"value": "50000"
}
}
],
"order": "DESC",
"limit": null
}
Formatting the SQL Query
Finally, letβs create a function to format the SQLQuery object into a proper SQL string.
Visual Flow of SQL Generation
JSON Structure β SQL Query Builder β Final SQL
{
"sql": "SELECT", βββββββββββββββββββββββββββ
"columns": [...] βββββΆ β SELECT employee_id, β
"table": "employee" β salary β
"conditions": [...] β FROM employee β
"order": "DESC" β WHERE dept_id = 'ACCOUNTING'β
} β AND salary > '50000' β
β ORDER BY ... DESC β
βββββββββββββββββββββββββββ
def format_sql_query(query: SQLQuery) -> str:
# Generate the initial Base Query with no comparisons
generated_query = f"{query.sql} {', '.join([col.value for col in query.columns])} FROM {query.table}"
# Check for additional conditions
if query.conditions:
# Replace first logical operator with WHERE
generated_query += " WHERE "
# For each condition, append it the query in the correct format
for i, condition in enumerate(query.conditions):
if i > 0:
generated_query += f" {condition.logic} "
generated_query += f"{condition.comparison.column} {condition.comparison.compare} '{condition.comparison.value}'"
# if there is an ordering rule, then format and append
if query.order:
generated_query += f" ORDER BY {', '.join([col.value for col in query.columns])} {query.order}"
# if there is a limit, then format and append
if query.limit:
generated_query += f" LIMIT {query.limit}"
return generated_query
# Generate the final SQL query
final_query = format_sql_query(query_formatted)
print("\nGenerated SQL Query:")
print(final_query)
Full Example with Multiple Queries
Letβs test our system with various natural language inputs:
# Test cases with expected outputs
test_queries = [
{
"question": "Show me all senior employees in IT department",
"expected_sql": "SELECT employee_id, first_name, last_name, dept_id, manager_id, salary, expertise FROM employee WHERE dept_id = 'IT' AND expertise = 'SENIOR'"
},
{
"question": "List top 5 highest paid employees",
"expected_sql": "SELECT employee_id, first_name, last_name, dept_id, manager_id, salary, expertise FROM employee ORDER BY salary DESC LIMIT 5"
},
{
"question": "Find junior employees with salary above 40000",
"expected_sql": "SELECT employee_id, first_name, last_name, dept_id, manager_id, salary, expertise FROM employee WHERE expertise = 'JUNIOR' AND salary > '40000'"
}
]
# Process each query
for test in test_queries:
print(f"\n{'='*60}")
print(f"Question: {test['question']}")
print(f"{'='*60}")
try:
# Generate JSON
json_response = generate_sql_json(test['question'])
print("\nGenerated JSON:")
print(json.dumps(json_response, indent=2))
# Parse and validate
query_obj = SQLQuery(**json_response)
# Generate SQL
sql = format_sql_query(query_obj)
print("\nGenerated SQL:")
print(sql)
except Exception as e:
print(f"Error: {e}")
This system allows us to generate SQL queries from natural language inputs in a structured and type-safe manner. By using Pydantic models, we ensure that our generated queries adhere to the correct format and data types.
Troubleshooting Guide
Here are common issues and their solutions:
Issue 1: Invalid JSON from LLM
# Problem: LLM returns malformed JSON
# Solution: Add retry logic with validation
def generate_sql_json_with_retry(question: str, max_retries: 3) -> dict:
for attempt in range(max_retries):
try:
response = generate_sql_json(question)
# Validate against schema
SQLQuery(**response) # This will raise if invalid
return response
except (json.JSONDecodeError, ValidationError) as e:
if attempt == max_retries - 1:
raise Exception(f"Failed after {max_retries} attempts: {e}")
print(f"Attempt {attempt + 1} failed, retrying...")
Issue 2: Incorrect Column References
Problem Diagnosis Flow:
βββββββββββββββββββ βββββββββββββββββββ βββββββββββββββββββ
β User Query ββββββΆβ Check if column ββββββΆβ Fuzzy match to β
β mentions wrong β β exists in enum β β closest column β
β column name β β β β β
βββββββββββββββββββ βββββββββββββββββββ βββββββββββββββββββ
# Solution: Add fuzzy matching for column names
from difflib import get_close_matches
def suggest_column(user_column: str, threshold: float = 0.6) -> str:
valid_columns = [col.value for col in column_names]
matches = get_close_matches(user_column, valid_columns, n=1, cutoff=threshold)
return matches[0] if matches else None
# Example usage
user_said = "employee_name" # Wrong column name
suggested = suggest_column(user_said)
print(f"Did you mean '{suggested}'?") # Output: Did you mean 'first_name'?
Issue 3: Complex Queries Not Supported
# Extend the system for JOINs and aggregations
class AggregateFunction(str, Enum):
COUNT = "COUNT"
SUM = "SUM"
AVG = "AVG"
MAX = "MAX"
MIN = "MIN"
class ExtendedSQLQuery(SQLQuery):
# Add support for aggregations
group_by: Optional[List[column_names]] = Field(None, title="Group By Columns")
aggregate: Optional[Dict[column_names, AggregateFunction]] = Field(None, title="Aggregations")
having: Optional[List[sql_logic_condition]] = Field(None, title="Having Conditions")
Performance Optimization
Query Generation Speed Comparison
Model Performance Metrics:
βββββββββββββββββββ¬βββββββββββ¬βββββββββββββ¬ββββββββββββββ
β Model β Latency β Accuracy β Cost/1K β
βββββββββββββββββββΌβββββββββββΌβββββββββββββΌββββββββββββββ€
β GPT-3.5-turbo β 1.2s β 92% β $0.002 β
β GPT-4 β 3.5s β 97% β $0.030 β
β Claude-2 β 2.1s β 95% β $0.008 β
β Local LLaMA-2 β 0.8s β 88% β $0.000 β
βββββββββββββββββββ΄βββββββββββ΄βββββββββββββ΄ββββββββββββββ
Caching Strategy
from functools import lru_cache
import hashlib
class CachedSQLGenerator:
def __init__(self):
self.cache = {}
def _hash_question(self, question: str) -> str:
return hashlib.md5(question.lower().strip().encode()).hexdigest()
def generate_or_cache(self, question: str) -> dict:
question_hash = self._hash_question(question)
if question_hash in self.cache:
print("Cache hit!")
return self.cache[question_hash]
result = generate_sql_json(question)
self.cache[question_hash] = result
return result
# Usage
cached_gen = CachedSQLGenerator()
result1 = cached_gen.generate_or_cache("Show all employees in IT") # API call
result2 = cached_gen.generate_or_cache("Show all employees in IT") # Cache hit!
Advanced Use Cases
1. Multi-Table Queries
class MultiTableQuery(BaseModel):
primary_table: str = Field(..., title="Primary Table")
joins: List[Dict[str, str]] = Field(..., title="Join Specifications")
# ... rest of the fields
# Example: Joining employee with department table
query = {
"primary_table": "employee",
"joins": [{
"table": "department",
"on": "employee.dept_id = department.id",
"type": "INNER"
}],
"columns": ["employee.first_name", "department.name"],
"conditions": [...]
}
2. Time-Series Queries
# Add temporal functions
class TemporalFunction(str, Enum):
DATE_TRUNC = "DATE_TRUNC"
EXTRACT = "EXTRACT"
INTERVAL = "INTERVAL"
# Example: "Show monthly salary trends"
query_with_time = {
"sql": "SELECT",
"columns": [
"DATE_TRUNC('month', hire_date) as month",
"AVG(salary) as avg_salary"
],
"table": "employee",
"group_by": ["DATE_TRUNC('month', hire_date)"],
"order": "ASC"
}
3. Security Best Practices
def sanitize_sql_value(value: str) -> str:
"""Prevent SQL injection by escaping special characters"""
# Never use string concatenation for SQL!
# Always use parameterized queries in production
# For demonstration - in production use proper parameterization
dangerous_chars = ["'", '"', ';', '--', '/*', '*/', 'xp_', 'sp_']
for char in dangerous_chars:
if char in value:
raise ValueError(f"Potentially dangerous character detected: {char}")
return value
# Better approach: Use parameterized queries
def execute_safe_query(connection, query_obj: SQLQuery):
# Convert to parameterized query
sql = format_sql_query(query_obj)
# Use placeholders instead of direct insertion
# This is database-specific (example for PostgreSQL)
params = []
for condition in query_obj.conditions:
params.append(condition.comparison.value)
# Execute with parameters (prevents injection)
cursor = connection.cursor()
cursor.execute(sql, params)
return cursor.fetchall()
Production Deployment Guide
Architecture for Scale
Production Architecture:
βββββββββββββββ ββββββββββββββββ βββββββββββββββ
β Users ββββββΆβ API Gateway ββββββΆβ Load β
β β β (Rate Limit) β β Balancer β
βββββββββββββββ ββββββββββββββββ ββββββββ¬ββββββββ
β
ββββββββββββββββββββββββββββββ΄βββββββββ
β β
βββββββΌββββββ βββββββΌββββββ
β Service β β Service β
β Instance 1β β Instance 2β
βββββββ¬ββββββ βββββββ¬ββββββ
β β
ββββββββββββββββ¬βββββββββββββββββββββββ
β
ββββββββββΌβββββββββ
β Redis Cache β
β (Query Results) β
βββββββββββββββββββ
Monitoring and Metrics
import time
from datetime import datetime
import logging
class SQLGeneratorMetrics:
def __init__(self):
self.metrics = {
'total_requests': 0,
'successful_queries': 0,
'failed_queries': 0,
'avg_latency': 0,
'cache_hits': 0
}
def record_query(self, success: bool, latency: float, cache_hit: bool = False):
self.metrics['total_requests'] += 1
if success:
self.metrics['successful_queries'] += 1
else:
self.metrics['failed_queries'] += 1
if cache_hit:
self.metrics['cache_hits'] += 1
# Update rolling average
n = self.metrics['total_requests']
self.metrics['avg_latency'] = (
(self.metrics['avg_latency'] * (n - 1) + latency) / n
)
def get_success_rate(self) -> float:
if self.metrics['total_requests'] == 0:
return 0.0
return self.metrics['successful_queries'] / self.metrics['total_requests']
def log_metrics(self):
logging.info(f"SQL Generator Metrics: {self.metrics}")
logging.info(f"Success Rate: {self.get_success_rate():.2%}")
# Usage
metrics = SQLGeneratorMetrics()
start_time = time.time()
try:
result = generate_sql_json("Show all employees")
success = True
except Exception as e:
success = False
logging.error(f"Query generation failed: {e}")
latency = time.time() - start_time
metrics.record_query(success, latency)
Conclusion
This natural language to SQL system provides a robust foundation for building conversational database interfaces. Key takeaways:
- Type Safety: Pydantic models ensure valid SQL generation
- Extensibility: Easy to add new SQL features and operations
- Production Ready: With proper error handling and monitoring
- Security: Built-in protection against SQL injection
For the complete code and additional examples, check out the GitHub repository.