File size: 8,748 Bytes
2b232e8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
"""
Server management utilities for Universal MCP Client
Version 2.0 - Enhanced with server enabling/disabling support
"""
import asyncio
import html
import logging
import re
import traceback
from typing import Tuple

from config import MCPServerConfig
from mcp_client import UniversalMCPClient

logger = logging.getLogger(__name__)

class ServerManager:
    """Manages MCP server connections and status with enhanced server management"""
    
    def __init__(self, mcp_client: UniversalMCPClient):
        """Store the MCP client that every other method of this manager delegates to.

        Args:
            mcp_client: Client object kept on ``self.mcp_client``; the other
                methods read its ``servers`` mapping and call its
                add/remove/enable operations.
        """
        self.mcp_client = mcp_client
    
    def convert_hf_space_to_url(self, space_name: str) -> str:
        """
        Convert HuggingFace space name to proper URL format.
        
        HuggingFace URL rules:
        - Replace "/" with "-" 
        - Convert to lowercase
        - Replace dots and other special chars with "-"
        - Remove consecutive hyphens
        """
        if "/" not in space_name:
            raise ValueError("Space name should be in format: username/space-name")
        
        # Replace "/" with "-"
        url_name = space_name.replace("/", "-")
        
        # Convert to lowercase
        url_name = url_name.lower()
        
        # Replace dots and other special characters with hyphens
        url_name = re.sub(r'[^a-z0-9\-]', '-', url_name)
        
        # Remove consecutive hyphens
        url_name = re.sub(r'-+', '-', url_name)
        
        # Remove leading/trailing hyphens
        url_name = url_name.strip('-')
        
        return f"https://{url_name}.hf.space"
    
    def add_custom_server(self, name: str, space_name: str) -> Tuple[str, str]:
        """Add a custom MCP server from HuggingFace space name"""
        logger.info(f"βž• Adding MCP server: {name} from space: {space_name}")
        
        if not name or not space_name:
            return "❌ Please provide both server name and space name", ""
        
        space_name = space_name.strip()
        
        try:
            # Use the improved URL conversion
            mcp_url = self.convert_hf_space_to_url(space_name)
            logger.info(f"πŸ”— Converted {space_name} β†’ {mcp_url}")
            
        except ValueError as e:
            return f"❌ {str(e)}", ""
        
        config = MCPServerConfig(
            name=name.strip(), 
            url=mcp_url, 
            description=f"MCP server from HuggingFace space: {space_name}",
            space_id=space_name
        )
        
        try:
            # Run async function
            def run_async():
                loop = asyncio.new_event_loop()
                asyncio.set_event_loop(loop)
                try:
                    return loop.run_until_complete(self.mcp_client.add_server_async(config))
                finally:
                    loop.close()
            
            success, message = run_async()
            logger.info(f"Server addition result: {success} - {message}")
            
            if success:
                # Format success message for accordion display
                tools_info = ""
                if 'Found' in message and 'tools:' in message:
                    tools_section = message.split('Found')[1]
                    tools_info = f"**Available Tools:**\n{tools_section}"
                
                details_html = f"""
                <details style="margin-top: 10px;">
                    <summary style="cursor: pointer; padding: 8px; background: #f0f0f0; border-radius: 4px;"><strong>βœ… {name} - Connection Details</strong></summary>
                    <div style="padding: 10px; border-left: 3px solid #28a745; margin-left: 10px; margin-top: 5px;">
                        <p><strong>Space:</strong> {space_name}</p>
                        <p><strong>Base URL:</strong> {mcp_url}</p>
                        <p><strong>Status:</strong> Connected successfully!</p>
                        <p><strong>Enabled:</strong> Yes (new servers are enabled by default)</p>
                        <div style="margin-top: 10px;">
                            {tools_info.replace('**', '<strong>').replace('**', '</strong>').replace(chr(10), '<br>')}
                        </div>
                    </div>
                </details>
                """
                return "βœ… Server added successfully!", details_html
            else:
                error_html = f"""
                <details style="margin-top: 10px;">
                    <summary style="cursor: pointer; padding: 8px; background: #f8d7da; border-radius: 4px;"><strong>❌ {name} - Connection Failed</strong></summary>
                    <div style="padding: 10px; border-left: 3px solid #dc3545; margin-left: 10px; margin-top: 5px;">
                        <p>{message}</p>
                    </div>
                </details>
                """
                return f"❌ Failed to add server: {name}", error_html
            
        except Exception as e:
            error_msg = f"❌ Failed to add server: {str(e)}"
            logger.error(error_msg)
            logger.error(traceback.format_exc())
            return error_msg, ""
    
    def get_server_status(self) -> Tuple[str, str]:
        """Get status of all servers in accordion format"""
        try:
            logger.info(f"πŸ” Getting server status, found {len(self.mcp_client.servers)} servers")
            
            # Get server status from mcp_client
            server_count = len(self.mcp_client.servers)
            server_status_text = f"**Total MCP Servers**: {server_count}"
            
            if not self.mcp_client.servers:
                logger.info("πŸ“Š No servers found")
                return server_status_text, "<p><em>No MCP servers configured yet.</em></p>"
            
            accordion_html = ""
            
            for name, config in self.mcp_client.servers.items():
                logger.info(f"πŸ“Š Processing server: {name}")
                base_url = config.url.replace("/gradio_api/mcp/sse", "")
                
                # Determine health status (assume healthy if it's in the servers dict)
                health = "🟒 Healthy"
                
                accordion_html += f"""
                <details style="margin-bottom: 10px;">
                    <summary style="cursor: pointer; padding: 8px; background: #e9ecef; border-radius: 4px;"><strong>πŸ”§ {name}</strong></summary>
                    <div style="padding: 10px; border-left: 3px solid #007bff; margin-left: 10px; margin-top: 5px;">
                        <p><strong>Title:</strong> {name}</p>
                        <p><strong>Status:</strong> Connected (MCP Protocol)</p>
                        <p><strong>Health:</strong> {health}</p>
                        <p><strong>Base URL:</strong> {base_url}</p>
                        <p><strong>Description:</strong> {config.description}</p>
                        {f'<p><strong>Space ID:</strong> {config.space_id}</p>' if config.space_id else ''}
                    </div>
                </details>
                """
            
            logger.info(f"πŸ“Š Generated accordion HTML for {server_count} servers")
            return server_status_text, accordion_html
            
        except Exception as e:
            error_msg = f"Error getting server status: {str(e)}"
            logger.error(f"❌ {error_msg}")
            logger.error(traceback.format_exc())
            return "**Total MCP Servers**: Error", f"<p style='color: red;'>❌ {error_msg}</p>"
    
    def remove_server(self, server_name: str) -> Tuple[str, str]:
        """Remove a specific server"""
        success = self.mcp_client.remove_server(server_name)
        if success:
            return f"βœ… Removed server: {server_name}", ""
        else:
            return f"❌ Server not found: {server_name}", ""
    
    def remove_all_servers(self) -> Tuple[str, str]:
        """Remove all servers"""
        count = self.mcp_client.remove_all_servers()
        if count > 0:
            return f"βœ… Removed all {count} MCP servers", ""
        else:
            return "ℹ️ No servers to remove", ""
    
    def enable_server(self, server_name: str, enabled: bool = True) -> Tuple[str, str]:
        """Enable or disable a server"""
        if server_name in self.mcp_client.servers:
            self.mcp_client.enable_server(server_name, enabled)
            status = "enabled" if enabled else "disabled"
            return f"βœ… Server {server_name} {status}", ""
        else:
            return f"❌ Server not found: {server_name}", ""
    
    def get_server_list_with_status(self) -> list:
        """Get server list with enable/disable status for UI components"""
        return self.mcp_client.get_server_list_with_status()