Spaces:
Running
Running
| """ | |
| Utility functions for Universal MCP Client | |
| """ | |
| import re | |
| import logging | |
| from typing import List, Dict, Any, Optional | |
| from pathlib import Path | |
| logger = logging.getLogger(__name__) | |
| def validate_huggingface_space_name(space_name: str) -> bool: | |
| """ | |
| Validate HuggingFace space name format | |
| Expected format: username/space-name | |
| """ | |
| if not space_name or not isinstance(space_name, str): | |
| return False | |
| # Check for the required "/" separator | |
| if "/" not in space_name: | |
| return False | |
| parts = space_name.split("/") | |
| if len(parts) != 2: | |
| return False | |
| username, space_name_part = parts | |
| # Basic validation for username and space name | |
| # HuggingFace usernames and space names should be alphanumeric with hyphens and underscores | |
| username_pattern = r'^[a-zA-Z0-9\-_]+$' | |
| space_pattern = r'^[a-zA-Z0-9\-_]+$' | |
| return bool(re.match(username_pattern, username) and re.match(space_pattern, space_name_part)) | |
| def sanitize_server_name(name: str) -> str: | |
| """ | |
| Sanitize server name for use as MCP server identifier | |
| """ | |
| if not name: | |
| return "unnamed_server" | |
| # Convert to lowercase and replace spaces and special chars with underscores | |
| sanitized = re.sub(r'[^a-zA-Z0-9_]', '_', name.lower()) | |
| # Remove multiple consecutive underscores | |
| sanitized = re.sub(r'_+', '_', sanitized) | |
| # Remove leading/trailing underscores | |
| sanitized = sanitized.strip('_') | |
| return sanitized or "unnamed_server" | |
| def format_file_size(size_bytes: int) -> str: | |
| """ | |
| Format file size in human readable format | |
| """ | |
| if size_bytes == 0: | |
| return "0 B" | |
| size_names = ["B", "KB", "MB", "GB", "TB"] | |
| i = 0 | |
| while size_bytes >= 1024 and i < len(size_names) - 1: | |
| size_bytes /= 1024.0 | |
| i += 1 | |
| return f"{size_bytes:.1f} {size_names[i]}" | |
| def get_file_info(file_path: str) -> Dict[str, Any]: | |
| """ | |
| Get information about a file | |
| """ | |
| try: | |
| path = Path(file_path) | |
| if not path.exists(): | |
| return {"error": "File not found"} | |
| stat = path.stat() | |
| return { | |
| "name": path.name, | |
| "size": stat.st_size, | |
| "size_formatted": format_file_size(stat.st_size), | |
| "extension": path.suffix.lower(), | |
| "exists": True | |
| } | |
| except Exception as e: | |
| logger.error(f"Error getting file info for {file_path}: {e}") | |
| return {"error": str(e)} | |
| def truncate_text(text: str, max_length: int = 100, suffix: str = "...") -> str: | |
| """ | |
| Truncate text to a maximum length with suffix | |
| """ | |
| if not text or len(text) <= max_length: | |
| return text | |
| return text[:max_length - len(suffix)] + suffix | |
| def format_tool_description(tool_name: str, description: str, max_desc_length: int = 150) -> str: | |
| """ | |
| Format tool description for display | |
| """ | |
| formatted_name = tool_name.replace("_", " ").title() | |
| truncated_desc = truncate_text(description, max_desc_length) | |
| return f"**{formatted_name}**: {truncated_desc}" | |
| def extract_media_type_from_url(url: str) -> Optional[str]: | |
| """ | |
| Extract media type from URL based on file extension | |
| """ | |
| if not url: | |
| return None | |
| # Handle data URLs | |
| if url.startswith('data:'): | |
| if 'image/' in url: | |
| return 'image' | |
| elif 'audio/' in url: | |
| return 'audio' | |
| elif 'video/' in url: | |
| return 'video' | |
| return None | |
| # Handle regular URLs - extract extension | |
| url_lower = url.lower() | |
| if any(ext in url_lower for ext in ['.png', '.jpg', '.jpeg', '.gif', '.webp', '.bmp', '.svg']): | |
| return 'image' | |
| elif any(ext in url_lower for ext in ['.mp3', '.wav', '.ogg', '.m4a', '.flac', '.aac']): | |
| return 'audio' | |
| elif any(ext in url_lower for ext in ['.mp4', '.avi', '.mov', '.mkv', '.webm']): | |
| return 'video' | |
| return None | |
| def clean_html_for_display(html_text: str) -> str: | |
| """ | |
| Clean HTML text for safe display in Gradio | |
| """ | |
| if not html_text: | |
| return "" | |
| # Remove script tags for security | |
| html_text = re.sub(r'<script[^>]*>.*?</script>', '', html_text, flags=re.IGNORECASE | re.DOTALL) | |
| # Remove potentially dangerous attributes | |
| html_text = re.sub(r'on\w+\s*=\s*["\'][^"\']*["\']', '', html_text, flags=re.IGNORECASE) | |
| return html_text | |
| def generate_accordion_html(title: str, content: str, is_open: bool = False) -> str: | |
| """ | |
| Generate HTML for a collapsible accordion section | |
| """ | |
| open_attr = "open" if is_open else "" | |
| return f""" | |
| <details {open_attr} style="margin-bottom: 10px;"> | |
| <summary style="cursor: pointer; padding: 8px; background: #e9ecef; border-radius: 4px;"> | |
| <strong>{title}</strong> | |
| </summary> | |
| <div style="padding: 10px; border-left: 3px solid #007bff; margin-left: 10px; margin-top: 5px;"> | |
| {content} | |
| </div> | |
| </details> | |
| """ | |
| class EventTracker: | |
| """Simple event tracking for debugging and monitoring""" | |
| def __init__(self): | |
| self.events: List[Dict[str, Any]] = [] | |
| self.max_events = 100 | |
| def track_event(self, event_type: str, data: Dict[str, Any] = None): | |
| """Track an event""" | |
| import datetime | |
| event = { | |
| "timestamp": datetime.datetime.now().isoformat(), | |
| "type": event_type, | |
| "data": data or {} | |
| } | |
| self.events.append(event) | |
| # Keep only the most recent events | |
| if len(self.events) > self.max_events: | |
| self.events = self.events[-self.max_events:] | |
| logger.debug(f"Event tracked: {event_type}") | |
| def get_recent_events(self, count: int = 10) -> List[Dict[str, Any]]: | |
| """Get recent events""" | |
| return self.events[-count:] | |
| def clear_events(self): | |
| """Clear all tracked events""" | |
| self.events.clear() | |
| # Global event tracker instance | |
| event_tracker = EventTracker() | |