SQL Execution Flow
Every SQL query that enters the MATIH platform follows a deterministic execution pipeline from receipt through result delivery. This section traces the complete lifecycle of a query through both synchronous and asynchronous paths, covering validation, security enforcement, engine routing, execution, pagination, and response construction.
Execution Pipeline Overview
The query execution pipeline consists of eight ordered phases:
```
+----------+      +----------+      +----------+      +----------+
|  Receive |      | Validate |      |  Cache   |      |   RLS    |
|  & Parse | ---> |  & Auth  | ---> |  Lookup  | ---> |  Inject  |
+----------+      +----------+      +----------+      +----+-----+
                                                           |
                                                           v
+----------+      +----------+      +----------+      +----+-----+
|  Deliver |      |  Cache   |      | Execute  |      |  Route   |
| Response | <--- |  Store   | <--- |  on Eng  | <--- |  Query   |
+----------+      +----------+      +----------+      +----------+
```

| Phase | Component | Duration (typical) |
|---|---|---|
| 1. Receive and parse | QueryController | < 1ms |
| 2. Validate and authenticate | SecurityConfig, JwtAuthenticationFilter | 1-5ms |
| 3. Cache lookup | MultiLevelCacheService | 1-10ms |
| 4. RLS injection | RlsFilterService | 5-50ms |
| 5. Data masking | Masking service | 1-10ms |
| 6. Engine routing | SmartQueryRouter | 1-5ms |
| 7. Execution | TrinoQueryStrategy / others | 100ms - 300s |
| 8. Response delivery | QueryController | 1-50ms |
Phase 1: Receive and Parse
Synchronous Execution
The QueryController exposes the synchronous execution endpoint:
```java
@PostMapping("/execute")
public ResponseEntity<QueryResponse> executeSync(
        @RequestHeader("X-Tenant-ID") UUID tenantId,
        @RequestHeader("X-User-ID") UUID userId,
        @Valid @RequestBody QueryRequest request) {
    QueryResponse response = executionService.executeSync(tenantId, userId, request);
    return ResponseEntity.ok(response);
}
```

Required headers:
| Header | Type | Required | Description |
|---|---|---|---|
| X-Tenant-ID | UUID | Yes | Tenant identifier from JWT |
| X-User-ID | UUID | Yes | User identifier from JWT |
| Authorization | String | Yes | Bearer JWT token |
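For illustration, a client can attach these headers with the JDK's built-in HTTP client. This is a sketch only: the base URL, IDs, and token below are placeholders, and the request is built but not sent.

```java
import java.net.URI;
import java.net.http.HttpRequest;

class SyncQueryCall {
    // Builds (but does not send) a request for POST /execute.
    // baseUrl, tenant/user IDs, and the JWT are caller-supplied placeholders.
    static HttpRequest build(String baseUrl, String tenantId, String userId,
                             String jwt, String bodyJson) {
        return HttpRequest.newBuilder()
                .uri(URI.create(baseUrl + "/execute"))
                .header("X-Tenant-ID", tenantId)           // must match the JWT tenant claim
                .header("X-User-ID", userId)
                .header("Authorization", "Bearer " + jwt)
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(bodyJson))
                .build();
    }
}
```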
Asynchronous Execution
For long-running queries, the asynchronous endpoint returns immediately with an execution ID:
```java
@PostMapping("/execute/async")
public ResponseEntity<AsyncQueryResponse> executeAsync(
        @RequestHeader("X-Tenant-ID") UUID tenantId,
        @RequestHeader("X-User-ID") UUID userId,
        @Valid @RequestBody QueryRequest request) {
    AsyncQueryResponse response = executionService.submitAsync(tenantId, userId, request);
    return ResponseEntity.accepted().body(response);
}
```

The AsyncQueryResponse includes:
```java
public class AsyncQueryResponse {
    private UUID executionId;         // Unique execution identifier
    private QueryStatus status;       // QUEUED
    private String statusUrl;         // URL to poll for status
    private String resultsUrl;        // URL to fetch results when complete
    private long estimatedDurationMs; // Estimated execution time
    private Instant submittedAt;      // Submission timestamp
}
```

Request Validation
The QueryRequest object undergoes Jakarta Bean Validation before reaching the service layer:
| Field | Validation | Error |
|---|---|---|
| sql | @NotBlank | SQL query is required |
| sql | Max 1MB | Query size limit exceeded |
| limit | @Min(1) @Max(100000) | Limit must be between 1 and 100,000 |
| timeoutSeconds | @Min(1) @Max(3600) | Timeout must be between 1 and 3,600 seconds |
| catalog | @Pattern(regexp = "[a-zA-Z0-9_]+") | Invalid catalog name |
| schema | @Pattern(regexp = "[a-zA-Z0-9_]+") | Invalid schema name |
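The same rules can be mirrored in plain Java for illustration. The class and method names here are hypothetical, not the platform's actual validator; the patterns and bounds come from the table above.

```java
import java.nio.charset.StandardCharsets;
import java.util.regex.Pattern;

class QueryRequestChecks {
    // Mirrors the @Pattern rule for catalog and schema names.
    private static final Pattern IDENTIFIER = Pattern.compile("[a-zA-Z0-9_]+");
    private static final int MAX_SQL_BYTES = 1024 * 1024; // 1MB limit from the table

    static boolean isValidIdentifier(String name) {
        return name != null && IDENTIFIER.matcher(name).matches();
    }

    static boolean isValidLimit(int limit) {
        return limit >= 1 && limit <= 100_000;
    }

    static boolean isValidSql(String sql) {
        return sql != null && !sql.isBlank()
                && sql.getBytes(StandardCharsets.UTF_8).length <= MAX_SQL_BYTES;
    }
}
```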
Phase 2: Validate and Authenticate
The Spring Security filter chain processes every request before it reaches the controller:
JWT Authentication
The JwtAuthenticationFilter extracts and validates the JWT token:
```java
@Component
public class JwtAuthenticationFilter extends OncePerRequestFilter {
    @Override
    protected void doFilterInternal(HttpServletRequest request,
                                    HttpServletResponse response,
                                    FilterChain chain) {
        // 1. Extract Bearer token from Authorization header
        // 2. Validate token signature and expiration
        // 3. Extract claims: tenantId, userId, roles, permissions
        // 4. Verify X-Tenant-ID header matches token tenant claim
        // 5. Set SecurityContext with UserPrincipal
        // 6. Continue filter chain
    }
}
```

Tenant Context Verification
The security layer verifies that the X-Tenant-ID header matches the tenant claim in the JWT token. A mismatch results in a 403 Forbidden response. This prevents a user with a valid token for tenant A from executing queries against tenant B's data.
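Reduced to its essence, the check is a straight equality test between the header value and the JWT claim, mapped to an HTTP status. The class below is a hypothetical sketch, not the actual filter code.

```java
import java.util.UUID;

class TenantGuard {
    // Sketch of the verification described above: the X-Tenant-ID header must
    // equal the tenant claim in the validated JWT, otherwise the request is
    // rejected with 403 Forbidden before any SQL is processed.
    static int verify(UUID headerTenant, UUID jwtTenantClaim) {
        return (headerTenant != null && headerTenant.equals(jwtTenantClaim)) ? 200 : 403;
    }
}
```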
Phase 3: Cache Lookup
Before executing any query, the MultiLevelCacheService checks for cached results across three cache tiers:
Cache Tier Architecture
```
Request
   |
   v
[L1: In-Memory (Caffeine)] -- hit --> Return cached result
   |
  miss
   v
[L2: Redis] -- hit --> Promote to L1, return cached result
   |
  miss
   v
[Semantic Cache] -- similar query found --> Return cached result
   |
  miss
   v
Proceed to execution
```

Cache Key Generation
Cache keys are generated as a hash of:
```
cache_key = SHA-256(
    tenant_id +
    sql_normalized +
    catalog +
    schema +
    user_roles_hash   // RLS-dependent: different roles may see different results
)
```

The SQL is normalized before hashing to ensure that semantically identical queries with different whitespace or casing produce the same cache key:
- Leading/trailing whitespace removed
- Internal whitespace collapsed to single spaces
- Keywords uppercased
- Comments stripped
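Both steps can be sketched in a few lines. Note two deliberate simplifications: this normalizer uppercases the entire statement (including string literals), whereas the real service uppercases keywords only, and the class name is illustrative.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.HexFormat;

class CacheKeys {
    // Simplified normalization: strip -- and /* */ comments, collapse
    // whitespace, trim, then uppercase the whole statement.
    static String normalize(String sql) {
        return sql.replaceAll("--[^\\n]*", " ")
                  .replaceAll("(?s)/\\*.*?\\*/", " ")
                  .replaceAll("\\s+", " ")
                  .trim()
                  .toUpperCase();
    }

    // SHA-256 over the concatenated key parts, as shown above.
    static String cacheKey(String tenantId, String sql, String catalog,
                           String schema, String rolesHash) {
        try {
            MessageDigest md = MessageDigest.getInstance("SHA-256");
            byte[] digest = md.digest(
                    (tenantId + normalize(sql) + catalog + schema + rolesHash)
                            .getBytes(StandardCharsets.UTF_8));
            return HexFormat.of().formatHex(digest);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e); // SHA-256 is always available
        }
    }
}
```

Because normalization runs before hashing, `select 1` and `SELECT   1` map to the same cache entry.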
Cache Invalidation
Cache entries are invalidated through several mechanisms:
| Trigger | Scope | Mechanism |
|---|---|---|
| TTL expiration | Per-entry | Automatic (configurable per cache level) |
| Manual invalidation | Per-tenant | DELETE /v1/queries/cache |
| Schema change | Per-catalog/schema | CacheInvalidationListener on Kafka event |
| Data mutation | Per-table | DDL/DML events from catalog service |
| RLS policy change | Per-tenant | Policy update event from governance service |
Phase 4: RLS Injection
If the cache lookup misses, the query proceeds to Row-Level Security filter injection. The RlsFilterService modifies the SQL query to enforce row-level access controls:
Table Extraction
The service extracts all table references from the SQL using regex pattern matching:
```java
// FROM clause extraction
private static final Pattern FROM_PATTERN = Pattern.compile(
    "\\bFROM\\s+([\\w.\"]+(?:\\s+(?:AS\\s+)?\\w+)?(?:\\s*,\\s*[\\w.\"]+(?:\\s+(?:AS\\s+)?\\w+)?)*)",
    Pattern.CASE_INSENSITIVE
);

// JOIN clause extraction
private static final Pattern JOIN_PATTERN = Pattern.compile(
    "\\b(INNER|LEFT|RIGHT|FULL|CROSS)?\\s*JOIN\\s+([\\w.\"]+)(?:\\s+(?:AS\\s+)?(\\w+))?",
    Pattern.CASE_INSENSITIVE
);
```

Policy Evaluation
For each extracted table, the service calls the RLS policy evaluation endpoint:
```java
RlsEvaluationRequest request = RlsEvaluationRequest.builder()
    .tenantId(context.getTenantId())
    .userId(context.getUserId())
    .userRoles(context.getUserRoles())
    .userAttributes(context.getUserAttributes())
    .resourceFqn(resourceFqn)              // e.g., "analytics.public.sales_data"
    .resourceType("TABLE")
    .operation(determineOperation(sql))    // SELECT, INSERT, UPDATE, DELETE
    .build();

RlsEvaluationResult result = rlsServiceClient.evaluateRls(request);
```

WHERE Clause Injection
If the evaluation returns a filter, the service injects it into the SQL:
```sql
-- Original query
SELECT * FROM orders WHERE status = 'active'

-- After RLS injection (user restricted to region 'US-EAST')
SELECT * FROM orders WHERE (status = 'active') AND (orders.region = 'US-EAST')
```

For queries without an existing WHERE clause, a new one is added. For queries with GROUP BY, ORDER BY, or LIMIT, the WHERE clause is inserted at the correct position.
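A naive version of the injection covers the two simple cases: wrap an existing WHERE predicate and AND in the filter, or append a new WHERE clause. This sketch deliberately ignores the GROUP BY / ORDER BY / LIMIT positioning the real RlsFilterService handles, and its class name is illustrative.

```java
class RlsInjection {
    // Wraps the existing WHERE predicate and ANDs the RLS filter, or appends a
    // new WHERE clause. Simple cases only; not position-aware for trailing
    // GROUP BY / ORDER BY / LIMIT clauses.
    static String inject(String sql, String rlsFilter) {
        int wherePos = sql.toUpperCase().indexOf(" WHERE ");
        if (wherePos < 0) {
            return sql + " WHERE (" + rlsFilter + ")";
        }
        String before = sql.substring(0, wherePos + 7);   // up to and including "WHERE "
        String predicate = sql.substring(wherePos + 7);   // the existing predicate
        return before + "(" + predicate + ") AND (" + rlsFilter + ")";
    }
}
```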
Phase 5: Data Masking
After RLS injection, the query passes through the data masking service, which rewrites SELECT column expressions that reference sensitive data:
```sql
-- Original query
SELECT customer_name, email, ssn, purchase_amount FROM customers

-- After masking (user has 'analyst' role, not 'admin')
SELECT customer_name, SUBSTR(email, 1, 3) || '***@***' AS email,
       '***-**-' || SUBSTR(ssn, -4) AS ssn, purchase_amount FROM customers
```

Masking rules are driven by data classification tags assigned in the catalog service. See the Data Masking section for details.
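The rewritten expressions above are plain string substitutions over the column list. A small builder illustrates the shape of the generated SQL; the class and method names are assumptions, not the platform's masking service.

```java
class MaskingExpressions {
    // Produces the masked email expression shown above: keep the first three
    // characters, replace the rest, and alias back to the original column name.
    static String maskEmail(String column) {
        return "SUBSTR(" + column + ", 1, 3) || '***@***' AS " + column;
    }

    // Produces the masked SSN expression: keep only the last four digits.
    static String maskSsn(String column) {
        return "'***-**-' || SUBSTR(" + column + ", -4) AS " + column;
    }
}
```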
Phase 6: Engine Routing
The SmartQueryRouter analyzes the (now modified) SQL to select the optimal execution engine:
```java
public EngineType route(QueryRequest request) {
    QueryAnalysis analysis = analyzeQuery(request);
    // Real-time tables -> ClickHouse
    if (analysis.hasRealtimeTables) return EngineType.CLICKHOUSE;
    // Very large scans -> Spark Async
    if (analysis.estimatedBytes > LARGE_SCAN_BYTE_THRESHOLD) return EngineType.SPARK_ASYNC;
    // Simple small queries -> ClickHouse
    if (analysis.estimatedRows < SMALL_QUERY_ROW_THRESHOLD
            && !analysis.hasJoins && !analysis.hasWindowFunctions)
        return EngineType.CLICKHOUSE;
    // Complex analytics -> Trino
    if (analysis.hasWindowFunctions || analysis.joinCount > 2) return EngineType.TRINO;
    // Default -> Trino
    return EngineType.TRINO;
}
```

The router's decision can be overridden by specifying enginePreference in the QueryRequest:
```json
{
  "sql": "SELECT ...",
  "enginePreference": "TRINO"
}
```

Phase 7: Execution
Synchronous Path
The selected strategy executes the query and blocks until results are available:
```java
QueryEngineStrategy strategy = strategies.get(engineType);
QueryResponse response = strategy.execute(request, timeoutSeconds);
```

The Trino strategy implementation:
- Opens a JDBC connection to Trino with catalog/schema context
- Sets billing session properties for resource group selection
- Applies result limit to the SQL if not already present
- Executes the query via Statement.executeQuery()
- Iterates the ResultSet to materialize rows into List<Map<String, Object>>
- Extracts column metadata from ResultSetMetaData
- Records execution time and result statistics
- Returns the QueryResponse
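The steps above can be sketched with plain JDBC. The applyLimit heuristic and the class name are illustrative assumptions; a real connection additionally needs the Trino JDBC driver on the classpath and a Trino JDBC URL.

```java
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class TrinoExecutionSketch {
    // Appends a LIMIT only when the statement does not already end with one.
    // A rough textual check, not a SQL parse.
    static String applyLimit(String sql, int limit) {
        return sql.toUpperCase().matches("(?s).*\\bLIMIT\\s+\\d+\\s*$")
                ? sql : sql + " LIMIT " + limit;
    }

    // Execute and materialize rows into List<Map<String, Object>>, mirroring
    // the strategy steps above.
    static List<Map<String, Object>> execute(Connection conn, String sql, int limit)
            throws SQLException {
        List<Map<String, Object>> rows = new ArrayList<>();
        try (Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery(applyLimit(sql, limit))) {
            ResultSetMetaData meta = rs.getMetaData();
            while (rs.next()) {
                Map<String, Object> row = new LinkedHashMap<>();
                for (int i = 1; i <= meta.getColumnCount(); i++) {
                    row.put(meta.getColumnLabel(i), rs.getObject(i));
                }
                rows.add(row);
            }
        }
        return rows;
    }
}
```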
Asynchronous Path
The asynchronous path submits the query to a persistent queue:
```java
public AsyncQueryResponse submitAsync(UUID tenantId, UUID userId, QueryRequest request) {
    // 1. Create execution record with status QUEUED
    //    (transformedSql and routedEngine come from phases 4-6)
    QueryExecution execution = QueryExecution.builder()
        .id(UUID.randomUUID())
        .tenantId(tenantId)
        .userId(userId)
        .sql(request.getSql())
        .transformedSql(transformedSql)     // After RLS + masking
        .engineType(routedEngine)
        .status(QueryExecutionStatus.QUEUED)
        .priority(determinePriority(request))
        .submittedAt(Instant.now())
        .build();

    // 2. Persist to database
    executionRepository.save(execution);

    // 3. Submit to priority queue
    queryQueue.submit(execution);

    // 4. Publish event
    eventPublisher.publish(new QuerySubmittedEvent(execution));

    // 5. Return response with polling URLs
    return AsyncQueryResponse.builder()
        .executionId(execution.getId())
        .status(QueryStatus.QUEUED)
        .statusUrl("/v1/queries/" + execution.getId() + "/status")
        .resultsUrl("/v1/queries/" + execution.getId() + "/results")
        .estimatedDurationMs(costEstimator.estimateTime(request))
        .submittedAt(execution.getSubmittedAt())
        .build();
}
```

Status Polling
Clients poll the status endpoint until the query completes:
```
GET /v1/queries/{executionId}/status
```

Response (RUNNING):

```json
{
  "executionId": "550e8400-e29b-41d4-a716-446655440000",
  "status": "RUNNING",
  "engineType": "TRINO",
  "submittedAt": "2026-02-12T10:30:00Z",
  "progress": 0.45
}
```

Response (COMPLETED):

```json
{
  "executionId": "550e8400-e29b-41d4-a716-446655440000",
  "status": "COMPLETED",
  "engineType": "TRINO",
  "rowCount": 15234,
  "executionTimeMs": 12450,
  "submittedAt": "2026-02-12T10:30:00Z",
  "completedAt": "2026-02-12T10:30:12Z"
}
```

Phase 8: Response Delivery
Result Pagination
Large result sets are paginated using cursor-based pagination managed by the CursorPaginationService:
```
GET /v1/queries/{executionId}/results?page=0&size=1000
```

Response:

```json
{
  "executionId": "...",
  "status": "COMPLETED",
  "columns": [
    {"name": "order_id", "type": "VARCHAR", "nullable": false},
    {"name": "amount", "type": "DECIMAL", "nullable": true},
    {"name": "created_at", "type": "TIMESTAMP", "nullable": false}
  ],
  "data": [
    {"order_id": "ORD-001", "amount": 150.00, "created_at": "2026-01-15T08:30:00Z"},
    {"order_id": "ORD-002", "amount": 275.50, "created_at": "2026-01-15T09:15:00Z"}
  ],
  "rowCount": 1000,
  "totalRows": 15234,
  "hasMore": true,
  "cursorToken": "eyJvZmZzZXQiOjEwMDAsInF1ZXJ5SWQiOiIuLi4ifQ=="
}
```

For the asynchronous path, results are stored in a QueryResultStorageService which persists them to Redis (for results under 10MB) or to object storage (for larger result sets).
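The cursorToken in the sample response is Base64-encoded JSON. A round-trip sketch, assuming exactly the {"offset":N,"queryId":"..."} layout inferred from the sample token (the class name and the layout assumption are illustrative):

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

class CursorTokens {
    // Encode the cursor state as Base64(JSON) without a JSON library,
    // assuming the fixed {"offset":N,"queryId":"..."} layout.
    static String encode(long offset, String queryId) {
        String json = "{\"offset\":" + offset + ",\"queryId\":\"" + queryId + "\"}";
        return Base64.getEncoder().encodeToString(json.getBytes(StandardCharsets.UTF_8));
    }

    // Decode back to the raw JSON string.
    static String decode(String token) {
        return new String(Base64.getDecoder().decode(token), StandardCharsets.UTF_8);
    }
}
```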
Result Caching
After successful execution, results are cached at the appropriate tier:
```java
// Cache the result at L1 and L2
cacheService.put(cacheKey, response, CacheTier.L1_AND_L2, ttl);

// Also cache the query plan for the plan cache
planCache.put(normalizedSql, queryPlan);
```

Cache TTLs are determined by the AdaptiveCachePolicy:
| Factor | Effect on TTL |
|---|---|
| Query frequency | Frequently executed queries get longer TTL |
| Data freshness requirement | Real-time tables get shorter TTL (30s) |
| Result set size | Larger results get shorter TTL to conserve memory |
| Table mutation rate | Tables with frequent writes get shorter TTL |
| Explicit hint | Query metadata can specify TTL |
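The factors above could combine along these lines. Everything numeric here is an assumption for illustration: the baseline, thresholds, and multipliers are invented, and only the 30-second real-time TTL comes from the table; the real AdaptiveCachePolicy is not shown in this section.

```java
import java.time.Duration;

class AdaptiveTtlSketch {
    // Illustrative-only weighting of the table's factors; all constants assumed.
    static Duration ttlFor(boolean realtimeTable, long hitsLastHour,
                           long resultBytes, double tableWritesPerMin) {
        if (realtimeTable) return Duration.ofSeconds(30);          // freshness wins
        Duration ttl = Duration.ofMinutes(10);                     // assumed baseline
        if (hitsLastHour > 100) ttl = ttl.multipliedBy(2);         // hot queries live longer
        if (resultBytes > 10 * 1024 * 1024) ttl = ttl.dividedBy(2); // big results expire sooner
        if (tableWritesPerMin > 1.0) ttl = ttl.dividedBy(2);       // churny tables expire sooner
        return ttl;
    }
}
```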
Query Cancellation
Running or queued queries can be cancelled via the cancellation endpoint:
```java
@PostMapping("/{executionId}/cancel")
public ResponseEntity<Map<String, Object>> cancelQuery(
        @RequestHeader("X-Tenant-ID") UUID tenantId,
        @PathVariable UUID executionId) {
    boolean cancelled = executionService.cancelQuery(tenantId, executionId);
    return ResponseEntity.ok(Map.of(
        "executionId", executionId,
        "cancelled", cancelled,
        "message", cancelled ? "Query cancelled" : "Query could not be cancelled"
    ));
}
```

Cancellation behavior by state:
| Current State | Cancellation Behavior |
|---|---|
| QUEUED | Removed from queue immediately |
| RUNNING | Statement.cancel() sent to Trino; waits for acknowledgment |
| COMPLETED | Cannot cancel (already finished) |
| FAILED | Cannot cancel (already terminated) |
| CANCELLED | No-op (already cancelled) |
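The table reduces to a simple predicate over the execution state: only QUEUED and RUNNING queries are cancellable, the three terminal states are not. A direct encoding (enum and class names assumed):

```java
class CancellationRules {
    enum State { QUEUED, RUNNING, COMPLETED, FAILED, CANCELLED }

    // Direct encoding of the state table above: terminal states
    // (COMPLETED, FAILED, CANCELLED) cannot be cancelled.
    static boolean isCancellable(State s) {
        return s == State.QUEUED || s == State.RUNNING;
    }
}
```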
Query History
Every query execution is recorded in the query history database, accessible through the history endpoints:
```
GET /v1/queries/my-history?status=COMPLETED&page=0&size=20
```

Response:

```json
{
  "content": [
    {
      "executionId": "...",
      "sql": "SELECT region, SUM(amount) FROM orders GROUP BY region",
      "status": "COMPLETED",
      "engineType": "TRINO",
      "rowCount": 12,
      "executionTimeMs": 450,
      "submittedAt": "2026-02-12T10:30:00Z"
    }
  ],
  "totalElements": 156,
  "totalPages": 8,
  "number": 0,
  "size": 20
}
```

Query Statistics
Aggregated statistics for the tenant:
```
GET /v1/queries/stats?days=7
```

Response:

```json
{
  "totalQueries": 1234,
  "averageExecutionMs": 2450,
  "cacheHitRate": 0.42,
  "engineDistribution": {
    "TRINO": 856,
    "CLICKHOUSE": 312,
    "SPARK_ASYNC": 66
  },
  "statusDistribution": {
    "COMPLETED": 1180,
    "FAILED": 32,
    "CANCELLED": 22
  },
  "peakQueriesPerHour": 89,
  "totalRowsProcessed": 45678900
}
```

Error Handling
The GlobalExceptionHandler provides consistent error responses:
| Exception | HTTP Status | Error Code |
|---|---|---|
| QueryExecutionException | 500 | QUERY_EXECUTION_FAILED |
| QueryTimeoutException | 408 | QUERY_TIMEOUT |
| QueryCancelledException | 499 | QUERY_CANCELLED |
| MethodArgumentNotValidException | 400 | VALIDATION_ERROR |
| AccessDeniedException | 403 | ACCESS_DENIED |
| DataSourceNotFoundException | 404 | DATA_SOURCE_NOT_FOUND |
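For reference, the code-to-status mapping from the table can be held in a lookup (the class name is illustrative; 499 is the non-standard "client closed request" status used here for cancellations):

```java
import java.util.Map;

class ErrorCodeMapping {
    // Direct encoding of the exception handler table above.
    static final Map<String, Integer> STATUS = Map.of(
            "QUERY_EXECUTION_FAILED", 500,
            "QUERY_TIMEOUT", 408,
            "QUERY_CANCELLED", 499,
            "VALIDATION_ERROR", 400,
            "ACCESS_DENIED", 403,
            "DATA_SOURCE_NOT_FOUND", 404);
}
```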
Error response format:
```json
{
  "error": {
    "code": "QUERY_TIMEOUT",
    "message": "Query exceeded timeout of 300 seconds",
    "executionId": "550e8400-e29b-41d4-a716-446655440000",
    "timestamp": "2026-02-12T10:35:00Z"
  }
}
```

Related Sections
- Architecture -- Service design and component layout
- Optimization -- Cache tiers and adaptive policies
- Row-Level Security -- RLS filter injection details
- API Reference -- Complete endpoint documentation