Data Quality
The Data Quality Service is a Python/FastAPI application that continuously monitors data health across all tenant data assets. It provides validation rule execution, anomaly detection, quality scoring across multiple dimensions, SLA enforcement, and alerting for quality degradations. The service integrates with Great Expectations for rule-based validation and includes custom anomaly detection algorithms for time-series and distribution analysis.
Service Architecture
| Property | Value |
|---|---|
| Language | Python 3.11 |
| Framework | FastAPI |
| Port | 8000 |
| Namespace | matih-data-plane |
| Storage | PostgreSQL (quality metrics, rules, scores) |
| Cache | Redis (metric caching, score snapshots) |
| Data access | Trino connector for querying tenant data |
Component Layout
+------------------------------------------------------------------+
| Data Quality Service |
| |
| +-------------------+ +--------------------+ +---------------+ |
| | API Routes | | Validation Engine | | Anomaly | |
| | - /rules | | - Rule engine | | Detection | |
| | - /validations | | - Expectations | | - Time series | |
| | - /scores | | - Custom rules | | - Distribution| |
| | - /profiles | | - dbt integration | | - Freshness | |
| | - /anomalies | | | | | |
| +-------------------+ +--------------------+ +---------------+ |
| |
| +-------------------+ +--------------------+ +---------------+ |
| | Scoring Engine | | Profiling Engine | | Alert Service | |
| | - Multi-dimension | | - Column stats | | - Thresholds | |
| | - Trend analysis | | - Spark profiler | | - Channels | |
| | - SLA tracking | | - Schema inference | | - Escalation | |
| +-------------------+ +--------------------+ +---------------+ |
+------------------------------------------------------------------+

Validation Rules
The rule engine supports multiple rule types for validating data quality:
Rule Types
| Type | Description | Example |
|---|---|---|
| NOT_NULL | Column must not contain NULL values | orders.customer_id IS NOT NULL |
| UNIQUE | Column values must be unique | orders.order_id is unique |
| RANGE | Numeric values within specified range | 0 < orders.amount < 1000000 |
| ALLOWED_VALUES | Values must be from a predefined set | status IN ('active', 'cancelled', 'completed') |
| PATTERN | String values match regex pattern | email MATCHES '^[a-zA-Z0-9+_.-]+@[a-zA-Z0-9.-]+$' |
| FRESHNESS | Data updated within time threshold | MAX(updated_at) > NOW() - INTERVAL '1 HOUR' |
| ROW_COUNT | Table row count within expected range | COUNT(*) BETWEEN 1000000 AND 20000000 |
| REFERENTIAL | Foreign key references exist | orders.customer_id EXISTS IN customers.id |
| CUSTOM_SQL | Arbitrary SQL expression evaluates to true | SUM(CASE WHEN amount < 0 THEN 1 ELSE 0 END) = 0 |
| AGGREGATE_MATCH | Aggregates match between two tables | SUM(orders.amount) = SUM(settlements.total) |
| DISTRIBUTION | Statistical distribution within bounds | STDDEV(amount) < 500 |
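As an illustration of how declarative rules like these can compile down to executable checks, here is a hedged sketch that turns a few of the rule types above into violation-count SQL. The `Rule` dataclass and `compile_rule` helper are hypothetical names for this example, not the service's actual API, and only a subset of rule types and config keys is handled:

```python
from dataclasses import dataclass, field

@dataclass
class Rule:
    rule_type: str
    table_fqn: str
    column: str = ""
    config: dict = field(default_factory=dict)

def compile_rule(rule: Rule) -> str:
    """Return a SQL query whose single result counts violations (0 = pass)."""
    t, c, cfg = rule.table_fqn, rule.column, rule.config
    if rule.rule_type == "NOT_NULL":
        return f"SELECT COUNT(*) FROM {t} WHERE {c} IS NULL"
    if rule.rule_type == "RANGE":
        # Count rows falling outside the configured bounds
        clauses = []
        if cfg.get("min") is not None:
            clauses.append(f"{c} < {cfg['min']}")
        if cfg.get("max") is not None:
            clauses.append(f"{c} > {cfg['max']}")
        return f"SELECT COUNT(*) FROM {t} WHERE {' OR '.join(clauses)}"
    if rule.rule_type == "ALLOWED_VALUES":
        allowed = ", ".join(f"'{v}'" for v in cfg["values"])
        return f"SELECT COUNT(*) FROM {t} WHERE {c} NOT IN ({allowed})"
    raise ValueError(f"unsupported rule type: {rule.rule_type}")

sql = compile_rule(Rule("RANGE", "analytics.public.orders", "amount", {"min": 0.01}))
# → "SELECT COUNT(*) FROM analytics.public.orders WHERE amount < 0.01"
```

In this framing every rule type reduces to the same contract (a query returning a violation count), which keeps the executor uniform regardless of rule type.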
Rule Definition
{
  "id": "rule-001",
  "name": "order_amount_positive",
  "description": "All order amounts must be positive",
  "tableFqn": "analytics.public.orders",
  "ruleType": "RANGE",
  "column": "amount",
  "config": {
    "min": 0.01,
    "max": null,
    "inclusive": true
  },
  "severity": "CRITICAL",
  "schedule": "0 */6 * * *",
  "enabled": true,
  "tags": ["finance", "data-integrity"]
}

Severity Levels
| Severity | Impact | Action |
|---|---|---|
| CRITICAL | Data is incorrect or missing | Block downstream pipelines, alert immediately |
| HIGH | Data quality significantly degraded | Alert within 15 minutes |
| MEDIUM | Data quality slightly degraded | Alert within 1 hour |
| LOW | Minor quality issue | Include in daily report |
| INFO | Informational observation | Log only |
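For illustration, the severity table can be expressed as a policy map that alert-routing code consults. The names here are hypothetical, not the service's actual configuration format:

```python
# Hypothetical policy map mirroring the severity table above.
# alert_within_minutes of None means no immediate alert (report or log only).
SEVERITY_POLICY = {
    "CRITICAL": {"block_downstream": True,  "alert_within_minutes": 0},
    "HIGH":     {"block_downstream": False, "alert_within_minutes": 15},
    "MEDIUM":   {"block_downstream": False, "alert_within_minutes": 60},
    "LOW":      {"block_downstream": False, "alert_within_minutes": None},  # daily report
    "INFO":     {"block_downstream": False, "alert_within_minutes": None},  # log only
}

def should_block_downstream(severity: str) -> bool:
    """Only CRITICAL violations block downstream pipelines."""
    return SEVERITY_POLICY[severity]["block_downstream"]
```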
Anomaly Detection
The anomaly detection subsystem identifies unexpected patterns in data metrics:
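Before looking at the individual detectors, the shared statistical core — flagging points that deviate beyond a configurable sigma threshold from a rolling baseline — can be sketched in a few lines. This is a simplified illustration, not the service's implementation; the real detectors below also apply seasonal decomposition and maintenance-window handling:

```python
from statistics import mean, stdev

def sigma_anomalies(values: list[float], window: int = 5, k: float = 3.0) -> list[int]:
    """Return indices of points deviating more than k sigmas from the
    trailing window's mean."""
    anomalies = []
    for i in range(window, len(values)):
        history = values[i - window:i]
        mu, sigma = mean(history), stdev(history)
        if sigma > 0 and abs(values[i] - mu) > k * sigma:
            anomalies.append(i)
    return anomalies

series = [100, 102, 99, 101, 100, 103, 250, 101, 100]
print(sigma_anomalies(series))  # → [6], the spike at index 6 is flagged
```

Once the spike enters the trailing window it inflates the standard deviation, so subsequent normal points are not falsely flagged — one reason production detectors typically use robust statistics or decomposition rather than a plain rolling mean.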
Time-Series Anomaly Detection
# data-plane/data-quality-service/src/anomaly/time_series.py
class TimeSeriesAnomalyDetector:
    """Detects anomalies in time-series metrics using statistical methods."""

    def detect(self, metric_series: List[MetricPoint], config: AnomalyConfig) -> List[Anomaly]:
        # 1. Compute moving average and standard deviation
        # 2. Apply seasonal decomposition (STL) for periodic patterns
        # 3. Flag points outside configurable sigma threshold
        # 4. Account for known seasonal patterns (weekends, holidays)

Distribution Anomaly Detection
# data-plane/data-quality-service/src/anomaly/distribution.py
class DistributionAnomalyDetector:
    """Detects distribution shifts between data snapshots."""

    def detect(self, current: Distribution, baseline: Distribution) -> List[Anomaly]:
        # 1. Compute KL divergence between current and baseline
        # 2. Perform Kolmogorov-Smirnov test
        # 3. Compare quantile boundaries
        # 4. Flag significant distribution shifts

Freshness Anomaly Detection
# data-plane/data-quality-service/src/anomaly/freshness.py
class FreshnessAnomalyDetector:
    """Detects delayed data updates based on expected patterns."""

    def detect(self, table_fqn: str, config: FreshnessConfig) -> Optional[Anomaly]:
        # 1. Query MAX(updated_at) from table
        # 2. Compare with expected update frequency
        # 3. Account for known maintenance windows
        # 4. Flag if data is stale beyond threshold

Anomaly Response
{
  "anomalies": [
    {
      "id": "anomaly-001",
      "type": "DISTRIBUTION_SHIFT",
      "tableFqn": "analytics.public.orders",
      "column": "amount",
      "detectedAt": "2026-02-12T10:30:00Z",
      "severity": "HIGH",
      "description": "Order amount distribution shifted significantly from baseline",
      "metrics": {
        "klDivergence": 0.45,
        "ksStatistic": 0.12,
        "pValue": 0.001,
        "baselineMedian": 125.00,
        "currentMedian": 210.00
      },
      "possibleCauses": [
        "Price change in source system",
        "New product category with higher price point",
        "Data quality issue in upstream pipeline"
      ],
      "status": "OPEN"
    }
  ]
}

Quality Scoring
The QualityScoreCalculator computes a weighted composite quality score across six dimensions:
Scoring Dimensions
| Dimension | Weight | Description | Metrics |
|---|---|---|---|
| Completeness | 0.20 | Data values are present | Null ratio, missing field count |
| Accuracy | 0.20 | Values are correct and valid | Rule violation count, anomaly count |
| Consistency | 0.15 | Values are consistent across sources | Cross-table validation, referential integrity |
| Timeliness | 0.15 | Data is current and fresh | Freshness lag, update frequency |
| Uniqueness | 0.15 | No unintended duplicates | Duplicate ratio, key uniqueness |
| Validity | 0.15 | Values conform to format and range | Pattern match rate, range violations |
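The weighted composite described by this table reduces to a short computation. The sketch below uses the table's weights (which sum to 1.0) and illustrative dimension values; it assumes each dimension score is already normalized to [0, 1]:

```python
# Weights taken from the scoring dimensions table above (sum = 1.0).
WEIGHTS = {
    "completeness": 0.20, "accuracy": 0.20, "consistency": 0.15,
    "timeliness": 0.15, "uniqueness": 0.15, "validity": 0.15,
}

def composite_score(dimensions: dict[str, float]) -> float:
    """Weighted sum of per-dimension scores, rounded for presentation."""
    assert set(dimensions) == set(WEIGHTS), "all six dimensions are required"
    return round(sum(dimensions[d] * WEIGHTS[d] for d in WEIGHTS), 4)

composite_score({
    "completeness": 1.0, "accuracy": 0.9, "consistency": 0.8,
    "timeliness": 1.0, "uniqueness": 0.9, "validity": 0.8,
})  # → 0.905
```

Because the weights sum to 1.0, the composite stays on the same [0, 1] scale as the individual dimensions, which keeps thresholds (e.g. an SLA of 0.95) directly comparable across tables.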
Score Computation
# data-plane/data-quality-service/src/scoring/calculator.py
class QualityScoreCalculator:
    def compute_score(self, table_fqn: str, tenant_id: str) -> QualityScore:
        dimensions = {}
        # Completeness: based on null ratios across columns
        dimensions["completeness"] = self._compute_completeness(table_fqn, tenant_id)
        # Accuracy: based on rule validation results
        dimensions["accuracy"] = self._compute_accuracy(table_fqn, tenant_id)
        # Consistency: based on cross-table validations
        dimensions["consistency"] = self._compute_consistency(table_fqn, tenant_id)
        # Timeliness: based on freshness checks
        dimensions["timeliness"] = self._compute_timeliness(table_fqn, tenant_id)
        # Uniqueness: based on duplicate detection
        dimensions["uniqueness"] = self._compute_uniqueness(table_fqn, tenant_id)
        # Validity: based on format and range validations
        dimensions["validity"] = self._compute_validity(table_fqn, tenant_id)

        # Weighted composite score
        composite = sum(
            dimensions[dim] * self.weights[dim]
            for dim in dimensions
        )
        return QualityScore(
            table_fqn=table_fqn,
            composite_score=composite,
            dimensions=dimensions,
            computed_at=datetime.utcnow(),
        )

Score API
GET /v1/quality/scores?table=analytics.public.orders
Response:
{
  "tableFqn": "analytics.public.orders",
  "compositeScore": 0.94,
  "dimensions": {
    "completeness": 0.98,
    "accuracy": 0.95,
    "consistency": 0.92,
    "timeliness": 0.99,
    "uniqueness": 0.97,
    "validity": 0.91
  },
  "trend": {
    "7day": 0.02,
    "30day": -0.01
  },
  "computedAt": "2026-02-12T10:00:00Z"
}

SLA Monitoring
The Data Quality Service monitors SLA compliance defined in data contracts:
class QualitySlaService:
    def check_sla_compliance(self, contract_id: str) -> SlaStatus:
        contract = self.get_contract(contract_id)
        freshness = self._check_freshness(contract)
        quality_score = self._check_quality(contract)
        completeness = self._check_completeness(contract)
        # Overall status passes only if every individual check passed
        all_pass = all(c.passed for c in (freshness, quality_score, completeness))
        return SlaStatus(
            freshness=freshness,
            quality_score=quality_score,
            completeness=completeness,
            overall_status="COMPLIANT" if all_pass else "VIOLATION",
        )

SLA Dashboard
GET /v1/quality/sla/dashboard
Response:
{
  "totalContracts": 15,
  "compliant": 13,
  "warning": 1,
  "violation": 1,
  "contracts": [
    {
      "contractId": "contract-sales-to-bi",
      "dataset": "analytics.public.daily_sales",
      "status": "COMPLIANT",
      "metrics": {
        "freshness": {"status": "OK", "delay": "12 minutes", "sla": "60 minutes"},
        "quality": {"status": "OK", "score": 0.97, "sla": 0.95},
        "completeness": {"status": "OK", "nullRate": 0.002, "sla": 0.01}
      }
    }
  ]
}

Alerting
Quality violations trigger alerts through configurable channels:
| Channel | Configuration | Use Case |
|---|---|---|
| Slack | Webhook URL, channel | Team notifications |
| Email | SMTP, recipient list | Formal notifications |
| PagerDuty | Service key, severity mapping | On-call escalation |
| Kafka | Topic name | Programmatic consumption |
| Webhook | URL, auth headers | Custom integrations |
Alert deduplication prevents alert storms:
- Same rule + same table + same severity = one alert per evaluation window
- Escalation after configurable number of consecutive violations
- Auto-resolve when quality returns to normal
Related Sections
- Catalog Service -- Metadata and profiling integration
- Governance Service -- Data contract management
- Pipeline Service -- Quality gates in pipeline execution
- API Reference -- Data Quality Service endpoints