# Stream Ingestion
Stream ingestion enables real-time data flow from Kafka topics to Iceberg tables via Flink SQL jobs managed by the pipeline-service.
## Architecture

Kafka Topics → Flink SQL Job → Iceberg Table → Trino Query

## Creating a Stream Configuration
```http
POST /api/v1/streams
X-Tenant-Id: {tenantId}

{
  "name": "orders-stream",
  "topics": ["orders.created", "orders.updated"],
  "consumerGroup": "matih-orders-consumer",
  "startingPosition": "LATEST",
  "batchSize": 1000,
  "targetTable": "orders_realtime",
  "targetSchema": "default",
  "valueFormat": "JSON"
}
```

## Stream Lifecycle
| State | Description |
|---|---|
| STOPPED | Configuration saved, job not running |
| STARTING | Flink job submission in progress |
| RUNNING | Flink job active, consuming from Kafka |
| STOPPING | Graceful shutdown in progress |
| ERROR | Job failed; see `errorMessage` for details |
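The states in the table above imply a transition graph. The sketch below is a hypothetical reading of it (the actual pipeline-service state machine may permit different transitions, e.g. whether a stream in ERROR can be restarted directly):

```python
# Hypothetical lifecycle transitions inferred from the state table;
# not the service's actual state machine.
VALID_TRANSITIONS = {
    "STOPPED":  {"STARTING"},
    "STARTING": {"RUNNING", "ERROR"},
    "RUNNING":  {"STOPPING", "ERROR"},
    "STOPPING": {"STOPPED", "ERROR"},
    "ERROR":    {"STARTING"},  # assumption: a failed job can be restarted
}

def can_transition(current: str, target: str) -> bool:
    """Return True if moving from `current` to `target` is allowed."""
    return target in VALID_TRANSITIONS.get(current, set())
```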
## Operations
| Action | Endpoint | Permission |
|---|---|---|
| Create | POST /api/v1/streams | streams:write |
| List | GET /api/v1/streams | streams:read |
| Get | GET /api/v1/streams/{id} | streams:read |
| Start | POST /api/v1/streams/{id}/start | ingestion:execute |
| Stop | POST /api/v1/streams/{id}/stop | ingestion:execute |
| Delete | DELETE /api/v1/streams/{id} | streams:write |
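The operations table can be turned into a small request-builder sketch. The helper name `build_request` and the header convention are assumptions based on the request example on this page, not a published client library:

```python
from typing import Optional, Tuple, Dict

# (method, path template, required permission) per the operations table.
OPERATIONS = {
    "create": ("POST",   "/api/v1/streams",            "streams:write"),
    "list":   ("GET",    "/api/v1/streams",            "streams:read"),
    "get":    ("GET",    "/api/v1/streams/{id}",       "streams:read"),
    "start":  ("POST",   "/api/v1/streams/{id}/start", "ingestion:execute"),
    "stop":   ("POST",   "/api/v1/streams/{id}/stop",  "ingestion:execute"),
    "delete": ("DELETE", "/api/v1/streams/{id}",       "streams:write"),
}

def build_request(action: str, tenant_id: str,
                  stream_id: Optional[str] = None) -> Tuple[str, str, Dict[str, str]]:
    """Return (method, path, headers) for a stream operation."""
    method, path_template, _permission = OPERATIONS[action]
    path = path_template.replace("{id}", stream_id or "")
    return method, path, {"X-Tenant-Id": tenant_id}
```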
## Value Formats

| Format | Schema Registry | Use Case |
|---|---|---|
| JSON | Optional | Most common, flexible schema |
| AVRO | Required | Schema evolution, compact binary |
| PROTOBUF | Required | High performance, strong typing |
| CSV | No | Legacy systems, simple data |
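One consequence of the table above: formats that require a schema registry should be rejected at configuration time if no registry is configured. A minimal sketch of that check, assuming a `schema_registry_url` field (the real configuration field name may differ):

```python
# Formats that cannot work without a schema registry, per the table above.
REGISTRY_REQUIRED = {"AVRO", "PROTOBUF"}
SUPPORTED = {"JSON", "AVRO", "PROTOBUF", "CSV"}

def validate_format(value_format: str, schema_registry_url: str = "") -> bool:
    """Return True if the format/registry combination is usable."""
    if value_format not in SUPPORTED:
        return False
    if value_format in REGISTRY_REQUIRED:
        return bool(schema_registry_url)
    return True
```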
## Topic Security
Topic names are validated to prevent cross-tenant access:
- Empty topic names are rejected
- Control characters are rejected
- Topic names containing another tenant's ID prefix are blocked
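The three checks above can be sketched as a single validation function. The `{tenantId}.` prefix convention is an assumption for illustration; the service may encode tenancy in topic names differently:

```python
def is_topic_allowed(topic: str, tenant_id: str, all_tenant_ids: set) -> bool:
    """Sketch of the topic-name validation rules described above."""
    if not topic:  # empty topic names are rejected
        return False
    if any(ord(c) < 32 or ord(c) == 127 for c in topic):  # control characters rejected
        return False
    # Topic names carrying another tenant's ID prefix are blocked.
    for other in all_tenant_ids - {tenant_id}:
        if topic.startswith(other + "."):
            return False
    return True
```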
## RBAC
| Role | streams:read | streams:write | ingestion:execute |
|---|---|---|---|
| DATA_ENGINEER | Yes | Yes | Yes |
| DATA_ANALYST | Yes | No | No |
| DATA_SCIENTIST | No | No | No |
| VIEWER | No | No | No |
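The RBAC table maps directly to a permission lookup. The sketch below is an illustrative helper, not the service's actual authorization code:

```python
# Role-to-permission mapping taken from the RBAC table above.
ROLE_PERMISSIONS = {
    "DATA_ENGINEER":  {"streams:read", "streams:write", "ingestion:execute"},
    "DATA_ANALYST":   {"streams:read"},
    "DATA_SCIENTIST": set(),
    "VIEWER":         set(),
}

def has_permission(role: str, permission: str) -> bool:
    """Return True if `role` is granted `permission`."""
    return permission in ROLE_PERMISSIONS.get(role, set())
```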