MATIH Platform is in active MVP development. Documentation reflects current implementation status.

Query Scheduling and Workload Management

The Query Engine schedules, queues, and governs query execution across tenant workloads. This section covers scheduled query execution, priority-based queuing, cost estimation, timeout policies, workload group configuration, and saved queries.


Scheduling Architecture

+-------------------+     +------------------+     +------------------+
| Query Scheduler   |     | Priority Queue   |     | Workload Manager |
| (Cron triggers)   | --> | (Sorted by       | --> | (Resource limits |
|                   |     |  priority+time)  |     |  and concurrency)|
+-------------------+     +------------------+     +------------------+
                                   |
                          +--------v---------+
                          | Query Execution  |
                          | Service          |
                          +------------------+

Scheduled Queries

The QueryScheduleController manages recurring query executions:

Creating a Scheduled Query

POST /v1/queries/schedules

Request:
{
  "name": "daily_sales_report",
  "sql": "SELECT region, SUM(amount) AS total_sales FROM orders WHERE order_date = CURRENT_DATE - INTERVAL '1' DAY GROUP BY region",
  "cronExpression": "0 6 * * *",
  "timezone": "America/New_York",
  "catalog": "delta",
  "schema": "analytics",
  "priority": "NORMAL",
  "timeoutSeconds": 600,
  "maxRetries": 3,
  "notifyOnCompletion": true,
  "notifyOnFailure": true,
  "resultDestination": {
    "type": "TABLE",
    "target": "analytics.reports.daily_sales",
    "mode": "APPEND"
  },
  "tags": ["sales", "daily-report"]
}

Schedule Configuration

| Field | Type | Required | Description |
|---|---|---|---|
| name | String | Yes | Human-readable schedule name |
| sql | String | Yes | SQL query to execute |
| cronExpression | String | Yes | Cron expression for scheduling |
| timezone | String | No | Timezone for cron evaluation (default: UTC) |
| catalog | String | No | Trino catalog (default: configured default) |
| schema | String | No | Trino schema (default: configured default) |
| priority | Enum | No | LOW, NORMAL, HIGH, or CRITICAL (default: NORMAL) |
| timeoutSeconds | Integer | No | Maximum execution time in seconds (default: 600) |
| maxRetries | Integer | No | Number of retries on failure (default: 3) |
| notifyOnCompletion | Boolean | No | Send a notification on success |
| notifyOnFailure | Boolean | No | Send a notification on failure |
| resultDestination | Object | No | Where to deliver results; see Result Destinations for the supported types |
| enabled | Boolean | No | Whether the schedule is active (default: true) |
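The defaults in the table can be captured in a small value type. The sketch below is a hypothetical ScheduleConfig record, not the controller's actual DTO: it rejects missing required fields and fills in the documented defaults. Catalog, schema, notification, and destination fields are omitted for brevity.

```java
import java.util.Objects;

// Hypothetical value type for a schedule request; field names follow the request body above.
public record ScheduleConfig(
        String name, String sql, String cronExpression, String timezone,
        String priority, int timeoutSeconds, int maxRetries, boolean enabled) {

    // Builds a config from possibly-null optional values, applying the documented defaults.
    public static ScheduleConfig of(String name, String sql, String cronExpression,
                                    String timezone, String priority,
                                    Integer timeoutSeconds, Integer maxRetries, Boolean enabled) {
        // Required fields: fail fast if any is missing.
        Objects.requireNonNull(name, "name is required");
        Objects.requireNonNull(sql, "sql is required");
        Objects.requireNonNull(cronExpression, "cronExpression is required");
        return new ScheduleConfig(
                name, sql, cronExpression,
                timezone != null ? timezone : "UTC",
                priority != null ? priority : "NORMAL",
                timeoutSeconds != null ? timeoutSeconds : 600,
                maxRetries != null ? maxRetries : 3,
                enabled != null ? enabled : true);
    }
}
```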

Cron Expression Examples

| Expression | Schedule |
|---|---|
| 0 6 * * * | Daily at 6:00 AM |
| 0 */4 * * * | Every 4 hours |
| 0 0 * * MON | Every Monday at midnight |
| 0 6 1 * * | First day of each month at 6:00 AM |
| */15 * * * * | Every 15 minutes |
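The examples above are standard five-field cron expressions, evaluated in the schedule's timezone. To show how the timezone field changes the actual trigger instant, here is a minimal evaluator that understands only the forms used in the table (*, */n, single values, and day names). It is a hypothetical sketch, not the scheduler's parser, and it scans forward one minute at a time rather than computing the next match directly.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.Locale;

// Minimal five-field cron evaluator: minute, hour, day-of-month, month, day-of-week.
// Supports only "*", "*/n", single numbers and three-letter day names (day-of-week 0-6, 0 = Sunday).
public final class CronSketch {
    private static final List<String> DAYS = List.of("SUN", "MON", "TUE", "WED", "THU", "FRI", "SAT");

    // Does one field match `value`? `min` is the lowest legal value of the field (0 or 1).
    static boolean matches(String field, int value, int min) {
        if (field.equals("*")) return true;
        if (field.startsWith("*/")) return (value - min) % Integer.parseInt(field.substring(2)) == 0;
        int day = DAYS.indexOf(field.toUpperCase(Locale.ROOT));
        return (day >= 0 ? day : Integer.parseInt(field)) == value;
    }

    // First matching wall-clock minute strictly after `after`, evaluated in `zone`.
    public static Instant nextRun(String expr, ZoneId zone, Instant after) {
        String[] f = expr.trim().split("\\s+");
        if (f.length != 5) throw new IllegalArgumentException("expected 5 fields: " + expr);
        ZonedDateTime t = after.atZone(zone).truncatedTo(ChronoUnit.MINUTES).plusMinutes(1);
        // Scan minute by minute; four years covers even Feb 29 schedules.
        for (long i = 0; i < 4L * 366 * 24 * 60; i++, t = t.plusMinutes(1)) {
            boolean dom = matches(f[2], t.getDayOfMonth(), 1);
            boolean dow = matches(f[4], t.getDayOfWeek().getValue() % 7, 0);
            // Classic cron rule: when both day fields are restricted, matching either is enough.
            boolean bothRestricted = !f[2].startsWith("*") && !f[4].startsWith("*");
            boolean dayOk = bothRestricted ? (dom || dow) : (dom && dow);
            if (dayOk && matches(f[0], t.getMinute(), 0) && matches(f[1], t.getHour(), 0)
                    && matches(f[3], t.getMonthValue(), 1)) {
                return t.toInstant();
            }
        }
        throw new IllegalArgumentException("no matching time within four years: " + expr);
    }
}
```

For the daily_sales_report schedule (0 6 * * * in America/New_York), the next run after 2026-02-12T10:28:00Z is 2026-02-12T11:00:00Z, while in July the same wall-clock time falls at 10:00 UTC because of daylight saving time. Because the sketch steps along the instant timeline, a wall-clock time inside a spring-forward gap is skipped that day and one inside the repeated fall-back hour matches twice; a production scheduler needs an explicit policy for both cases.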

Result Destinations

Scheduled query results can be delivered to several destinations:

| Destination | Configuration | Use Case |
|---|---|---|
| TABLE | Target table name, append/overwrite mode | Store results in an Iceberg table |
| FILE | Object storage path, format (CSV/Parquet/JSON) | Export results to the data lake |
| WEBHOOK | URL, authentication, payload template | Notify external systems |
| EMAIL | Recipient list, attachment format | Email results to stakeholders |
| CACHE | Cache key, TTL | Pre-warm the cache for dashboard queries |

Priority Queue

The Query Engine uses a priority queue to manage query execution order:

Priority Levels

| Priority | Weight | Max Concurrent | Use Case |
|---|---|---|---|
| CRITICAL | 100 | Unlimited | System health queries, incident investigation |
| HIGH | 75 | 10 per tenant | Interactive user queries, dashboard refresh |
| NORMAL | 50 | 5 per tenant | Scheduled reports, background analytics |
| LOW | 25 | 2 per tenant | Data exports, batch processing |
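The priority table can be encoded as an enum that carries both the weight and the concurrency cap. Reading "Max Concurrent" as the number of queries one tenant may run at that priority at the same time is an assumption, and QueryPriority and canStart are hypothetical names.

```java
// Hypothetical encoding of the priority table.
public enum QueryPriority {
    CRITICAL(100, Integer.MAX_VALUE), // "Unlimited"
    HIGH(75, 10),
    NORMAL(50, 5),
    LOW(25, 2);

    private final int weight;
    private final int maxConcurrentPerTenant;

    QueryPriority(int weight, int maxConcurrentPerTenant) {
        this.weight = weight;
        this.maxConcurrentPerTenant = maxConcurrentPerTenant;
    }

    public int weight() {
        return weight;
    }

    // True if a tenant already running `runningForTenant` queries at this priority may start one more.
    public boolean canStart(int runningForTenant) {
        return runningForTenant < maxConcurrentPerTenant;
    }
}
```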

Queue Ordering

Queries in the queue are ordered by:

  1. Priority weight (descending) -- higher priority executes first
  2. Submission time (ascending) -- FIFO within the same priority
  3. Estimated execution time (ascending) -- shorter queries first when priority and submission time are equal
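The three ordering rules map directly onto a chained java.util.Comparator. QueuedQuery and QueueOrdering are hypothetical names; the weights are those from the priority table.

```java
import java.time.Instant;
import java.util.Comparator;
import java.util.PriorityQueue;

public final class QueueOrdering {
    // Hypothetical queued-query record; field names are illustrative.
    public record QueuedQuery(String id, int priorityWeight, Instant submittedAt, long estimatedTimeMs) {}

    // 1. weight descending, 2. submission time ascending, 3. estimated time ascending.
    public static final Comparator<QueuedQuery> ORDER =
            Comparator.comparingInt(QueuedQuery::priorityWeight).reversed()
                    .thenComparing(QueuedQuery::submittedAt)
                    .thenComparingLong(QueuedQuery::estimatedTimeMs);

    // poll() on this queue returns the query that should execute next.
    public static PriorityQueue<QueuedQuery> newQueue() {
        return new PriorityQueue<>(ORDER);
    }
}
```

PriorityQueue is not thread-safe; for concurrent submitters and dispatchers the same comparator can be passed to java.util.concurrent.PriorityBlockingQueue.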

Queue Management

GET /v1/queries/workloads/queue

Response:
{
  "queueSize": 23,
  "queuedByPriority": {
    "CRITICAL": 0,
    "HIGH": 5,
    "NORMAL": 12,
    "LOW": 6
  },
  "runningQueries": 8,
  "maxConcurrent": 15,
  "oldestQueuedAt": "2026-02-12T10:28:00Z",
  "estimatedWaitTimeMs": 15000
}

Cost Estimation

The QueryCostEstimator provides pre-execution cost estimates:

Cost Model

import java.util.List;
import java.util.Map;

import org.springframework.stereotype.Service;

@Service
public class QueryCostEstimator {

    // statisticsProvider and router are injected collaborators (declarations not shown).

    public QueryCostEstimate estimate(QueryRequest request) {
        // 1. Parse SQL to identify tables
        List<TableReference> tables = extractTables(request.getSql());

        // 2. Get table statistics from catalog
        Map<String, TableStatistics> stats = statisticsProvider.getStatistics(tables);

        // 3. Estimate scan size
        long estimatedBytes = computeScanEstimate(tables, stats, request);

        // 4. Estimate execution time from historical data
        long estimatedTimeMs = computeTimeEstimate(request, estimatedBytes);

        // 5. Estimate compute cost
        double estimatedCostUsd = computeCostEstimate(estimatedBytes, estimatedTimeMs);

        return QueryCostEstimate.builder()
            .estimatedScanBytes(estimatedBytes)
            .estimatedTimeMs(estimatedTimeMs)
            .estimatedCostUsd(estimatedCostUsd)
            .engine(router.route(request))
            .withinBudget(isWithinBudget(request.getTenantId(), estimatedCostUsd))
            .build();
    }

    // extractTables, computeScanEstimate, computeTimeEstimate, computeCostEstimate
    // and isWithinBudget are private helpers not shown in this excerpt.
}

Cost Estimation Endpoint

POST /v1/queries/cost/estimate

Request:
{
  "sql": "SELECT * FROM orders JOIN customers ON orders.customer_id = customers.id WHERE order_date >= '2026-01-01'"
}

Response:
{
  "estimatedScanBytes": 5368709120,
  "estimatedScanBytesHuman": "5.0 GB",
  "estimatedTimeMs": 12000,
  "estimatedTimeHuman": "12 seconds",
  "estimatedCostUsd": 0.025,
  "engine": "TRINO",
  "withinBudget": true,
  "budgetRemainingUsd": 49.975,
  "tablesScanned": [
    {"table": "orders", "estimatedRows": 15000000, "estimatedBytes": 3221225472},
    {"table": "customers", "estimatedRows": 500000, "estimatedBytes": 2147483648}
  ]
}
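The sample response is internally consistent, which makes the arithmetic easy to check: the two per-table estimates (3 GB + 2 GB) sum to estimatedScanBytes, the human-readable size uses binary units (5368709120 / 2^30 = 5.0), and budgetRemainingUsd = 50.00 - 0.025 matches the $50 daily budget from the cost policy example if nothing else was spent that day. The helper below reproduces those calculations. CostEstimateMath and its methods are hypothetical names, and BigDecimal is used for money to avoid floating-point rounding.

```java
import java.math.BigDecimal;
import java.util.List;
import java.util.Locale;

public final class CostEstimateMath {
    // Mirrors one entry of "tablesScanned".
    public record TableScan(String table, long estimatedRows, long estimatedBytes) {}

    // Total scan size is the sum of the per-table estimates.
    public static long totalScanBytes(List<TableScan> scans) {
        return scans.stream().mapToLong(TableScan::estimatedBytes).sum();
    }

    // Binary units (1 GB = 2^30 bytes), which reproduces "5.0 GB" for 5368709120 bytes.
    public static String humanBytes(long bytes) {
        return String.format(Locale.ROOT, "%.1f GB", bytes / (double) (1L << 30));
    }

    // Budget left after this query, given the daily budget and what was already spent today.
    public static BigDecimal remainingAfter(BigDecimal dailyBudget, BigDecimal spentToday, BigDecimal estimate) {
        return dailyBudget.subtract(spentToday).subtract(estimate);
    }

    // Assumed meaning of "withinBudget": the query does not push spend past the budget.
    public static boolean withinBudget(BigDecimal dailyBudget, BigDecimal spentToday, BigDecimal estimate) {
        return remainingAfter(dailyBudget, spentToday, estimate).signum() >= 0;
    }
}
```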

Cost Policies

Tenant administrators can define cost policies that restrict query execution:

{
  "tenantId": "tenant-123",
  "policies": [
    {
      "name": "max_scan_size",
      "type": "SCAN_LIMIT",
      "value": 100,
      "unit": "GB",
      "action": "BLOCK",
      "message": "Query exceeds maximum scan size of 100GB"
    },
    {
      "name": "daily_budget",
      "type": "BUDGET_LIMIT",
      "value": 50.00,
      "unit": "USD",
      "period": "DAILY",
      "action": "WARN",
      "message": "Daily query budget of $50 reached"
    },
    {
      "name": "max_execution_time",
      "type": "TIME_LIMIT",
      "value": 30,
      "unit": "MINUTES",
      "action": "BLOCK",
      "message": "Query exceeds maximum execution time of 30 minutes"
    }
  ]
}
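
Each policy's action determines what happens when its limit is exceeded:

| Action | Behavior |
|---|---|
| BLOCK | Reject the query before execution |
| WARN | Allow execution but send a notification |
| QUEUE | Move the query to the low-priority queue |
| APPROVE | Require manual approval before execution |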
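A minimal sketch of how the three example policies could be checked against a cost estimate before execution. It assumes limits are converted to base units up front (GB as 2^30 bytes, minutes as milliseconds), that the budget check compares spend so far today plus this query's estimate, and that the time limit compares the estimated execution time; CostPolicyCheck and its types are hypothetical. Only BLOCK prevents execution here; WARN, QUEUE, and APPROVE violations are returned for the caller to act on.

```java
import java.util.ArrayList;
import java.util.List;

public final class CostPolicyCheck {
    public enum Action { BLOCK, WARN, QUEUE, APPROVE }
    public enum Type { SCAN_LIMIT, BUDGET_LIMIT, TIME_LIMIT }

    // `limit` is already in the metric's base unit: bytes, USD, or milliseconds.
    public record Policy(String name, Type type, double limit, Action action, String message) {}

    // Values the policies are compared against; projectedSpendUsd = spent today + this estimate.
    public record Estimate(long scanBytes, double projectedSpendUsd, long timeMs) {}

    // Returns every policy the estimate exceeds, in declaration order.
    public static List<Policy> violations(List<Policy> policies, Estimate e) {
        List<Policy> hits = new ArrayList<>();
        for (Policy p : policies) {
            double observed = switch (p.type()) {
                case SCAN_LIMIT -> e.scanBytes();
                case BUDGET_LIMIT -> e.projectedSpendUsd();
                case TIME_LIMIT -> e.timeMs();
            };
            if (observed > p.limit()) hits.add(p);
        }
        return hits;
    }

    // The query may run only if no violated policy has action BLOCK.
    public static boolean allowed(List<Policy> violated) {
        return violated.stream().noneMatch(p -> p.action() == Action.BLOCK);
    }
}
```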

Timeout Policies

The QueryTimeoutPolicyRepository manages timeout configurations per tenant and workload type:

| Workload Type | Default Timeout | Max Timeout | Description |
|---|---|---|---|
| Interactive | 300 s (5 min) | 600 s (10 min) | User-initiated queries |
| Dashboard | 120 s (2 min) | 300 s (5 min) | Dashboard widget queries |
| Scheduled | 1800 s (30 min) | 3600 s (1 hr) | Scheduled report queries |
| Export | 3600 s (1 hr) | 7200 s (2 hr) | Data export operations |
| System | 60 s (1 min) | 120 s (2 min) | Health checks and metadata queries |
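One way to turn the table into an effective timeout: use the workload default when the caller does not ask for one, and cap an explicit request at the workload maximum. Clamping (rather than rejecting) over-limit requests is an assumption, as are the TimeoutResolver and WorkloadType names; per-tenant overrides stored in QueryTimeoutPolicyRepository are not modeled.

```java
import java.time.Duration;
import java.util.Map;

public final class TimeoutResolver {
    public enum WorkloadType { INTERACTIVE, DASHBOARD, SCHEDULED, EXPORT, SYSTEM }

    record Limits(long defaultSeconds, long maxSeconds) {}

    // Platform-wide values from the table above.
    private static final Map<WorkloadType, Limits> LIMITS = Map.of(
            WorkloadType.INTERACTIVE, new Limits(300, 600),
            WorkloadType.DASHBOARD, new Limits(120, 300),
            WorkloadType.SCHEDULED, new Limits(1800, 3600),
            WorkloadType.EXPORT, new Limits(3600, 7200),
            WorkloadType.SYSTEM, new Limits(60, 120));

    // No request: workload default. Otherwise: the request, capped at the workload maximum.
    public static Duration effectiveTimeout(WorkloadType type, Integer requestedSeconds) {
        Limits limits = LIMITS.get(type);
        long seconds = (requestedSeconds == null)
                ? limits.defaultSeconds()
                : Math.min(requestedSeconds, limits.maxSeconds());
        return Duration.ofSeconds(seconds);
    }
}
```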

When a query exceeds its timeout:

  1. Statement.cancel() is called, asking the execution engine to abort the query
  2. Query status transitions to CANCELLED with reason TIMEOUT
  3. A timeout event is published to Kafka
  4. If the query is scheduled, the retry logic is invoked
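The four timeout steps can be sketched with a ScheduledExecutorService. The Hooks interface is hypothetical and stands in for the JDBC statement, the Kafka producer, and the retry logic. The sketch flips the status with a compare-and-set before cancelling, so a query that finishes right at the deadline ends up either SUCCEEDED or CANCELLED_TIMEOUT, never both.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public final class TimeoutWatchdog implements AutoCloseable {
    public enum Status { RUNNING, SUCCEEDED, CANCELLED_TIMEOUT }

    // Hypothetical hooks standing in for the engine, the Kafka producer and the retry logic.
    public interface Hooks {
        void cancelStatement();                    // e.g. delegate to java.sql.Statement.cancel()
        void publishTimeoutEvent(String queryId);  // e.g. send to a Kafka topic
        void retry(String queryId);                // re-enqueue a scheduled query
    }

    // Tracks one running query.
    public static final class Watch {
        private final AtomicReference<Status> status = new AtomicReference<>(Status.RUNNING);
        private volatile ScheduledFuture<?> timer;

        // Call when the query finishes; returns false if the timeout already fired.
        public boolean complete() {
            if (!status.compareAndSet(Status.RUNNING, Status.SUCCEEDED)) return false;
            timer.cancel(false);
            return true;
        }

        public Status status() {
            return status.get();
        }
    }

    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    public Watch watch(String queryId, long timeoutMs, boolean scheduled, Hooks hooks) {
        Watch w = new Watch();
        w.timer = scheduler.schedule(() -> {
            // Step 2 runs first, atomically, so a query completing at the deadline is not also cancelled.
            if (!w.status.compareAndSet(Status.RUNNING, Status.CANCELLED_TIMEOUT)) return;
            hooks.cancelStatement();              // step 1
            hooks.publishTimeoutEvent(queryId);   // step 3
            if (scheduled) hooks.retry(queryId);  // step 4
        }, timeoutMs, TimeUnit.MILLISECONDS);
        return w;
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}
```

JDBC also offers Statement.setQueryTimeout(int), which lets the driver enforce the deadline itself where the driver supports it; an external watchdog like this keeps the status transition, event, and retry in one place.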

Workload Groups

The WorkloadController manages workload group configurations:

POST /v1/queries/workloads/groups

Request:
{
  "name": "interactive-analytics",
  "maxConcurrentQueries": 10,
  "maxQueuedQueries": 50,
  "maxScanBytesPerQuery": 10737418240,
  "defaultTimeoutSeconds": 300,
  "priority": "HIGH",
  "matchPatterns": {
    "sourceTypes": ["AI_SERVICE", "BI_SERVICE"],
    "userRoles": ["ANALYST", "DATA_ENGINEER"]
  }
}

Workload Group Configuration

| Property | Description |
|---|---|
| maxConcurrentQueries | Maximum number of queries running simultaneously in this group |
| maxQueuedQueries | Maximum number of queries waiting in the queue for this group |
| maxScanBytesPerQuery | Per-query scan size limit, in bytes |
| defaultTimeoutSeconds | Default execution timeout, in seconds |
| priority | Queue priority for this group |
| matchPatterns | Rules for routing queries to this group |
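A sketch of routing a query to a workload group from its matchPatterns. It assumes a query matches when its source type is in sourceTypes and at least one of the user's roles is in userRoles, that an empty list matches anything, and that groups are tried in order with the first match winning; this evaluation order is not specified above, and WorkloadGroupMatcher is a hypothetical name.

```java
import java.util.List;
import java.util.Optional;
import java.util.Set;

public final class WorkloadGroupMatcher {
    // Mirrors the matchPatterns object; an empty set is treated as "match anything".
    public record WorkloadGroup(String name, Set<String> sourceTypes, Set<String> userRoles) {
        boolean matches(String sourceType, Set<String> roles) {
            boolean sourceOk = sourceTypes.isEmpty() || sourceTypes.contains(sourceType);
            boolean roleOk = userRoles.isEmpty() || roles.stream().anyMatch(userRoles::contains);
            return sourceOk && roleOk;
        }
    }

    // Returns the first group, in evaluation order, whose patterns all match the query.
    public static Optional<WorkloadGroup> route(List<WorkloadGroup> groups, String sourceType, Set<String> roles) {
        return groups.stream().filter(g -> g.matches(sourceType, roles)).findFirst();
    }
}
```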

Resource Allocation

Each Query Engine workload group maps to a Trino resource group:

Query Engine Workload Group          Trino Resource Group
-------------------------------     ---------------------------
interactive-analytics          -->  root.interactive.tenant_X.high_priority
batch-reports                  -->  root.batch.scheduled_queries
data-exports                   -->  root.batch.exports
system-health                  -->  root.system.health_checks
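This page does not say how queries reach these Trino groups. One plausible mechanism is to tag each Trino session, because Trino resource group selectors can match on the session's source and client tags, both of which the Trino JDBC driver accepts as connection properties. The source name and tag scheme below are hypothetical.

```java
import java.util.Properties;

// Builds JDBC connection properties that tag a Trino session for resource group selection.
public final class TrinoSessionTags {
    public static Properties forWorkload(String user, String workloadGroup, String tenantId) {
        Properties props = new Properties();
        props.setProperty("user", user);
        props.setProperty("source", "query-engine");                     // hypothetical source name
        props.setProperty("clientTags", workloadGroup + "," + tenantId); // comma-separated tag list
        return props;
    }
}
```

With the Trino JDBC driver on the classpath, the properties would be passed to DriverManager.getConnection("jdbc:trino://trino.example.com:8080/delta/analytics", props) (host hypothetical), and a selector whose clientTags list contains "interactive-analytics" could then place the session under root.interactive.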

Saved Queries

The SavedQueryController provides persistent storage for frequently used queries:

POST /v1/queries/saved

Request:
{
  "name": "Monthly Revenue by Region",
  "description": "Aggregates monthly revenue across all regions with YoY comparison",
  "sql": "SELECT region, DATE_TRUNC('month', order_date) AS month, SUM(amount) AS revenue FROM orders GROUP BY 1, 2",
  "catalog": "delta",
  "schema": "analytics",
  "tags": ["revenue", "monthly", "regional"],
  "isPublic": true,
  "parameters": [
    {
      "name": "start_date",
      "type": "DATE",
      "defaultValue": "2026-01-01",
      "description": "Start date for the report"
    }
  ]
}

Saved queries support:

  • Parameterization: Named parameters with type validation and defaults
  • Versioning: Version history with diff tracking
  • Sharing: Public queries visible to all tenant users; private queries visible only to the creator
  • Tagging: Categorization for search and discovery
  • Usage tracking: Execution count and last-used timestamp
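Parameter handling can be sketched as a resolution step that picks the supplied value or the declared default and converts it to a typed Java value, failing fast on malformed input. The page does not show how parameters are referenced inside the SQL text, so this sketch stops at producing typed values; SavedQueryParams and the INTEGER and STRING types are hypothetical additions alongside the documented DATE type.

```java
import java.time.LocalDate;
import java.time.format.DateTimeParseException;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public final class SavedQueryParams {
    // Mirrors one entry of the "parameters" array.
    public record Param(String name, String type, String defaultValue) {}

    // Resolves each declared parameter from the supplied values or its default and converts it.
    // Throws IllegalArgumentException if a value is missing, malformed, or of an unsupported type.
    public static Map<String, Object> resolve(List<Param> declared, Map<String, String> supplied) {
        Map<String, Object> resolved = new LinkedHashMap<>();
        for (Param p : declared) {
            String raw = supplied.getOrDefault(p.name(), p.defaultValue());
            if (raw == null) throw new IllegalArgumentException("missing parameter: " + p.name());
            try {
                Object value = switch (p.type()) {
                    case "DATE" -> LocalDate.parse(raw);   // ISO-8601, e.g. 2026-01-01
                    case "INTEGER" -> Long.parseLong(raw);
                    case "STRING" -> raw;
                    default -> throw new IllegalArgumentException("unsupported type: " + p.type());
                };
                resolved.put(p.name(), value);
            } catch (DateTimeParseException | NumberFormatException e) {
                throw new IllegalArgumentException("invalid " + p.type() + " for " + p.name() + ": " + raw, e);
            }
        }
        return resolved;
    }
}
```

Passing these typed values to java.sql.PreparedStatement.setObject, rather than splicing strings into the SQL, keeps parameter values from being interpreted as SQL.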

Related Sections