# Source listing metadata: file size 2,583 bytes, revision 84121fd.
import hashlib
import secrets
import re
from typing import Optional, Dict, Any
from datetime import datetime
def generate_secure_id(input_string: str) -> str:
    """Derive a short, deterministic identifier from a string.

    The input is encoded with ``str.encode()`` (UTF-8 by default), hashed
    with MD5, and the first 12 hexadecimal characters of the digest are
    returned.  The same input always yields the same identifier.

    NOTE(review): MD5 truncated to 48 bits is not collision resistant, so
    the result is suitable as a stable key but not as a security primitive.

    Args:
        input_string: Text to derive the identifier from.

    Returns:
        A 12-character lowercase hexadecimal string.
    """
    encoded = input_string.encode()
    hex_digest = hashlib.md5(encoded).hexdigest()
    id_length = 12
    return hex_digest[:id_length]
def generate_token() -> str:
    """Return a fresh random URL-safe token.

    The token is produced by ``secrets.token_urlsafe`` from 32 random
    bytes, so every call returns a different value.

    Returns:
        A URL-safe Base64 text token.
    """
    entropy_bytes = 32
    token = secrets.token_urlsafe(entropy_bytes)
    return token
def clean_channel_name(name: str) -> str:
    """Strip special characters from a channel name.

    Every character that is not a word character, whitespace, a hyphen, or
    one of the listed accented letters is removed; the result is then
    trimmed of leading and trailing whitespace.

    Args:
        name: Raw channel name.

    Returns:
        The cleaned, trimmed channel name (possibly empty).
    """
    # Keep word characters, whitespace, hyphens and accented letters.
    # For ``str`` patterns ``\w`` already covers Unicode letters, so the
    # explicit accent list is belt-and-braces.
    disallowed = re.compile(r'[^\w\s\-áéíóúñü]', flags=re.IGNORECASE)
    without_specials = disallowed.sub('', name)
    return without_specials.strip()
# Compiled once at import time so every call reuses the same pattern object.
_URL_PATTERN = re.compile(
    r'^https?://'  # http:// or https://
    # Dotted domain name; TLD may be up to 63 letters (the DNS label limit)
    # so modern long TLDs such as ".photography" are accepted.
    r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+[A-Z]{2,63}\.?|'
    r'localhost|'  # localhost...
    r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})'  # ...or dotted-quad IP
    r'(?::\d+)?'  # optional port
    # ``\Z`` (not ``$``) anchors at the true end of the string; ``$`` would
    # also match just before a trailing newline and let "url\n" through.
    r'(?:/?|[/?]\S+)\Z',
    re.IGNORECASE,
)


def validate_url(url: str) -> bool:
    """Check whether a string looks like a well-formed HTTP(S) URL.

    Accepted forms are ``http://`` or ``https://`` followed by a dotted
    domain name, ``localhost``, or a dotted-quad IPv4 address, with an
    optional ``:port`` and an optional path/query containing no whitespace.
    Matching is case-insensitive.

    This is a syntactic check only: IPv4 octets are not range-checked and
    no DNS lookup is performed.

    Args:
        url: Candidate URL.

    Returns:
        ``True`` if ``url`` matches the pattern in full, ``False`` otherwise.
    """
    return _URL_PATTERN.match(url) is not None
def format_timestamp(dt: datetime) -> str:
    """Render a datetime for display as ``YYYY-MM-DD HH:MM:SS``.

    Args:
        dt: The datetime to format.

    Returns:
        The formatted text, e.g. ``"2024-01-02 03:04:05"``.
    """
    display_format = "%Y-%m-%d %H:%M:%S"
    return dt.strftime(display_format)
def safe_get(dictionary: Dict[str, Any], key: str, default: Any = None) -> Any:
    """Look up a key in a dictionary, falling back to a default.

    Args:
        dictionary: Mapping to read from.
        key: Key to look up.
        default: Value returned when ``key`` is absent (``None`` if omitted).

    Returns:
        The stored value for ``key`` if present (even when that value is
        ``None``), otherwise ``default``.
    """
    value = dictionary.get(key, default)
    return value
class RateLimiter:
    """Simple in-memory sliding-window rate limiter.

    Each identifier may make at most ``max_requests`` accepted requests in
    any ``window_seconds``-long window.  Timestamps are wall-clock epoch
    seconds taken from ``datetime.now().timestamp()``.

    Identifiers that have had no accepted request within the window are
    evicted by a periodic sweep, so ``requests`` does not grow without bound
    when many distinct identifiers are seen only briefly.

    The class does no locking; callers that share an instance across threads
    must provide their own synchronization.
    """

    def __init__(self, max_requests: int = 100, window_seconds: int = 60):
        """Create a limiter.

        Args:
            max_requests: Maximum accepted requests per identifier per window.
            window_seconds: Length of the sliding window, in seconds.
        """
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        # identifier -> timestamps (epoch seconds) of accepted requests that
        # were still inside the window at the last check for that identifier.
        self.requests: Dict[str, list] = {}
        # Earliest time at which the next full sweep may run; 0.0 means the
        # very first call sweeps.
        self._next_sweep_at = 0.0

    def is_allowed(self, identifier: str) -> bool:
        """Decide whether a request from ``identifier`` is allowed right now.

        An allowed request is recorded and counts against the limit; a
        rejected one is not recorded.

        Args:
            identifier: Key the limit is tracked under.

        Returns:
            ``True`` if the request is within the limit, ``False`` otherwise.
        """
        now_ts = datetime.now().timestamp()
        cutoff_time = now_ts - self.window_seconds

        # Amortized cleanup: at most one full sweep per window.  ``getattr``
        # keeps instances working even if ``__init__`` was bypassed.
        if now_ts >= getattr(self, "_next_sweep_at", 0.0):
            self._evict_idle(cutoff_time)
            self._next_sweep_at = now_ts + self.window_seconds

        # Keep only this identifier's requests that are still in the window.
        recent = [
            req_time
            for req_time in self.requests.get(identifier, ())
            if req_time > cutoff_time
        ]

        if len(recent) >= self.max_requests:
            if recent:
                self.requests[identifier] = recent
            else:
                # Only reachable when max_requests <= 0; do not keep an
                # empty entry around for a request that was rejected.
                self.requests.pop(identifier, None)
            return False

        recent.append(now_ts)
        self.requests[identifier] = recent
        return True

    def _evict_idle(self, cutoff_time: float) -> None:
        """Drop every identifier with no request newer than ``cutoff_time``."""
        idle = [
            key
            for key, stamps in self.requests.items()
            if not any(req_time > cutoff_time for req_time in stamps)
        ]
        for key in idle:
            del self.requests[key]
# Global rate limiter instance shared by the module's callers:
# at most 50 requests per identifier in any 60-second window.
rate_limiter = RateLimiter(max_requests=50, window_seconds=60)