Connector SDK Guide
The MATIH Connector SDK enables developers to build custom data source connectors that integrate seamlessly with the platform's data ingestion, metadata synchronization, and catalog infrastructure. This guide covers the SDK architecture, the connector lifecycle, and step-by-step instructions for building connectors for databases, cloud storage, and SaaS applications.
Overview
Connectors in the MATIH Platform serve as the bridge between external data sources and the internal data infrastructure. Every connector performs three core functions:
- Metadata extraction: Discover schemas, tables, columns, data types, and statistics from the source
- Data ingestion: Read data from the source and write it to the platform's Iceberg lakehouse
- Change tracking: Detect and propagate schema changes and data updates
Connector Types
| Type | Description | Examples |
|---|---|---|
| Database | Relational and NoSQL databases | PostgreSQL, MySQL, SQL Server, Oracle, MongoDB, Cassandra |
| Cloud Storage | Object stores and file systems | AWS S3, Azure Blob Storage, Google Cloud Storage, HDFS |
| SaaS | Software-as-a-Service applications | Salesforce, HubSpot, Slack, Jira, Zendesk, Google Analytics |
| Streaming | Event and message streams | Kafka, Kinesis, Pub/Sub, EventHub |
| File | Structured and semi-structured files | CSV, JSON, Parquet, Avro, Excel, XML |
| API | Generic REST/GraphQL APIs | Custom REST endpoints, GraphQL schemas |
SDK Architecture
Project Structure
The Connector SDK is located in connectors/connector-sdk/ and follows this structure:
connectors/
connector-sdk/
src/
main/
java/
ai/matih/connector/
api/
Connector.java # Core connector interface
MetadataExtractor.java # Metadata extraction interface
DataReader.java # Data reading interface
ChangeTracker.java # Change detection interface
ConnectorConfig.java # Configuration base class
ConnectorContext.java # Runtime context
lifecycle/
ConnectorLifecycle.java # Lifecycle management
HealthCheck.java # Health check interface
model/
Table.java # Table metadata model
Column.java # Column metadata model
Schema.java # Schema metadata model
DataType.java # Type mapping
SyncResult.java # Sync operation result
ChangeEvent.java # Change event model
testing/
ConnectorTestHarness.java # Test framework
MockDataSource.java # Mock data source for testing
examples/
postgresql-connector/
s3-connector/
salesforce-connector/
Core Interfaces
Every connector must implement the Connector interface:
package ai.matih.connector.api;
import ai.matih.connector.lifecycle.HealthCheckResult;
public interface Connector extends AutoCloseable {
/**
* Initialize the connector with its configuration.
* Called once when the connector is first created.
*/
void initialize(ConnectorConfig config, ConnectorContext context);
/**
* Test connectivity to the data source.
* Returns a HealthCheckResult indicating success or failure with details.
*/
HealthCheckResult testConnection();
/**
* Get the metadata extractor for this connector.
*/
MetadataExtractor getMetadataExtractor();
/**
* Get the data reader for this connector.
*/
DataReader getDataReader();
/**
* Get the change tracker for this connector (optional).
* Return null if change tracking is not supported.
*/
default ChangeTracker getChangeTracker() {
return null;
}
/**
* Get connector metadata (name, version, capabilities).
*/
ConnectorMetadata getMetadata();
}
Metadata Extractor Interface
package ai.matih.connector.api;
import java.util.List;
public interface MetadataExtractor {
/**
* Discover all schemas (databases/namespaces) in the data source.
*/
List<Schema> discoverSchemas();
/**
* Discover all tables within a specific schema.
*/
List<Table> discoverTables(String schemaName);
/**
* Get detailed column metadata for a specific table.
*/
List<Column> discoverColumns(String schemaName, String tableName);
/**
* Get table-level statistics (row count, size, last modified).
*/
TableStatistics getTableStatistics(String schemaName, String tableName);
/**
* Get sample data from a table for preview.
*/
List<List<Object>> getSampleData(String schemaName, String tableName, int limit);
}
Data Reader Interface
package ai.matih.connector.api;
import java.util.Iterator;
public interface DataReader {
/**
* Read all data from a table, returning an iterator of record batches.
* Each batch is a list of rows, where each row is a list of values.
*/
Iterator<RecordBatch> readTable(
String schemaName,
String tableName,
ReadOptions options
);
/**
* Read data with a predicate pushdown filter.
*/
Iterator<RecordBatch> readTableWithFilter(
String schemaName,
String tableName,
String filterExpression,
ReadOptions options
);
/**
* Read incremental changes since a checkpoint.
*/
Iterator<RecordBatch> readIncremental(
String schemaName,
String tableName,
String checkpoint,
ReadOptions options
);
/**
* Estimate the number of rows for progress tracking.
*/
long estimateRowCount(String schemaName, String tableName);
}
Change Tracker Interface
package ai.matih.connector.api;
import java.util.List;
public interface ChangeTracker {
/**
* Detect schema changes since the last sync.
*/
List<SchemaChange> detectSchemaChanges(
String schemaName,
String tableName,
SchemaSnapshot previousSnapshot
);
/**
* Get the current change position (for bookmarking).
*/
String getCurrentPosition();
/**
* Check if there are pending changes to process.
*/
boolean hasPendingChanges(String schemaName, String tableName, String lastPosition);
}
Building a Database Connector
This section walks through building a connector for a relational database (using PostgreSQL as the example).
Step 1: Create the Project
mkdir connectors/my-postgres-connector
cd connectors/my-postgres-connector
pom.xml (Maven):
<project>
<groupId>ai.matih.connectors</groupId>
<artifactId>my-postgres-connector</artifactId>
<version>1.0.0</version>
<dependencies>
<dependency>
<groupId>ai.matih</groupId>
<artifactId>connector-sdk</artifactId>
<version>1.0.0</version>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>42.7.3</version>
</dependency>
</dependencies>
</project>
Step 2: Define the Configuration
package ai.matih.connectors.postgres;
import ai.matih.connector.api.ConnectorConfig;
import java.util.List;
public class PostgresConnectorConfig extends ConnectorConfig {
private String host;
private int port = 5432;
private String database;
private String username;
private String password; // Injected from Kubernetes Secret
private String sslMode = "prefer";
private int connectionPoolSize = 5;
private int queryTimeout = 30; // seconds
private List<String> schemas; // Schemas to include (null = all)
private List<String> excludeTables; // Tables to exclude
// Getters and setters omitted for brevity
@Override
public void validate() {
requireNonEmpty("host", host);
requireNonEmpty("database", database);
requireNonEmpty("username", username);
requireNonEmpty("password", password);
requirePositive("port", port);
}
}
Step 3: Implement the Connector
package ai.matih.connectors.postgres;
import ai.matih.connector.api.*;
import ai.matih.connector.lifecycle.HealthCheckResult;
import java.util.Set;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
public class PostgresConnector implements Connector {
private PostgresConnectorConfig config;
private ConnectorContext context;
private HikariDataSource dataSource;
private PostgresMetadataExtractor metadataExtractor;
private PostgresDataReader dataReader;
private PostgresChangeTracker changeTracker;
@Override
public void initialize(ConnectorConfig config, ConnectorContext context) {
this.config = (PostgresConnectorConfig) config;
this.context = context;
HikariConfig hikariConfig = new HikariConfig();
hikariConfig.setJdbcUrl(String.format(
"jdbc:postgresql://%s:%d/%s?sslmode=%s",
this.config.getHost(),
this.config.getPort(),
this.config.getDatabase(),
this.config.getSslMode()
));
hikariConfig.setUsername(this.config.getUsername());
hikariConfig.setPassword(this.config.getPassword());
hikariConfig.setMaximumPoolSize(this.config.getConnectionPoolSize());
hikariConfig.setConnectionTimeout(this.config.getQueryTimeout() * 1000L); // seconds to milliseconds
this.dataSource = new HikariDataSource(hikariConfig);
this.metadataExtractor = new PostgresMetadataExtractor(dataSource, this.config);
this.dataReader = new PostgresDataReader(dataSource, this.config);
this.changeTracker = new PostgresChangeTracker(dataSource, this.config);
}
@Override
public HealthCheckResult testConnection() {
try (var conn = dataSource.getConnection()) {
var stmt = conn.createStatement();
stmt.execute("SELECT 1");
return HealthCheckResult.healthy("Connection successful");
} catch (Exception e) {
return HealthCheckResult.unhealthy(
"Connection failed: " + e.getMessage(),
e
);
}
}
@Override
public MetadataExtractor getMetadataExtractor() {
return metadataExtractor;
}
@Override
public DataReader getDataReader() {
return dataReader;
}
@Override
public ChangeTracker getChangeTracker() {
return changeTracker;
}
@Override
public ConnectorMetadata getMetadata() {
return ConnectorMetadata.builder()
.name("postgresql")
.displayName("PostgreSQL")
.version("1.0.0")
.description("Connector for PostgreSQL databases")
.category(ConnectorCategory.DATABASE)
.capabilities(Set.of(
Capability.METADATA_EXTRACTION,
Capability.FULL_DATA_READ,
Capability.INCREMENTAL_READ,
Capability.SCHEMA_CHANGE_DETECTION,
Capability.PREDICATE_PUSHDOWN
))
.build();
}
@Override
public void close() {
if (dataSource != null) {
dataSource.close();
}
}
}
Step 4: Register the Connector
Create a service provider file for automatic discovery:
META-INF/services/ai.matih.connector.api.Connector:
ai.matih.connectors.postgres.PostgresConnector
Step 5: Test the Connector
package ai.matih.connectors.postgres;
import ai.matih.connector.testing.ConnectorTestHarness;
import java.util.List;
import java.util.Map;
import org.junit.jupiter.api.Test;
class PostgresConnectorTest {
@Test
void testMetadataExtraction() {
var harness = ConnectorTestHarness.forConnector(
PostgresConnector.class,
Map.of(
"host", "localhost",
"port", 5432,
"database", "test_db",
"username", "test_user",
"password", "test_pass"
)
);
harness.assertConnectionHealthy();
harness.assertSchemasDiscovered();
harness.assertTablesDiscovered("public");
harness.assertColumnsMatch("public", "users", List.of(
Column.of("id", DataType.BIGINT, false),
Column.of("email", DataType.VARCHAR, false),
Column.of("created_at", DataType.TIMESTAMP, true)
));
}
@Test
void testDataReading() {
var harness = ConnectorTestHarness.forConnector(
PostgresConnector.class,
Map.of(/* config */)
);
var result = harness.readTable("public", "users", ReadOptions.defaults());
harness.assertRowCount(result, 100);
harness.assertColumnsPresent(result, "id", "email", "created_at");
}
}
Building a Cloud Storage Connector
Cloud storage connectors handle object stores like S3, Azure Blob Storage, and Google Cloud Storage.
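Because a bucket can mix formats, storage connectors usually infer each object's format before choosing a parser (the FILE_FORMAT_DETECTION capability shown below). A minimal sketch of extension-based detection; the class and method names here are illustrative, not part of the SDK:

```java
import java.util.Locale;
import java.util.Map;

// Illustrative sketch: map an object key to a file format by extension,
// falling back to "unknown" so the caller can skip or sample the object.
// Names are hypothetical, not SDK API.
public class FormatDetector {
    private static final Map<String, String> EXTENSIONS = Map.of(
        "parquet", "parquet",
        "csv", "csv",
        "json", "json",
        "avro", "avro",
        "orc", "orc"
    );

    public static String detect(String objectKey) {
        int dot = objectKey.lastIndexOf('.');
        if (dot < 0 || dot == objectKey.length() - 1) {
            return "unknown";
        }
        String ext = objectKey.substring(dot + 1).toLowerCase(Locale.ROOT);
        return EXTENSIONS.getOrDefault(ext, "unknown");
    }
}
```

A real connector would fall back to content sniffing (magic bytes) when the extension is missing, since "unknown" objects are common in data-lake buckets.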
S3 Connector Example
package ai.matih.connectors.s3;
import ai.matih.connector.api.*;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.*;
public class S3Connector implements Connector {
private S3ConnectorConfig config;
private S3Client s3Client;
@Override
public void initialize(ConnectorConfig config, ConnectorContext context) {
this.config = (S3ConnectorConfig) config;
this.s3Client = S3Client.builder()
.region(Region.of(this.config.getRegion()))
.credentialsProvider(this.config.getCredentialsProvider())
.build();
}
@Override
public HealthCheckResult testConnection() {
try {
s3Client.headBucket(HeadBucketRequest.builder()
.bucket(config.getBucket())
.build());
return HealthCheckResult.healthy("Bucket accessible");
} catch (Exception e) {
return HealthCheckResult.unhealthy(
"Cannot access bucket: " + e.getMessage(), e);
}
}
@Override
public ConnectorMetadata getMetadata() {
return ConnectorMetadata.builder()
.name("aws-s3")
.displayName("Amazon S3")
.version("1.0.0")
.category(ConnectorCategory.CLOUD_STORAGE)
.capabilities(Set.of(
Capability.METADATA_EXTRACTION,
Capability.FULL_DATA_READ,
Capability.FILE_FORMAT_DETECTION
))
.supportedFormats(Set.of("parquet", "csv", "json", "avro", "orc"))
.build();
}
// MetadataExtractor and DataReader implementations...
@Override
public void close() {
if (s3Client != null) {
s3Client.close();
}
}
}
S3 Connector Configuration
public class S3ConnectorConfig extends ConnectorConfig {
private String bucket;
private String prefix = ""; // Object key prefix
private String region = "us-east-1";
private String fileFormat = "auto"; // auto, parquet, csv, json
private boolean recursive = true; // Scan subdirectories
private String partitionPattern; // e.g., "year={year}/month={month}"
private int maxFilesPerSync = 10000; // Limit for large buckets
// Authentication (IRSA preferred in production)
private String accessKeyId; // From Secret
private String secretAccessKey; // From Secret
private String roleArn; // For cross-account access
private boolean useIRSA = true; // Use IRSA in EKS
@Override
public void validate() {
requireNonEmpty("bucket", bucket);
requireNonEmpty("region", region);
}
}
Building a SaaS Connector
SaaS connectors integrate with external applications via their REST/GraphQL APIs. They must handle OAuth2 authentication, API rate limiting, and pagination.
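Pagination is worth getting right early: most SaaS APIs return results page by page with a cursor for the next page. A minimal cursor-pagination loop, with a stand-in page fetcher in place of a real API client (the Page type and all names are hypothetical, not SDK types):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical sketch of cursor pagination: keep fetching pages until the
// API stops returning a next-cursor. The Page record and the fetcher
// function stand in for a real SaaS API client.
public class PaginationSketch {
    public record Page(List<String> items, String nextCursor) {}

    public static List<String> fetchAll(Function<String, Page> fetchPage) {
        List<String> all = new ArrayList<>();
        String cursor = null;           // null means "first page"
        do {
            Page page = fetchPage.apply(cursor);
            all.addAll(page.items());
            cursor = page.nextCursor(); // null when there are no more pages
        } while (cursor != null);
        return all;
    }
}
```

In a production connector the same loop would also honor the API's rate limits between page fetches and persist the cursor as a checkpoint so an interrupted sync can resume.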
Salesforce Connector Example
package ai.matih.connectors.salesforce;
import ai.matih.connector.api.*;
import ai.matih.connector.lifecycle.HealthCheckResult;
import java.util.Set;
public class SalesforceConnector implements Connector {
private SalesforceConfig config;
private SalesforceOAuthClient oauthClient;
private SalesforceApiClient apiClient;
@Override
public void initialize(ConnectorConfig config, ConnectorContext context) {
this.config = (SalesforceConfig) config;
// Initialize OAuth2 client
this.oauthClient = new SalesforceOAuthClient(
this.config.getInstanceUrl(),
this.config.getClientId(),
this.config.getClientSecret(),
this.config.getRefreshToken()
);
// Initialize API client with auto-token-refresh
this.apiClient = new SalesforceApiClient(
this.config.getInstanceUrl(),
oauthClient
);
}
@Override
public HealthCheckResult testConnection() {
try {
var identity = apiClient.getIdentity();
return HealthCheckResult.healthy(
"Connected as " + identity.getUsername()
);
} catch (Exception e) {
return HealthCheckResult.unhealthy(
"Salesforce connection failed: " + e.getMessage(), e
);
}
}
@Override
public ConnectorMetadata getMetadata() {
return ConnectorMetadata.builder()
.name("salesforce")
.displayName("Salesforce")
.version("1.0.0")
.category(ConnectorCategory.SAAS)
.capabilities(Set.of(
Capability.METADATA_EXTRACTION,
Capability.FULL_DATA_READ,
Capability.INCREMENTAL_READ,
Capability.SCHEMA_CHANGE_DETECTION
))
.authType(AuthType.OAUTH2)
.build();
}
// MetadataExtractor discovers Salesforce objects (Account, Contact, etc.)
// DataReader uses SOQL queries with pagination
// ChangeTracker uses Salesforce Change Data Capture or getUpdated API
}
HubSpot Connector Configuration
public class HubSpotConfig extends ConnectorConfig {
private String accessToken; // From Secret (OAuth2 or Private App)
private String portalId;
private List<String> objects; // e.g., ["contacts", "companies", "deals"]
private String syncMode = "incremental"; // "full" or "incremental"
private int apiRateLimit = 100; // Requests per 10 seconds
private int pageSize = 100; // Items per page
@Override
public void validate() {
requireNonEmpty("accessToken", accessToken);
requireNonEmpty("objects", objects);
}
}
Slack Connector Configuration
public class SlackConfig extends ConnectorConfig {
private String botToken; // From Secret
private List<String> channels; // Channel IDs to sync
private boolean includeMessages = true;
private boolean includeThreads = true;
private boolean includeFiles = false;
private int lookbackDays = 30; // How far back to sync messages
@Override
public void validate() {
requireNonEmpty("botToken", botToken);
}
}
Connector Lifecycle
Every connector goes through a defined lifecycle managed by the Tenant Service:
| Phase | Description | Connector Method |
|---|---|---|
| Registration | User registers the connector via the Tenant Service UI or API | validate() on config |
| Connection Test | Platform tests connectivity before saving | testConnection() |
| Initial Sync | Full metadata extraction and data ingestion | discoverSchemas(), readTable() |
| Scheduled Sync | Periodic incremental sync on a cron schedule | readIncremental(), detectSchemaChanges() |
| Manual Refresh | User-triggered metadata or data refresh | Same as initial or incremental sync |
| Deregistration | User removes the connector | close() |
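To make the phase-to-method mapping concrete, here is a hypothetical driver showing the order in which the platform would invoke a connector during an initial sync. The MiniConnector interface is a simplified local stand-in for the SDK types, not the real API:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical driver illustrating the lifecycle order above.
// MiniConnector is a simplified stand-in for the SDK interfaces.
public class LifecycleSketch {
    public interface MiniConnector extends AutoCloseable {
        boolean testConnection();
        List<String> discoverSchemas();
        void readTable(String schemaName, String tableName);
        @Override void close();
    }

    public static List<String> runInitialSync(MiniConnector connector) {
        List<String> calls = new ArrayList<>();
        // Connection Test phase: fail fast before any extraction work
        if (!connector.testConnection()) {
            throw new IllegalStateException("connection test failed");
        }
        calls.add("testConnection");
        // Initial Sync phase: metadata extraction first, then full data read
        for (String schema : connector.discoverSchemas()) {
            calls.add("discoverSchemas:" + schema);
            connector.readTable(schema, "users");
            calls.add("readTable:" + schema + ".users");
        }
        // Deregistration (or end of sync): release source resources
        connector.close();
        calls.add("close");
        return calls;
    }
}
```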
Sync Modes
| Mode | Description | When to Use |
|---|---|---|
| Full | Extract all data from the source | Initial sync, after schema changes, data recovery |
| Incremental | Extract only changed data since last sync | Regular scheduled syncs |
| CDC (Change Data Capture) | Stream changes in real-time via Kafka | Real-time analytics requirements |
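A connector runtime typically picks the read path from the configured sync mode plus the presence of a saved checkpoint: even an incremental connector needs one full read to establish a baseline. A minimal sketch of that decision (method names are illustrative, mirroring readTable/readIncremental from the DataReader interface):

```java
// Illustrative read-path selection: a full read is forced when no
// checkpoint exists yet, even if the connector is configured incremental.
public class SyncModeSketch {
    public static String chooseReadPath(String syncMode, String checkpoint) {
        if ("full".equals(syncMode)) {
            return "readTable";
        }
        // Incremental mode without a checkpoint: no baseline yet, so do a
        // full read and record the resulting position as the checkpoint.
        if (checkpoint == null || checkpoint.isEmpty()) {
            return "readTable";
        }
        return "readIncremental";
    }
}
```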
Connector Configuration via the Platform
Users configure connectors through the Tenant Service API or the Data Workbench UI. Connector configurations are stored encrypted in the tenant's settings, with credentials held separately in Kubernetes Secrets.
Registration API call:
POST /api/v1/tenants/{tenantId}/connectors
Content-Type: application/json
{
"name": "Production Database",
"type": "postgresql",
"config": {
"host": "prod-db.company.com",
"port": 5432,
"database": "analytics",
"username": "matih_reader",
"sslMode": "require",
"schemas": ["public", "analytics"],
"excludeTables": ["_migrations", "_temp_*"]
},
"credentials": {
"password": "********"
},
"schedule": {
"syncMode": "incremental",
"cronExpression": "0 */6 * * *",
"timezone": "UTC"
}
}
Best Practices
Performance
| Practice | Description |
|---|---|
| Use batch reads | Read data in batches (1000-10000 rows) rather than row-by-row |
| Implement predicate pushdown | Push filter conditions to the source database for efficiency |
| Use connection pooling | Maintain a connection pool to avoid connection creation overhead |
| Respect API rate limits | For SaaS connectors, implement rate limiting with exponential backoff |
| Support parallelism | Allow the platform to read multiple tables or partitions concurrently |
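The batch-read practice amounts to simple chunking: rows are grouped into fixed-size batches instead of being emitted one by one. A minimal sketch with simplified types (plain lists rather than the SDK's RecordBatch):

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Illustrative chunking: wrap a row iterator so consumers receive
// fixed-size batches. List<T> stands in for the SDK's RecordBatch.
public class BatchingSketch {
    public static <T> List<List<T>> toBatches(Iterator<T> rows, int batchSize) {
        List<List<T>> batches = new ArrayList<>();
        List<T> current = new ArrayList<>(batchSize);
        while (rows.hasNext()) {
            current.add(rows.next());
            if (current.size() == batchSize) {
                batches.add(current);
                current = new ArrayList<>(batchSize);
            }
        }
        if (!current.isEmpty()) {
            batches.add(current); // emit the final partial batch
        }
        return batches;
    }
}
```

A production reader would stream batches lazily (returning an Iterator of batches, as the DataReader interface does) rather than materializing them all, so memory stays bounded for large tables.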
Reliability
| Practice | Description |
|---|---|
| Implement retry logic | Retry transient failures (network timeouts, rate limits) with backoff |
| Use checkpointing | Save sync progress to allow resumption after failures |
| Handle schema evolution | Detect and report schema changes (new columns, type changes, drops) |
| Log comprehensively | Log connection events, sync progress, and errors for troubleshooting |
| Test with large datasets | Validate performance with realistic data volumes |
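The retry practice is commonly implemented as exponential backoff around each source call. A minimal sketch, assuming every exception is transient for brevity (a real connector would only retry known-transient errors such as timeouts and rate limits):

```java
import java.util.concurrent.Callable;

// Illustrative retry with exponential backoff: retry up to maxAttempts,
// doubling the delay each attempt and capping it. All names hypothetical.
public class RetrySketch {
    public static <T> T withRetry(Callable<T> call, int maxAttempts,
                                  long baseDelayMs) throws Exception {
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return call.call();
            } catch (Exception e) {
                last = e;
                // Exponential backoff: baseDelayMs, 2x, 4x, ... capped at 5s
                long delay = baseDelayMs << (attempt - 1);
                Thread.sleep(Math.min(delay, 5_000));
            }
        }
        throw last; // exhausted attempts: surface the last failure
    }
}
```

Pairing this with checkpointing means a failed sync retries the failing call, and if retries are exhausted, the next scheduled sync resumes from the saved position instead of starting over.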
Security
| Practice | Description |
|---|---|
| Never log credentials | Mask sensitive config values in all log output |
| Use minimum privileges | Request only the database permissions needed (SELECT for read-only connectors) |
| Validate SSL certificates | Enable certificate verification for database and API connections |
| Support credential rotation | Handle credential updates without requiring connector recreation |
| Follow the principle of least privilege | SaaS connectors should request minimal OAuth2 scopes |
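The "never log credentials" rule is easiest to enforce centrally: mask sensitive keys before any config map reaches a logger. A hedged sketch; the key list below is an example, not an SDK-defined set:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Illustrative masking: replace values of sensitive config keys before
// logging. The sensitive-key set is an example, not SDK-defined.
public class MaskingSketch {
    private static final Set<String> SENSITIVE = Set.of(
        "password", "secretAccessKey", "accessToken",
        "botToken", "clientSecret", "refreshToken");

    public static Map<String, Object> maskForLogging(Map<String, Object> config) {
        Map<String, Object> masked = new LinkedHashMap<>();
        for (Map.Entry<String, Object> e : config.entrySet()) {
            masked.put(e.getKey(),
                SENSITIVE.contains(e.getKey()) ? "********" : e.getValue());
        }
        return masked;
    }
}
```

Routing every log statement through a helper like this (rather than masking ad hoc at each call site) is what keeps a new config field from leaking when someone forgets it is sensitive.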
Publishing Connectors to the Marketplace
Custom connectors can be published to the MATIH Marketplace for other tenants to use:
- Package the connector as a JAR with the META-INF/services registration
- Include a connector-manifest.yaml with metadata, configuration schema, and documentation
- Submit via the Config Service marketplace API: POST /api/v1/marketplace/connectors
- The connector undergoes security review and testing before publication
- Once approved, it appears in the Marketplace for all tenants to install
# connector-manifest.yaml
name: my-postgres-connector
displayName: PostgreSQL (Custom)
version: 1.0.0
author: Acme Corp
description: Enhanced PostgreSQL connector with CDC support
category: database
icon: postgresql.svg
configSchema:
type: object
required: [host, database, username, password]
properties:
host:
type: string
title: Hostname
description: PostgreSQL server hostname
port:
type: integer
title: Port
default: 5432
database:
type: string
title: Database Name
username:
type: string
title: Username
password:
type: string
title: Password
format: password
sslMode:
type: string
title: SSL Mode
enum: [disable, prefer, require, verify-ca, verify-full]
default: prefer
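Before submitting, it is worth checking a config payload against the manifest's required list locally. A minimal sketch of that check; the platform presumably runs full JSON Schema validation, so this only covers required keys:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Illustrative check of a config map against the manifest's
// `required: [host, database, username, password]` list. A real
// validator would enforce the full configSchema (types, enums, formats).
public class ManifestCheckSketch {
    public static List<String> missingRequired(Map<String, Object> config,
                                               List<String> required) {
        List<String> missing = new ArrayList<>();
        for (String key : required) {
            Object value = config.get(key);
            if (value == null || String.valueOf(value).isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }
}
```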