MATIH Platform is in active MVP development. Documentation reflects current implementation status.

API Connectors

API connectors extract data from external REST APIs with support for pagination, authentication, rate limiting, and retry logic. The Pipeline Service uses the ApiExtractOperator to handle diverse API patterns across SaaS platforms, internal services, and third-party data providers.


Supported Authentication Methods

| Method | Configuration | Use Case |
|---|---|---|
| API Key | Header or query parameter via K8s Secret | Simple API keys |
| OAuth2 Client Credentials | Token endpoint + client ID/secret via K8s Secret | Machine-to-machine APIs |
| Bearer Token | Static token via K8s Secret | Pre-generated access tokens |
| Basic Auth | Username/password via K8s Secret | Legacy APIs |
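
As a rough illustration of how these methods translate into request credentials, the sketch below builds headers and query parameters from a secret payload. The function name `build_auth`, the method identifiers, and the secret keys (`name`, `location`, `key`, `token`, `username`, `password`) are hypothetical and are not taken from the ApiExtractOperator source. For OAuth2 it assumes an access token has already been obtained from the token endpoint.

```python
import base64
from typing import Dict, Tuple


def build_auth(method: str, secret: Dict[str, str]) -> Tuple[Dict[str, str], Dict[str, str]]:
    """Return (headers, query_params) for one auth method.

    Hypothetical helper: method names and secret keys are assumptions.
    """
    if method == "api_key":
        name = secret.get("name", "X-API-Key")  # assumed default header name
        if secret.get("location", "header") == "query":
            return {}, {name: secret["key"]}
        return {name: secret["key"]}, {}
    if method in ("bearer", "oauth2_client_credentials"):
        # For OAuth2, the token is assumed to have been fetched already
        # from the token endpoint using the client ID and secret.
        return {"Authorization": f"Bearer {secret['token']}"}, {}
    if method == "basic":
        raw = f"{secret['username']}:{secret['password']}".encode("utf-8")
        encoded = base64.b64encode(raw).decode("ascii")
        return {"Authorization": f"Basic {encoded}"}, {}
    raise ValueError(f"unsupported auth method: {method}")
```

In a deployment the `secret` dictionary would typically be populated from the mounted K8s Secret rather than written inline.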

Pipeline Definition

```yaml
metadata:
  name: salesforce-sync
  version: "1.0.0"
  owner: integration-team

sources:
  sf_accounts:
    type: api
    connection: ${SALESFORCE_CONNECTION}
    endpoint: /services/data/v58.0/query
    method: GET
    parameters:
      q: "SELECT Id, Name, Industry FROM Account WHERE LastModifiedDate > {{last_run}}"
    pagination:
      type: cursor
      cursor_field: nextRecordsUrl
      has_more_field: done
      has_more_value: false
    rate_limit:
      requests_per_second: 10
      burst: 20

sinks:
  data_lake:
    type: iceberg
    table: staging.salesforce.accounts
    mode: merge_on_key
    merge_keys: [id]

orchestration:
  engine: airflow
  schedule: "0 */2 * * *"
```

ApiExtractOperator

Source: data-plane/pipeline-service/src/matih_pipeline/operators/api_extract.py

Configuration Parameters

| Parameter | Type | Required | Description |
|---|---|---|---|
| endpoint | string | Yes | API endpoint path |
| method | string | Yes | HTTP method (GET, POST) |
| parameters | map | No | Query parameters or request body |
| headers | map | No | Additional HTTP headers |
| pagination | object | No | Pagination configuration |
| rate_limit | object | No | Rate limiting settings |
| timeout_seconds | int | No | Request timeout (default: 30) |
| retry_count | int | No | Max retries on failure (default: 3) |
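
To make the required fields and defaults concrete, here is a minimal sketch of how these parameters could be modeled and validated. The `ApiExtractConfig` class is hypothetical and is not the operator's actual configuration type. Only the field names and the two defaults come from the table above.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, Optional


@dataclass
class ApiExtractConfig:
    """Hypothetical model of the operator parameters listed above."""

    endpoint: str                       # required
    method: str                         # required: GET or POST
    parameters: Dict[str, Any] = field(default_factory=dict)
    headers: Dict[str, str] = field(default_factory=dict)
    pagination: Optional[Dict[str, Any]] = None
    rate_limit: Optional[Dict[str, Any]] = None
    timeout_seconds: int = 30           # documented default
    retry_count: int = 3                # documented default

    def __post_init__(self) -> None:
        if not self.endpoint:
            raise ValueError("endpoint is required")
        self.method = self.method.upper()
        if self.method not in ("GET", "POST"):
            raise ValueError(f"unsupported HTTP method: {self.method}")
```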

Pagination Strategies

| Strategy | Config Key | Description |
|---|---|---|
| Offset-based | type: offset | Uses limit/offset parameters |
| Cursor-based | type: cursor | Follows next-page cursor from response |
| Page number | type: page_number | Increments page parameter |
| Link header | type: link_header | Follows RFC 8288 (formerly RFC 5988) Link headers |
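
For the Link header strategy, the next page URL is the link whose `rel` parameter contains `next`. The sketch below shows one way to extract it using only the standard library. The function name `next_link` is hypothetical, and this is a simplified parser, not the operator's implementation. It does not handle every corner of the Link header grammar.

```python
import re
from typing import Optional

# Matches "<url>" followed by its ";"-separated parameters.
_LINK_RE = re.compile(r'<([^>]*)>((?:\s*;\s*[^;,]+)*)')
_REL_RE = re.compile(r'\brel\s*=\s*"?([^";,]+)"?', re.IGNORECASE)


def next_link(header: Optional[str]) -> Optional[str]:
    """Return the URL tagged rel="next" in a Link header, or None."""
    if not header:
        return None
    for match in _LINK_RE.finditer(header):
        url, params = match.group(1), match.group(2)
        rel = _REL_RE.search(params)
        # rel may carry several space-separated relation types.
        if rel and "next" in rel.group(1).lower().split():
            return url
    return None
```

A paginating loop would call this on each response's `Link` header and stop when it returns `None`.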

Offset Pagination Example

```yaml
pagination:
  type: offset
  limit: 1000
  offset_param: offset
  limit_param: limit
  total_field: totalSize
```
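
The loop implied by this configuration can be sketched as follows. The `fetch` callable stands in for the HTTP request, and `records_field` is an assumed name for the key that holds the page's records. Neither is part of the documented configuration. The sketch advances the offset by the number of records received and stops once the reported total is reached or a page comes back empty.

```python
from typing import Any, Callable, Dict, Iterator


def iter_offset_pages(
    fetch: Callable[[Dict[str, int]], Dict[str, Any]],
    limit: int = 1000,
    offset_param: str = "offset",
    limit_param: str = "limit",
    total_field: str = "totalSize",
    records_field: str = "records",  # assumption: not a documented key
) -> Iterator[Any]:
    """Yield records page by page using limit/offset query parameters."""
    offset = 0
    while True:
        page = fetch({offset_param: offset, limit_param: limit})
        records = page.get(records_field, [])
        yield from records
        offset += len(records)
        total = page.get(total_field)
        # Stop on an empty page, or once the reported total is covered.
        if not records or (total is not None and offset >= total):
            break
```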

Cursor Pagination Example

```yaml
pagination:
  type: cursor
  cursor_field: nextRecordsUrl
  has_more_field: done
  has_more_value: false
```
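
With this configuration, more pages remain while the response's `done` field equals `false`, and the next request goes to the URL in `nextRecordsUrl`. A minimal sketch of that loop is shown below. The `fetch` callable and the `records_field` key are assumptions made for illustration.

```python
from typing import Any, Callable, Dict, Iterator


def iter_cursor_pages(
    fetch: Callable[[str], Dict[str, Any]],
    first_url: str,
    cursor_field: str = "nextRecordsUrl",
    has_more_field: str = "done",
    has_more_value: Any = False,
    records_field: str = "records",  # assumption: not a documented key
) -> Iterator[Any]:
    """Yield records while following the cursor returned in each response."""
    url = first_url
    while url:
        page = fetch(url)
        yield from page.get(records_field, [])
        # More pages exist only while has_more_field equals has_more_value.
        if page.get(has_more_field) != has_more_value:
            break
        url = page.get(cursor_field)
```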

Rate Limiting

The operator implements a token bucket rate limiter to avoid exceeding API quotas:

| Parameter | Default | Description |
|---|---|---|
| requests_per_second | 10 | Sustained request rate |
| burst | 20 | Maximum burst capacity |
| retry_after_header | true | Honor Retry-After response header |
| backoff_factor | 2.0 | Exponential backoff multiplier |
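
The following is a minimal token bucket sketch using the defaults above, not the operator's actual implementation. The `TokenBucket` class and `retry_delay` helper are hypothetical names. The retry helper assumes a base delay of one second multiplied by `backoff_factor` per attempt, and it only honors the numeric (seconds) form of `Retry-After`. Both are assumptions, since the source does not spell out those details.

```python
import time
from typing import Callable, Optional


class TokenBucket:
    """Refills at `requests_per_second`, holding at most `burst` tokens."""

    def __init__(
        self,
        requests_per_second: float = 10.0,
        burst: int = 20,
        clock: Callable[[], float] = time.monotonic,
        sleep: Callable[[float], None] = time.sleep,
    ) -> None:
        self.rate = float(requests_per_second)
        self.capacity = float(burst)
        self.tokens = float(burst)  # start with a full bucket
        self._clock = clock
        self._sleep = sleep
        self._last = clock()

    def _refill(self) -> None:
        now = self._clock()
        elapsed = now - self._last
        self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
        self._last = now

    def acquire(self) -> float:
        """Block until one token is available; return seconds waited."""
        self._refill()
        waited = 0.0
        if self.tokens < 1.0:
            waited = (1.0 - self.tokens) / self.rate
            self._sleep(waited)
            self._refill()
        self.tokens -= 1.0
        return waited


def retry_delay(
    attempt: int,
    backoff_factor: float = 2.0,
    retry_after: Optional[str] = None,
    honor_retry_after: bool = True,
) -> float:
    """Seconds to wait before retry number `attempt` (0-based)."""
    if honor_retry_after and retry_after is not None:
        try:
            return max(0.0, float(retry_after))
        except ValueError:
            pass  # HTTP-date form is not handled in this sketch
    return backoff_factor ** attempt  # assumed base delay of 1 second
```

Calling `acquire()` before each request lets up to `burst` requests through immediately, after which requests are spaced at roughly `1 / requests_per_second` seconds.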

Response Mapping

The operator supports JSONPath expressions to extract records from nested API responses:

```yaml
response_mapping:
  records_path: "$.data.records"
  id_field: "$.id"
  timestamp_field: "$.attributes.lastModified"
```
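
To show what this mapping does to a response, the sketch below resolves the simple dotted subset of JSONPath used in the example (`$.a.b.c`) without an external library. The helper names `resolve_path` and `extract_records` are hypothetical. It also assumes that `id_field` and `timestamp_field` are evaluated relative to each record, which the source does not state explicitly.

```python
from typing import Any, Dict, List


def resolve_path(doc: Any, path: str) -> Any:
    """Resolve a dotted JSONPath such as "$.data.records"; None if missing."""
    if not path.startswith("$"):
        raise ValueError(f"path must start with '$': {path}")
    current = doc
    for key in path[1:].strip(".").split("."):
        if key == "":
            continue  # a bare "$" refers to the document itself
        if not isinstance(current, dict) or key not in current:
            return None
        current = current[key]
    return current


def extract_records(response: Dict[str, Any], mapping: Dict[str, str]) -> List[Dict[str, Any]]:
    """Apply records_path, then id_field/timestamp_field to each record."""
    records = resolve_path(response, mapping["records_path"]) or []
    return [
        {
            "id": resolve_path(rec, mapping["id_field"]),
            "timestamp": resolve_path(rec, mapping["timestamp_field"]),
            "record": rec,
        }
        for rec in records
    ]
```

Full JSONPath features such as wildcards, filters, and array indexing are out of scope for this sketch.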
