#!/usr/bin/env python3
"""
Improved Gradio Interface with MCP Client and SmolAgents
"""
import gradio as gr
import os
import asyncio
import logging
from typing import Optional
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path

# MCP and SmolAgents imports
from smolagents import InferenceClientModel, CodeAgent, MCPClient

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# ============================================================================
# ENVIRONMENT CONFIGURATION
# ============================================================================

def load_environment_variables():
    """
    Load environment variables with platform-specific handling.

    On Hugging Face Spaces: uses os.environ directly.
    For local development: loads from a .env file with fallbacks.
    """
    def is_huggingface_space():
        """Detect if running on the Hugging Face Spaces platform"""
        return (
            os.getenv("SPACE_ID") is not None or
            os.getenv("SPACE_AUTHOR_NAME") is not None or
            os.getenv("GRADIO_SERVER_NAME") is not None
        )

    def load_dotenv_file():
        """Load variables from a .env file for local development"""
        env_file = Path(".env")
        env_vars = {}
        if env_file.exists():
            try:
                with open(env_file, 'r', encoding='utf-8') as f:
                    for line in f:
                        line = line.strip()
                        if line and not line.startswith('#') and '=' in line:
                            key, value = line.split('=', 1)
                            # Remove surrounding quotes if present
                            value = value.strip().strip('"').strip("'")
                            env_vars[key.strip()] = value
                logger.info(f"Loaded {len(env_vars)} variables from .env file")
            except Exception as e:
                logger.warning(f"Failed to load .env file: {e}")
        else:
            logger.info("No .env file found, using system environment variables")
        return env_vars
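
    # The parser above accepts a minimal KEY=value format. A typical .env file
    # (illustrative example, not shipped with this app) would look like:
    #
    #   # Hugging Face access token
    #   HF_TOKEN="hf_xxxxxxxxxxxxxxxxxxxx"
    #   MCP_SERVER_URL=http://localhost:7860/gradio_api/mcp/sse
    #
    # Blank lines and lines starting with '#' are ignored; surrounding quotes
    # are stripped from values.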

    # Platform detection
    on_huggingface = is_huggingface_space()
    logger.info(f"Platform detected: {'Hugging Face Spaces' if on_huggingface else 'Local Development'}")

    # Load environment variables
    if on_huggingface:
        # On Hugging Face Spaces: use environment variables directly
        hf_token = os.getenv("HF_TOKEN")
        mcp_server_url = os.getenv("MCP_SERVER_URL", "http://localhost:7860/gradio_api/mcp/sse")
        logger.info("Using Hugging Face Spaces environment variables")
    else:
        # Local development: try the .env file first, then the system environment
        env_vars = load_dotenv_file()

        # Get HF_TOKEN
        hf_token = (
            env_vars.get("HF_TOKEN") or
            os.getenv("HF_TOKEN")
        )

        # Get MCP_SERVER_URL
        mcp_server_url = (
            env_vars.get("MCP_SERVER_URL") or
            os.getenv("MCP_SERVER_URL") or
            "http://localhost:7860/gradio_api/mcp/sse"  # Default for local development
        )
        logger.info("Using local development environment configuration")

    # Validate required variables
    if not hf_token:
        logger.error("HF_TOKEN not found in environment variables!")
        logger.error("Please set HF_TOKEN in:")
        if on_huggingface:
            logger.error("- Hugging Face Spaces secrets/environment variables")
        else:
            logger.error("- .env file (HF_TOKEN=your_token_here)")
            logger.error("- System environment variables")
        raise ValueError("HF_TOKEN is required but not found")

    # Set environment variables for the application
    os.environ["HF_TOKEN"] = hf_token

    # Log configuration (without exposing sensitive data)
    logger.info(f"HF_TOKEN configured: {'✓' if hf_token else '✗'}")
    logger.info(f"MCP_SERVER_URL: {mcp_server_url}")

    return hf_token, mcp_server_url

# Load environment configuration
try:
    HF_TOKEN, MCP_SERVER_URL = load_environment_variables()
except Exception as e:
    logger.error(f"Failed to load environment configuration: {e}")
    # Fallback values for development/testing
    HF_TOKEN = os.getenv("HF_TOKEN", "")
    MCP_SERVER_URL = "http://localhost:7860/gradio_api/mcp/sse"
    if not HF_TOKEN:
        logger.warning("Running without HF_TOKEN - some features may not work")

# Global variables
agent: Optional[CodeAgent] = None
tools: Optional[list] = None
mcp_client: Optional[MCPClient] = None
executor = ThreadPoolExecutor(max_workers=4)

# ============================================================================
# ASYNC UTILITIES
# ============================================================================

def run_async_in_thread(coro):
    """Run an async coroutine in a separate thread with its own event loop"""
    def run_in_thread():
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
            return loop.run_until_complete(coro)
        finally:
            loop.close()

    future = executor.submit(run_in_thread)
    return future.result(timeout=60)
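
# Gradio callbacks run in worker threads that have no running event loop, so
# each call above spins up a fresh loop and closes it when the coroutine
# finishes. Typical usage (hypothetical coroutine name, for illustration):
#
#   result = run_async_in_thread(fetch_something_async())
#
# The 60-second future timeout bounds how long any single agent call may take.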

# ============================================================================
# MCP CLIENT AND AGENT INITIALIZATION
# ============================================================================

async def _initialize_mcp_client():
    """Async helper for MCP client initialization"""
    global mcp_client, tools, agent

    logger.info(f"Connecting to MCP server at: {MCP_SERVER_URL}")

    # Create the MCP client connection (SSE transport)
    mcp_client = MCPClient(
        {"url": MCP_SERVER_URL, "transport": "sse"}
    )
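
    # Note: this client is never explicitly closed. If the installed
    # smolagents version exposes MCPClient.disconnect(), a cleaner pattern
    # (a sketch, not verified against every release) would be:
    #
    #   try:
    #       tools = mcp_client.get_tools()
    #       ...
    #   finally:
    #       mcp_client.disconnect()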

    # Get the available tools from the MCP server
    logger.info("Retrieving tools from MCP server...")
    all_tools = mcp_client.get_tools()
    logger.info(f"Retrieved {len(all_tools)} tools from MCP server")

    # Filter out duplicate tool names and anonymous lambda tools
    seen_names = set()
    tools = []
    for tool in all_tools:
        tool_name = getattr(tool, 'name', f'tool_{len(tools)}')
        if tool_name not in seen_names and not tool_name.startswith('lambda'):
            seen_names.add(tool_name)
            tools.append(tool)

    logger.info(f"Using {len(tools)} unique tools (filtered out duplicates and lambda functions)")
    for i, tool in enumerate(tools, 1):
        tool_name = getattr(tool, 'name', f'tool_{i}')
        tool_desc = getattr(tool, 'description', 'No description available')
        logger.info(f"  {i}. {tool_name}: {tool_desc}")

    # Initialize the Hugging Face model
    logger.info("Initializing InferenceClientModel...")
    try:
        # Try several small, freely available models (current as of 2024-2025)
        models_to_try = [
            "facebook/blenderbot-400M-distill",
            "google/flan-t5-small",
            "huggingface/CodeBERTa-small-v1",
            "bigscience/bloom-560m"
        ]
        model = None
        for model_name in models_to_try:
            try:
                logger.info(f"Trying model: {model_name}")
                model = InferenceClientModel(
                    model=model_name,
                    token=os.getenv("HF_TOKEN"),
                    timeout=30
                )
                logger.info(f"Successfully initialized with {model_name}")
                break
            except Exception as model_error:
                logger.warning(f"Failed with {model_name}: {model_error}")
                continue

        if model is None:
            # Last resort: try the default model configuration
            logger.info("Trying default model configuration...")
            model = InferenceClientModel(
                token=os.getenv("HF_TOKEN"),
                timeout=30
            )
    except Exception as e:
        logger.error(f"All model initialization attempts failed: {e}")
        # Create a mock model for testing
        logger.info("Creating mock model for testing...")

        class MockModel:
            def __init__(self):
                self.name = "MockModel"

            async def __call__(self, messages, **kwargs):
                # Simple keyword-based mock response
                if messages and len(messages) > 0:
                    user_message = str(messages[-1]).lower()
                    if "sentiment" in user_message or "analyze" in user_message:
                        return "I'll help you analyze sentiment. Please use the analyze_sentiment tool."
                    elif "health" in user_message:
                        return "I'll check the system health for you using the health_check tool."
                    elif "tools" in user_message:
                        return "Let me show you the available tools using the get_backend_info tool."
                    else:
                        return "I can help you with sentiment analysis. What would you like to analyze?"
                return "How can I help you with sentiment analysis?"

        model = MockModel()
        logger.info("Mock model created successfully")

    # Create the CodeAgent with the discovered tools
    logger.info("Creating CodeAgent with MCP tools...")
    try:
        agent = CodeAgent(
            tools=[*tools],
            model=model,
            max_steps=3
        )
        logger.info("CodeAgent created successfully")
    except Exception as e:
        logger.error(f"Failed to create CodeAgent: {e}")
        # Fall back to a simple keyword-routing wrapper around the tools
        logger.info("Creating simple agent wrapper...")

        class SimpleAgent:
            def __init__(self, tools, model):
                self.tools = tools
                self.model = model
                self.name = "SimpleAgent"

            async def _call_tool(self, tool, **kwargs):
                """Call a tool that may be a sync or async callable."""
                result = tool(**kwargs)
                if asyncio.iscoroutine(result):
                    result = await result
                return result

            async def run(self, query):
                query_lower = query.lower()
                # Direct keyword-to-tool mapping
                if "analyze" in query_lower and ":" in query:
                    text = query.split(":", 1)[1].strip().strip("'\"")
                    for tool in self.tools:
                        if getattr(tool, 'name', '') == 'analyze_sentiment':
                            try:
                                result = await self._call_tool(tool, text=text)
                                return f"Sentiment Analysis Result: {result}"
                            except Exception as e:
                                return f"Error analyzing sentiment: {e}"
                elif "health" in query_lower:
                    for tool in self.tools:
                        if 'health' in getattr(tool, 'name', '').lower():
                            try:
                                result = await self._call_tool(tool)
                                return f"Health Check: {result}"
                            except Exception as e:
                                return f"Error checking health: {e}"
                elif "tools" in query_lower or "available" in query_lower:
                    tool_list = []
                    for i, tool in enumerate(self.tools, 1):
                        tool_name = getattr(tool, 'name', f'tool_{i}')
                        tool_desc = getattr(tool, 'description', 'No description')
                        tool_list.append(f"{i}. {tool_name}: {tool_desc}")
                    return "Available tools:\n" + "\n".join(tool_list)
                elif "batch" in query_lower:
                    for tool in self.tools:
                        if 'batch' in getattr(tool, 'name', '').lower():
                            try:
                                texts = ["I love this!", "This is terrible", "It's okay"]
                                result = await self._call_tool(tool, texts=texts)
                                return f"Batch Analysis: {result}"
                            except Exception as e:
                                return f"Error in batch analysis: {e}"
                else:
                    return """I can help you with:
• Sentiment Analysis: "analyze: [your text]"
• Health Check: "health check"
• List Tools: "what tools are available?"
• Batch Analysis: "run batch analysis"

Example: "analyze: I love this product!" """
                # Reached only when a keyword matched but no suitable tool was found
                return "⚠️ No matching tool was found on the MCP server for that request."

        agent = SimpleAgent(tools, model)
        logger.info("Simple agent wrapper created successfully")

    logger.info("MCP client and agent initialized successfully!")
    return True

def initialize_mcp_client():
    """Initialize the MCP client and connect to the local server"""
    try:
        success = run_async_in_thread(_initialize_mcp_client())
        return success
    except Exception as e:
        logger.error(f"Failed to initialize MCP client: {str(e)}")
        return False

# ============================================================================
# GRADIO INTERFACE FUNCTIONS
# ============================================================================

async def _process_question_async(question: str):
    """Async helper for processing questions"""
    global agent
    if not agent:
        return "❌ Agent not initialized. Please check the MCP server connection."
    logger.info(f"Processing question: {question}")
    # CodeAgent.run is synchronous, while the SimpleAgent fallback's run is
    # async; handle both cases here.
    response = agent.run(question)
    if asyncio.iscoroutine(response):
        response = await response
    logger.info("Question processed successfully")
    return str(response)

def process_question(question: str, history: list) -> tuple:
    """Process a user question using the MCP-enabled agent"""
    if not question.strip():
        error_msg = "⚠️ Please enter a question."
        history.append({"role": "user", "content": question})
        history.append({"role": "assistant", "content": error_msg})
        return history, ""

    try:
        # Add the user question to history with a thinking indicator
        history.append({"role": "user", "content": question})
        history.append({"role": "assistant", "content": "🤔 Thinking..."})

        # Process the question in a separate thread
        response = run_async_in_thread(_process_question_async(question))

        # Update history with the response
        history[-1]["content"] = f"🤖 {response}"
    except Exception as e:
        error_msg = f"❌ Error processing question: {str(e)}"
        history[-1]["content"] = error_msg
        logger.error(f"Error processing question: {str(e)}")

    return history, ""
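
# The {"role": ..., "content": ...} dicts above follow Gradio's "messages"
# chat format, matching the gr.Chatbot(type='messages') component defined in
# create_gradio_interface() below.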

def get_server_status() -> str:
    """Get the current server connection status"""
    global agent, tools, mcp_client
    status_parts = []

    # MCP client status
    if mcp_client:
        status_parts.append("✅ MCP Client: Connected")
    else:
        status_parts.append("❌ MCP Client: Not connected")

    # Tools status
    if tools:
        status_parts.append(f"✅ Tools: {len(tools)} available")
        for i, tool in enumerate(tools[:5], 1):
            tool_name = getattr(tool, 'name', f'tool_{i}')
            tool_desc = getattr(tool, 'description', 'No description')
            status_parts.append(f"  • {tool_name}: {tool_desc[:50]}...")
        if len(tools) > 5:
            status_parts.append(f"  ... and {len(tools) - 5} more tools")
    else:
        status_parts.append("❌ Tools: None available")

    # Agent status
    if agent:
        status_parts.append("✅ Agent: Ready")
    else:
        status_parts.append("❌ Agent: Not initialized")

    status_parts.append(f"🔗 Server URL: {MCP_SERVER_URL}")
    return "\n".join(status_parts)

def reconnect_to_server():
    """Attempt to reconnect to the MCP server"""
    try:
        success = initialize_mcp_client()
        if success:
            return "✅ Successfully reconnected to the MCP server!"
        else:
            return "❌ Failed to reconnect. Please check that the MCP server is running."
    except Exception as e:
        return f"❌ Reconnection error: {str(e)}"

# ============================================================================
# GRADIO INTERFACE CREATION
# ============================================================================

def create_gradio_interface():
    """Create and configure the Gradio interface"""
    css = """
    .gradio-container {
        max-width: 1200px !important;
    }
    .status-box {
        background-color: #f8f9fa;
        border: 1px solid #dee2e6;
        border-radius: 8px;
        padding: 15px;
        font-family: monospace;
        font-size: 12px;
        white-space: pre-line;
    }
    """

    with gr.Blocks(
        title="MCP Sentiment Analysis Client v2",
        theme=gr.themes.Soft(),
        css=css
    ) as interface:
        # Header
        gr.Markdown("""
        # 🎭 MCP Sentiment Analysis Client v2

        **Improved Version** - Connect to your local MCP server using SmolAgents for AI-powered sentiment analysis.
        """)

        with gr.Row():
            with gr.Column(scale=2):
                # Main chat interface
                chatbot = gr.Chatbot(
                    label="Chat with MCP Agent",
                    height=500,
                    show_label=True,
                    container=True,
                    type='messages'
                )

                with gr.Row():
                    question_input = gr.Textbox(
                        placeholder="Ask about sentiment analysis (e.g., 'Analyze: I love this product!')",
                        label="Your Question",
                        lines=2,
                        scale=4
                    )
                    submit_btn = gr.Button("Submit", variant="primary", scale=1)

                # Quick action buttons
                with gr.Row():
                    gr.Markdown("### 💡 Quick Actions:")
                with gr.Row():
                    examples = [
                        "Analyze: 'I love this!'",
                        "Analyze: 'This is terrible'",
                        "What tools are available?",
                        "Health check"
                    ]
                    for example in examples:
                        btn = gr.Button(example, size="sm", scale=1)
                        # Bind the example via a default argument so each button
                        # fills the input with its own label
                        btn.click(
                            lambda x=example: x,
                            outputs=question_input
                        )

            with gr.Column(scale=1):
                # Server status and controls
                gr.Markdown("### 🔧 Server Status")

                status_display = gr.Textbox(
                    label="Connection Status",
                    lines=12,
                    interactive=False,
                    elem_classes=["status-box"]
                )

                with gr.Row():
                    refresh_btn = gr.Button("🔄 Refresh", size="sm")
                    reconnect_btn = gr.Button("🔌 Reconnect", size="sm", variant="secondary")

                # Information panel
                gr.Markdown("""
                ### ℹ️ Quick Guide

                **Example Questions:**
                - "Analyze the sentiment of: [your text]"
                - "What's the sentiment of multiple texts?"
                - "Check system health"
                - "What tools do you have?"

                **Tips:**
                - Be specific with your requests
                - Wait for responses (they may take 10-30 seconds)
                - Use the reconnect button if you see errors
                """)

        # Event handlers
        submit_btn.click(
            process_question,
            inputs=[question_input, chatbot],
            outputs=[chatbot, question_input]
        )
        question_input.submit(
            process_question,
            inputs=[question_input, chatbot],
            outputs=[chatbot, question_input]
        )
        refresh_btn.click(
            get_server_status,
            outputs=status_display
        )
        reconnect_btn.click(
            reconnect_to_server,
            outputs=status_display
        )

        # Initialize the status display on load
        interface.load(
            get_server_status,
            outputs=status_display
        )

    return interface

# ============================================================================
# MAIN EXECUTION
# ============================================================================

def main():
    """Initialize and launch the application"""
    print("🚀 Starting MCP Gradio Client v2...")
    print(f"📡 Connecting to MCP server at: {MCP_SERVER_URL}")

    # Initialize the MCP client and agent
    print("⚙️ Initializing MCP client and agent...")
    success = initialize_mcp_client()
    if success:
        print("✅ Initialization successful!")
    else:
        print("⚠️ Initialization failed, but the interface will still launch.")
        print("   You can try reconnecting from the interface.")

    # Create and launch the Gradio interface
    print("🎨 Creating Gradio interface...")
    interface = create_gradio_interface()

    print("🌐 Launching interface...")
    print("🛑 Press Ctrl+C to stop the server")

    interface.launch(
        server_name="0.0.0.0",
        server_port=7862,
        share=False,
        debug=False,
        show_error=True
    )


if __name__ == "__main__":
    main()
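
# Usage sketch (assumptions: a companion MCP sentiment server runs on port
# 7860 exposing analyze_sentiment and health tools, and gradio/smolagents are
# installed; adjust MCP_SERVER_URL otherwise):
#
#   # terminal 1 - start the MCP server
#   python server.py
#
#   # terminal 2 - start this client on http://localhost:7862
#   HF_TOKEN=hf_xxx python app.py
#
# The file names server.py/app.py are illustrative, not mandated by this script.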