|
import hashlib |
|
import secrets |
|
import re |
|
from typing import Optional, Dict, Any |
|
from datetime import datetime |
|
|
|
def generate_secure_id(input_string: str) -> str: |
|
"""Genera un ID seguro basado en una cadena""" |
|
return hashlib.md5(input_string.encode()).hexdigest()[:12] |
|
|
|
def generate_token() -> str: |
|
"""Genera un token seguro aleatorio""" |
|
return secrets.token_urlsafe(32) |
|
|
|
def clean_channel_name(name: str) -> str: |
|
"""Limpia el nombre del canal removiendo caracteres especiales""" |
|
|
|
cleaned = re.sub(r'[^\w\s\-áéíóúñü]', '', name, flags=re.IGNORECASE) |
|
return cleaned.strip() |
|
|
|
def validate_url(url: str) -> bool: |
|
"""Valida si una URL tiene formato correcto""" |
|
url_pattern = re.compile( |
|
r'^https?://' |
|
r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+[A-Z]{2,6}\.?|' |
|
r'localhost|' |
|
r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})' |
|
r'(?::\d+)?' |
|
r'(?:/?|[/?]\S+)$', re.IGNORECASE) |
|
return url_pattern.match(url) is not None |
|
|
|
def format_timestamp(dt: datetime) -> str: |
|
"""Formatea un timestamp para mostrar""" |
|
return dt.strftime("%Y-%m-%d %H:%M:%S") |
|
|
|
def safe_get(dictionary: Dict[str, Any], key: str, default: Any = None) -> Any: |
|
"""Obtiene un valor del diccionario de forma segura""" |
|
return dictionary.get(key, default) |
|
|
|
class RateLimiter: |
|
"""Rate limiter simple en memoria""" |
|
|
|
def __init__(self, max_requests: int = 100, window_seconds: int = 60): |
|
self.max_requests = max_requests |
|
self.window_seconds = window_seconds |
|
self.requests: Dict[str, list] = {} |
|
|
|
def is_allowed(self, identifier: str) -> bool: |
|
"""Verifica si una petición está permitida""" |
|
now = datetime.now() |
|
|
|
if identifier not in self.requests: |
|
self.requests[identifier] = [] |
|
|
|
|
|
cutoff_time = now.timestamp() - self.window_seconds |
|
self.requests[identifier] = [ |
|
req_time for req_time in self.requests[identifier] |
|
if req_time > cutoff_time |
|
] |
|
|
|
|
|
if len(self.requests[identifier]) >= self.max_requests: |
|
return False |
|
|
|
|
|
self.requests[identifier].append(now.timestamp()) |
|
return True |
|
|
|
|
|
rate_limiter = RateLimiter(max_requests=50, window_seconds=60) |