|
|
""" |
|
|
Lineage Graph Accelerator - Hugging Face Space |
|
|
A Gradio-based AI agent for extracting and visualizing data lineage from various sources. |
|
|
|
|
|
Built for the Gradio Agents & MCP Hackathon - Winter 2025 |
|
|
""" |
|
|
|
|
|
import gradio as gr |
|
|
import json |
|
|
import os |
|
|
import requests |
|
|
from typing import Optional, Tuple, Dict, Any, List |
|
|
from datetime import datetime |
|
|
|
|
|
|
|
|
# Optional dependency: the local `exporters` module provides the lineage
# data model (LineageGraph/Node/Edge) and the catalog exporters. When it
# is missing the app still runs, but parsing/export features are disabled
# (guarded throughout via EXPORTERS_AVAILABLE).
try:
    from exporters import (
        LineageGraph, LineageNode, LineageEdge,
        OpenLineageExporter, CollibraExporter, PurviewExporter, AlationExporter, AtlasExporter
    )
    EXPORTERS_AVAILABLE = True
except ImportError:
    EXPORTERS_AVAILABLE = False
|
|
|
|
|
|
|
|
# Optional dependency: google-generativeai powers the chatbot tab.
# GEMINI_AVAILABLE gates every Gemini code path so the rest of the app
# works without the package installed.
try:
    import google.generativeai as genai
    GEMINI_AVAILABLE = True
except ImportError:
    GEMINI_AVAILABLE = False
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Bundled sample inputs, keyed by the sample-picker value in the UI.
# Files live in the ./samples directory next to this module (see
# load_sample for resolution).
SAMPLE_FILES = {
    "simple": "sample_metadata.json",
    "dbt": "dbt_manifest_sample.json",
    "airflow": "airflow_dag_sample.json",
    "sql": "sql_ddl_sample.sql",
    "warehouse": "warehouse_lineage_sample.json",
    "etl": "etl_pipeline_sample.json",
    "complex": "complex_lineage_demo.json",
    "api": "sample_api_metadata.json",
    "bigquery": "sample_bigquery.sql"
}

