Spaces:
Running
Running
File size: 6,116 Bytes
6af4299 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 |
"""
Utility functions for Universal MCP Client
"""
import re
import logging
from typing import List, Dict, Any, Optional
from pathlib import Path
logger = logging.getLogger(__name__)
def validate_huggingface_space_name(space_name: str) -> bool:
    """
    Validate HuggingFace space name format

    Expected format: username/space-name

    Both halves must be non-empty and contain only ASCII letters, digits,
    hyphens and underscores.

    Args:
        space_name: Candidate identifier, e.g. "user/my-space".

    Returns:
        True when the value is a string with exactly one "/" separating two
        valid halves; False otherwise (including None, non-strings and
        empty input).
    """
    if not space_name or not isinstance(space_name, str):
        return False

    # Exactly one "/" must separate the two halves; a missing separator
    # yields a single part and is rejected by the same length check.
    parts = space_name.split("/")
    if len(parts) != 2:
        return False

    # fullmatch instead of match + "$": "$" also matches just before a
    # trailing newline, which would let a name ending in a newline through.
    name_pattern = r'[a-zA-Z0-9\-_]+'
    return all(re.fullmatch(name_pattern, part) for part in parts)
def sanitize_server_name(name: str) -> str:
    """
    Turn an arbitrary name into a lowercase, underscore-separated identifier

    The name is lowercased, every character outside [a-zA-Z0-9_] becomes an
    underscore, runs of underscores collapse to one, and underscores at
    either end are dropped. If nothing is left (or the input is empty), the
    placeholder "unnamed_server" is returned.
    """
    fallback = "unnamed_server"
    if not name:
        return fallback

    lowered = name.lower()
    # Step 1: anything that is not an ASCII letter, digit or underscore
    # is replaced by an underscore.
    replaced = re.sub(r'[^a-zA-Z0-9_]', '_', lowered)
    # Step 2: squeeze underscore runs, then trim both ends.
    squeezed = re.sub(r'_+', '_', replaced)
    trimmed = squeezed.strip('_')

    if trimmed:
        return trimmed
    return fallback
def format_file_size(size_bytes: int) -> str:
    """
    Render a byte count as a short human readable string

    Zero is reported as "0 B". Any other value is divided by 1024 until it
    drops below 1024 or the largest unit (TB) is reached, then printed with
    one decimal place followed by the unit.
    """
    if size_bytes == 0:
        return "0 B"

    units = ["B", "KB", "MB", "GB", "TB"]
    value = size_bytes

    # Walk every unit except the last; stop at the first one the value fits in.
    for unit in units[:-1]:
        if not value >= 1024:
            return f"{value:.1f} {unit}"
        value = value / 1024.0

    # Anything still >= 1024 here is expressed in the largest unit.
    return f"{value:.1f} {units[-1]}"
def get_file_info(file_path: str) -> Dict[str, Any]:
    """
    Collect basic metadata for the file at *file_path*

    Returns a dict with "name", "size", "size_formatted", "extension"
    (lowercased suffix) and "exists" when the path exists. Returns
    {"error": "File not found"} when it does not, and {"error": <message>}
    when any exception is raised while inspecting it (the failure is also
    logged).
    """
    try:
        target = Path(file_path)
        if target.exists():
            byte_count = target.stat().st_size
            info: Dict[str, Any] = {"name": target.name}
            info["size"] = byte_count
            info["size_formatted"] = format_file_size(byte_count)
            info["extension"] = target.suffix.lower()
            info["exists"] = True
            return info
        return {"error": "File not found"}
    except Exception as e:
        # Best-effort helper: report the failure to the caller instead of raising.
        logger.error(f"Error getting file info for {file_path}: {e}")
        return {"error": str(e)}
def truncate_text(text: str, max_length: int = 100, suffix: str = "...") -> str:
    """
    Truncate text to a maximum length with suffix

    Args:
        text: The text to shorten. Falsy values (None, "") are returned
            unchanged.
        max_length: Maximum length of the returned string, suffix included.
        suffix: Marker appended when the text is cut.

    Returns:
        The original text if it already fits; otherwise a string of at most
        max_length characters. When max_length is too small to hold the
        suffix, the text is hard-cut to max_length characters without it.
    """
    if not text or len(text) <= max_length:
        return text

    if max_length <= 0:
        return ""

    # Without this guard the slice end below goes negative and counts from
    # the END of the text, producing a result longer than max_length.
    if max_length < len(suffix):
        return text[:max_length]

    return text[:max_length - len(suffix)] + suffix
def format_tool_description(tool_name: str, description: str, max_desc_length: int = 150) -> str:
    """
    Build a one-line Markdown label for a tool

    Underscores in the tool name are replaced by spaces and the result is
    title-cased and wrapped in bold markers; the description is shortened
    with truncate_text to max_desc_length and appended after a colon.
    """
    display_name = tool_name.replace("_", " ").title()
    summary = truncate_text(description, max_desc_length)
    return "**{}**: {}".format(display_name, summary)
