MATIH Platform is in active MVP development. Documentation reflects current implementation status.
13. ML Service & MLOps

Ensemble Pipelines

The ML Service supports ensemble methods that combine multiple models to produce more accurate and robust predictions. The ensemble subsystem provides voting, averaging, stacking, blending, bagging, and boosting, with configurable parallel inference, failure tolerance, and meta-learning.


Ensemble Methods

from enum import Enum

class EnsembleMethod(str, Enum):
    VOTING_HARD = "voting_hard"          # Majority voting
    VOTING_SOFT = "voting_soft"          # Weighted probability averaging
    AVERAGING = "averaging"              # Simple average of predictions
    WEIGHTED_AVERAGING = "weighted_averaging"  # Weighted average
    STACKING = "stacking"                # Meta-learner on base outputs
    BLENDING = "blending"                # Holdout blending
    BOOSTING = "boosting"                # Sequential boosting
    BAGGING = "bagging"                  # Bootstrap aggregating
| Method | Description | Best For |
|---|---|---|
| Hard Voting | Majority vote of class predictions | Classification with diverse models |
| Soft Voting | Weighted average of probability outputs | Classification with calibrated models |
| Averaging | Simple mean of predictions | Regression, reducing variance |
| Weighted Averaging | Weighted mean based on model quality | Regression with unequal models |
| Stacking | Meta-learner trained on base model outputs | Complex decision boundaries |
| Blending | Like stacking, but uses a holdout set | When stacking overfits |
| Boosting | Sequential training; each model corrects the previous | Reducing bias |
| Bagging | Parallel training on bootstrap samples | Reducing variance |
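
As a quick numeric illustration of weighted averaging (all values made up), the higher-weighted model dominates the combined regression output:

import numpy as np

predictions = [
    [10.0, 12.0],   # model A's predictions for two samples
    [14.0, 16.0],   # model B's predictions for the same samples
]
# Model A is trusted four times as much as model B
combined = np.average(predictions, axis=0, weights=[0.8, 0.2])
print(combined)  # [10.8 12.8] -- pulled toward model A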

Ensemble Configuration

from uuid import UUID

from pydantic import BaseModel, Field

class EnsembleConfig(BaseModel):
    """Configuration for an ensemble of models."""
    id: UUID
    tenant_id: str
    name: str
    description: str | None = None
 
    method: EnsembleMethod
    models: list[ModelWeight]           # At least 2 models
 
    # Stacking configuration
    meta_model_id: UUID | None = None
    meta_model_version: str | None = None
 
    # Serving configuration
    parallel_inference: bool = True      # Run models in parallel
    timeout_seconds: int = 30
    fail_on_partial: bool = False       # If True, fail when any model fails
    min_models_required: int = 1         # Minimum for valid prediction
 
    # Metadata
    tags: list[str] = Field(default_factory=list)
    created_by: str

Model Weights

class ModelWeight(BaseModel):
    """Weight configuration for a model in an ensemble."""
    model_id: UUID
    version: str | None = None         # Default: current version
    weight: float = 1.0                 # Weight for averaging (0.0-1.0)
    enabled: bool = True                # Whether model is active
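
Putting the pieces together, a minimal sketch of a two-model soft-voting ensemble (the UUIDs, tenant, and names are placeholders, not real identifiers):

from uuid import uuid4

config = EnsembleConfig(
    id=uuid4(),
    tenant_id="acme",                  # placeholder tenant
    name="churn-soft-vote",
    method=EnsembleMethod.VOTING_SOFT,
    models=[
        ModelWeight(model_id=uuid4(), weight=0.7),
        ModelWeight(model_id=uuid4(), weight=0.3),
    ],
    created_by="ml-team",
)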

Ensemble Prediction

Parallel Inference

When parallel_inference=True, all models are invoked concurrently:

import asyncio
from typing import Any

class EnsemblePredictor:
    """Executes ensemble predictions."""
 
    async def predict(
        self,
        config: EnsembleConfig,
        features: dict[str, Any],
    ) -> EnsemblePredictionResponse:
        """Make ensemble prediction."""
        enabled_models = [m for m in config.models if m.enabled]
 
        if config.parallel_inference:
            # Run all models in parallel
            tasks = [
                self._predict_single(model, features)
                for model in enabled_models
            ]
            results = await asyncio.gather(
                *tasks, return_exceptions=True
            )
        else:
            # Run sequentially, capturing failures like gather() does
            results = []
            for model in enabled_models:
                try:
                    result = await self._predict_single(model, features)
                except Exception as exc:
                    result = exc
                results.append(result)
 
        # Pair results with their model weights, then drop failures so
        # predictions and weights stay aligned for weighted methods
        paired = [
            (result, model.weight)
            for result, model in zip(results, enabled_models)
            if not isinstance(result, Exception)
        ]
        successful = [result for result, _ in paired]
        weights = [weight for _, weight in paired]

        failed = len(enabled_models) - len(successful)
        if config.fail_on_partial and failed > 0:
            raise EnsembleFailureError(
                f"{failed} of {len(enabled_models)} models failed "
                f"and fail_on_partial is set"
            )

        if len(successful) < config.min_models_required:
            raise EnsembleFailureError(
                f"Only {len(successful)} models succeeded, "
                f"minimum {config.min_models_required} required"
            )

        # Combine results based on method
        combined = self._combine(config.method, successful, weights)
 
        return EnsemblePredictionResponse(
            prediction=combined,
            model_predictions=successful,
            models_succeeded=len(successful),
            models_total=len(enabled_models),
        )
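
An illustrative call to the predictor, reusing the config built in the earlier sketch (feature names are made up):

import asyncio

async def main() -> None:
    predictor = EnsemblePredictor()
    response = await predictor.predict(config, features={"age": 42, "plan": "pro"})
    print(response.prediction)
    print(f"{response.models_succeeded}/{response.models_total} models responded")

asyncio.run(main())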

Combination Methods

Hard Voting:

from collections import Counter

def _hard_vote(self, predictions: list[list[Any]]) -> list[Any]:
    """Majority vote across model predictions."""
    # zip(*predictions) regroups the i-th prediction from every model
    return [
        Counter(sample_preds).most_common(1)[0][0]
        for sample_preds in zip(*predictions)
    ]
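
A worked example (toy labels): each Counter tallies one sample's votes across all models.

from collections import Counter

predictions = [
    ["cat", "dog"],   # model A's predictions for two samples
    ["cat", "dog"],   # model B
    ["dog", "dog"],   # model C
]
# Sample 0 gets votes (cat, cat, dog); sample 1 gets (dog, dog, dog)
votes = [Counter(preds).most_common(1)[0][0] for preds in zip(*predictions)]
print(votes)  # ['cat', 'dog']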

Soft Voting (Weighted):

import numpy as np

def _soft_vote(
    self,
    probabilities: list[list[list[float]]],
    weights: list[float],
) -> list[Any]:
    """Weighted average of probability distributions."""
    weighted_probs = np.average(probabilities, axis=0, weights=weights)
    return np.argmax(weighted_probs, axis=1).tolist()
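
A toy soft-voting example (probabilities made up): with weights 0.7/0.3 the combined distribution can still flip to the class the lower-weighted model prefers, which is why calibrated probabilities matter here.

import numpy as np

probabilities = [
    [[0.6, 0.4]],   # model A: one sample, two classes
    [[0.2, 0.8]],   # model B
]
weighted = np.average(probabilities, axis=0, weights=[0.7, 0.3])
print(weighted)                     # [[0.48 0.52]]
print(np.argmax(weighted, axis=1))  # [1] -- class 1 wins despite model A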

Stacking:

def _stacking(
    self,
    predictions: list[list[Any]],
    meta_model: Any,
) -> list[Any]:
    """Use meta-learner on base model predictions."""
    # Stack base predictions as features for meta-model
    stacked_features = np.column_stack(predictions)
    return meta_model.predict(stacked_features).tolist()
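
The meta-learner itself is trained offline on base-model outputs. A minimal sketch with scikit-learn, assuming base_predictions holds each base model's out-of-fold predictions (the data is made up):

import numpy as np
from sklearn.linear_model import LogisticRegression

base_predictions = [
    [0, 1, 1, 0],   # base model A's out-of-fold predictions
    [0, 1, 0, 0],   # base model B's out-of-fold predictions
]
labels = [0, 1, 1, 0]

# One row per sample, one column per base model
stacked = np.column_stack(base_predictions)
meta_model = LogisticRegression().fit(stacked, labels)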

Failure Tolerance

The ensemble system supports graceful degradation when individual models fail:

| Configuration | Behavior |
|---|---|
| fail_on_partial=False, min_models_required=1 | Return a result if any model succeeds |
| fail_on_partial=False, min_models_required=2 | Require at least 2 successful models |
| fail_on_partial=True | Fail if any model fails |
| timeout_seconds=30 | Cancel model calls that run longer than 30 seconds |
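
The snippets above do not show how timeout_seconds is enforced; one way to do it, sketched with asyncio.wait_for (the helper name is hypothetical):

import asyncio
from typing import Any, Awaitable

async def predict_with_timeout(call: Awaitable[Any], timeout_seconds: int) -> Any:
    """Fail a slow model call instead of stalling the whole ensemble."""
    try:
        # wait_for cancels the underlying call once the timeout elapses
        return await asyncio.wait_for(call, timeout=timeout_seconds)
    except asyncio.TimeoutError as exc:
        # Re-raise so gather(return_exceptions=True) records a failure
        raise TimeoutError(f"model call exceeded {timeout_seconds}s") from exc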

API Endpoints

POST   /api/v1/ensembles                    # Create ensemble
GET    /api/v1/ensembles                    # List ensembles
GET    /api/v1/ensembles/{id}               # Get ensemble
PUT    /api/v1/ensembles/{id}               # Update ensemble
DELETE /api/v1/ensembles/{id}               # Delete ensemble
POST   /api/v1/ensembles/{id}/predict       # Ensemble prediction
POST   /api/v1/ensembles/{id}/evaluate      # Evaluate ensemble
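
An illustrative client call against the prediction endpoint; the base URL, payload shape, and response fields are assumptions for the example, not a documented contract:

import httpx

ensemble_id = "..."  # UUID of an existing ensemble
resp = httpx.post(
    f"https://ml.example.com/api/v1/ensembles/{ensemble_id}/predict",
    json={"features": {"age": 42, "plan": "pro"}},
)
resp.raise_for_status()
print(resp.json())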

Ensemble Evaluation

async def evaluate_ensemble(
    self,
    config: EnsembleConfig,
    test_data: list[dict[str, Any]],
) -> EnsembleEvaluationResult:
    """Compare ensemble vs individual models."""
    # Assumes each test sample includes its ground-truth label
    labels = [sample["label"] for sample in test_data]
    ensemble_predictions = []
    individual_predictions = {m.model_id: [] for m in config.models}
 
    for sample in test_data:
        # Ensemble prediction
        result = await self.predict(config, sample["features"])
        ensemble_predictions.append(result.prediction)
 
        # Individual predictions
        for model_pred in result.model_predictions:
            individual_predictions[model_pred.model_id].append(
                model_pred.prediction
            )
 
    ensemble_accuracy = self._accuracy(ensemble_predictions, labels)
    individual_accuracies = {
        mid: self._accuracy(preds, labels)
        for mid, preds in individual_predictions.items()
    }

    return EnsembleEvaluationResult(
        ensemble_accuracy=ensemble_accuracy,
        individual_accuracies=individual_accuracies,
        improvement_over_best=(
            ensemble_accuracy - max(individual_accuracies.values())
        ),
    )
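
A hedged usage sketch for reading the result inside an async context (evaluator stands in for whatever service object owns evaluate_ensemble):

result = await evaluator.evaluate_ensemble(config, test_data)
print(f"ensemble: {result.ensemble_accuracy:.3f}")
for model_id, acc in result.individual_accuracies.items():
    print(f"{model_id}: {acc:.3f}")
if result.improvement_over_best > 0:
    print(f"ensemble beats best single model by {result.improvement_over_best:.3f}")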