"""
Lineage Graph Accelerator - Hugging Face Space
A Gradio-based AI agent for extracting and visualizing data lineage from various sources.
Built for the Gradio Agents & MCP Hackathon - Winter 2025
"""
from __future__ import annotations  # keep type annotations lazy so the module loads even if exporters is missing

import gradio as gr
import json
import os
import requests
from typing import Optional, Tuple, Dict, Any, List
from datetime import datetime
# Import exporters
try:
from exporters import (
LineageGraph, LineageNode, LineageEdge,
OpenLineageExporter, CollibraExporter, PurviewExporter, AlationExporter, AtlasExporter
)
EXPORTERS_AVAILABLE = True
except ImportError:
EXPORTERS_AVAILABLE = False
# Import Google Gemini for agentic chatbot
try:
import google.generativeai as genai
GEMINI_AVAILABLE = True
except ImportError:
GEMINI_AVAILABLE = False
# ============================================================================
# Constants and Configuration
# ============================================================================
SAMPLE_FILES = {
"simple": "sample_metadata.json",
"dbt": "dbt_manifest_sample.json",
"airflow": "airflow_dag_sample.json",
"sql": "sql_ddl_sample.sql",
"warehouse": "warehouse_lineage_sample.json",
"etl": "etl_pipeline_sample.json",
"complex": "complex_lineage_demo.json",
"api": "sample_api_metadata.json",
"bigquery": "sample_bigquery.sql"
}
EXPORT_FORMATS = ["OpenLineage", "Collibra", "Purview", "Alation", "Atlas"]
# Preset MCP Servers on HuggingFace that can provide metadata
MCP_PRESETS = {
"local_demo": {
"name": "Local Demo MCP (Built-in)",
"url": "local://demo",
"schema_url": "local://demo/schema",
"description": "Built-in demo MCP server that provides sample lineage metadata for testing",
"tools": ["get_sample_lineage", "get_dbt_metadata", "get_airflow_dag", "get_warehouse_schema"]
},
"mcp_tools": {
"name": "MCP Tools by abidlabs",
"url": "https://abidlabs-mcp-tools.hf.space/gradio_api/mcp/sse",
"schema_url": "https://abidlabs-mcp-tools.hf.space/gradio_api/mcp/schema",
"description": "Demo MCP server with utility tools for testing integration",
"tools": ["prime_factors", "generate_cheetah_image", "image_orientation"]
},
"huggingface_mcp": {
"name": "HuggingFace MCP by dylanebert",
"url": "https://dylanebert-huggingface-mcp.hf.space/gradio_api/mcp/sse",
"schema_url": "https://dylanebert-huggingface-mcp.hf.space/gradio_api/mcp/schema",
"description": "Search and explore HuggingFace models, datasets, and spaces",
"tools": ["search_models", "search_datasets", "get_model_card"]
},
"ragmint": {
"name": "Ragmint RAG Pipeline",
"url": "https://mcp-1st-birthday-ragmint-mcp-server.hf.space/gradio_api/mcp/sse",
"schema_url": "https://mcp-1st-birthday-ragmint-mcp-server.hf.space/gradio_api/mcp/schema",
"description": "RAG pipeline optimization and document retrieval",
"tools": ["optimize_rag", "retrieve_documents"]
},
"web_search": {
"name": "Web Search MCP",
"url": "https://agents-mcp-hackathon-search-web-mcp-server.hf.space/gradio_api/mcp/sse",
"schema_url": "https://agents-mcp-hackathon-search-web-mcp-server.hf.space/gradio_api/mcp/schema",
"description": "Search the web for data and documentation",
"tools": ["search_web", "fetch_page"]
}
}
# ============================================================================
# Local Demo MCP Server (Built-in)
# ============================================================================
# Sample metadata that the local MCP server can provide
LOCAL_MCP_METADATA = {
"ecommerce_pipeline": {
"name": "E-commerce Data Pipeline",
"nodes": [
{"id": "raw_orders", "type": "source", "name": "Raw Orders (PostgreSQL)"},
{"id": "raw_customers", "type": "source", "name": "Raw Customers (PostgreSQL)"},
{"id": "raw_products", "type": "source", "name": "Raw Products (API)"},
{"id": "stg_orders", "type": "model", "name": "Staging Orders"},
{"id": "stg_customers", "type": "model", "name": "Staging Customers"},
{"id": "stg_products", "type": "model", "name": "Staging Products"},
{"id": "dim_customers", "type": "dimension", "name": "Dim Customers"},
{"id": "dim_products", "type": "dimension", "name": "Dim Products"},
{"id": "fact_orders", "type": "fact", "name": "Fact Orders"},
{"id": "mart_sales", "type": "table", "name": "Sales Mart"},
{"id": "report_daily", "type": "report", "name": "Daily Sales Report"}
],
"edges": [
{"from": "raw_orders", "to": "stg_orders"},
{"from": "raw_customers", "to": "stg_customers"},
{"from": "raw_products", "to": "stg_products"},
{"from": "stg_customers", "to": "dim_customers"},
{"from": "stg_products", "to": "dim_products"},
{"from": "stg_orders", "to": "fact_orders"},
{"from": "dim_customers", "to": "fact_orders"},
{"from": "dim_products", "to": "fact_orders"},
{"from": "fact_orders", "to": "mart_sales"},
{"from": "mart_sales", "to": "report_daily"}
]
},
"ml_pipeline": {
"name": "ML Feature Pipeline",
"nodes": [
{"id": "raw_events", "type": "source", "name": "Event Stream (Kafka)"},
{"id": "raw_user_data", "type": "source", "name": "User Data (S3)"},
{"id": "feature_eng", "type": "model", "name": "Feature Engineering"},
{"id": "feature_store", "type": "table", "name": "Feature Store"},
{"id": "training_data", "type": "table", "name": "Training Dataset"},
{"id": "model_output", "type": "destination", "name": "Model Predictions"}
],
"edges": [
{"from": "raw_events", "to": "feature_eng"},
{"from": "raw_user_data", "to": "feature_eng"},
{"from": "feature_eng", "to": "feature_store"},
{"from": "feature_store", "to": "training_data"},
{"from": "training_data", "to": "model_output"}
]
},
"data_warehouse": {
"name": "Data Warehouse Schema",
"nodes": [
{"id": "src_crm", "type": "source", "name": "CRM System"},
{"id": "src_erp", "type": "source", "name": "ERP System"},
{"id": "src_web", "type": "source", "name": "Web Analytics"},
{"id": "landing_crm", "type": "table", "name": "Landing CRM"},
{"id": "landing_erp", "type": "table", "name": "Landing ERP"},
{"id": "landing_web", "type": "table", "name": "Landing Web"},
{"id": "dwh_customers", "type": "dimension", "name": "DWH Customers"},
{"id": "dwh_transactions", "type": "fact", "name": "DWH Transactions"},
{"id": "bi_dashboard", "type": "report", "name": "BI Dashboard"}
],
"edges": [
{"from": "src_crm", "to": "landing_crm"},
{"from": "src_erp", "to": "landing_erp"},
{"from": "src_web", "to": "landing_web"},
{"from": "landing_crm", "to": "dwh_customers"},
{"from": "landing_erp", "to": "dwh_transactions"},
{"from": "landing_web", "to": "dwh_transactions"},
{"from": "dwh_customers", "to": "dwh_transactions"},
{"from": "dwh_transactions", "to": "bi_dashboard"}
]
}
}
def local_mcp_get_metadata(tool_name: str, query: str = "") -> Dict[str, Any]:
"""Simulate a local MCP server that returns sample metadata."""
    q = query.lower()
    if tool_name in ("get_sample_lineage", "search"):
        # Return a query-matched sample, defaulting to the e-commerce pipeline
        if "ecommerce" in q or "sales" in q:
            return LOCAL_MCP_METADATA["ecommerce_pipeline"]
        elif "ml" in q or "feature" in q:
            return LOCAL_MCP_METADATA["ml_pipeline"]
        elif "warehouse" in q or "dwh" in q:
            return LOCAL_MCP_METADATA["data_warehouse"]
        else:
            return LOCAL_MCP_METADATA["ecommerce_pipeline"]
elif tool_name == "get_dbt_metadata":
return LOCAL_MCP_METADATA["ecommerce_pipeline"]
elif tool_name == "get_airflow_dag":
return LOCAL_MCP_METADATA["ml_pipeline"]
elif tool_name == "get_warehouse_schema":
return LOCAL_MCP_METADATA["data_warehouse"]
elif tool_name == "list_datasets":
return {"datasets": list(LOCAL_MCP_METADATA.keys())}
else:
return LOCAL_MCP_METADATA["ecommerce_pipeline"]
def is_local_mcp(url: str) -> bool:
    """Check whether the URL points at the built-in local demo MCP server."""
    return bool(url) and url.startswith("local://")
def call_local_mcp(tool_name: str, query: str = "") -> Tuple[str, str]:
"""Call the local MCP server and return metadata as JSON string."""
metadata = local_mcp_get_metadata(tool_name, query)
return json.dumps(metadata, indent=2), f"Fetched '{metadata.get('name', 'lineage')}' from Local Demo MCP"
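
# Illustrative usage (a sketch using only the functions above): fetching the
# built-in ML pipeline sample from the local demo MCP server.
#
#     metadata_json, status = call_local_mcp("get_sample_lineage", "ml features")
#     data = json.loads(metadata_json)
#     data["name"]  # -> "ML Feature Pipeline"
#
# Query matching is a simple keyword check, so any query containing "ml" or
# "feature" selects the ml_pipeline sample; unmatched queries fall back to the
# e-commerce pipeline.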
# ============================================================================
# Mermaid Rendering
# ============================================================================
import base64
def render_mermaid(viz_code: str) -> str:
"""Render mermaid diagram using mermaid.ink service (renders as SVG image)."""
# Encode the mermaid code for the URL
# mermaid.ink accepts base64 encoded diagram
encoded = base64.urlsafe_b64encode(viz_code.encode('utf-8')).decode('utf-8')
# Create the mermaid.ink URL for SVG rendering
img_url = f"https://mermaid.ink/svg/{encoded}"
# PNG version for download
png_url = f"https://mermaid.ink/img/{encoded}"
    # Also link to the live editor for users who want to tweak the diagram.
    # Note: mermaid.live expects a base64-encoded JSON state object (with a
    # "code" field), not the bare diagram source.
    editor_state = json.dumps({"code": viz_code, "mermaid": json.dumps({"theme": "default"})})
    editor_url = f"https://mermaid.live/edit#base64:{base64.b64encode(editor_state.encode('utf-8')).decode('utf-8')}"
html = f'''
<div style="background: white; padding: 20px; border-radius: 8px; min-height: 200px;">
<div style="overflow: auto; max-height: 500px; border: 1px solid #e0e0e0; border-radius: 4px; padding: 10px;">
<img id="lineage-graph" src="{img_url}" alt="Lineage Graph" style="max-width: 100%; height: auto; cursor: zoom-in;" onclick="this.style.maxWidth = this.style.maxWidth === 'none' ? '100%' : 'none'; this.style.cursor = this.style.cursor === 'zoom-in' ? 'zoom-out' : 'zoom-in';" />
</div>
<div style="margin-top: 12px; display: flex; gap: 16px; flex-wrap: wrap; align-items: center;">
<a href="{editor_url}" target="_blank" style="color: #7c3aed; text-decoration: none; font-size: 13px;">
Edit in Mermaid Live
</a>
<a href="{png_url}" download="lineage_graph.png" style="color: #2563eb; text-decoration: none; font-size: 13px;">
Download PNG
</a>
<a href="{img_url}" download="lineage_graph.svg" style="color: #059669; text-decoration: none; font-size: 13px;">
Download SVG
</a>
<span style="color: #888; font-size: 12px; margin-left: auto;">Click graph to zoom</span>
</div>
</div>
'''
return html
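
# Illustrative example: the diagram source is embedded directly in the
# mermaid.ink URL as URL-safe base64, so no client-side JavaScript is needed.
#
#     code = "graph TD\n    A[Source] --> B[Target]"
#     html = render_mermaid(code)
#     # html contains <img src="https://mermaid.ink/svg/<base64-of-code>" ...>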
def render_mermaid_code(viz_code: str) -> str:
"""Return the raw mermaid code for display."""
return viz_code
# ============================================================================
# Lineage Parsing and Visualization Generation
# ============================================================================
def parse_metadata_to_graph(metadata_text: str, source_type: str) -> Tuple[LineageGraph, str]:
"""Parse metadata text into a LineageGraph structure."""
try:
# Try to parse as JSON first
if metadata_text.strip().startswith('{') or metadata_text.strip().startswith('['):
data = json.loads(metadata_text)
else:
# For SQL or other text formats, create a simple structure
data = {"raw_content": metadata_text, "source_type": source_type}
graph = LineageGraph(name=f"Lineage from {source_type}")
# Handle different formats
if "lineage_graph" in data:
# Complex lineage demo format
lg = data["lineage_graph"]
for node_data in lg.get("nodes", []):
node = LineageNode(
id=node_data.get("id"),
name=node_data.get("name"),
type=node_data.get("type", "table"),
category=node_data.get("category"),
description=node_data.get("description"),
metadata=node_data.get("metadata"),
tags=node_data.get("tags")
)
graph.add_node(node)
for edge_data in lg.get("edges", []):
edge = LineageEdge(
source=edge_data.get("from"),
target=edge_data.get("to"),
type=edge_data.get("type", "transform")
)
graph.add_edge(edge)
elif "nodes" in data and "edges" in data:
# Simple node/edge format
for node_data in data.get("nodes", []):
node = LineageNode(
id=node_data.get("id"),
name=node_data.get("name", node_data.get("id")),
type=node_data.get("type", "table")
)
graph.add_node(node)
for edge_data in data.get("edges", []):
edge = LineageEdge(
source=edge_data.get("from"),
target=edge_data.get("to"),
type=edge_data.get("type", "transform")
)
graph.add_edge(edge)
elif "nodes" in data:
# dbt manifest format
for node_id, node_data in data.get("nodes", {}).items():
node = LineageNode(
id=node_id,
name=node_data.get("name", node_id.split(".")[-1]),
type=node_data.get("resource_type", "model"),
schema=node_data.get("schema"),
database=node_data.get("database"),
description=node_data.get("description")
)
graph.add_node(node)
# Add edges from depends_on
deps = node_data.get("depends_on", {}).get("nodes", [])
for dep in deps:
edge = LineageEdge(source=dep, target=node_id, type="transform")
graph.add_edge(edge)
elif "tasks" in data:
# Airflow DAG format
for task in data.get("tasks", []):
node = LineageNode(
id=task.get("task_id"),
name=task.get("task_id"),
type="task",
description=task.get("description")
)
graph.add_node(node)
# Add edges from upstream dependencies
for dep in task.get("upstream_dependencies", []):
edge = LineageEdge(source=dep, target=task.get("task_id"), type="dependency")
graph.add_edge(edge)
elif "lineage" in data:
# Warehouse lineage format
lineage = data.get("lineage", {})
for dataset in lineage.get("datasets", []):
node = LineageNode(
id=dataset.get("id"),
name=dataset.get("name", dataset.get("id")),
type=dataset.get("type", "table"),
schema=dataset.get("schema"),
database=dataset.get("database"),
description=dataset.get("description"),
owner=dataset.get("owner"),
tags=dataset.get("tags")
)
graph.add_node(node)
for rel in lineage.get("relationships", []):
edge = LineageEdge(
source=rel.get("source"),
target=rel.get("target"),
type=rel.get("type", "transform"),
job_name=rel.get("job")
)
graph.add_edge(edge)
elif "stages" in data:
# ETL pipeline format
for stage in data.get("stages", []):
for step in stage.get("steps", []):
node = LineageNode(
id=step.get("id"),
name=step.get("name", step.get("id")),
type="step",
category=stage.get("id"),
description=step.get("description") or step.get("logic")
)
graph.add_node(node)
# Add edges from inputs
for inp in step.get("inputs", []):
edge = LineageEdge(source=inp, target=step.get("id"), type="transform")
graph.add_edge(edge)
else:
# Fallback: create sample nodes
graph.add_node(LineageNode(id="source", name="Source", type="source"))
graph.add_node(LineageNode(id="target", name="Target", type="table"))
graph.add_edge(LineageEdge(source="source", target="target", type="transform"))
summary = f"Parsed {len(graph.nodes)} nodes and {len(graph.edges)} relationships from {source_type}"
return graph, summary
except json.JSONDecodeError as e:
# Handle SQL or plain text
graph = LineageGraph(name=f"Lineage from {source_type}")
graph.add_node(LineageNode(id="input", name="Input Data", type="source"))
graph.add_node(LineageNode(id="output", name="Output Data", type="table"))
graph.add_edge(LineageEdge(source="input", target="output", type="transform"))
return graph, f"Created placeholder lineage (could not parse as JSON: {str(e)[:50]})"
except Exception as e:
graph = LineageGraph(name="Error")
return graph, f"Error parsing metadata: {str(e)}"
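
# Illustrative example: the simplest accepted input is the plain node/edge
# format (the same shape LOCAL_MCP_METADATA uses).
#
#     text = json.dumps({
#         "nodes": [{"id": "a", "name": "A"}, {"id": "b", "name": "B"}],
#         "edges": [{"from": "a", "to": "b"}],
#     })
#     graph, summary = parse_metadata_to_graph(text, "Custom JSON")
#     # summary -> "Parsed 2 nodes and 1 relationships from Custom JSON"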
def sanitize_mermaid_text(text: str) -> str:
"""Sanitize text for use in Mermaid diagrams by escaping special characters."""
if not text:
return "Unknown"
# Replace characters that conflict with Mermaid syntax
# Parentheses conflict with node shapes, brackets with labels
text = text.replace("(", " - ").replace(")", "")
text = text.replace("[", " ").replace("]", " ")
text = text.replace("{", " ").replace("}", " ")
text = text.replace('"', "'")
text = text.replace("<", "").replace(">", "")
text = text.replace("#", "")
return text.strip()
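
# Examples: characters that clash with Mermaid node syntax are stripped or
# substituted (note the doubled space left by the "(" replacement).
#
#     sanitize_mermaid_text("Raw Orders (PostgreSQL)")  # -> "Raw Orders  - PostgreSQL"
#     sanitize_mermaid_text('Fact "Orders" <v2>')       # -> "Fact 'Orders' v2"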
def generate_mermaid_from_graph(graph: LineageGraph) -> str:
"""Generate Mermaid diagram code from a LineageGraph."""
if not graph.nodes:
return "graph TD\n A[No data to display]"
lines = [
"%%{init: {'theme': 'base', 'themeVariables': {'fontSize': '12px', 'fontFamily': 'arial', 'primaryColor': '#e8f5e9', 'primaryBorderColor': '#4caf50', 'lineColor': '#666'}}}%%",
"graph TD"
]
# Group nodes by category for subgraphs
categories = {}
for node in graph.nodes:
cat = node.category or "default"
if cat not in categories:
categories[cat] = []
categories[cat].append(node)
# Generate nodes with styling - compact with rounded corners and subtle borders
node_styles = {
"source": "fill:#e3f2fd,stroke:#1976d2,stroke-width:1px,rx:5,ry:5",
"external_api": "fill:#e3f2fd,stroke:#1976d2,stroke-width:1px,rx:5,ry:5",
"table": "fill:#e8f5e9,stroke:#388e3c,stroke-width:1px,rx:5,ry:5",
"view": "fill:#f3e5f5,stroke:#7b1fa2,stroke-width:1px,rx:5,ry:5",
"model": "fill:#fff3e0,stroke:#f57c00,stroke-width:1px,rx:5,ry:5",
"report": "fill:#fce4ec,stroke:#c2185b,stroke-width:1px,rx:5,ry:5",
"dimension": "fill:#e0f7fa,stroke:#0097a7,stroke-width:1px,rx:5,ry:5",
"fact": "fill:#fff8e1,stroke:#ffa000,stroke-width:1px,rx:5,ry:5",
"destination": "fill:#ffebee,stroke:#d32f2f,stroke-width:1px,rx:5,ry:5",
"task": "fill:#fafafa,stroke:#616161,stroke-width:1px,rx:5,ry:5"
}
# Add subgraphs for categories
if len(categories) > 1:
for cat, nodes in categories.items():
if cat != "default":
lines.append(f" subgraph {sanitize_mermaid_text(cat.replace('_', ' ').title())}")
for node in nodes:
safe_name = sanitize_mermaid_text(node.name)
shape = f"[{safe_name}]" if node.type in ["table", "model"] else f"({safe_name})"
lines.append(f" {node.id}{shape}")
lines.append(" end")
else:
for node in nodes:
safe_name = sanitize_mermaid_text(node.name)
shape = f"[{safe_name}]" if node.type in ["table", "model"] else f"({safe_name})"
lines.append(f" {node.id}{shape}")
else:
for node in graph.nodes:
safe_name = sanitize_mermaid_text(node.name)
shape = f"[{safe_name}]" if node.type in ["table", "model"] else f"({safe_name})"
lines.append(f" {node.id}{shape}")
# Add edges
edge_labels = {
"transform": "-->",
"reference": "-.->",
"ingest": "-->",
"export": "-->",
"join": "-->",
"aggregate": "-->",
"dependency": "-->"
}
for edge in graph.edges:
arrow = edge_labels.get(edge.type, "-->")
if edge.type and edge.type not in ["transform", "dependency"]:
lines.append(f" {edge.source} {arrow}|{edge.type}| {edge.target}")
else:
lines.append(f" {edge.source} {arrow} {edge.target}")
# Add styling
for node in graph.nodes:
style = node_styles.get(node.type, "fill:#f5f5f5")
lines.append(f" style {node.id} {style}")
return "\n".join(lines)
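
# Illustrative output for a two-node graph, assuming LineageNode's optional
# fields (category, etc.) default to None: tables and models render as
# [rectangles], all other node types as (rounded) shapes.
#
#     g = LineageGraph(name="demo")
#     g.add_node(LineageNode(id="raw", name="Raw", type="source"))
#     g.add_node(LineageNode(id="clean", name="Clean", type="table"))
#     g.add_edge(LineageEdge(source="raw", target="clean", type="transform"))
#     print(generate_mermaid_from_graph(g))
#     # %%{init: ...}%%
#     # graph TD
#     #     raw(Raw)
#     #     clean[Clean]
#     #     raw --> clean
#     #     style raw fill:#e3f2fd,stroke:#1976d2,...
#     #     style clean fill:#e8f5e9,stroke:#388e3c,...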
# ============================================================================
# MCP Server Integration
# ============================================================================
def send_to_mcp(server_url: str, api_key: str, metadata_text: str, source_type: str, viz_format: str) -> Tuple[str, str]:
"""Send metadata to an external MCP server and return visualization + summary."""
if not server_url:
return "", "No MCP server URL configured."
try:
payload = {
"metadata": metadata_text,
"source_type": source_type,
"viz_format": viz_format,
}
headers = {"Content-Type": "application/json"}
if api_key:
headers["Authorization"] = f"Bearer {api_key}"
resp = requests.post(server_url, json=payload, headers=headers, timeout=30)
if 200 <= resp.status_code < 300:
data = resp.json()
viz = data.get("visualization") or data.get("viz") or data.get("mermaid", "")
summary = data.get("summary", "Processed by MCP server.")
if viz:
return render_mermaid(viz), summary
return "", summary
else:
return "", f"MCP server returned status {resp.status_code}: {resp.text[:200]}"
except Exception as e:
return "", f"Error contacting MCP server: {e}"
def test_mcp_connection(server_url: str, api_key: str) -> str:
    """Health-check an MCP server by fetching its schema."""
if not server_url:
return "No MCP server URL configured."
# Handle local demo MCP server
if is_local_mcp(server_url):
tools = MCP_PRESETS.get("local_demo", {}).get("tools", [])
return f"Local Demo MCP ready! {len(tools)} tools available: {', '.join(tools)}"
try:
headers = {}
if api_key:
headers["Authorization"] = f"Bearer {api_key}"
# For Gradio MCP servers, the schema endpoint is the best test
schema_url = server_url.replace("/sse", "/schema").replace("/mcp/mcp", "/mcp")
try:
resp = requests.get(schema_url, headers=headers, timeout=15)
if resp.status_code == 200:
try:
schema = resp.json()
                    tool_count = len(schema) if isinstance(schema, (list, dict)) else 0
                    return f"Connected! Found {tool_count} tools available."
                except ValueError:  # response was not JSON
                    return f"Connected to MCP server: {resp.status_code} OK"
except requests.exceptions.RequestException:
pass
# Fallback: try base URL
base_url = server_url.replace("/gradio_api/mcp/sse", "")
try:
resp = requests.get(base_url, headers=headers, timeout=10)
if resp.status_code == 200:
return f"Server reachable (status {resp.status_code})"
        except requests.exceptions.RequestException:
            pass
return "MCP server may be sleeping. Try again in a moment."
except Exception as e:
return f"Error contacting MCP server: {e}"
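
# Example: the built-in demo server short-circuits the network checks.
#
#     test_mcp_connection("local://demo", "")
#     # -> "Local Demo MCP ready! 4 tools available: get_sample_lineage,
#     #     get_dbt_metadata, get_airflow_dag, get_warehouse_schema"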
def get_preset_url(preset_key: str) -> str:
"""Get the URL for a preset MCP server."""
if preset_key in MCP_PRESETS:
return MCP_PRESETS[preset_key]["url"]
return ""
def get_preset_description(preset_key: str) -> str:
"""Get description and available tools for a preset MCP server."""
if preset_key in MCP_PRESETS:
preset = MCP_PRESETS[preset_key]
tools = ", ".join(preset.get("tools", []))
return f"{preset['description']}\n\nAvailable tools: {tools}"
return ""
def fetch_metadata_from_mcp(server_url: str, api_key: str, query: str) -> Tuple[str, str]:
"""Fetch metadata from an MCP server and return it for lineage visualization."""
if not server_url:
return "", "Please select or enter an MCP server URL first."
try:
headers = {"Content-Type": "application/json"}
if api_key:
headers["Authorization"] = f"Bearer {api_key}"
# For Gradio MCP servers, try to call a tool
# The standard MCP protocol uses JSON-RPC
payload = {
"jsonrpc": "2.0",
"method": "tools/call",
"params": {
"name": "search",
"arguments": {"query": query}
},
"id": 1
}
        # Gradio MCP servers expose JSON-RPC at the base endpoint, so strip /sse
        base_url = server_url.replace("/sse", "")
        resp = requests.post(base_url, json=payload, headers=headers, timeout=30)
if resp.status_code == 200:
try:
data = resp.json()
# Format the response as lineage-compatible JSON
if isinstance(data, dict):
result = data.get("result", data)
# Create a simple lineage from the response
lineage_data = {
"nodes": [
{"id": "mcp_source", "type": "source", "name": f"MCP: {query}"},
{"id": "mcp_result", "type": "table", "name": "Query Result"}
],
"edges": [
{"from": "mcp_source", "to": "mcp_result"}
],
"metadata": result
}
return json.dumps(lineage_data, indent=2), f"Fetched metadata from MCP server for query: {query}"
except json.JSONDecodeError:
pass
# Fallback: create sample lineage showing the MCP connection
sample_lineage = {
"nodes": [
{"id": "mcp_server", "type": "source", "name": server_url.split("/")[2]},
{"id": "query", "type": "model", "name": f"Query: {query[:30]}..."},
{"id": "result", "type": "table", "name": "MCP Result"}
],
"edges": [
{"from": "mcp_server", "to": "query"},
{"from": "query", "to": "result"}
]
}
        return json.dumps(sample_lineage, indent=2), "Created lineage template for MCP query. Connect to the MCP server to fetch real metadata."
except Exception as e:
return "", f"Error fetching from MCP server: {str(e)}"
# ============================================================================
# Export Functions
# ============================================================================
def export_lineage(metadata_text: str, source_type: str, export_format: str) -> Tuple[str, str]:
"""Export lineage to the specified data catalog format."""
if not EXPORTERS_AVAILABLE:
return "", "Export functionality not available. Please install the exporters module."
try:
graph, _ = parse_metadata_to_graph(metadata_text, source_type)
if export_format == "OpenLineage":
exporter = OpenLineageExporter(graph)
elif export_format == "Collibra":
exporter = CollibraExporter(graph)
elif export_format == "Purview":
exporter = PurviewExporter(graph)
elif export_format == "Alation":
exporter = AlationExporter(graph)
elif export_format == "Atlas":
exporter = AtlasExporter(graph)
else:
return "", f"Unknown export format: {export_format}"
exported_content = exporter.export()
        filename = f"lineage_export_{export_format.lower()}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
        return exported_content, f"Exported to {export_format} format. Copy the content below (suggested filename: {filename})."
except Exception as e:
return "", f"Export error: {str(e)}"
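
# Illustrative usage: parse a minimal graph and emit OpenLineage JSON (the
# exact output shape is defined by exporters.OpenLineageExporter).
#
#     sample = json.dumps({
#         "nodes": [{"id": "src", "type": "source", "name": "Source"},
#                   {"id": "tgt", "type": "table", "name": "Target"}],
#         "edges": [{"from": "src", "to": "tgt"}],
#     })
#     content, status = export_lineage(sample, "Custom JSON", "OpenLineage")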
# ============================================================================
# Sample Data Loading
# ============================================================================
def load_sample(sample_type: str) -> str:
"""Load a sample file."""
filename = SAMPLE_FILES.get(sample_type)
if not filename:
return json.dumps({"error": f"Unknown sample type: {sample_type}"})
filepath = os.path.join(os.path.dirname(__file__), "samples", filename)
try:
with open(filepath, "r") as f:
return f.read()
except Exception as e:
return json.dumps({"error": f"Could not load sample: {str(e)}"})
# ============================================================================
# Main Extraction Handlers
# ============================================================================
def extract_lineage_from_text(
metadata_text: str,
source_type: str,
visualization_format: str,
use_mcp: bool = False,
mcp_url: str = "",
mcp_query: str = ""
) -> Tuple[str, str]:
"""Extract lineage from provided metadata text, optionally using MCP server."""
# If MCP is enabled and we have a URL, fetch metadata from MCP
if use_mcp and mcp_url:
if is_local_mcp(mcp_url):
# Use local demo MCP server
mcp_metadata, mcp_summary = call_local_mcp("get_sample_lineage", mcp_query or source_type)
if mcp_metadata:
# Parse the MCP metadata
if EXPORTERS_AVAILABLE:
graph, _ = parse_metadata_to_graph(mcp_metadata, "MCP Response")
mermaid_code = generate_mermaid_from_graph(graph)
return render_mermaid(mermaid_code), f"[MCP] {mcp_summary}"
        else:
            # External MCP servers would need a full MCP client implementation.
            return "", "External MCP servers require a proper MCP client; use the Local Demo MCP for testing."
# Local processing - use provided metadata
if not metadata_text.strip():
return "", "Please provide metadata content or enable MCP to fetch sample data."
if EXPORTERS_AVAILABLE:
graph, summary = parse_metadata_to_graph(metadata_text, source_type)
mermaid_code = generate_mermaid_from_graph(graph)
return render_mermaid(mermaid_code), summary
else:
# Fallback stub
viz = "graph TD\n A[Sample Node] --> B[Output Node]"
return render_mermaid(viz), f"Processed {source_type} metadata."
def extract_lineage_from_bigquery(
project_id: str,
query: str,
api_key: str,
visualization_format: str
) -> Tuple[str, str]:
"""Extract lineage from BigQuery (local processing)."""
# Local stub - would integrate with BigQuery API in production
viz = f"""graph TD
subgraph BigQuery Project: {project_id or 'your-project'}
A[Source Tables] --> B[Query Execution]
B --> C[Destination Table]
end
style A fill:#e1f5fe
style B fill:#fff3e0
style C fill:#e8f5e9"""
return render_mermaid(viz), f"BigQuery lineage from project: {project_id or 'not specified'}"
def extract_lineage_from_url(
url: str,
visualization_format: str
) -> Tuple[str, str]:
"""Extract lineage from URL/API endpoint (local processing)."""
# Try to fetch the URL
if url:
try:
resp = requests.get(url, timeout=10)
if resp.status_code == 200:
return extract_lineage_from_text(resp.text, "API Response", visualization_format)
except Exception:
pass
viz = "graph TD\n A[API Source] --> B[Data Pipeline] --> C[Output]"
return render_mermaid(viz), f"Lineage from URL: {url or 'not specified'}"
# ============================================================================
# Gemini Agentic Chatbot
# ============================================================================
LINEAGE_AGENT_PROMPT = """You are a Data Lineage Assistant powered by the Lineage Graph Accelerator tool.
You help users understand, extract, and visualize data lineage from various sources.
Your capabilities:
1. **Extract Lineage**: Parse metadata from dbt manifests, Airflow DAGs, SQL DDL, and custom JSON
2. **Explain Lineage**: Help users understand data flow and dependencies
3. **Generate Metadata**: Create lineage JSON from natural language descriptions
4. **Export Guidance**: Advise on exporting to data catalogs (OpenLineage, Collibra, Purview, Alation, Atlas)
When users describe their data pipeline, generate valid JSON lineage in this format:
```json
{
"nodes": [
{"id": "unique_id", "type": "source|table|model|view|report", "name": "Display Name"}
],
"edges": [
{"from": "source_id", "to": "target_id"}
]
}
```
Node types: source, table, model, view, report, dimension, fact, destination, task
Be helpful, concise, and always offer to generate lineage JSON when users describe data flows.
If the user provides metadata or describes a pipeline, generate the JSON they can paste into the tool."""
def init_gemini(api_key: str) -> bool:
"""Initialize Gemini with the provided API key."""
if not GEMINI_AVAILABLE:
return False
if not api_key:
return False
try:
genai.configure(api_key=api_key)
return True
except Exception:
return False
def chat_with_gemini(
message: str,
history: List[Dict[str, str]],
api_key: str
) -> Tuple[List[Dict[str, str]], str]:
"""Chat with Gemini about data lineage."""
    if not GEMINI_AVAILABLE:
        return history + [
            {"role": "user", "content": message},
            {"role": "assistant", "content": "Google Gemini is not available. Please install the google-generativeai package."}
        ], ""
if not api_key:
return history + [
{"role": "user", "content": message},
{"role": "assistant", "content": "Please enter your Google Gemini API key to use the chatbot. You can get one at https://makersuite.google.com/app/apikey"}
], ""
try:
genai.configure(api_key=api_key)
        # Use the fully qualified model path for Gemini 2.0 Flash
        model = genai.GenerativeModel('models/gemini-2.0-flash-001')
# Build context from history
context_parts = [LINEAGE_AGENT_PROMPT, "\n\nConversation history:"]
for msg in history[-6:]: # Keep last 6 messages for context
role = "User" if msg.get("role") == "user" else "Assistant"
context_parts.append(f"{role}: {msg.get('content', '')}")
context_parts.append(f"\nUser: {message}\nAssistant:")
# Generate response
full_prompt = "\n".join(context_parts)
response = model.generate_content(full_prompt)
assistant_message = response.text
# Extract any JSON from the response for the metadata field
extracted_json = ""
if "```json" in assistant_message:
try:
json_start = assistant_message.find("```json") + 7
json_end = assistant_message.find("```", json_start)
if json_end > json_start:
extracted_json = assistant_message[json_start:json_end].strip()
except Exception:
pass
new_history = history + [
{"role": "user", "content": message},
{"role": "assistant", "content": assistant_message}
]
return new_history, extracted_json
except Exception as e:
error_msg = f"Error communicating with Gemini: {str(e)}"
return history + [
{"role": "user", "content": message},
{"role": "assistant", "content": error_msg}
], ""
def use_generated_json(json_text: str) -> Tuple[str, str, str]:
"""Use the generated JSON in the lineage extractor."""
if not json_text.strip():
return "", "", "No JSON to use. Ask the chatbot to generate lineage JSON first."
try:
# Validate JSON
json.loads(json_text)
# Return the JSON to be used in the main tab
return json_text, "Custom JSON", "JSON copied to metadata input. Switch to 'Text/File Metadata' tab and click 'Extract Lineage'."
except json.JSONDecodeError as e:
return "", "", f"Invalid JSON: {str(e)}"
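
# Example: round-tripping chatbot output into the extractor. Valid JSON is
# passed through with the source type forced to "Custom JSON".
#
#     text, src, status = use_generated_json('{"nodes": [], "edges": []}')
#     # src -> "Custom JSON"; invalid JSON instead yields an error status.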
# ============================================================================
# Gradio UI
# ============================================================================
# Build the Gradio interface (Gradio 6 compatible)
with gr.Blocks(
title="Lineage Graph Accelerator",
fill_height=True
) as demo:
# Header Banner
gr.HTML("""
<div style="background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); padding: 24px 32px; border-radius: 12px; margin-bottom: 20px; box-shadow: 0 4px 15px rgba(102, 126, 234, 0.3);">
<div style="display: flex; justify-content: space-between; align-items: center; flex-wrap: wrap; gap: 16px;">
<div>
<h1 style="color: white; margin: 0 0 8px 0; font-size: 2em; font-weight: 700;">Lineage Graph Accelerator</h1>
<p style="color: rgba(255,255,255,0.9); margin: 0; font-size: 1.1em;">AI-powered data lineage extraction and visualization for modern data platforms</p>
</div>
<div style="display: flex; gap: 12px; flex-wrap: wrap;">
<a href="https://aamanlamba.com" target="_blank" style="background: rgba(255,255,255,0.2); color: white; padding: 8px 16px; border-radius: 6px; text-decoration: none; font-weight: 500; transition: background 0.2s;">By Aaman Lamba</a>
<a href="https://github.com/aamanlamba" target="_blank" style="background: rgba(255,255,255,0.2); color: white; padding: 8px 16px; border-radius: 6px; text-decoration: none; font-weight: 500;">GitHub</a>
</div>
</div>
<div style="margin-top: 16px; padding-top: 16px; border-top: 1px solid rgba(255,255,255,0.2);">
<span style="background: rgba(255,255,255,0.25); color: white; padding: 4px 10px; border-radius: 12px; font-size: 0.85em; margin-right: 8px;">Gradio 6</span>
<span style="background: rgba(255,255,255,0.25); color: white; padding: 4px 10px; border-radius: 12px; font-size: 0.85em; margin-right: 8px;">MCP Integration</span>
<span style="background: rgba(255,255,255,0.25); color: white; padding: 4px 10px; border-radius: 12px; font-size: 0.85em; margin-right: 8px;">Gemini AI</span>
<span style="background: rgba(255,255,255,0.25); color: white; padding: 4px 10px; border-radius: 12px; font-size: 0.85em;">5 Export Formats</span>
</div>
</div>
""")
gr.Markdown("""
### What You Can Do
| Feature | Description |
|---------|-------------|
| **Extract Lineage** | Parse metadata from dbt manifests, Airflow DAGs, SQL DDL, BigQuery, and custom JSON |
| **Visualize** | Generate interactive Mermaid diagrams with color-coded nodes and relationship labels |
| **Export** | Export to enterprise data catalogs: OpenLineage, Collibra, Purview, Alation, Atlas |
| **MCP Integration** | Connect to MCP servers for AI-powered metadata extraction |
| **AI Assistant** | Chat with Gemini to generate lineage from natural language descriptions |
### Quick Start
1. **Try the Demo**: Enable "Use MCP Server" and select "Local Demo MCP" to fetch sample lineage metadata
2. **Use Your Data**: Paste your dbt manifest, Airflow DAG, or custom JSON in the Text/File tab
3. **Load Samples**: Click "Load Sample" in the Demo Gallery to explore pre-built examples
4. **Export**: Use the Export section to generate catalog-ready JSON
---
""")
# MCP Server Configuration (collapsible)
with gr.Accordion("MCP Server Configuration", open=True):
gr.Markdown("""
**Connect to MCP Servers** to fetch metadata for lineage extraction.
Use the built-in **Local Demo MCP** for testing, or connect to external servers on HuggingFace.
""")
with gr.Row():
use_mcp_checkbox = gr.Checkbox(
label="Use MCP Server for Metadata",
value=False,
info="Enable to fetch lineage metadata from MCP server instead of local input"
)
mcp_preset = gr.Dropdown(
choices=[
("-- Select Preset --", ""),
("Local Demo MCP (Built-in)", "local_demo"),
("MCP Tools by abidlabs", "mcp_tools"),
("HuggingFace MCP by dylanebert", "huggingface_mcp"),
("Ragmint RAG Pipeline", "ragmint"),
("Web Search MCP", "web_search"),
],
label="Preset MCP Servers",
value="",
scale=2
)
with gr.Row():
mcp_server = gr.Textbox(
label="MCP Server URL",
placeholder="Select a preset or enter custom URL",
info="local://demo for built-in demo, or external MCP URL",
scale=3
)
mcp_query = gr.Textbox(
label="MCP Query (Optional)",
placeholder="e.g., 'ecommerce', 'ml pipeline', 'warehouse'",
info="Query to filter metadata from MCP server",
scale=2
)
with gr.Row():
mcp_api_key = gr.Textbox(
label="API Key (Optional)",
placeholder="API key if required",
type="password",
scale=2
)
test_btn = gr.Button("Test Connection", size="sm", scale=1)
mcp_description = gr.Textbox(label="Server Description", interactive=False, lines=2)
mcp_status = gr.Textbox(label="Connection Status", interactive=False)
# Wire up preset selection to update URL and description
mcp_preset.change(fn=get_preset_url, inputs=[mcp_preset], outputs=[mcp_server])
mcp_preset.change(fn=get_preset_description, inputs=[mcp_preset], outputs=[mcp_description])
test_btn.click(fn=test_mcp_connection, inputs=[mcp_server, mcp_api_key], outputs=[mcp_status])
# Main Tabs
with gr.Tabs():
# Tab 1: Text/File Input
with gr.Tab("Text/File Metadata", id="text"):
with gr.Row():
with gr.Column(scale=1):
gr.Markdown("### Input")
# Sample selector
with gr.Row():
sample_selector = gr.Dropdown(
choices=[
("Simple JSON", "simple"),
("dbt Manifest", "dbt"),
("Airflow DAG", "airflow"),
("SQL DDL", "sql"),
("Data Warehouse", "warehouse"),
("ETL Pipeline", "etl"),
("Complex Demo", "complex")
],
label="Load Sample Data",
value="simple"
)
load_sample_btn = gr.Button("Load Sample", size="sm")
metadata_input = gr.Textbox(
label="Metadata Content",
placeholder="Paste your metadata here (JSON, YAML, SQL, dbt manifest, Airflow DAG, etc.)",
lines=18
)
with gr.Row():
source_type = gr.Dropdown(
choices=["dbt Manifest", "Airflow DAG", "SQL DDL", "Data Warehouse", "ETL Pipeline", "Custom JSON", "Other"],
label="Source Type",
value="Custom JSON"
)
viz_format = gr.Dropdown(
choices=["Mermaid", "DOT/Graphviz", "Text"],
label="Visualization Format",
value="Mermaid"
)
extract_btn = gr.Button("Extract Lineage", variant="primary", size="lg")
with gr.Column(scale=1):
gr.Markdown("### Visualization")
output_viz = gr.HTML(label="Lineage Graph")
output_summary = gr.Textbox(label="Summary", lines=3)
# Export section
with gr.Accordion("Export to Data Catalog", open=False):
export_format = gr.Dropdown(
choices=EXPORT_FORMATS,
label="Export Format",
value="OpenLineage"
)
export_btn = gr.Button("Generate Export", variant="secondary")
export_output = gr.Code(label="Export Content", language="json", lines=10)
export_status = gr.Textbox(label="Export Status", interactive=False)
# Event handlers
load_sample_btn.click(
fn=load_sample,
inputs=[sample_selector],
outputs=[metadata_input]
)
extract_btn.click(
fn=extract_lineage_from_text,
inputs=[metadata_input, source_type, viz_format, use_mcp_checkbox, mcp_server, mcp_query],
outputs=[output_viz, output_summary]
)
export_btn.click(
fn=export_lineage,
inputs=[metadata_input, source_type, export_format],
outputs=[export_output, export_status]
)
# Tab 2: BigQuery
with gr.Tab("BigQuery", id="bigquery"):
with gr.Row():
with gr.Column():
bq_project = gr.Textbox(
label="Project ID",
placeholder="your-gcp-project-id"
)
bq_query = gr.Textbox(
label="Metadata Query",
placeholder="SELECT * FROM `project.dataset.INFORMATION_SCHEMA.TABLES`",
lines=10
)
load_bq_sample = gr.Button("Load Sample Query", size="sm")
bq_creds = gr.Textbox(
label="Service Account JSON (optional)",
type="password"
)
bq_viz_format = gr.Dropdown(
choices=["Mermaid", "DOT/Graphviz", "Text"],
label="Visualization Format",
value="Mermaid"
)
bq_extract_btn = gr.Button("Extract Lineage", variant="primary")
with gr.Column():
bq_output_viz = gr.HTML(label="Lineage Graph")
bq_output_summary = gr.Textbox(label="Summary", lines=3)
load_bq_sample.click(
fn=lambda: load_sample("bigquery"),
outputs=[bq_query]
)
bq_extract_btn.click(
fn=extract_lineage_from_bigquery,
inputs=[bq_project, bq_query, bq_creds, bq_viz_format],
outputs=[bq_output_viz, bq_output_summary]
)
# Tab 3: URL/API
with gr.Tab("URL/API", id="url"):
with gr.Row():
with gr.Column():
url_input = gr.Textbox(
label="Metadata URL",
placeholder="https://api.example.com/metadata"
)
load_url_sample = gr.Button("Load Sample API Metadata", size="sm")
url_viz_format = gr.Dropdown(
choices=["Mermaid", "DOT/Graphviz", "Text"],
label="Visualization Format",
value="Mermaid"
)
url_extract_btn = gr.Button("Extract Lineage", variant="primary")
with gr.Column():
url_output_viz = gr.HTML(label="Lineage Graph")
url_output_summary = gr.Textbox(label="Summary", lines=3)
load_url_sample.click(
fn=lambda: load_sample("api"),
outputs=[url_input]
)
url_extract_btn.click(
fn=extract_lineage_from_url,
inputs=[url_input, url_viz_format],
outputs=[url_output_viz, url_output_summary]
)
# Tab 4: Demo Gallery
with gr.Tab("Demo Gallery", id="gallery"):
gr.Markdown("""
## Sample Lineage Visualizations
Click any example below to see the lineage visualization.
""")
with gr.Row():
demo_simple = gr.Button("E-Commerce (Simple)")
demo_dbt = gr.Button("dbt Project")
demo_airflow = gr.Button("Airflow DAG")
with gr.Row():
demo_warehouse = gr.Button("Data Warehouse")
demo_etl = gr.Button("ETL Pipeline")
demo_complex = gr.Button("Complex Platform")
demo_viz = gr.HTML(label="Demo Visualization")
demo_summary = gr.Textbox(label="Description", lines=2)
# Demo handlers
for btn, sample_type in [(demo_simple, "simple"), (demo_dbt, "dbt"),
(demo_airflow, "airflow"), (demo_warehouse, "warehouse"),
(demo_etl, "etl"), (demo_complex, "complex")]:
btn.click(
fn=lambda st=sample_type: extract_lineage_from_text(
load_sample(st),
st.replace("_", " ").title(),
"Mermaid"
),
outputs=[demo_viz, demo_summary]
)
# Tab 5: AI Chatbot (Gemini)
with gr.Tab("AI Assistant", id="chatbot"):
gr.Markdown("""
## Lineage AI Assistant (Powered by Google Gemini)
Ask questions about data lineage, describe your data pipeline in natural language,
and get JSON metadata you can use to visualize lineage.
**Examples:**
- "I have a PostgreSQL database that feeds into a Spark ETL job, which outputs to a Snowflake warehouse"
- "Generate lineage for a dbt project with staging, intermediate, and mart layers"
- "What's the best way to document column-level lineage?"
""")
with gr.Row():
with gr.Column(scale=2):
gemini_api_key = gr.Textbox(
label="Google Gemini API Key",
placeholder="Enter your Gemini API key (get one at makersuite.google.com)",
type="password",
info="Your API key is not stored and only used for this session"
)
chatbot_display = gr.Chatbot(
label="Chat with Lineage AI",
height=400
)
with gr.Row():
chat_input = gr.Textbox(
label="Your message",
placeholder="Describe your data pipeline or ask about lineage...",
lines=2,
scale=4
)
send_btn = gr.Button("Send", variant="primary", scale=1)
with gr.Accordion("Generated JSON (if any)", open=False):
generated_json = gr.Code(
label="Extracted JSON",
language="json",
lines=10
)
use_json_btn = gr.Button("Use This JSON in Lineage Tool", size="sm")
json_status = gr.Textbox(label="Status", interactive=False)
# Chat handlers
chat_state = gr.State([])
def handle_chat(message, history, api_key):
if not message.strip():
return history, "", history
new_history, extracted = chat_with_gemini(message, history, api_key)
return new_history, extracted, new_history
send_btn.click(
fn=handle_chat,
inputs=[chat_input, chat_state, gemini_api_key],
outputs=[chatbot_display, generated_json, chat_state]
).then(
fn=lambda: "",
outputs=[chat_input]
)
chat_input.submit(
fn=handle_chat,
inputs=[chat_input, chat_state, gemini_api_key],
outputs=[chatbot_display, generated_json, chat_state]
).then(
fn=lambda: "",
outputs=[chat_input]
)
use_json_btn.click(
fn=use_generated_json,
inputs=[generated_json],
outputs=[metadata_input, source_type, json_status]
)
# Footer
gr.Markdown("""
---
### Export Formats Supported
| Format | Description | Use Case |
|--------|-------------|----------|
| **OpenLineage** | Open standard for lineage | Universal compatibility |
| **Collibra** | Collibra Data Intelligence | Enterprise data governance |
| **Purview** | Microsoft Purview | Azure ecosystem |
| **Alation** | Alation Data Catalog | Self-service analytics |
| **Atlas** | Apache Atlas | Open-source governance |
---
### 🎥 Watch the Demo
See all features in action: [YouTube Demo Video (2:30)](https://youtu.be/U4Dfc7txa_0)
---
Built with Gradio for the **Gradio Agents & MCP Hackathon - Winter 2025** by [Aaman Lamba](https://aamanlamba.com)
[GitHub](https://github.com/aamanlamba/lineage-graph-accelerator) | [Documentation](https://huggingface.co/spaces/aamanlamba/Lineage-graph-accelerator/blob/main/USER_GUIDE.md) | [HuggingFace Space](https://huggingface.co/spaces/aamanlamba/Lineage-graph-accelerator)
""")
# Launch
if __name__ == "__main__":
demo.launch(ssr_mode=False)