Data Quality

The Data Quality Service is a Python/FastAPI application that continuously monitors data health across all tenant data assets. It provides validation rule execution, anomaly detection, quality scoring across multiple dimensions, SLA enforcement, and alerting for quality degradations. The service integrates with Great Expectations for rule-based validation and includes custom anomaly detection algorithms for time-series and distribution analysis.


Service Architecture

Property    | Value
------------|-------------------------------------------------
Language    | Python 3.11
Framework   | FastAPI
Port        | 8000
Namespace   | matih-data-plane
Storage     | PostgreSQL (quality metrics, rules, scores)
Cache       | Redis (metric caching, score snapshots)
Data access | Trino connector for querying tenant data

Component Layout

+-------------------------------------------------------------------+
|                       Data Quality Service                        |
|                                                                   |
|  +-------------------+  +--------------------+  +---------------+ |
|  | API Routes        |  | Validation Engine  |  | Anomaly       | |
|  | - /rules          |  | - Rule engine      |  | Detection     | |
|  | - /validations    |  | - Expectations     |  | - Time series | |
|  | - /scores         |  | - Custom rules     |  | - Distribution| |
|  | - /profiles       |  | - dbt integration  |  | - Freshness   | |
|  | - /anomalies      |  |                    |  |               | |
|  +-------------------+  +--------------------+  +---------------+ |
|                                                                   |
|  +-------------------+  +--------------------+  +---------------+ |
|  | Scoring Engine    |  | Profiling Engine   |  | Alert Service | |
|  | - Multi-dimension |  | - Column stats     |  | - Thresholds  | |
|  | - Trend analysis  |  | - Spark profiler   |  | - Channels    | |
|  | - SLA tracking    |  | - Schema inference |  | - Escalation  | |
|  +-------------------+  +--------------------+  +---------------+ |
+-------------------------------------------------------------------+
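
The API Routes component exposes these endpoints over FastAPI. A minimal, self-contained sketch of the wiring, assuming the /v1/quality prefix seen in the API examples later in this section; the handler body is a placeholder, not the actual implementation:

# Illustrative route wiring; handler logic is a placeholder.
from fastapi import APIRouter, FastAPI

app = FastAPI(title="Data Quality Service")
router = APIRouter(prefix="/v1/quality")

@router.get("/scores")
def get_scores(table: str) -> dict:
    # The real service would load the latest score snapshot from
    # PostgreSQL (with Redis caching); this stub only echoes the input.
    return {"tableFqn": table, "compositeScore": None}

app.include_router(router)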

Validation Rules

The rule engine supports multiple rule types for validating data quality:

Rule Types

Type            | Description                                | Example
----------------|--------------------------------------------|---------------------------------------------------
NOT_NULL        | Column must not contain NULL values        | orders.customer_id IS NOT NULL
UNIQUE          | Column values must be unique               | orders.order_id is unique
RANGE           | Numeric values within a specified range    | 0 < orders.amount < 1000000
ALLOWED_VALUES  | Values must come from a predefined set     | status IN ('active', 'cancelled', 'completed')
PATTERN         | String values match a regex pattern        | email MATCHES '^[a-zA-Z0-9+_.-]+@[a-zA-Z0-9.-]+$'
FRESHNESS       | Data updated within a time threshold       | MAX(updated_at) > NOW() - INTERVAL '1 HOUR'
ROW_COUNT       | Table row count within an expected range   | COUNT(*) BETWEEN 1000000 AND 20000000
REFERENTIAL     | Foreign key references must exist          | orders.customer_id EXISTS IN customers.id
CUSTOM_SQL      | Arbitrary SQL expression evaluates to true | SUM(CASE WHEN amount < 0 THEN 1 ELSE 0 END) = 0
AGGREGATE_MATCH | Aggregates match between two tables        | SUM(orders.amount) = SUM(settlements.total)
DISTRIBUTION    | Statistical distribution within bounds     | STDDEV(amount) < 500
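
How a rule type becomes an executable check is easiest to see with SQL templates. The sketch below is illustrative only; the template strings and helper name are assumptions, not the service's actual rule engine. Each query counts violations, so a result of 0 means the rule passes:

# Illustrative violation-counting templates; not the actual rule engine.
CHECK_TEMPLATES = {
    "NOT_NULL": "SELECT COUNT(*) FROM {table} WHERE {column} IS NULL",
    "UNIQUE": "SELECT COUNT(*) - COUNT(DISTINCT {column}) FROM {table}",
    "RANGE": (
        "SELECT COUNT(*) FROM {table} "
        "WHERE {column} < {min} OR {column} > {max}"
    ),
}

def build_check_sql(rule_type: str, **params) -> str:
    """Render a violation-counting query; 0 rows in violation = pass."""
    return CHECK_TEMPLATES[rule_type].format(**params)

For example, build_check_sql("NOT_NULL", table="analytics.public.orders", column="customer_id") renders the first check in the table above.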

Rule Definition

{
  "id": "rule-001",
  "name": "order_amount_positive",
  "description": "All order amounts must be positive",
  "tableFqn": "analytics.public.orders",
  "ruleType": "RANGE",
  "column": "amount",
  "config": {
    "min": 0.01,
    "max": null,
    "inclusive": true
  },
  "severity": "CRITICAL",
  "schedule": "0 */6 * * *",
  "enabled": true,
  "tags": ["finance", "data-integrity"]
}
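
Since the service reads tenant data through Trino, evaluating the rule above might look roughly like the following. The hostname, credentials, and single-bound handling are assumptions for illustration (only the min bound is checked because the example rule's max is null); this is a sketch, not the service's validation engine:

# Sketch: evaluate the RANGE rule above via the Trino Python client.
# Connection details are assumptions.
import trino

def evaluate_range_rule(rule: dict) -> bool:
    conn = trino.dbapi.connect(
        host="trino.matih-data-plane",  # assumed in-cluster address
        port=8080,
        user="data-quality-service",
    )
    sql = (
        f"SELECT COUNT(*) FROM {rule['tableFqn']} "
        f"WHERE {rule['column']} < {rule['config']['min']}"
    )
    cur = conn.cursor()
    cur.execute(sql)
    violations = cur.fetchone()[0]
    return violations == 0  # passes when no rows violate the bound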

Severity Levels

Severity | Impact                               | Action
---------|--------------------------------------|-----------------------------------------------
CRITICAL | Data is incorrect or missing         | Block downstream pipelines, alert immediately
HIGH     | Data quality significantly degraded  | Alert within 15 minutes
MEDIUM   | Data quality slightly degraded       | Alert within 1 hour
LOW      | Minor quality issue                  | Include in daily report
INFO     | Informational observation            | Log only

Anomaly Detection

The anomaly detection subsystem identifies unexpected patterns in data metrics:

Time-Series Anomaly Detection

# data-plane/data-quality-service/src/anomaly/time_series.py
 
class TimeSeriesAnomalyDetector:
    """Detects anomalies in time-series metrics using statistical methods."""
 
    def detect(self, metric_series: List[MetricPoint], config: AnomalyConfig) -> List[Anomaly]:
        # 1. Compute moving average and standard deviation
        # 2. Apply seasonal decomposition (STL) for periodic patterns
        # 3. Flag points outside configurable sigma threshold
        # 4. Account for known seasonal patterns (weekends, holidays)
        ...
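
A self-contained version of step 3 (the sigma threshold) could look like the following; the window size and threshold defaults are assumptions, and the production detector layers the STL decomposition and seasonality handling on top:

# Minimal rolling-sigma detector illustrating step 3; window and
# threshold defaults are assumptions.
import statistics
from typing import List, Tuple

def sigma_anomalies(values: List[float], window: int = 24,
                    sigma: float = 3.0) -> List[Tuple[int, float]]:
    """Return (index, value) pairs outside the rolling sigma band."""
    anomalies = []
    for i in range(window, len(values)):
        history = values[i - window:i]
        mean = statistics.mean(history)
        stdev = statistics.stdev(history)
        if stdev > 0 and abs(values[i] - mean) > sigma * stdev:
            anomalies.append((i, values[i]))
    return anomalies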

Distribution Anomaly Detection

# data-plane/data-quality-service/src/anomaly/distribution.py
 
class DistributionAnomalyDetector:
    """Detects distribution shifts between data snapshots."""
 
    def detect(self, current: Distribution, baseline: Distribution) -> List[Anomaly]:
        # 1. Compute KL divergence between current and baseline
        # 2. Perform Kolmogorov-Smirnov test
        # 3. Compare quantile boundaries
        # 4. Flag significant distribution shifts
        ...
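
Steps 1 and 2 map directly onto SciPy primitives. In this sketch the bin count, smoothing constant, and decision thresholds are assumptions, not the service's tuned values (the KL bound of 0.1 is chosen only so that shifts like the 0.45 in the response below would be flagged):

# Illustrative KL divergence + KS test (steps 1-2); thresholds are
# assumptions.
import numpy as np
from scipy import stats

def distribution_shift(current: np.ndarray, baseline: np.ndarray,
                       bins: int = 50, alpha: float = 0.01) -> bool:
    # Histogram both samples on a shared support for the KL divergence
    edges = np.histogram_bin_edges(np.concatenate([current, baseline]), bins)
    p, _ = np.histogram(current, edges, density=True)
    q, _ = np.histogram(baseline, edges, density=True)
    kl = stats.entropy(p + 1e-9, q + 1e-9)  # smoothing avoids zero bins

    # Two-sample Kolmogorov-Smirnov test on the raw samples
    ks = stats.ks_2samp(current, baseline)
    return bool(kl > 0.1 or ks.pvalue < alpha)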

Freshness Anomaly Detection

# data-plane/data-quality-service/src/anomaly/freshness.py
 
class FreshnessAnomalyDetector:
    """Detects delayed data updates based on expected patterns."""
 
    def detect(self, table_fqn: str, config: FreshnessConfig) -> Optional[Anomaly]:
        # 1. Query MAX(updated_at) from table
        # 2. Compare with expected update frequency
        # 3. Account for known maintenance windows
        # 4. Flag if data is stale beyond threshold
        ...
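
Reduced to essentials, the lag comparison (steps 2-4) is a timestamp subtraction; the function signature and maintenance-window flag are assumptions, and step 1's MAX(updated_at) query is left to the caller:

# Illustrative lag check (steps 2-4); signature and flag are assumptions.
from datetime import datetime, timedelta, timezone
from typing import Optional

def staleness(last_updated: datetime, max_lag: timedelta,
              in_maintenance_window: bool = False) -> Optional[str]:
    """Return a staleness description, or None if the data is fresh.

    Expects a timezone-aware last_updated timestamp.
    """
    if in_maintenance_window:
        return None  # known maintenance windows never raise anomalies
    lag = datetime.now(timezone.utc) - last_updated
    if lag > max_lag:
        return f"stale: last update {lag} ago (threshold {max_lag})"
    return None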

Anomaly Response

{
  "anomalies": [
    {
      "id": "anomaly-001",
      "type": "DISTRIBUTION_SHIFT",
      "tableFqn": "analytics.public.orders",
      "column": "amount",
      "detectedAt": "2026-02-12T10:30:00Z",
      "severity": "HIGH",
      "description": "Order amount distribution shifted significantly from baseline",
      "metrics": {
        "klDivergence": 0.45,
        "ksStatistic": 0.12,
        "pValue": 0.001,
        "baselineMedian": 125.00,
        "currentMedian": 210.00
      },
      "possibleCauses": [
        "Price change in source system",
        "New product category with higher price point",
        "Data quality issue in upstream pipeline"
      ],
      "status": "OPEN"
    }
  ]
}

Quality Scoring

The QualityScoreCalculator computes composite quality scores across six dimensions:

Scoring Dimensions

Dimension    | Weight | Description                           | Metrics
-------------|--------|---------------------------------------|-----------------------------------------------
Completeness | 0.20   | Data values are present               | Null ratio, missing field count
Accuracy     | 0.20   | Values are correct and valid          | Rule violation count, anomaly count
Consistency  | 0.15   | Values are consistent across sources  | Cross-table validation, referential integrity
Timeliness   | 0.15   | Data is current and fresh             | Freshness lag, update frequency
Uniqueness   | 0.15   | No unintended duplicates              | Duplicate ratio, key uniqueness
Validity     | 0.15   | Values conform to format and range    | Pattern match rate, range violations

Score Computation

# data-plane/data-quality-service/src/scoring/calculator.py

from datetime import datetime

class QualityScoreCalculator:
 
    def compute_score(self, table_fqn: str, tenant_id: str) -> QualityScore:
        dimensions = {}
 
        # Completeness: based on null ratios across columns
        dimensions["completeness"] = self._compute_completeness(table_fqn, tenant_id)
 
        # Accuracy: based on rule validation results
        dimensions["accuracy"] = self._compute_accuracy(table_fqn, tenant_id)
 
        # Consistency: based on cross-table validations
        dimensions["consistency"] = self._compute_consistency(table_fqn, tenant_id)
 
        # Timeliness: based on freshness checks
        dimensions["timeliness"] = self._compute_timeliness(table_fqn, tenant_id)
 
        # Uniqueness: based on duplicate detection
        dimensions["uniqueness"] = self._compute_uniqueness(table_fqn, tenant_id)
 
        # Validity: based on format and range validations
        dimensions["validity"] = self._compute_validity(table_fqn, tenant_id)
 
        # Weighted composite score
        composite = sum(
            dimensions[dim] * self.weights[dim]
            for dim in dimensions
        )
 
        return QualityScore(
            table_fqn=table_fqn,
            composite_score=composite,
            dimensions=dimensions,
            computed_at=datetime.utcnow()
        )
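
compute_score multiplies each dimension by self.weights[dim]. A plausible definition consistent with the Scoring Dimensions table above (the exact shape of the attribute is an assumption):

# Plausible weights mirroring the Scoring Dimensions table; how the
# service actually stores self.weights is an assumption.
weights = {
    "completeness": 0.20,
    "accuracy": 0.20,
    "consistency": 0.15,
    "timeliness": 0.15,
    "uniqueness": 0.15,
    "validity": 0.15,
}
assert abs(sum(weights.values()) - 1.0) < 1e-9  # weights sum to 1.0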

Score API

GET /v1/quality/scores?table=analytics.public.orders

Response:
{
  "tableFqn": "analytics.public.orders",
  "compositeScore": 0.94,
  "dimensions": {
    "completeness": 0.98,
    "accuracy": 0.95,
    "consistency": 0.92,
    "timeliness": 0.99,
    "uniqueness": 0.97,
    "validity": 0.91
  },
  "trend": {
    "7day": 0.02,
    "30day": -0.01
  },
  "computedAt": "2026-02-12T10:00:00Z"
}
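
For programmatic checks against this endpoint, a hedged client-side example; the in-cluster base URL is an assumption derived from the service name and port listed under Service Architecture:

# Illustrative client call; the base URL is an assumption.
import requests

resp = requests.get(
    "http://data-quality-service.matih-data-plane:8000/v1/quality/scores",
    params={"table": "analytics.public.orders"},
    timeout=10,
)
resp.raise_for_status()
score = resp.json()
if score["compositeScore"] < 0.95:
    print(f"quality below target: {score['compositeScore']:.2f}")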

SLA Monitoring

The Data Quality Service monitors SLA compliance defined in data contracts:

class QualitySlaService:

    def check_sla_compliance(self, contract_id: str) -> SlaStatus:
        contract = self.get_contract(contract_id)

        checks = {
            "freshness": self._check_freshness(contract),
            "quality_score": self._check_quality(contract),
            "completeness": self._check_completeness(contract),
        }
        # Each check result is assumed to expose a boolean `passed` flag
        all_pass = all(check.passed for check in checks.values())

        return SlaStatus(
            freshness=checks["freshness"],
            quality_score=checks["quality_score"],
            completeness=checks["completeness"],
            overall_status="COMPLIANT" if all_pass else "VIOLATION",
        )

SLA Dashboard

GET /v1/quality/sla/dashboard

Response:
{
  "totalContracts": 15,
  "compliant": 13,
  "warning": 1,
  "violation": 1,
  "contracts": [
    {
      "contractId": "contract-sales-to-bi",
      "dataset": "analytics.public.daily_sales",
      "status": "COMPLIANT",
      "metrics": {
        "freshness": {"status": "OK", "delay": "12 minutes", "sla": "60 minutes"},
        "quality": {"status": "OK", "score": 0.97, "sla": 0.95},
        "completeness": {"status": "OK", "nullRate": 0.002, "sla": 0.01}
      }
    }
  ]
}

Alerting

Quality violations trigger alerts through configurable channels:

Channel   | Configuration                  | Use Case
----------|--------------------------------|--------------------------
Slack     | Webhook URL, channel           | Team notifications
Email     | SMTP, recipient list           | Formal notifications
PagerDuty | Service key, severity mapping  | On-call escalation
Kafka     | Topic name                     | Programmatic consumption
Webhook   | URL, auth headers              | Custom integrations

Alert deduplication prevents alert storms:

  • Same rule + same table + same severity = one alert per evaluation window
  • Escalation after configurable number of consecutive violations
  • Auto-resolve when quality returns to normal
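
One way to implement the first rule is a fingerprint that bucketizes time into evaluation windows; the field set and window length below are assumptions, sketched for illustration:

# Illustrative dedup fingerprint: one alert per (rule, table, severity)
# per evaluation window. Field set and window length are assumptions.
import hashlib
import time

def dedup_key(rule_id: str, table_fqn: str, severity: str,
              window_seconds: int = 3600) -> str:
    bucket = int(time.time()) // window_seconds  # truncate to window start
    raw = f"{rule_id}|{table_fqn}|{severity}|{bucket}"
    return hashlib.sha256(raw.encode()).hexdigest()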

Related Sections