Model Serving and Inference
The ML Service provides multiple model serving strategies optimized for different latency, throughput, and cost requirements. The serving subsystem supports ONNX Runtime for optimized single-model inference, Ray Serve for scalable online serving, Triton Inference Server for high-throughput GPU inference, and batch prediction for large-scale offline scoring.
Serving Architecture
Prediction Request
|
v
+-------------------+
| Prediction Service| <-- Request routing, validation
+-------------------+
|
+-- Model Loader -----> Model Cache (LRU)
| |
| +-- MLflow Registry
| +-- S3 Artifact Store
|
+-- Inference Engine Selection
|
+-- ONNX Runtime -------> Single model, CPU optimized
|
+-- Ray Serve -----------> Scalable online serving
|
+-- Triton --------------> GPU-optimized, multi-model
|
+-- Batch Processor -----> Offline scoring
PredictionService
The PredictionService in serving/prediction_service.py handles single and batch predictions:
class PredictionService:
"""Service for making predictions with loaded models."""
def __init__(
self,
registry: ModelRegistry,
loader: ModelLoader,
):
self.registry = registry
self.loader = loader
async def predict(
self,
tenant_id: str,
request: PredictionRequest,
) -> PredictionResponse:
"""Make a single prediction."""
start = time.perf_counter()
request_id = request.request_id or str(uuid4())
# Get model metadata
metadata = await self.registry.get_model(
tenant_id, request.model_id
)
if metadata is None:
raise ValueError(f"Model {request.model_id} not found")
# Load model (cached)
version = request.version or metadata.current_version
loaded_model = await self.loader.load_model(metadata, version)
# Prepare features
features = self._prepare_features(request.features, metadata)
# Make prediction
prediction = loaded_model.predict(features)
# Get probabilities if requested
probabilities = None
if request.options.get("return_probabilities", True):
proba = loaded_model.predict_proba(features)
if proba is not None:
probabilities = proba.tolist()
latency_ms = (time.perf_counter() - start) * 1000
return PredictionResponse(
request_id=request_id,
model_id=str(metadata.model_id),
version=version,
prediction=prediction.tolist(),
probabilities=probabilities,
confidence=float(max(proba[0])) if proba is not None else None,
latency_ms=latency_ms,
)Request/Response Models
class PredictionRequest(BaseModel):
"""Single prediction request."""
model_id: str
version: str | None = None
features: dict[str, Any]
request_id: str | None = None
options: dict[str, Any] = Field(default_factory=dict)
class PredictionResponse(BaseModel):
"""Single prediction response."""
request_id: str
model_id: str
version: str
prediction: list[Any]
probabilities: list[list[float]] | None = None
confidence: float | None = None
latency_ms: float
metadata: dict[str, Any] = Field(default_factory=dict)Model Loader
The ModelLoader manages model loading with LRU caching:
class ModelLoader:
    """Loads and caches models from the registry.

    Keeps up to ``cache_size`` models in memory and evicts the least
    recently *used* entry when full.
    """

    def __init__(self, cache_size: int = 20):
        # Forward-referenced annotations are quoted: LoadedModel is declared
        # elsewhere, and an unquoted annotation here raised NameError.
        self._cache: "dict[str, LoadedModel]" = {}
        self._cache_size = cache_size
        # Cache keys ordered from least to most recently used.
        self._load_order: "list[str]" = []

    async def load_model(
        self,
        metadata: "ModelMetadata",
        version: str,
    ) -> "LoadedModel":
        """Load a model, using cache if available."""
        cache_key = f"{metadata.model_id}:{version}"
        if cache_key in self._cache:
            # BUG FIX: refresh recency on a cache hit. Previously hits did not
            # touch _load_order, so eviction was FIFO rather than the LRU the
            # class documents.
            self._load_order.remove(cache_key)
            self._load_order.append(cache_key)
            return self._cache[cache_key]

        # Download from artifact store
        artifact_path = f"{metadata.artifact_uri}/{version}"
        model = await self._download_and_load(
            artifact_path, metadata.framework
        )

        # Cache with LRU eviction: drop the least recently used entry first.
        if len(self._cache) >= self._cache_size:
            oldest_key = self._load_order.pop(0)
            del self._cache[oldest_key]
        self._cache[cache_key] = model
        self._load_order.append(cache_key)
        return model

    async def _download_and_load(
        self,
        artifact_path: str,
        framework: str,
    ) -> "LoadedModel":
        """Download and load model based on framework.

        Raises:
            ValueError: for an unrecognized framework string.
        """
        if framework == "sklearn":
            return await self._load_sklearn(artifact_path)
        elif framework == "onnx":
            return await self._load_onnx(artifact_path)
        elif framework == "pytorch":
            return await self._load_pytorch(artifact_path)
        else:
            raise ValueError(f"Unsupported framework: {framework}")

