# A/B Testing and Canary Deployments

The ML Service supports controlled model rollouts through A/B testing and canary deployment strategies. These mechanisms enable data-driven model selection by routing a fraction of traffic to new model versions while monitoring performance metrics and automatically rolling back if quality degrades.

## A/B Testing Architecture
```
Prediction Request
        |
        v
+-------------------+
|  Traffic Router   |  <-- Deterministic user-based split
+-------------------+
        |
        +-- Variant A (Control) -----> Current Production Model (80%)
        |
        +-- Variant B (Treatment) ---> New Candidate Model (20%)
        |
        v
+-------------------+
| Metrics Collector |  <-- Track per-variant metrics
+-------------------+
        |
        v
+-------------------+
|    Statistical    |  <-- Significance testing
|     Analyzer      |
+-------------------+
        |
        v
Promote / Rollback Decision
```

## Experiment Configuration
```python
from dataclasses import dataclass, field
from datetime import datetime

@dataclass
class ABTestConfig:
    """Configuration for an A/B test experiment."""
    experiment_id: str
    tenant_id: str
    name: str
    # Models
    control_model_id: str
    control_version: str
    treatment_model_id: str
    treatment_version: str
    # Traffic split
    treatment_fraction: float = 0.2  # 20% to treatment
    # Success criteria
    primary_metric: str = "accuracy"
    minimum_improvement: float = 0.01  # 1% improvement required
    confidence_level: float = 0.95     # 95% statistical confidence
    # Safeguards
    max_duration_hours: int = 168          # 1 week maximum
    min_sample_size: int = 1000            # minimum samples before a decision
    auto_rollback_threshold: float = 0.05  # roll back if 5% worse
    # Status
    status: str = "active"  # active, completed, rolled_back
    started_at: datetime = field(default_factory=datetime.utcnow)
```

## Traffic Routing
```python
import hashlib

class ABTestRouter:
    """Routes prediction requests to A/B test variants."""

    def route(
        self,
        experiment: ABTestConfig,
        user_id: str,
    ) -> str:
        """Determine which variant to serve."""
        # Deterministic hash-based assignment: the same user always
        # gets the same variant. A stable digest (md5) is used because
        # Python's builtin hash() is salted per process and would
        # reassign users on every service restart.
        digest = hashlib.md5(
            f"{experiment.experiment_id}:{user_id}".encode()
        ).hexdigest()
        hash_value = int(digest, 16) % 10000
        threshold = int(experiment.treatment_fraction * 10000)
        if hash_value < threshold:
            return "treatment"
        return "control"
```

## Assignment Properties
| Property | Description |
|---|---|
| Deterministic | Same user always sees same variant |
| Uniform | Distribution matches configured split |
| Stable | Adding/removing experiments does not reassign |
| Isolated | Experiments do not interfere with each other |
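These properties can be checked with a small standalone sketch. It uses `hashlib.md5` for a process-stable hash (Python's builtin `hash()` is salted per process); the helper name `assign_variant` is illustrative, not part of the service API.

```python
import hashlib

def assign_variant(experiment_id: str, user_id: str, treatment_fraction: float) -> str:
    """Stable hash-based assignment; md5 gives the same bucket in every process."""
    digest = hashlib.md5(f"{experiment_id}:{user_id}".encode()).hexdigest()
    bucket = int(digest, 16) % 10000
    return "treatment" if bucket < int(treatment_fraction * 10000) else "control"

# Deterministic: repeated calls agree for the same user and experiment
assert assign_variant("exp-1", "user-42", 0.2) == assign_variant("exp-1", "user-42", 0.2)

# Uniform: over many users, the observed split approaches the configured 20%
assignments = [assign_variant("exp-1", f"user-{i}", 0.2) for i in range(10_000)]
rate = assignments.count("treatment") / len(assignments)
print(f"treatment rate: {rate:.3f}")
```

Because the experiment ID is part of the hash input, each experiment draws an independent split, which is what makes the "Isolated" property hold.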
## Canary Deployments
Canary deployments gradually increase traffic to the new model:
```python
class CanaryDeployment:
    """Progressive traffic shifting for model rollout."""

    STAGES = [
        {"fraction": 0.01, "duration_minutes": 30, "name": "smoke_test"},
        {"fraction": 0.05, "duration_minutes": 60, "name": "early_canary"},
        {"fraction": 0.10, "duration_minutes": 120, "name": "canary"},
        {"fraction": 0.25, "duration_minutes": 240, "name": "expansion"},
        {"fraction": 0.50, "duration_minutes": 360, "name": "half_traffic"},
        {"fraction": 1.00, "duration_minutes": 0, "name": "full_rollout"},
    ]

    async def advance_stage(
        self,
        deployment_id: str,
    ) -> CanaryStage:
        """Advance to the next canary stage."""
        current = await self._get_current_stage(deployment_id)
        if current.index >= len(self.STAGES) - 1:
            return current  # already at full rollout; nothing to advance
        # Check metrics before advancing
        metrics = await self._get_stage_metrics(deployment_id)
        if not self._meets_criteria(metrics):
            await self._rollback(deployment_id)
            return CanaryStage(
                name="rolled_back",
                reason=f"Metrics below threshold: {metrics}",
            )
        next_stage = self.STAGES[current.index + 1]
        await self._update_traffic_split(
            deployment_id, next_stage["fraction"]
        )
        return CanaryStage(
            name=next_stage["name"],
            fraction=next_stage["fraction"],
        )
```

## Canary Stages
| Stage | Traffic | Duration | Monitoring |
|---|---|---|---|
| Smoke Test | 1% | 30 min | Error rate only |
| Early Canary | 5% | 1 hour | Error rate + latency |
| Canary | 10% | 2 hours | Full metrics suite |
| Expansion | 25% | 4 hours | Full metrics + comparison |
| Half Traffic | 50% | 6 hours | Statistical significance |
| Full Rollout | 100% | Permanent | Continuous monitoring |
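One practical consequence of the schedule above: even when every stage passes its checks immediately, a full rollout takes the sum of the stage durations. A quick sketch, with the stage list copied from `CanaryDeployment.STAGES`:

```python
# Stage list as defined on CanaryDeployment.STAGES
STAGES = [
    {"fraction": 0.01, "duration_minutes": 30, "name": "smoke_test"},
    {"fraction": 0.05, "duration_minutes": 60, "name": "early_canary"},
    {"fraction": 0.10, "duration_minutes": 120, "name": "canary"},
    {"fraction": 0.25, "duration_minutes": 240, "name": "expansion"},
    {"fraction": 0.50, "duration_minutes": 360, "name": "half_traffic"},
    {"fraction": 1.00, "duration_minutes": 0, "name": "full_rollout"},
]

# Minimum wall-clock time for a clean rollout
total_minutes = sum(s["duration_minutes"] for s in STAGES)
print(f"fastest possible full rollout: {total_minutes} min ({total_minutes / 60:.1f} h)")  # 810 min (13.5 h)
```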
## Metrics Collection
```python
from datetime import datetime
from typing import Any

class ABTestMetricsCollector:
    """Collects per-variant metrics for A/B tests."""

    async def record_prediction(
        self,
        experiment_id: str,
        variant: str,
        prediction: Any,
        actual: Any | None = None,
        latency_ms: float = 0,
    ) -> None:
        """Record a prediction outcome."""
        await self._store.record({
            "experiment_id": experiment_id,
            "variant": variant,
            "prediction": prediction,
            "actual": actual,
            "latency_ms": latency_ms,
            "timestamp": datetime.utcnow(),
        })
```

## Tracked Metrics
| Metric | Type | Purpose |
|---|---|---|
| Accuracy | Quality | Primary success metric |
| F1 Score | Quality | Classification balance |
| Latency (p50, p95, p99) | Performance | Inference speed |
| Error rate | Reliability | Prediction failures |
| Throughput | Performance | Requests per second |
| User satisfaction | Quality | Feedback scores (if available) |
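The latency percentiles in the table can be derived offline from the recorded `latency_ms` values. A minimal nearest-rank sketch; the `percentile` helper is illustrative, and production code would more likely use numpy or a streaming estimator:

```python
def percentile(samples: list[float], p: float) -> float:
    """Nearest-rank percentile for p in [0, 100]."""
    ordered = sorted(samples)
    k = max(0, min(len(ordered) - 1, round(p / 100 * (len(ordered) - 1))))
    return ordered[k]

# Example latencies in milliseconds; the single 200 ms outlier
# dominates the tail percentiles but barely moves the median.
latencies = [12.0, 15.0, 11.0, 14.0, 200.0, 13.0, 16.0, 12.5, 13.5, 14.5]
for p in (50, 95, 99):
    print(f"p{p}: {percentile(latencies, p):.1f} ms")
```

This is why the canary stages monitor p95/p99 alongside p50: tail latency regressions are invisible in the median.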
## Statistical Analysis
```python
import numpy as np
from scipy import stats

class StatisticalAnalyzer:
    """Performs statistical significance testing for A/B tests."""

    def analyze(
        self,
        control_metrics: list[float],
        treatment_metrics: list[float],
        confidence_level: float = 0.95,
    ) -> ABTestResult:
        """Analyze A/B test results."""
        # Two-sample t-test (two-sided)
        t_stat, p_value = stats.ttest_ind(
            control_metrics,
            treatment_metrics,
        )
        # Effect size (Cohen's d), using sample standard deviations
        pooled_std = np.sqrt(
            (np.std(control_metrics, ddof=1) ** 2
             + np.std(treatment_metrics, ddof=1) ** 2) / 2
        )
        effect_size = (
            np.mean(treatment_metrics) - np.mean(control_metrics)
        ) / pooled_std
        return ABTestResult(
            control_mean=np.mean(control_metrics),
            treatment_mean=np.mean(treatment_metrics),
            improvement=(
                np.mean(treatment_metrics) - np.mean(control_metrics)
            ) / np.mean(control_metrics),
            p_value=p_value,
            significant=p_value < (1 - confidence_level),
            effect_size=effect_size,
            sample_size=len(control_metrics),
            recommendation=self._get_recommendation(
                p_value, effect_size, confidence_level
            ),
        )
```

## Decision Criteria
| Outcome | Condition | Action |
|---|---|---|
| Promote treatment | p < 0.05 AND improvement > minimum | Replace control with treatment |
| Continue testing | p > 0.05 AND samples < min_sample | Collect more data |
| Keep control | p < 0.05 AND improvement < 0 | Treatment is worse, keep control |
| No difference | p > 0.05 AND samples >= min_sample | Treatment offers no improvement |
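The decision table can be expressed as a small pure function. This is a sketch of what a `_get_recommendation` helper might look like; the signature, return strings, and default thresholds (taken from `ABTestConfig`) are assumptions, not the documented API:

```python
def recommend(
    p_value: float,
    improvement: float,
    sample_size: int,
    minimum_improvement: float = 0.01,
    min_sample_size: int = 1000,
    confidence_level: float = 0.95,
) -> str:
    """Map test results to an action per the decision table."""
    alpha = 1 - confidence_level
    if p_value < alpha:
        if improvement >= minimum_improvement:
            return "promote_treatment"
        if improvement < 0:
            return "keep_control"
        return "no_difference"  # significant but below the required lift
    if sample_size < min_sample_size:
        return "continue_testing"
    return "no_difference"

print(recommend(p_value=0.01, improvement=0.03, sample_size=5000))  # promote_treatment
```

Note the asymmetry: an insignificant result only ends the experiment once `min_sample_size` is reached, so an underpowered test keeps collecting data instead of declaring "no difference" prematurely.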
## Auto-Rollback
The system automatically rolls back if the treatment model performs significantly worse:
```python
class AutoRollbackMonitor:
    """Monitors A/B tests and triggers automatic rollback."""

    async def check(self, experiment_id: str) -> None:
        """Check if rollback is needed."""
        experiment = await self._get_experiment(experiment_id)
        metrics = await self._get_recent_metrics(
            experiment_id, window_minutes=15
        )
        control_error_rate = metrics["control"]["error_rate"]
        treatment_error_rate = metrics["treatment"]["error_rate"]
        degradation = treatment_error_rate - control_error_rate
        if degradation > experiment.auto_rollback_threshold:
            await self._rollback(experiment_id)
            await self._alert(
                f"A/B test {experiment_id} auto-rolled back: "
                f"treatment error rate {treatment_error_rate:.2%} "
                f"vs control {control_error_rate:.2%}"
            )
```

## Shadow Deployments
For risk-free evaluation, the ML Service supports shadow deployments where the new model receives a copy of all traffic but its predictions are not served to users:
```python
import asyncio
from typing import Any

class ShadowDeployment:
    """Shadow deployment for risk-free model evaluation."""

    async def predict_with_shadow(
        self,
        primary_model: str,
        shadow_model: str,
        features: dict[str, Any],
    ) -> PredictionResponse:
        """Make prediction with primary, log shadow prediction."""
        # Primary prediction (served to the user)
        primary_result = await self._predict(primary_model, features)
        # Shadow prediction (logged only). Hold a reference to the task
        # (here in an instance-level set) so the event loop does not
        # garbage-collect it before it completes.
        task = asyncio.create_task(
            self._shadow_predict_and_log(shadow_model, features)
        )
        self._pending_shadow_tasks.add(task)
        task.add_done_callback(self._pending_shadow_tasks.discard)
        return primary_result
```

## API Endpoints
```
POST   /api/v1/ab-tests                  # Create A/B test
GET    /api/v1/ab-tests                  # List experiments
GET    /api/v1/ab-tests/{id}             # Get experiment
PUT    /api/v1/ab-tests/{id}             # Update configuration
DELETE /api/v1/ab-tests/{id}             # Stop experiment
GET    /api/v1/ab-tests/{id}/results     # Get statistical results
POST   /api/v1/ab-tests/{id}/promote     # Promote treatment
POST   /api/v1/ab-tests/{id}/rollback    # Rollback to control
POST   /api/v1/canary                    # Start canary deployment
GET    /api/v1/canary/{id}/status        # Canary status
POST   /api/v1/canary/{id}/advance       # Advance canary stage
POST   /api/v1/shadow                    # Start shadow deployment
```
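As an illustration, a hypothetical request body for `POST /api/v1/ab-tests`, with field names mirroring `ABTestConfig`; the exact wire schema is an assumption:

```python
import json

# Hypothetical payload; field names follow ABTestConfig, and the
# model IDs/versions shown here are placeholder values.
payload = {
    "tenant_id": "acme",
    "name": "fraud-model-v3-test",
    "control_model_id": "fraud-detector",
    "control_version": "2.4.0",
    "treatment_model_id": "fraud-detector",
    "treatment_version": "3.0.0",
    "treatment_fraction": 0.2,
    "primary_metric": "accuracy",
    "minimum_improvement": 0.01,
    "confidence_level": 0.95,
}
body = json.dumps(payload)
print(body)
```

Fields with defaults in `ABTestConfig` (safeguards, status, timestamps) would presumably be omitted and filled in server-side.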