Schema Extraction
Production - Qdrant-based schema discovery with vector embeddings
Schema Extraction discovers and indexes database schemas for use in the Text-to-SQL pipeline. It extracts table and column metadata, generates semantic embeddings, and stores them in Qdrant for fast similarity search during context retrieval.
12.3.1.1 Schema Discovery
The QdrantSchemaStore in data-plane/ai-service/src/vector_store/ manages schema metadata:
class QdrantSchemaStore:
"""Vector store for schema metadata using Qdrant."""
async def initialize_collections(self):
"""Create Qdrant collections for schema metadata."""
# Collection for table metadata
await self._client.create_collection(
collection_name="matih_schema_metadata",
vectors_config=VectorParams(size=1536, distance=Distance.COSINE),
)
async def index_schema(self, tenant_id: str, tables: list[dict]):
"""Index table and column metadata with embeddings."""
for table in tables:
embedding = await self._embed(table["description"])
await self._client.upsert(
collection_name="matih_schema_metadata",
points=[PointStruct(
id=table["fqn"],
vector=embedding,
payload={
"tenant_id": tenant_id,
"table_name": table["table_name"],
"catalog": table["catalog"],
"schema": table["schema"],
"fqn": table["fqn"],
"columns": table["columns"],
"description": table["description"],
},
)],
)
async def retrieve_context(self, tenant_id: str, question: str, top_k: int = 10):
"""Retrieve relevant schema context for a question."""
question_embedding = await self._embed(question)
results = await self._client.search(
collection_name="matih_schema_metadata",
query_vector=question_embedding,
query_filter=Filter(must=[
FieldCondition(key="tenant_id", match=MatchValue(value=tenant_id)),
]),
limit=top_k,
)
return RetrievedContext(
tables=[r.payload for r in results],
columns=self._extract_columns(results),
relationships=self._extract_relationships(results),
semantic_mappings=self._extract_mappings(results),
sample_queries=self._find_similar_queries(question_embedding),
relevance_scores={r.payload["fqn"]: r.score for r in results},
)12.3.1.2API Endpoints
# Index schema metadata for a tenant.
# The tenant is identified by the X-Tenant-ID header; the JSON body carries the
# catalog, the schema, and a list of tables, each with its fully qualified name
# (fqn), a description, and per-column name/type/description entries.
curl -X POST http://localhost:8000/api/v1/schema/index \
-H "Content-Type: application/json" \
-H "X-Tenant-ID: acme-corp" \
-d '{
"catalog": "lakehouse",
"schema": "sales",
"tables": [
{
"table_name": "orders",
"fqn": "lakehouse.sales.orders",
"description": "Customer order transactions with revenue data",
"columns": [
{"name": "order_id", "type": "BIGINT", "description": "Unique order identifier"},
{"name": "customer_id", "type": "BIGINT", "description": "Customer reference"},
{"name": "order_date", "type": "DATE", "description": "Date the order was placed"},
{"name": "revenue", "type": "DECIMAL(18,2)", "description": "Order revenue in USD"}
]
}
]
}'
# Search indexed schema metadata.
# The tenant, the free-text query, and the result limit (top_k) are all passed
# as URL query parameters.
# NOTE(review): indexing identifies the tenant via the X-Tenant-ID header while
# search uses a tenant_id query parameter -- confirm this asymmetry is intended.
curl "http://localhost:8000/api/v1/schema/search?tenant_id=acme-corp&query=customer+orders&top_k=5"