Spaces:
Build error
Build error
| """Error recovery and fallback mechanisms.""" | |
| import logging | |
| import time | |
| from typing import Dict, List, Optional, Any, Callable, TypeVar, Union | |
| from dataclasses import dataclass | |
| from enum import Enum | |
| from functools import wraps | |
| from .structured_logger import StructuredLogger, LogContext, get_structured_logger | |
| from ...domain.exceptions import ( | |
| DomainException, | |
| SpeechRecognitionException, | |
| TranslationFailedException, | |
| SpeechSynthesisException, | |
| AudioProcessingException | |
| ) | |
# Module-level structured logger. RecoveryManager and CircuitBreaker
# each create their own logger instances as well.
logger = get_structured_logger(__name__)

# Type variable for the return type of the callables wrapped below.
T = TypeVar("T")
class RecoveryStrategy(Enum):
    """Names of the recovery approaches offered by this module.

    Members are identified by lowercase string values.
    """

    # Corresponds by name to RecoveryManager.retry_with_backoff.
    RETRY = "retry"
    # Corresponds by name to RecoveryManager.execute_with_fallback.
    FALLBACK = "fallback"
    # Corresponds by name to CircuitBreaker / execute_with_circuit_breaker.
    CIRCUIT_BREAKER = "circuit_breaker"
    # Not referenced by the visible code; presumably pairs with
    # FallbackConfig.enable_graceful_degradation — TODO confirm.
    GRACEFUL_DEGRADATION = "graceful_degradation"
class CircuitBreakerState(Enum):
    """The three states a CircuitBreaker moves between."""

    # Normal operation: calls are passed through to the wrapped function.
    CLOSED = "closed"
    # Tripped: calls are rejected until the recovery timeout has elapsed.
    OPEN = "open"
    # Probing: calls pass through; enough successes close the breaker,
    # a failure re-opens it.
    HALF_OPEN = "half_open"
@dataclass
class RetryConfig:
    """Retry configuration consumed by RecoveryManager.retry_with_backoff.

    Declared as a dataclass so it can be built with keyword arguments
    (e.g. ``RetryConfig(max_attempts=2, base_delay=2.0)``), which is how
    RecoveryManager builds its per-exception defaults.
    """

    # Total number of calls, including the first one (not only retries).
    max_attempts: int = 3
    # Starting delay in seconds from which backoff delays are computed.
    base_delay: float = 1.0
    # Upper bound in seconds for a computed delay (applied before jitter).
    max_delay: float = 60.0
    # When True the delay doubles per retry step; otherwise base_delay is reused.
    exponential_backoff: bool = True
    # When True each delay is scaled by a random factor in [0.5, 1.0).
    jitter: bool = True
    # Exception types that may be retried; None selects the manager's default set.
    retryable_exceptions: Optional[List[type]] = None
@dataclass
class CircuitBreakerConfig:
    """Circuit breaker configuration consumed by CircuitBreaker.

    Declared as a dataclass so thresholds can be overridden with
    constructor arguments.
    """

    # Consecutive failures in the CLOSED state that open the circuit.
    failure_threshold: int = 5
    # Seconds after the last failure before an OPEN breaker lets a probe through.
    recovery_timeout: float = 60.0
    # Successful probes in the HALF_OPEN state needed to close the circuit.
    success_threshold: int = 2
    # Not read by CircuitBreaker in this module; presumably a per-call
    # timeout in seconds — TODO confirm.
    timeout: float = 30.0
@dataclass
class FallbackConfig:
    """Fallback configuration.

    Declared as a dataclass: ``fallback_providers`` has no default and is a
    required constructor argument. Without the decorator the class cannot be
    built with arguments at all, and the attribute is never set.
    Not used by the code visible in this module.
    """

    # Names of the fallback providers to try (required).
    fallback_providers: List[str]
    # Optional callable to use as a fallback.
    fallback_function: Optional[Callable] = None
    # Whether degraded results are acceptable when fallbacks fail — TODO confirm semantics.
    enable_graceful_degradation: bool = True