# ONNX Runtime Serving
For optimized CPU inference, models can be converted to ONNX format and served through ONNX Runtime:
class ONNXModelLoader:
"""Loads and serves ONNX models."""
async def load(self, model_path: str) -> ONNXModel:
"""Load an ONNX model."""
import onnxruntime as ort
session = ort.InferenceSession(
model_path,
providers=["CPUExecutionProvider"],
sess_options=self._get_session_options(),
)
return ONNXModel(session=session)
def _get_session_options(self) -> ort.SessionOptions:
opts = ort.SessionOptions()
opts.graph_optimization_level = (
ort.GraphOptimizationLevel.ORT_ENABLE_ALL
)
opts.intra_op_num_threads = 4
opts.inter_op_num_threads = 2
opts.enable_mem_pattern = True
return opts| ONNX Optimization | Effect |
|---|---|
| Graph optimization | Fuse operators, eliminate redundancy |
| Quantization | INT8/FP16 for faster inference |
| Thread tuning | Optimize parallelism for CPU |
| Memory patterns | Reduce memory allocation overhead |
Ray Serve Deployment
For scalable online serving, models are deployed as Ray Serve deployments:
class RayServeManager:
"""Manages Ray Serve model deployments."""
async def deploy(
self,
tenant_id: str,
model_id: str,
version: str,
num_replicas: int = 2,
max_concurrent_queries: int = 100,
autoscaling: bool = True,
) -> DeploymentInfo:
"""Deploy a model to Ray Serve."""
import ray
from ray import serve
@serve.deployment(
name=f"{tenant_id}-{model_id}",
num_replicas=num_replicas,
max_concurrent_queries=max_concurrent_queries,
autoscaling_config={
"min_replicas": 1,
"max_replicas": 10,
"target_num_ongoing_requests_per_replica": 5,
} if autoscaling else None,
)
class ModelDeployment:
def __init__(self):
self.model = load_model(model_id, version)
async def __call__(self, request):
features = await request.json()
prediction = self.model.predict(features)
return {"prediction": prediction.tolist()}
handle = serve.run(ModelDeployment.bind())
return DeploymentInfo(
deployment_name=f"{tenant_id}-{model_id}",
url=f"http://ray-serve:8000/{tenant_id}-{model_id}",
replicas=num_replicas,
)Autoscaling Configuration
| Parameter | Default | Description |
|---|---|---|
| min_replicas | 1 | Minimum number of replicas |
| max_replicas | 10 | Maximum number of replicas |
| target_requests_per_replica | 5 | Target concurrent requests |
| upscale_delay_s | 30 | Delay before scaling up |
| downscale_delay_s | 300 | Delay before scaling down |
Triton Inference Server
For GPU-optimized, high-throughput inference:
class TritonInferenceService:
"""Integrates with NVIDIA Triton Inference Server."""
async def deploy_model(
self,
model_name: str,
model_path: str,
model_format: str,
instance_group: str = "gpu",
) -> None:
"""Deploy a model to Triton."""
# Build Triton model repository config
config = {
"name": model_name,
"platform": self._get_platform(model_format),
"instance_group": [
{"kind": instance_group, "count": 1}
],
"dynamic_batching": {
"preferred_batch_size": [4, 8, 16],
"max_queue_delay_microseconds": 100,
},
}
...
async def predict(
self,
model_name: str,
inputs: dict[str, Any],
) -> dict[str, Any]:
"""Make prediction via Triton gRPC."""
import tritonclient.grpc as grpcclient
client = grpcclient.InferenceServerClient(
url=self._triton_url,
)
...Batch Prediction
For large-scale offline scoring:
class BatchPredictionService:
    """Handles batch prediction jobs."""

    async def submit_batch(
        self,
        tenant_id: str,
        request: BatchPredictionRequest,
    ) -> BatchPredictionResponse:
        """Submit a batch prediction job.

        Reads the input in fixed-size chunks, scores each chunk with the
        loaded model, and writes all predictions to a tenant-scoped output
        path. Runs synchronously, so the returned status is COMPLETED.
        """
        job_id = str(uuid4())
        # Load model
        # NOTE(review): ModelLoader.load_model elsewhere in this service
        # takes (metadata, version), not (model_id, version) — confirm this
        # loader variant accepts a bare model id.
        model = await self.loader.load_model(
            request.model_id, request.version
        )
        # Process in chunks of 1000 rows to bound memory per batch.
        results = []
        for chunk in self._read_chunks(
            request.input_path, chunk_size=1000
        ):
            predictions = model.predict(chunk)
            results.extend(predictions)
        # Write results under a job-scoped S3 prefix.
        output_path = f"s3://predictions/{tenant_id}/{job_id}/"
        await self._write_results(output_path, results)
        return BatchPredictionResponse(
            job_id=job_id,
            status=PredictionStatus.COMPLETED,
            output_path=output_path,
            total_predictions=len(results),
)
| Serving Mode | Latency | Throughput | Cost | Use Case |
|---|---|---|---|---|
| ONNX Runtime | Low (< 10ms) | Medium | Low | Real-time, CPU |
| Ray Serve | Medium (10-50ms) | High | Medium | Scalable online |
| Triton | Low (< 5ms) | Very High | High | GPU, high throughput |
| Batch | High (minutes) | Very High | Low | Offline scoring |
API Endpoints
POST /api/v1/predictions # Single prediction
POST /api/v1/predictions/batch # Batch prediction
GET /api/v1/predictions/batch/{job_id} # Batch job status
POST /api/v1/deployments # Deploy model
GET /api/v1/deployments # List deployments
PUT /api/v1/deployments/{name}/scale # Scale deployment
DELETE /api/v1/deployments/{name} # Undeploy model
GET /api/v1/deployments/{name}/metrics # Deployment metrics