# Catalog formats offered in the export tab; each maps to one exporter
# class from the `exporters` module.
EXPORT_FORMATS = ["OpenLineage", "Collibra", "Purview", "Alation", "Atlas"]
|
|
|
|
|
|
|
|
# Preset MCP server endpoints selectable in the UI.
# "local_demo" is served in-process (see local_mcp_get_metadata); the
# others are public Gradio Spaces exposing MCP over SSE. Each entry
# carries a display name, the SSE endpoint, its schema endpoint, a short
# description, and the tool names the server is known to expose.
MCP_PRESETS = {
    "local_demo": {
        "name": "Local Demo MCP (Built-in)",
        "url": "local://demo",
        "schema_url": "local://demo/schema",
        "description": "Built-in demo MCP server that provides sample lineage metadata for testing",
        "tools": ["get_sample_lineage", "get_dbt_metadata", "get_airflow_dag", "get_warehouse_schema"]
    },
    "mcp_tools": {
        "name": "MCP Tools by abidlabs",
        "url": "https://abidlabs-mcp-tools.hf.space/gradio_api/mcp/sse",
        "schema_url": "https://abidlabs-mcp-tools.hf.space/gradio_api/mcp/schema",
        "description": "Demo MCP server with utility tools for testing integration",
        "tools": ["prime_factors", "generate_cheetah_image", "image_orientation"]
    },
    "huggingface_mcp": {
        "name": "HuggingFace MCP by dylanebert",
        "url": "https://dylanebert-huggingface-mcp.hf.space/gradio_api/mcp/sse",
        "schema_url": "https://dylanebert-huggingface-mcp.hf.space/gradio_api/mcp/schema",
        "description": "Search and explore HuggingFace models, datasets, and spaces",
        "tools": ["search_models", "search_datasets", "get_model_card"]
    },
    "ragmint": {
        "name": "Ragmint RAG Pipeline",
        "url": "https://mcp-1st-birthday-ragmint-mcp-server.hf.space/gradio_api/mcp/sse",
        "schema_url": "https://mcp-1st-birthday-ragmint-mcp-server.hf.space/gradio_api/mcp/schema",
        "description": "RAG pipeline optimization and document retrieval",
        "tools": ["optimize_rag", "retrieve_documents"]
    },
    "web_search": {
        "name": "Web Search MCP",
        "url": "https://agents-mcp-hackathon-search-web-mcp-server.hf.space/gradio_api/mcp/sse",
        "schema_url": "https://agents-mcp-hackathon-search-web-mcp-server.hf.space/gradio_api/mcp/schema",
        "description": "Search the web for data and documentation",
        "tools": ["search_web", "fetch_page"]
    }
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Canned datasets served by the built-in "local://demo" MCP server
# (see local_mcp_get_metadata). Each dataset is in the simple
# nodes/edges shape that parse_metadata_to_graph understands directly:
# nodes carry id/type/name, edges use "from"/"to" node ids.
LOCAL_MCP_METADATA = {
    # Classic dbt-style flow: raw sources -> staging -> dims/facts -> mart -> report.
    "ecommerce_pipeline": {
        "name": "E-commerce Data Pipeline",
        "nodes": [
            {"id": "raw_orders", "type": "source", "name": "Raw Orders (PostgreSQL)"},
            {"id": "raw_customers", "type": "source", "name": "Raw Customers (PostgreSQL)"},
            {"id": "raw_products", "type": "source", "name": "Raw Products (API)"},
            {"id": "stg_orders", "type": "model", "name": "Staging Orders"},
            {"id": "stg_customers", "type": "model", "name": "Staging Customers"},
            {"id": "stg_products", "type": "model", "name": "Staging Products"},
            {"id": "dim_customers", "type": "dimension", "name": "Dim Customers"},
            {"id": "dim_products", "type": "dimension", "name": "Dim Products"},
            {"id": "fact_orders", "type": "fact", "name": "Fact Orders"},
            {"id": "mart_sales", "type": "table", "name": "Sales Mart"},
            {"id": "report_daily", "type": "report", "name": "Daily Sales Report"}
        ],
        "edges": [
            {"from": "raw_orders", "to": "stg_orders"},
            {"from": "raw_customers", "to": "stg_customers"},
            {"from": "raw_products", "to": "stg_products"},
            {"from": "stg_customers", "to": "dim_customers"},
            {"from": "stg_products", "to": "dim_products"},
            {"from": "stg_orders", "to": "fact_orders"},
            {"from": "dim_customers", "to": "fact_orders"},
            {"from": "dim_products", "to": "fact_orders"},
            {"from": "fact_orders", "to": "mart_sales"},
            {"from": "mart_sales", "to": "report_daily"}
        ]
    },
    # Linear ML flow: raw inputs -> feature engineering -> store -> training -> predictions.
    "ml_pipeline": {
        "name": "ML Feature Pipeline",
        "nodes": [
            {"id": "raw_events", "type": "source", "name": "Event Stream (Kafka)"},
            {"id": "raw_user_data", "type": "source", "name": "User Data (S3)"},
            {"id": "feature_eng", "type": "model", "name": "Feature Engineering"},
            {"id": "feature_store", "type": "table", "name": "Feature Store"},
            {"id": "training_data", "type": "table", "name": "Training Dataset"},
            {"id": "model_output", "type": "destination", "name": "Model Predictions"}
        ],
        "edges": [
            {"from": "raw_events", "to": "feature_eng"},
            {"from": "raw_user_data", "to": "feature_eng"},
            {"from": "feature_eng", "to": "feature_store"},
            {"from": "feature_store", "to": "training_data"},
            {"from": "training_data", "to": "model_output"}
        ]
    },
    # EDW flow: operational systems -> landing tables -> dims/facts -> dashboard.
    "data_warehouse": {
        "name": "Data Warehouse Schema",
        "nodes": [
            {"id": "src_crm", "type": "source", "name": "CRM System"},
            {"id": "src_erp", "type": "source", "name": "ERP System"},
            {"id": "src_web", "type": "source", "name": "Web Analytics"},
            {"id": "landing_crm", "type": "table", "name": "Landing CRM"},
            {"id": "landing_erp", "type": "table", "name": "Landing ERP"},
            {"id": "landing_web", "type": "table", "name": "Landing Web"},
            {"id": "dwh_customers", "type": "dimension", "name": "DWH Customers"},
            {"id": "dwh_transactions", "type": "fact", "name": "DWH Transactions"},
            {"id": "bi_dashboard", "type": "report", "name": "BI Dashboard"}
        ],
        "edges": [
            {"from": "src_crm", "to": "landing_crm"},
            {"from": "src_erp", "to": "landing_erp"},
            {"from": "src_web", "to": "landing_web"},
            {"from": "landing_crm", "to": "dwh_customers"},
            {"from": "landing_erp", "to": "dwh_transactions"},
            {"from": "landing_web", "to": "dwh_transactions"},
            {"from": "dwh_customers", "to": "dwh_transactions"},
            {"from": "dwh_transactions", "to": "bi_dashboard"}
        ]
    }
}
|
|
|
|
|
|
|
|
def local_mcp_get_metadata(tool_name: str, query: str = "") -> Dict[str, Any]:
    """Simulate a local MCP server that returns sample metadata.

    Args:
        tool_name: Which demo tool to invoke. Search-style tools
            ("get_sample_lineage" / "search") keyword-match *query* to
            pick a sample; the other tools map to fixed datasets.
        query: Free-text hint used only by the search-style tools.

    Returns:
        One of the LOCAL_MCP_METADATA datasets (or, for "list_datasets",
        a dict listing their keys). Unknown tools fall back to the
        e-commerce pipeline sample rather than raising.
    """
    q = query.lower()  # lowercase once; matched against several keywords below
    if tool_name in ("get_sample_lineage", "search"):
        if "ecommerce" in q or "sales" in q:
            return LOCAL_MCP_METADATA["ecommerce_pipeline"]
        if "ml" in q or "feature" in q:
            return LOCAL_MCP_METADATA["ml_pipeline"]
        if "warehouse" in q or "dwh" in q:
            return LOCAL_MCP_METADATA["data_warehouse"]
        # No keyword matched: serve the default sample.
        return LOCAL_MCP_METADATA["ecommerce_pipeline"]
    if tool_name == "get_dbt_metadata":
        return LOCAL_MCP_METADATA["ecommerce_pipeline"]
    if tool_name == "get_airflow_dag":
        return LOCAL_MCP_METADATA["ml_pipeline"]
    if tool_name == "get_warehouse_schema":
        return LOCAL_MCP_METADATA["data_warehouse"]
    if tool_name == "list_datasets":
        return {"datasets": list(LOCAL_MCP_METADATA.keys())}
    # Unknown tool: best-effort default instead of an error.
    return LOCAL_MCP_METADATA["ecommerce_pipeline"]
|
|
|
|
|
|
|
|
def is_local_mcp(url: str) -> bool:
    """Check if the URL is for the local demo MCP server.

    Returns a real bool in every case: the original ``url and url.startswith(...)``
    leaked the falsy value itself (None or "") despite the ``-> bool`` annotation.
    """
    return bool(url) and url.startswith("local://")
|
|
|
|
|
|
|
|
def call_local_mcp(tool_name: str, query: str = "") -> Tuple[str, str]:
    """Invoke the in-process demo MCP server.

    Returns a (metadata_json, status_message) pair where metadata_json is
    the pretty-printed sample dataset selected by tool_name/query.
    """
    payload = local_mcp_get_metadata(tool_name, query)
    label = payload.get('name', 'lineage')
    serialized = json.dumps(payload, indent=2)
    return serialized, f"Fetched '{label}' from Local Demo MCP"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import base64 |
|
|
import urllib.parse |
|
|
|
|
|
def render_mermaid(viz_code: str) -> str:
    """Render mermaid diagram using mermaid.ink service (renders as SVG image).

    The diagram source is base64-encoded into mermaid.ink image URLs and
    embedded as a plain <img>, so no client-side mermaid.js is needed.
    NOTE(review): rendering depends on the external mermaid.ink/mermaid.live
    services being reachable when the HTML is viewed — confirm acceptable.

    Returns an HTML fragment with the image, edit/download links, and a
    click-to-zoom handler.
    """
    # mermaid.ink expects URL-safe base64 of the raw diagram text.
    encoded = base64.urlsafe_b64encode(viz_code.encode('utf-8')).decode('utf-8')

    # Inline-rendered SVG endpoint.
    img_url = f"https://mermaid.ink/svg/{encoded}"

    # Same diagram as PNG, used for the download link.
    png_url = f"https://mermaid.ink/img/{encoded}"

    # mermaid.live takes standard (non-URL-safe) base64 after "#base64:".
    editor_url = f"https://mermaid.live/edit#base64:{base64.b64encode(viz_code.encode('utf-8')).decode('utf-8')}"

    html = f'''
    <div style="background: white; padding: 20px; border-radius: 8px; min-height: 200px;">
        <div style="overflow: auto; max-height: 500px; border: 1px solid #e0e0e0; border-radius: 4px; padding: 10px;">
            <img id="lineage-graph" src="{img_url}" alt="Lineage Graph" style="max-width: 100%; height: auto; cursor: zoom-in;" onclick="this.style.maxWidth = this.style.maxWidth === 'none' ? '100%' : 'none'; this.style.cursor = this.style.cursor === 'zoom-in' ? 'zoom-out' : 'zoom-in';" />
        </div>
        <div style="margin-top: 12px; display: flex; gap: 16px; flex-wrap: wrap; align-items: center;">
            <a href="{editor_url}" target="_blank" style="color: #7c3aed; text-decoration: none; font-size: 13px;">
                Edit in Mermaid Live
            </a>
            <a href="{png_url}" download="lineage_graph.png" style="color: #2563eb; text-decoration: none; font-size: 13px;">
                Download PNG
            </a>
            <a href="{img_url}" download="lineage_graph.svg" style="color: #059669; text-decoration: none; font-size: 13px;">
                Download SVG
            </a>
            <span style="color: #888; font-size: 12px; margin-left: auto;">Click graph to zoom</span>
        </div>
    </div>
    '''
    return html
|
|
|
|
|
|
|
|
def render_mermaid_code(viz_code: str) -> str:
    """Pass-through used by the UI to show the raw Mermaid source unchanged."""
    raw_source = viz_code
    return raw_source
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def parse_metadata_to_graph(metadata_text: str, source_type: str) -> Tuple[LineageGraph, str]:
    """Parse metadata text into a LineageGraph structure.

    The input layout is detected by its top-level keys, in this order:
      - "lineage_graph": rich nodes/edges with category/metadata/tags
      - "nodes" + "edges": simple list form ({"from"/"to"} edges)
      - "nodes" (dict): dbt-manifest style, keyed by node id, with
        depends_on.nodes providing the upstream edges
      - "tasks": Airflow-DAG style task list with upstream_dependencies
      - "lineage": datasets + relationships form
      - "stages": ETL pipeline of stages containing steps with inputs
    Anything else produces a two-node placeholder graph.

    Returns:
        (graph, human-readable summary). Never raises: invalid JSON yields
        a placeholder graph; any other failure yields an empty "Error" graph.
    """
    try:
        # Heuristic: only attempt JSON parsing when the text looks like JSON.
        if metadata_text.strip().startswith('{') or metadata_text.strip().startswith('['):
            data = json.loads(metadata_text)
        else:
            # Non-JSON input (e.g. SQL DDL): wrap raw text; none of the
            # known keys will match so the placeholder branch runs below.
            data = {"raw_content": metadata_text, "source_type": source_type}

        graph = LineageGraph(name=f"Lineage from {source_type}")

        # NOTE(review): a top-level JSON *array* makes the `"key" in data`
        # membership tests below raise TypeError, which lands in the generic
        # handler and returns an "Error" graph — confirm whether list input
        # should be supported.
        if "lineage_graph" in data:
            # Rich form: nodes carry full descriptive attributes.
            lg = data["lineage_graph"]
            for node_data in lg.get("nodes", []):
                node = LineageNode(
                    id=node_data.get("id"),
                    name=node_data.get("name"),
                    type=node_data.get("type", "table"),
                    category=node_data.get("category"),
                    description=node_data.get("description"),
                    metadata=node_data.get("metadata"),
                    tags=node_data.get("tags")
                )
                graph.add_node(node)
            for edge_data in lg.get("edges", []):
                edge = LineageEdge(
                    source=edge_data.get("from"),
                    target=edge_data.get("to"),
                    type=edge_data.get("type", "transform")
                )
                graph.add_edge(edge)

        elif "nodes" in data and "edges" in data:
            # Simple list form (also what the local demo MCP produces).
            for node_data in data.get("nodes", []):
                node = LineageNode(
                    id=node_data.get("id"),
                    name=node_data.get("name", node_data.get("id")),
                    type=node_data.get("type", "table")
                )
                graph.add_node(node)
            for edge_data in data.get("edges", []):
                edge = LineageEdge(
                    source=edge_data.get("from"),
                    target=edge_data.get("to"),
                    type=edge_data.get("type", "transform")
                )
                graph.add_edge(edge)

        elif "nodes" in data:
            # dbt-manifest style: "nodes" is a dict keyed by fully-qualified
            # node id; the short display name is the last dotted segment.
            for node_id, node_data in data.get("nodes", {}).items():
                node = LineageNode(
                    id=node_id,
                    name=node_data.get("name", node_id.split(".")[-1]),
                    type=node_data.get("resource_type", "model"),
                    schema=node_data.get("schema"),
                    database=node_data.get("database"),
                    description=node_data.get("description")
                )
                graph.add_node(node)

                # depends_on.nodes lists upstream ids -> one edge each.
                deps = node_data.get("depends_on", {}).get("nodes", [])
                for dep in deps:
                    edge = LineageEdge(source=dep, target=node_id, type="transform")
                    graph.add_edge(edge)

        elif "tasks" in data:
            # Airflow-DAG style: tasks plus their upstream dependencies.
            for task in data.get("tasks", []):
                node = LineageNode(
                    id=task.get("task_id"),
                    name=task.get("task_id"),
                    type="task",
                    description=task.get("description")
                )
                graph.add_node(node)

                for dep in task.get("upstream_dependencies", []):
                    edge = LineageEdge(source=dep, target=task.get("task_id"), type="dependency")
                    graph.add_edge(edge)

        elif "lineage" in data:
            # Warehouse-style: datasets (with ownership) + relationships.
            lineage = data.get("lineage", {})
            for dataset in lineage.get("datasets", []):
                node = LineageNode(
                    id=dataset.get("id"),
                    name=dataset.get("name", dataset.get("id")),
                    type=dataset.get("type", "table"),
                    schema=dataset.get("schema"),
                    database=dataset.get("database"),
                    description=dataset.get("description"),
                    owner=dataset.get("owner"),
                    tags=dataset.get("tags")
                )
                graph.add_node(node)
            for rel in lineage.get("relationships", []):
                edge = LineageEdge(
                    source=rel.get("source"),
                    target=rel.get("target"),
                    type=rel.get("type", "transform"),
                    job_name=rel.get("job")
                )
                graph.add_edge(edge)

        elif "stages" in data:
            # ETL pipeline: stages contain steps; the stage id becomes the
            # node category so the renderer can group steps into subgraphs.
            for stage in data.get("stages", []):
                for step in stage.get("steps", []):
                    node = LineageNode(
                        id=step.get("id"),
                        name=step.get("name", step.get("id")),
                        type="step",
                        category=stage.get("id"),
                        description=step.get("description") or step.get("logic")
                    )
                    graph.add_node(node)

                    for inp in step.get("inputs", []):
                        edge = LineageEdge(source=inp, target=step.get("id"), type="transform")
                        graph.add_edge(edge)
        else:
            # Unrecognized structure: emit a minimal placeholder so the UI
            # still has something to draw.
            graph.add_node(LineageNode(id="source", name="Source", type="source"))
            graph.add_node(LineageNode(id="target", name="Target", type="table"))
            graph.add_edge(LineageEdge(source="source", target="target", type="transform"))

        summary = f"Parsed {len(graph.nodes)} nodes and {len(graph.edges)} relationships from {source_type}"
        return graph, summary

    except json.JSONDecodeError as e:
        # Malformed JSON: degrade to a labelled placeholder graph.
        graph = LineageGraph(name=f"Lineage from {source_type}")
        graph.add_node(LineageNode(id="input", name="Input Data", type="source"))
        graph.add_node(LineageNode(id="output", name="Output Data", type="table"))
        graph.add_edge(LineageEdge(source="input", target="output", type="transform"))
        return graph, f"Created placeholder lineage (could not parse as JSON: {str(e)[:50]})"
    except Exception as e:
        # Last-resort guard so the UI never sees a traceback.
        graph = LineageGraph(name="Error")
        return graph, f"Error parsing metadata: {str(e)}"
|
|
|
|
|
|
|
|
# Characters with structural meaning in Mermaid, mapped to safe
# replacements. Built once at import time; sanitize_mermaid_text applies
# it in a single C-level translate() pass (the previous implementation
# chained ten str.replace calls per label).
_MERMAID_SANITIZE_TABLE = str.maketrans({
    "(": " - ",
    ")": "",
    "[": " ",
    "]": " ",
    "{": " ",
    "}": " ",
    '"': "'",
    "<": "",
    ">": "",
    "#": "",
})


def sanitize_mermaid_text(text: str) -> str:
    """Sanitize text for use in Mermaid diagrams by escaping special characters.

    Empty/None input yields "Unknown"; otherwise reserved characters are
    replaced or dropped in one pass and the result is stripped.
    """
    if not text:
        return "Unknown"
    return text.translate(_MERMAID_SANITIZE_TABLE).strip()


# Per-node-type fill/stroke styling; unknown types get a plain grey fill.
_NODE_STYLES = {
    "source": "fill:#e3f2fd,stroke:#1976d2,stroke-width:1px,rx:5,ry:5",
    "external_api": "fill:#e3f2fd,stroke:#1976d2,stroke-width:1px,rx:5,ry:5",
    "table": "fill:#e8f5e9,stroke:#388e3c,stroke-width:1px,rx:5,ry:5",
    "view": "fill:#f3e5f5,stroke:#7b1fa2,stroke-width:1px,rx:5,ry:5",
    "model": "fill:#fff3e0,stroke:#f57c00,stroke-width:1px,rx:5,ry:5",
    "report": "fill:#fce4ec,stroke:#c2185b,stroke-width:1px,rx:5,ry:5",
    "dimension": "fill:#e0f7fa,stroke:#0097a7,stroke-width:1px,rx:5,ry:5",
    "fact": "fill:#fff8e1,stroke:#ffa000,stroke-width:1px,rx:5,ry:5",
    "destination": "fill:#ffebee,stroke:#d32f2f,stroke-width:1px,rx:5,ry:5",
    "task": "fill:#fafafa,stroke:#616161,stroke-width:1px,rx:5,ry:5",
}

# Arrow glyph per edge type; only "reference" differs (dotted arrow).
_EDGE_ARROWS = {
    "transform": "-->",
    "reference": "-.->",
    "ingest": "-->",
    "export": "-->",
    "join": "-->",
    "aggregate": "-->",
    "dependency": "-->",
}


def _node_declaration(node, indent: str = "    ") -> str:
    """One Mermaid node declaration: [rect] for table/model, (rounded) otherwise.

    Extracted helper — the original repeated this three-line snippet in
    three places.
    """
    label = sanitize_mermaid_text(node.name)
    shape = f"[{label}]" if node.type in ("table", "model") else f"({label})"
    return f"{indent}{node.id}{shape}"


def generate_mermaid_from_graph(graph: "LineageGraph") -> str:
    """Generate Mermaid diagram code from a LineageGraph.

    Nodes are grouped into subgraphs by category when more than one
    category exists; non-default edge types get labelled arrows; every
    node receives a per-type style line. An empty graph renders a single
    "No data" node.
    """
    if not graph.nodes:
        return "graph TD\n    A[No data to display]"

    lines = [
        "%%{init: {'theme': 'base', 'themeVariables': {'fontSize': '12px', 'fontFamily': 'arial', 'primaryColor': '#e8f5e9', 'primaryBorderColor': '#4caf50', 'lineColor': '#666'}}}%%",
        "graph TD"
    ]

    # Bucket nodes by category ("default" for nodes without one).
    categories = {}
    for node in graph.nodes:
        categories.setdefault(node.category or "default", []).append(node)

    if len(categories) > 1:
        # Multiple categories: wrap each named category in a subgraph;
        # uncategorized nodes are declared at the top level.
        for cat, nodes in categories.items():
            if cat != "default":
                lines.append(f"    subgraph {sanitize_mermaid_text(cat.replace('_', ' ').title())}")
                lines.extend(_node_declaration(n, "        ") for n in nodes)
                lines.append("    end")
            else:
                lines.extend(_node_declaration(n) for n in nodes)
    else:
        lines.extend(_node_declaration(n) for n in graph.nodes)

    # Edges: label the arrow with the edge type unless it is one of the
    # common defaults (transform/dependency), which stay unlabelled.
    for edge in graph.edges:
        arrow = _EDGE_ARROWS.get(edge.type, "-->")
        if edge.type and edge.type not in ("transform", "dependency"):
            lines.append(f"    {edge.source} {arrow}|{edge.type}| {edge.target}")
        else:
            lines.append(f"    {edge.source} {arrow} {edge.target}")

    # Per-node styling by type.
    for node in graph.nodes:
        style = _NODE_STYLES.get(node.type, "fill:#f5f5f5")
        lines.append(f"    style {node.id} {style}")

    return "\n".join(lines)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def send_to_mcp(server_url: str, api_key: str, metadata_text: str, source_type: str, viz_format: str) -> Tuple[str, str]:
    """POST metadata to an external MCP-style server.

    Returns (rendered HTML, summary). The HTML is empty when the server
    returns no visualization, fails, or no URL is configured; the summary
    always explains the outcome.
    """
    if not server_url:
        return "", "No MCP server URL configured."

    request_headers = {"Content-Type": "application/json"}
    if api_key:
        request_headers["Authorization"] = f"Bearer {api_key}"

    body = {
        "metadata": metadata_text,
        "source_type": source_type,
        "viz_format": viz_format,
    }

    try:
        resp = requests.post(server_url, json=body, headers=request_headers, timeout=30)
        if not (200 <= resp.status_code < 300):
            return "", f"MCP server returned status {resp.status_code}: {resp.text[:200]}"

        data = resp.json()
        # Accept several possible response keys for the diagram source.
        viz = data.get("visualization") or data.get("viz") or data.get("mermaid", "")
        summary = data.get("summary", "Processed by MCP server.")
        if not viz:
            return "", summary
        return render_mermaid(viz), summary
    except Exception as e:
        return "", f"Error contacting MCP server: {e}"
|
|
|
|
|
|
|
|
def test_mcp_connection(server_url: str, api_key: str) -> str: |
|
|
"""Health-check to MCP server by fetching schema.""" |
|
|
if not server_url: |
|
|
return "No MCP server URL configured." |
|
|
|
|
|
|
|
|
if is_local_mcp(server_url): |
|
|
tools = MCP_PRESETS.get("local_demo", {}).get("tools", []) |
|
|
return f"Local Demo MCP ready! {len(tools)} tools available: {', '.join(tools)}" |
|
|
|
|
|
try: |
|
|
headers = {} |
|
|
if api_key: |
|
|
headers["Authorization"] = f"Bearer {api_key}" |
|
|
|
|
|
|
|
|
schema_url = server_url.replace("/sse", "/schema").replace("/mcp/mcp", "/mcp") |
|
|
try: |
|
|
resp = requests.get(schema_url, headers=headers, timeout=15) |
|
|
if resp.status_code == 200: |
|
|
try: |
|
|
schema = resp.json() |
|
|
tool_count = len(schema) if isinstance(schema, dict) else 0 |
|
|
return f"Connected! Found {tool_count} tools available." |
|
|
except: |
|
|
return f"Connected to MCP server: {resp.status_code} OK" |
|
|
except requests.exceptions.RequestException: |
|
|
pass |
|
|
|
|
|
|
|
|
base_url = server_url.replace("/gradio_api/mcp/sse", "") |
|
|
try: |
|
|
resp = requests.get(base_url, headers=headers, timeout=10) |
|
|
if resp.status_code == 200: |
|
|
return f"Server reachable (status {resp.status_code})" |
|
|
except: |
|
|
pass |
|
|
|
|
|
return "MCP server may be sleeping. Try again in a moment." |
|
|
except Exception as e: |
|
|
return f"Error contacting MCP server: {e}" |
|
|
|
|
|
|
|
|
def get_preset_url(preset_key: str) -> str:
    """Return the SSE URL for a preset MCP server, or "" for unknown keys.

    Uses chained .get lookups so a preset entry missing its "url" key
    also yields "" instead of raising KeyError (the original indexed
    the entry directly).
    """
    return MCP_PRESETS.get(preset_key, {}).get("url", "")
|
|
|
|
|
|
|
|
def get_preset_description(preset_key: str) -> str:
    """Describe a preset MCP server and list its known tools.

    Returns "" for unknown preset keys.
    """
    preset = MCP_PRESETS.get(preset_key)
    if preset is None:
        return ""
    tool_list = ", ".join(preset.get("tools", []))
    return f"{preset['description']}\n\nAvailable tools: {tool_list}"
|
|
|
|
|
|
|
|
def fetch_metadata_from_mcp(server_url: str, api_key: str, query: str) -> Tuple[str, str]:
    """Fetch metadata from an MCP server and return it for lineage visualization.

    Returns (lineage_json, status_message). NOTE(review): this sends a
    single JSON-RPC ``tools/call`` POST rather than speaking the full MCP
    SSE protocol — it is a best-effort simplification, and on any failure
    it falls back to a static lineage *template* instead of real metadata.
    """
    if not server_url:
        return "", "Please select or enter an MCP server URL first."

    try:
        headers = {"Content-Type": "application/json"}
        if api_key:
            headers["Authorization"] = f"Bearer {api_key}"

        # JSON-RPC envelope calling a generic "search" tool with the query.
        payload = {
            "jsonrpc": "2.0",
            "method": "tools/call",
            "params": {
                "name": "search",
                "arguments": {"query": query}
            },
            "id": 1
        }

        # POST to the endpoint minus the /sse suffix (plain HTTP attempt).
        base_url = server_url.replace("/sse", "")
        resp = requests.post(base_url, json=payload, headers=headers, timeout=30)

        if resp.status_code == 200:
            try:
                data = resp.json()
                # Wrap whatever the server returned into a minimal
                # two-node lineage so the renderer has something to draw;
                # the raw response is preserved under "metadata".
                if isinstance(data, dict):
                    result = data.get("result", data)

                    lineage_data = {
                        "nodes": [
                            {"id": "mcp_source", "type": "source", "name": f"MCP: {query}"},
                            {"id": "mcp_result", "type": "table", "name": "Query Result"}
                        ],
                        "edges": [
                            {"from": "mcp_source", "to": "mcp_result"}
                        ],
                        "metadata": result
                    }
                    return json.dumps(lineage_data, indent=2), f"Fetched metadata from MCP server for query: {query}"
            except json.JSONDecodeError:
                pass

        # Fallback: a placeholder three-node template naming the server host.
        # NOTE(review): server_url.split("/")[2] assumes a scheme://host URL;
        # a bare hostname would IndexError (caught by the outer handler).
        sample_lineage = {
            "nodes": [
                {"id": "mcp_server", "type": "source", "name": server_url.split("/")[2]},
                {"id": "query", "type": "model", "name": f"Query: {query[:30]}..."},
                {"id": "result", "type": "table", "name": "MCP Result"}
            ],
            "edges": [
                {"from": "mcp_server", "to": "query"},
                {"from": "query", "to": "result"}
            ]
        }
        return json.dumps(sample_lineage, indent=2), f"Created lineage template for MCP query. Connect to the MCP server to fetch real metadata."

    except Exception as e:
        return "", f"Error fetching from MCP server: {str(e)}"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def export_lineage(metadata_text: str, source_type: str, export_format: str) -> Tuple[str, str]:
    """Export lineage to the specified data catalog format.

    Args:
        metadata_text: Raw metadata (parsed via parse_metadata_to_graph).
        source_type: Label describing where the metadata came from.
        export_format: One of EXPORT_FORMATS.

    Returns:
        (exported content, status message); the content is "" on failure.
    """
    if not EXPORTERS_AVAILABLE:
        return "", "Export functionality not available. Please install the exporters module."

    # Dispatch table instead of an if/elif chain. Also removes the
    # previously computed-but-unused download filename (dead code that
    # called datetime.now() for nothing).
    exporter_classes = {
        "OpenLineage": OpenLineageExporter,
        "Collibra": CollibraExporter,
        "Purview": PurviewExporter,
        "Alation": AlationExporter,
        "Atlas": AtlasExporter,
    }
    exporter_cls = exporter_classes.get(export_format)
    if exporter_cls is None:
        return "", f"Unknown export format: {export_format}"

    try:
        graph, _ = parse_metadata_to_graph(metadata_text, source_type)
        exported_content = exporter_cls(graph).export()
        return exported_content, f"Exported to {export_format} format. Download the file below."
    except Exception as e:
        return "", f"Export error: {str(e)}"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def load_sample(sample_type: str) -> str:
    """Load the bundled sample file for *sample_type* and return its text.

    Unknown types and read failures return a JSON error payload instead
    of raising, so the UI always receives displayable text.
    """
    filename = SAMPLE_FILES.get(sample_type)
    if not filename:
        return json.dumps({"error": f"Unknown sample type: {sample_type}"})

    filepath = os.path.join(os.path.dirname(__file__), "samples", filename)
    try:
        # Explicit encoding: sample files ship as UTF-8; relying on the
        # platform default (e.g. cp1252 on Windows) could garble them.
        with open(filepath, "r", encoding="utf-8") as f:
            return f.read()
    except Exception as e:
        return json.dumps({"error": f"Could not load sample: {str(e)}"})
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def extract_lineage_from_text(
    metadata_text: str,
    source_type: str,
    visualization_format: str,
    use_mcp: bool = False,
    mcp_url: str = "",
    mcp_query: str = ""
) -> Tuple[str, str]:
    """Extract lineage from provided metadata text, optionally using MCP server.

    Returns (rendered HTML, status message).

    NOTE(review): visualization_format is currently unused — the output
    is always a Mermaid rendering. Confirm whether other formats are
    planned or the parameter can be documented as reserved.
    """
    # MCP branch: only the built-in local:// server is actually supported;
    # external MCP URLs would need a real MCP client and return a message.
    if use_mcp and mcp_url:
        if is_local_mcp(mcp_url):
            # Fetch a canned sample keyed by the query (or source type).
            mcp_metadata, mcp_summary = call_local_mcp("get_sample_lineage", mcp_query or source_type)
            if mcp_metadata:
                # Render through the full parse pipeline when available.
                if EXPORTERS_AVAILABLE:
                    graph, _ = parse_metadata_to_graph(mcp_metadata, "MCP Response")
                    mermaid_code = generate_mermaid_from_graph(graph)
                    return render_mermaid(mermaid_code), f"[MCP] {mcp_summary}"
        else:
            # External servers: explain rather than half-work.
            return "", f"External MCP servers require proper MCP client. Use Local Demo MCP for testing."

    # Direct-text branch requires some input to parse.
    if not metadata_text.strip():
        return "", "Please provide metadata content or enable MCP to fetch sample data."

    if EXPORTERS_AVAILABLE:
        graph, summary = parse_metadata_to_graph(metadata_text, source_type)
        mermaid_code = generate_mermaid_from_graph(graph)
        return render_mermaid(mermaid_code), summary
    else:
        # Exporters module missing: show a trivial placeholder diagram.
        viz = "graph TD\n    A[Sample Node] --> B[Output Node]"
        return render_mermaid(viz), f"Processed {source_type} metadata."
|
|
|
|
|
|
|
|
def extract_lineage_from_bigquery(
    project_id: str,
    query: str,
    api_key: str,
    visualization_format: str
) -> Tuple[str, str]:
    """Extract lineage from BigQuery (local processing).

    NOTE(review): this is a static placeholder — `query`, `api_key`, and
    `visualization_format` are unused and no BigQuery API call is made;
    the diagram only echoes the project id. Confirm whether real
    BigQuery lineage extraction is planned.
    """
    # Fixed three-node sketch of a query execution, labelled with the project.
    viz = f"""graph TD
    subgraph BigQuery Project: {project_id or 'your-project'}
        A[Source Tables] --> B[Query Execution]
        B --> C[Destination Table]
    end
    style A fill:#e1f5fe
    style B fill:#fff3e0
    style C fill:#e8f5e9"""
    return render_mermaid(viz), f"BigQuery lineage from project: {project_id or 'not specified'}"
|
|
|
|
|
|
|
|
def extract_lineage_from_url(
    url: str,
    visualization_format: str
) -> Tuple[str, str]:
    """Extract lineage from URL/API endpoint (local processing).

    Best-effort: GETs the URL and, on a 200 response, feeds the body
    through the text extractor. Any fetch failure (or non-200) silently
    falls back to a generic placeholder diagram — deliberate so the UI
    always renders something.
    """
    if url:
        try:
            resp = requests.get(url, timeout=10)
            if resp.status_code == 200:
                return extract_lineage_from_text(resp.text, "API Response", visualization_format)
        except Exception:
            # Intentional best-effort swallow; fall through to placeholder.
            pass

    # Placeholder shown when no URL is given or the fetch failed.
    viz = "graph TD\n    A[API Source] --> B[Data Pipeline] --> C[Output]"
    return render_mermaid(viz), f"Lineage from URL: {url or 'not specified'}"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# System prompt prepended to every Gemini chat turn (see chat_with_gemini).
# It instructs the model to emit lineage JSON in exactly the nodes/edges
# shape that parse_metadata_to_graph accepts, so replies can be pasted
# straight into the extractor.
LINEAGE_AGENT_PROMPT = """You are a Data Lineage Assistant powered by the Lineage Graph Accelerator tool.
You help users understand, extract, and visualize data lineage from various sources.

Your capabilities:
1. **Extract Lineage**: Parse metadata from dbt manifests, Airflow DAGs, SQL DDL, and custom JSON
2. **Explain Lineage**: Help users understand data flow and dependencies
3. **Generate Metadata**: Create lineage JSON from natural language descriptions
4. **Export Guidance**: Advise on exporting to data catalogs (OpenLineage, Collibra, Purview, Alation, Atlas)

When users describe their data pipeline, generate valid JSON lineage in this format:
```json
{
  "nodes": [
    {"id": "unique_id", "type": "source|table|model|view|report", "name": "Display Name"}
  ],
  "edges": [
    {"from": "source_id", "to": "target_id"}
  ]
}
```

Node types: source, table, model, view, report, dimension, fact, destination, task

Be helpful, concise, and always offer to generate lineage JSON when users describe data flows.
If the user provides metadata or describes a pipeline, generate the JSON they can paste into the tool."""
|
|
|
|
|
|
|
|
def init_gemini(api_key: str) -> bool:
    """Configure the google-generativeai client with *api_key*.

    Returns False when the library is missing, the key is empty, or
    configuration raises; True on success.
    """
    if not GEMINI_AVAILABLE:
        return False
    if not api_key:
        return False
    try:
        genai.configure(api_key=api_key)
    except Exception:
        return False
    return True
|
|
|
|
|
|
|
|
def chat_with_gemini(
    message: str,
    history: List[Dict[str, str]],
    api_key: str
) -> Tuple[List[Dict[str, str]], str]:
    """Chat with Gemini about data lineage.

    Args:
        message: The new user message.
        history: Prior turns as {"role", "content"} dicts (Gradio messages format).
        api_key: Google Gemini API key supplied by the user in the UI.

    Returns:
        (updated history including this exchange, extracted JSON) — the
        second element is the first ```json fenced block found in the
        reply, or "" when none is present. Errors are appended to the
        chat as assistant messages rather than raised.
    """
    # Missing library: reply in-chat instead of raising so the UI stays usable.
    if not GEMINI_AVAILABLE:
        return history + [
            {"role": "user", "content": message},
            {"role": "assistant", "content": "Google Gemini is not available. Please install google-generativeai package."}
        ], ""

    # Missing key: point the user at where to obtain one.
    if not api_key:
        return history + [
            {"role": "user", "content": message},
            {"role": "assistant", "content": "Please enter your Google Gemini API key to use the chatbot. You can get one at https://makersuite.google.com/app/apikey"}
        ], ""

    try:
        genai.configure(api_key=api_key)

        model = genai.GenerativeModel('models/gemini-2.0-flash-001')

        # Build a single prompt: system instructions + the last 6 turns of
        # history (bounded to keep the prompt small) + the new message.
        context_parts = [LINEAGE_AGENT_PROMPT, "\n\nConversation history:"]
        for msg in history[-6:]:
            role = "User" if msg.get("role") == "user" else "Assistant"
            context_parts.append(f"{role}: {msg.get('content', '')}")

        context_parts.append(f"\nUser: {message}\nAssistant:")

        full_prompt = "\n".join(context_parts)
        response = model.generate_content(full_prompt)

        assistant_message = response.text

        # Pull out the first ```json fenced block, if any, so the UI can
        # offer it to the lineage extractor directly.
        extracted_json = ""
        if "```json" in assistant_message:
            try:
                json_start = assistant_message.find("```json") + 7  # len("```json")
                json_end = assistant_message.find("```", json_start)
                if json_end > json_start:
                    extracted_json = assistant_message[json_start:json_end].strip()
            except Exception:
                pass

        new_history = history + [
            {"role": "user", "content": message},
            {"role": "assistant", "content": assistant_message}
        ]

        return new_history, extracted_json

    except Exception as e:
        # API failures surface as an assistant message in the chat.
        error_msg = f"Error communicating with Gemini: {str(e)}"
        return history + [
            {"role": "user", "content": message},
            {"role": "assistant", "content": error_msg}
        ], ""
|
|
|
|
|
|
|
|
def use_generated_json(json_text: str) -> Tuple[str, str, str]:
    """Validate chatbot-generated JSON and hand it off to the lineage extractor.

    Returns a ``(metadata_text, source_type, status_message)`` triple wired to
    the metadata input box, the source-type dropdown, and a status textbox.
    On empty or invalid input the first two values are empty strings and the
    status message explains what went wrong.
    """
    if not json_text.strip():
        return "", "", "No JSON to use. Ask the chatbot to generate lineage JSON first."

    try:
        # Parse purely as a validity check; the original text is passed
        # through untouched so the user sees exactly what the model produced.
        json.loads(json_text)
    except json.JSONDecodeError as err:
        return "", "", f"Invalid JSON: {str(err)}"

    return json_text, "Custom JSON", "JSON copied to metadata input. Switch to 'Text/File Metadata' tab and click 'Extract Lineage'."
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# Gradio UI definition.
#
# Layout: header banner -> intro/quick-start markdown -> MCP configuration
# accordion -> four input tabs (Text/File, BigQuery, URL/API, Demo Gallery,
# AI Assistant) -> footer markdown.  All callbacks referenced here
# (get_preset_url, get_preset_description, test_mcp_connection, load_sample,
# extract_lineage_from_text, export_lineage, extract_lineage_from_bigquery,
# extract_lineage_from_url, chat_with_gemini, use_generated_json) are defined
# earlier in this module.
# ---------------------------------------------------------------------------
with gr.Blocks(
    title="Lineage Graph Accelerator",
    fill_height=True
) as demo:

    # Header banner: title, author links, and feature badges (static HTML).
    gr.HTML("""
    <div style="background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); padding: 24px 32px; border-radius: 12px; margin-bottom: 20px; box-shadow: 0 4px 15px rgba(102, 126, 234, 0.3);">
    <div style="display: flex; justify-content: space-between; align-items: center; flex-wrap: wrap; gap: 16px;">
    <div>
    <h1 style="color: white; margin: 0 0 8px 0; font-size: 2em; font-weight: 700;">Lineage Graph Accelerator</h1>
    <p style="color: rgba(255,255,255,0.9); margin: 0; font-size: 1.1em;">AI-powered data lineage extraction and visualization for modern data platforms</p>
    </div>
    <div style="display: flex; gap: 12px; flex-wrap: wrap;">
    <a href="https://aamanlamba.com" target="_blank" style="background: rgba(255,255,255,0.2); color: white; padding: 8px 16px; border-radius: 6px; text-decoration: none; font-weight: 500; transition: background 0.2s;">By Aaman Lamba</a>
    <a href="https://github.com/aamanlamba" target="_blank" style="background: rgba(255,255,255,0.2); color: white; padding: 8px 16px; border-radius: 6px; text-decoration: none; font-weight: 500;">GitHub</a>
    </div>
    </div>
    <div style="margin-top: 16px; padding-top: 16px; border-top: 1px solid rgba(255,255,255,0.2);">
    <span style="background: rgba(255,255,255,0.25); color: white; padding: 4px 10px; border-radius: 12px; font-size: 0.85em; margin-right: 8px;">Gradio 6</span>
    <span style="background: rgba(255,255,255,0.25); color: white; padding: 4px 10px; border-radius: 12px; font-size: 0.85em; margin-right: 8px;">MCP Integration</span>
    <span style="background: rgba(255,255,255,0.25); color: white; padding: 4px 10px; border-radius: 12px; font-size: 0.85em; margin-right: 8px;">Gemini AI</span>
    <span style="background: rgba(255,255,255,0.25); color: white; padding: 4px 10px; border-radius: 12px; font-size: 0.85em;">5 Export Formats</span>
    </div>
    </div>
    """)

    # Feature overview and quick-start instructions (static markdown).
    gr.Markdown("""
    ### What You Can Do

    | Feature | Description |
    |---------|-------------|
    | **Extract Lineage** | Parse metadata from dbt manifests, Airflow DAGs, SQL DDL, BigQuery, and custom JSON |
    | **Visualize** | Generate interactive Mermaid diagrams with color-coded nodes and relationship labels |
    | **Export** | Export to enterprise data catalogs: OpenLineage, Collibra, Purview, Alation, Atlas |
    | **MCP Integration** | Connect to MCP servers for AI-powered metadata extraction |
    | **AI Assistant** | Chat with Gemini to generate lineage from natural language descriptions |

    ### Quick Start

    1. **Try the Demo**: Enable "Use MCP Server" and select "Local Demo MCP" to fetch sample lineage metadata
    2. **Use Your Data**: Paste your dbt manifest, Airflow DAG, or custom JSON in the Text/File tab
    3. **Load Samples**: Click "Load Sample" in the Demo Gallery to explore pre-built examples
    4. **Export**: Use the Export section to generate catalog-ready JSON

    ---
    """)

    # ------------------------------------------------------------------
    # MCP server configuration: preset dropdown, custom URL/query/API key,
    # and a connection-test button.  The resulting component references
    # (use_mcp_checkbox, mcp_server, mcp_query) are reused as inputs by the
    # Text/File extraction wiring below.
    # ------------------------------------------------------------------
    with gr.Accordion("MCP Server Configuration", open=True):
        gr.Markdown("""
        **Connect to MCP Servers** to fetch metadata for lineage extraction.
        Use the built-in **Local Demo MCP** for testing, or connect to external servers on HuggingFace.
        """)
        with gr.Row():
            use_mcp_checkbox = gr.Checkbox(
                label="Use MCP Server for Metadata",
                value=False,
                info="Enable to fetch lineage metadata from MCP server instead of local input"
            )
            # Dropdown values are keys into the module-level MCP_PRESETS dict;
            # the empty value is the "no selection" placeholder.
            mcp_preset = gr.Dropdown(
                choices=[
                    ("-- Select Preset --", ""),
                    ("Local Demo MCP (Built-in)", "local_demo"),
                    ("MCP Tools by abidlabs", "mcp_tools"),
                    ("HuggingFace MCP by dylanebert", "huggingface_mcp"),
                    ("Ragmint RAG Pipeline", "ragmint"),
                    ("Web Search MCP", "web_search"),
                ],
                label="Preset MCP Servers",
                value="",
                scale=2
            )
        with gr.Row():
            mcp_server = gr.Textbox(
                label="MCP Server URL",
                placeholder="Select a preset or enter custom URL",
                info="local://demo for built-in demo, or external MCP URL",
                scale=3
            )
            mcp_query = gr.Textbox(
                label="MCP Query (Optional)",
                placeholder="e.g., 'ecommerce', 'ml pipeline', 'warehouse'",
                info="Query to filter metadata from MCP server",
                scale=2
            )
        with gr.Row():
            mcp_api_key = gr.Textbox(
                label="API Key (Optional)",
                placeholder="API key if required",
                type="password",
                scale=2
            )
            test_btn = gr.Button("Test Connection", size="sm", scale=1)
        mcp_description = gr.Textbox(label="Server Description", interactive=False, lines=2)
        mcp_status = gr.Textbox(label="Connection Status", interactive=False)

        # Selecting a preset populates both the URL field and the description
        # (two independent .change handlers on the same dropdown).
        mcp_preset.change(fn=get_preset_url, inputs=[mcp_preset], outputs=[mcp_server])
        mcp_preset.change(fn=get_preset_description, inputs=[mcp_preset], outputs=[mcp_description])
        test_btn.click(fn=test_mcp_connection, inputs=[mcp_server, mcp_api_key], outputs=[mcp_status])

    with gr.Tabs():

        # --------------------------------------------------------------
        # Tab 1: paste/load textual metadata, extract lineage, and export.
        # --------------------------------------------------------------
        with gr.Tab("Text/File Metadata", id="text"):
            with gr.Row():
                with gr.Column(scale=1):
                    gr.Markdown("### Input")

                    with gr.Row():
                        # Dropdown values are keys into SAMPLE_FILES.
                        sample_selector = gr.Dropdown(
                            choices=[
                                ("Simple JSON", "simple"),
                                ("dbt Manifest", "dbt"),
                                ("Airflow DAG", "airflow"),
                                ("SQL DDL", "sql"),
                                ("Data Warehouse", "warehouse"),
                                ("ETL Pipeline", "etl"),
                                ("Complex Demo", "complex")
                            ],
                            label="Load Sample Data",
                            value="simple"
                        )
                        load_sample_btn = gr.Button("Load Sample", size="sm")

                    metadata_input = gr.Textbox(
                        label="Metadata Content",
                        placeholder="Paste your metadata here (JSON, YAML, SQL, dbt manifest, Airflow DAG, etc.)",
                        lines=18
                    )

                    with gr.Row():
                        source_type = gr.Dropdown(
                            choices=["dbt Manifest", "Airflow DAG", "SQL DDL", "Data Warehouse", "ETL Pipeline", "Custom JSON", "Other"],
                            label="Source Type",
                            value="Custom JSON"
                        )
                        viz_format = gr.Dropdown(
                            choices=["Mermaid", "DOT/Graphviz", "Text"],
                            label="Visualization Format",
                            value="Mermaid"
                        )

                    extract_btn = gr.Button("Extract Lineage", variant="primary", size="lg")

                with gr.Column(scale=1):
                    gr.Markdown("### Visualization")
                    output_viz = gr.HTML(label="Lineage Graph")
                    output_summary = gr.Textbox(label="Summary", lines=3)

            with gr.Accordion("Export to Data Catalog", open=False):
                export_format = gr.Dropdown(
                    choices=EXPORT_FORMATS,
                    label="Export Format",
                    value="OpenLineage"
                )
                export_btn = gr.Button("Generate Export", variant="secondary")
                export_output = gr.Code(label="Export Content", language="json", lines=10)
                export_status = gr.Textbox(label="Export Status", interactive=False)

            load_sample_btn.click(
                fn=load_sample,
                inputs=[sample_selector],
                outputs=[metadata_input]
            )

            # Extraction also receives the MCP controls so the handler can
            # fetch metadata from an MCP server instead of the textbox.
            extract_btn.click(
                fn=extract_lineage_from_text,
                inputs=[metadata_input, source_type, viz_format, use_mcp_checkbox, mcp_server, mcp_query],
                outputs=[output_viz, output_summary]
            )

            export_btn.click(
                fn=export_lineage,
                inputs=[metadata_input, source_type, export_format],
                outputs=[export_output, export_status]
            )

        # --------------------------------------------------------------
        # Tab 2: BigQuery metadata extraction.
        # --------------------------------------------------------------
        with gr.Tab("BigQuery", id="bigquery"):
            with gr.Row():
                with gr.Column():
                    bq_project = gr.Textbox(
                        label="Project ID",
                        placeholder="your-gcp-project-id"
                    )
                    bq_query = gr.Textbox(
                        label="Metadata Query",
                        placeholder="SELECT * FROM `project.dataset.INFORMATION_SCHEMA.TABLES`",
                        lines=10
                    )
                    load_bq_sample = gr.Button("Load Sample Query", size="sm")
                    bq_creds = gr.Textbox(
                        label="Service Account JSON (optional)",
                        type="password"
                    )
                    bq_viz_format = gr.Dropdown(
                        choices=["Mermaid", "DOT/Graphviz", "Text"],
                        label="Visualization Format",
                        value="Mermaid"
                    )
                    bq_extract_btn = gr.Button("Extract Lineage", variant="primary")

                with gr.Column():
                    bq_output_viz = gr.HTML(label="Lineage Graph")
                    bq_output_summary = gr.Textbox(label="Summary", lines=3)

            load_bq_sample.click(
                fn=lambda: load_sample("bigquery"),
                outputs=[bq_query]
            )

            bq_extract_btn.click(
                fn=extract_lineage_from_bigquery,
                inputs=[bq_project, bq_query, bq_creds, bq_viz_format],
                outputs=[bq_output_viz, bq_output_summary]
            )

        # --------------------------------------------------------------
        # Tab 3: fetch metadata from a remote URL/API endpoint.
        # --------------------------------------------------------------
        with gr.Tab("URL/API", id="url"):
            with gr.Row():
                with gr.Column():
                    url_input = gr.Textbox(
                        label="Metadata URL",
                        placeholder="https://api.example.com/metadata"
                    )
                    # NOTE(review): this loads sample *metadata content* into a
                    # URL textbox -- presumably extract_lineage_from_url accepts
                    # raw content as well as URLs; verify against the handler.
                    load_url_sample = gr.Button("Load Sample API Metadata", size="sm")
                    url_viz_format = gr.Dropdown(
                        choices=["Mermaid", "DOT/Graphviz", "Text"],
                        label="Visualization Format",
                        value="Mermaid"
                    )
                    url_extract_btn = gr.Button("Extract Lineage", variant="primary")

                with gr.Column():
                    url_output_viz = gr.HTML(label="Lineage Graph")
                    url_output_summary = gr.Textbox(label="Summary", lines=3)

            load_url_sample.click(
                fn=lambda: load_sample("api"),
                outputs=[url_input]
            )

            url_extract_btn.click(
                fn=extract_lineage_from_url,
                inputs=[url_input, url_viz_format],
                outputs=[url_output_viz, url_output_summary]
            )

        # --------------------------------------------------------------
        # Tab 4: one-click demo gallery over the bundled samples.
        # --------------------------------------------------------------
        with gr.Tab("Demo Gallery", id="gallery"):
            gr.Markdown("""
            ## Sample Lineage Visualizations

            Click any example below to see the lineage visualization.
            """)

            with gr.Row():
                demo_simple = gr.Button("E-Commerce (Simple)")
                demo_dbt = gr.Button("dbt Project")
                demo_airflow = gr.Button("Airflow DAG")
            with gr.Row():
                demo_warehouse = gr.Button("Data Warehouse")
                demo_etl = gr.Button("ETL Pipeline")
                demo_complex = gr.Button("Complex Platform")

            demo_viz = gr.HTML(label="Demo Visualization")
            demo_summary = gr.Textbox(label="Description", lines=2)

            # `st=sample_type` binds the loop variable as a default argument so
            # each button captures its own sample key (avoids the late-binding
            # closure pitfall).
            # NOTE(review): this lambda calls extract_lineage_from_text with 3
            # positional args while the Text/File wiring passes 6 inputs --
            # presumably the MCP-related parameters have defaults; confirm the
            # handler's signature.
            for btn, sample_type in [(demo_simple, "simple"), (demo_dbt, "dbt"),
                                     (demo_airflow, "airflow"), (demo_warehouse, "warehouse"),
                                     (demo_etl, "etl"), (demo_complex, "complex")]:
                btn.click(
                    fn=lambda st=sample_type: extract_lineage_from_text(
                        load_sample(st),
                        st.replace("_", " ").title(),
                        "Mermaid"
                    ),
                    outputs=[demo_viz, demo_summary]
                )

        # --------------------------------------------------------------
        # Tab 5: Gemini-backed chat assistant that can also emit lineage
        # JSON usable in the Text/File tab.
        # --------------------------------------------------------------
        with gr.Tab("AI Assistant", id="chatbot"):
            gr.Markdown("""
            ## Lineage AI Assistant (Powered by Google Gemini)

            Ask questions about data lineage, describe your data pipeline in natural language,
            and get JSON metadata you can use to visualize lineage.

            **Examples:**
            - "I have a PostgreSQL database that feeds into a Spark ETL job, which outputs to a Snowflake warehouse"
            - "Generate lineage for a dbt project with staging, intermediate, and mart layers"
            - "What's the best way to document column-level lineage?"
            """)

            with gr.Row():
                with gr.Column(scale=2):
                    gemini_api_key = gr.Textbox(
                        label="Google Gemini API Key",
                        placeholder="Enter your Gemini API key (get one at makersuite.google.com)",
                        type="password",
                        info="Your API key is not stored and only used for this session"
                    )

                    # NOTE(review): history entries are {"role": ..., "content": ...}
                    # dicts (see chat_with_gemini); depending on the Gradio
                    # version, gr.Chatbot may need type="messages" to render
                    # that format -- confirm.
                    chatbot_display = gr.Chatbot(
                        label="Chat with Lineage AI",
                        height=400
                    )

                    with gr.Row():
                        chat_input = gr.Textbox(
                            label="Your message",
                            placeholder="Describe your data pipeline or ask about lineage...",
                            lines=2,
                            scale=4
                        )
                        send_btn = gr.Button("Send", variant="primary", scale=1)

                    with gr.Accordion("Generated JSON (if any)", open=False):
                        generated_json = gr.Code(
                            label="Extracted JSON",
                            language="json",
                            lines=10
                        )
                        use_json_btn = gr.Button("Use This JSON in Lineage Tool", size="sm")
                        json_status = gr.Textbox(label="Status", interactive=False)

            # Canonical chat history lives in gr.State; the Chatbot component
            # mirrors it.
            chat_state = gr.State([])

            def handle_chat(message, history, api_key):
                # Ignore empty submissions without touching the history.
                if not message.strip():
                    return history, "", history
                new_history, extracted = chat_with_gemini(message, history, api_key)
                return new_history, extracted, new_history

            # Send button and Enter-to-submit share the same handler; both
            # clear the input box afterwards via a chained .then().
            send_btn.click(
                fn=handle_chat,
                inputs=[chat_input, chat_state, gemini_api_key],
                outputs=[chatbot_display, generated_json, chat_state]
            ).then(
                fn=lambda: "",
                outputs=[chat_input]
            )

            chat_input.submit(
                fn=handle_chat,
                inputs=[chat_input, chat_state, gemini_api_key],
                outputs=[chatbot_display, generated_json, chat_state]
            ).then(
                fn=lambda: "",
                outputs=[chat_input]
            )

            # Copies validated JSON into the Text/File tab's components.
            use_json_btn.click(
                fn=use_generated_json,
                inputs=[generated_json],
                outputs=[metadata_input, source_type, json_status]
            )

    # Footer: export-format reference table, demo video, and credits.
    gr.Markdown("""
    ---

    ### Export Formats Supported

    | Format | Description | Use Case |
    |--------|-------------|----------|
    | **OpenLineage** | Open standard for lineage | Universal compatibility |
    | **Collibra** | Collibra Data Intelligence | Enterprise data governance |
    | **Purview** | Microsoft Purview | Azure ecosystem |
    | **Alation** | Alation Data Catalog | Self-service analytics |
    | **Atlas** | Apache Atlas | Open-source governance |

    ---

    ### 🎥 Watch the Demo

    See all features in action: [YouTube Demo Video (2:30)](https://youtu.be/U4Dfc7txa_0)

    ---

    Built with Gradio for the **Gradio Agents & MCP Hackathon - Winter 2025** by [Aaman Lamba](https://aamanlamba.com)

    [GitHub](https://github.com/aamanlamba/lineage-graph-accelerator) | [Documentation](https://huggingface.co/spaces/aamanlamba/Lineage-graph-accelerator/blob/main/USER_GUIDE.md) | [HuggingFace Space](https://huggingface.co/spaces/aamanlamba/Lineage-graph-accelerator)
    """)
|
|
# Entry point: start the Gradio server when run as a script.
# ssr_mode=False disables server-side rendering -- presumably to avoid
# SSR-related issues on the hosting platform; confirm the original motivation.
if __name__ == "__main__":
    demo.launch(ssr_mode=False)