MATIH Platform is in active MVP development. Documentation reflects current implementation status.
12. AI Service
SQL Validation

SQL Validation

Production - Multi-layer validation with sqlglot, sqlparse, and security checks

The SQL Validation system ensures generated SQL is syntactically correct, semantically valid against the known schema, and free from security vulnerabilities. It uses sqlglot for deep AST analysis and sqlparse for basic syntax validation.


12.3.3.1Validation Pipeline

The SQLValidator in data-plane/ai-service/src/sql_generation/validator.py performs four validation stages:

  1. Security Check: Scans for dangerous patterns (DROP, DELETE, INSERT, etc.)
  2. Syntax Validation: Verifies basic SQL syntax with sqlparse
  3. AST Analysis: Parses with sqlglot for deep structural analysis
  4. Schema Validation: Verifies referenced tables and columns exist

Security Patterns

DANGEROUS_PATTERNS = [
    r"\bDROP\s+",
    r"\bDELETE\s+",
    r"\bTRUNCATE\s+",
    r"\bALTER\s+",
    r"\bCREATE\s+",
    r"\bINSERT\s+",
    r"\bUPDATE\s+",
    r"\bGRANT\s+",
    r"\bREVOKE\s+",
    r"\bEXEC\s*\(",
    r";\s*--",  # Comment after semicolon (SQL injection)
    r"UNION\s+ALL\s+SELECT.*FROM\s+information_schema",
]

ValidationResult

@dataclass
class ValidationResult:
    is_valid: bool
    errors: list[dict[str, Any]]       # Blocking errors
    warnings: list[dict[str, Any]]     # Non-blocking warnings
    normalized_sql: str | None = None  # Pretty-printed SQL
    tables_referenced: list[str] | None = None
    columns_referenced: list[str] | None = None

Error Types

Error TypeDescription
syntax_errorSQL cannot be parsed
invalid_tableReferenced table not in schema
invalid_columnReferenced column not in schema
ambiguous_columnColumn exists in multiple tables without qualification
missing_join_conditionJOIN without ON clause
security_violationDangerous SQL operation detected

12.3.3.2Common Issue Detection

The validator checks for common SQL anti-patterns:

def _check_common_issues(self, parsed):
    warnings = []
 
    # SELECT * warning
    for star in parsed.find_all(exp.Star):
        warnings.append({"type": "select_star", "message": "Using SELECT *..."})
 
    # JOIN without ON
    joins = list(parsed.find_all(exp.Join))
    for join in joins:
        if not join.args.get("on"):
            warnings.append({"type": "missing_join_condition", ...})
 
    # DISTINCT with ORDER BY
    if parsed.find(exp.Distinct) and parsed.find(exp.Order):
        warnings.append({"type": "distinct_order_by", ...})
 
    return warnings

12.3.3.3SQL Normalization

curl -X POST http://localhost:8000/api/v1/text-to-sql/normalize \
  -H "Content-Type: application/json" \
  -d '{
    "sql": "select customer_name,sum(revenue) from orders group by customer_name",
    "dialect": "trino"
  }'
{
  "normalized_sql": "SELECT\n  customer_name,\n  SUM(revenue)\nFROM orders\nGROUP BY\n  customer_name"
}