""" Utility functions for Universal MCP Client """ import re import logging from typing import List, Dict, Any, Optional from pathlib import Path logger = logging.getLogger(__name__) def validate_huggingface_space_name(space_name: str) -> bool: """ Validate HuggingFace space name format Expected format: username/space-name """ if not space_name or not isinstance(space_name, str): return False # Check for the required "/" separator if "/" not in space_name: return False parts = space_name.split("/") if len(parts) != 2: return False username, space_name_part = parts # Basic validation for username and space name # HuggingFace usernames and space names should be alphanumeric with hyphens and underscores username_pattern = r'^[a-zA-Z0-9\-_]+$' space_pattern = r'^[a-zA-Z0-9\-_]+$' return bool(re.match(username_pattern, username) and re.match(space_pattern, space_name_part)) def sanitize_server_name(name: str) -> str: """ Sanitize server name for use as MCP server identifier """ if not name: return "unnamed_server" # Convert to lowercase and replace spaces and special chars with underscores sanitized = re.sub(r'[^a-zA-Z0-9_]', '_', name.lower()) # Remove multiple consecutive underscores sanitized = re.sub(r'_+', '_', sanitized) # Remove leading/trailing underscores sanitized = sanitized.strip('_') return sanitized or "unnamed_server" def format_file_size(size_bytes: int) -> str: """ Format file size in human readable format """ if size_bytes == 0: return "0 B" size_names = ["B", "KB", "MB", "GB", "TB"] i = 0 while size_bytes >= 1024 and i < len(size_names) - 1: size_bytes /= 1024.0 i += 1 return f"{size_bytes:.1f} {size_names[i]}" def get_file_info(file_path: str) -> Dict[str, Any]: """ Get information about a file """ try: path = Path(file_path) if not path.exists(): return {"error": "File not found"} stat = path.stat() return { "name": path.name, "size": stat.st_size, "size_formatted": format_file_size(stat.st_size), "extension": path.suffix.lower(), "exists": True } except Exception as e: logger.error(f"Error getting file info for {file_path}: {e}") return {"error": str(e)} def truncate_text(text: str, max_length: int = 100, suffix: str = "...") -> str: """ Truncate text to a maximum length with suffix """ if not text or len(text) <= max_length: return text return text[:max_length - len(suffix)] + suffix def format_tool_description(tool_name: str, description: str, max_desc_length: int = 150) -> str: """ Format tool description for display """ formatted_name = tool_name.replace("_", " ").title() truncated_desc = truncate_text(description, max_desc_length) return f"**{formatted_name}**: {truncated_desc}" def extract_media_type_from_url(url: str) -> Optional[str]: """ Extract media type from URL based on file extension """ if not url: return None # Handle data URLs if url.startswith('data:'): if 'image/' in url: return 'image' elif 'audio/' in url: return 'audio' elif 'video/' in url: return 'video' return None # Handle regular URLs - extract extension url_lower = url.lower() if any(ext in url_lower for ext in ['.png', '.jpg', '.jpeg', '.gif', '.webp', '.bmp', '.svg']): return 'image' elif any(ext in url_lower for ext in ['.mp3', '.wav', '.ogg', '.m4a', '.flac', '.aac']): return 'audio' elif any(ext in url_lower for ext in ['.mp4', '.avi', '.mov', '.mkv', '.webm']): return 'video' return None def clean_html_for_display(html_text: str) -> str: """ Clean HTML text for safe display in Gradio """ if not html_text: return "" # Remove script tags for security html_text = re.sub(r']*>.*?', '', html_text, flags=re.IGNORECASE | re.DOTALL) # Remove potentially dangerous attributes html_text = re.sub(r'on\w+\s*=\s*["\'][^"\']*["\']', '', html_text, flags=re.IGNORECASE) return html_text def generate_accordion_html(title: str, content: str, is_open: bool = False) -> str: """ Generate HTML for a collapsible accordion section """ open_attr = "open" if is_open else "" return f"""
{title}
{content}
""" class EventTracker: """Simple event tracking for debugging and monitoring""" def __init__(self): self.events: List[Dict[str, Any]] = [] self.max_events = 100 def track_event(self, event_type: str, data: Dict[str, Any] = None): """Track an event""" import datetime event = { "timestamp": datetime.datetime.now().isoformat(), "type": event_type, "data": data or {} } self.events.append(event) # Keep only the most recent events if len(self.events) > self.max_events: self.events = self.events[-self.max_events:] logger.debug(f"Event tracked: {event_type}") def get_recent_events(self, count: int = 10) -> List[Dict[str, Any]]: """Get recent events""" return self.events[-count:] def clear_events(self): """Clear all tracked events""" self.events.clear() # Global event tracker instance event_tracker = EventTracker()