API Connectors
API connectors extract data from external REST APIs with support for pagination, authentication, rate limiting, and retry logic. The Pipeline Service uses the ApiExtractOperator to handle diverse API patterns across SaaS platforms, internal services, and third-party data providers.
Supported Authentication Methods
| Method | Configuration | Use Case |
|---|---|---|
| API Key | Header or query parameter via K8s Secret | Simple API keys |
| OAuth2 Client Credentials | Token endpoint + client ID/secret via K8s Secret | Machine-to-machine APIs |
| Bearer Token | Static token via K8s Secret | Pre-generated access tokens |
| Basic Auth | Username/password via K8s Secret | Legacy APIs |
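The sketch below shows how the static methods in the table typically become request headers, and what the token request for the OAuth2 client credentials grant looks like (RFC 6749, section 4.4). The method names (`api_key`, `bearer`, `basic`), the secret keys, and the default `X-API-Key` header name are hypothetical. They are not taken from the ApiExtractOperator source; in a pipeline the values would be read from the referenced K8s Secret.

```python
import base64
from urllib.parse import quote_plus, urlencode


def _basic(user: str, password: str) -> str:
    """Encode an HTTP Basic credential (RFC 7617)."""
    raw = f"{user}:{password}".encode("utf-8")
    return "Basic " + base64.b64encode(raw).decode("ascii")


def auth_headers(method: str, secret: dict) -> dict:
    """Headers for the static methods; `secret` holds values read from a K8s Secret.

    Method names, secret keys, and the default header name are hypothetical.
    """
    if method == "api_key":
        # Header placement shown; some APIs expect the key as a query parameter.
        return {secret.get("header_name", "X-API-Key"): secret["api_key"]}
    if method == "bearer":
        return {"Authorization": f"Bearer {secret['token']}"}
    if method == "basic":
        return {"Authorization": _basic(secret["username"], secret["password"])}
    raise ValueError(f"unsupported auth method: {method!r}")


def client_credentials_request(secret: dict) -> tuple:
    """Build the token request for the OAuth2 client credentials grant.

    Returns (token_url, headers, body). The client authenticates with HTTP Basic,
    form-encoding the ID and secret first (RFC 6749, sections 2.3.1 and 4.4).
    """
    headers = {
        "Authorization": _basic(quote_plus(secret["client_id"]),
                                quote_plus(secret["client_secret"])),
        "Content-Type": "application/x-www-form-urlencoded",
    }
    body = urlencode({"grant_type": "client_credentials"}).encode("ascii")
    return secret["token_url"], headers, body
```

The token endpoint answers with a JSON body whose `access_token` is then sent as a Bearer token; a production client would also cache that token until its `expires_in` lifetime runs out rather than requesting a new one per call.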
Pipeline Definition
```yaml
metadata:
  name: salesforce-sync
  version: "1.0.0"
  owner: integration-team

sources:
  sf_accounts:
    type: api
    connection: ${SALESFORCE_CONNECTION}
    endpoint: /services/data/v58.0/query
    method: GET
    parameters:
      q: "SELECT Id, Name, Industry FROM Account WHERE LastModifiedDate > {{last_run}}"
    pagination:
      type: cursor
      cursor_field: nextRecordsUrl
      has_more_field: done
      has_more_value: false
    rate_limit:
      requests_per_second: 10
      burst: 20

sinks:
  data_lake:
    type: iceberg
    table: staging.salesforce.accounts
    mode: merge_on_key
    merge_keys: [id]

orchestration:
  engine: airflow
  schedule: "0 */2 * * *"
```
ApiExtractOperator
Source: data-plane/pipeline-service/src/matih_pipeline/operators/api_extract.py
Configuration Parameters
| Parameter | Type | Required | Description |
|---|---|---|---|
| endpoint | string | Yes | API endpoint path |
| method | string | Yes | HTTP method (GET, POST) |
| parameters | map | No | Query parameters or request body |
| headers | map | No | Additional HTTP headers |
| pagination | object | No | Pagination configuration |
| rate_limit | object | No | Rate limiting settings |
| timeout_seconds | int | No | Request timeout (default: 30) |
| retry_count | int | No | Max retries on failure (default: 3) |
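`retry_count` and `timeout_seconds` bound how long a single request may take. The following is a minimal sketch of a retry loop that combines them with the `backoff_factor` and `retry_after_header` settings listed under Rate Limiting. It assumes a 1-second base delay multiplied by `backoff_factor` on each retry, and it uses a hypothetical `RetryableError` to stand in for 429 and 5xx responses; the operator's actual retry policy may differ.

```python
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


class RetryableError(Exception):
    """Hypothetical stand-in for a 429 or 5xx response."""

    def __init__(self, status: int, retry_after: Optional[str] = None):
        super().__init__(f"HTTP {status}")
        self.status = status
        self.retry_after = retry_after  # raw Retry-After header value, if any


def backoff_delay(attempt: int, backoff_factor: float = 2.0,
                  retry_after: Optional[str] = None,
                  honor_retry_after: bool = True) -> float:
    """Seconds to wait before retry `attempt` (1-based); assumes a 1 s base delay."""
    if honor_retry_after and retry_after is not None:
        try:
            return max(0.0, float(retry_after))  # delta-seconds form
        except ValueError:
            pass  # HTTP-date form is not handled in this sketch
    return backoff_factor ** (attempt - 1)


def call_with_retry(send: Callable[[], T], retry_count: int = 3,
                    backoff_factor: float = 2.0, honor_retry_after: bool = True,
                    sleep: Callable[[float], None] = time.sleep) -> T:
    """Call `send` until it succeeds or `retry_count` retries are exhausted.

    `send` would typically wrap one HTTP request, for example
    urllib.request.urlopen(req, timeout=timeout_seconds).
    """
    attempt = 0
    while True:
        try:
            return send()
        except RetryableError as err:
            attempt += 1
            if attempt > retry_count:
                raise  # give up: retries exhausted
            sleep(backoff_delay(attempt, backoff_factor,
                                err.retry_after, honor_retry_after))
```

With the defaults, a request that keeps failing without a `Retry-After` header is attempted four times in total, with waits of 1, 2, and 4 seconds between attempts.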
Pagination Strategies
| Strategy | Config Key | Description |
|---|---|---|
| Offset-based | type: offset | Uses limit/offset parameters |
| Cursor-based | type: cursor | Follows next-page cursor from response |
| Page number | type: page_number | Increments page parameter |
| Link header | type: link_header | Follows RFC 8288 (formerly RFC 5988) Link headers |
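The offset and cursor strategies reduce to two small loops. This sketch is an assumption about their semantics, not the operator's code. `fetch` is a hypothetical callable that performs one request and returns the decoded JSON page. Offset paging stops once the offset reaches the value in `total_field`, and cursor paging continues while `has_more_field` equals `has_more_value` (for Salesforce, `done` stays `false` until the last page).

```python
from typing import Any, Callable, Dict, Iterator, Optional

Page = Dict[str, Any]  # one decoded JSON response body


def offset_pages(fetch: Callable[[Dict[str, int]], Page], limit: int = 1000,
                 offset_param: str = "offset", limit_param: str = "limit",
                 total_field: str = "totalSize") -> Iterator[Page]:
    """Yield pages, advancing the offset by `limit` until it reaches the total."""
    offset = 0
    while True:
        page = fetch({offset_param: offset, limit_param: limit})
        yield page
        offset += limit
        if offset >= page[total_field]:
            return


def cursor_pages(fetch: Callable[[Optional[str]], Page],
                 cursor_field: str = "nextRecordsUrl",
                 has_more_field: str = "done",
                 has_more_value: Any = False) -> Iterator[Page]:
    """Yield pages, following `cursor_field` while more pages remain.

    `fetch(None)` requests the first page; later calls receive the cursor.
    """
    cursor: Optional[str] = None
    while True:
        page = fetch(cursor)
        yield page
        # More pages exist only while the flag equals has_more_value
        # (Salesforce: done == false).
        if page.get(has_more_field) != has_more_value:
            return
        cursor = page[cursor_field]
```

For Salesforce, `nextRecordsUrl` is a path relative to the instance URL, so a real `fetch` has to resolve it against the connection's base URL before issuing the request.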
Offset Pagination Example
```yaml
pagination:
  type: offset
  limit: 1000
  offset_param: offset
  limit_param: limit
  total_field: totalSize
```
Cursor Pagination Example
```yaml
pagination:
  type: cursor
  cursor_field: nextRecordsUrl
  has_more_field: done
  has_more_value: false
```
Rate Limiting
The operator implements a token bucket rate limiter to avoid exceeding API quotas:
| Parameter | Default | Description |
|---|---|---|
| requests_per_second | 10 | Sustained request rate |
| burst | 20 | Maximum burst capacity |
| retry_after_header | true | Honor Retry-After response header |
| backoff_factor | 2.0 | Exponential backoff multiplier |
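A token bucket lets short bursts through while holding the long-run rate at `requests_per_second`. The following minimal sketch maps `requests_per_second` to the refill rate and `burst` to the bucket capacity. The class and method names are hypothetical, and the injectable clock exists only so the behavior can be checked without real waiting.

```python
import time
from typing import Callable


class TokenBucket:
    """Token bucket limiter: refills at `rate` tokens/s, holds at most `capacity`."""

    def __init__(self, rate: float = 10.0, capacity: float = 20.0,
                 clock: Callable[[], float] = time.monotonic):
        self.rate = rate            # requests_per_second
        self.capacity = capacity    # burst
        self.clock = clock
        self.tokens = capacity      # start full so an initial burst is allowed
        self.last = clock()

    def _refill(self) -> None:
        now = self.clock()
        elapsed = now - self.last
        self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
        self.last = now

    def try_acquire(self) -> bool:
        """Take one token if available; never blocks."""
        self._refill()
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False

    def wait_time(self) -> float:
        """Seconds until one token will be available."""
        self._refill()
        return max(0.0, (1 - self.tokens) / self.rate)

    def acquire(self, sleep: Callable[[float], None] = time.sleep) -> None:
        """Block until a token is available, then take it."""
        while not self.try_acquire():
            sleep(self.wait_time())
```

With the defaults, a fresh bucket admits 20 requests back to back and then roughly one every 100 ms.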
Response Mapping
The operator supports JSONPath expressions to extract records from nested API responses:
```yaml
response_mapping:
  records_path: "$.data.records"
  id_field: "$.id"
  timestamp_field: "$.attributes.lastModified"
```
Related Pages
- File Ingestion -- Cloud storage extraction
- Batch Ingestion -- Database extraction
- Schema Registry -- Schema validation for API data