MATIH Platform is in active MVP development. Documentation reflects current implementation status.
12. AI Service
Schema Extraction

Schema Extraction

Production - Qdrant-based schema discovery with vector embeddings

Schema Extraction discovers and indexes database schemas for use in the Text-to-SQL pipeline. It extracts table and column metadata, generates semantic embeddings, and stores them in Qdrant for fast similarity search during context retrieval.


12.3.1.1Schema Discovery

The QdrantSchemaStore in data-plane/ai-service/src/vector_store/ manages schema metadata:

class QdrantSchemaStore:
    """Vector store for schema metadata using Qdrant."""
 
    async def initialize_collections(self):
        """Create Qdrant collections for schema metadata."""
        # Collection for table metadata
        await self._client.create_collection(
            collection_name="matih_schema_metadata",
            vectors_config=VectorParams(size=1536, distance=Distance.COSINE),
        )
 
    async def index_schema(self, tenant_id: str, tables: list[dict]):
        """Index table and column metadata with embeddings."""
        for table in tables:
            embedding = await self._embed(table["description"])
            await self._client.upsert(
                collection_name="matih_schema_metadata",
                points=[PointStruct(
                    id=table["fqn"],
                    vector=embedding,
                    payload={
                        "tenant_id": tenant_id,
                        "table_name": table["table_name"],
                        "catalog": table["catalog"],
                        "schema": table["schema"],
                        "fqn": table["fqn"],
                        "columns": table["columns"],
                        "description": table["description"],
                    },
                )],
            )
 
    async def retrieve_context(self, tenant_id: str, question: str, top_k: int = 10):
        """Retrieve relevant schema context for a question."""
        question_embedding = await self._embed(question)
        results = await self._client.search(
            collection_name="matih_schema_metadata",
            query_vector=question_embedding,
            query_filter=Filter(must=[
                FieldCondition(key="tenant_id", match=MatchValue(value=tenant_id)),
            ]),
            limit=top_k,
        )
        return RetrievedContext(
            tables=[r.payload for r in results],
            columns=self._extract_columns(results),
            relationships=self._extract_relationships(results),
            semantic_mappings=self._extract_mappings(results),
            sample_queries=self._find_similar_queries(question_embedding),
            relevance_scores={r.payload["fqn"]: r.score for r in results},
        )

12.3.1.2API Endpoints

# Index schema for a tenant
curl -X POST http://localhost:8000/api/v1/schema/index \
  -H "Content-Type: application/json" \
  -H "X-Tenant-ID: acme-corp" \
  -d '{
    "catalog": "lakehouse",
    "schema": "sales",
    "tables": [
      {
        "table_name": "orders",
        "fqn": "lakehouse.sales.orders",
        "description": "Customer order transactions with revenue data",
        "columns": [
          {"name": "order_id", "type": "BIGINT", "description": "Unique order identifier"},
          {"name": "customer_id", "type": "BIGINT", "description": "Customer reference"},
          {"name": "order_date", "type": "DATE", "description": "Date the order was placed"},
          {"name": "revenue", "type": "DECIMAL(18,2)", "description": "Order revenue in USD"}
        ]
      }
    ]
  }'
 
# Search schema
curl "http://localhost:8000/api/v1/schema/search?tenant_id=acme-corp&query=customer+orders&top_k=5"