""" Lineage Graph Accelerator - Hugging Face Space A Gradio-based AI agent for extracting and visualizing data lineage from various sources. Built for the Gradio Agents & MCP Hackathon - Winter 2025 """ import gradio as gr import json import os import requests from typing import Optional, Tuple, Dict, Any, List from datetime import datetime # Import exporters try: from exporters import ( LineageGraph, LineageNode, LineageEdge, OpenLineageExporter, CollibraExporter, PurviewExporter, AlationExporter, AtlasExporter ) EXPORTERS_AVAILABLE = True except ImportError: EXPORTERS_AVAILABLE = False # Import Google Gemini for agentic chatbot try: import google.generativeai as genai GEMINI_AVAILABLE = True except ImportError: GEMINI_AVAILABLE = False # ============================================================================ # Constants and Configuration # ============================================================================ SAMPLE_FILES = { "simple": "sample_metadata.json", "dbt": "dbt_manifest_sample.json", "airflow": "airflow_dag_sample.json", "sql": "sql_ddl_sample.sql", "warehouse": "warehouse_lineage_sample.json", "etl": "etl_pipeline_sample.json", "complex": "complex_lineage_demo.json", "api": "sample_api_metadata.json", "bigquery": "sample_bigquery.sql" } EXPORT_FORMATS = ["OpenLineage", "Collibra", "Purview", "Alation", "Atlas"] # Preset MCP Servers on HuggingFace that can provide metadata MCP_PRESETS = { "local_demo": { "name": "Local Demo MCP (Built-in)", "url": "local://demo", "schema_url": "local://demo/schema", "description": "Built-in demo MCP server that provides sample lineage metadata for testing", "tools": ["get_sample_lineage", "get_dbt_metadata", "get_airflow_dag", "get_warehouse_schema"] }, "mcp_tools": { "name": "MCP Tools by abidlabs", "url": "https://abidlabs-mcp-tools.hf.space/gradio_api/mcp/sse", "schema_url": "https://abidlabs-mcp-tools.hf.space/gradio_api/mcp/schema", "description": "Demo MCP server with utility tools for testing integration", "tools": ["prime_factors", "generate_cheetah_image", "image_orientation"] }, "huggingface_mcp": { "name": "HuggingFace MCP by dylanebert", "url": "https://dylanebert-huggingface-mcp.hf.space/gradio_api/mcp/sse", "schema_url": "https://dylanebert-huggingface-mcp.hf.space/gradio_api/mcp/schema", "description": "Search and explore HuggingFace models, datasets, and spaces", "tools": ["search_models", "search_datasets", "get_model_card"] }, "ragmint": { "name": "Ragmint RAG Pipeline", "url": "https://mcp-1st-birthday-ragmint-mcp-server.hf.space/gradio_api/mcp/sse", "schema_url": "https://mcp-1st-birthday-ragmint-mcp-server.hf.space/gradio_api/mcp/schema", "description": "RAG pipeline optimization and document retrieval", "tools": ["optimize_rag", "retrieve_documents"] }, "web_search": { "name": "Web Search MCP", "url": "https://agents-mcp-hackathon-search-web-mcp-server.hf.space/gradio_api/mcp/sse", "schema_url": "https://agents-mcp-hackathon-search-web-mcp-server.hf.space/gradio_api/mcp/schema", "description": "Search the web for data and documentation", "tools": ["search_web", "fetch_page"] } } # ============================================================================ # Local Demo MCP Server (Built-in) # ============================================================================ # Sample metadata that the local MCP server can provide LOCAL_MCP_METADATA = { "ecommerce_pipeline": { "name": "E-commerce Data Pipeline", "nodes": [ {"id": "raw_orders", "type": "source", "name": "Raw Orders (PostgreSQL)"}, {"id": "raw_customers", "type": "source", "name": "Raw Customers (PostgreSQL)"}, {"id": "raw_products", "type": "source", "name": "Raw Products (API)"}, {"id": "stg_orders", "type": "model", "name": "Staging Orders"}, {"id": "stg_customers", "type": "model", "name": "Staging Customers"}, {"id": "stg_products", "type": "model", "name": "Staging Products"}, {"id": "dim_customers", "type": "dimension", "name": "Dim Customers"}, {"id": "dim_products", "type": "dimension", "name": "Dim Products"}, {"id": "fact_orders", "type": "fact", "name": "Fact Orders"}, {"id": "mart_sales", "type": "table", "name": "Sales Mart"}, {"id": "report_daily", "type": "report", "name": "Daily Sales Report"} ], "edges": [ {"from": "raw_orders", "to": "stg_orders"}, {"from": "raw_customers", "to": "stg_customers"}, {"from": "raw_products", "to": "stg_products"}, {"from": "stg_customers", "to": "dim_customers"}, {"from": "stg_products", "to": "dim_products"}, {"from": "stg_orders", "to": "fact_orders"}, {"from": "dim_customers", "to": "fact_orders"}, {"from": "dim_products", "to": "fact_orders"}, {"from": "fact_orders", "to": "mart_sales"}, {"from": "mart_sales", "to": "report_daily"} ] }, "ml_pipeline": { "name": "ML Feature Pipeline", "nodes": [ {"id": "raw_events", "type": "source", "name": "Event Stream (Kafka)"}, {"id": "raw_user_data", "type": "source", "name": "User Data (S3)"}, {"id": "feature_eng", "type": "model", "name": "Feature Engineering"}, {"id": "feature_store", "type": "table", "name": "Feature Store"}, {"id": "training_data", "type": "table", "name": "Training Dataset"}, {"id": "model_output", "type": "destination", "name": "Model Predictions"} ], "edges": [ {"from": "raw_events", "to": "feature_eng"}, {"from": "raw_user_data", "to": "feature_eng"}, {"from": "feature_eng", "to": "feature_store"}, {"from": "feature_store", "to": "training_data"}, {"from": "training_data", "to": "model_output"} ] }, "data_warehouse": { "name": "Data Warehouse Schema", "nodes": [ {"id": "src_crm", "type": "source", "name": "CRM System"}, {"id": "src_erp", "type": "source", "name": "ERP System"}, {"id": "src_web", "type": "source", "name": "Web Analytics"}, {"id": "landing_crm", "type": "table", "name": "Landing CRM"}, {"id": "landing_erp", "type": "table", "name": "Landing ERP"}, {"id": "landing_web", "type": "table", "name": "Landing Web"}, {"id": "dwh_customers", "type": "dimension", "name": "DWH Customers"}, {"id": "dwh_transactions", "type": "fact", "name": "DWH Transactions"}, {"id": "bi_dashboard", "type": "report", "name": "BI Dashboard"} ], "edges": [ {"from": "src_crm", "to": "landing_crm"}, {"from": "src_erp", "to": "landing_erp"}, {"from": "src_web", "to": "landing_web"}, {"from": "landing_crm", "to": "dwh_customers"}, {"from": "landing_erp", "to": "dwh_transactions"}, {"from": "landing_web", "to": "dwh_transactions"}, {"from": "dwh_customers", "to": "dwh_transactions"}, {"from": "dwh_transactions", "to": "bi_dashboard"} ] } } def local_mcp_get_metadata(tool_name: str, query: str = "") -> Dict[str, Any]: """Simulate a local MCP server that returns sample metadata.""" if tool_name == "get_sample_lineage" or tool_name == "search": # Return a random or query-matched sample if "ecommerce" in query.lower() or "sales" in query.lower(): return LOCAL_MCP_METADATA["ecommerce_pipeline"] elif "ml" in query.lower() or "feature" in query.lower(): return LOCAL_MCP_METADATA["ml_pipeline"] elif "warehouse" in query.lower() or "dwh" in query.lower(): return LOCAL_MCP_METADATA["data_warehouse"] else: # Default to ecommerce return LOCAL_MCP_METADATA["ecommerce_pipeline"] elif tool_name == "get_dbt_metadata": return LOCAL_MCP_METADATA["ecommerce_pipeline"] elif tool_name == "get_airflow_dag": return LOCAL_MCP_METADATA["ml_pipeline"] elif tool_name == "get_warehouse_schema": return LOCAL_MCP_METADATA["data_warehouse"] elif tool_name == "list_datasets": return {"datasets": list(LOCAL_MCP_METADATA.keys())} else: return LOCAL_MCP_METADATA["ecommerce_pipeline"] def is_local_mcp(url: str) -> bool: """Check if the URL is for the local demo MCP server.""" return url and url.startswith("local://") def call_local_mcp(tool_name: str, query: str = "") -> Tuple[str, str]: """Call the local MCP server and return metadata as JSON string.""" metadata = local_mcp_get_metadata(tool_name, query) return json.dumps(metadata, indent=2), f"Fetched '{metadata.get('name', 'lineage')}' from Local Demo MCP" # ============================================================================ # Mermaid Rendering # ============================================================================ import base64 import urllib.parse def render_mermaid(viz_code: str) -> str: """Render mermaid diagram using mermaid.ink service (renders as SVG image).""" # Encode the mermaid code for the URL # mermaid.ink accepts base64 encoded diagram encoded = base64.urlsafe_b64encode(viz_code.encode('utf-8')).decode('utf-8') # Create the mermaid.ink URL for SVG rendering img_url = f"https://mermaid.ink/svg/{encoded}" # PNG version for download png_url = f"https://mermaid.ink/img/{encoded}" # Also create a link to the live editor for users who want to modify editor_url = f"https://mermaid.live/edit#base64:{base64.b64encode(viz_code.encode('utf-8')).decode('utf-8')}" html = f'''
Lineage Graph
Edit in Mermaid Live Download PNG Download SVG Click graph to zoom
''' return html def render_mermaid_code(viz_code: str) -> str: """Return the raw mermaid code for display.""" return viz_code # ============================================================================ # Lineage Parsing and Visualization Generation # ============================================================================ def parse_metadata_to_graph(metadata_text: str, source_type: str) -> Tuple[LineageGraph, str]: """Parse metadata text into a LineageGraph structure.""" try: # Try to parse as JSON first if metadata_text.strip().startswith('{') or metadata_text.strip().startswith('['): data = json.loads(metadata_text) else: # For SQL or other text formats, create a simple structure data = {"raw_content": metadata_text, "source_type": source_type} graph = LineageGraph(name=f"Lineage from {source_type}") # Handle different formats if "lineage_graph" in data: # Complex lineage demo format lg = data["lineage_graph"] for node_data in lg.get("nodes", []): node = LineageNode( id=node_data.get("id"), name=node_data.get("name"), type=node_data.get("type", "table"), category=node_data.get("category"), description=node_data.get("description"), metadata=node_data.get("metadata"), tags=node_data.get("tags") ) graph.add_node(node) for edge_data in lg.get("edges", []): edge = LineageEdge( source=edge_data.get("from"), target=edge_data.get("to"), type=edge_data.get("type", "transform") ) graph.add_edge(edge) elif "nodes" in data and "edges" in data: # Simple node/edge format for node_data in data.get("nodes", []): node = LineageNode( id=node_data.get("id"), name=node_data.get("name", node_data.get("id")), type=node_data.get("type", "table") ) graph.add_node(node) for edge_data in data.get("edges", []): edge = LineageEdge( source=edge_data.get("from"), target=edge_data.get("to"), type=edge_data.get("type", "transform") ) graph.add_edge(edge) elif "nodes" in data: # dbt manifest format for node_id, node_data in data.get("nodes", {}).items(): node = LineageNode( id=node_id, name=node_data.get("name", node_id.split(".")[-1]), type=node_data.get("resource_type", "model"), schema=node_data.get("schema"), database=node_data.get("database"), description=node_data.get("description") ) graph.add_node(node) # Add edges from depends_on deps = node_data.get("depends_on", {}).get("nodes", []) for dep in deps: edge = LineageEdge(source=dep, target=node_id, type="transform") graph.add_edge(edge) elif "tasks" in data: # Airflow DAG format for task in data.get("tasks", []): node = LineageNode( id=task.get("task_id"), name=task.get("task_id"), type="task", description=task.get("description") ) graph.add_node(node) # Add edges from upstream dependencies for dep in task.get("upstream_dependencies", []): edge = LineageEdge(source=dep, target=task.get("task_id"), type="dependency") graph.add_edge(edge) elif "lineage" in data: # Warehouse lineage format lineage = data.get("lineage", {}) for dataset in lineage.get("datasets", []): node = LineageNode( id=dataset.get("id"), name=dataset.get("name", dataset.get("id")), type=dataset.get("type", "table"), schema=dataset.get("schema"), database=dataset.get("database"), description=dataset.get("description"), owner=dataset.get("owner"), tags=dataset.get("tags") ) graph.add_node(node) for rel in lineage.get("relationships", []): edge = LineageEdge( source=rel.get("source"), target=rel.get("target"), type=rel.get("type", "transform"), job_name=rel.get("job") ) graph.add_edge(edge) elif "stages" in data: # ETL pipeline format for stage in data.get("stages", []): for step in stage.get("steps", []): node = LineageNode( id=step.get("id"), name=step.get("name", step.get("id")), type="step", category=stage.get("id"), description=step.get("description") or step.get("logic") ) graph.add_node(node) # Add edges from inputs for inp in step.get("inputs", []): edge = LineageEdge(source=inp, target=step.get("id"), type="transform") graph.add_edge(edge) else: # Fallback: create sample nodes graph.add_node(LineageNode(id="source", name="Source", type="source")) graph.add_node(LineageNode(id="target", name="Target", type="table")) graph.add_edge(LineageEdge(source="source", target="target", type="transform")) summary = f"Parsed {len(graph.nodes)} nodes and {len(graph.edges)} relationships from {source_type}" return graph, summary except json.JSONDecodeError as e: # Handle SQL or plain text graph = LineageGraph(name=f"Lineage from {source_type}") graph.add_node(LineageNode(id="input", name="Input Data", type="source")) graph.add_node(LineageNode(id="output", name="Output Data", type="table")) graph.add_edge(LineageEdge(source="input", target="output", type="transform")) return graph, f"Created placeholder lineage (could not parse as JSON: {str(e)[:50]})" except Exception as e: graph = LineageGraph(name="Error") return graph, f"Error parsing metadata: {str(e)}" def sanitize_mermaid_text(text: str) -> str: """Sanitize text for use in Mermaid diagrams by escaping special characters.""" if not text: return "Unknown" # Replace characters that conflict with Mermaid syntax # Parentheses conflict with node shapes, brackets with labels text = text.replace("(", " - ").replace(")", "") text = text.replace("[", " ").replace("]", " ") text = text.replace("{", " ").replace("}", " ") text = text.replace('"', "'") text = text.replace("<", "").replace(">", "") text = text.replace("#", "") return text.strip() def generate_mermaid_from_graph(graph: LineageGraph) -> str: """Generate Mermaid diagram code from a LineageGraph.""" if not graph.nodes: return "graph TD\n A[No data to display]" lines = [ "%%{init: {'theme': 'base', 'themeVariables': {'fontSize': '12px', 'fontFamily': 'arial', 'primaryColor': '#e8f5e9', 'primaryBorderColor': '#4caf50', 'lineColor': '#666'}}}%%", "graph TD" ] # Group nodes by category for subgraphs categories = {} for node in graph.nodes: cat = node.category or "default" if cat not in categories: categories[cat] = [] categories[cat].append(node) # Generate nodes with styling - compact with rounded corners and subtle borders node_styles = { "source": "fill:#e3f2fd,stroke:#1976d2,stroke-width:1px,rx:5,ry:5", "external_api": "fill:#e3f2fd,stroke:#1976d2,stroke-width:1px,rx:5,ry:5", "table": "fill:#e8f5e9,stroke:#388e3c,stroke-width:1px,rx:5,ry:5", "view": "fill:#f3e5f5,stroke:#7b1fa2,stroke-width:1px,rx:5,ry:5", "model": "fill:#fff3e0,stroke:#f57c00,stroke-width:1px,rx:5,ry:5", "report": "fill:#fce4ec,stroke:#c2185b,stroke-width:1px,rx:5,ry:5", "dimension": "fill:#e0f7fa,stroke:#0097a7,stroke-width:1px,rx:5,ry:5", "fact": "fill:#fff8e1,stroke:#ffa000,stroke-width:1px,rx:5,ry:5", "destination": "fill:#ffebee,stroke:#d32f2f,stroke-width:1px,rx:5,ry:5", "task": "fill:#fafafa,stroke:#616161,stroke-width:1px,rx:5,ry:5" } # Add subgraphs for categories if len(categories) > 1: for cat, nodes in categories.items(): if cat != "default": lines.append(f" subgraph {sanitize_mermaid_text(cat.replace('_', ' ').title())}") for node in nodes: safe_name = sanitize_mermaid_text(node.name) shape = f"[{safe_name}]" if node.type in ["table", "model"] else f"({safe_name})" lines.append(f" {node.id}{shape}") lines.append(" end") else: for node in nodes: safe_name = sanitize_mermaid_text(node.name) shape = f"[{safe_name}]" if node.type in ["table", "model"] else f"({safe_name})" lines.append(f" {node.id}{shape}") else: for node in graph.nodes: safe_name = sanitize_mermaid_text(node.name) shape = f"[{safe_name}]" if node.type in ["table", "model"] else f"({safe_name})" lines.append(f" {node.id}{shape}") # Add edges edge_labels = { "transform": "-->", "reference": "-.->", "ingest": "-->", "export": "-->", "join": "-->", "aggregate": "-->", "dependency": "-->" } for edge in graph.edges: arrow = edge_labels.get(edge.type, "-->") if edge.type and edge.type not in ["transform", "dependency"]: lines.append(f" {edge.source} {arrow}|{edge.type}| {edge.target}") else: lines.append(f" {edge.source} {arrow} {edge.target}") # Add styling for node in graph.nodes: style = node_styles.get(node.type, "fill:#f5f5f5") lines.append(f" style {node.id} {style}") return "\n".join(lines) # ============================================================================ # MCP Server Integration # ============================================================================ def send_to_mcp(server_url: str, api_key: str, metadata_text: str, source_type: str, viz_format: str) -> Tuple[str, str]: """Send metadata to an external MCP server and return visualization + summary.""" if not server_url: return "", "No MCP server URL configured." try: payload = { "metadata": metadata_text, "source_type": source_type, "viz_format": viz_format, } headers = {"Content-Type": "application/json"} if api_key: headers["Authorization"] = f"Bearer {api_key}" resp = requests.post(server_url, json=payload, headers=headers, timeout=30) if 200 <= resp.status_code < 300: data = resp.json() viz = data.get("visualization") or data.get("viz") or data.get("mermaid", "") summary = data.get("summary", "Processed by MCP server.") if viz: return render_mermaid(viz), summary return "", summary else: return "", f"MCP server returned status {resp.status_code}: {resp.text[:200]}" except Exception as e: return "", f"Error contacting MCP server: {e}" def test_mcp_connection(server_url: str, api_key: str) -> str: """Health-check to MCP server by fetching schema.""" if not server_url: return "No MCP server URL configured." # Handle local demo MCP server if is_local_mcp(server_url): tools = MCP_PRESETS.get("local_demo", {}).get("tools", []) return f"Local Demo MCP ready! {len(tools)} tools available: {', '.join(tools)}" try: headers = {} if api_key: headers["Authorization"] = f"Bearer {api_key}" # For Gradio MCP servers, the schema endpoint is the best test schema_url = server_url.replace("/sse", "/schema").replace("/mcp/mcp", "/mcp") try: resp = requests.get(schema_url, headers=headers, timeout=15) if resp.status_code == 200: try: schema = resp.json() tool_count = len(schema) if isinstance(schema, dict) else 0 return f"Connected! Found {tool_count} tools available." except: return f"Connected to MCP server: {resp.status_code} OK" except requests.exceptions.RequestException: pass # Fallback: try base URL base_url = server_url.replace("/gradio_api/mcp/sse", "") try: resp = requests.get(base_url, headers=headers, timeout=10) if resp.status_code == 200: return f"Server reachable (status {resp.status_code})" except: pass return "MCP server may be sleeping. Try again in a moment." except Exception as e: return f"Error contacting MCP server: {e}" def get_preset_url(preset_key: str) -> str: """Get the URL for a preset MCP server.""" if preset_key in MCP_PRESETS: return MCP_PRESETS[preset_key]["url"] return "" def get_preset_description(preset_key: str) -> str: """Get description and available tools for a preset MCP server.""" if preset_key in MCP_PRESETS: preset = MCP_PRESETS[preset_key] tools = ", ".join(preset.get("tools", [])) return f"{preset['description']}\n\nAvailable tools: {tools}" return "" def fetch_metadata_from_mcp(server_url: str, api_key: str, query: str) -> Tuple[str, str]: """Fetch metadata from an MCP server and return it for lineage visualization.""" if not server_url: return "", "Please select or enter an MCP server URL first." try: headers = {"Content-Type": "application/json"} if api_key: headers["Authorization"] = f"Bearer {api_key}" # For Gradio MCP servers, try to call a tool # The standard MCP protocol uses JSON-RPC payload = { "jsonrpc": "2.0", "method": "tools/call", "params": { "name": "search", "arguments": {"query": query} }, "id": 1 } # Try the SSE endpoint first (for Gradio MCP) base_url = server_url.replace("/sse", "") resp = requests.post(base_url, json=payload, headers=headers, timeout=30) if resp.status_code == 200: try: data = resp.json() # Format the response as lineage-compatible JSON if isinstance(data, dict): result = data.get("result", data) # Create a simple lineage from the response lineage_data = { "nodes": [ {"id": "mcp_source", "type": "source", "name": f"MCP: {query}"}, {"id": "mcp_result", "type": "table", "name": "Query Result"} ], "edges": [ {"from": "mcp_source", "to": "mcp_result"} ], "metadata": result } return json.dumps(lineage_data, indent=2), f"Fetched metadata from MCP server for query: {query}" except json.JSONDecodeError: pass # Fallback: create sample lineage showing the MCP connection sample_lineage = { "nodes": [ {"id": "mcp_server", "type": "source", "name": server_url.split("/")[2]}, {"id": "query", "type": "model", "name": f"Query: {query[:30]}..."}, {"id": "result", "type": "table", "name": "MCP Result"} ], "edges": [ {"from": "mcp_server", "to": "query"}, {"from": "query", "to": "result"} ] } return json.dumps(sample_lineage, indent=2), f"Created lineage template for MCP query. Connect to the MCP server to fetch real metadata." except Exception as e: return "", f"Error fetching from MCP server: {str(e)}" # ============================================================================ # Export Functions # ============================================================================ def export_lineage(metadata_text: str, source_type: str, export_format: str) -> Tuple[str, str]: """Export lineage to the specified data catalog format.""" if not EXPORTERS_AVAILABLE: return "", "Export functionality not available. Please install the exporters module." try: graph, _ = parse_metadata_to_graph(metadata_text, source_type) if export_format == "OpenLineage": exporter = OpenLineageExporter(graph) elif export_format == "Collibra": exporter = CollibraExporter(graph) elif export_format == "Purview": exporter = PurviewExporter(graph) elif export_format == "Alation": exporter = AlationExporter(graph) elif export_format == "Atlas": exporter = AtlasExporter(graph) else: return "", f"Unknown export format: {export_format}" exported_content = exporter.export() filename = f"lineage_export_{export_format.lower()}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json" return exported_content, f"Exported to {export_format} format. Download the file below." except Exception as e: return "", f"Export error: {str(e)}" # ============================================================================ # Sample Data Loading # ============================================================================ def load_sample(sample_type: str) -> str: """Load a sample file.""" filename = SAMPLE_FILES.get(sample_type) if not filename: return json.dumps({"error": f"Unknown sample type: {sample_type}"}) filepath = os.path.join(os.path.dirname(__file__), "samples", filename) try: with open(filepath, "r") as f: return f.read() except Exception as e: return json.dumps({"error": f"Could not load sample: {str(e)}"}) # ============================================================================ # Main Extraction Handlers # ============================================================================ def extract_lineage_from_text( metadata_text: str, source_type: str, visualization_format: str, use_mcp: bool = False, mcp_url: str = "", mcp_query: str = "" ) -> Tuple[str, str]: """Extract lineage from provided metadata text, optionally using MCP server.""" # If MCP is enabled and we have a URL, fetch metadata from MCP if use_mcp and mcp_url: if is_local_mcp(mcp_url): # Use local demo MCP server mcp_metadata, mcp_summary = call_local_mcp("get_sample_lineage", mcp_query or source_type) if mcp_metadata: # Parse the MCP metadata if EXPORTERS_AVAILABLE: graph, _ = parse_metadata_to_graph(mcp_metadata, "MCP Response") mermaid_code = generate_mermaid_from_graph(graph) return render_mermaid(mermaid_code), f"[MCP] {mcp_summary}" else: # External MCP - would need proper MCP client implementation return "", f"External MCP servers require proper MCP client. Use Local Demo MCP for testing." # Local processing - use provided metadata if not metadata_text.strip(): return "", "Please provide metadata content or enable MCP to fetch sample data." if EXPORTERS_AVAILABLE: graph, summary = parse_metadata_to_graph(metadata_text, source_type) mermaid_code = generate_mermaid_from_graph(graph) return render_mermaid(mermaid_code), summary else: # Fallback stub viz = "graph TD\n A[Sample Node] --> B[Output Node]" return render_mermaid(viz), f"Processed {source_type} metadata." def extract_lineage_from_bigquery( project_id: str, query: str, api_key: str, visualization_format: str ) -> Tuple[str, str]: """Extract lineage from BigQuery (local processing).""" # Local stub - would integrate with BigQuery API in production viz = f"""graph TD subgraph BigQuery Project: {project_id or 'your-project'} A[Source Tables] --> B[Query Execution] B --> C[Destination Table] end style A fill:#e1f5fe style B fill:#fff3e0 style C fill:#e8f5e9""" return render_mermaid(viz), f"BigQuery lineage from project: {project_id or 'not specified'}" def extract_lineage_from_url( url: str, visualization_format: str ) -> Tuple[str, str]: """Extract lineage from URL/API endpoint (local processing).""" # Try to fetch the URL if url: try: resp = requests.get(url, timeout=10) if resp.status_code == 200: return extract_lineage_from_text(resp.text, "API Response", visualization_format) except Exception: pass viz = "graph TD\n A[API Source] --> B[Data Pipeline] --> C[Output]" return render_mermaid(viz), f"Lineage from URL: {url or 'not specified'}" # ============================================================================ # Gemini Agentic Chatbot # ============================================================================ LINEAGE_AGENT_PROMPT = """You are a Data Lineage Assistant powered by the Lineage Graph Accelerator tool. You help users understand, extract, and visualize data lineage from various sources. Your capabilities: 1. **Extract Lineage**: Parse metadata from dbt manifests, Airflow DAGs, SQL DDL, and custom JSON 2. **Explain Lineage**: Help users understand data flow and dependencies 3. **Generate Metadata**: Create lineage JSON from natural language descriptions 4. **Export Guidance**: Advise on exporting to data catalogs (OpenLineage, Collibra, Purview, Alation, Atlas) When users describe their data pipeline, generate valid JSON lineage in this format: ```json { "nodes": [ {"id": "unique_id", "type": "source|table|model|view|report", "name": "Display Name"} ], "edges": [ {"from": "source_id", "to": "target_id"} ] } ``` Node types: source, table, model, view, report, dimension, fact, destination, task Be helpful, concise, and always offer to generate lineage JSON when users describe data flows. If the user provides metadata or describes a pipeline, generate the JSON they can paste into the tool.""" def init_gemini(api_key: str) -> bool: """Initialize Gemini with the provided API key.""" if not GEMINI_AVAILABLE: return False if not api_key: return False try: genai.configure(api_key=api_key) return True except Exception: return False def chat_with_gemini( message: str, history: List[Dict[str, str]], api_key: str ) -> Tuple[List[Dict[str, str]], str]: """Chat with Gemini about data lineage.""" if not GEMINI_AVAILABLE: return history + [ {"role": "user", "content": message}, {"role": "assistant", "content": "Google Gemini is not available. Please install google-generativeai package."} ], "" if not api_key: return history + [ {"role": "user", "content": message}, {"role": "assistant", "content": "Please enter your Google Gemini API key to use the chatbot. You can get one at https://makersuite.google.com/app/apikey"} ], "" try: genai.configure(api_key=api_key) # Use models/gemini-2.0-flash-001 which is the correct model path model = genai.GenerativeModel('models/gemini-2.0-flash-001') # Build context from history context_parts = [LINEAGE_AGENT_PROMPT, "\n\nConversation history:"] for msg in history[-6:]: # Keep last 6 messages for context role = "User" if msg.get("role") == "user" else "Assistant" context_parts.append(f"{role}: {msg.get('content', '')}") context_parts.append(f"\nUser: {message}\nAssistant:") # Generate response full_prompt = "\n".join(context_parts) response = model.generate_content(full_prompt) assistant_message = response.text # Extract any JSON from the response for the metadata field extracted_json = "" if "```json" in assistant_message: try: json_start = assistant_message.find("```json") + 7 json_end = assistant_message.find("```", json_start) if json_end > json_start: extracted_json = assistant_message[json_start:json_end].strip() except Exception: pass new_history = history + [ {"role": "user", "content": message}, {"role": "assistant", "content": assistant_message} ] return new_history, extracted_json except Exception as e: error_msg = f"Error communicating with Gemini: {str(e)}" return history + [ {"role": "user", "content": message}, {"role": "assistant", "content": error_msg} ], "" def use_generated_json(json_text: str) -> Tuple[str, str, str]: """Use the generated JSON in the lineage extractor.""" if not json_text.strip(): return "", "", "No JSON to use. Ask the chatbot to generate lineage JSON first." try: # Validate JSON json.loads(json_text) # Return the JSON to be used in the main tab return json_text, "Custom JSON", "JSON copied to metadata input. Switch to 'Text/File Metadata' tab and click 'Extract Lineage'." except json.JSONDecodeError as e: return "", "", f"Invalid JSON: {str(e)}" # ============================================================================ # Gradio UI # ============================================================================ # Build the Gradio interface (Gradio 6 compatible) with gr.Blocks( title="Lineage Graph Accelerator", fill_height=True ) as demo: # Header Banner gr.HTML("""

Lineage Graph Accelerator

AI-powered data lineage extraction and visualization for modern data platforms

By Aaman Lamba GitHub
Gradio 6 MCP Integration Gemini AI 5 Export Formats
""") gr.Markdown(""" ### What You Can Do | Feature | Description | |---------|-------------| | **Extract Lineage** | Parse metadata from dbt manifests, Airflow DAGs, SQL DDL, BigQuery, and custom JSON | | **Visualize** | Generate interactive Mermaid diagrams with color-coded nodes and relationship labels | | **Export** | Export to enterprise data catalogs: OpenLineage, Collibra, Purview, Alation, Atlas | | **MCP Integration** | Connect to MCP servers for AI-powered metadata extraction | | **AI Assistant** | Chat with Gemini to generate lineage from natural language descriptions | ### Quick Start 1. **Try the Demo**: Enable "Use MCP Server" and select "Local Demo MCP" to fetch sample lineage metadata 2. **Use Your Data**: Paste your dbt manifest, Airflow DAG, or custom JSON in the Text/File tab 3. **Load Samples**: Click "Load Sample" in the Demo Gallery to explore pre-built examples 4. **Export**: Use the Export section to generate catalog-ready JSON --- """) # MCP Server Configuration (collapsible) with gr.Accordion("MCP Server Configuration", open=True): gr.Markdown(""" **Connect to MCP Servers** to fetch metadata for lineage extraction. Use the built-in **Local Demo MCP** for testing, or connect to external servers on HuggingFace. """) with gr.Row(): use_mcp_checkbox = gr.Checkbox( label="Use MCP Server for Metadata", value=False, info="Enable to fetch lineage metadata from MCP server instead of local input" ) mcp_preset = gr.Dropdown( choices=[ ("-- Select Preset --", ""), ("Local Demo MCP (Built-in)", "local_demo"), ("MCP Tools by abidlabs", "mcp_tools"), ("HuggingFace MCP by dylanebert", "huggingface_mcp"), ("Ragmint RAG Pipeline", "ragmint"), ("Web Search MCP", "web_search"), ], label="Preset MCP Servers", value="", scale=2 ) with gr.Row(): mcp_server = gr.Textbox( label="MCP Server URL", placeholder="Select a preset or enter custom URL", info="local://demo for built-in demo, or external MCP URL", scale=3 ) mcp_query = gr.Textbox( label="MCP Query (Optional)", placeholder="e.g., 'ecommerce', 'ml pipeline', 'warehouse'", info="Query to filter metadata from MCP server", scale=2 ) with gr.Row(): mcp_api_key = gr.Textbox( label="API Key (Optional)", placeholder="API key if required", type="password", scale=2 ) test_btn = gr.Button("Test Connection", size="sm", scale=1) mcp_description = gr.Textbox(label="Server Description", interactive=False, lines=2) mcp_status = gr.Textbox(label="Connection Status", interactive=False) # Wire up preset selection to update URL and description mcp_preset.change(fn=get_preset_url, inputs=[mcp_preset], outputs=[mcp_server]) mcp_preset.change(fn=get_preset_description, inputs=[mcp_preset], outputs=[mcp_description]) test_btn.click(fn=test_mcp_connection, inputs=[mcp_server, mcp_api_key], outputs=[mcp_status]) # Main Tabs with gr.Tabs(): # Tab 1: Text/File Input with gr.Tab("Text/File Metadata", id="text"): with gr.Row(): with gr.Column(scale=1): gr.Markdown("### Input") # Sample selector with gr.Row(): sample_selector = gr.Dropdown( choices=[ ("Simple JSON", "simple"), ("dbt Manifest", "dbt"), ("Airflow DAG", "airflow"), ("SQL DDL", "sql"), ("Data Warehouse", "warehouse"), ("ETL Pipeline", "etl"), ("Complex Demo", "complex") ], label="Load Sample Data", value="simple" ) load_sample_btn = gr.Button("Load Sample", size="sm") metadata_input = gr.Textbox( label="Metadata Content", placeholder="Paste your metadata here (JSON, YAML, SQL, dbt manifest, Airflow DAG, etc.)", lines=18 ) with gr.Row(): source_type = gr.Dropdown( choices=["dbt Manifest", "Airflow DAG", "SQL DDL", "Data Warehouse", "ETL Pipeline", "Custom JSON", "Other"], label="Source Type", value="Custom JSON" ) viz_format = gr.Dropdown( choices=["Mermaid", "DOT/Graphviz", "Text"], label="Visualization Format", value="Mermaid" ) extract_btn = gr.Button("Extract Lineage", variant="primary", size="lg") with gr.Column(scale=1): gr.Markdown("### Visualization") output_viz = gr.HTML(label="Lineage Graph") output_summary = gr.Textbox(label="Summary", lines=3) # Export section with gr.Accordion("Export to Data Catalog", open=False): export_format = gr.Dropdown( choices=EXPORT_FORMATS, label="Export Format", value="OpenLineage" ) export_btn = gr.Button("Generate Export", variant="secondary") export_output = gr.Code(label="Export Content", language="json", lines=10) export_status = gr.Textbox(label="Export Status", interactive=False) # Event handlers load_sample_btn.click( fn=load_sample, inputs=[sample_selector], outputs=[metadata_input] ) extract_btn.click( fn=extract_lineage_from_text, inputs=[metadata_input, source_type, viz_format, use_mcp_checkbox, mcp_server, mcp_query], outputs=[output_viz, output_summary] ) export_btn.click( fn=export_lineage, inputs=[metadata_input, source_type, export_format], outputs=[export_output, export_status] ) # Tab 2: BigQuery with gr.Tab("BigQuery", id="bigquery"): with gr.Row(): with gr.Column(): bq_project = gr.Textbox( label="Project ID", placeholder="your-gcp-project-id" ) bq_query = gr.Textbox( label="Metadata Query", placeholder="SELECT * FROM `project.dataset.INFORMATION_SCHEMA.TABLES`", lines=10 ) load_bq_sample = gr.Button("Load Sample Query", size="sm") bq_creds = gr.Textbox( label="Service Account JSON (optional)", type="password" ) bq_viz_format = gr.Dropdown( choices=["Mermaid", "DOT/Graphviz", "Text"], label="Visualization Format", value="Mermaid" ) bq_extract_btn = gr.Button("Extract Lineage", variant="primary") with gr.Column(): bq_output_viz = gr.HTML(label="Lineage Graph") bq_output_summary = gr.Textbox(label="Summary", lines=3) load_bq_sample.click( fn=lambda: load_sample("bigquery"), outputs=[bq_query] ) bq_extract_btn.click( fn=extract_lineage_from_bigquery, inputs=[bq_project, bq_query, bq_creds, bq_viz_format], outputs=[bq_output_viz, bq_output_summary] ) # Tab 3: URL/API with gr.Tab("URL/API", id="url"): with gr.Row(): with gr.Column(): url_input = gr.Textbox( label="Metadata URL", placeholder="https://api.example.com/metadata" ) load_url_sample = gr.Button("Load Sample API Metadata", size="sm") url_viz_format = gr.Dropdown( choices=["Mermaid", "DOT/Graphviz", "Text"], label="Visualization Format", value="Mermaid" ) url_extract_btn = gr.Button("Extract Lineage", variant="primary") with gr.Column(): url_output_viz = gr.HTML(label="Lineage Graph") url_output_summary = gr.Textbox(label="Summary", lines=3) load_url_sample.click( fn=lambda: load_sample("api"), outputs=[url_input] ) url_extract_btn.click( fn=extract_lineage_from_url, inputs=[url_input, url_viz_format], outputs=[url_output_viz, url_output_summary] ) # Tab 4: Demo Gallery with gr.Tab("Demo Gallery", id="gallery"): gr.Markdown(""" ## Sample Lineage Visualizations Click any example below to see the lineage visualization. """) with gr.Row(): demo_simple = gr.Button("E-Commerce (Simple)") demo_dbt = gr.Button("dbt Project") demo_airflow = gr.Button("Airflow DAG") with gr.Row(): demo_warehouse = gr.Button("Data Warehouse") demo_etl = gr.Button("ETL Pipeline") demo_complex = gr.Button("Complex Platform") demo_viz = gr.HTML(label="Demo Visualization") demo_summary = gr.Textbox(label="Description", lines=2) # Demo handlers for btn, sample_type in [(demo_simple, "simple"), (demo_dbt, "dbt"), (demo_airflow, "airflow"), (demo_warehouse, "warehouse"), (demo_etl, "etl"), (demo_complex, "complex")]: btn.click( fn=lambda st=sample_type: extract_lineage_from_text( load_sample(st), st.replace("_", " ").title(), "Mermaid" ), outputs=[demo_viz, demo_summary] ) # Tab 5: AI Chatbot (Gemini) with gr.Tab("AI Assistant", id="chatbot"): gr.Markdown(""" ## Lineage AI Assistant (Powered by Google Gemini) Ask questions about data lineage, describe your data pipeline in natural language, and get JSON metadata you can use to visualize lineage. **Examples:** - "I have a PostgreSQL database that feeds into a Spark ETL job, which outputs to a Snowflake warehouse" - "Generate lineage for a dbt project with staging, intermediate, and mart layers" - "What's the best way to document column-level lineage?" """) with gr.Row(): with gr.Column(scale=2): gemini_api_key = gr.Textbox( label="Google Gemini API Key", placeholder="Enter your Gemini API key (get one at makersuite.google.com)", type="password", info="Your API key is not stored and only used for this session" ) chatbot_display = gr.Chatbot( label="Chat with Lineage AI", height=400 ) with gr.Row(): chat_input = gr.Textbox( label="Your message", placeholder="Describe your data pipeline or ask about lineage...", lines=2, scale=4 ) send_btn = gr.Button("Send", variant="primary", scale=1) with gr.Accordion("Generated JSON (if any)", open=False): generated_json = gr.Code( label="Extracted JSON", language="json", lines=10 ) use_json_btn = gr.Button("Use This JSON in Lineage Tool", size="sm") json_status = gr.Textbox(label="Status", interactive=False) # Chat handlers chat_state = gr.State([]) def handle_chat(message, history, api_key): if not message.strip(): return history, "", history new_history, extracted = chat_with_gemini(message, history, api_key) return new_history, extracted, new_history send_btn.click( fn=handle_chat, inputs=[chat_input, chat_state, gemini_api_key], outputs=[chatbot_display, generated_json, chat_state] ).then( fn=lambda: "", outputs=[chat_input] ) chat_input.submit( fn=handle_chat, inputs=[chat_input, chat_state, gemini_api_key], outputs=[chatbot_display, generated_json, chat_state] ).then( fn=lambda: "", outputs=[chat_input] ) use_json_btn.click( fn=use_generated_json, inputs=[generated_json], outputs=[metadata_input, source_type, json_status] ) # Footer gr.Markdown(""" --- ### Export Formats Supported | Format | Description | Use Case | |--------|-------------|----------| | **OpenLineage** | Open standard for lineage | Universal compatibility | | **Collibra** | Collibra Data Intelligence | Enterprise data governance | | **Purview** | Microsoft Purview | Azure ecosystem | | **Alation** | Alation Data Catalog | Self-service analytics | | **Atlas** | Apache Atlas | Open-source governance | --- ### 🎥 Watch the Demo See all features in action: [YouTube Demo Video (2:30)](https://youtu.be/U4Dfc7txa_0) --- Built with Gradio for the **Gradio Agents & MCP Hackathon - Winter 2025** by [Aaman Lamba](https://aamanlamba.com) [GitHub](https://github.com/aamanlamba/lineage-graph-accelerator) | [Documentation](https://huggingface.co/spaces/aamanlamba/Lineage-graph-accelerator/blob/main/USER_GUIDE.md) | [HuggingFace Space](https://huggingface.co/spaces/aamanlamba/Lineage-graph-accelerator) """) # Launch if __name__ == "__main__": demo.launch(ssr_mode=False)