MATIH Platform is in active MVP development. Documentation reflects current implementation status.
13. ML Service & MLOps
Serving

Model Serving and Inference

The ML Service provides multiple model serving strategies optimized for different latency, throughput, and cost requirements. The serving subsystem supports ONNX Runtime for optimized single-model inference, Ray Serve for scalable online serving, Triton Inference Server for high-throughput GPU inference, and batch prediction for large-scale offline scoring.


Serving Architecture

Prediction Request
    |
    v
+-------------------+
| Prediction Service|  <-- Request routing, validation
+-------------------+
    |
    +-- Model Loader -----> Model Cache (LRU)
    |                           |
    |                           +-- MLflow Registry
    |                           +-- S3 Artifact Store
    |
    +-- Inference Engine Selection
        |
        +-- ONNX Runtime -------> Single model, CPU optimized
        |
        +-- Ray Serve -----------> Scalable online serving
        |
        +-- Triton --------------> GPU-optimized, multi-model
        |
        +-- Batch Processor -----> Offline scoring

PredictionService

The PredictionService in serving/prediction_service.py handles single and batch predictions:

import time
from uuid import uuid4

class PredictionService:
    """Service for making predictions with loaded models."""
 
    def __init__(
        self,
        registry: ModelRegistry,
        loader: ModelLoader,
    ):
        self.registry = registry
        self.loader = loader
 
    async def predict(
        self,
        tenant_id: str,
        request: PredictionRequest,
    ) -> PredictionResponse:
        """Make a single prediction."""
        start = time.perf_counter()
        request_id = request.request_id or str(uuid4())
 
        # Get model metadata
        metadata = await self.registry.get_model(
            tenant_id, request.model_id
        )
        if metadata is None:
            raise ValueError(f"Model {request.model_id} not found")
 
        # Load model (cached)
        version = request.version or metadata.current_version
        loaded_model = await self.loader.load_model(metadata, version)
 
        # Prepare features
        features = self._prepare_features(request.features, metadata)
 
        # Make prediction
        prediction = loaded_model.predict(features)
 
        # Get probabilities if requested (proba must be initialized
        # so the confidence expression below is safe when skipped)
        proba = None
        probabilities = None
        if request.options.get("return_probabilities", True):
            proba = loaded_model.predict_proba(features)
            if proba is not None:
                probabilities = proba.tolist()
 
        latency_ms = (time.perf_counter() - start) * 1000
 
        return PredictionResponse(
            request_id=request_id,
            model_id=str(metadata.model_id),
            version=version,
            prediction=prediction.tolist(),
            probabilities=probabilities,
            confidence=float(max(proba[0])) if proba is not None else None,
            latency_ms=latency_ms,
        )

Request/Response Models

class PredictionRequest(BaseModel):
    """Single prediction request."""
    model_id: str
    version: str | None = None
    features: dict[str, Any]
    request_id: str | None = None
    options: dict[str, Any] = Field(default_factory=dict)
 
class PredictionResponse(BaseModel):
    """Single prediction response."""
    request_id: str
    model_id: str
    version: str
    prediction: list[Any]
    probabilities: list[list[float]] | None = None
    confidence: float | None = None
    latency_ms: float
    metadata: dict[str, Any] = Field(default_factory=dict)
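The wire format implied by these models can be illustrated with plain dicts. Field names come from the models above; the model id, feature names, and values are hypothetical:

```python
# Hypothetical request payload for POST /api/v1/predictions.
request_payload = {
    "model_id": "churn-classifier",     # hypothetical model id
    "version": None,                    # None -> resolve to current_version
    "features": {"tenure_months": 14, "plan": "pro"},
    "request_id": None,                 # server generates a UUID if omitted
    "options": {"return_probabilities": True},
}

# Corresponding response shape; values are illustrative.
response_payload = {
    "request_id": "3f2c0b1e-...",       # echoed or generated UUID
    "model_id": "churn-classifier",
    "version": "3",
    "prediction": [1],
    "probabilities": [[0.18, 0.82]],    # one row per input, one col per class
    "confidence": 0.82,                 # max probability of the first row
    "latency_ms": 6.4,
    "metadata": {},
}
```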

Model Loader

The ModelLoader manages model loading with LRU caching:

class ModelLoader:
    """Loads and caches models from the registry."""
 
    def __init__(self, cache_size: int = 20):
        self._cache: dict[str, LoadedModel] = {}
        self._cache_size = cache_size
        self._load_order: list[str] = []
 
    async def load_model(
        self,
        metadata: ModelMetadata,
        version: str,
    ) -> LoadedModel:
        """Load a model, using cache if available."""
        cache_key = f"{metadata.model_id}:{version}"
 
        if cache_key in self._cache:
            # Refresh recency on a hit; without this the cache
            # evicts in FIFO order rather than LRU
            self._load_order.remove(cache_key)
            self._load_order.append(cache_key)
            return self._cache[cache_key]
 
        # Download from artifact store
        artifact_path = f"{metadata.artifact_uri}/{version}"
        model = await self._download_and_load(
            artifact_path, metadata.framework
        )
 
        # Cache with LRU eviction
        if len(self._cache) >= self._cache_size:
            oldest_key = self._load_order.pop(0)
            del self._cache[oldest_key]
 
        self._cache[cache_key] = model
        self._load_order.append(cache_key)
 
        return model
 
    async def _download_and_load(
        self,
        artifact_path: str,
        framework: str,
    ) -> LoadedModel:
        """Download and load model based on framework."""
        if framework == "sklearn":
            return await self._load_sklearn(artifact_path)
        elif framework == "onnx":
            return await self._load_onnx(artifact_path)
        elif framework == "pytorch":
            return await self._load_pytorch(artifact_path)
        else:
            raise ValueError(f"Unsupported framework: {framework}")
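The eviction policy can be sketched in isolation. The class below is a hypothetical stand-in (not the platform's `ModelLoader`) that uses `collections.OrderedDict`, whose `move_to_end`/`popitem(last=False)` pair gives LRU behavior directly:

```python
from collections import OrderedDict

class LRUModelCache:
    """Minimal sketch of the loader's cache policy. Real entries
    would be LoadedModel objects keyed by "model_id:version"."""

    def __init__(self, max_size: int = 20):
        self._entries: OrderedDict[str, object] = OrderedDict()
        self._max_size = max_size

    def get(self, key: str):
        if key not in self._entries:
            return None
        # Refresh recency on every hit -- this is what makes it LRU
        self._entries.move_to_end(key)
        return self._entries[key]

    def put(self, key: str, model: object) -> None:
        if key in self._entries:
            self._entries.move_to_end(key)
        elif len(self._entries) >= self._max_size:
            # Evict the least recently used entry (front of the dict)
            self._entries.popitem(last=False)
        self._entries[key] = model
```

A quick walk-through: with `max_size=2`, touching `m1` before inserting `m3` means `m2`, not `m1`, is evicted.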

ONNX Runtime Serving

For optimized CPU inference, models can be converted to ONNX format and served through ONNX Runtime:

class ONNXModelLoader:
    """Loads and serves ONNX models."""
 
    async def load(self, model_path: str) -> ONNXModel:
        """Load an ONNX model."""
        import onnxruntime as ort
 
        session = ort.InferenceSession(
            model_path,
            providers=["CPUExecutionProvider"],
            sess_options=self._get_session_options(),
        )
 
        return ONNXModel(session=session)
 
    def _get_session_options(self) -> "ort.SessionOptions":
        import onnxruntime as ort  # imported locally, as in load()

        opts = ort.SessionOptions()
        opts.graph_optimization_level = (
            ort.GraphOptimizationLevel.ORT_ENABLE_ALL
        )
        opts.intra_op_num_threads = 4
        opts.inter_op_num_threads = 2
        opts.enable_mem_pattern = True
        return opts

ONNX Optimization     Effect
Graph optimization    Fuse operators, eliminate redundancy
Quantization          INT8/FP16 for faster inference
Thread tuning         Optimize parallelism for CPU
Memory patterns       Reduce memory allocation overhead

Ray Serve Deployment

For scalable online serving, models are deployed as Ray Serve deployments:

class RayServeManager:
    """Manages Ray Serve model deployments."""
 
    async def deploy(
        self,
        tenant_id: str,
        model_id: str,
        version: str,
        num_replicas: int = 2,
        max_concurrent_queries: int = 100,
        autoscaling: bool = True,
    ) -> DeploymentInfo:
        """Deploy a model to Ray Serve."""
        import ray
        from ray import serve
 
        @serve.deployment(
            name=f"{tenant_id}-{model_id}",
            num_replicas=num_replicas,
            max_concurrent_queries=max_concurrent_queries,
            autoscaling_config={
                "min_replicas": 1,
                "max_replicas": 10,
                "target_num_ongoing_requests_per_replica": 5,
            } if autoscaling else None,
        )
        class ModelDeployment:
            def __init__(self):
                self.model = load_model(model_id, version)
 
            async def __call__(self, request):
                features = await request.json()
                prediction = self.model.predict(features)
                return {"prediction": prediction.tolist()}
 
        handle = serve.run(ModelDeployment.bind())
 
        return DeploymentInfo(
            deployment_name=f"{tenant_id}-{model_id}",
            url=f"http://ray-serve:8000/{tenant_id}-{model_id}",
            replicas=num_replicas,
        )

Autoscaling Configuration

Parameter                                 Default  Description
min_replicas                              1        Minimum number of replicas
max_replicas                              10       Maximum number of replicas
target_num_ongoing_requests_per_replica   5        Target concurrent requests per replica
upscale_delay_s                           30       Delay before scaling up
downscale_delay_s                         300      Delay before scaling down
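Target-based autoscaling amounts to sizing the fleet so each replica carries roughly the target number of concurrent requests, clamped to the configured bounds. The helper below is a sketch of that policy under the defaults in the table, not Ray Serve's actual algorithm (which also applies the up/downscale delays):

```python
import math

def desired_replicas(
    ongoing_requests: int,
    target_per_replica: int = 5,
    min_replicas: int = 1,
    max_replicas: int = 10,
) -> int:
    """Enough replicas so each handles ~target_per_replica
    concurrent requests, clamped to [min_replicas, max_replicas]."""
    raw = math.ceil(ongoing_requests / target_per_replica)
    return max(min_replicas, min(max_replicas, raw))
```

For example, 23 in-flight requests at a target of 5 per replica yields ceil(23 / 5) = 5 replicas.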

Triton Inference Server

For GPU-optimized, high-throughput inference:

class TritonInferenceService:
    """Integrates with NVIDIA Triton Inference Server."""
 
    async def deploy_model(
        self,
        model_name: str,
        model_path: str,
        model_format: str,
        instance_group: str = "gpu",
    ) -> None:
        """Deploy a model to Triton."""
        # Build Triton model repository config
        config = {
            "name": model_name,
            "platform": self._get_platform(model_format),
            "instance_group": [
                {"kind": instance_group, "count": 1}
            ],
            "dynamic_batching": {
                "preferred_batch_size": [4, 8, 16],
                "max_queue_delay_microseconds": 100,
            },
        }
        ...
 
    async def predict(
        self,
        model_name: str,
        inputs: dict[str, Any],
    ) -> dict[str, Any]:
        """Make prediction via Triton gRPC."""
        import tritonclient.grpc as grpcclient
 
        client = grpcclient.InferenceServerClient(
            url=self._triton_url,
        )
        ...
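The `_get_platform` helper is elided above. A plausible sketch follows: the platform strings are Triton's model-configuration platform names, while the mapping from MATIH's framework/format names to them is an assumption, as is the standalone function name:

```python
# Triton platform identifiers keyed by model format.
# The format names on the left are assumed; the platform
# strings follow Triton's model-config conventions.
_PLATFORMS = {
    "onnx": "onnxruntime_onnx",
    "pytorch": "pytorch_libtorch",
    "tensorflow": "tensorflow_savedmodel",
    "tensorrt": "tensorrt_plan",
}

def get_platform(model_format: str) -> str:
    """Resolve a model format to a Triton platform string."""
    try:
        return _PLATFORMS[model_format]
    except KeyError:
        raise ValueError(f"Unsupported Triton format: {model_format}")
```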

Batch Prediction

For large-scale offline scoring:

class BatchPredictionService:
    """Handles batch prediction jobs."""
 
    async def submit_batch(
        self,
        tenant_id: str,
        request: BatchPredictionRequest,
    ) -> BatchPredictionResponse:
        """Submit a batch prediction job."""
        job_id = str(uuid4())
 
        # Resolve metadata, then load the model (cached);
        # load_model takes metadata + version, not a raw model id
        metadata = await self.registry.get_model(
            tenant_id, request.model_id
        )
        model = await self.loader.load_model(
            metadata, request.version
        )
 
        # Process in chunks
        results = []
        for chunk in self._read_chunks(
            request.input_path, chunk_size=1000
        ):
            predictions = model.predict(chunk)
            results.extend(predictions)
 
        # Write results
        output_path = f"s3://predictions/{tenant_id}/{job_id}/"
        await self._write_results(output_path, results)
 
        return BatchPredictionResponse(
            job_id=job_id,
            status=PredictionStatus.COMPLETED,
            output_path=output_path,
            total_predictions=len(results),
        )

Serving Mode   Latency            Throughput  Cost    Use Case
ONNX Runtime   Low (< 10 ms)      Medium      Low     Real-time, CPU
Ray Serve      Medium (10-50 ms)  High        Medium  Scalable online
Triton         Low (< 5 ms)       Very High   High    GPU, high throughput
Batch          High (minutes)     Very High   Low     Offline scoring
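The trade-offs in the table can be encoded as a simple routing helper. This is purely illustrative, not part of the platform; the QPS cutoff is an assumed threshold:

```python
def select_serving_mode(
    needs_gpu: bool,
    realtime: bool,
    expected_qps: float,
) -> str:
    """Route a workload over the trade-off table above."""
    if not realtime:
        return "batch"            # offline scoring, latency in minutes
    if needs_gpu:
        return "triton"           # GPU, very high throughput
    if expected_qps > 100:        # assumed cutoff for scale-out
        return "ray_serve"        # scalable online serving
    return "onnx_runtime"         # single-model, CPU-optimized
```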

API Endpoints

POST   /api/v1/predictions                   # Single prediction
POST   /api/v1/predictions/batch             # Batch prediction
GET    /api/v1/predictions/batch/{job_id}    # Batch job status
POST   /api/v1/deployments                   # Deploy model
GET    /api/v1/deployments                   # List deployments
PUT    /api/v1/deployments/{name}/scale      # Scale deployment
DELETE /api/v1/deployments/{name}            # Undeploy model
GET    /api/v1/deployments/{name}/metrics    # Deployment metrics