MATIH Platform is in active MVP development. Documentation reflects current implementation status.
11. Pipelines & Data Engineering
File Ingestion

File ingestion pipelines process data files from cloud object stores (S3, GCS, Azure Blob) and SFTP servers. The Pipeline Service uses the CloudStorageExtractOperator for cloud files and supports CSV, JSON, Parquet, Avro, and ORC formats with automatic schema inference.


Supported Storage Backends

| Backend | Connection Type | Authentication |
|---------|-----------------|----------------|
| Amazon S3 | `s3` | AWS credentials via K8s Secret |
| Google Cloud Storage | `gcs` | Service account JSON via K8s Secret |
| Azure Blob Storage | `azure_blob` | SAS token or managed identity |
| SFTP | `sftp` | SSH key or password via K8s Secret |
| Local filesystem | `local` | Mounted PVC path |

Pipeline Definition

```yaml
metadata:
  name: daily-csv-import
  version: "1.0.0"
  owner: data-engineering

sources:
  vendor_files:
    type: cloud_storage
    connection: ${S3_CONNECTION}
    bucket: vendor-data-landing
    prefix: daily/transactions/
    pattern: "*.csv.gz"
    format: csv
    csv_options:
      delimiter: ","
      header: true
      quote_char: '"'
      encoding: utf-8
    archive_after_read: true
    archive_prefix: processed/

transformations:
  - name: validate_schema
    type: schema_check
    input: vendor_files
    expected_columns: [transaction_id, amount, currency, date]

sinks:
  staging:
    type: iceberg
    table: staging.vendor.transactions
    mode: overwrite_partition
    partition_by: [date]

orchestration:
  engine: airflow
  schedule: "30 7 * * *"
```
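A definition like the one above is plain YAML, so it can be parsed with `yaml.safe_load` and sanity-checked before submission. The validator below is a minimal sketch of such a check (the function name and rules are illustrative, not the Pipeline Service's actual loader):

```python
REQUIRED_SECTIONS = ("metadata", "sources", "sinks", "orchestration")

def validate_pipeline(spec: dict) -> dict:
    """Check that a parsed pipeline definition (e.g. from yaml.safe_load)
    has the required top-level sections and typed sources."""
    missing = [s for s in REQUIRED_SECTIONS if s not in spec]
    if missing:
        raise ValueError(f"pipeline definition missing sections: {missing}")
    for name, source in spec["sources"].items():
        if "type" not in source:
            raise ValueError(f"source {name!r} declares no type")
    return spec
```

Running this at CI time catches malformed definitions before they reach the orchestrator.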

CloudStorageExtractOperator

Source: data-plane/pipeline-service/src/matih_pipeline/operators/cloud_storage_extract.py

Configuration Parameters

| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| `bucket` | string | Yes | Storage bucket or container name |
| `prefix` | string | No | Object key prefix filter |
| `pattern` | string | No | Glob pattern for file matching |
| `format` | string | Yes | File format (csv, json, parquet, avro, orc) |
| `compression` | string | No | Compression type (gzip, snappy, zstd, none) |
| `archive_after_read` | bool | No | Move files after processing (default: false) |
| `archive_prefix` | string | No | Destination prefix for archived files |
| `max_files` | int | No | Maximum files per batch (default: 100) |
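The parameter table maps naturally onto a typed configuration object. The dataclass below mirrors the documented parameters and defaults; the class itself is an illustrative sketch, not the operator's real signature:

```python
from dataclasses import dataclass
from typing import Optional

SUPPORTED_FORMATS = {"csv", "json", "parquet", "avro", "orc"}

@dataclass
class CloudStorageExtractConfig:
    """Configuration mirroring the documented operator parameters
    (hypothetical class, for illustration only)."""
    bucket: str                        # required
    format: str                        # required: csv, json, parquet, avro, orc
    prefix: Optional[str] = None
    pattern: Optional[str] = None
    compression: Optional[str] = None  # gzip, snappy, zstd, none
    archive_after_read: bool = False
    archive_prefix: Optional[str] = None
    max_files: int = 100

    def __post_init__(self):
        if self.format not in SUPPORTED_FORMATS:
            raise ValueError(f"unsupported format: {self.format}")
```

Validating at construction time surfaces a bad `format` immediately instead of mid-run.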

File Discovery

The operator lists objects matching the prefix and pattern, then filters by modification time to avoid reprocessing:

1. List objects: s3://vendor-data-landing/daily/transactions/*.csv.gz
2. Filter: keep only objects modified after last_run_timestamp
3. Sort: by modification time (oldest first)
4. Limit: max_files per batch
5. Process each file sequentially or in parallel
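The discovery steps above can be sketched in a few lines of Python. This version operates on an in-memory list of `(key, modified_timestamp)` pairs; the real operator lists objects through the cloud SDKs, but the filter/sort/limit logic is the same:

```python
import fnmatch
from typing import Iterable, List, Tuple

def discover(objects: Iterable[Tuple[str, float]],
             prefix: str,
             pattern: str,
             last_run_ts: float,
             max_files: int = 100) -> List[str]:
    """Apply the documented discovery steps: prefix + pattern filter,
    modification-time watermark, oldest-first sort, batch limit."""
    candidates = [
        (key, ts) for key, ts in objects
        if key.startswith(prefix)
        and fnmatch.fnmatch(key.rsplit("/", 1)[-1], pattern)
        and ts > last_run_ts          # skip files already processed
    ]
    candidates.sort(key=lambda kv: kv[1])  # oldest first
    return [key for key, _ in candidates[:max_files]]
```

Sorting oldest-first means a batch cut off by `max_files` resumes from the right place on the next run.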

Schema Inference

For CSV and JSON files, the operator infers the schema from a sample of leading rows or records; Parquet, Avro, and ORC carry embedded schemas that are read directly:

| Format | Inference Method |
|--------|------------------|
| CSV | Sample first 1000 rows, detect types by value pattern |
| JSON | Parse structure from first record, handle nested objects |
| Parquet | Read embedded schema from file footer |
| Avro | Read schema from file header |
| ORC | Read embedded schema from file footer |
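Value-pattern type detection for CSV can be sketched as a cascade of trial casts over the sampled values. The rules below (try integer, then float, else fall back to string) are illustrative; the operator's actual inference may recognize more types:

```python
from typing import List

def infer_column_type(values: List[str], sample_size: int = 1000) -> str:
    """Infer a column type from sampled CSV string values.
    Hypothetical helper sketching pattern-based inference."""
    sample = [v for v in values[:sample_size] if v != ""]  # ignore empties
    if not sample:
        return "string"
    for cast, type_name in ((int, "bigint"), (float, "double")):
        try:
            for v in sample:
                cast(v)                 # every value must parse
            return type_name
        except ValueError:
            continue                    # try the next, wider type
    return "string"
```

Trying the narrowest type first keeps integer columns from being widened to floats unnecessarily.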

Archive and Cleanup

After successful processing, files can be archived or deleted:

| Strategy | Behavior |
|----------|----------|
| archive | Move to archive prefix with timestamp |
| delete | Remove source files after successful load |
| none | Leave files in place (use watermark to avoid reprocessing) |
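The three strategies amount to a small dispatch after each successful load. The sketch below returns the action as a string rather than issuing real copy/delete calls against a storage backend; the function and its return convention are illustrative:

```python
from datetime import datetime, timezone
from typing import Optional

def cleanup_action(strategy: str, key: str,
                   archive_prefix: str = "processed/") -> Optional[str]:
    """Describe the post-load action for one source object.
    Hypothetical helper; the real operator calls the storage SDKs."""
    if strategy == "archive":
        # Move under the archive prefix, stamped with processing time (UTC).
        stamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
        filename = key.rsplit("/", 1)[-1]
        return f"move:{archive_prefix}{stamp}/{filename}"
    if strategy == "delete":
        return f"delete:{key}"
    if strategy == "none":
        return None  # rely on the modification-time watermark instead
    raise ValueError(f"unknown cleanup strategy: {strategy}")
```

Timestamping the archive path keeps repeated loads of a same-named file from colliding.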
