MATIH Platform is in active MVP development. Documentation reflects current implementation status.
11. Pipelines & Data Engineering
Spark

Spark Integration

Apache Spark v4.1.1 provides distributed data processing capabilities for large-scale ETL, data transformations, and Iceberg table operations. MATIH integrates Spark through the Spark Operator on Kubernetes and Spark Connect for interactive and batch processing.


Integration Architecture

+-------------------+     SparkApplication CRD    +-------------------+
| Pipeline Service  | --------------------------> | Spark Operator    |
| (SparkService)    |                             | (K8s Operator)    |
+-------------------+                             +--------+----------+
                                                           |
                                                  +--------v----------+
                                                  | Spark Driver      |
                                                  | (Pod)             |
                                                  +--------+----------+
                                                           |
                                              +------------+------------+
                                              |            |            |
                                         +----v----+  +---v-----+  +---v-----+
                                         | Executor|  | Executor|  | Executor|
                                         | Pod 1   |  | Pod 2   |  | Pod N   |
                                         +---------+  +---------+  +---------+
                                              |            |            |
                                         +----v------------v------------v----+
                                         |         Object Storage            |
                                         |    (S3 / GCS / Azure Blob)        |
                                         |    via Polaris REST Catalog       |
                                         +-----------------------------------+

Spark Configuration

Spark Version and Dependencies

| Component | Version | Purpose |
|---|---|---|
| Apache Spark | 4.1.1 | Core processing engine |
| Iceberg Spark Runtime | 1.5.0 | Iceberg table format support |
| Hadoop AWS | 3.3.4 | S3 storage access |
| AWS Java SDK Bundle | 1.12.262 | AWS API support |

Polaris REST Catalog Integration

Spark connects to Polaris for Iceberg table management:

# Iceberg extensions
spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
 
# Polaris REST Catalog configuration
spark.sql.catalog.iceberg=org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.iceberg.type=rest
spark.sql.catalog.iceberg.uri=http://polaris.matih-data-plane.svc.cluster.local:8181/api/catalog
spark.sql.catalog.iceberg.warehouse=matih
 
# Credential vending
spark.sql.catalog.iceberg.header.X-Iceberg-Access-Delegation=vended-credentials
spark.sql.catalog.iceberg.credential=${POLARIS_CLIENT_ID}:${POLARIS_CLIENT_SECRET}
spark.sql.catalog.iceberg.token-refresh-enabled=true
 
# Default catalog
spark.sql.defaultCatalog=iceberg

Polaris provides scoped storage credentials to Spark through credential vending, eliminating the need to distribute cloud storage credentials to Spark executors.
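The same catalog settings can be assembled programmatically before a session is built. The sketch below is illustrative: the helper name `polaris_catalog_conf` and its defaults are not part of the platform API, they just mirror the properties listed above.

```python
# Illustrative helper (not part of the MATIH codebase): builds the Spark
# conf entries for the Polaris-backed Iceberg REST catalog shown above.
def polaris_catalog_conf(
    client_id: str,
    client_secret: str,
    uri: str = "http://polaris.matih-data-plane.svc.cluster.local:8181/api/catalog",
    warehouse: str = "matih",
) -> dict:
    return {
        "spark.sql.extensions":
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
        "spark.sql.catalog.iceberg": "org.apache.iceberg.spark.SparkCatalog",
        "spark.sql.catalog.iceberg.type": "rest",
        "spark.sql.catalog.iceberg.uri": uri,
        "spark.sql.catalog.iceberg.warehouse": warehouse,
        "spark.sql.catalog.iceberg.header.X-Iceberg-Access-Delegation":
            "vended-credentials",
        "spark.sql.catalog.iceberg.credential": f"{client_id}:{client_secret}",
        "spark.sql.catalog.iceberg.token-refresh-enabled": "true",
        "spark.sql.defaultCatalog": "iceberg",
    }

# Usage with PySpark (requires a pyspark installation):
#   builder = SparkSession.builder.appName("etl")
#   for k, v in polaris_catalog_conf(os.environ["POLARIS_CLIENT_ID"],
#                                    os.environ["POLARIS_CLIENT_SECRET"]).items():
#       builder = builder.config(k, v)
#   spark = builder.getOrCreate()
```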


Spark Operator

MATIH uses the Spark Operator (sparkoperator.k8s.io) to manage Spark applications on Kubernetes:

SparkApplication CRD

The SparkApplication Custom Resource Definition describes a Spark job:

apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name: spark-iceberg-example
  namespace: matih-data-plane
  labels:
    app.kubernetes.io/name: spark-iceberg-job
    app.kubernetes.io/component: etl
    app.kubernetes.io/part-of: matih-data-plane
spec:
  type: Python
  mode: cluster
  image: "apache/spark-py:4.1.1"
  mainApplicationFile: "local:///opt/spark/work-dir/etl_job.py"
  pythonVersion: "3"
  sparkVersion: "4.1.1"
 
  restartPolicy:
    type: OnFailure
    onFailureRetries: 3
    onFailureRetryInterval: 10
    onSubmissionFailureRetries: 5
    onSubmissionFailureRetryInterval: 20
 
  sparkConf:
    # Iceberg + Polaris configuration (see above)
 
  driver:
    cores: 1
    coreLimit: "1200m"
    memory: "2g"
    serviceAccount: spark-operator-spark
    env:
      - name: POLARIS_CLIENT_ID
        valueFrom:
          secretKeyRef:
            name: polaris-spark-credentials
            key: client-id
      - name: POLARIS_CLIENT_SECRET
        valueFrom:
          secretKeyRef:
            name: polaris-spark-credentials
            key: client-secret
 
  executor:
    cores: 2
    instances: 2
    memory: "4g"
    env:
      - name: POLARIS_CLIENT_ID
        valueFrom:
          secretKeyRef:
            name: polaris-spark-credentials
            key: client-id
      - name: POLARIS_CLIENT_SECRET
        valueFrom:
          secretKeyRef:
            name: polaris-spark-credentials
            key: client-secret

Dynamic Allocation

For workloads with variable resource needs, Spark dynamic allocation scales executors automatically. Because Kubernetes runs without an external shuffle service, shuffle tracking must be enabled so Spark can decommission executors without losing shuffle data:

spark.dynamicAllocation.enabled=true
spark.dynamicAllocation.shuffleTracking.enabled=true
spark.dynamicAllocation.minExecutors=1
spark.dynamicAllocation.maxExecutors=10
spark.dynamicAllocation.initialExecutors=2

Spark Operator Service

The SparkOperatorService in the Pipeline Service manages Spark job lifecycle:

class SparkOperatorService:
    """Manages Spark applications via the Spark Operator."""
 
    async def submit_job(self, job_spec: SparkJobSpec) -> SparkJob:
        """Submit a SparkApplication CRD to Kubernetes."""
        # 1. Build SparkApplication manifest from job spec
        # 2. Inject Polaris credentials
        # 3. Apply resource quotas based on tenant limits
        # 4. Create SparkApplication CRD via K8s API
        # 5. Return job handle with status URL
 
    async def get_job_status(self, job_name: str) -> SparkJobStatus:
        """Get current status of a Spark job."""
        # Read SparkApplication status from K8s API
 
    async def cancel_job(self, job_name: str) -> bool:
        """Cancel a running Spark job."""
        # Delete SparkApplication CRD
 
    async def get_job_logs(self, job_name: str, container: str = "driver") -> str:
        """Get logs from driver or executor pods."""

Spark Connect

Spark Connect provides a client-server protocol for interactive Spark sessions:

+-------------------+     gRPC      +-------------------+
| Pipeline Service  | ------------> | Spark Connect     |
| (SparkService)    |               | Server            |
+-------------------+               +--------+----------+
                                             |
                                    +--------v----------+
                                    | Spark Execution   |
                                    | Engine            |
                                    +-------------------+

Interactive Sessions

class SparkService:
    """Interactive Spark processing via Spark Connect."""
 
    async def create_session(self, tenant_id: str, config: dict) -> SparkSession:
        """Create an interactive Spark Connect session."""
 
    async def execute_sql(self, session_id: str, sql: str) -> DataFrame:
        """Execute SQL in an existing Spark session."""
 
    async def execute_transformation(self, session_id: str, code: str) -> DataFrame:
        """Execute PySpark transformation code."""
 
    async def close_session(self, session_id: str) -> None:
        """Close a Spark session and release resources."""

Spark Template Service

The SparkTemplateService provides pre-configured Spark job templates:

class SparkTemplateService:
    """Manages Spark job templates."""
 
    async def get_templates(self) -> List[SparkTemplate]:
        """List available Spark job templates."""
 
    async def render_template(self, template_id: str, parameters: dict) -> SparkJobSpec:
        """Render a template with user parameters into a job specification."""

Available Templates

| Template | Description | Parameters |
|---|---|---|
| iceberg-etl | ETL from source to Iceberg table | source, target, transform SQL |
| iceberg-compact | Compact small files in Iceberg table | table, target file size |
| iceberg-snapshot-expire | Expire old snapshots | table, retention period |
| data-profiling | Profile table statistics | table, sample size |
| schema-migration | Migrate schema between catalogs | source catalog, target catalog |

Iceberg Table Operations

Spark provides full support for Iceberg table operations via the Polaris REST Catalog:

Table Management

-- Create an Iceberg table
CREATE TABLE iceberg.matih_analytics.sales_data (
    id BIGINT,
    region STRING,
    amount DECIMAL(12,2),
    order_date DATE,
    customer_id BIGINT
) USING iceberg
PARTITIONED BY (days(order_date))
 
-- Insert data
INSERT INTO iceberg.matih_analytics.sales_data
SELECT * FROM staging.raw_orders
WHERE order_date >= '2026-02-01'
 
-- Merge (upsert)
MERGE INTO iceberg.matih_analytics.sales_data t
USING staging.updates s
ON t.id = s.id
WHEN MATCHED THEN UPDATE SET amount = s.amount
WHEN NOT MATCHED THEN INSERT *

Table Maintenance

-- Compact small files
CALL iceberg.system.rewrite_data_files(
    table => 'matih_analytics.sales_data',
    options => map('target-file-size-bytes', '134217728')
)
 
-- Expire snapshots
CALL iceberg.system.expire_snapshots(
    table => 'matih_analytics.sales_data',
    older_than => TIMESTAMP '2026-01-01 00:00:00'
)
 
-- Remove orphan files
CALL iceberg.system.remove_orphan_files(
    table => 'matih_analytics.sales_data',
    older_than => TIMESTAMP '2026-01-01 00:00:00'
)
 
-- Analyze table for statistics
ANALYZE TABLE iceberg.matih_analytics.sales_data COMPUTE STATISTICS

Resource Management

Tenant Resource Isolation

Spark jobs are isolated per tenant through:

| Mechanism | Implementation |
|---|---|
| Namespace isolation | Jobs run in tenant-specific namespaces |
| Resource quotas | CPU and memory limits per tenant |
| Priority classes | Different priorities for interactive vs batch |
| Service accounts | Tenant-specific service accounts for RBAC |
| Network policies | Pods can only access tenant-scoped resources |
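A per-tenant ResourceQuota enforcing the CPU and memory limits might look like the following; the namespace, name, and limit values are illustrative, not the platform's actual quota settings.

```yaml
apiVersion: v1
kind: ResourceQuota
metadata:
  name: spark-quota            # illustrative name
  namespace: tenant-a-spark    # illustrative tenant namespace
spec:
  hard:
    requests.cpu: "20"
    requests.memory: 64Gi
    limits.cpu: "40"
    limits.memory: 128Gi
    pods: "30"
```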

Resource Configuration

| Job Type | Driver | Executor | Max Executors |
|---|---|---|---|
| Interactive (small) | 1 core, 2 GB | 1 core, 2 GB | 2 |
| Batch ETL (medium) | 1 core, 2 GB | 2 cores, 4 GB | 5 |
| Large-scale (heavy) | 2 cores, 4 GB | 4 cores, 8 GB | 10 |
| Profiling | 1 core, 2 GB | 2 cores, 4 GB | 3 |

Monitoring and Observability

Spark Job Metrics

| Metric | Source | Description |
|---|---|---|
| Job duration | Spark UI / K8s events | Total job execution time |
| Stage completion | Spark REST API | Progress through stages |
| Shuffle read/write | Spark metrics | Data movement between stages |
| Executor memory | K8s metrics | Memory usage per executor |
| Task failures | Spark event logs | Failed task count and reasons |

Event Logging

Spark event logs are stored in object storage for post-mortem analysis:

spark.eventLog.enabled=true
spark.eventLog.dir=s3a://spark-events/logs

Related Sections