# Captured from a Hugging Face Spaces file view (Space status: Running; file size: 8,748 bytes; commit 2b232e8).
"""
Server management utilities for Universal MCP Client
Version 2.0 - Enhanced with server enabling/disabling support
"""
import asyncio
import html
import logging
import re
import traceback
from typing import Tuple

from config import MCPServerConfig
from mcp_client import UniversalMCPClient
logger = logging.getLogger(__name__)


class ServerManager:
    """Manage MCP server connections and render their status as HTML.

    Thin layer over the client object passed to ``__init__``: it converts
    HuggingFace space names to URLs, registers servers through the client,
    and formats results for display. Most public methods return a
    ``(status_message, details_html)`` tuple; ``details_html`` is an empty
    string when there is nothing further to show.
    """

    # Single error text used for every malformed space name.
    _SPACE_FORMAT_ERROR = "Space name should be in format: username/space-name"
    # Path removed from a configured URL to display the server's base URL.
    _MCP_SSE_SUFFIX = "/gradio_api/mcp/sse"

    def __init__(self, mcp_client: "UniversalMCPClient"):
        """Store the client that owns the server registry.

        The annotation is a string so that defining the class does not
        require the client type to be resolvable at that moment.
        """
        self.mcp_client = mcp_client

    def convert_hf_space_to_url(self, space_name: str) -> str:
        """Convert a ``username/space-name`` identifier to its ``hf.space`` URL.

        Conversion rules:
        - Replace "/" with "-"
        - Convert to lowercase
        - Replace dots and other special characters with "-"
        - Collapse consecutive hyphens and trim leading/trailing ones

        Args:
            space_name: Identifier in ``username/space-name`` form.

        Returns:
            ``https://<slug>.hf.space`` where ``<slug>`` is the sanitized name.

        Raises:
            ValueError: If there is no "/", if either side of the first "/"
                is blank, or if nothing usable remains after sanitizing.
        """
        cleaned = (space_name or "").strip()
        owner, separator, space = cleaned.partition("/")
        if not separator or not owner.strip() or not space.strip():
            raise ValueError(self._SPACE_FORMAT_ERROR)

        # One pass: every run of characters outside [a-z0-9] (the "/", dots,
        # underscores, existing hyphens, ...) becomes a single hyphen.
        url_name = re.sub(r"[^a-z0-9]+", "-", cleaned.lower()).strip("-")
        if not url_name:
            # e.g. "!!!/???" would otherwise yield "https://.hf.space".
            raise ValueError(self._SPACE_FORMAT_ERROR)
        return f"https://{url_name}.hf.space"

    def add_custom_server(self, name: str, space_name: str) -> Tuple[str, str]:
        """Add a custom MCP server from a HuggingFace space name.

        Args:
            name: Display name for the server.
            space_name: Space identifier in ``username/space-name`` form.

        Returns:
            ``(status_message, details_html)``. ``details_html`` is a
            collapsible ``<details>`` element describing the outcome reported
            by the client, or ``""`` when validation fails or an exception is
            raised while adding the server.
        """
        logger.info(f"β Adding MCP server: {name} from space: {space_name}")

        # Strip before validating so whitespace-only input is rejected too.
        name = (name or "").strip()
        space_name = (space_name or "").strip()
        if not name or not space_name:
            return "❌ Please provide both server name and space name", ""

        try:
            mcp_url = self.convert_hf_space_to_url(space_name)
        except ValueError as e:
            return f"❌ {str(e)}", ""
        logger.info(f"π Converted {space_name} β {mcp_url}")

        config = MCPServerConfig(
            name=name,
            url=mcp_url,
            description=f"MCP server from HuggingFace space: {space_name}",
            space_id=space_name,
        )

        try:
            # asyncio.run creates a fresh event loop, runs the coroutine and
            # closes the loop again, without leaving a closed loop installed
            # as this thread's current loop.
            success, message = asyncio.run(self.mcp_client.add_server_async(config))
            logger.info(f"Server addition result: {success} - {message}")

            if success:
                details_html = self._render_success_details(
                    name, space_name, mcp_url, message
                )
                return "✅ Server added successfully!", details_html

            error_html = self._render_failure_details(name, message)
            return f"❌ Failed to add server: {name}", error_html
        except Exception as e:
            error_msg = f"❌ Failed to add server: {str(e)}"
            logger.error(error_msg)
            logger.error(traceback.format_exc())
            return error_msg, ""

    @staticmethod
    def _format_tools_html(message: str) -> str:
        """Return the "Available Tools" HTML fragment, or "" if none is listed."""
        if "Found" not in message or "tools:" not in message:
            return ""
        # Keep everything after the first "Found", even if the word appears
        # again later in the message.
        tools_section = message.partition("Found")[2]
        tools_body = html.escape(tools_section).replace("\n", "<br>")
        return f"<strong>Available Tools:</strong><br>{tools_body}"

    def _render_success_details(
        self, name: str, space_name: str, mcp_url: str, message: str
    ) -> str:
        """Build the collapsible HTML block shown after a successful add."""
        safe_name = html.escape(name)
        safe_space = html.escape(space_name)
        safe_url = html.escape(mcp_url)
        tools_html = self._format_tools_html(message)
        return f"""
<details style="margin-top: 10px;">
<summary style="cursor: pointer; padding: 8px; background: #f0f0f0; border-radius: 4px;"><strong>✅ {safe_name} - Connection Details</strong></summary>
<div style="padding: 10px; border-left: 3px solid #28a745; margin-left: 10px; margin-top: 5px;">
<p><strong>Space:</strong> {safe_space}</p>
<p><strong>Base URL:</strong> {safe_url}</p>
<p><strong>Status:</strong> Connected successfully!</p>
<p><strong>Enabled:</strong> Yes (new servers are enabled by default)</p>
<div style="margin-top: 10px;">
{tools_html}
</div>
</div>
</details>
"""

    @staticmethod
    def _render_failure_details(name: str, message: str) -> str:
        """Build the collapsible HTML block shown when the client reports failure."""
        safe_name = html.escape(name)
        safe_message = html.escape(str(message))
        return f"""
<details style="margin-top: 10px;">
<summary style="cursor: pointer; padding: 8px; background: #f8d7da; border-radius: 4px;"><strong>❌ {safe_name} - Connection Failed</strong></summary>
<div style="padding: 10px; border-left: 3px solid #dc3545; margin-left: 10px; margin-top: 5px;">
<p>{safe_message}</p>
</div>
</details>
"""

    def _render_server_section(self, name: str, config) -> str:
        """Build the collapsible HTML block for one registered server."""
        safe_name = html.escape(str(name))
        base_url = html.escape(config.url.replace(self._MCP_SSE_SUFFIX, ""))
        description = html.escape(str(config.description))
        # No health probe is performed: a server present in the registry is
        # reported as healthy.
        health = "🟢 Healthy"
        space_id_row = (
            f"<p><strong>Space ID:</strong> {html.escape(str(config.space_id))}</p>"
            if config.space_id
            else ""
        )
        return f"""
<details style="margin-bottom: 10px;">
<summary style="cursor: pointer; padding: 8px; background: #e9ecef; border-radius: 4px;"><strong>🔧 {safe_name}</strong></summary>
<div style="padding: 10px; border-left: 3px solid #007bff; margin-left: 10px; margin-top: 5px;">
<p><strong>Title:</strong> {safe_name}</p>
<p><strong>Status:</strong> Connected (MCP Protocol)</p>
<p><strong>Health:</strong> {health}</p>
<p><strong>Base URL:</strong> {base_url}</p>
<p><strong>Description:</strong> {description}</p>
{space_id_row}
</div>
</details>
"""

    def get_server_status(self) -> Tuple[str, str]:
        """Get the status of all servers in accordion format.

        Returns:
            ``(summary_markdown, accordion_html)``. The summary states the
            total number of servers; the HTML holds one ``<details>`` element
            per server, or a placeholder paragraph when none are configured.
            If anything raises, the summary reads "Error" and the HTML
            carries the error text.
        """
        try:
            servers = self.mcp_client.servers
            server_count = len(servers)
            logger.info(f"π Getting server status, found {server_count} servers")

            server_status_text = f"**Total MCP Servers**: {server_count}"
            if not servers:
                logger.info("π No servers found")
                return server_status_text, "<p><em>No MCP servers configured yet.</em></p>"

            sections = []
            for name, config in servers.items():
                logger.info(f"π Processing server: {name}")
                sections.append(self._render_server_section(name, config))

            logger.info(f"π Generated accordion HTML for {server_count} servers")
            return server_status_text, "".join(sections)
        except Exception as e:
            error_msg = f"Error getting server status: {str(e)}"
            logger.error(f"β {error_msg}")
            logger.error(traceback.format_exc())
            return (
                "**Total MCP Servers**: Error",
                f"<p style='color: red;'>❌ {html.escape(error_msg)}</p>",
            )

    def remove_server(self, server_name: str) -> Tuple[str, str]:
        """Remove a specific server.

        Returns:
            ``(status_message, "")``; the message depends on the truthiness
            of the client's ``remove_server`` result.
        """
        if self.mcp_client.remove_server(server_name):
            return f"✅ Removed server: {server_name}", ""
        return f"❌ Server not found: {server_name}", ""

    def remove_all_servers(self) -> Tuple[str, str]:
        """Remove all servers.

        Returns:
            ``(status_message, "")``; the message includes the count returned
            by the client when that count is positive.
        """
        count = self.mcp_client.remove_all_servers()
        if count > 0:
            return f"✅ Removed all {count} MCP servers", ""
        return "ℹ️ No servers to remove", ""

    def enable_server(self, server_name: str, enabled: bool = True) -> Tuple[str, str]:
        """Enable or disable a server.

        Args:
            server_name: Key of the server in the client's ``servers`` mapping.
            enabled: ``True`` to enable, ``False`` to disable.

        Returns:
            ``(status_message, "")``. The client is only called when
            ``server_name`` is a known server.
        """
        if server_name not in self.mcp_client.servers:
            return f"❌ Server not found: {server_name}", ""
        self.mcp_client.enable_server(server_name, enabled)
        status = "enabled" if enabled else "disabled"
        return f"✅ Server {server_name} {status}", ""

    def get_server_list_with_status(self) -> list:
        """Get the server list with enable/disable status for UI components.

        Delegates directly to the client's method of the same name.
        """
        return self.mcp_client.get_server_list_with_status()
# End of module.