Spaces:
Running
Running
| """ | |
| Server management utilities for Universal MCP Client | |
| Version 2.0 - Enhanced with server enabling/disabling support | |
| """ | |
| import asyncio | |
| import re | |
| import logging | |
| import traceback | |
| from typing import Tuple | |
| from config import MCPServerConfig | |
| from mcp_client import UniversalMCPClient | |
| logger = logging.getLogger(__name__) | |
class ServerManager:
    """Manage MCP servers through a ``UniversalMCPClient`` and format results for the UI.

    Most methods return a ``(status_text, details_html)`` tuple: a short status
    line plus an optional HTML fragment (``""`` when there is nothing to show).
    All values interpolated into HTML are escaped first, because server names,
    space names and client messages originate outside this class.
    """

    # Endpoint suffix removed from a server URL when displaying its base URL.
    _MCP_SSE_SUFFIX = "/gradio_api/mcp/sse"

    # Message raised for any space name that is not ``username/space-name``.
    _SPACE_FORMAT_ERROR = "Space name should be in format: username/space-name"

    def __init__(self, mcp_client: "UniversalMCPClient"):
        """Store the client that owns the actual server connections."""
        self.mcp_client = mcp_client

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _escape(value: object) -> str:
        """Return ``value`` as text that is safe to embed in HTML."""
        # Function-scope import so the file's import block needs no change.
        from html import escape

        return escape(str(value))

    @staticmethod
    def _slugify(part: str) -> str:
        """Lower-case ``part`` and reduce every non-alphanumeric run to one hyphen."""
        # Treating "-" itself as a separator collapses consecutive hyphens in
        # the same pass; the final strip removes leading/trailing ones.
        return re.sub(r"[^a-z0-9]+", "-", part.lower()).strip("-")

    @classmethod
    def _markdown_to_html(cls, text: str) -> str:
        """Convert the tiny markdown subset used here (``**bold**``, newlines) to HTML."""
        escaped = cls._escape(text)
        # Pair the markers up so each bold run gets an opening AND closing tag.
        with_bold = re.sub(r"\*\*(.+?)\*\*", r"<strong>\1</strong>", escaped)
        return with_bold.replace("\n", "<br>")

    @staticmethod
    def _render_details(title: str, body_html: str, *, margin: str,
                        summary_bg: str, accent: str) -> str:
        """Wrap already-escaped ``title``/``body_html`` in a collapsible ``<details>`` block."""
        # Lines are emitted flush-left so the fragment is indentation-neutral.
        return (
            f'\n<details style="{margin}: 10px;">\n'
            f'<summary style="cursor: pointer; padding: 8px; background: {summary_bg}; '
            f'border-radius: 4px;"><strong>{title}</strong></summary>\n'
            f'<div style="padding: 10px; border-left: 3px solid {accent}; '
            f'margin-left: 10px; margin-top: 5px;">\n'
            f"{body_html}\n"
            "</div>\n"
            "</details>\n"
        )

    def _render_server_entry(self, name: str, config) -> str:
        """Render one configured server as an accordion entry."""
        esc = self._escape
        base_url = config.url.replace(self._MCP_SSE_SUFFIX, "")
        # Health is not probed: a server present in the dict is reported healthy.
        health = "π’ Healthy"
        rows = [
            f"<p><strong>Title:</strong> {esc(name)}</p>",
            "<p><strong>Status:</strong> Connected (MCP Protocol)</p>",
            f"<p><strong>Health:</strong> {health}</p>",
            f"<p><strong>Base URL:</strong> {esc(base_url)}</p>",
            f"<p><strong>Description:</strong> {esc(config.description)}</p>",
        ]
        if config.space_id:
            rows.append(f"<p><strong>Space ID:</strong> {esc(config.space_id)}</p>")
        return self._render_details(
            f"π§ {esc(name)}",
            "\n".join(rows),
            margin="margin-bottom",
            summary_bg="#e9ecef",
            accent="#007bff",
        )

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    def convert_hf_space_to_url(self, space_name: str) -> str:
        """Convert a HuggingFace space name to its ``*.hf.space`` URL.

        Conversion rules:
        - Replace "/" with "-"
        - Convert to lowercase
        - Replace dots and other special chars with "-"
        - Remove consecutive, leading and trailing hyphens

        Args:
            space_name: Space identifier in ``username/space-name`` form.

        Returns:
            ``https://<username>-<space-name>.hf.space`` in normalized form.

        Raises:
            ValueError: If there is no "/" or either side of the first "/"
                contains no usable characters (e.g. ``"user/"`` or ``"/"``),
                which would otherwise yield a malformed host name.
        """
        owner, separator, space = space_name.partition("/")
        owner_slug = self._slugify(owner)
        space_slug = self._slugify(space)
        if not separator or not owner_slug or not space_slug:
            raise ValueError(self._SPACE_FORMAT_ERROR)
        return f"https://{owner_slug}-{space_slug}.hf.space"

    def add_custom_server(self, name: str, space_name: str) -> Tuple[str, str]:
        """Add a custom MCP server from a HuggingFace space name.

        Args:
            name: Display name for the server.
            space_name: Space identifier in ``username/space-name`` form.

        Returns:
            ``(status_text, details_html)``. ``details_html`` is a collapsible
            block describing the connection or the failure, or ``""`` when the
            input was rejected or an unexpected exception occurred.
        """
        # Normalize before validating so whitespace-only input is rejected.
        name = (name or "").strip()
        space_name = (space_name or "").strip()
        logger.info("β Adding MCP server: %s from space: %s", name, space_name)

        if not name or not space_name:
            return "β Please provide both server name and space name", ""

        try:
            mcp_url = self.convert_hf_space_to_url(space_name)
        except ValueError as e:
            return f"β {str(e)}", ""
        logger.info("π Converted %s β %s", space_name, mcp_url)

        config = MCPServerConfig(
            name=name,
            url=mcp_url,
            description=f"MCP server from HuggingFace space: {space_name}",
            space_id=space_name,
        )

        try:
            # asyncio.run creates a fresh loop, runs the coroutine, and cleans
            # the loop up afterwards instead of leaving a closed loop installed
            # as the thread's current event loop.
            success, message = asyncio.run(self.mcp_client.add_server_async(config))
            logger.info("Server addition result: %s - %s", success, message)

            esc = self._escape
            if not success:
                error_html = self._render_details(
                    f"β {esc(name)} - Connection Failed",
                    f"<p>{esc(message)}</p>",
                    margin="margin-top",
                    summary_bg="#f8d7da",
                    accent="#dc3545",
                )
                return f"β Failed to add server: {name}", error_html

            # Everything after the first "Found" is treated as the tool listing.
            tools_info = ""
            if "Found" in message and "tools:" in message:
                tools_section = message.partition("Found")[2]
                tools_info = f"**Available Tools:**\n{tools_section}"

            body = "\n".join([
                f"<p><strong>Space:</strong> {esc(space_name)}</p>",
                f"<p><strong>Base URL:</strong> {esc(mcp_url)}</p>",
                "<p><strong>Status:</strong> Connected successfully!</p>",
                "<p><strong>Enabled:</strong> Yes (new servers are enabled by default)</p>",
                '<div style="margin-top: 10px;">',
                self._markdown_to_html(tools_info),
                "</div>",
            ])
            details_html = self._render_details(
                f"β {esc(name)} - Connection Details",
                body,
                margin="margin-top",
                summary_bg="#f0f0f0",
                accent="#28a745",
            )
            return "β Server added successfully!", details_html
        except Exception as e:
            # UI boundary: report the failure instead of propagating it.
            error_msg = f"β Failed to add server: {str(e)}"
            logger.exception("%s", error_msg)
            return error_msg, ""

    def get_server_status(self) -> Tuple[str, str]:
        """Get the status of all servers in accordion format.

        Returns:
            ``(summary_markdown, accordion_html)``. On any exception the
            summary reads ``**Total MCP Servers**: Error`` and the HTML holds
            the (escaped) error message.
        """
        try:
            servers = self.mcp_client.servers
            server_count = len(servers)
            logger.info("π Getting server status, found %d servers", server_count)
            server_status_text = f"**Total MCP Servers**: {server_count}"

            if not servers:
                logger.info("π No servers found")
                return server_status_text, "<p><em>No MCP servers configured yet.</em></p>"

            sections = []
            for name, config in servers.items():
                logger.info("π Processing server: %s", name)
                sections.append(self._render_server_entry(name, config))

            logger.info("π Generated accordion HTML for %d servers", server_count)
            return server_status_text, "".join(sections)
        except Exception as e:
            # UI boundary: report the failure instead of propagating it.
            error_msg = f"Error getting server status: {str(e)}"
            logger.exception("β %s", error_msg)
            return (
                "**Total MCP Servers**: Error",
                f"<p style='color: red;'>β {self._escape(error_msg)}</p>",
            )

    def remove_server(self, server_name: str) -> Tuple[str, str]:
        """Remove a specific server; report whether the client found it."""
        if self.mcp_client.remove_server(server_name):
            return f"β Removed server: {server_name}", ""
        return f"β Server not found: {server_name}", ""

    def remove_all_servers(self) -> Tuple[str, str]:
        """Remove all servers and report how many the client removed."""
        count = self.mcp_client.remove_all_servers()
        if count > 0:
            return f"β Removed all {count} MCP servers", ""
        return "βΉοΈ No servers to remove", ""

    def enable_server(self, server_name: str, enabled: bool = True) -> Tuple[str, str]:
        """Enable (default) or disable a server known to the client."""
        if server_name not in self.mcp_client.servers:
            return f"β Server not found: {server_name}", ""
        self.mcp_client.enable_server(server_name, enabled)
        status = "enabled" if enabled else "disabled"
        return f"β Server {server_name} {status}", ""

    def get_server_list_with_status(self) -> list:
        """Get server list with enable/disable status for UI components.

        Passes through whatever ``mcp_client.get_server_list_with_status()``
        returns.
        """
        return self.mcp_client.get_server_list_with_status()