class CircuitBreaker:
    """Circuit breaker implementation.

    State machine (see CircuitBreakerState):

    * CLOSED: calls pass through. ``config.failure_threshold`` consecutive
      failures (a success resets the count) move the breaker to OPEN.
    * OPEN: calls are rejected with AudioProcessingException until
      ``config.recovery_timeout`` seconds have passed since the last failure;
      the first call after that moves the breaker to HALF_OPEN.
    * HALF_OPEN: calls pass through as probes. ``config.success_threshold``
      successes close the breaker; a single failure re-opens it.

    NOTE(review): state is mutated without any locking; presumably a breaker
    is driven from one thread at a time — confirm before sharing it.
    """

    def __init__(self, config: CircuitBreakerConfig, name: str = "default"):
        """
        Initialize circuit breaker.

        Args:
            config: Circuit breaker configuration
            name: Circuit breaker name (used in log messages and the logger name)
        """
        self.config = config
        self.name = name
        self.state = CircuitBreakerState.CLOSED
        self.failure_count = 0
        self.success_count = 0
        self.last_failure_time = 0.0
        self.logger = get_structured_logger(f"{__name__}.CircuitBreaker.{name}")

    def call(self, func: Callable[..., T], *args, **kwargs) -> T:
        """
        Call function through circuit breaker.

        Args:
            func: Function to call
            *args: Function arguments
            **kwargs: Function keyword arguments. A ``correlation_id`` entry is
                read for logging and is still forwarded to ``func``.

        Returns:
            T: Function result

        Raises:
            AudioProcessingException: If the circuit is OPEN and the recovery
                timeout has not elapsed yet.
            Exception: Whatever ``func`` raises, after it is recorded as a failure.
        """
        context = LogContext(
            # ``or`` also maps an explicit correlation_id=None to 'unknown',
            # matching how RecoveryManager builds its log contexts.
            correlation_id=kwargs.get('correlation_id') or 'unknown',
            component=f"circuit_breaker_{self.name}",
            # Not every callable has __name__ (e.g. functools.partial objects).
            operation=getattr(func, '__name__', repr(func))
        )

        if self.state == CircuitBreakerState.OPEN:
            if time.time() - self.last_failure_time < self.config.recovery_timeout:
                self.logger.warning(
                    f"Circuit breaker {self.name} is OPEN, rejecting call",
                    context=context
                )
                raise AudioProcessingException(f"Circuit breaker {self.name} is open")
            # Recovery timeout elapsed: let this call through as a probe.
            self.state = CircuitBreakerState.HALF_OPEN
            self.success_count = 0
            self.logger.info(
                f"Circuit breaker {self.name} transitioning to HALF_OPEN",
                context=context
            )

        # Only the wrapped call is guarded, so an error raised while handling a
        # success (e.g. in logging) is not miscounted as a failure of func.
        try:
            result = func(*args, **kwargs)
        except Exception as e:
            self._on_failure(e, context)
            raise
        self._on_success(context)
        return result

    def _on_success(self, context: LogContext) -> None:
        """Record a successful call and close the circuit when the probe threshold is met."""
        if self.state == CircuitBreakerState.HALF_OPEN:
            self.success_count += 1
            if self.success_count >= self.config.success_threshold:
                self.state = CircuitBreakerState.CLOSED
                self.failure_count = 0
                self.logger.info(
                    f"Circuit breaker {self.name} transitioning to CLOSED",
                    context=context
                )
        elif self.state == CircuitBreakerState.CLOSED:
            # Failures are counted consecutively; any success resets the streak.
            self.failure_count = 0

    def _on_failure(self, exception: Exception, context: LogContext) -> None:
        """Record a failed call and open the circuit when appropriate."""
        self.failure_count += 1
        self.last_failure_time = time.time()

        if self.state == CircuitBreakerState.HALF_OPEN:
            # A single failed probe re-opens the circuit.
            self.state = CircuitBreakerState.OPEN
            self.logger.warning(
                f"Circuit breaker {self.name} transitioning to OPEN after failure in HALF_OPEN",
                context=context,
                exception=exception
            )
        elif (self.state == CircuitBreakerState.CLOSED and
              self.failure_count >= self.config.failure_threshold):
            self.state = CircuitBreakerState.OPEN
            self.logger.warning(
                f"Circuit breaker {self.name} transitioning to OPEN after {self.failure_count} failures",
                context=context,
                exception=exception
            )
