Spaces:
Running
Running
""" | |
Server management utilities for Universal MCP Client | |
Version 2.0 - Enhanced with server enabling/disabling support | |
""" | |
import asyncio | |
import re | |
import logging | |
import traceback | |
from typing import Tuple | |
from config import MCPServerConfig | |
from mcp_client import UniversalMCPClient | |
logger = logging.getLogger(__name__) | |
class ServerManager: | |
"""Manages MCP server connections and status with enhanced server management""" | |
def __init__(self, mcp_client: UniversalMCPClient): | |
self.mcp_client = mcp_client | |
def convert_hf_space_to_url(self, space_name: str) -> str: | |
""" | |
Convert HuggingFace space name to proper URL format. | |
HuggingFace URL rules: | |
- Replace "/" with "-" | |
- Convert to lowercase | |
- Replace dots and other special chars with "-" | |
- Remove consecutive hyphens | |
""" | |
if "/" not in space_name: | |
raise ValueError("Space name should be in format: username/space-name") | |
# Replace "/" with "-" | |
url_name = space_name.replace("/", "-") | |
# Convert to lowercase | |
url_name = url_name.lower() | |
# Replace dots and other special characters with hyphens | |
url_name = re.sub(r'[^a-z0-9\-]', '-', url_name) | |
# Remove consecutive hyphens | |
url_name = re.sub(r'-+', '-', url_name) | |
# Remove leading/trailing hyphens | |
url_name = url_name.strip('-') | |
return f"https://{url_name}.hf.space" | |
def add_custom_server(self, name: str, space_name: str) -> Tuple[str, str]: | |
"""Add a custom MCP server from HuggingFace space name""" | |
logger.info(f"β Adding MCP server: {name} from space: {space_name}") | |
if not name or not space_name: | |
return "β Please provide both server name and space name", "" | |
space_name = space_name.strip() | |
try: | |
# Use the improved URL conversion | |
mcp_url = self.convert_hf_space_to_url(space_name) | |
logger.info(f"π Converted {space_name} β {mcp_url}") | |
except ValueError as e: | |
return f"β {str(e)}", "" | |
config = MCPServerConfig( | |
name=name.strip(), | |
url=mcp_url, | |
description=f"MCP server from HuggingFace space: {space_name}", | |
space_id=space_name | |
) | |
try: | |
# Run async function | |
def run_async(): | |
loop = asyncio.new_event_loop() | |
asyncio.set_event_loop(loop) | |
try: | |
return loop.run_until_complete(self.mcp_client.add_server_async(config)) | |
finally: | |
loop.close() | |
success, message = run_async() | |
logger.info(f"Server addition result: {success} - {message}") | |
if success: | |
# Format success message for accordion display | |
tools_info = "" | |
if 'Found' in message and 'tools:' in message: | |
tools_section = message.split('Found')[1] | |
tools_info = f"**Available Tools:**\n{tools_section}" | |
details_html = f""" | |
<details style="margin-top: 10px;"> | |
<summary style="cursor: pointer; padding: 8px; background: #f0f0f0; border-radius: 4px;"><strong>β {name} - Connection Details</strong></summary> | |
<div style="padding: 10px; border-left: 3px solid #28a745; margin-left: 10px; margin-top: 5px;"> | |
<p><strong>Space:</strong> {space_name}</p> | |
<p><strong>Base URL:</strong> {mcp_url}</p> | |
<p><strong>Status:</strong> Connected successfully!</p> | |
<p><strong>Enabled:</strong> Yes (new servers are enabled by default)</p> | |
<div style="margin-top: 10px;"> | |
{tools_info.replace('**', '<strong>').replace('**', '</strong>').replace(chr(10), '<br>')} | |
</div> | |
</div> | |
</details> | |
""" | |
return "β Server added successfully!", details_html | |
else: | |
error_html = f""" | |
<details style="margin-top: 10px;"> | |
<summary style="cursor: pointer; padding: 8px; background: #f8d7da; border-radius: 4px;"><strong>β {name} - Connection Failed</strong></summary> | |
<div style="padding: 10px; border-left: 3px solid #dc3545; margin-left: 10px; margin-top: 5px;"> | |
<p>{message}</p> | |
</div> | |
</details> | |
""" | |
return f"β Failed to add server: {name}", error_html | |
except Exception as e: | |
error_msg = f"β Failed to add server: {str(e)}" | |
logger.error(error_msg) | |
logger.error(traceback.format_exc()) | |
return error_msg, "" | |
def get_server_status(self) -> Tuple[str, str]: | |
"""Get status of all servers in accordion format""" | |
try: | |
logger.info(f"π Getting server status, found {len(self.mcp_client.servers)} servers") | |
# Get server status from mcp_client | |
server_count = len(self.mcp_client.servers) | |
server_status_text = f"**Total MCP Servers**: {server_count}" | |
if not self.mcp_client.servers: | |
logger.info("π No servers found") | |
return server_status_text, "<p><em>No MCP servers configured yet.</em></p>" | |
accordion_html = "" | |
for name, config in self.mcp_client.servers.items(): | |
logger.info(f"π Processing server: {name}") | |
base_url = config.url.replace("/gradio_api/mcp/sse", "") | |
# Determine health status (assume healthy if it's in the servers dict) | |
health = "π’ Healthy" | |
accordion_html += f""" | |
<details style="margin-bottom: 10px;"> | |
<summary style="cursor: pointer; padding: 8px; background: #e9ecef; border-radius: 4px;"><strong>π§ {name}</strong></summary> | |
<div style="padding: 10px; border-left: 3px solid #007bff; margin-left: 10px; margin-top: 5px;"> | |
<p><strong>Title:</strong> {name}</p> | |
<p><strong>Status:</strong> Connected (MCP Protocol)</p> | |
<p><strong>Health:</strong> {health}</p> | |
<p><strong>Base URL:</strong> {base_url}</p> | |
<p><strong>Description:</strong> {config.description}</p> | |
{f'<p><strong>Space ID:</strong> {config.space_id}</p>' if config.space_id else ''} | |
</div> | |
</details> | |
""" | |
logger.info(f"π Generated accordion HTML for {server_count} servers") | |
return server_status_text, accordion_html | |
except Exception as e: | |
error_msg = f"Error getting server status: {str(e)}" | |
logger.error(f"β {error_msg}") | |
logger.error(traceback.format_exc()) | |
return "**Total MCP Servers**: Error", f"<p style='color: red;'>β {error_msg}</p>" | |
def remove_server(self, server_name: str) -> Tuple[str, str]: | |
"""Remove a specific server""" | |
success = self.mcp_client.remove_server(server_name) | |
if success: | |
return f"β Removed server: {server_name}", "" | |
else: | |
return f"β Server not found: {server_name}", "" | |
def remove_all_servers(self) -> Tuple[str, str]: | |
"""Remove all servers""" | |
count = self.mcp_client.remove_all_servers() | |
if count > 0: | |
return f"β Removed all {count} MCP servers", "" | |
else: | |
return "βΉοΈ No servers to remove", "" | |
def enable_server(self, server_name: str, enabled: bool = True) -> Tuple[str, str]: | |
"""Enable or disable a server""" | |
if server_name in self.mcp_client.servers: | |
self.mcp_client.enable_server(server_name, enabled) | |
status = "enabled" if enabled else "disabled" | |
return f"β Server {server_name} {status}", "" | |
else: | |
return f"β Server not found: {server_name}", "" | |
def get_server_list_with_status(self) -> list: | |
"""Get server list with enable/disable status for UI components""" | |
return self.mcp_client.get_server_list_with_status() | |