Ensemble Pipelines
The ML Service supports ensemble methods that combine multiple models to produce more accurate and robust predictions. The ensemble subsystem implements voting, averaging, stacking, blending, bagging, and boosting, with configurable parallel inference, failure tolerance, and meta-learning.
Ensemble Methods
from enum import Enum

class EnsembleMethod(str, Enum):
    VOTING_HARD = "voting_hard"                # Majority voting
    VOTING_SOFT = "voting_soft"                # Weighted probability averaging
    AVERAGING = "averaging"                    # Simple average of predictions
    WEIGHTED_AVERAGING = "weighted_averaging"  # Weighted average
    STACKING = "stacking"                      # Meta-learner on base outputs
    BLENDING = "blending"                      # Holdout blending
    BOOSTING = "boosting"                      # Sequential boosting
    BAGGING = "bagging"                        # Bootstrap aggregating

| Method | Description | Best For |
|---|---|---|
| Hard Voting | Majority vote of class predictions | Classification with diverse models |
| Soft Voting | Weighted average of probability outputs | Classification with calibrated models |
| Averaging | Simple mean of predictions | Regression, reducing variance |
| Weighted Averaging | Weighted mean based on model quality | Regression with unequal models |
| Stacking | Meta-learner trained on base model outputs | Complex decision boundaries |
| Blending | Like stacking but uses holdout set | When stacking overfits |
| Boosting | Sequential training, each model corrects previous | Reducing bias |
| Bagging | Parallel training on bootstrap samples | Reducing variance |
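Hard and soft voting can disagree when a minority model is far more confident than the majority. The toy example below is illustrative only (plain Python, not part of the service API) and assumes the models expose calibrated class probabilities:

from collections import Counter

import numpy as np

# Per-model class predictions for one sample: two models lean "cat",
# but the dissenting model is far more confident than either of them.
hard_preds = ["cat", "cat", "dog"]
probs = np.array([
    [0.55, 0.45],   # model 1: P(cat), P(dog)
    [0.60, 0.40],   # model 2
    [0.05, 0.95],   # model 3: very confident "dog"
])
weights = [1.0, 1.0, 1.0]

hard_winner = Counter(hard_preds).most_common(1)[0][0]
soft_winner = ["cat", "dog"][np.argmax(np.average(probs, axis=0, weights=weights))]

print(hard_winner)  # cat  (2 of 3 votes)
print(soft_winner)  # dog  (mean P(dog) = 0.60 beats mean P(cat) = 0.40)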
Ensemble Configuration
from uuid import UUID

from pydantic import BaseModel, Field

class EnsembleConfig(BaseModel):
    """Configuration for an ensemble of models."""

    id: UUID
    tenant_id: str
    name: str
    description: str | None = None
    method: EnsembleMethod
    models: list[ModelWeight]            # At least 2 models
    # Stacking configuration
    meta_model_id: UUID | None = None
    meta_model_version: str | None = None
    # Serving configuration
    parallel_inference: bool = True      # Run models in parallel
    timeout_seconds: int = 30            # Per-model inference timeout
    fail_on_partial: bool = False        # If True, fail when any model fails
    min_models_required: int = 1         # Minimum successes for a valid prediction
    # Metadata
    tags: list[str] = Field(default_factory=list)
    created_by: str
Model Weights
class ModelWeight(BaseModel):
    """Weight configuration for a model in an ensemble."""

    model_id: UUID
    version: str | None = None   # Default: current version
    weight: float = 1.0          # Weight for averaging (0.0-1.0)
    enabled: bool = True         # Whether model is active
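Putting the two models together, the sketch below constructs a two-model soft-voting ensemble; the tenant, name, UUIDs, and weights are placeholder values, not real registry entries:

from uuid import uuid4

config = EnsembleConfig(
    id=uuid4(),
    tenant_id="acme",
    name="churn-soft-vote",
    method=EnsembleMethod.VOTING_SOFT,
    models=[
        ModelWeight(model_id=uuid4(), weight=0.7),   # stronger model
        ModelWeight(model_id=uuid4(), weight=0.3),   # weaker model
    ],
    min_models_required=2,   # both models must respond
    created_by="docs-example",
)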
Ensemble Prediction
Parallel Inference
When parallel_inference=True, all models are invoked concurrently:
import asyncio
from typing import Any

class EnsemblePredictor:
    """Executes ensemble predictions."""

    async def predict(
        self,
        config: EnsembleConfig,
        features: dict[str, Any],
    ) -> EnsemblePredictionResponse:
        """Make ensemble prediction."""
        enabled_models = [m for m in config.models if m.enabled]
        if config.parallel_inference:
            # Run all models in parallel; return_exceptions=True means a
            # failing model yields an exception object instead of aborting
            # the whole gather, and wait_for cancels models that exceed
            # the configured timeout
            tasks = [
                asyncio.wait_for(
                    self._predict_single(model, features),
                    timeout=config.timeout_seconds,
                )
                for model in enabled_models
            ]
            results = await asyncio.gather(*tasks, return_exceptions=True)
        else:
            # Run sequentially, collecting exceptions so failure handling
            # matches the parallel path
            results = []
            for model in enabled_models:
                try:
                    result = await asyncio.wait_for(
                        self._predict_single(model, features),
                        timeout=config.timeout_seconds,
                    )
                except Exception as exc:
                    result = exc
                results.append(result)
        # Filter failures, keeping each result paired with its model so
        # the weight list stays aligned with the surviving predictions
        successful_pairs = [
            (model, result)
            for model, result in zip(enabled_models, results)
            if not isinstance(result, Exception)
        ]
        successful = [result for _, result in successful_pairs]
        if config.fail_on_partial and len(successful) < len(enabled_models):
            raise EnsembleFailureError(
                f"{len(enabled_models) - len(successful)} models failed "
                f"and fail_on_partial is enabled"
            )
        if len(successful) < config.min_models_required:
            raise EnsembleFailureError(
                f"Only {len(successful)} models succeeded, "
                f"minimum {config.min_models_required} required"
            )
        # Combine results based on method
        combined = self._combine(
            config.method,
            successful,
            [model.weight for model, _ in successful_pairs],
        )
        return EnsemblePredictionResponse(
            prediction=combined,
            model_predictions=successful,
            models_succeeded=len(successful),
            models_total=len(enabled_models),
        )
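A minimal call site might look like the following, reusing the config built in the Model Weights example; the feature names are placeholders:

import asyncio

predictor = EnsemblePredictor()   # constructor details are omitted in these docs

async def score(features: dict) -> None:
    response = await predictor.predict(config, features)
    print(f"{response.prediction} "
          f"({response.models_succeeded}/{response.models_total} models)")

asyncio.run(score({"tenure_months": 14, "plan": "pro"}))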
Combination Methods
Hard Voting:
def _hard_vote(self, predictions: list[list[Any]]) -> list[Any]:
    """Majority vote across model predictions."""
    from collections import Counter

    # zip(*predictions) groups all models' votes for each sample
    votes = [
        Counter(sample_votes).most_common(1)[0][0]
        for sample_votes in zip(*predictions)
    ]
    return votes

Soft Voting (Weighted):
import numpy as np

def _soft_vote(
    self,
    probabilities: list[list[list[float]]],
    weights: list[float],
) -> list[Any]:
    """Weighted average of probability distributions."""
    # probabilities has shape (n_models, n_samples, n_classes);
    # average across models, then pick the top class per sample
    weighted_probs = np.average(probabilities, axis=0, weights=weights)
    return np.argmax(weighted_probs, axis=1).tolist()

Stacking:
def _stacking(
    self,
    predictions: list[list[Any]],
    meta_model: Any,
) -> list[Any]:
    """Use meta-learner on base model predictions."""
    # Stack base predictions column-wise: shape (n_samples, n_models)
    stacked_features = np.column_stack(predictions)
    return meta_model.predict(stacked_features).tolist()
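The enum also lists weighted averaging for regression, which has no snippet above; a minimal sketch in the same style could look like this (the helper name _weighted_average is assumed, not taken from the source):

def _weighted_average(
    self,
    predictions: list[list[float]],
    weights: list[float],
) -> list[float]:
    """Weighted mean of regression outputs across models."""
    # predictions has shape (n_models, n_samples); one weight per model
    return np.average(predictions, axis=0, weights=weights).tolist()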
Failure Tolerance
The ensemble system supports graceful degradation when individual models fail:
| Configuration | Behavior |
|---|---|
| fail_on_partial=False, min_models_required=1 | Return result if any model succeeds |
| fail_on_partial=False, min_models_required=2 | Require at least 2 successful models |
| fail_on_partial=True | Fail if any model fails |
| timeout_seconds=30 | Kill slow models after timeout |
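For example, a latency-sensitive deployment that prefers a partial answer over an error might combine these settings as below; all values are illustrative:

from uuid import uuid4

degraded_ok = EnsembleConfig(
    id=uuid4(),
    tenant_id="acme",
    name="quorum-ensemble",
    method=EnsembleMethod.AVERAGING,
    models=[ModelWeight(model_id=uuid4()) for _ in range(5)],
    fail_on_partial=False,     # tolerate individual failures
    min_models_required=3,     # but require a 3-of-5 quorum
    timeout_seconds=5,         # drop models slower than 5 seconds
    created_by="docs-example",
)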
API Endpoints
POST   /api/v1/ensembles                 # Create ensemble
GET    /api/v1/ensembles                 # List ensembles
GET    /api/v1/ensembles/{id}            # Get ensemble
PUT    /api/v1/ensembles/{id}            # Update ensemble
DELETE /api/v1/ensembles/{id}            # Delete ensemble
POST   /api/v1/ensembles/{id}/predict    # Ensemble prediction
POST   /api/v1/ensembles/{id}/evaluate   # Evaluate ensemble
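A prediction request might look like the following; the request body shape is inferred from the predictor's features argument and the response from EnsemblePredictionResponse, so treat it as a sketch rather than the exact wire schema:

import requests

resp = requests.post(
    "https://ml.example.com/api/v1/ensembles/3fa85f64-5717-4562-b3fc-2c963f66afa6/predict",
    json={"features": {"tenure_months": 14, "plan": "pro"}},
    timeout=30,
)
print(resp.json())   # e.g. {"prediction": ..., "models_succeeded": 2, ...}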
Ensemble Evaluation
async def evaluate_ensemble(
    self,
    config: EnsembleConfig,
    test_data: list[dict[str, Any]],
) -> EnsembleEvaluationResult:
    """Compare ensemble vs individual models."""
    ensemble_predictions = []
    individual_predictions = {m.model_id: [] for m in config.models}
    # Ground-truth labels are assumed to accompany each test sample
    labels = [sample["label"] for sample in test_data]
    for sample in test_data:
        # Ensemble prediction
        result = await self.predict(config, sample["features"])
        ensemble_predictions.append(result.prediction)
        # Individual predictions
        for model_pred in result.model_predictions:
            individual_predictions[model_pred.model_id].append(
                model_pred.prediction
            )
    ensemble_accuracy = self._accuracy(ensemble_predictions, labels)
    individual_accuracies = {
        mid: self._accuracy(preds, labels)
        for mid, preds in individual_predictions.items()
    }
    return EnsembleEvaluationResult(
        ensemble_accuracy=ensemble_accuracy,
        individual_accuracies=individual_accuracies,
        # Positive when the ensemble beats its best individual member
        improvement_over_best=(
            ensemble_accuracy - max(individual_accuracies.values())
        ),
    )
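For instance, if the best individual model scores 0.91 accuracy and the ensemble scores 0.93, improvement_over_best is 0.02; a negative value signals that the ensemble configuration is hurting rather than helping.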