SQL Validation
Production - Multi-layer validation with sqlglot, sqlparse, and security checks
The SQL Validation system ensures generated SQL is syntactically correct, semantically valid against the known schema, and free from security vulnerabilities. It uses sqlglot for deep AST analysis and sqlparse for basic syntax validation.
12.3.3.1Validation Pipeline
The SQLValidator in data-plane/ai-service/src/sql_generation/validator.py performs four validation stages:
- Security Check: Scans for dangerous patterns (DROP, DELETE, INSERT, etc.)
- Syntax Validation: Verifies basic SQL syntax with sqlparse
- AST Analysis: Parses with sqlglot for deep structural analysis
- Schema Validation: Verifies referenced tables and columns exist
Security Patterns
DANGEROUS_PATTERNS = [
r"\bDROP\s+",
r"\bDELETE\s+",
r"\bTRUNCATE\s+",
r"\bALTER\s+",
r"\bCREATE\s+",
r"\bINSERT\s+",
r"\bUPDATE\s+",
r"\bGRANT\s+",
r"\bREVOKE\s+",
r"\bEXEC\s*\(",
r";\s*--", # Comment after semicolon (SQL injection)
r"UNION\s+ALL\s+SELECT.*FROM\s+information_schema",
]ValidationResult
@dataclass
class ValidationResult:
is_valid: bool
errors: list[dict[str, Any]] # Blocking errors
warnings: list[dict[str, Any]] # Non-blocking warnings
normalized_sql: str | None = None # Pretty-printed SQL
tables_referenced: list[str] | None = None
columns_referenced: list[str] | None = NoneError Types
| Error Type | Description |
|---|---|
syntax_error | SQL cannot be parsed |
invalid_table | Referenced table not in schema |
invalid_column | Referenced column not in schema |
ambiguous_column | Column exists in multiple tables without qualification |
missing_join_condition | JOIN without ON clause |
security_violation | Dangerous SQL operation detected |
12.3.3.2Common Issue Detection
The validator checks for common SQL anti-patterns:
def _check_common_issues(self, parsed):
warnings = []
# SELECT * warning
for star in parsed.find_all(exp.Star):
warnings.append({"type": "select_star", "message": "Using SELECT *..."})
# JOIN without ON
joins = list(parsed.find_all(exp.Join))
for join in joins:
if not join.args.get("on"):
warnings.append({"type": "missing_join_condition", ...})
# DISTINCT with ORDER BY
if parsed.find(exp.Distinct) and parsed.find(exp.Order):
warnings.append({"type": "distinct_order_by", ...})
return warnings12.3.3.3SQL Normalization
curl -X POST http://localhost:8000/api/v1/text-to-sql/normalize \
-H "Content-Type: application/json" \
-d '{
"sql": "select customer_name,sum(revenue) from orders group by customer_name",
"dialect": "trino"
}'{
"normalized_sql": "SELECT\n customer_name,\n SUM(revenue)\nFROM orders\nGROUP BY\n customer_name"
}