Schema Registry
The Schema Registry manages schema definitions, evolution rules, and compatibility validation for data flowing through MATIH pipelines. It ensures that producers and consumers agree on data formats and that schema changes do not break downstream consumers.
Architecture
| Property | Value |
|---|---|
| Backend | PostgreSQL (schema storage) |
| Format support | Avro, JSON Schema, Protobuf |
| Integration | Kafka (serialization/deserialization), Pipeline Service (validation) |
| Compatibility modes | BACKWARD, FORWARD, FULL, NONE |
Compatibility Modes
| Mode | Description | Allowed Changes |
|---|---|---|
| BACKWARD | New schema can read old data | Add optional fields, widen types |
| FORWARD | Old schema can read new data | Remove optional fields, narrow types |
| FULL | Both backward and forward compatible | Add/remove optional fields only |
| NONE | No compatibility checking | Any change allowed |
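The first two modes are symmetric, which a small model makes concrete. The sketch below is illustrative only, not the registry's actual algorithm: it models a schema as a dict of field name to properties and checks just the add/remove-field rules from the table above (real resolution also handles type promotion, aliases, and unions).

```python
# Illustrative compatibility check. A "schema" here is a dict mapping
# field name -> {"type": ..., "default": ...} ("default" optional).

def backward_compatible(new_schema, old_schema):
    """New schema can read data written with the old schema."""
    for name, field in new_schema.items():
        # A field the new reader expects but old data lacks needs a default.
        if name not in old_schema and "default" not in field:
            return False
    return True

def forward_compatible(new_schema, old_schema):
    """Old schema can read data written with the new schema."""
    # Symmetric case: the old reader resolves against the new writer.
    return backward_compatible(old_schema, new_schema)

def full_compatible(new_schema, old_schema):
    return (backward_compatible(new_schema, old_schema)
            and forward_compatible(new_schema, old_schema))

v1 = {"id": {"type": "string"}}
v2 = {"id": {"type": "string"},
      "currency": {"type": "string", "default": "USD"}}
v3 = {"id": {"type": "string"}, "amount": {"type": "double"}}

print(backward_compatible(v2, v1))  # True: new field has a default
print(backward_compatible(v3, v1))  # False: required field added
```

Under this model, adding an optional field with a default passes BACKWARD, while adding a required field fails it.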
Schema Registration
POST /v1/schemas/:subject/versions
Request:
```json
{
  "schema": "{\"type\":\"record\",\"name\":\"Transaction\",...}",
  "schemaType": "AVRO",
  "compatibility": "BACKWARD"
}
```
Response:
```json
{
  "id": 42,
  "subject": "transactions-value",
  "version": 3,
  "schemaType": "AVRO",
  "compatible": true
}
```

Schema Validation in Pipelines
Pipeline definitions reference schemas for validation at extraction and load time:
```yaml
sources:
  events:
    type: kafka
    topic: matih.events
    schema_registry: ${SCHEMA_REGISTRY_URL}
    schema_subject: events-value
    schema_version: latest

quality_checks:
  - name: schema_conformance
    type: schema_check
    schema_subject: events-value
    schema_version: 3
    severity: critical
```

Schema Evolution Rules
Adding a Field (BACKWARD Compatible)
```json
{
  "type": "record",
  "name": "Transaction",
  "fields": [
    {"name": "id", "type": "string"},
    {"name": "amount", "type": "double"},
    {"name": "currency", "type": "string", "default": "USD"}
  ]
}
```

The currency field has a default value, so old data without this field can still be read.
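The effect of the default can be shown with a small sketch. This mimics how an Avro reader fills in reader-schema defaults for fields absent from old records; the `read_with_defaults` helper is hypothetical, not part of any registry API.

```python
# Defaults taken from the reader (new) schema above.
NEW_SCHEMA_DEFAULTS = {"currency": "USD"}

def read_with_defaults(record, defaults):
    """Return the record with missing fields filled from reader defaults."""
    out = dict(defaults)
    out.update(record)  # values present in the record win over defaults
    return out

# An "old" record written before the currency field existed:
old_record = {"id": "txn-1", "amount": 9.99}
decoded = read_with_defaults(old_record, NEW_SCHEMA_DEFAULTS)
print(decoded["currency"])  # currency filled in as "USD"
```

A record that already carries a `currency` value is passed through unchanged; only missing fields are filled.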
Removing a Field (FORWARD Compatible)
Removing a field with a default value is forward-compatible because old consumers can use the default when the field is missing.
Kafka Integration
The Schema Registry integrates with Kafka via serializers and deserializers:
| Component | Purpose |
|---|---|
| AvroSerializer | Encodes Kafka messages using registered Avro schemas |
| AvroDeserializer | Decodes Kafka messages using schema ID embedded in the message |
| JsonSchemaValidator | Validates JSON messages against registered JSON Schema |
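The "schema ID embedded in the message" usually follows a simple framing: a common wire layout (used by Confluent-compatible serializers; assumed here for MATIH's serializers as well) is one magic byte (0), a 4-byte big-endian schema ID, then the encoded payload:

```python
import struct

MAGIC_BYTE = 0

def frame_message(schema_id: int, payload: bytes) -> bytes:
    """Prefix an encoded payload with the magic byte and schema ID."""
    return struct.pack(">bI", MAGIC_BYTE, schema_id) + payload

def parse_message(message: bytes) -> tuple[int, bytes]:
    """Split a framed message back into (schema_id, payload)."""
    magic, schema_id = struct.unpack(">bI", message[:5])
    if magic != MAGIC_BYTE:
        raise ValueError("unknown magic byte: %d" % magic)
    return schema_id, message[5:]
```

On the consumer side, the deserializer reads the schema ID from the first five bytes, fetches that schema from the registry (typically via a local cache), and decodes the rest of the message with it.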
Schema Subjects
Subjects follow the naming convention {topic}-{key|value}:
| Subject | Schema Type | Description |
|---|---|---|
| matih.ai.state-changes-value | JSON | FSM state transition events |
| matih.ai.agent-traces-value | JSON | Agent execution traces |
| matih.ai.llm-ops-value | JSON | LLM operation metrics |
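The convention can be captured in a tiny helper; `subject_for` is hypothetical and not part of any registry client, but it keeps subject names consistent across pipeline code:

```python
def subject_for(topic: str, part: str = "value") -> str:
    """Build a schema subject name using the {topic}-{key|value} convention."""
    if part not in ("key", "value"):
        raise ValueError("part must be 'key' or 'value'")
    return f"{topic}-{part}"

print(subject_for("matih.ai.state-changes"))    # matih.ai.state-changes-value
print(subject_for("matih.events", part="key"))  # matih.events-key
```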
Related Pages
- Stream Ingestion -- Kafka consumer pipelines
- Data Virtualization -- Federated schema access
- Pipeline Service -- Architecture overview