MATIH Platform is in active MVP development. Documentation reflects current implementation status.
9. Query Engine & SQL
SQL Execution

SQL Execution Flow

Every SQL query that enters the MATIH platform follows a deterministic execution pipeline from receipt through result delivery. This section traces the complete lifecycle of a query through both synchronous and asynchronous paths, covering validation, security enforcement, engine routing, execution, pagination, and response construction.


Execution Pipeline Overview

The query execution pipeline consists of eight ordered phases:

 +----------+    +----------+    +--------+    +--------+
 | Receive  | -> | Validate | -> | Cache  | -> |  RLS   |
 | & Parse  |    | & Auth   |    | Lookup |    | Inject |
 +----------+    +----------+    +--------+    +--------+
                                                    |
 +----------+    +----------+    +--------+    +----v---+
 | Deliver  | <- |  Cache   | <- | Execute| <- | Route  |
 | Response |    |  Store   |    | on Eng |    | Query  |
 +----------+    +----------+    +--------+    +--------+
Phase                          Component                                  Duration (typical)
1. Receive and parse           QueryController                            < 1ms
2. Validate and authenticate   SecurityConfig, JwtAuthenticationFilter    1-5ms
3. Cache lookup                MultiLevelCacheService                     1-10ms
4. RLS injection               RlsFilterService                           5-50ms
5. Data masking                Masking service                            1-10ms
6. Engine routing              SmartQueryRouter                           1-5ms
7. Execution                   TrinoQueryStrategy / others                100ms - 300s
8. Response delivery           QueryController                            1-50ms

Phase 1: Receive and Parse

Synchronous Execution

The QueryController exposes the synchronous execution endpoint:

@PostMapping("/execute")
public ResponseEntity<QueryResponse> executeSync(
        @RequestHeader("X-Tenant-ID") UUID tenantId,
        @RequestHeader("X-User-ID") UUID userId,
        @Valid @RequestBody QueryRequest request) {
 
    QueryResponse response = executionService.executeSync(tenantId, userId, request);
    return ResponseEntity.ok(response);
}

Required headers:

Header          Type    Required  Description
X-Tenant-ID     UUID    Yes       Tenant identifier from JWT
X-User-ID       UUID    Yes       User identifier from JWT
Authorization   String  Yes       Bearer JWT token

Asynchronous Execution

For long-running queries, the asynchronous endpoint returns immediately with an execution ID:

@PostMapping("/execute/async")
public ResponseEntity<AsyncQueryResponse> executeAsync(
        @RequestHeader("X-Tenant-ID") UUID tenantId,
        @RequestHeader("X-User-ID") UUID userId,
        @Valid @RequestBody QueryRequest request) {
 
    AsyncQueryResponse response = executionService.submitAsync(tenantId, userId, request);
    return ResponseEntity.accepted().body(response);
}

The AsyncQueryResponse includes:

public class AsyncQueryResponse {
    private UUID executionId;          // Unique execution identifier
    private QueryStatus status;        // QUEUED
    private String statusUrl;          // URL to poll for status
    private String resultsUrl;         // URL to fetch results when complete
    private long estimatedDurationMs;  // Estimated execution time
    private Instant submittedAt;       // Submission timestamp
}

Request Validation

The QueryRequest object undergoes Jakarta Bean Validation before reaching the service layer:

Field           Validation                          Error
sql             @NotBlank                           SQL query is required
sql             Max 1MB                             Query size limit exceeded
limit           @Min(1) @Max(100000)                Limit must be between 1 and 100,000
timeoutSeconds  @Min(1) @Max(3600)                  Timeout must be between 1 and 3,600 seconds
catalog         @Pattern(regexp = "[a-zA-Z0-9_]+")  Invalid catalog name
schema          @Pattern(regexp = "[a-zA-Z0-9_]+")  Invalid schema name

Phase 2: Validate and Authenticate

The Spring Security filter chain processes every request before it reaches the controller:

JWT Authentication

The JwtAuthenticationFilter extracts and validates the JWT token:

@Component
public class JwtAuthenticationFilter extends OncePerRequestFilter {
    @Override
    protected void doFilterInternal(HttpServletRequest request,
                                     HttpServletResponse response,
                                     FilterChain chain) {
        // 1. Extract Bearer token from Authorization header
        // 2. Validate token signature and expiration
        // 3. Extract claims: tenantId, userId, roles, permissions
        // 4. Verify X-Tenant-ID header matches token tenant claim
        // 5. Set SecurityContext with UserPrincipal
        // 6. Continue filter chain
    }
}

Tenant Context Verification

The security layer verifies that the X-Tenant-ID header matches the tenant claim in the JWT token. A mismatch results in a 403 Forbidden response. This prevents a user with a valid token for tenant A from executing queries against tenant B's data.
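The verification itself reduces to an equality test between header and claim. A minimal sketch (class and method names are illustrative, not from the codebase):

```java
import java.util.UUID;

// Illustrative sketch of the tenant verification step: the X-Tenant-ID
// header must equal the tenant claim extracted from the validated JWT.
// A false result corresponds to the 403 Forbidden response described above.
public class TenantGuard {

    public static boolean tenantMatches(UUID headerTenantId, UUID tokenTenantClaim) {
        return headerTenantId != null && headerTenantId.equals(tokenTenantClaim);
    }
}
```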


Phase 3: Cache Lookup

Before executing any query, the MultiLevelCacheService checks for cached results across three cache tiers:

Cache Tier Architecture

Request
  |
  v
[L1: In-Memory (Caffeine)] -- hit --> Return cached result
  |
  miss
  v
[L2: Redis] -- hit --> Promote to L1, return cached result
  |
  miss
  v
[Semantic Cache] -- similar query found --> Return cached result
  |
  miss
  v
Proceed to execution
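The lookup-and-promote flow above can be sketched with plain maps standing in for Caffeine and Redis (the semantic tier is omitted and the names are illustrative; the real MultiLevelCacheService also checks the semantic cache before falling through):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Sketch of the L1 -> L2 lookup-and-promote flow. Maps stand in for the
// Caffeine (L1) and Redis (L2) tiers; the semantic cache is omitted.
public class TieredCache {
    private final Map<String, String> l1 = new HashMap<>(); // in-memory tier
    private final Map<String, String> l2 = new HashMap<>(); // Redis stand-in

    public Optional<String> lookup(String key) {
        String hit = l1.get(key);
        if (hit != null) return Optional.of(hit);   // L1 hit
        hit = l2.get(key);
        if (hit != null) {
            l1.put(key, hit);                       // promote L2 hit into L1
            return Optional.of(hit);
        }
        return Optional.empty();                    // miss: proceed to execution
    }

    public void put(String key, String value)       { l1.put(key, value); l2.put(key, value); }
    public void putL2Only(String key, String value) { l2.put(key, value); }
    public boolean inL1(String key)                 { return l1.containsKey(key); }
}
```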

Cache Key Generation

Cache keys are generated as a hash of:

cache_key = SHA-256(
    tenant_id +
    sql_normalized +
    catalog +
    schema +
    user_roles_hash    // RLS-dependent: different roles may see different results
)

The SQL is normalized before hashing to ensure that semantically identical queries with different whitespace or casing produce the same cache key:

  • Leading/trailing whitespace removed
  • Internal whitespace collapsed to single spaces
  • Keywords uppercased
  • Comments stripped
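Under those rules, normalization and key derivation can be sketched as follows. This is a naive rendering of the recipe above, not the production code: among other shortcuts, it uppercases string literals too, which a real normalizer must avoid.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.HexFormat;

// Sketch of SQL normalization and cache-key derivation per the recipe above.
public class CacheKeys {

    // Strip comments, trim, collapse internal whitespace, uppercase.
    // NOTE: naive -- this also uppercases string literals.
    public static String normalize(String sql) {
        String s = sql.replaceAll("--[^\\n]*", " ")        // line comments
                      .replaceAll("(?s)/\\*.*?\\*/", " "); // block comments
        return s.trim().replaceAll("\\s+", " ").toUpperCase();
    }

    // SHA-256 over tenant + normalized SQL + catalog + schema + roles hash.
    public static String cacheKey(String tenantId, String sql, String catalog,
                                  String schema, String userRolesHash) {
        try {
            MessageDigest sha = MessageDigest.getInstance("SHA-256");
            String input = tenantId + normalize(sql) + catalog + schema + userRolesHash;
            return HexFormat.of().formatHex(sha.digest(input.getBytes(StandardCharsets.UTF_8)));
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-256 unavailable", e);
        }
    }
}
```

Two queries differing only in whitespace and casing produce the same key; changing the tenant changes the key.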

Cache Invalidation

Cache entries are invalidated through several mechanisms:

Trigger              Scope               Mechanism
TTL expiration       Per entry           Automatic (configurable per cache level)
Manual invalidation  Per tenant          DELETE /v1/queries/cache
Schema change        Per catalog/schema  CacheInvalidationListener on Kafka event
Data mutation        Per table           DDL/DML events from catalog service
RLS policy change    Per tenant          Policy update event from governance service

Phase 4: RLS Injection

If the cache lookup misses, the query proceeds to Row-Level Security filter injection. The RlsFilterService modifies the SQL query to enforce row-level access controls:

Table Extraction

The service extracts all table references from the SQL using regex pattern matching:

// FROM clause extraction
private static final Pattern FROM_PATTERN = Pattern.compile(
    "\\bFROM\\s+([\\w.\"]+(?:\\s+(?:AS\\s+)?\\w+)?(?:\\s*,\\s*[\\w.\"]+(?:\\s+(?:AS\\s+)?\\w+)?)*)",
    Pattern.CASE_INSENSITIVE
);
 
// JOIN clause extraction
private static final Pattern JOIN_PATTERN = Pattern.compile(
    "\\b(INNER|LEFT|RIGHT|FULL|CROSS)?\\s*JOIN\\s+([\\w.\"]+)(?:\\s+(?:AS\\s+)?(\\w+))?",
    Pattern.CASE_INSENSITIVE
);
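Applied to a sample query, trimmed-down variants of these patterns (alias capture dropped for brevity) pull out the referenced tables. As with any regex approach, this is best-effort: subqueries and unaliased JOINs followed directly by ON can confuse the fuller alias-aware patterns.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Simplified variants of the FROM/JOIN extraction patterns shown above.
public class TableExtractor {
    private static final Pattern FROM = Pattern.compile(
        "\\bFROM\\s+([\\w.\"]+)", Pattern.CASE_INSENSITIVE);
    private static final Pattern JOIN = Pattern.compile(
        "\\bJOIN\\s+([\\w.\"]+)", Pattern.CASE_INSENSITIVE);

    public static List<String> tables(String sql) {
        List<String> out = new ArrayList<>();
        Matcher m = FROM.matcher(sql);
        while (m.find()) out.add(m.group(1)); // table after each FROM
        m = JOIN.matcher(sql);
        while (m.find()) out.add(m.group(1)); // table after each JOIN
        return out;
    }
}
```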

Policy Evaluation

For each extracted table, the service calls the RLS policy evaluation endpoint:

RlsEvaluationRequest request = RlsEvaluationRequest.builder()
    .tenantId(context.getTenantId())
    .userId(context.getUserId())
    .userRoles(context.getUserRoles())
    .userAttributes(context.getUserAttributes())
    .resourceFqn(resourceFqn)         // e.g., "analytics.public.sales_data"
    .resourceType("TABLE")
    .operation(determineOperation(sql)) // SELECT, INSERT, UPDATE, DELETE
    .build();
 
RlsEvaluationResult result = rlsServiceClient.evaluateRls(request);

WHERE Clause Injection

If the evaluation returns a filter, the service injects it into the SQL:

-- Original query
SELECT * FROM orders WHERE status = 'active'
 
-- After RLS injection (user restricted to region 'US-EAST')
SELECT * FROM orders WHERE (status = 'active') AND (orders.region = 'US-EAST')

For queries without an existing WHERE clause, a new one is added. For queries with GROUP BY, ORDER BY, or LIMIT, the WHERE clause is inserted at the correct position.
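A simplified injector covering those three cases (existing WHERE, no WHERE with a trailing clause, no WHERE at all) could look like the sketch below. It is illustrative only: it handles single-statement SELECTs and assumes keywords never appear inside string literals or subqueries.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Sketch of RLS WHERE-clause injection for simple SELECT statements.
public class RlsInjector {
    private static final Pattern WHERE =
        Pattern.compile("\\bWHERE\\b", Pattern.CASE_INSENSITIVE);
    private static final Pattern TAIL =
        Pattern.compile("\\b(GROUP\\s+BY|ORDER\\s+BY|LIMIT)\\b", Pattern.CASE_INSENSITIVE);

    public static String inject(String sql, String filter) {
        Matcher where = WHERE.matcher(sql);
        if (where.find()) {
            // Parenthesize the existing predicate and AND in the RLS filter.
            int start = where.end();
            Matcher tail = TAIL.matcher(sql);
            int end = tail.find(start) ? tail.start() : sql.length();
            String predicate = sql.substring(start, end).trim();
            return sql.substring(0, start) + " (" + predicate + ") AND (" + filter + ")"
                 + (end < sql.length() ? " " + sql.substring(end) : "");
        }
        Matcher tail = TAIL.matcher(sql);
        if (tail.find()) {
            // Insert a new WHERE before GROUP BY / ORDER BY / LIMIT.
            return sql.substring(0, tail.start()) + "WHERE (" + filter + ") "
                 + sql.substring(tail.start());
        }
        return sql + " WHERE (" + filter + ")";
    }
}
```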


Phase 5: Data Masking

After RLS injection, the query passes through the data masking service, which rewrites SELECT column expressions for columns classified as sensitive:

-- Original query
SELECT customer_name, email, ssn, purchase_amount FROM customers
 
-- After masking (user has 'analyst' role, not 'admin')
SELECT customer_name, SUBSTR(email, 1, 3) || '***@***' AS email,
       '***-**-' || SUBSTR(ssn, -4) AS ssn, purchase_amount FROM customers

Masking rules are driven by data classification tags assigned in the catalog service. See the Data Masking section for details.
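The value-level effect of the two masking expressions above can be reproduced in plain Java. The rule shapes are taken from the masked query shown; the method names are illustrative, and the real service rewrites SQL rather than masking values in the application.

```java
// Illustrative value-level equivalents of the masking expressions above.
public class MaskingRules {

    /** Keep the first 3 characters of the address, hide the rest. */
    public static String maskEmail(String email) {
        return email.substring(0, Math.min(3, email.length())) + "***@***";
    }

    /** Reveal only the last 4 digits of an SSN. */
    public static String maskSsn(String ssn) {
        return "***-**-" + ssn.substring(Math.max(0, ssn.length() - 4));
    }
}
```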


Phase 6: Engine Routing

The SmartQueryRouter analyzes the (now modified) SQL to select the optimal execution engine:

public EngineType route(QueryRequest request) {
    QueryAnalysis analysis = analyzeQuery(request);
 
    // Real-time tables -> ClickHouse
    if (analysis.hasRealtimeTables) return EngineType.CLICKHOUSE;
 
    // Very large scans -> Spark Async
    if (analysis.estimatedBytes > LARGE_SCAN_BYTE_THRESHOLD) return EngineType.SPARK_ASYNC;
 
    // Simple small queries -> ClickHouse
    if (analysis.estimatedRows < SMALL_QUERY_ROW_THRESHOLD
            && !analysis.hasJoins && !analysis.hasWindowFunctions)
        return EngineType.CLICKHOUSE;
 
    // Complex analytics -> Trino
    if (analysis.hasWindowFunctions || analysis.joinCount > 2) return EngineType.TRINO;
 
    // Default -> Trino
    return EngineType.TRINO;
}

The router's decision can be overridden by specifying enginePreference in the QueryRequest:

{
  "sql": "SELECT ...",
  "enginePreference": "TRINO"
}

Phase 7: Execution

Synchronous Path

The selected strategy executes the query and blocks until results are available:

QueryEngineStrategy strategy = strategies.get(engineType);
QueryResponse response = strategy.execute(request, timeoutSeconds);

The Trino strategy implementation:

  1. Opens a JDBC connection to Trino with catalog/schema context
  2. Sets billing session properties for resource group selection
  3. Applies result limit to the SQL if not already present
  4. Executes the query via Statement.executeQuery()
  5. Iterates the ResultSet to materialize rows into List<Map<String, Object>>
  6. Extracts column metadata from ResultSetMetaData
  7. Records execution time and result statistics
  8. Returns the QueryResponse
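Step 3 (applying the result limit) is the only purely textual step and can be sketched on its own. The trailing-LIMIT regex below is a deliberate simplification: it misses LIMITs inside subqueries, which a real implementation would handle by parsing the statement.

```java
import java.util.regex.Pattern;

// Sketch of step 3: append a LIMIT to the outgoing SQL unless one is
// already present at the end of the statement.
public class LimitApplier {
    private static final Pattern HAS_LIMIT =
        Pattern.compile("\\bLIMIT\\s+\\d+\\s*$", Pattern.CASE_INSENSITIVE);

    public static String applyLimit(String sql, int limit) {
        String trimmed = sql.trim();
        return HAS_LIMIT.matcher(trimmed).find() ? trimmed : trimmed + " LIMIT " + limit;
    }
}
```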

Asynchronous Path

The asynchronous path submits the query to a persistent queue:

public AsyncQueryResponse submitAsync(UUID tenantId, UUID userId, QueryRequest request) {
    // 1. Create execution record with status QUEUED
    QueryExecution execution = QueryExecution.builder()
        .id(UUID.randomUUID())
        .tenantId(tenantId)
        .userId(userId)
        .sql(request.getSql())
        .transformedSql(transformedSql)  // After RLS + masking
        .engineType(routedEngine)
        .status(QueryExecutionStatus.QUEUED)
        .priority(determinePriority(request))
        .submittedAt(Instant.now())
        .build();
 
    // 2. Persist to database
    executionRepository.save(execution);
 
    // 3. Submit to priority queue
    queryQueue.submit(execution);
 
    // 4. Publish event
    eventPublisher.publish(new QuerySubmittedEvent(execution));
 
    // 5. Return response with polling URLs
    return AsyncQueryResponse.builder()
        .executionId(execution.getId())
        .status(QueryStatus.QUEUED)
        .statusUrl("/v1/queries/" + execution.getId() + "/status")
        .resultsUrl("/v1/queries/" + execution.getId() + "/results")
        .estimatedDurationMs(costEstimator.estimateTime(request))
        .build();
}

Status Polling

Clients poll the status endpoint until the query completes:

GET /v1/queries/{executionId}/status

Response (RUNNING):
{
  "executionId": "550e8400-e29b-41d4-a716-446655440000",
  "status": "RUNNING",
  "engineType": "TRINO",
  "submittedAt": "2026-02-12T10:30:00Z",
  "progress": 0.45
}

Response (COMPLETED):
{
  "executionId": "550e8400-e29b-41d4-a716-446655440000",
  "status": "COMPLETED",
  "engineType": "TRINO",
  "rowCount": 15234,
  "executionTimeMs": 12450,
  "submittedAt": "2026-02-12T10:30:00Z",
  "completedAt": "2026-02-12T10:30:12Z"
}
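A client-side polling loop over these responses might look like the sketch below. The HTTP GET is abstracted as a status supplier so the control flow is visible and testable; a real client would deserialize the JSON shown above and sleep with backoff between calls.

```java
import java.util.Iterator;

// Sketch of a client polling GET /v1/queries/{id}/status until the query
// reaches a terminal state or the attempt budget is exhausted.
public class StatusPoller {

    public static String pollUntilTerminal(Iterator<String> statusSupplier, int maxAttempts) {
        String status = "QUEUED";
        for (int i = 0; i < maxAttempts && statusSupplier.hasNext(); i++) {
            status = statusSupplier.next(); // stands in for one GET .../status call
            if (status.equals("COMPLETED") || status.equals("FAILED") || status.equals("CANCELLED"))
                return status;
            // a real client would sleep with backoff here before the next poll
        }
        return status;
    }
}
```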

Phase 8: Response Delivery

Result Pagination

Large result sets are delivered through cursor-based pagination managed by the CursorPaginationService:

GET /v1/queries/{executionId}/results?page=0&size=1000

Response:
{
  "executionId": "...",
  "status": "COMPLETED",
  "columns": [
    {"name": "order_id", "type": "VARCHAR", "nullable": false},
    {"name": "amount", "type": "DECIMAL", "nullable": true},
    {"name": "created_at", "type": "TIMESTAMP", "nullable": false}
  ],
  "data": [
    {"order_id": "ORD-001", "amount": 150.00, "created_at": "2026-01-15T08:30:00Z"},
    {"order_id": "ORD-002", "amount": 275.50, "created_at": "2026-01-15T09:15:00Z"}
  ],
  "rowCount": 1000,
  "totalRows": 15234,
  "hasMore": true,
  "cursorToken": "eyJvZmZzZXQiOjEwMDAsInF1ZXJ5SWQiOiIuLi4ifQ=="
}
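The cursorToken is opaque to clients. One plausible encoding, shown purely for illustration, is Base64 over a small JSON payload carrying the next offset and the execution it belongs to:

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical cursor-token codec: Base64 over a tiny JSON payload.
public class CursorToken {

    public static String encode(long offset, String queryId) {
        String json = "{\"offset\":" + offset + ",\"queryId\":\"" + queryId + "\"}";
        return Base64.getEncoder().encodeToString(json.getBytes(StandardCharsets.UTF_8));
    }

    public static String decode(String token) {
        return new String(Base64.getDecoder().decode(token), StandardCharsets.UTF_8);
    }
}
```

Decoding the token from the example response above yields an offset of 1000, i.e. the start of the next page.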

For the asynchronous path, results are stored by the QueryResultStorageService, which persists them to Redis (results under 10MB) or to object storage (larger result sets).

Result Caching

After successful execution, results are cached at the appropriate tier:

// Cache the result at L1 and L2
cacheService.put(cacheKey, response, CacheTier.L1_AND_L2, ttl);
 
// Also cache the query plan for plan cache
planCache.put(normalizedSql, queryPlan);

Cache TTLs are determined by the AdaptiveCachePolicy:

Factor                      Effect on TTL
Query frequency             Frequently executed queries get longer TTL
Data freshness requirement  Real-time tables get shorter TTL (30s)
Result set size             Larger results get shorter TTL to conserve memory
Table mutation rate         Tables with frequent writes get shorter TTL
Explicit hint               Query metadata can specify TTL
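One way those factors could combine is sketched below. The base values and thresholds are invented for illustration; the only behavior taken from the table is that an explicit hint wins outright and real-time tables get a 30-second TTL.

```java
import java.time.Duration;

// Hypothetical combination of the TTL factors listed above.
public class AdaptiveTtl {

    public static Duration ttl(long executionsLastHour, boolean realtimeTable,
                               long resultBytes, double tableWritesPerMin,
                               Duration explicitHint) {
        if (explicitHint != null) return explicitHint;             // explicit hint wins
        if (realtimeTable) return Duration.ofSeconds(30);          // freshness requirement
        Duration base = Duration.ofMinutes(10);                    // invented base value
        if (executionsLastHour > 100) base = base.multipliedBy(3); // hot query: longer TTL
        if (resultBytes > 10L * 1024 * 1024) base = base.dividedBy(2); // large result
        if (tableWritesPerMin > 1.0) base = base.dividedBy(2);     // frequently written table
        return base;
    }
}
```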

Query Cancellation

Running or queued queries can be cancelled via the cancellation endpoint:

@PostMapping("/{executionId}/cancel")
public ResponseEntity<Map<String, Object>> cancelQuery(
        @RequestHeader("X-Tenant-ID") UUID tenantId,
        @PathVariable UUID executionId) {
 
    boolean cancelled = executionService.cancelQuery(tenantId, executionId);
    return ResponseEntity.ok(Map.of(
        "executionId", executionId,
        "cancelled", cancelled,
        "message", cancelled ? "Query cancelled" : "Query could not be cancelled"
    ));
}

Cancellation behavior by state:

Current State  Cancellation Behavior
QUEUED         Removed from queue immediately
RUNNING        Statement.cancel() sent to Trino; waits for acknowledgment
COMPLETED      Cannot cancel (already finished)
FAILED         Cannot cancel (already terminated)
CANCELLED      No-op (already cancelled)
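The table reduces to a simple predicate over the execution state (a sketch; the enum mirrors the states above):

```java
// Sketch of the state check behind the cancellation behavior table.
public class CancellationRules {
    public enum State { QUEUED, RUNNING, COMPLETED, FAILED, CANCELLED }

    /** Only queued and running executions can still be cancelled. */
    public static boolean canCancel(State state) {
        return state == State.QUEUED || state == State.RUNNING;
    }
}
```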

Query History

Every query execution is recorded in the query history database, accessible through the history endpoints:

GET /v1/queries/my-history?status=COMPLETED&page=0&size=20

Response:
{
  "content": [
    {
      "executionId": "...",
      "sql": "SELECT region, SUM(amount) FROM orders GROUP BY region",
      "status": "COMPLETED",
      "engineType": "TRINO",
      "rowCount": 12,
      "executionTimeMs": 450,
      "submittedAt": "2026-02-12T10:30:00Z"
    }
  ],
  "totalElements": 156,
  "totalPages": 8,
  "number": 0,
  "size": 20
}

Query Statistics

Aggregated statistics for the tenant:

GET /v1/queries/stats?days=7

Response:
{
  "totalQueries": 1234,
  "averageExecutionMs": 2450,
  "cacheHitRate": 0.42,
  "engineDistribution": {
    "TRINO": 856,
    "CLICKHOUSE": 312,
    "SPARK_ASYNC": 66
  },
  "statusDistribution": {
    "COMPLETED": 1180,
    "FAILED": 32,
    "CANCELLED": 22
  },
  "peakQueriesPerHour": 89,
  "totalRowsProcessed": 45678900
}

Error Handling

The GlobalExceptionHandler provides consistent error responses:

Exception                        HTTP Status  Error Code
QueryExecutionException          500          QUERY_EXECUTION_FAILED
QueryTimeoutException            408          QUERY_TIMEOUT
QueryCancelledException          499          QUERY_CANCELLED
MethodArgumentNotValidException  400          VALIDATION_ERROR
AccessDeniedException            403          ACCESS_DENIED
DataSourceNotFoundException      404          DATA_SOURCE_NOT_FOUND

Error response format:

{
  "error": {
    "code": "QUERY_TIMEOUT",
    "message": "Query exceeded timeout of 300 seconds",
    "executionId": "550e8400-e29b-41d4-a716-446655440000",
    "timestamp": "2026-02-12T10:35:00Z"
  }
}

Related Sections