chat.gradio.app-HFIPs / server_manager.py
ysharma's picture
ysharma HF Staff
Create server_manager.py
2b232e8 verified
"""
Server management utilities for Universal MCP Client
Version 2.0 - Enhanced with server enabling/disabling support
"""
import asyncio
import re
import logging
import traceback
from typing import Tuple
from config import MCPServerConfig
from mcp_client import UniversalMCPClient
logger = logging.getLogger(__name__)
class ServerManager:
"""Manages MCP server connections and status with enhanced server management"""
def __init__(self, mcp_client: UniversalMCPClient):
self.mcp_client = mcp_client
def convert_hf_space_to_url(self, space_name: str) -> str:
"""
Convert HuggingFace space name to proper URL format.
HuggingFace URL rules:
- Replace "/" with "-"
- Convert to lowercase
- Replace dots and other special chars with "-"
- Remove consecutive hyphens
"""
if "/" not in space_name:
raise ValueError("Space name should be in format: username/space-name")
# Replace "/" with "-"
url_name = space_name.replace("/", "-")
# Convert to lowercase
url_name = url_name.lower()
# Replace dots and other special characters with hyphens
url_name = re.sub(r'[^a-z0-9\-]', '-', url_name)
# Remove consecutive hyphens
url_name = re.sub(r'-+', '-', url_name)
# Remove leading/trailing hyphens
url_name = url_name.strip('-')
return f"https://{url_name}.hf.space"
def add_custom_server(self, name: str, space_name: str) -> Tuple[str, str]:
"""Add a custom MCP server from HuggingFace space name"""
logger.info(f"βž• Adding MCP server: {name} from space: {space_name}")
if not name or not space_name:
return "❌ Please provide both server name and space name", ""
space_name = space_name.strip()
try:
# Use the improved URL conversion
mcp_url = self.convert_hf_space_to_url(space_name)
logger.info(f"πŸ”— Converted {space_name} β†’ {mcp_url}")
except ValueError as e:
return f"❌ {str(e)}", ""
config = MCPServerConfig(
name=name.strip(),
url=mcp_url,
description=f"MCP server from HuggingFace space: {space_name}",
space_id=space_name
)
try:
# Run async function
def run_async():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
return loop.run_until_complete(self.mcp_client.add_server_async(config))
finally:
loop.close()
success, message = run_async()
logger.info(f"Server addition result: {success} - {message}")
if success:
# Format success message for accordion display
tools_info = ""
if 'Found' in message and 'tools:' in message:
tools_section = message.split('Found')[1]
tools_info = f"**Available Tools:**\n{tools_section}"
details_html = f"""
<details style="margin-top: 10px;">
<summary style="cursor: pointer; padding: 8px; background: #f0f0f0; border-radius: 4px;"><strong>βœ… {name} - Connection Details</strong></summary>
<div style="padding: 10px; border-left: 3px solid #28a745; margin-left: 10px; margin-top: 5px;">
<p><strong>Space:</strong> {space_name}</p>
<p><strong>Base URL:</strong> {mcp_url}</p>
<p><strong>Status:</strong> Connected successfully!</p>
<p><strong>Enabled:</strong> Yes (new servers are enabled by default)</p>
<div style="margin-top: 10px;">
{tools_info.replace('**', '<strong>').replace('**', '</strong>').replace(chr(10), '<br>')}
</div>
</div>
</details>
"""
return "βœ… Server added successfully!", details_html
else:
error_html = f"""
<details style="margin-top: 10px;">
<summary style="cursor: pointer; padding: 8px; background: #f8d7da; border-radius: 4px;"><strong>❌ {name} - Connection Failed</strong></summary>
<div style="padding: 10px; border-left: 3px solid #dc3545; margin-left: 10px; margin-top: 5px;">
<p>{message}</p>
</div>
</details>
"""
return f"❌ Failed to add server: {name}", error_html
except Exception as e:
error_msg = f"❌ Failed to add server: {str(e)}"
logger.error(error_msg)
logger.error(traceback.format_exc())
return error_msg, ""
def get_server_status(self) -> Tuple[str, str]:
"""Get status of all servers in accordion format"""
try:
logger.info(f"πŸ” Getting server status, found {len(self.mcp_client.servers)} servers")
# Get server status from mcp_client
server_count = len(self.mcp_client.servers)
server_status_text = f"**Total MCP Servers**: {server_count}"
if not self.mcp_client.servers:
logger.info("πŸ“Š No servers found")
return server_status_text, "<p><em>No MCP servers configured yet.</em></p>"
accordion_html = ""
for name, config in self.mcp_client.servers.items():
logger.info(f"πŸ“Š Processing server: {name}")
base_url = config.url.replace("/gradio_api/mcp/sse", "")
# Determine health status (assume healthy if it's in the servers dict)
health = "🟒 Healthy"
accordion_html += f"""
<details style="margin-bottom: 10px;">
<summary style="cursor: pointer; padding: 8px; background: #e9ecef; border-radius: 4px;"><strong>πŸ”§ {name}</strong></summary>
<div style="padding: 10px; border-left: 3px solid #007bff; margin-left: 10px; margin-top: 5px;">
<p><strong>Title:</strong> {name}</p>
<p><strong>Status:</strong> Connected (MCP Protocol)</p>
<p><strong>Health:</strong> {health}</p>
<p><strong>Base URL:</strong> {base_url}</p>
<p><strong>Description:</strong> {config.description}</p>
{f'<p><strong>Space ID:</strong> {config.space_id}</p>' if config.space_id else ''}
</div>
</details>
"""
logger.info(f"πŸ“Š Generated accordion HTML for {server_count} servers")
return server_status_text, accordion_html
except Exception as e:
error_msg = f"Error getting server status: {str(e)}"
logger.error(f"❌ {error_msg}")
logger.error(traceback.format_exc())
return "**Total MCP Servers**: Error", f"<p style='color: red;'>❌ {error_msg}</p>"
def remove_server(self, server_name: str) -> Tuple[str, str]:
"""Remove a specific server"""
success = self.mcp_client.remove_server(server_name)
if success:
return f"βœ… Removed server: {server_name}", ""
else:
return f"❌ Server not found: {server_name}", ""
def remove_all_servers(self) -> Tuple[str, str]:
"""Remove all servers"""
count = self.mcp_client.remove_all_servers()
if count > 0:
return f"βœ… Removed all {count} MCP servers", ""
else:
return "ℹ️ No servers to remove", ""
def enable_server(self, server_name: str, enabled: bool = True) -> Tuple[str, str]:
"""Enable or disable a server"""
if server_name in self.mcp_client.servers:
self.mcp_client.enable_server(server_name, enabled)
status = "enabled" if enabled else "disabled"
return f"βœ… Server {server_name} {status}", ""
else:
return f"❌ Server not found: {server_name}", ""
def get_server_list_with_status(self) -> list:
"""Get server list with enable/disable status for UI components"""
return self.mcp_client.get_server_list_with_status()