Spark Integration
Apache Spark v4.1.1 provides distributed data processing for large-scale ETL, data transformations, and Iceberg table operations. MATIH integrates Spark through the Spark Operator on Kubernetes for batch jobs and through Spark Connect for interactive sessions.
Integration Architecture
+-------------------+  SparkApplication CRD   +-------------------+
| Pipeline Service  | ---------------------> |  Spark Operator   |
|  (SparkService)   |                        |  (K8s Operator)   |
+-------------------+                        +--------+----------+
                                                      |
                                             +--------v----------+
                                             |   Spark Driver    |
                                             |       (Pod)       |
                                             +--------+----------+
                                                      |
                                 +--------------------+--------------------+
                                 |                    |                    |
                            +----v----+          +----v----+          +----v----+
                            | Executor|          | Executor|          | Executor|
                            |  Pod 1  |          |  Pod 2  |          |  Pod N  |
                            +---------+          +---------+          +---------+
                                 |                    |                    |
                            +----v--------------------v--------------------v----+
                            |                  Object Storage                   |
                            |              (S3 / GCS / Azure Blob)              |
                            |             via Polaris REST Catalog              |
                            +---------------------------------------------------+

Spark Configuration
Spark Version and Dependencies
| Component | Version | Purpose |
|---|---|---|
| Apache Spark | 4.1.1 | Core processing engine |
| Iceberg Spark Runtime | 1.5.0 | Iceberg table format support |
| Hadoop AWS | 3.3.4 | S3 storage access |
| AWS Java SDK Bundle | 1.12.262 | AWS API support |
Polaris REST Catalog Integration
Spark connects to Polaris for Iceberg table management:
# Iceberg extensions
spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
# Polaris REST Catalog configuration
spark.sql.catalog.iceberg=org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.iceberg.type=rest
spark.sql.catalog.iceberg.uri=http://polaris.matih-data-plane.svc.cluster.local:8181/api/catalog
spark.sql.catalog.iceberg.warehouse=matih
# Credential vending
spark.sql.catalog.iceberg.header.X-Iceberg-Access-Delegation=vended-credentials
spark.sql.catalog.iceberg.credential=${POLARIS_CLIENT_ID}:${POLARIS_CLIENT_SECRET}
spark.sql.catalog.iceberg.token-refresh-enabled=true
# Default catalog
spark.sql.defaultCatalog=iceberg

Polaris provides scoped storage credentials to Spark through credential vending, eliminating the need to distribute cloud storage credentials to Spark executors.
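The same properties can be assembled programmatically before a session is created. A minimal sketch, assuming a hypothetical `polaris_catalog_conf` helper whose defaults mirror the configuration listed above:

```python
# Sketch: build the Iceberg-on-Polaris Spark properties shown above.
# polaris_catalog_conf is an illustrative helper, not a MATIH API; the URI
# and warehouse defaults mirror the configuration in this section.
def polaris_catalog_conf(
    client_id: str,
    client_secret: str,
    uri: str = "http://polaris.matih-data-plane.svc.cluster.local:8181/api/catalog",
    warehouse: str = "matih",
    catalog_name: str = "iceberg",
) -> dict[str, str]:
    prefix = f"spark.sql.catalog.{catalog_name}"
    return {
        "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
        prefix: "org.apache.iceberg.spark.SparkCatalog",
        f"{prefix}.type": "rest",
        f"{prefix}.uri": uri,
        f"{prefix}.warehouse": warehouse,
        f"{prefix}.header.X-Iceberg-Access-Delegation": "vended-credentials",
        f"{prefix}.credential": f"{client_id}:{client_secret}",
        f"{prefix}.token-refresh-enabled": "true",
        "spark.sql.defaultCatalog": catalog_name,
    }

conf = polaris_catalog_conf("my-client", "my-secret")
```

Each entry can then be passed to a session builder via `.config(key, value)` or rendered into a SparkApplication's `sparkConf` block.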
Spark Operator
MATIH uses the Spark Operator (sparkoperator.k8s.io) to manage Spark applications on Kubernetes:
SparkApplication CRD
The SparkApplication Custom Resource Definition describes a Spark job:
apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name: spark-iceberg-example
  namespace: matih-data-plane
  labels:
    app.kubernetes.io/name: spark-iceberg-job
    app.kubernetes.io/component: etl
    app.kubernetes.io/part-of: matih-data-plane
spec:
  type: Python
  mode: cluster
  image: "apache/spark-py:4.1.1"
  mainApplicationFile: "local:///opt/spark/work-dir/etl_job.py"
  pythonVersion: "3"
  sparkVersion: "4.1.1"
  restartPolicy:
    type: OnFailure
    onFailureRetries: 3
    onFailureRetryInterval: 10
    onSubmissionFailureRetries: 5
    onSubmissionFailureRetryInterval: 20
  sparkConf:
    # Iceberg + Polaris configuration (see above)
  driver:
    cores: 1
    coreLimit: "1200m"
    memory: "2g"
    serviceAccount: spark-operator-spark
    env:
      - name: POLARIS_CLIENT_ID
        valueFrom:
          secretKeyRef:
            name: polaris-spark-credentials
            key: client-id
      - name: POLARIS_CLIENT_SECRET
        valueFrom:
          secretKeyRef:
            name: polaris-spark-credentials
            key: client-secret
  executor:
    cores: 2
    instances: 2
    memory: "4g"
    env:
      - name: POLARIS_CLIENT_ID
        valueFrom:
          secretKeyRef:
            name: polaris-spark-credentials
            key: client-id
      - name: POLARIS_CLIENT_SECRET
        valueFrom:
          secretKeyRef:
            name: polaris-spark-credentials
            key: client-secret

Dynamic Allocation
For workloads with variable resource needs, Spark dynamic allocation scales executors automatically:
spark.dynamicAllocation.enabled=true
spark.dynamicAllocation.shuffleTracking.enabled=true
spark.dynamicAllocation.minExecutors=1
spark.dynamicAllocation.maxExecutors=10
spark.dynamicAllocation.initialExecutors=2

Spark Operator Service
The SparkOperatorService in the Pipeline Service manages Spark job lifecycle:
class SparkOperatorService:
    """Manages Spark applications via the Spark Operator."""

    async def submit_job(self, job_spec: SparkJobSpec) -> SparkJob:
        """Submit a SparkApplication CRD to Kubernetes."""
        # 1. Build SparkApplication manifest from job spec
        # 2. Inject Polaris credentials
        # 3. Apply resource quotas based on tenant limits
        # 4. Create SparkApplication CRD via K8s API
        # 5. Return job handle with status URL

    async def get_job_status(self, job_name: str) -> SparkJobStatus:
        """Get current status of a Spark job."""
        # Read SparkApplication status from K8s API

    async def cancel_job(self, job_name: str) -> bool:
        """Cancel a running Spark job."""
        # Delete SparkApplication CRD

    async def get_job_logs(self, job_name: str, container: str = "driver") -> str:
        """Get logs from driver or executor pods."""

Spark Connect
Spark Connect provides a client-server protocol for interactive Spark sessions:
+-------------------+      gRPC      +-------------------+
| Pipeline Service  | -------------> |   Spark Connect   |
|  (SparkService)   |                |      Server       |
+-------------------+                +--------+----------+
                                              |
                                     +--------v----------+
                                     |  Spark Execution  |
                                     |      Engine       |
                                     +-------------------+

Interactive Sessions
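Clients reach the Spark Connect server over gRPC using an `sc://` connection string. A small sketch of building one per session; the helper name and the in-cluster host are assumptions, while the `sc://host:port/;key=value` shape and parameters such as `token` and `user_id` follow the Spark Connect client conventions:

```python
# Sketch: build a Spark Connect connection string of the form
# sc://host:port/;key=value;...  The default host below is an assumed
# in-cluster service name, not a documented MATIH endpoint.
def connect_url(
    host: str = "spark-connect.matih-data-plane.svc.cluster.local",
    port: int = 15002,
    **params: str,
) -> str:
    url = f"sc://{host}:{port}/"
    if params:
        url += ";" + ";".join(f"{k}={v}" for k, v in sorted(params.items()))
    return url

url = connect_url(user_id="tenant-42", token="abc123")
# A PySpark client would then attach with:
#   SparkSession.builder.remote(url).getOrCreate()
```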
class SparkService:
    """Interactive Spark processing via Spark Connect."""

    async def create_session(self, tenant_id: str, config: dict) -> SparkSession:
        """Create an interactive Spark Connect session."""

    async def execute_sql(self, session_id: str, sql: str) -> DataFrame:
        """Execute SQL in an existing Spark session."""

    async def execute_transformation(self, session_id: str, code: str) -> DataFrame:
        """Execute PySpark transformation code."""

    async def close_session(self, session_id: str) -> None:
        """Close a Spark session and release resources."""

Spark Template Service
The SparkTemplateService provides pre-configured Spark job templates:
class SparkTemplateService:
    """Manages Spark job templates."""

    async def get_templates(self) -> List[SparkTemplate]:
        """List available Spark job templates."""

    async def render_template(self, template_id: str, parameters: dict) -> SparkJobSpec:
        """Render a template with user parameters into a job specification."""

Available Templates
| Template | Description | Parameters |
|---|---|---|
| iceberg-etl | ETL from source to Iceberg table | source, target, transform SQL |
| iceberg-compact | Compact small files in Iceberg table | table, target file size |
| iceberg-snapshot-expire | Expire old snapshots | table, retention period |
| data-profiling | Profile table statistics | table, sample size |
| schema-migration | Migrate schema between catalogs | source catalog, target catalog |
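Template rendering is essentially parameter substitution into a base job specification. A simplified sketch of what `render_template` might do internally; the `{{name}}` placeholder convention and the template dict shape are assumptions for illustration:

```python
# Sketch: substitute user parameters into a template's job spec.
# Placeholders use an assumed {{name}} convention; unknown placeholders
# raise KeyError, surfacing missing parameters early.
import re

def render(template: dict, parameters: dict[str, str]) -> dict:
    def substitute(value):
        if isinstance(value, str):
            return re.sub(r"\{\{(\w+)\}\}", lambda m: parameters[m.group(1)], value)
        if isinstance(value, dict):
            return {k: substitute(v) for k, v in value.items()}
        if isinstance(value, list):
            return [substitute(v) for v in value]
        return value

    return substitute(template)

template = {
    "name": "iceberg-compact-{{table}}",
    "sql": "CALL iceberg.system.rewrite_data_files(table => '{{table}}')",
}
spec = render(template, {"table": "matih_analytics.sales_data"})
```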
Iceberg Table Operations
Spark provides full support for Iceberg table operations via the Polaris REST Catalog:
Table Management
-- Create an Iceberg table
CREATE TABLE iceberg.matih_analytics.sales_data (
id BIGINT,
region STRING,
amount DECIMAL(12,2),
order_date DATE,
customer_id BIGINT
) USING iceberg
PARTITIONED BY (days(order_date));
-- Insert data
INSERT INTO iceberg.matih_analytics.sales_data
SELECT * FROM staging.raw_orders
WHERE order_date >= '2026-02-01';
-- Merge (upsert)
MERGE INTO iceberg.matih_analytics.sales_data t
USING staging.updates s
ON t.id = s.id
WHEN MATCHED THEN UPDATE SET amount = s.amount
WHEN NOT MATCHED THEN INSERT *;

Table Maintenance
-- Compact small files
CALL iceberg.system.rewrite_data_files(
table => 'matih_analytics.sales_data',
options => map('target-file-size-bytes', '134217728')
);
-- Expire snapshots
CALL iceberg.system.expire_snapshots(
table => 'matih_analytics.sales_data',
older_than => TIMESTAMP '2026-01-01 00:00:00'
);
-- Remove orphan files
CALL iceberg.system.remove_orphan_files(
table => 'matih_analytics.sales_data',
older_than => TIMESTAMP '2026-01-01 00:00:00'
);
-- Analyze table for statistics
ANALYZE TABLE iceberg.matih_analytics.sales_data COMPUTE STATISTICSResource Management
Tenant Resource Isolation
Spark jobs are isolated per tenant through:
| Mechanism | Implementation |
|---|---|
| Namespace isolation | Jobs run in tenant-specific namespaces |
| Resource quotas | CPU and memory limits per tenant |
| Priority classes | Different priorities for interactive vs batch |
| Service accounts | Tenant-specific service accounts for RBAC |
| Network policies | Pods can only access tenant-scoped resources |
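As an illustration of the quota mechanism, a per-tenant ResourceQuota manifest could be generated like this; the helper name, namespace naming scheme, and default limits are assumptions, not documented MATIH values:

```python
# Sketch: build a Kubernetes ResourceQuota manifest that caps the CPU and
# memory Spark pods may request in a tenant's namespace. The
# "matih-tenant-<id>" namespace pattern is assumed for illustration.
def tenant_resource_quota(tenant_id: str, cpu: str = "20", memory: str = "64Gi") -> dict:
    return {
        "apiVersion": "v1",
        "kind": "ResourceQuota",
        "metadata": {
            "name": f"spark-quota-{tenant_id}",
            "namespace": f"matih-tenant-{tenant_id}",
        },
        "spec": {
            "hard": {
                "requests.cpu": cpu,
                "requests.memory": memory,
                "limits.cpu": cpu,
                "limits.memory": memory,
            }
        },
    }

quota = tenant_resource_quota("acme", cpu="10", memory="32Gi")
```

The resulting dict can be applied with any Kubernetes client; once present, the scheduler rejects Spark pods that would exceed the tenant's hard limits.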
Resource Configuration
| Job Type | Driver | Executor | Max Executors |
|---|---|---|---|
| Interactive (small) | 1 core, 2GB | 1 core, 2GB | 2 |
| Batch ETL (medium) | 1 core, 2GB | 2 cores, 4GB | 5 |
| Large-scale (heavy) | 2 cores, 4GB | 4 cores, 8GB | 10 |
| Profiling | 1 core, 2GB | 2 cores, 4GB | 3 |
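The profiles above can be expressed as a simple lookup the job submitter applies to a SparkApplication spec. A sketch with illustrative names; the profile keys and helper are not MATIH APIs:

```python
# Sketch: map the job types from the table above to driver/executor
# resources and an executor ceiling. Values mirror the table.
RESOURCE_PROFILES = {
    "interactive-small": {"driver": ("1", "2g"), "executor": ("1", "2g"), "max_executors": 2},
    "batch-etl-medium":  {"driver": ("1", "2g"), "executor": ("2", "4g"), "max_executors": 5},
    "large-scale-heavy": {"driver": ("2", "4g"), "executor": ("4", "8g"), "max_executors": 10},
    "profiling":         {"driver": ("1", "2g"), "executor": ("2", "4g"), "max_executors": 3},
}

def apply_profile(spec: dict, job_type: str) -> dict:
    profile = RESOURCE_PROFILES[job_type]
    d_cores, d_mem = profile["driver"]
    e_cores, e_mem = profile["executor"]
    spec.setdefault("driver", {}).update({"cores": int(d_cores), "memory": d_mem})
    spec.setdefault("executor", {}).update({"cores": int(e_cores), "memory": e_mem})
    spec["sparkConf"] = {
        **spec.get("sparkConf", {}),
        "spark.dynamicAllocation.maxExecutors": str(profile["max_executors"]),
    }
    return spec

spec = apply_profile({}, "batch-etl-medium")
```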
Monitoring and Observability
Spark Job Metrics
| Metric | Source | Description |
|---|---|---|
| Job duration | Spark UI / K8s events | Total job execution time |
| Stage completion | Spark REST API | Progress through stages |
| Shuffle read/write | Spark metrics | Data movement between stages |
| Executor memory | K8s metrics | Memory usage per executor |
| Task failures | Spark event logs | Failed task count and reasons |
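Job-level monitoring polls the SparkApplication status until a terminal state is reached. A sketch of the state classification a poller might use; the state names follow the Spark Operator's application-state conventions, and the helpers are illustrative:

```python
# Sketch: classify Spark Operator application states so a status poller
# knows when a job has finished. State names follow the SparkApplication
# status field reported by the operator.
TERMINAL_STATES = {"COMPLETED", "FAILED", "SUBMISSION_FAILED"}
UNHEALTHY_STATES = {"FAILED", "FAILING", "SUBMISSION_FAILED"}

def is_finished(state: str) -> bool:
    """True once the job has reached a terminal state and polling can stop."""
    return state.upper() in TERMINAL_STATES

def is_healthy(state: str) -> bool:
    """False when the job is failing or has failed; useful for alerting."""
    return state.upper() not in UNHEALTHY_STATES
```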
Event Logging
Spark event logs are stored in object storage for post-mortem analysis:
spark.eventLog.enabled=true
spark.eventLog.dir=s3a://spark-events/logs

Related Sections
- Pipeline Service -- Spark job submission and management
- Flink -- Complementary streaming engine
- Templates -- Spark job templates
- API Reference -- Spark management endpoints