MATIH Platform is in active MVP development. Documentation reflects current implementation status.
13. ML Service & MLOps

A/B Testing and Canary Deployments

The ML Service supports controlled model rollouts through A/B testing and canary deployment strategies. These mechanisms enable data-driven model selection by routing a fraction of traffic to new model versions while monitoring performance metrics and automatically rolling back if quality degrades.


A/B Testing Architecture

Prediction Request
    |
    v
+-------------------+
| Traffic Router    |  <-- Deterministic user-based split
+-------------------+
    |
    +-- Variant A (Control) -----> Current Production Model (80%)
    |
    +-- Variant B (Treatment) ---> New Candidate Model (20%)
    |
    v
+-------------------+
| Metrics Collector |  <-- Track per-variant metrics
+-------------------+
    |
    v
+-------------------+
| Statistical       |  <-- Significance testing
| Analyzer          |
+-------------------+
    |
    v
Promote / Rollback Decision

Experiment Configuration

from dataclasses import dataclass, field
from datetime import datetime

@dataclass
class ABTestConfig:
    """Configuration for an A/B test experiment."""
    experiment_id: str
    tenant_id: str
    name: str
 
    # Models
    control_model_id: str
    control_version: str
    treatment_model_id: str
    treatment_version: str
 
    # Traffic split
    treatment_fraction: float = 0.2   # 20% to treatment
 
    # Success criteria
    primary_metric: str = "accuracy"
    minimum_improvement: float = 0.01  # 1% improvement required
    confidence_level: float = 0.95     # 95% statistical confidence
 
    # Safeguards
    max_duration_hours: int = 168     # 1 week maximum
    min_sample_size: int = 1000       # Minimum samples before decision
    auto_rollback_threshold: float = 0.05  # Rollback if 5% worse
 
    # Status
    status: str = "active"            # active, completed, rolled_back
    started_at: datetime = field(default_factory=datetime.utcnow)

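Several of these fields must satisfy basic invariants before an experiment starts (fractions and confidence levels are proportions, sample sizes are positive). A hypothetical standalone validation helper, not part of the service, sketching those checks:

```python
def validate_ab_test_config(
    treatment_fraction: float,
    confidence_level: float,
    minimum_improvement: float,
    min_sample_size: int,
) -> list[str]:
    """Return human-readable configuration errors (empty list if valid)."""
    errors = []
    if not 0.0 < treatment_fraction < 1.0:
        errors.append("treatment_fraction must be strictly between 0 and 1")
    if not 0.5 <= confidence_level < 1.0:
        errors.append("confidence_level must be in [0.5, 1.0)")
    if minimum_improvement < 0:
        errors.append("minimum_improvement must be non-negative")
    if min_sample_size < 1:
        errors.append("min_sample_size must be at least 1")
    return errors
```

Rejecting a bad configuration up front is cheaper than discovering mid-experiment that, say, a zero treatment fraction routed no traffic at all.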
Traffic Routing

import hashlib

class ABTestRouter:
    """Routes prediction requests to A/B test variants."""
 
    def route(
        self,
        experiment: ABTestConfig,
        user_id: str,
    ) -> str:
        """Determine which variant to serve."""
        # Deterministic hash-based assignment: the same user always
        # gets the same variant. Python's built-in hash() is salted
        # per process, so use a stable digest instead.
        key = f"{experiment.experiment_id}:{user_id}".encode()
        hash_value = int.from_bytes(
            hashlib.sha256(key).digest()[:8], "big"
        ) % 10000
 
        threshold = int(experiment.treatment_fraction * 10000)
 
        if hash_value < threshold:
            return "treatment"
        return "control"

Assignment Properties

| Property      | Description                                    |
|---------------|------------------------------------------------|
| Deterministic | Same user always sees the same variant         |
| Uniform       | Distribution matches the configured split      |
| Stable        | Adding/removing experiments does not reassign  |
| Isolated      | Experiments do not interfere with each other   |
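The Deterministic and Uniform properties can be sanity-checked empirically. A self-contained sketch of the same bucket-based split, using a stable digest (Python's built-in hash() is salted per process, so SHA-256 keeps assignments stable across restarts), simulated over 10,000 users at a 20% treatment fraction:

```python
import hashlib

def assign_variant(experiment_id: str, user_id: str, treatment_fraction: float) -> str:
    """Stable bucket assignment in [0, 10000) derived from a SHA-256 digest."""
    key = f"{experiment_id}:{user_id}".encode()
    bucket = int.from_bytes(hashlib.sha256(key).digest()[:8], "big") % 10000
    return "treatment" if bucket < int(treatment_fraction * 10000) else "control"

# Determinism: repeated calls for the same user agree
assert assign_variant("exp-1", "user-42", 0.2) == assign_variant("exp-1", "user-42", 0.2)

# Uniformity: observed split is close to the configured 20%
assignments = [assign_variant("exp-1", f"user-{i}", 0.2) for i in range(10_000)]
observed = assignments.count("treatment") / len(assignments)
assert abs(observed - 0.2) < 0.02
```

Hashing the experiment ID together with the user ID is also what makes experiments Isolated: a user's bucket in one experiment says nothing about their bucket in another.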

Canary Deployments

Canary deployments gradually increase traffic to the new model:

class CanaryDeployment:
    """Progressive traffic shifting for model rollout."""
 
    STAGES = [
        {"fraction": 0.01, "duration_minutes": 30, "name": "smoke_test"},
        {"fraction": 0.05, "duration_minutes": 60, "name": "early_canary"},
        {"fraction": 0.10, "duration_minutes": 120, "name": "canary"},
        {"fraction": 0.25, "duration_minutes": 240, "name": "expansion"},
        {"fraction": 0.50, "duration_minutes": 360, "name": "half_traffic"},
        {"fraction": 1.00, "duration_minutes": 0, "name": "full_rollout"},
    ]
 
    async def advance_stage(
        self,
        deployment_id: str,
    ) -> CanaryStage:
        """Advance to the next canary stage."""
        current = await self._get_current_stage(deployment_id)
 
        # Check metrics before advancing
        metrics = await self._get_stage_metrics(deployment_id)
        if not self._meets_criteria(metrics):
            await self._rollback(deployment_id)
            return CanaryStage(
                name="rolled_back",
                reason=f"Metrics below threshold: {metrics}",
            )
 
        # Already at the final stage; nothing to advance
        if current.index >= len(self.STAGES) - 1:
            return current
 
        next_stage = self.STAGES[current.index + 1]
        await self._update_traffic_split(
            deployment_id, next_stage["fraction"]
        )
 
        return CanaryStage(
            name=next_stage["name"],
            fraction=next_stage["fraction"],
        )

Canary Stages

| Stage        | Traffic | Duration  | Monitoring                |
|--------------|---------|-----------|---------------------------|
| Smoke Test   | 1%      | 30 min    | Error rate only           |
| Early Canary | 5%      | 1 hour    | Error rate + latency      |
| Canary       | 10%     | 2 hours   | Full metrics suite        |
| Expansion    | 25%     | 4 hours   | Full metrics + comparison |
| Half Traffic | 50%     | 6 hours   | Statistical significance  |
| Full Rollout | 100%    | Permanent | Continuous monitoring     |
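The monitoring column maps naturally onto per-stage checks inside `_meets_criteria`. A hypothetical sketch of what such an implementation might look like; the threshold values are illustrative assumptions, not production defaults:

```python
# Illustrative per-stage thresholds (assumptions, not production defaults)
STAGE_CHECKS = {
    "smoke_test":   {"max_error_rate": 0.05},
    "early_canary": {"max_error_rate": 0.02, "max_p95_latency_ms": 500},
    "canary":       {"max_error_rate": 0.01, "max_p95_latency_ms": 300},
}

def meets_criteria(stage: str, metrics: dict) -> bool:
    """Return True if the stage's metrics pass every threshold for that stage."""
    checks = STAGE_CHECKS.get(stage, {})
    if metrics.get("error_rate", 0.0) > checks.get("max_error_rate", 1.0):
        return False
    if metrics.get("p95_latency_ms", 0.0) > checks.get("max_p95_latency_ms", float("inf")):
        return False
    return True
```

Early stages check only the cheap, fast-moving signals (error rate), while later stages layer on latency and quality comparisons as sample sizes grow.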

Metrics Collection

class ABTestMetricsCollector:
    """Collects per-variant metrics for A/B tests."""
 
    async def record_prediction(
        self,
        experiment_id: str,
        variant: str,
        prediction: Any,
        actual: Any | None = None,
        latency_ms: float = 0,
    ) -> None:
        """Record a prediction outcome."""
        await self._store.record({
            "experiment_id": experiment_id,
            "variant": variant,
            "prediction": prediction,
            "actual": actual,
            "latency_ms": latency_ms,
            "timestamp": datetime.utcnow(),
        })

Tracked Metrics

| Metric                  | Type        | Purpose                        |
|-------------------------|-------------|--------------------------------|
| Accuracy                | Quality     | Primary success metric         |
| F1 Score                | Quality     | Classification balance         |
| Latency (p50, p95, p99) | Performance | Inference speed                |
| Error rate              | Reliability | Prediction failures            |
| Throughput              | Performance | Requests per second            |
| User satisfaction       | Quality     | Feedback scores (if available) |
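The latency percentiles in the table are derived from the recorded `latency_ms` values. A dependency-free nearest-rank sketch of that aggregation:

```python
import math

def percentile(values: list[float], p: float) -> float:
    """Nearest-rank percentile: the value at rank ceil(p * n / 100)."""
    if not values:
        raise ValueError("no samples")
    ordered = sorted(values)
    rank = max(1, math.ceil(p * len(ordered) / 100))
    return ordered[min(rank, len(ordered)) - 1]

latencies = list(range(1, 101))  # 1..100 ms
p50, p95, p99 = (percentile(latencies, p) for p in (50, 95, 99))
# p50 = 50, p95 = 95, p99 = 99
```

Tail percentiles (p95, p99) matter more than the mean here: a treatment model can match the control on average latency while still timing out for its slowest requests.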

Statistical Analysis

import numpy as np

class StatisticalAnalyzer:
    """Performs statistical significance testing for A/B tests."""
 
    def analyze(
        self,
        control_metrics: list[float],
        treatment_metrics: list[float],
        confidence_level: float = 0.95,
    ) -> ABTestResult:
        """Analyze A/B test results."""
        from scipy import stats
 
        # Two-sample t-test
        t_stat, p_value = stats.ttest_ind(
            control_metrics,
            treatment_metrics,
        )
 
        # Effect size (Cohen's d)
        pooled_std = np.sqrt(
            (np.std(control_metrics)**2 + np.std(treatment_metrics)**2) / 2
        )
        effect_size = (
            np.mean(treatment_metrics) - np.mean(control_metrics)
        ) / pooled_std
 
        return ABTestResult(
            control_mean=np.mean(control_metrics),
            treatment_mean=np.mean(treatment_metrics),
            improvement=(
                np.mean(treatment_metrics) - np.mean(control_metrics)
            ) / np.mean(control_metrics),
            p_value=p_value,
            significant=p_value < (1 - confidence_level),
            effect_size=effect_size,
            sample_size=len(control_metrics),
            recommendation=self._get_recommendation(
                p_value, effect_size, confidence_level
            ),
        )
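The effect-size step can be checked by hand without scipy. A dependency-free sketch of the same Cohen's d computation, using population standard deviation to match `np.std`'s default `ddof=0`:

```python
import math

def cohens_d(control: list[float], treatment: list[float]) -> float:
    """Cohen's d with the pooled std above: sqrt((s_c^2 + s_t^2) / 2)."""
    def mean(xs):
        return sum(xs) / len(xs)

    def pvar(xs):  # population variance (ddof=0)
        m = mean(xs)
        return sum((x - m) ** 2 for x in xs) / len(xs)

    pooled_std = math.sqrt((pvar(control) + pvar(treatment)) / 2)
    return (mean(treatment) - mean(control)) / pooled_std

# Identical spread, mean shifted by 1: d = 1 / sqrt(2) ≈ 0.707
d = cohens_d([1, 2, 3, 4, 5], [2, 3, 4, 5, 6])
```

Reporting effect size alongside the p-value guards against shipping a "significant" improvement that is too small to matter in practice.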

Decision Criteria

| Outcome           | Condition                                 | Action                           |
|-------------------|-------------------------------------------|----------------------------------|
| Promote treatment | p < 0.05 AND improvement > minimum        | Replace control with treatment   |
| Continue testing  | p > 0.05 AND samples < min_sample_size    | Collect more data                |
| Keep control      | p < 0.05 AND improvement < 0              | Treatment is worse, keep control |
| No difference     | p > 0.05 AND samples >= min_sample_size   | Treatment offers no improvement  |
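The decision table maps directly onto a recommendation function. A hypothetical sketch of what `_get_recommendation` might look like; handling the borderline case (significant, but the lift is between zero and the required minimum) as "keep control" is an assumption of this sketch:

```python
def get_recommendation(
    p_value: float,
    improvement: float,
    sample_size: int,
    alpha: float = 0.05,
    minimum_improvement: float = 0.01,
    min_sample_size: int = 1000,
) -> str:
    """Map test statistics onto the decision table."""
    if p_value < alpha:
        if improvement > minimum_improvement:
            return "promote_treatment"
        # Significant but negative, or below the required lift:
        # keep the control (assumption for the borderline case)
        return "keep_control"
    if sample_size < min_sample_size:
        return "continue_testing"
    return "no_difference"
```

Keeping the thresholds as parameters means the same function serves experiments with different risk tolerances.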

Auto-Rollback

The system automatically rolls back if the treatment model performs significantly worse:

class AutoRollbackMonitor:
    """Monitors A/B tests and triggers automatic rollback."""
 
    async def check(self, experiment_id: str) -> None:
        """Check if rollback is needed."""
        experiment = await self._get_experiment(experiment_id)
        metrics = await self._get_recent_metrics(
            experiment_id, window_minutes=15
        )
 
        control_error_rate = metrics["control"]["error_rate"]
        treatment_error_rate = metrics["treatment"]["error_rate"]
 
        degradation = treatment_error_rate - control_error_rate
 
        if degradation > experiment.auto_rollback_threshold:
            await self._rollback(experiment_id)
            await self._alert(
                f"A/B test {experiment_id} auto-rolled back: "
                f"treatment error rate {treatment_error_rate:.2%} "
                f"vs control {control_error_rate:.2%}"
            )

Shadow Deployments

For risk-free evaluation, the ML Service supports shadow deployments where the new model receives a copy of all traffic but its predictions are not served to users:

import asyncio

class ShadowDeployment:
    """Shadow deployment for risk-free model evaluation."""
 
    async def predict_with_shadow(
        self,
        primary_model: str,
        shadow_model: str,
        features: dict[str, Any],
    ) -> PredictionResponse:
        """Make prediction with primary, log shadow prediction."""
        # Primary prediction (served to user)
        primary_result = await self._predict(primary_model, features)
 
        # Shadow prediction (logged only)
        asyncio.create_task(
            self._shadow_predict_and_log(shadow_model, features)
        )
 
        return primary_result
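Logged shadow predictions are only useful once they are compared against the primary's. A minimal offline sketch computing the agreement rate between the two models from paired logs; the `(primary, shadow)` tuple format is an assumed log shape for this illustration:

```python
def agreement_rate(pairs: list[tuple]) -> float:
    """Fraction of requests where primary and shadow predicted the same label.

    `pairs` is a list of (primary_prediction, shadow_prediction) tuples,
    an assumed log format for this sketch.
    """
    if not pairs:
        return 0.0
    matches = sum(1 for primary, shadow in pairs if primary == shadow)
    return matches / len(pairs)

logs = [("spam", "spam"), ("ham", "spam"), ("spam", "spam"), ("ham", "ham")]
rate = agreement_rate(logs)  # 3 of 4 pairs agree -> 0.75
```

A low agreement rate is not necessarily bad (the shadow model may be correcting the primary's mistakes), but it flags exactly which requests deserve manual review before promotion.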

API Endpoints

POST   /api/v1/ab-tests                     # Create A/B test
GET    /api/v1/ab-tests                     # List experiments
GET    /api/v1/ab-tests/{id}                # Get experiment
PUT    /api/v1/ab-tests/{id}                # Update configuration
DELETE /api/v1/ab-tests/{id}                # Stop experiment
GET    /api/v1/ab-tests/{id}/results        # Get statistical results
POST   /api/v1/ab-tests/{id}/promote        # Promote treatment
POST   /api/v1/ab-tests/{id}/rollback       # Rollback to control
POST   /api/v1/canary                       # Start canary deployment
GET    /api/v1/canary/{id}/status           # Canary status
POST   /api/v1/canary/{id}/advance          # Advance canary stage
POST   /api/v1/shadow                       # Start shadow deployment
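As an illustration, the request body for creating an A/B test would mirror the ABTestConfig fields above. The example values below are hypothetical:

```json
{
  "tenant_id": "acme",
  "name": "churn-model-v2-test",
  "control_model_id": "churn-predictor",
  "control_version": "1.4.0",
  "treatment_model_id": "churn-predictor",
  "treatment_version": "2.0.0",
  "treatment_fraction": 0.2,
  "primary_metric": "accuracy",
  "minimum_improvement": 0.01,
  "confidence_level": 0.95
}
```

Fields with defaults in ABTestConfig (safeguards, status, timestamps) can be omitted and filled in server-side.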