class RecoveryManager:
    """Manages error recovery and fallback mechanisms.

    Provides three independent helpers:

    * ``retry_with_backoff``: call a function repeatedly with growing delays.
    * ``execute_with_fallback``: try a primary function, then alternatives in order.
    * ``execute_with_circuit_breaker``: route a call through a named
      CircuitBreaker. Breakers are cached by name on this instance.
    """

    def __init__(self):
        """Initialize recovery manager."""
        self.circuit_breakers: Dict[str, CircuitBreaker] = {}
        self.logger = get_structured_logger(__name__)

        # Default retry configurations for different exception types.
        # get_retry_config_for_exception matches them with isinstance in
        # insertion order and returns the first hit.
        self.default_retry_configs = {
            SpeechRecognitionException: RetryConfig(
                max_attempts=2,
                base_delay=2.0,
                retryable_exceptions=[SpeechRecognitionException]
            ),
            TranslationFailedException: RetryConfig(
                max_attempts=3,
                base_delay=1.0,
                retryable_exceptions=[TranslationFailedException]
            ),
            SpeechSynthesisException: RetryConfig(
                max_attempts=2,
                base_delay=1.5,
                retryable_exceptions=[SpeechSynthesisException]
            ),
            ConnectionError: RetryConfig(
                max_attempts=3,
                base_delay=2.0,
                exponential_backoff=True,
                retryable_exceptions=[ConnectionError, TimeoutError]
            )
        }

    @staticmethod
    def _callable_name(func: Callable) -> str:
        """Return a log-friendly name for ``func``.

        Falls back to ``repr`` for callables without ``__name__`` (e.g.
        ``functools.partial`` objects). Reading ``func.__name__`` directly on
        them raises AttributeError before the call is even attempted.
        """
        return getattr(func, '__name__', repr(func))

    def get_circuit_breaker(self, name: str, config: Optional[CircuitBreakerConfig] = None) -> CircuitBreaker:
        """
        Get or create circuit breaker.

        The breaker for ``name`` is created on first request and reused after
        that. ``config`` only applies at creation time; it is ignored for a
        breaker that already exists.

        Args:
            name: Circuit breaker name
            config: Optional configuration (defaults to CircuitBreakerConfig())

        Returns:
            CircuitBreaker: Circuit breaker instance
        """
        if name not in self.circuit_breakers:
            if config is None:
                config = CircuitBreakerConfig()
            self.circuit_breakers[name] = CircuitBreaker(config, name)
        return self.circuit_breakers[name]

    def retry_with_backoff(self, func: Callable[..., T],
                           config: Optional[RetryConfig] = None,
                           correlation_id: Optional[str] = None,
                           *args, **kwargs) -> T:
        """
        Execute function with retry and exponential backoff.

        ``func`` is called at most ``config.max_attempts`` times in total.
        Before each retry the manager sleeps for ``_calculate_delay``. The
        first retry waits about ``base_delay`` (before jitter), and later
        retries grow from there when exponential backoff is enabled. An
        exception that is not retryable under ``config`` is re-raised at once.

        Args:
            func: Function to execute
            config: Retry configuration (defaults to RetryConfig())
            correlation_id: Correlation ID for logging
            *args: Function arguments
            **kwargs: Function keyword arguments

        Returns:
            T: Function result

        Raises:
            ValueError: If ``config.max_attempts`` is less than 1.
            Exception: The first non-retryable exception, or the last exception
                raised by ``func`` once all attempts are exhausted.
        """
        if config is None:
            config = RetryConfig()
        if config.max_attempts < 1:
            # With zero attempts the loop would never run, leaving nothing to
            # re-raise (``raise None`` is itself a TypeError).
            raise ValueError(
                f"RetryConfig.max_attempts must be at least 1, got {config.max_attempts}"
            )

        func_name = self._callable_name(func)
        context = LogContext(
            correlation_id=correlation_id or 'unknown',
            component='retry_manager',
            operation=func_name
        )

        last_exception: Optional[Exception] = None
        for attempt in range(config.max_attempts):
            if attempt > 0:
                # attempt - 1 is the 0-based retry index, so the first retry waits
                # base_delay rather than an already-doubled delay.
                delay = self._calculate_delay(attempt - 1, config)
                self.logger.info(
                    f"Retrying {func_name} (attempt {attempt + 1}/{config.max_attempts}) after {delay:.2f}s delay",
                    context=context
                )
                time.sleep(delay)

            try:
                result = func(*args, **kwargs)
            except Exception as e:
                last_exception = e
                if not self._is_retryable_exception(e, config):
                    self.logger.error(
                        f"Non-retryable exception in {func_name}: {type(e).__name__}",
                        context=context,
                        exception=e
                    )
                    raise
                self.logger.warning(
                    f"Attempt {attempt + 1}/{config.max_attempts} failed for {func_name}: {e}",
                    context=context,
                    exception=e
                )
                continue

            if attempt > 0:
                self.logger.info(
                    f"Retry successful for {func_name} on attempt {attempt + 1}",
                    context=context
                )
            return result

        # All attempts failed; max_attempts >= 1 guarantees last_exception is set.
        self.logger.error(
            f"All {config.max_attempts} retry attempts failed for {func_name}",
            context=context,
            exception=last_exception
        )
        raise last_exception

    def execute_with_fallback(self, primary_func: Callable[..., T],
                              fallback_funcs: List[Callable[..., T]],
                              correlation_id: Optional[str] = None,
                              *args, **kwargs) -> T:
        """
        Execute function with fallback options.

        Tries ``primary_func`` first, then each of ``fallback_funcs`` in order,
        all with the same arguments, and returns the first successful result.

        Args:
            primary_func: Primary function to execute
            fallback_funcs: List of fallback functions
            correlation_id: Correlation ID for logging
            *args: Function arguments
            **kwargs: Function keyword arguments

        Returns:
            T: Function result

        Raises:
            Exception: The exception from the last function tried, if all fail.
        """
        primary_name = self._callable_name(primary_func)
        context = LogContext(
            correlation_id=correlation_id or 'unknown',
            component='fallback_manager',
            operation=primary_name
        )

        # Try primary function first
        try:
            return primary_func(*args, **kwargs)
        except Exception as e:
            self.logger.warning(
                f"Primary function {primary_name} failed, trying fallbacks",
                context=context,
                exception=e
            )
            last_exception = e

        # Try fallback functions in order
        for i, fallback_func in enumerate(fallback_funcs):
            fallback_name = self._callable_name(fallback_func)
            self.logger.info(
                f"Trying fallback {i + 1}/{len(fallback_funcs)}: {fallback_name}",
                context=context
            )
            try:
                result = fallback_func(*args, **kwargs)
            except Exception as fallback_e:
                last_exception = fallback_e
                self.logger.warning(
                    f"Fallback {fallback_name} failed: {fallback_e}",
                    context=context,
                    exception=fallback_e
                )
                continue
            self.logger.info(
                f"Fallback {fallback_name} succeeded",
                context=context
            )
            return result

        # All functions failed
        self.logger.error(
            f"All functions failed (primary + {len(fallback_funcs)} fallbacks)",
            context=context,
            exception=last_exception
        )
        raise last_exception

    def execute_with_circuit_breaker(self, func: Callable[..., T],
                                     circuit_breaker_name: str,
                                     config: Optional[CircuitBreakerConfig] = None,
                                     correlation_id: Optional[str] = None,
                                     *args, **kwargs) -> T:
        """
        Execute function with circuit breaker protection.

        NOTE: a truthy ``correlation_id`` is inserted into ``kwargs`` so the
        breaker can log it. As a result it is also passed to ``func``, which
        must accept a ``correlation_id`` keyword argument in that case.

        Args:
            func: Function to execute
            circuit_breaker_name: Circuit breaker name
            config: Optional circuit breaker configuration (used only when the
                named breaker is created)
            correlation_id: Correlation ID for logging
            *args: Function arguments
            **kwargs: Function keyword arguments

        Returns:
            T: Function result

        Raises:
            Exception: If circuit is open or function fails
        """
        circuit_breaker = self.get_circuit_breaker(circuit_breaker_name, config)

        # Add correlation ID to kwargs for circuit breaker logging
        if correlation_id:
            kwargs['correlation_id'] = correlation_id

        return circuit_breaker.call(func, *args, **kwargs)

    def _calculate_delay(self, attempt: int, config: RetryConfig) -> float:
        """
        Calculate retry delay with exponential backoff and jitter.

        Args:
            attempt: 0-based retry index (0 = first retry). With exponential
                backoff the delay is ``base_delay * 2 ** attempt``, so the
                first retry waits ``base_delay``.
            config: Retry configuration

        Returns:
            float: Delay in seconds, capped at ``config.max_delay`` before
            jitter. With jitter enabled it is then scaled by a random factor
            in [0.5, 1.0).
        """
        if config.exponential_backoff:
            delay = config.base_delay * (2 ** attempt)
        else:
            delay = config.base_delay

        # Apply maximum delay limit
        delay = min(delay, config.max_delay)

        # Add jitter to prevent thundering herd
        if config.jitter:
            import random
            delay *= (0.5 + random.random() * 0.5)

        return delay

    def _is_retryable_exception(self, exception: Exception, config: RetryConfig) -> bool:
        """
        Check if exception is retryable.

        Args:
            exception: Exception to check
            config: Retry configuration. When its ``retryable_exceptions`` is
                None, a built-in default set of types is used.

        Returns:
            bool: True if exception is an instance of a retryable type
        """
        if config.retryable_exceptions is None:
            # Default retryable exceptions
            retryable_types = (
                ConnectionError,
                TimeoutError,
                SpeechRecognitionException,
                TranslationFailedException,
                SpeechSynthesisException
            )
            return isinstance(exception, retryable_types)

        return any(isinstance(exception, exc_type) for exc_type in config.retryable_exceptions)

    def get_retry_config_for_exception(self, exception: Exception) -> RetryConfig:
        """
        Get appropriate retry configuration for exception type.

        Args:
            exception: Exception to get config for

        Returns:
            RetryConfig: The first matching entry of ``default_retry_configs``
            (checked with isinstance), or a fresh RetryConfig() if none match.
        """
        for exc_type, config in self.default_retry_configs.items():
            if isinstance(exception, exc_type):
                return config

        # Return default config
        return RetryConfig()
