MATIH Platform is in active MVP development. Documentation reflects current implementation status.
20. Appendices & Reference
Connector SDK Guide

The MATIH Connector SDK enables developers to build custom data source connectors that integrate seamlessly with the platform's data ingestion, metadata synchronization, and catalog infrastructure. This guide covers the SDK architecture, the connector lifecycle, and step-by-step instructions for building connectors for databases, cloud storage, and SaaS applications.


Overview

Connectors in the MATIH Platform serve as the bridge between external data sources and the internal data infrastructure. Every connector performs three core functions:

  1. Metadata extraction: Discover schemas, tables, columns, data types, and statistics from the source
  2. Data ingestion: Read data from the source and write it to the platform's Iceberg lakehouse
  3. Change tracking: Detect and propagate schema changes and data updates

Connector Types

| Type | Description | Examples |
|------|-------------|----------|
| Database | Relational and NoSQL databases | PostgreSQL, MySQL, SQL Server, Oracle, MongoDB, Cassandra |
| Cloud Storage | Object stores and file systems | AWS S3, Azure Blob Storage, Google Cloud Storage, HDFS |
| SaaS | Software-as-a-Service applications | Salesforce, HubSpot, Slack, Jira, Zendesk, Google Analytics |
| Streaming | Event and message streams | Kafka, Kinesis, Pub/Sub, EventHub |
| File | Structured and semi-structured files | CSV, JSON, Parquet, Avro, Excel, XML |
| API | Generic REST/GraphQL APIs | Custom REST endpoints, GraphQL schemas |

SDK Architecture

Project Structure

The Connector SDK is located in connectors/connector-sdk/ and follows this structure:

connectors/
  connector-sdk/
    src/
      main/
        java/
          ai/matih/connector/
            api/
              Connector.java             # Core connector interface
              MetadataExtractor.java     # Metadata extraction interface
              DataReader.java            # Data reading interface
              ChangeTracker.java         # Change detection interface
              ConnectorConfig.java       # Configuration base class
              ConnectorContext.java      # Runtime context
            lifecycle/
              ConnectorLifecycle.java    # Lifecycle management
              HealthCheck.java           # Health check interface
            model/
              Table.java                 # Table metadata model
              Column.java                # Column metadata model
              Schema.java                # Schema metadata model
              DataType.java              # Type mapping
              SyncResult.java            # Sync operation result
              ChangeEvent.java           # Change event model
            testing/
              ConnectorTestHarness.java  # Test framework
              MockDataSource.java        # Mock data source for testing
    examples/
      postgresql-connector/
      s3-connector/
      salesforce-connector/

Core Interfaces

Every connector must implement the Connector interface:

package ai.matih.connector.api;
 
public interface Connector extends AutoCloseable {
 
    /**
     * Initialize the connector with its configuration.
     * Called once when the connector is first created.
     */
    void initialize(ConnectorConfig config, ConnectorContext context);
 
    /**
     * Test connectivity to the data source.
     * Returns a HealthCheck result indicating success or failure with details.
     */
    HealthCheckResult testConnection();
 
    /**
     * Get the metadata extractor for this connector.
     */
    MetadataExtractor getMetadataExtractor();
 
    /**
     * Get the data reader for this connector.
     */
    DataReader getDataReader();
 
    /**
     * Get the change tracker for this connector (optional).
     * Return null if change tracking is not supported.
     */
    default ChangeTracker getChangeTracker() {
        return null;
    }
 
    /**
     * Get connector metadata (name, version, capabilities).
     */
    ConnectorMetadata getMetadata();
}

Metadata Extractor Interface

package ai.matih.connector.api;
 
import java.util.List;
 
public interface MetadataExtractor {
 
    /**
     * Discover all schemas (databases/namespaces) in the data source.
     */
    List<Schema> discoverSchemas();
 
    /**
     * Discover all tables within a specific schema.
     */
    List<Table> discoverTables(String schemaName);
 
    /**
     * Get detailed column metadata for a specific table.
     */
    List<Column> discoverColumns(String schemaName, String tableName);
 
    /**
     * Get table-level statistics (row count, size, last modified).
     */
    TableStatistics getTableStatistics(String schemaName, String tableName);
 
    /**
     * Get sample data from a table for preview.
     */
    List<List<Object>> getSampleData(String schemaName, String tableName, int limit);
}
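
Inside a database connector, discoverColumns typically maps JDBC type codes onto the SDK's DataType constants. The sketch below shows that mapping, assuming the DataType names seen elsewhere in this guide (BIGINT, VARCHAR, TIMESTAMP); the choice of buckets and the String return type are illustrative, not the SDK's canonical enum.

```java
import java.sql.Types;

// Illustrative JDBC-to-platform type mapping for a MetadataExtractor.
public class JdbcTypeMapper {

    public static String toPlatformType(int jdbcType) {
        switch (jdbcType) {
            case Types.BIGINT:                  return "BIGINT";
            case Types.INTEGER:
            case Types.SMALLINT:                return "INTEGER";
            case Types.VARCHAR:
            case Types.CHAR:
            case Types.LONGVARCHAR:             return "VARCHAR";
            case Types.TIMESTAMP:
            case Types.TIMESTAMP_WITH_TIMEZONE: return "TIMESTAMP";
            case Types.DATE:                    return "DATE";
            case Types.NUMERIC:
            case Types.DECIMAL:                 return "DECIMAL";
            case Types.DOUBLE:
            case Types.FLOAT:
            case Types.REAL:                    return "DOUBLE";
            case Types.BOOLEAN:
            case Types.BIT:                     return "BOOLEAN";
            default:                            return "STRING"; // safe fallback
        }
    }
}
```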

Data Reader Interface

package ai.matih.connector.api;
 
import java.util.Iterator;
 
public interface DataReader {
 
    /**
     * Read all data from a table, returning an iterator of record batches.
     * Each batch is a list of rows, where each row is a list of values.
     */
    Iterator<RecordBatch> readTable(
        String schemaName,
        String tableName,
        ReadOptions options
    );
 
    /**
     * Read data with a predicate pushdown filter.
     */
    Iterator<RecordBatch> readTableWithFilter(
        String schemaName,
        String tableName,
        String filterExpression,
        ReadOptions options
    );
 
    /**
     * Read incremental changes since a checkpoint.
     */
    Iterator<RecordBatch> readIncremental(
        String schemaName,
        String tableName,
        String checkpoint,
        ReadOptions options
    );
 
    /**
     * Estimate the number of rows for progress tracking.
     */
    long estimateRowCount(String schemaName, String tableName);
}
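
The readTable contract returns record batches rather than single rows. A self-contained sketch of that batching pattern, with RecordBatch simplified to List<List<Object>> (an assumption made so the example compiles on its own):

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Wraps a row-level iterator (e.g. a streaming JDBC ResultSet) into
// fixed-size batches so the platform writes to Iceberg in chunks
// instead of row by row.
public class BatchingIterator implements Iterator<List<List<Object>>> {

    private final Iterator<List<Object>> rows;
    private final int batchSize;

    public BatchingIterator(Iterator<List<Object>> rows, int batchSize) {
        this.rows = rows;
        this.batchSize = batchSize;
    }

    @Override
    public boolean hasNext() {
        return rows.hasNext();
    }

    @Override
    public List<List<Object>> next() {
        List<List<Object>> batch = new ArrayList<>(batchSize);
        while (rows.hasNext() && batch.size() < batchSize) {
            batch.add(rows.next());
        }
        return batch;
    }
}
```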

Change Tracker Interface

package ai.matih.connector.api;
 
import java.util.List;
 
public interface ChangeTracker {
 
    /**
     * Detect schema changes since the last sync.
     */
    List<SchemaChange> detectSchemaChanges(
        String schemaName,
        String tableName,
        SchemaSnapshot previousSnapshot
    );
 
    /**
     * Get the current change position (for bookmarking).
     */
    String getCurrentPosition();
 
    /**
     * Check if there are pending changes to process.
     */
    boolean hasPendingChanges(String schemaName, String tableName, String lastPosition);
}
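
detectSchemaChanges boils down to diffing the previous snapshot against current metadata. A hedged sketch using plain maps (column name to type) and strings in place of the SDK's SchemaSnapshot and SchemaChange models:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Diffs two schema snapshots into added, retyped, and dropped columns.
public class SchemaDiff {

    public static List<String> diff(Map<String, String> previous, Map<String, String> current) {
        List<String> changes = new ArrayList<>();
        for (var e : current.entrySet()) {
            String oldType = previous.get(e.getKey());
            if (oldType == null) {
                changes.add("ADD_COLUMN " + e.getKey());
            } else if (!oldType.equals(e.getValue())) {
                changes.add("CHANGE_TYPE " + e.getKey() + " " + oldType + " -> " + e.getValue());
            }
        }
        for (String name : previous.keySet()) {
            if (!current.containsKey(name)) {
                changes.add("DROP_COLUMN " + name);
            }
        }
        return changes;
    }
}
```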

Building a Database Connector

This section walks through building a connector for a relational database (using PostgreSQL as the example).

Step 1: Create the Project

mkdir connectors/my-postgres-connector
cd connectors/my-postgres-connector

pom.xml (Maven):

<project>
    <groupId>ai.matih.connectors</groupId>
    <artifactId>my-postgres-connector</artifactId>
    <version>1.0.0</version>
 
    <dependencies>
        <dependency>
            <groupId>ai.matih</groupId>
            <artifactId>connector-sdk</artifactId>
            <version>1.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.postgresql</groupId>
            <artifactId>postgresql</artifactId>
            <version>42.7.3</version>
        </dependency>
    </dependencies>
</project>

Step 2: Define the Configuration

package ai.matih.connectors.postgres;
 
import java.util.List;
 
import ai.matih.connector.api.ConnectorConfig;
 
public class PostgresConnectorConfig extends ConnectorConfig {
 
    private String host;
    private int port = 5432;
    private String database;
    private String username;
    private String password;      // Injected from Kubernetes Secret
    private String sslMode = "prefer";
    private int connectionPoolSize = 5;
    private int queryTimeout = 30;
    private List<String> schemas;  // Schemas to include (null = all)
    private List<String> excludeTables;  // Tables to exclude
 
    // Getters and setters omitted for brevity
 
    @Override
    public void validate() {
        requireNonEmpty("host", host);
        requireNonEmpty("database", database);
        requireNonEmpty("username", username);
        requireNonEmpty("password", password);
        requirePositive("port", port);
    }
}

Step 3: Implement the Connector

package ai.matih.connectors.postgres;
 
import ai.matih.connector.api.*;
import ai.matih.connector.lifecycle.HealthCheckResult;
 
import java.util.Set;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
 
public class PostgresConnector implements Connector {
 
    private PostgresConnectorConfig config;
    private ConnectorContext context;
    private HikariDataSource dataSource;
    private PostgresMetadataExtractor metadataExtractor;
    private PostgresDataReader dataReader;
    private PostgresChangeTracker changeTracker;
 
    @Override
    public void initialize(ConnectorConfig config, ConnectorContext context) {
        this.config = (PostgresConnectorConfig) config;
        this.context = context;
 
        HikariConfig hikariConfig = new HikariConfig();
        hikariConfig.setJdbcUrl(String.format(
            "jdbc:postgresql://%s:%d/%s?sslmode=%s",
            this.config.getHost(),
            this.config.getPort(),
            this.config.getDatabase(),
            this.config.getSslMode()
        ));
        hikariConfig.setUsername(this.config.getUsername());
        hikariConfig.setPassword(this.config.getPassword());
        hikariConfig.setMaximumPoolSize(this.config.getConnectionPoolSize());
        hikariConfig.setConnectionTimeout(this.config.getQueryTimeout() * 1000L);
 
        this.dataSource = new HikariDataSource(hikariConfig);
        this.metadataExtractor = new PostgresMetadataExtractor(dataSource, this.config);
        this.dataReader = new PostgresDataReader(dataSource, this.config);
        this.changeTracker = new PostgresChangeTracker(dataSource, this.config);
    }
 
    @Override
    public HealthCheckResult testConnection() {
        try (var conn = dataSource.getConnection();
             var stmt = conn.createStatement()) {
            stmt.execute("SELECT 1");
            return HealthCheckResult.healthy("Connection successful");
        } catch (Exception e) {
            return HealthCheckResult.unhealthy(
                "Connection failed: " + e.getMessage(),
                e
            );
        }
    }
 
    @Override
    public MetadataExtractor getMetadataExtractor() {
        return metadataExtractor;
    }
 
    @Override
    public DataReader getDataReader() {
        return dataReader;
    }
 
    @Override
    public ChangeTracker getChangeTracker() {
        return changeTracker;
    }
 
    @Override
    public ConnectorMetadata getMetadata() {
        return ConnectorMetadata.builder()
            .name("postgresql")
            .displayName("PostgreSQL")
            .version("1.0.0")
            .description("Connector for PostgreSQL databases")
            .category(ConnectorCategory.DATABASE)
            .capabilities(Set.of(
                Capability.METADATA_EXTRACTION,
                Capability.FULL_DATA_READ,
                Capability.INCREMENTAL_READ,
                Capability.SCHEMA_CHANGE_DETECTION,
                Capability.PREDICATE_PUSHDOWN
            ))
            .build();
    }
 
    @Override
    public void close() {
        if (dataSource != null) {
            dataSource.close();
        }
    }
}

Step 4: Register the Connector

Create a service provider file for automatic discovery:

META-INF/services/ai.matih.connector.api.Connector:

ai.matih.connectors.postgres.PostgresConnector

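Discovery then works through java.util.ServiceLoader: the platform does the equivalent of ServiceLoader.load(Connector.class) and instantiates every implementation named in the services file. The demo below is a self-contained stand-in: it declares a local DemoConnector interface (not the SDK's Connector) and writes the provider file to a temp directory, where a real connector JAR would ship the META-INF/services entry shown above.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ServiceLoader;

public class DiscoveryDemo {

    // Stand-ins for the SDK's Connector interface and a concrete connector.
    public interface DemoConnector { String name(); }

    public static class DemoPostgres implements DemoConnector {
        public String name() { return "postgresql"; }
    }

    public static String discoverFirst() {
        try {
            // Materialize the provider file a connector JAR would carry in
            // META-INF/services/<interface binary name>.
            Path root = Files.createTempDirectory("connector-demo");
            Path services = root.resolve("META-INF").resolve("services");
            Files.createDirectories(services);
            Files.writeString(
                services.resolve(DemoConnector.class.getName()),
                DemoPostgres.class.getName());

            try (URLClassLoader loader = new URLClassLoader(
                    new URL[] { root.toUri().toURL() },
                    DiscoveryDemo.class.getClassLoader())) {
                // The lookup the platform performs at startup.
                for (DemoConnector c : ServiceLoader.load(DemoConnector.class, loader)) {
                    return c.name();
                }
            }
            return null;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```
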
Step 5: Test the Connector

package ai.matih.connectors.postgres;
 
import java.util.List;
import java.util.Map;
 
import ai.matih.connector.api.*;
import ai.matih.connector.model.*;
import ai.matih.connector.testing.ConnectorTestHarness;
import org.junit.jupiter.api.Test;
 
class PostgresConnectorTest {
 
    @Test
    void testMetadataExtraction() {
        var harness = ConnectorTestHarness.forConnector(
            PostgresConnector.class,
            Map.of(
                "host", "localhost",
                "port", 5432,
                "database", "test_db",
                "username", "test_user",
                "password", "test_pass"
            )
        );
 
        harness.assertConnectionHealthy();
        harness.assertSchemasDiscovered();
        harness.assertTablesDiscovered("public");
        harness.assertColumnsMatch("public", "users", List.of(
            Column.of("id", DataType.BIGINT, false),
            Column.of("email", DataType.VARCHAR, false),
            Column.of("created_at", DataType.TIMESTAMP, true)
        ));
    }
 
    @Test
    void testDataReading() {
        var harness = ConnectorTestHarness.forConnector(
            PostgresConnector.class,
            Map.of(/* config */)
        );
 
        var result = harness.readTable("public", "users", ReadOptions.defaults());
        harness.assertRowCount(result, 100);
        harness.assertColumnsPresent(result, "id", "email", "created_at");
    }
}

Building a Cloud Storage Connector

Cloud storage connectors handle object stores like S3, Azure Blob Storage, and Google Cloud Storage.

S3 Connector Example

package ai.matih.connectors.s3;
 
import java.util.Set;
 
import ai.matih.connector.api.*;
import ai.matih.connector.lifecycle.HealthCheckResult;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.*;
 
public class S3Connector implements Connector {
 
    private S3ConnectorConfig config;
    private S3Client s3Client;
 
    @Override
    public void initialize(ConnectorConfig config, ConnectorContext context) {
        this.config = (S3ConnectorConfig) config;
 
        this.s3Client = S3Client.builder()
            .region(Region.of(this.config.getRegion()))
            .credentialsProvider(this.config.getCredentialsProvider())
            .build();
    }
 
    @Override
    public HealthCheckResult testConnection() {
        try {
            s3Client.headBucket(HeadBucketRequest.builder()
                .bucket(config.getBucket())
                .build());
            return HealthCheckResult.healthy("Bucket accessible");
        } catch (Exception e) {
            return HealthCheckResult.unhealthy(
                "Cannot access bucket: " + e.getMessage(), e);
        }
    }
 
    @Override
    public ConnectorMetadata getMetadata() {
        return ConnectorMetadata.builder()
            .name("aws-s3")
            .displayName("Amazon S3")
            .version("1.0.0")
            .category(ConnectorCategory.CLOUD_STORAGE)
            .capabilities(Set.of(
                Capability.METADATA_EXTRACTION,
                Capability.FULL_DATA_READ,
                Capability.FILE_FORMAT_DETECTION
            ))
            .supportedFormats(Set.of("parquet", "csv", "json", "avro", "orc"))
            .build();
    }
 
    // MetadataExtractor and DataReader implementations...
 
    @Override
    public void close() {
        if (s3Client != null) {
            s3Client.close();
        }
    }
}

S3 Connector Configuration

public class S3ConnectorConfig extends ConnectorConfig {
 
    private String bucket;
    private String prefix = "";           // Object key prefix
    private String region = "us-east-1";
    private String fileFormat = "auto";   // auto, parquet, csv, json
    private boolean recursive = true;     // Scan subdirectories
    private String partitionPattern;      // e.g., "year={year}/month={month}"
    private int maxFilesPerSync = 10000;  // Limit for large buckets
 
    // Authentication (IRSA preferred in production)
    private String accessKeyId;           // From Secret
    private String secretAccessKey;       // From Secret
    private String roleArn;              // For cross-account access
    private boolean useIRSA = true;      // Use IRSA in EKS
 
    @Override
    public void validate() {
        requireNonEmpty("bucket", bucket);
        requireNonEmpty("region", region);
    }
}
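
The partitionPattern field suggests matching placeholders such as {year} against object keys to recover partition values. One way that could work, sketched with plain regexes; this helper and its API are hypothetical, not part of the SDK:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Matches a pattern like "year={year}/month={month}" against an object
// key and extracts the partition values by name.
public class PartitionPatternMatcher {

    private static final Pattern PLACEHOLDER = Pattern.compile("\\{(\\w+)\\}");

    public static Map<String, String> match(String pattern, String key) {
        // Turn "{name}" placeholders into named capture groups, quoting
        // everything else literally.
        StringBuilder regex = new StringBuilder();
        Matcher ph = PLACEHOLDER.matcher(pattern);
        int last = 0;
        while (ph.find()) {
            regex.append(Pattern.quote(pattern.substring(last, ph.start())));
            regex.append("(?<").append(ph.group(1)).append(">[^/]+)");
            last = ph.end();
        }
        regex.append(Pattern.quote(pattern.substring(last)));

        Map<String, String> values = new LinkedHashMap<>();
        Matcher m = Pattern.compile(regex.toString()).matcher(key);
        if (m.find()) {
            Matcher names = PLACEHOLDER.matcher(pattern);
            while (names.find()) {
                values.put(names.group(1), m.group(names.group(1)));
            }
        }
        return values;  // empty map when the key does not match
    }
}
```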

Building a SaaS Connector

SaaS connectors integrate with external applications via their REST/GraphQL APIs. They must handle OAuth2 authentication, API rate limiting, and pagination.

Salesforce Connector Example

package ai.matih.connectors.salesforce;
 
import java.util.Set;
 
import ai.matih.connector.api.*;
import ai.matih.connector.lifecycle.HealthCheckResult;
 
public class SalesforceConnector implements Connector {
 
    private SalesforceConfig config;
    private SalesforceOAuthClient oauthClient;
    private SalesforceApiClient apiClient;
 
    @Override
    public void initialize(ConnectorConfig config, ConnectorContext context) {
        this.config = (SalesforceConfig) config;
 
        // Initialize OAuth2 client
        this.oauthClient = new SalesforceOAuthClient(
            this.config.getInstanceUrl(),
            this.config.getClientId(),
            this.config.getClientSecret(),
            this.config.getRefreshToken()
        );
 
        // Initialize API client with auto-token-refresh
        this.apiClient = new SalesforceApiClient(
            this.config.getInstanceUrl(),
            oauthClient
        );
    }
 
    @Override
    public HealthCheckResult testConnection() {
        try {
            var identity = apiClient.getIdentity();
            return HealthCheckResult.healthy(
                "Connected as " + identity.getUsername()
            );
        } catch (Exception e) {
            return HealthCheckResult.unhealthy(
                "Salesforce connection failed: " + e.getMessage(), e
            );
        }
    }
 
    @Override
    public ConnectorMetadata getMetadata() {
        return ConnectorMetadata.builder()
            .name("salesforce")
            .displayName("Salesforce")
            .version("1.0.0")
            .category(ConnectorCategory.SAAS)
            .capabilities(Set.of(
                Capability.METADATA_EXTRACTION,
                Capability.FULL_DATA_READ,
                Capability.INCREMENTAL_READ,
                Capability.SCHEMA_CHANGE_DETECTION
            ))
            .authType(AuthType.OAUTH2)
            .build();
    }
 
    // MetadataExtractor discovers Salesforce objects (Account, Contact, etc.)
    // DataReader uses SOQL queries with pagination
    // ChangeTracker uses Salesforce Change Data Capture or getUpdated API
}
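
The pagination a SaaS DataReader performs follows a common cursor pattern: fetch a page, accumulate its records, and follow the next-page cursor until it is exhausted (Salesforce exposes this as nextRecordsUrl). A generic, self-contained sketch with a pluggable fetch function; the Page shape and record type are assumptions for illustration:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Cursor-driven pagination loop, decoupled from any particular API client.
public class PaginationDemo {

    public record Page(List<String> records, String nextCursor) {}

    public static List<String> fetchAll(String firstCursor, Function<String, Page> fetch) {
        List<String> all = new ArrayList<>();
        String cursor = firstCursor;
        while (cursor != null) {
            Page page = fetch.apply(cursor);  // one HTTP call per page
            all.addAll(page.records());
            cursor = page.nextCursor();       // null when there are no more pages
        }
        return all;
    }
}
```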

HubSpot Connector Configuration

public class HubSpotConfig extends ConnectorConfig {
 
    private String accessToken;       // From Secret (OAuth2 or Private App)
    private String portalId;
    private List<String> objects;     // e.g., ["contacts", "companies", "deals"]
    private String syncMode = "incremental";  // "full" or "incremental"
    private int apiRateLimit = 100;   // Requests per 10 seconds
    private int pageSize = 100;       // Items per page
 
    @Override
    public void validate() {
        requireNonEmpty("accessToken", accessToken);
        requireNonEmpty("objects", objects);
    }
}
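
The apiRateLimit setting implies a sliding-window limiter: at most N requests within the trailing window. A testable sketch with an injected clock so it runs without sleeping; in production you would pass System::currentTimeMillis:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.LongSupplier;

// Sliding-window rate limiter (e.g. 100 requests per 10 seconds).
public class SlidingWindowLimiter {

    private final int maxRequests;
    private final long windowMillis;
    private final LongSupplier clock;
    private final Deque<Long> timestamps = new ArrayDeque<>();

    public SlidingWindowLimiter(int maxRequests, long windowMillis, LongSupplier clock) {
        this.maxRequests = maxRequests;
        this.windowMillis = windowMillis;
        this.clock = clock;
    }

    /** Returns true if a request may proceed now, recording it if so. */
    public synchronized boolean tryAcquire() {
        long now = clock.getAsLong();
        while (!timestamps.isEmpty() && now - timestamps.peekFirst() >= windowMillis) {
            timestamps.pollFirst();  // drop requests that left the window
        }
        if (timestamps.size() < maxRequests) {
            timestamps.addLast(now);
            return true;
        }
        return false;  // caller should back off and retry
    }
}
```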

Slack Connector Configuration

public class SlackConfig extends ConnectorConfig {
 
    private String botToken;          // From Secret
    private List<String> channels;    // Channel IDs to sync
    private boolean includeMessages = true;
    private boolean includeThreads = true;
    private boolean includeFiles = false;
    private int lookbackDays = 30;    // How far back to sync messages
 
    @Override
    public void validate() {
        requireNonEmpty("botToken", botToken);
    }
}

Connector Lifecycle

Every connector goes through a defined lifecycle managed by the Tenant Service:

| Phase | Description | Connector Method |
|-------|-------------|------------------|
| Registration | User registers the connector via the Tenant Service UI or API | validate() on config |
| Connection Test | Platform tests connectivity before saving | testConnection() |
| Initial Sync | Full metadata extraction and data ingestion | discoverSchemas(), readTable() |
| Scheduled Sync | Periodic incremental sync on a cron schedule | readIncremental(), detectSchemaChanges() |
| Manual Refresh | User-triggered metadata or data refresh | Same as initial or incremental sync |
| Deregistration | User removes the connector | close() |

Sync Modes

| Mode | Description | When to Use |
|------|-------------|-------------|
| Full | Extract all data from the source | Initial sync, after schema changes, data recovery |
| Incremental | Extract only changed data since last sync | Regular scheduled syncs |
| CDC (Change Data Capture) | Stream changes in real time via Kafka | Real-time analytics requirements |
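
Incremental mode hinges on a checkpoint: the connector remembers the highest modification timestamp it has synced and reads only newer rows. A minimal sketch; the row shape and millisecond checkpoint encoding are assumptions for illustration:

```java
import java.util.List;
import java.util.stream.Collectors;

// Checkpoint-based incremental read: filter to rows newer than the
// checkpoint, then advance the checkpoint past everything synced.
public class IncrementalFilter {

    public record Row(String id, long updatedAtMillis) {}

    /** Rows strictly newer than the checkpoint (last synced updatedAt). */
    public static List<Row> since(List<Row> rows, long checkpoint) {
        return rows.stream()
                .filter(r -> r.updatedAtMillis() > checkpoint)
                .collect(Collectors.toList());
    }

    /** New checkpoint after a sync: max updatedAt seen, or the old value. */
    public static long advance(List<Row> synced, long previous) {
        return synced.stream()
                .mapToLong(Row::updatedAtMillis)
                .max()
                .orElse(previous);
    }
}
```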

Connector Configuration via the Platform

Users configure connectors through the Tenant Service API or the Data Workbench UI. Connector configurations are stored encrypted in the tenant's settings, with credentials held separately in Kubernetes Secrets.

Registration API call:

POST /api/v1/tenants/{tenantId}/connectors
Content-Type: application/json
 
{
  "name": "Production Database",
  "type": "postgresql",
  "config": {
    "host": "prod-db.company.com",
    "port": 5432,
    "database": "analytics",
    "username": "matih_reader",
    "sslMode": "require",
    "schemas": ["public", "analytics"],
    "excludeTables": ["_migrations", "_temp_*"]
  },
  "credentials": {
    "password": "********"
  },
  "schedule": {
    "syncMode": "incremental",
    "cronExpression": "0 */6 * * *",
    "timezone": "UTC"
  }
}

Best Practices

Performance

| Practice | Description |
|----------|-------------|
| Use batch reads | Read data in batches (1,000-10,000 rows) rather than row by row |
| Implement predicate pushdown | Push filter conditions to the source database for efficiency |
| Use connection pooling | Maintain a connection pool to avoid connection creation overhead |
| Respect API rate limits | For SaaS connectors, implement rate limiting with exponential backoff |
| Support parallelism | Allow the platform to read multiple tables or partitions concurrently |

Reliability

| Practice | Description |
|----------|-------------|
| Implement retry logic | Retry transient failures (network timeouts, rate limits) with backoff |
| Use checkpointing | Save sync progress to allow resumption after failures |
| Handle schema evolution | Detect and report schema changes (new columns, type changes, drops) |
| Log comprehensively | Log connection events, sync progress, and errors for troubleshooting |
| Test with large datasets | Validate performance with realistic data volumes |
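
The retry practice can be sketched as a small helper: the delay doubles on each attempt, and the last error is rethrown once attempts are exhausted. Production code would also add jitter and distinguish retryable from fatal errors:

```java
import java.util.concurrent.Callable;

// Retry with exponential backoff for transient failures.
public class Retry {

    public static <T> T withBackoff(Callable<T> op, int maxAttempts, long baseDelayMillis)
            throws Exception {
        Exception last = null;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            try {
                return op.call();
            } catch (Exception e) {
                last = e;
                if (attempt == maxAttempts - 1) break;
                Thread.sleep(baseDelayMillis << attempt);  // base, 2x, 4x, ...
            }
        }
        throw last;  // all attempts exhausted
    }
}
```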

Security

| Practice | Description |
|----------|-------------|
| Never log credentials | Mask sensitive config values in all log output |
| Use minimum privileges | Request only the database permissions needed (SELECT for read-only connectors) |
| Validate SSL certificates | Enable certificate verification for database and API connections |
| Support credential rotation | Handle credential updates without requiring connector recreation |
| Follow the principle of least privilege | SaaS connectors should request minimal OAuth2 scopes |
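
"Never log credentials" usually means masking a config map before it reaches any log statement. A sketch; the list of sensitive keys is illustrative and should match your connector's actual config fields:

```java
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// Masks sensitive configuration values before logging.
public class ConfigMasker {

    private static final Set<String> SENSITIVE = Set.of(
            "password", "secretAccessKey", "accessToken",
            "clientSecret", "refreshToken", "botToken");

    public static Map<String, String> maskForLogging(Map<String, String> config) {
        return config.entrySet().stream().collect(Collectors.toMap(
                Map.Entry::getKey,
                e -> SENSITIVE.contains(e.getKey()) ? "********" : e.getValue()));
    }
}
```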

Publishing Connectors to the Marketplace

Custom connectors can be published to the MATIH Marketplace for other tenants to use:

  1. Package the connector as a JAR with the META-INF/services registration
  2. Include a connector-manifest.yaml with metadata, configuration schema, and documentation
  3. Submit via the Config Service marketplace API: POST /api/v1/marketplace/connectors
  4. The connector undergoes security review and testing before publication
  5. Once approved, it appears in the Marketplace for all tenants to install

# connector-manifest.yaml
name: my-postgres-connector
displayName: PostgreSQL (Custom)
version: 1.0.0
author: Acme Corp
description: Enhanced PostgreSQL connector with CDC support
category: database
icon: postgresql.svg
configSchema:
  type: object
  required: [host, database, username, password]
  properties:
    host:
      type: string
      title: Hostname
      description: PostgreSQL server hostname
    port:
      type: integer
      title: Port
      default: 5432
    database:
      type: string
      title: Database Name
    username:
      type: string
      title: Username
    password:
      type: string
      title: Password
      format: password
    sslMode:
      type: string
      title: SSL Mode
      enum: [disable, prefer, require, verify-ca, verify-full]
      default: prefer