File Ingestion
File ingestion pipelines process data files from cloud object stores (S3, GCS, Azure Blob) and SFTP servers. The Pipeline Service uses the CloudStorageExtractOperator for cloud files and supports CSV, JSON, Parquet, Avro, and ORC formats with automatic schema inference.
Supported Storage Backends
| Backend | Connection Type | Authentication |
|---|---|---|
| Amazon S3 | s3 | AWS credentials via K8s Secret |
| Google Cloud Storage | gcs | Service account JSON via K8s Secret |
| Azure Blob Storage | azure_blob | SAS token or managed identity |
| SFTP | sftp | SSH key or password via K8s Secret |
| Local filesystem | local | Mounted PVC path |
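Connections referenced from pipelines (e.g. `${S3_CONNECTION}`) are defined separately and resolved at runtime. One plausible shape for an S3 connection backed by a Kubernetes Secret is sketched below; the field names are illustrative assumptions, not the service's actual connection schema:

```yaml
# Illustrative connection definition; actual field names may differ.
connections:
  s3_landing:
    type: s3
    region: us-east-1
    credentials:
      secret_name: s3-landing-credentials    # Kubernetes Secret holding the keys
      access_key_id_key: AWS_ACCESS_KEY_ID
      secret_access_key_key: AWS_SECRET_ACCESS_KEY
```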
Pipeline Definition
```yaml
metadata:
  name: daily-csv-import
  version: "1.0.0"
  owner: data-engineering
sources:
  vendor_files:
    type: cloud_storage
    connection: ${S3_CONNECTION}
    bucket: vendor-data-landing
    prefix: daily/transactions/
    pattern: "*.csv.gz"
    format: csv
    csv_options:
      delimiter: ","
      header: true
      quote_char: '"'
      encoding: utf-8
    archive_after_read: true
    archive_prefix: processed/
transformations:
  - name: validate_schema
    type: schema_check
    input: vendor_files
    expected_columns: [transaction_id, amount, currency, date]
sinks:
  staging:
    type: iceberg
    table: staging.vendor.transactions
    mode: overwrite_partition
    partition_by: [date]
orchestration:
  engine: airflow
  schedule: "30 7 * * *"
```
CloudStorageExtractOperator
Source: data-plane/pipeline-service/src/matih_pipeline/operators/cloud_storage_extract.py
Configuration Parameters
| Parameter | Type | Required | Description |
|---|---|---|---|
| bucket | string | Yes | Storage bucket or container name |
| prefix | string | No | Object key prefix filter |
| pattern | string | No | Glob pattern for file matching |
| format | string | Yes | File format (csv, json, parquet, avro, orc) |
| compression | string | No | Compression type (gzip, snappy, zstd, none) |
| archive_after_read | bool | No | Move files after processing (default: false) |
| archive_prefix | string | No | Destination prefix for archived files |
| max_files | int | No | Maximum files per batch (default: 100) |
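The defaults and constraints in the table above can be sketched as a small configuration container. `ExtractConfig` is a hypothetical name for illustration; the operator's real constructor is not shown here:

```python
from dataclasses import dataclass
from typing import Optional

ALLOWED_FORMATS = {"csv", "json", "parquet", "avro", "orc"}
ALLOWED_COMPRESSION = {"gzip", "snappy", "zstd", "none"}

@dataclass
class ExtractConfig:
    """Illustrative container mirroring the parameter table above."""
    bucket: str                        # required: storage bucket or container
    format: str                        # required: csv, json, parquet, avro, orc
    prefix: str = ""                   # optional object-key prefix filter
    pattern: str = "*"                 # optional glob pattern
    compression: str = "none"
    archive_after_read: bool = False   # default per the table
    archive_prefix: Optional[str] = None
    max_files: int = 100               # default batch cap per the table

    def __post_init__(self):
        if self.format not in ALLOWED_FORMATS:
            raise ValueError(f"unsupported format: {self.format}")
        if self.compression not in ALLOWED_COMPRESSION:
            raise ValueError(f"unsupported compression: {self.compression}")
        if self.archive_after_read and not self.archive_prefix:
            raise ValueError("archive_after_read requires archive_prefix")
```

Validating in `__post_init__` surfaces a bad pipeline definition at parse time rather than mid-run.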
File Discovery
The operator lists objects matching the prefix and pattern, then filters by modification time to avoid reprocessing:
1. List objects: s3://vendor-data-landing/daily/transactions/*.csv.gz
2. Filter: modified_after > last_run_timestamp
3. Sort: by modification time (oldest first)
4. Limit: max_files per batch
5. Process each file sequentially or in parallel
Schema Inference
For CSV and JSON files, the operator infers schemas from the first N rows:
| Format | Inference Method |
|---|---|
| CSV | Sample first 1000 rows, detect types by value pattern |
| JSON | Parse structure from first record, handle nested objects |
| Parquet | Read embedded schema from file footer |
| Avro | Read schema from file header |
| ORC | Read embedded schema from file stripe |
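For CSV, "detect types by value pattern" could look like the following sketch. The regexes, type names, and sample size here are illustrative assumptions, not the operator's actual inference rules:

```python
import re

# Illustrative value patterns; the real operator's rules may be richer.
INT_RE = re.compile(r"^-?\d+$")
FLOAT_RE = re.compile(r"^-?\d+\.\d+$")
DATE_RE = re.compile(r"^\d{4}-\d{2}-\d{2}$")

def infer_column_type(values):
    """Pick the most specific type that fits every non-empty sampled value."""
    non_null = [v for v in values if v != ""]
    if not non_null:
        return "string"
    if all(INT_RE.match(v) for v in non_null):
        return "int"
    if all(INT_RE.match(v) or FLOAT_RE.match(v) for v in non_null):
        return "double"
    if all(DATE_RE.match(v) for v in non_null):
        return "date"
    return "string"

def infer_schema(rows, header, sample_size=1000):
    """Sample up to sample_size data rows and infer one type per column."""
    sample = rows[:sample_size]
    return {
        col: infer_column_type([row[i] for row in sample])
        for i, col in enumerate(header)
    }
```

Note the widening behavior: a column mixing `12` and `9.99` resolves to `double`, since the integer pattern no longer fits every value.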
Archive and Cleanup
After successful processing, files can be archived or deleted:
| Strategy | Behavior |
|---|---|
| archive | Move to archive prefix with timestamp |
| delete | Remove source files after successful load |
| none | Leave files in place (use watermark to avoid reprocessing) |
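The `archive` strategy's timestamped move might compute its destination key like the sketch below. `archive_key` is a hypothetical helper and the key layout is an assumption; the service's actual naming scheme may differ:

```python
from datetime import datetime, timezone
from typing import Optional

def archive_key(source_key: str, archive_prefix: str,
                now: Optional[datetime] = None) -> str:
    """Build a destination key under the archive prefix, stamped with
    the (UTC) processing time, keeping the original filename."""
    now = now or datetime.now(timezone.utc)
    stamp = now.strftime("%Y%m%dT%H%M%SZ")
    filename = source_key.rsplit("/", 1)[-1]
    return f"{archive_prefix.rstrip('/')}/{stamp}/{filename}"
```

Grouping archived files under a per-run timestamp keeps reruns from overwriting earlier copies of a file with the same name.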
Related Pages
- Batch Ingestion -- Database extraction patterns
- Schema Registry -- Schema validation
- API Connectors -- REST API extraction