def with_retry(config: Optional[RetryConfig] = None):
    """
    Decorator for automatic retry with backoff.

    Each call to the decorated function goes through
    RecoveryManager.retry_with_backoff. If the caller passes a
    ``correlation_id`` keyword argument, it is used for logging and is still
    forwarded to the wrapped function.

    Args:
        config: Optional retry configuration (None means RetryConfig() defaults)

    Returns:
        Decorator that wraps a function. The wrapper keeps the original
        function's name and docstring.
    """
    def decorator(func: Callable[..., T]) -> Callable[..., T]:
        # One manager per decorated function rather than one per call.
        # retry_with_backoff keeps no state on the manager between calls, so
        # rebuilding it (logger plus default configs) on every call was pure overhead.
        recovery_manager = RecoveryManager()

        @wraps(func)
        def wrapper(*args, **kwargs) -> T:
            correlation_id = kwargs.get('correlation_id')
            return recovery_manager.retry_with_backoff(
                func, config, correlation_id, *args, **kwargs
            )
        return wrapper
    return decorator
def with_circuit_breaker(name: str, config: Optional[CircuitBreakerConfig] = None):
    """
    Decorator for circuit breaker protection.

    Each decorated function gets its own RecoveryManager, created once when
    the decorator is applied. Its breaker therefore keeps failure counts and
    state across calls. Breakers are not shared between different functions
    decorated with the same ``name``.

    Args:
        name: Circuit breaker name
        config: Optional circuit breaker configuration

    Returns:
        Decorator that wraps a function. The wrapper keeps the original
        function's name and docstring.
    """
    def decorator(func: Callable[..., T]) -> Callable[..., T]:
        # Must outlive individual calls: a manager built per call would hand
        # out a fresh, always-CLOSED breaker every time, so the circuit could
        # never open.
        recovery_manager = RecoveryManager()

        @wraps(func)
        def wrapper(*args, **kwargs) -> T:
            correlation_id = kwargs.get('correlation_id')
            return recovery_manager.execute_with_circuit_breaker(
                func, name, config, correlation_id, *args, **kwargs
            )
        return wrapper
    return decorator