"""Error recovery and fallback mechanisms.""" import logging import time from typing import Dict, List, Optional, Any, Callable, TypeVar, Union from dataclasses import dataclass from enum import Enum from functools import wraps from .structured_logger import StructuredLogger, LogContext, get_structured_logger from ...domain.exceptions import ( DomainException, SpeechRecognitionException, TranslationFailedException, SpeechSynthesisException, AudioProcessingException ) logger = get_structured_logger(__name__) T = TypeVar('T') class RecoveryStrategy(Enum): """Recovery strategy types.""" RETRY = "retry" FALLBACK = "fallback" CIRCUIT_BREAKER = "circuit_breaker" GRACEFUL_DEGRADATION = "graceful_degradation" class CircuitBreakerState(Enum): """Circuit breaker states.""" CLOSED = "closed" OPEN = "open" HALF_OPEN = "half_open" @dataclass class RetryConfig: """Retry configuration.""" max_attempts: int = 3 base_delay: float = 1.0 max_delay: float = 60.0 exponential_backoff: bool = True jitter: bool = True retryable_exceptions: Optional[List[type]] = None @dataclass class CircuitBreakerConfig: """Circuit breaker configuration.""" failure_threshold: int = 5 recovery_timeout: float = 60.0 success_threshold: int = 2 timeout: float = 30.0 @dataclass class FallbackConfig: """Fallback configuration.""" fallback_providers: List[str] fallback_function: Optional[Callable] = None enable_graceful_degradation: bool = True class CircuitBreaker: """Circuit breaker implementation.""" def __init__(self, config: CircuitBreakerConfig, name: str = "default"): """ Initialize circuit breaker. Args: config: Circuit breaker configuration name: Circuit breaker name """ self.config = config self.name = name self.state = CircuitBreakerState.CLOSED self.failure_count = 0 self.success_count = 0 self.last_failure_time = 0.0 self.logger = get_structured_logger(f"{__name__}.CircuitBreaker.{name}") def call(self, func: Callable[..., T], *args, **kwargs) -> T: """ Call function through circuit breaker. Args: func: Function to call *args: Function arguments **kwargs: Function keyword arguments Returns: T: Function result Raises: Exception: If circuit is open or function fails """ context = LogContext( correlation_id=kwargs.get('correlation_id', 'unknown'), component=f"circuit_breaker_{self.name}", operation=func.__name__ ) if self.state == CircuitBreakerState.OPEN: if time.time() - self.last_failure_time < self.config.recovery_timeout: self.logger.warning( f"Circuit breaker {self.name} is OPEN, rejecting call", context=context ) raise AudioProcessingException(f"Circuit breaker {self.name} is open") else: self.state = CircuitBreakerState.HALF_OPEN self.success_count = 0 self.logger.info( f"Circuit breaker {self.name} transitioning to HALF_OPEN", context=context ) try: result = func(*args, **kwargs) self._on_success(context) return result except Exception as e: self._on_failure(e, context) raise def _on_success(self, context: LogContext) -> None: """Handle successful call.""" if self.state == CircuitBreakerState.HALF_OPEN: self.success_count += 1 if self.success_count >= self.config.success_threshold: self.state = CircuitBreakerState.CLOSED self.failure_count = 0 self.logger.info( f"Circuit breaker {self.name} transitioning to CLOSED", context=context ) elif self.state == CircuitBreakerState.CLOSED: self.failure_count = 0 def _on_failure(self, exception: Exception, context: LogContext) -> None: """Handle failed call.""" self.failure_count += 1 self.last_failure_time = time.time() if self.state == CircuitBreakerState.HALF_OPEN: self.state = CircuitBreakerState.OPEN self.logger.warning( f"Circuit breaker {self.name} transitioning to OPEN after failure in HALF_OPEN", context=context ) elif (self.state == CircuitBreakerState.CLOSED and self.failure_count >= self.config.failure_threshold): self.state = CircuitBreakerState.OPEN self.logger.warning( f"Circuit breaker {self.name} transitioning to OPEN after {self.failure_count} failures", context=context ) class RecoveryManager: """Manages error recovery and fallback mechanisms.""" def __init__(self): """Initialize recovery manager.""" self.circuit_breakers: Dict[str, CircuitBreaker] = {} self.logger = get_structured_logger(__name__) # Default retry configurations for different exception types self.default_retry_configs = { SpeechRecognitionException: RetryConfig( max_attempts=2, base_delay=2.0, retryable_exceptions=[SpeechRecognitionException] ), TranslationFailedException: RetryConfig( max_attempts=3, base_delay=1.0, retryable_exceptions=[TranslationFailedException] ), SpeechSynthesisException: RetryConfig( max_attempts=2, base_delay=1.5, retryable_exceptions=[SpeechSynthesisException] ), ConnectionError: RetryConfig( max_attempts=3, base_delay=2.0, exponential_backoff=True, retryable_exceptions=[ConnectionError, TimeoutError] ) } def get_circuit_breaker(self, name: str, config: Optional[CircuitBreakerConfig] = None) -> CircuitBreaker: """ Get or create circuit breaker. Args: name: Circuit breaker name config: Optional configuration Returns: CircuitBreaker: Circuit breaker instance """ if name not in self.circuit_breakers: if config is None: config = CircuitBreakerConfig() self.circuit_breakers[name] = CircuitBreaker(config, name) return self.circuit_breakers[name] def retry_with_backoff(self, func: Callable[..., T], config: Optional[RetryConfig] = None, correlation_id: Optional[str] = None, *args, **kwargs) -> T: """ Execute function with retry and exponential backoff. Args: func: Function to execute config: Retry configuration correlation_id: Correlation ID for logging *args: Function arguments **kwargs: Function keyword arguments Returns: T: Function result Raises: Exception: If all retry attempts fail """ if config is None: config = RetryConfig() context = LogContext( correlation_id=correlation_id or 'unknown', component='retry_manager', operation=func.__name__ ) last_exception = None for attempt in range(config.max_attempts): try: if attempt > 0: delay = self._calculate_delay(attempt, config) self.logger.info( f"Retrying {func.__name__} (attempt {attempt + 1}/{config.max_attempts}) after {delay:.2f}s delay", context=context ) time.sleep(delay) result = func(*args, **kwargs) if attempt > 0: self.logger.info( f"Retry successful for {func.__name__} on attempt {attempt + 1}", context=context ) return result except Exception as e: last_exception = e # Check if exception is retryable if not self._is_retryable_exception(e, config): self.logger.error( f"Non-retryable exception in {func.__name__}: {type(e).__name__}", context=context, exception=e ) raise self.logger.warning( f"Attempt {attempt + 1}/{config.max_attempts} failed for {func.__name__}: {e}", context=context ) # All attempts failed self.logger.error( f"All {config.max_attempts} retry attempts failed for {func.__name__}", context=context, exception=last_exception ) raise last_exception def execute_with_fallback(self, primary_func: Callable[..., T], fallback_funcs: List[Callable[..., T]], correlation_id: Optional[str] = None, *args, **kwargs) -> T: """ Execute function with fallback options. Args: primary_func: Primary function to execute fallback_funcs: List of fallback functions correlation_id: Correlation ID for logging *args: Function arguments **kwargs: Function keyword arguments Returns: T: Function result Raises: Exception: If all functions fail """ context = LogContext( correlation_id=correlation_id or 'unknown', component='fallback_manager', operation=primary_func.__name__ ) # Try primary function first try: result = primary_func(*args, **kwargs) return result except Exception as e: self.logger.warning( f"Primary function {primary_func.__name__} failed, trying fallbacks", context=context ) last_exception = e # Try fallback functions for i, fallback_func in enumerate(fallback_funcs): try: self.logger.info( f"Trying fallback {i + 1}/{len(fallback_funcs)}: {fallback_func.__name__}", context=context ) result = fallback_func(*args, **kwargs) self.logger.info( f"Fallback {fallback_func.__name__} succeeded", context=context ) return result except Exception as fallback_e: last_exception = fallback_e self.logger.warning( f"Fallback {fallback_func.__name__} failed: {fallback_e}", context=context ) # All functions failed self.logger.error( f"All functions failed (primary + {len(fallback_funcs)} fallbacks)", context=context, exception=last_exception ) raise last_exception def execute_with_circuit_breaker(self, func: Callable[..., T], circuit_breaker_name: str, config: Optional[CircuitBreakerConfig] = None, correlation_id: Optional[str] = None, *args, **kwargs) -> T: """ Execute function with circuit breaker protection. Args: func: Function to execute circuit_breaker_name: Circuit breaker name config: Optional circuit breaker configuration correlation_id: Correlation ID for logging *args: Function arguments **kwargs: Function keyword arguments Returns: T: Function result Raises: Exception: If circuit is open or function fails """ circuit_breaker = self.get_circuit_breaker(circuit_breaker_name, config) # Add correlation ID to kwargs for circuit breaker logging if correlation_id: kwargs['correlation_id'] = correlation_id return circuit_breaker.call(func, *args, **kwargs) def _calculate_delay(self, attempt: int, config: RetryConfig) -> float: """ Calculate retry delay with exponential backoff and jitter. Args: attempt: Attempt number (0-based) config: Retry configuration Returns: float: Delay in seconds """ if config.exponential_backoff: delay = config.base_delay * (2 ** attempt) else: delay = config.base_delay # Apply maximum delay limit delay = min(delay, config.max_delay) # Add jitter to prevent thundering herd if config.jitter: import random delay *= (0.5 + random.random() * 0.5) return delay def _is_retryable_exception(self, exception: Exception, config: RetryConfig) -> bool: """ Check if exception is retryable. Args: exception: Exception to check config: Retry configuration Returns: bool: True if exception is retryable """ if config.retryable_exceptions is None: # Default retryable exceptions retryable_types = ( ConnectionError, TimeoutError, SpeechRecognitionException, TranslationFailedException, SpeechSynthesisException ) return isinstance(exception, retryable_types) return any(isinstance(exception, exc_type) for exc_type in config.retryable_exceptions) def get_retry_config_for_exception(self, exception: Exception) -> RetryConfig: """ Get appropriate retry configuration for exception type. Args: exception: Exception to get config for Returns: RetryConfig: Retry configuration """ for exc_type, config in self.default_retry_configs.items(): if isinstance(exception, exc_type): return config # Return default config return RetryConfig() def with_retry(config: Optional[RetryConfig] = None): """ Decorator for automatic retry with backoff. Args: config: Optional retry configuration Returns: Decorated function """ def decorator(func: Callable[..., T]) -> Callable[..., T]: @wraps(func) def wrapper(*args, **kwargs) -> T: recovery_manager = RecoveryManager() correlation_id = kwargs.get('correlation_id') return recovery_manager.retry_with_backoff( func, config, correlation_id, *args, **kwargs ) return wrapper return decorator def with_circuit_breaker(name: str, config: Optional[CircuitBreakerConfig] = None): """ Decorator for circuit breaker protection. Args: name: Circuit breaker name config: Optional circuit breaker configuration Returns: Decorated function """ def decorator(func: Callable[..., T]) -> Callable[..., T]: @wraps(func) def wrapper(*args, **kwargs) -> T: recovery_manager = RecoveryManager() correlation_id = kwargs.get('correlation_id') return recovery_manager.execute_with_circuit_breaker( func, name, config, correlation_id, *args, **kwargs ) return wrapper return decorator