"""
Utility functions for Universal MCP Client
"""
import re
import logging
from typing import List, Dict, Any, Optional
from pathlib import Path
logger = logging.getLogger(__name__)
def validate_huggingface_space_name(space_name: str) -> bool:
"""
Validate HuggingFace space name format
Expected format: username/space-name
"""
if not space_name or not isinstance(space_name, str):
return False
# Check for the required "/" separator
if "/" not in space_name:
return False
parts = space_name.split("/")
if len(parts) != 2:
return False
username, space_name_part = parts
# Basic validation for username and space name
# HuggingFace usernames and space names should be alphanumeric with hyphens and underscores
username_pattern = r'^[a-zA-Z0-9\-_]+$'
space_pattern = r'^[a-zA-Z0-9\-_]+$'
return bool(re.match(username_pattern, username) and re.match(space_pattern, space_name_part))
def sanitize_server_name(name: str) -> str:
"""
Sanitize server name for use as MCP server identifier
"""
if not name:
return "unnamed_server"
# Convert to lowercase and replace spaces and special chars with underscores
sanitized = re.sub(r'[^a-zA-Z0-9_]', '_', name.lower())
# Remove multiple consecutive underscores
sanitized = re.sub(r'_+', '_', sanitized)
# Remove leading/trailing underscores
sanitized = sanitized.strip('_')
return sanitized or "unnamed_server"
def format_file_size(size_bytes: int) -> str:
"""
Format file size in human readable format
"""
if size_bytes == 0:
return "0 B"
size_names = ["B", "KB", "MB", "GB", "TB"]
i = 0
while size_bytes >= 1024 and i < len(size_names) - 1:
size_bytes /= 1024.0
i += 1
return f"{size_bytes:.1f} {size_names[i]}"
def get_file_info(file_path: str) -> Dict[str, Any]:
"""
Get information about a file
"""
try:
path = Path(file_path)
if not path.exists():
return {"error": "File not found"}
stat = path.stat()
return {
"name": path.name,
"size": stat.st_size,
"size_formatted": format_file_size(stat.st_size),
"extension": path.suffix.lower(),
"exists": True
}
except Exception as e:
logger.error(f"Error getting file info for {file_path}: {e}")
return {"error": str(e)}
def truncate_text(text: str, max_length: int = 100, suffix: str = "...") -> str:
"""
Truncate text to a maximum length with suffix
"""
if not text or len(text) <= max_length:
return text
return text[:max_length - len(suffix)] + suffix
def format_tool_description(tool_name: str, description: str, max_desc_length: int = 150) -> str:
"""
Format tool description for display
"""
formatted_name = tool_name.replace("_", " ").title()
truncated_desc = truncate_text(description, max_desc_length)
return f"**{formatted_name}**: {truncated_desc}"
def extract_media_type_from_url(url: str) -> Optional[str]:
"""
Extract media type from URL based on file extension
"""
if not url:
return None
# Handle data URLs
if url.startswith('data:'):
if 'image/' in url:
return 'image'
elif 'audio/' in url:
return 'audio'
elif 'video/' in url:
return 'video'
return None
# Handle regular URLs - extract extension
url_lower = url.lower()
if any(ext in url_lower for ext in ['.png', '.jpg', '.jpeg', '.gif', '.webp', '.bmp', '.svg']):
return 'image'
elif any(ext in url_lower for ext in ['.mp3', '.wav', '.ogg', '.m4a', '.flac', '.aac']):
return 'audio'
elif any(ext in url_lower for ext in ['.mp4', '.avi', '.mov', '.mkv', '.webm']):
return 'video'
return None
def clean_html_for_display(html_text: str) -> str:
"""
Clean HTML text for safe display in Gradio
"""
if not html_text:
return ""
# Remove script tags for security
html_text = re.sub(r'', '', html_text, flags=re.IGNORECASE | re.DOTALL)
# Remove potentially dangerous attributes
html_text = re.sub(r'on\w+\s*=\s*["\'][^"\']*["\']', '', html_text, flags=re.IGNORECASE)
return html_text
def generate_accordion_html(title: str, content: str, is_open: bool = False) -> str:
"""
Generate HTML for a collapsible accordion section
"""
open_attr = "open" if is_open else ""
return f"""
{title}
{content}
"""
class EventTracker:
"""Simple event tracking for debugging and monitoring"""
def __init__(self):
self.events: List[Dict[str, Any]] = []
self.max_events = 100
def track_event(self, event_type: str, data: Dict[str, Any] = None):
"""Track an event"""
import datetime
event = {
"timestamp": datetime.datetime.now().isoformat(),
"type": event_type,
"data": data or {}
}
self.events.append(event)
# Keep only the most recent events
if len(self.events) > self.max_events:
self.events = self.events[-self.max_events:]
logger.debug(f"Event tracked: {event_type}")
def get_recent_events(self, count: int = 10) -> List[Dict[str, Any]]:
"""Get recent events"""
return self.events[-count:]
def clear_events(self):
"""Clear all tracked events"""
self.events.clear()
# Global event tracker instance
event_tracker = EventTracker()