# Batch Inference
The Batch Inference module enables high-throughput model scoring across large datasets using distributed compute via Ray. Batch jobs are submitted through the API and processed in parallel across worker nodes, and their results are written to the data warehouse or an object store.
## Batch Pipeline Architecture

```
API Request --> Batch Processor --> Ray Data Pipeline --> Model Scoring --> Output Store
                      |                     |
                 Job Manager         Data Partitioning
                      |                     |
              Status Tracking        Parallel Workers
```

## Submit Batch Job
`POST /api/v1/inference/batch`

```json
{
  "model_id": "model-xyz789",
  "input": {
    "source": "sql",
    "query": "SELECT customer_id, tenure, monthly_charges, total_charges FROM customers"
  },
  "output": {
    "destination": "table",
    "table_name": "predictions.churn_scores",
    "mode": "overwrite"
  },
  "config": {
    "batch_size": 1000,
    "parallelism": 4,
    "timeout_minutes": 60
  }
}
```

## Response
```json
{
  "job_id": "batch-abc123",
  "status": "submitted",
  "estimated_duration_minutes": 15,
  "input_row_count": 500000
}
```

## Get Job Status
`GET /api/v1/inference/batch/:job_id`

```json
{
  "job_id": "batch-abc123",
  "status": "running",
  "progress": {
    "total_rows": 500000,
    "processed_rows": 325000,
    "percentage": 65
  },
  "started_at": "2025-03-15T10:00:00Z",
  "estimated_completion": "2025-03-15T10:12:00Z"
}
```

## Job States
| State | Description |
|---|---|
| submitted | Job accepted and queued |
| preparing | Loading the model and partitioning data |
| running | Actively scoring records |
| completing | Writing results to the output store |
| completed | Job finished successfully |
| failed | Job encountered an error |
| cancelled | Job was cancelled by the user |
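The submit and status endpoints above can be driven from a small client. Below is a minimal polling sketch; `post` and `get` are injectable stand-ins for an HTTP layer (e.g. `requests.post`/`requests.get` returning parsed JSON), while the endpoint paths, terminal states, and field names follow the examples in this section:

```python
import time
from typing import Callable

def submit_and_wait(post: Callable, get: Callable, payload: dict,
                    poll_seconds: float = 5.0) -> dict:
    """Submit a batch job, then poll its status until a terminal state."""
    job = post("/api/v1/inference/batch", payload)
    terminal = {"completed", "failed", "cancelled"}
    status = job
    while status["status"] not in terminal:
        time.sleep(poll_seconds)
        status = get(f"/api/v1/inference/batch/{job['job_id']}")
    return status
```

Injecting the HTTP calls keeps the loop testable and lets callers plug in retries or authentication without changing the polling logic.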
## Output Destinations
| Destination | Format | Description |
|---|---|---|
| table | SQL table | Write predictions to a data warehouse table |
| parquet | Parquet files | Write to object store as Parquet |
| csv | CSV files | Write to object store as CSV |
| kafka | Kafka topic | Stream predictions to a Kafka topic |
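One way the engine might dispatch on the destination type is a simple lookup table. This is an illustrative sketch: `write_parquet` and `write_csv` mirror real Ray Data `Dataset` writers, but the `table` and `kafka` entries are assumptions about this service's internals, not documented Ray APIs.

```python
# Illustrative mapping from destination type to a writer method name.
# "write_parquet"/"write_csv" exist on Ray Data Datasets; "write_sql"
# for warehouse tables and "write_kafka" are assumed service-side sinks.
RAY_WRITERS = {
    "parquet": "write_parquet",
    "csv": "write_csv",
    "table": "write_sql",
    "kafka": "write_kafka",
}

def writer_for(destination: str) -> str:
    """Resolve the writer for a job's output destination, or fail fast."""
    if destination not in RAY_WRITERS:
        raise ValueError(f"unsupported destination: {destination}")
    return RAY_WRITERS[destination]
```

Failing fast on an unknown destination surfaces configuration mistakes at submission time rather than after scoring has already run.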
## Data Partitioning
Large datasets are automatically partitioned for parallel processing:
```python
import ray

class BatchInferenceEngine:
    async def run(self, job: BatchJob):
        # Read the input query into a distributed Ray Dataset; read_sql
        # also takes a connection factory for the source database.
        dataset = ray.data.read_sql(job.input_query, self._connection_factory)
        # Score each partition in parallel across the cluster.
        predictions = dataset.map_batches(
            self._predict_batch,
            batch_size=job.batch_size,  # records per scoring call
            num_gpus=0,                 # CPU-only scoring by default
            num_cpus=1,                 # one CPU core per worker task
        )
        # Persist results to the configured output location.
        predictions.write_parquet(job.output_path)
```

## Resource Management
| Setting | Default | Description |
|---|---|---|
| batch_size | 1000 | Records per scoring batch |
| parallelism | 4 | Number of parallel workers |
| max_memory_gb | 16 | Maximum memory per worker |
| timeout_minutes | 60 | Job timeout |
| retry_count | 3 | Retries on transient failures |
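These settings interact in a predictable way. A hypothetical sizing helper (not part of the actual API) shows how `batch_size` and `parallelism` split up the 500,000-row job from the earlier example:

```python
import math

def plan(total_rows: int, batch_size: int = 1000, parallelism: int = 4) -> dict:
    """Estimate how a job splits into scoring batches across workers."""
    batches = math.ceil(total_rows / batch_size)
    return {
        "batches": batches,
        "batches_per_worker": math.ceil(batches / parallelism),
    }

plan(500_000)  # {'batches': 500, 'batches_per_worker': 125}
```

With the defaults, 500,000 rows become 500 scoring batches, roughly 125 per worker; raising `parallelism` shortens the critical path at the cost of more concurrent memory use.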
## Scheduling
Batch jobs can be scheduled for recurring execution:
```json
{
  "schedule": {
    "cron": "0 2 * * *",
    "timezone": "UTC",
    "enabled": true
  }
}
```

## Configuration
| Environment Variable | Default | Description |
|---|---|---|
| BATCH_MAX_CONCURRENT_JOBS | 3 | Max concurrent batch jobs per tenant |
| BATCH_DEFAULT_PARALLELISM | 4 | Default worker parallelism |
| BATCH_MAX_INPUT_ROWS | 10000000 | Maximum input dataset rows |
| BATCH_RESULT_TTL_HOURS | 168 | Result retention (7 days) |
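A hedged sketch of how a service might resolve these variables, falling back to the documented defaults (`batch_config` is an illustrative helper, not part of the actual codebase):

```python
import os

def batch_config(env=os.environ) -> dict:
    """Resolve batch limits from the environment, using documented defaults."""
    return {
        "max_concurrent_jobs": int(env.get("BATCH_MAX_CONCURRENT_JOBS", "3")),
        "default_parallelism": int(env.get("BATCH_DEFAULT_PARALLELISM", "4")),
        "max_input_rows": int(env.get("BATCH_MAX_INPUT_ROWS", "10000000")),
        "result_ttl_hours": int(env.get("BATCH_RESULT_TTL_HOURS", "168")),
    }
```

Passing `env` explicitly makes the resolution logic easy to test without mutating the process environment.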