def extract_media_type_from_url(url: str) -> Optional[str]:
    """
    Extract media type from URL based on file extension

    Args:
        url: A regular URL / file path, or a "data:" URL.

    Returns:
        'image', 'audio' or 'video' when the type can be recognised,
        otherwise None (also for empty input).

    Resolution order:
        1. "data:" URLs are classified from the MIME type in their header
           only (the part before the first comma), never from the payload.
        2. For other URLs the extension at the end of the URL path wins.
        3. If the path has no known extension, a known extension appearing
           anywhere in the URL (e.g. a file name carried in the query
           string) is used as a best-effort fallback.
    """
    # Imported locally so this function does not depend on the module's
    # top-level import list.
    from urllib.parse import urlsplit

    if not url:
        return None

    extension_groups = (
        ('image', ('.png', '.jpg', '.jpeg', '.gif', '.webp', '.bmp', '.svg')),
        ('audio', ('.mp3', '.wav', '.ogg', '.m4a', '.flac', '.aac')),
        ('video', ('.mp4', '.avi', '.mov', '.mkv', '.webm')),
    )

    url_lower = url.lower()

    # Handle data URLs: look at "data:<mime>[;params]" only, so that payload
    # bytes which happen to contain "image/" etc. cannot change the result.
    if url_lower.startswith('data:'):
        header = url_lower[len('data:'):].split(',', 1)[0]
        mime_type = header.split(';', 1)[0].strip()
        for media_type, _extensions in extension_groups:
            if mime_type.startswith(media_type + '/'):
                return media_type
        return None

    # Handle regular URLs - the extension that ends the path is authoritative
    # (query string and fragment are ignored here).
    try:
        path = urlsplit(url_lower).path
    except ValueError:
        # urlsplit rejects some malformed inputs; fall through to the scan.
        path = ''
    for media_type, extensions in extension_groups:
        if path.endswith(extensions):
            return media_type

    # Fallback: a known extension anywhere in the URL, checked in
    # image > audio > video order.
    for media_type, extensions in extension_groups:
        if any(ext in url_lower for ext in extensions):
            return media_type

    return None
def clean_html_for_display(html_text: str) -> str:
    """
    Clean HTML text for safe display in Gradio

    Removes <script>...</script> elements and inline event-handler
    attributes (onclick=..., onerror=..., ...).

    NOTE: this is a best-effort regex filter, not a full HTML sanitizer;
    it does not parse the document and other vectors (e.g. "javascript:"
    URLs) are not handled here.

    Args:
        html_text: HTML fragment to clean. Falsy input yields "".

    Returns:
        The fragment with script elements and event-handler attributes
        removed.
    """
    if not html_text:
        return ""

    # Remove script tags for security. The closing tag may carry whitespace
    # or other junk before ">" and is still treated as a closing tag.
    html_text = re.sub(
        r'<script[^>]*>.*?</script[^>]*>',
        '',
        html_text,
        flags=re.IGNORECASE | re.DOTALL,
    )

    # Remove potentially dangerous attributes.
    #  - (?<![\w-]) : the name must START with "on", so ordinary attributes
    #    such as content="..." or data-on="..." are left intact.
    #  - each quote style is matched up to its own closing quote, so a value
    #    like "alert('x')" is removed in full.
    #  - unquoted values (onerror=alert(1)) are removed as well.
    html_text = re.sub(
        r'''(?<![\w-])on\w+\s*=\s*(?:"[^"]*"|'[^']*'|[^\s>"']+)''',
        '',
        html_text,
        flags=re.IGNORECASE,
    )

    return html_text
def generate_accordion_html(title: str, content: str, is_open: bool = False) -> str:
    """
    Build the HTML for a collapsible <details> section

    The title is placed in bold inside the <summary>, the content inside
    the body <div>. When is_open is true the "open" attribute is emitted so
    the section starts expanded. Title and content are inserted as given,
    without escaping.
    """
    if is_open:
        open_attr = "open"
    else:
        open_attr = ""

    # The leading and trailing empty entries reproduce the newline that
    # starts and ends the generated markup.
    html_lines = [
        "",
        f'<details {open_attr} style="margin-bottom: 10px;">',
        '<summary style="cursor: pointer; padding: 8px; background: #e9ecef; border-radius: 4px;">',
        f"<strong>{title}</strong>",
        "</summary>",
        '<div style="padding: 10px; border-left: 3px solid #007bff; margin-left: 10px; margin-top: 5px;">',
        f"{content}",
        "</div>",
        "</details>",
        "",
    ]
    return "\n".join(html_lines)
class EventTracker:
    """Simple event tracking for debugging and monitoring

    Events are kept in memory in ``events`` (oldest first). Once more than
    ``max_events`` are stored, the oldest entries are discarded.
    """

    def __init__(self):
        # Each event is a dict with "timestamp", "type" and "data" keys.
        self.events: List[Dict[str, Any]] = []
        # Upper bound on how many events are retained.
        self.max_events = 100

    def track_event(self, event_type: str, data: Optional[Dict[str, Any]] = None):
        """Track an event

        Args:
            event_type: Label stored under the event's "type" key.
            data: Optional payload stored under "data"; an empty dict is
                stored when omitted or empty.
        """
        import datetime

        event = {
            "timestamp": datetime.datetime.now().isoformat(),
            "type": event_type,
            "data": data or {},
        }
        self.events.append(event)

        # Keep only the most recent events. Counting the overflow (rather
        # than slicing with [-max_events:]) also works when max_events is 0,
        # where a [-0:] slice would keep everything.
        overflow = len(self.events) - self.max_events
        if overflow > 0:
            del self.events[:overflow]

        logger.debug("Event tracked: %s", event_type)

    def get_recent_events(self, count: int = 10) -> List[Dict[str, Any]]:
        """Get recent events

        Args:
            count: Maximum number of events to return.

        Returns:
            A new list holding up to ``count`` of the newest events, oldest
            first. Empty when ``count`` is zero or negative.
        """
        # Guard needed because events[-0:] is the WHOLE list, not an empty one.
        if count <= 0:
            return []
        return self.events[-count:]

    def clear_events(self):
        """Clear all tracked events"""
        self.events.clear()
# Global event tracker instance, created once when this module is imported
event_tracker = EventTracker()
|