Query Engine Architecture
The Query Engine is designed as a multi-backend SQL execution gateway that routes queries to the optimal processing engine based on query characteristics, data volume, and tenant configuration. This section examines the service's internal architecture, its strategy-based engine abstraction, the smart query router, and the component interactions that enable high-throughput, low-latency query processing.
Service Overview
The Query Engine is a Java 21 / Spring Boot 3.2 application deployed in the matih-data-plane namespace on port 8080. It serves as the single entry point for all SQL execution within a tenant's data plane, abstracting the complexity of multiple execution backends behind a unified REST API.
| Property | Value |
|---|---|
| Language | Java 21 |
| Framework | Spring Boot 3.2 |
| Port | 8080 |
| Namespace | matih-data-plane |
| Base path | /v1/queries |
| Authentication | JWT (Authorization header) plus X-Tenant-ID and X-User-ID context headers |
| Build tool | Gradle |
| Dependencies | Spring Web, Spring Data JPA, Spring Data Redis, Micrometer, JSqlParser |
High-Level Architecture
The Query Engine is organized into six major subsystems, each responsible for a distinct phase of the query lifecycle:
+----------------------------------------------------------------+
| Query Engine Service |
| |
| +------------------+ +------------------+ +---------------+ |
| | REST Controllers | | Security Layer | | Cache Layer | |
| | - QueryController| | - JWT Filter | | - L1 Memory | |
| | - ScheduleCtrl | | - RLS Service | | - L2 Redis | |
| | - CacheCtrl | | - Masking Svc | | - Semantic | |
| | - CostEstCtrl | | - SecurityConfig | | - Plan Cache | |
| +--------+---------+ +--------+---------+ +-------+-------+ |
| | | | |
| +--------v----------------------v--------------------v-------+ |
| | Query Execution Service | |
| | - Sync execution - Async submission | |
| | - State machine - Result pagination | |
| +----------------------------+-------------------------------+ |
| | |
| +----------------------------v-------------------------------+ |
| | Smart Query Router | |
| | - Query analysis - Cost estimation | |
| | - Engine selection - Workload routing | |
| +-----+------------------+------------------+----------------+ |
| | | | |
| +-----v------+ +------v-------+ +------v--------+ |
| | Trino | | ClickHouse | | Spark Async | |
| | Strategy | | Strategy | | Strategy | |
| +------------+ +--------------+ +---------------+ |
+----------------------------------------------------------------+

Component Architecture
Controller Layer
The controller layer exposes the Query Engine's REST API surface. Each controller is responsible for a specific domain of query operations:
| Controller | Path | Responsibility |
|---|---|---|
| QueryController | /v1/queries | Synchronous and asynchronous query execution, status checks, result retrieval, cancellation |
| QueryScheduleController | /v1/queries/schedules | Scheduled query management: create, update, delete, list, trigger |
| QueryManagementController | /v1/queries/management | Administrative operations: bulk cancel, resource cleanup, query governance |
| CacheController | /v1/queries/cache | Cache statistics, manual invalidation, warming triggers |
| CostEstimationController | /v1/queries/cost | Pre-execution cost estimation, policy evaluation |
| QueryExportController | /v1/queries/export | Result export in CSV, JSON, Parquet formats |
| SavedQueryController | /v1/queries/saved | Saved query management: create, update, share, version |
| MaterializedViewController | /v1/queries/materialized-views | Materialized view creation, refresh scheduling, dependency tracking |
| WorkloadController | /v1/queries/workloads | Workload group management, resource allocation, priority configuration |
| AnalyticsController | /v1/queries/analytics | Query analytics: usage patterns, performance trends, optimization recommendations |
| QueryHistoryController | /v1/queries/history | Detailed query execution history with filtering and search |
Every controller method requires the X-Tenant-ID header, which the security layer validates against the JWT token to ensure tenant isolation.
Security Layer
The security layer intercepts every query before it reaches the execution engine:
// Security filter chain ordering
@Configuration
public class SecurityConfig {
    // 1. JWT Authentication Filter - validates token, extracts tenant context
    // 2. RLS Filter Service       - injects row-level security predicates
    // 3. Data Masking Service     - applies column masking rules
    // 4. Query Audit Service      - logs access for compliance
}

JwtAuthenticationFilter validates the JWT token from the Authorization header, extracts the tenant ID, user ID, and roles, and sets the Spring Security context for downstream processing.
RlsFilterService parses the SQL query to extract table references, evaluates RLS policies for each table against the user's roles and attributes, and injects WHERE clause predicates to restrict row visibility.
Data Masking Service identifies columns tagged with sensitivity classifications and applies masking functions (redaction, hashing, partial masking) based on the user's role and the column's classification level.
QueryAuditService records every query execution, including the original query, the transformed query (after RLS and masking), the user context, and the execution outcome, to an immutable audit log.
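As a concrete illustration of the predicate-injection step, here is a deliberately naive, string-based sketch; the real RlsFilterService works on the parsed query, and the class and method names below are illustrative. Queries with clauses after WHERE (ORDER BY, LIMIT) are not handled by this simplification.

```java
// Naive sketch of RLS predicate injection (illustrative names, not the
// actual RlsFilterService): appends a tenant-scoped predicate to a query,
// adding a WHERE clause if the query has none.
public class RlsSketch {

    /** Injects `predicate` into `sql`, preserving any existing WHERE clause. */
    public static String applyRls(String sql, String predicate) {
        int whereIdx = sql.toLowerCase().indexOf(" where ");
        if (whereIdx >= 0) {
            // Wrap the existing condition so operator precedence is preserved.
            String before = sql.substring(0, whereIdx + 7);
            String condition = sql.substring(whereIdx + 7);
            return before + "(" + condition + ") AND (" + predicate + ")";
        }
        return sql + " WHERE (" + predicate + ")";
    }

    public static void main(String[] args) {
        System.out.println(applyRls("SELECT * FROM orders", "tenant_id = 't1'"));
        System.out.println(applyRls(
                "SELECT * FROM orders WHERE amount > 10", "tenant_id = 't1'"));
    }
}
```

Wrapping the original condition in parentheses before appending `AND (...)` is the important detail: without it, a query ending in `... WHERE a = 1 OR b = 2` would bypass the tenant predicate.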
Query Execution Service
The QueryExecutionService is the central orchestrator of the query lifecycle. It coordinates all phases of query processing:
@Service
@RequiredArgsConstructor
public class QueryExecutionService {

    private final SmartQueryRouter queryRouter;
    private final Map<EngineType, QueryEngineStrategy> strategies;
    private final RlsFilterService rlsFilterService;
    private final QueryCacheService cacheService;
    private final QueryAuditService auditService;

    public QueryResponse executeSync(UUID tenantId, UUID userId, QueryRequest request) {
        // 1. Check cache for existing result
        // 2. Apply RLS filters
        // 3. Apply data masking rules
        // 4. Route to optimal engine
        // 5. Execute via strategy
        // 6. Cache result
        // 7. Record audit entry
        // 8. Return response
    }

    public AsyncQueryResponse submitAsync(UUID tenantId, UUID userId, QueryRequest request) {
        // 1. Validate query
        // 2. Estimate cost
        // 3. Apply RLS and masking
        // 4. Submit to execution queue
        // 5. Return execution ID and estimated completion time
    }
}

The service implements both synchronous and asynchronous execution paths:
| Mode | Behavior | Use Case |
|---|---|---|
| Synchronous | Blocks until results are ready; returns data inline | Small-to-medium queries, interactive analytics |
| Asynchronous | Returns immediately with execution ID; results polled separately | Large scans, batch analytics, scheduled queries |
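The asynchronous contract (submit returns immediately, status is polled separately) can be modeled with a minimal in-memory sketch. All names here are hypothetical; a thread pool and a status map stand in for the real execution queue and PostgreSQL-backed state.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical in-memory model of async submission: submitAsync returns an
// execution ID immediately; callers poll getStatus until a terminal state.
public class AsyncSketch {
    public enum Status { QUEUED, RUNNING, COMPLETED, FAILED }

    private final ExecutorService pool = Executors.newFixedThreadPool(2);
    private final Map<UUID, Status> statuses = new ConcurrentHashMap<>();

    public UUID submitAsync(Runnable query) {
        UUID id = UUID.randomUUID();
        statuses.put(id, Status.QUEUED);
        pool.submit(() -> {
            statuses.put(id, Status.RUNNING);
            try {
                query.run();  // the actual engine call would go here
                statuses.put(id, Status.COMPLETED);
            } catch (RuntimeException e) {
                statuses.put(id, Status.FAILED);
            }
        });
        return id;  // returned before the query finishes
    }

    public Status getStatus(UUID id) { return statuses.get(id); }

    public void shutdown() { pool.shutdown(); }

    public static void main(String[] args) {
        AsyncSketch engine = new AsyncSketch();
        UUID id = engine.submitAsync(() -> { /* simulated work */ });
        long deadline = System.currentTimeMillis() + 5_000;
        while (engine.getStatus(id) != Status.COMPLETED
                && System.currentTimeMillis() < deadline) {
            Thread.onSpinWait();  // poll, as a client would via the status endpoint
        }
        System.out.println(engine.getStatus(id));
        engine.shutdown();
    }
}
```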
Strategy Pattern for Engine Abstraction
The Query Engine uses the Strategy pattern to abstract execution backends. Each backend implements the QueryEngineStrategy interface:
public interface QueryEngineStrategy {
    QueryResponse execute(QueryRequest request, int timeoutSeconds);
    EngineType getEngineType();
    boolean isAvailable();
    void healthCheck();
}

Three strategies are currently implemented:
| Strategy | Engine | Optimized For |
|---|---|---|
| TrinoQueryStrategy | Trino v458 | Complex analytical queries, multi-source joins, window functions |
| ClickHouseQueryStrategy | ClickHouse | Real-time OLAP, aggregate queries on event data, low-latency reads |
| SparkAsyncQueryStrategy | Spark via Spark Connect | Very large table scans (>100GB), batch transformations, data export |
Each strategy handles connection management, timeout enforcement, result materialization, and metrics reporting independently. The strategies are registered as Spring components and injected into the execution service as a map keyed by EngineType.
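The map wiring can be sketched in plain Java; Spring performs the equivalent automatically when a `Map<EngineType, QueryEngineStrategy>` is injected, keying each bean by its self-reported engine type. The stub types below mirror the interface shown above in simplified form.

```java
import java.util.EnumMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch of the strategy registry: each strategy is keyed by the
// engine type it reports, so the execution service can dispatch by EngineType.
public class StrategyRegistrySketch {
    public enum EngineType { TRINO, CLICKHOUSE, SPARK_ASYNC }

    public interface QueryEngineStrategy {
        EngineType getEngineType();
        boolean isAvailable();
    }

    /** Keys each strategy by its self-reported engine type. */
    public static Map<EngineType, QueryEngineStrategy> register(List<QueryEngineStrategy> strategies) {
        Map<EngineType, QueryEngineStrategy> byType = new EnumMap<>(EngineType.class);
        for (QueryEngineStrategy s : strategies) {
            byType.put(s.getEngineType(), s);
        }
        return byType;
    }

    public static void main(String[] args) {
        QueryEngineStrategy trino = new QueryEngineStrategy() {
            public EngineType getEngineType() { return EngineType.TRINO; }
            public boolean isAvailable() { return true; }
        };
        System.out.println(register(List.of(trino)).keySet());  // [TRINO]
    }
}
```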
Smart Query Router
The SmartQueryRouter analyzes incoming queries and selects the optimal execution engine based on query characteristics and estimated resource requirements.
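The selection logic reduces to a single pure function; the sketch below transcribes the decision tree in the next subsection, with parameters standing in for the outputs of query analysis. The thresholds (100 GB, 1M rows, 2 joins) follow this section; the class and method names are illustrative.

```java
// Sketch transcribing the routing decision tree into a pure function.
public class RouterSketch {
    public enum EngineType { TRINO, CLICKHOUSE, SPARK_ASYNC }

    public static EngineType route(boolean hasRealTimeTables,
                                   long estimatedScanGb,
                                   long estimatedRows,
                                   int joinCount,
                                   boolean hasWindowFunctions) {
        if (hasRealTimeTables) return EngineType.CLICKHOUSE;       // real-time tables
        if (estimatedScanGb > 100) return EngineType.SPARK_ASYNC;  // very large scans
        if (estimatedRows < 1_000_000 && joinCount == 0 && !hasWindowFunctions)
            return EngineType.CLICKHOUSE;                          // small, simple queries
        if (hasWindowFunctions || joinCount > 2) return EngineType.TRINO;
        return EngineType.TRINO;                                   // default
    }

    public static void main(String[] args) {
        // Small single-table query, no joins or windows -> ClickHouse
        System.out.println(route(false, 1, 10_000, 0, false));
        // 500 GB scan -> Spark Async
        System.out.println(route(false, 500, 2_000_000_000L, 1, false));
    }
}
```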
Routing Decision Tree
Query arrives
|
v
[Contains real-time tables?] -- Yes --> ClickHouse
|
No
v
[Estimated scan > 100GB?] -- Yes --> Spark Async
|
No
v
[Estimated rows < 1M AND no joins AND no window functions?] -- Yes --> ClickHouse
|
No
v
[Has window functions OR > 2 joins?] -- Yes --> Trino
|
No
v
Default --> Trino

Query Analysis
The router performs several analysis steps before making a routing decision:
private QueryAnalysis analyzeQuery(QueryRequest request) {
    QueryAnalysis analysis = new QueryAnalysis();
    String sql = request.getSql().toLowerCase();
    // 1. Check for real-time table patterns (events, clicks, pageviews, metrics, logs, sessions)
    // 2. Detect aggregate functions (COUNT, SUM, AVG, MIN, MAX, PERCENTILE, APPROX_*)
    // 3. Detect window functions (OVER, PARTITION BY, ROW_NUMBER, RANK, DENSE_RANK, LAG, LEAD)
    // 4. Count JOIN operations
    // 5. Parse SQL via JSqlParser for GROUP BY, ORDER BY, and subquery detection
    // 6. Estimate row count and scan size
}

The router uses JSqlParser for SQL AST analysis when possible, falling back to regex-based heuristics for queries that cannot be parsed. This dual approach ensures routing decisions are made even for non-standard SQL syntax.
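A minimal sketch of the regex fallback path is shown below: detecting window functions and counting JOINs without a parse tree. The patterns here are illustrative simplifications, not the production patterns.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Sketch of regex-based heuristics for queries JSqlParser cannot handle.
public class HeuristicsSketch {
    private static final Pattern WINDOW_FN =
            Pattern.compile("\\bover\\s*\\(", Pattern.CASE_INSENSITIVE);
    private static final Pattern JOIN =
            Pattern.compile("\\bjoin\\b", Pattern.CASE_INSENSITIVE);

    public static boolean hasWindowFunctions(String sql) {
        return WINDOW_FN.matcher(sql).find();
    }

    public static int countJoins(String sql) {
        Matcher m = JOIN.matcher(sql);
        int count = 0;
        while (m.find()) count++;
        return count;
    }

    public static void main(String[] args) {
        String sql = "SELECT a, ROW_NUMBER() OVER (PARTITION BY b ORDER BY c) "
                + "FROM t1 JOIN t2 ON t1.id = t2.id LEFT JOIN t3 ON t2.id = t3.id";
        System.out.println(hasWindowFunctions(sql));  // true
        System.out.println(countJoins(sql));          // 2
    }
}
```

Note that `\bjoin\b` counts `LEFT JOIN` and `INNER JOIN` once each, which is what the routing heuristic needs; it would, however, miscount the word "join" inside a string literal, one reason the AST path is preferred when parsing succeeds.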
Cost-Based Router
In addition to the heuristic SmartQueryRouter, a CostBasedRouter provides more precise routing decisions using table statistics:
| Signal | Source | Usage |
|---|---|---|
| Table row counts | TableStatisticsProvider | Determines scan size |
| Column cardinality | Data catalog metadata | Estimates join selectivity |
| Historical execution times | Query history database | Predicts completion time |
| Current engine load | Health check endpoints | Avoids overloaded engines |
| Tenant resource quotas | Config service | Enforces per-tenant limits |
The cost-based router consults the TableStatisticsProvider which maintains cached statistics from the data catalog, including row counts, average row sizes, and column cardinality estimates.
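As a sketch of how a scan-size estimate might be derived from these statistics: rows times average row size, summed over the tables a query references. The field names below are illustrative, not the TableStatisticsProvider's actual schema.

```java
import java.util.List;

// Illustrative scan-size estimate from cached table statistics.
public class ScanEstimateSketch {
    public record TableStats(String table, long rowCount, long avgRowSizeBytes) {}

    /** Total estimated bytes scanned across all referenced tables. */
    public static long estimateScanBytes(List<TableStats> referenced) {
        long total = 0;
        for (TableStats t : referenced) {
            total += t.rowCount() * t.avgRowSizeBytes();
        }
        return total;
    }

    public static void main(String[] args) {
        List<TableStats> stats = List.of(
                new TableStats("events", 2_000_000_000L, 120),
                new TableStats("users", 5_000_000L, 300));
        long gib = estimateScanBytes(stats) / (1024L * 1024 * 1024);
        System.out.println(gib + " GiB");  // comfortably above the 100 GB Spark threshold
    }
}
```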
State Machine
Every query execution is modeled as a state machine defined in QueryStateMachineConfig:
+----------+
| RECEIVED |
+----+-----+
|
+----v-----+
| QUEUED |
+----+-----+
|
+-------+--------+
| |
+----v-----+ +----v-------+
| RUNNING | | CANCELLED |
+----+-----+ +------------+
|
+-------+--------+
| |
+----v-----+ +----v------+
| COMPLETED| | FAILED |
+----------+      +-----------+

| State | Description |
|---|---|
| RECEIVED | Query has been received by the API and validated |
| QUEUED | Query is waiting in the priority queue for engine capacity |
| RUNNING | Query is actively executing on the selected engine |
| COMPLETED | Query finished successfully; results are available |
| FAILED | Query execution failed with an error |
| CANCELLED | Query was cancelled by the user or by a timeout policy |
State transitions are persisted to PostgreSQL via QueryExecutionRepository and published as Kafka events on the matih.query.state-changes topic for downstream consumers (analytics, billing, observability).
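The legal transitions implied by the diagram can be captured in a small table-driven check. This sketch omits the persistence and Kafka hooks the real QueryStateMachineConfig attaches; names are illustrative.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

// Table-driven sketch of the query state machine's legal transitions.
public class StateMachineSketch {
    public enum QueryStatus { RECEIVED, QUEUED, RUNNING, COMPLETED, FAILED, CANCELLED }

    private static final Map<QueryStatus, Set<QueryStatus>> TRANSITIONS =
            new EnumMap<>(QueryStatus.class);
    static {
        TRANSITIONS.put(QueryStatus.RECEIVED, EnumSet.of(QueryStatus.QUEUED));
        TRANSITIONS.put(QueryStatus.QUEUED, EnumSet.of(QueryStatus.RUNNING, QueryStatus.CANCELLED));
        TRANSITIONS.put(QueryStatus.RUNNING, EnumSet.of(QueryStatus.COMPLETED, QueryStatus.FAILED));
        // COMPLETED, FAILED, and CANCELLED are terminal states.
        TRANSITIONS.put(QueryStatus.COMPLETED, EnumSet.noneOf(QueryStatus.class));
        TRANSITIONS.put(QueryStatus.FAILED, EnumSet.noneOf(QueryStatus.class));
        TRANSITIONS.put(QueryStatus.CANCELLED, EnumSet.noneOf(QueryStatus.class));
    }

    public static boolean canTransition(QueryStatus from, QueryStatus to) {
        return TRANSITIONS.get(from).contains(to);
    }

    public static void main(String[] args) {
        System.out.println(canTransition(QueryStatus.QUEUED, QueryStatus.RUNNING));    // true
        System.out.println(canTransition(QueryStatus.COMPLETED, QueryStatus.RUNNING)); // false
    }
}
```

Guarding transitions this way keeps every illegal move (e.g. COMPLETED back to RUNNING) detectable at the service boundary before anything is persisted or published to Kafka.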
Query Request and Response DTOs
QueryRequest
public class QueryRequest {
    private String sql;                          // The SQL query to execute
    private String catalog;                      // Trino catalog (optional, defaults to 'delta')
    private String schema;                       // Trino schema (optional, defaults to 'default')
    private Integer limit;                       // Result row limit (default: 10000, max: 100000)
    private Integer timeoutSeconds;              // Execution timeout (default: 300)
    private String enginePreference;             // Preferred engine (optional, overrides router)
    private Map<String, String> sessionProperties; // Trino session properties
    private Map<String, Object> metadata;        // Billing context, tags, correlation IDs
    private UUID tenantId;                       // Injected from header
    private UUID queryId;                        // Generated if not provided
}

QueryResponse
public class QueryResponse {
    private UUID executionId;
    private QueryStatus status;              // RECEIVED, QUEUED, RUNNING, COMPLETED, FAILED, CANCELLED
    private EngineType engineType;           // TRINO, CLICKHOUSE, SPARK_ASYNC
    private List<ColumnMetadata> columns;    // Column names, types, nullability
    private List<Map<String, Object>> data;  // Result rows
    private long rowCount;                   // Rows returned in this page
    private long totalRows;                  // Total rows in result set
    private long executionTimeMs;            // Total execution time
    private boolean hasMore;                 // Whether more rows are available
    private String cursorToken;              // Cursor for next page
    private Instant submittedAt;
    private Instant completedAt;
    private String errorMessage;             // Error details if FAILED
}

Data Source Management
The Query Engine maintains a registry of configured data sources per tenant, managed through the DataSourceService and DataSourceController:
public enum DataSourceType {
    POSTGRESQL,
    MYSQL,
    MONGODB,
    S3,
    GCS,
    AZURE_BLOB,
    TRINO_CATALOG,
    CLICKHOUSE,
    SNOWFLAKE,
    BIGQUERY
}

Each data source registration includes:
| Field | Description |
|---|---|
| name | Human-readable data source name |
| type | One of the supported DataSourceType values |
| connectionString | JDBC URL or endpoint (stored encrypted) |
| credentialSecretRef | Kubernetes Secret reference for credentials |
| catalog | Trino catalog name for this data source |
| schema | Default schema within the catalog |
| tenantId | Owning tenant |
| readOnly | When true, write operations against this source are rejected |
| maxConnections | Connection pool size for this source |
| properties | Additional connection properties |
Data source credentials are never stored in the database. They are referenced via Kubernetes Secrets using secretKeyRef, consistent with the platform's zero-secrets-in-code policy.
Health Checks and Observability
Health Endpoints
The Query Engine exposes standard Spring Boot Actuator health endpoints along with custom health indicators for each execution engine:
| Endpoint | Description |
|---|---|
| /actuator/health | Composite health status |
| /actuator/health/trino | Trino connection health |
| /actuator/health/clickhouse | ClickHouse connection health |
| /actuator/health/redis | Redis cache health |
| /actuator/health/db | PostgreSQL database health |
Each engine strategy implements a healthCheck() method that verifies connectivity by executing a lightweight probe query (e.g., SELECT 1). Health check results are cached for 30 seconds to avoid probe query storms.
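The 30-second caching behavior can be sketched as a TTL-guarded wrapper around the probe; the structure and names below are illustrative, not the actual health indicator implementation.

```java
import java.util.function.BooleanSupplier;

// Sketch of a TTL-cached health check: the probe (e.g. one that runs
// "SELECT 1") executes at most once per TTL window; calls in between
// return the cached result.
public class CachedHealthCheckSketch {
    private final BooleanSupplier probe;
    private final long ttlMillis;
    private boolean lastResult;
    private long lastCheckedAt = 0;  // epoch 0 forces a probe on the first call

    public CachedHealthCheckSketch(BooleanSupplier probe, long ttlMillis) {
        this.probe = probe;
        this.ttlMillis = ttlMillis;
    }

    public synchronized boolean isHealthy() {
        long now = System.currentTimeMillis();
        if (now - lastCheckedAt >= ttlMillis) {
            lastResult = probe.getAsBoolean();  // run the real probe query
            lastCheckedAt = now;
        }
        return lastResult;
    }

    public static void main(String[] args) {
        int[] probeCalls = {0};
        CachedHealthCheckSketch hc = new CachedHealthCheckSketch(
                () -> { probeCalls[0]++; return true; }, 30_000);
        hc.isHealthy();
        hc.isHealthy();  // served from the cache; the probe is not re-run
        System.out.println(probeCalls[0]);  // 1
    }
}
```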
Metrics
The Query Engine emits the following Micrometer metrics:
| Metric | Type | Labels | Description |
|---|---|---|---|
| query.execution.total | Counter | engine, status, tenant | Total query executions |
| query.execution.duration | Timer | engine, tenant | Query execution duration |
| query.trino.execution | Timer | -- | Trino-specific execution duration |
| query.cache.hits | Counter | level, tenant | Cache hit count by level |
| query.cache.misses | Counter | level, tenant | Cache miss count by level |
| rls.filter.applied | Counter | -- | Queries with RLS filters applied |
| rls.filter.skipped | Counter | -- | Queries where RLS was skipped |
| rls.filter.time | Timer | -- | Time spent applying RLS filters |
| query.queue.size | Gauge | priority | Current queue depth by priority |
| query.queue.wait_time | Timer | priority | Time queries spend in queue |
Distributed Tracing
Every query execution creates an OpenTelemetry span hierarchy:
query.execute (root)
|-- query.cache.lookup
|-- query.rls.apply
|-- query.masking.apply
|-- query.router.decide
|-- query.engine.execute
| |-- trino.connection.acquire
| |-- trino.session.configure
| |-- trino.query.submit
| |-- trino.results.fetch
|-- query.cache.store
|-- query.audit.record

Trace context is propagated to Trino via session properties, enabling end-to-end trace correlation from the browser through the AI Service, the Query Engine, and into the distributed query execution within Trino.
Deployment Configuration
The Query Engine is deployed via Helm chart at infrastructure/helm/query-engine/:
# Key Helm values
replicaCount: 2
image:
  repository: matih/query-engine
  tag: latest
service:
  port: 8080
resources:
  requests:
    cpu: 500m
    memory: 1Gi
  limits:
    cpu: 2
    memory: 4Gi
env:
  QUERY_TRINO_URL:
    value: "jdbc:trino://trino.matih-data-plane.svc.cluster.local:8080"
  QUERY_TRINO_CATALOG:
    value: "delta"
  SPRING_REDIS_HOST:
    valueFrom:
      secretKeyRef:
        name: query-engine-redis
        key: host
  SPRING_DATASOURCE_URL:
    valueFrom:
      secretKeyRef:
        name: query-engine-db
        key: url

The service scales horizontally behind a Kubernetes Service. Each replica is stateless; all state is stored in PostgreSQL (query history, scheduled queries) and Redis (cache, query deduplication locks).
Related Sections
- Trino Integration -- Detailed Trino configuration and catalog management
- SQL Execution -- End-to-end query execution lifecycle
- Optimization -- Caching strategies and materialized views
- Row-Level Security -- RLS filter injection mechanics