import hashlib import secrets import re from typing import Optional, Dict, Any from datetime import datetime def generate_secure_id(input_string: str) -> str: """Genera un ID seguro basado en una cadena""" return hashlib.md5(input_string.encode()).hexdigest()[:12] def generate_token() -> str: """Genera un token seguro aleatorio""" return secrets.token_urlsafe(32) def clean_channel_name(name: str) -> str: """Limpia el nombre del canal removiendo caracteres especiales""" # Remover caracteres especiales pero mantener espacios y caracteres acentuados cleaned = re.sub(r'[^\w\s\-áéíóúñü]', '', name, flags=re.IGNORECASE) return cleaned.strip() def validate_url(url: str) -> bool: """Valida si una URL tiene formato correcto""" url_pattern = re.compile( r'^https?://' # http:// or https:// r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+[A-Z]{2,6}\.?|' # domain... r'localhost|' # localhost... r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})' # ...or ip r'(?::\d+)?' # optional port r'(?:/?|[/?]\S+)$', re.IGNORECASE) return url_pattern.match(url) is not None def format_timestamp(dt: datetime) -> str: """Formatea un timestamp para mostrar""" return dt.strftime("%Y-%m-%d %H:%M:%S") def safe_get(dictionary: Dict[str, Any], key: str, default: Any = None) -> Any: """Obtiene un valor del diccionario de forma segura""" return dictionary.get(key, default) class RateLimiter: """Rate limiter simple en memoria""" def __init__(self, max_requests: int = 100, window_seconds: int = 60): self.max_requests = max_requests self.window_seconds = window_seconds self.requests: Dict[str, list] = {} def is_allowed(self, identifier: str) -> bool: """Verifica si una petición está permitida""" now = datetime.now() if identifier not in self.requests: self.requests[identifier] = [] # Limpiar peticiones antiguas cutoff_time = now.timestamp() - self.window_seconds self.requests[identifier] = [ req_time for req_time in self.requests[identifier] if req_time > cutoff_time ] # Verificar límite if len(self.requests[identifier]) >= self.max_requests: return False # Agregar petición actual self.requests[identifier].append(now.timestamp()) return True # Instancia global del rate limiter rate_limiter = RateLimiter(max_requests=50, window_seconds=60)