Spaces:
Build error
Build error
"""Error recovery and fallback mechanisms.""" | |
import logging | |
import time | |
from typing import Dict, List, Optional, Any, Callable, TypeVar, Union | |
from dataclasses import dataclass | |
from enum import Enum | |
from functools import wraps | |
from .structured_logger import StructuredLogger, LogContext, get_structured_logger | |
from ...domain.exceptions import ( | |
DomainException, | |
SpeechRecognitionException, | |
TranslationFailedException, | |
SpeechSynthesisException, | |
AudioProcessingException | |
) | |
# Type variable for the return value of the callables wrapped by the
# recovery helpers in this module.
T = TypeVar('T')

# Module-level structured logger.
logger = get_structured_logger(__name__)
class RecoveryStrategy(Enum):
    """Enumerates the recovery approaches by name.

    Every member's value is the lowercase, snake_case form of its name.
    """

    # Re-invoke the failing operation (see RecoveryManager.retry_with_backoff).
    RETRY = "retry"
    # Switch to an alternative callable (see RecoveryManager.execute_with_fallback).
    FALLBACK = "fallback"
    # Guard calls with a CircuitBreaker (see RecoveryManager.execute_with_circuit_breaker).
    CIRCUIT_BREAKER = "circuit_breaker"
    # NOTE(review): no implementation of this strategy is visible here; it
    # presumably pairs with FallbackConfig.enable_graceful_degradation — confirm.
    GRACEFUL_DEGRADATION = "graceful_degradation"
class CircuitBreakerState(Enum):
    """States of a CircuitBreaker's state machine.

    Values are the lowercase forms of the member names.
    """

    # Calls pass through; enough consecutive failures move to OPEN.
    CLOSED = "closed"
    # Calls are rejected until the recovery timeout has elapsed.
    OPEN = "open"
    # Trial calls pass through; successes close the breaker, a failure re-opens it.
    HALF_OPEN = "half_open"
@dataclass
class RetryConfig:
    """Retry configuration used by ``RecoveryManager.retry_with_backoff``.

    Declared as a dataclass so it can be built with keyword arguments
    (``RetryConfig(max_attempts=2, base_delay=2.0)``); without the decorator
    the class has no matching ``__init__`` and such calls raise TypeError.

    Attributes:
        max_attempts: Total number of calls to make, including the first one.
        base_delay: Starting delay in seconds for the back-off calculation.
        max_delay: Upper bound in seconds on a computed delay (applied before
            jitter).
        exponential_backoff: If True, the delay doubles with each retry index;
            otherwise every retry uses ``base_delay``.
        jitter: If True, each delay is scaled by a random factor in
            ``[0.5, 1.0)`` to spread out concurrent retries.
        retryable_exceptions: Exception types that may be retried. ``None``
            selects the manager's built-in default set.
    """
    max_attempts: int = 3
    base_delay: float = 1.0
    max_delay: float = 60.0
    exponential_backoff: bool = True
    jitter: bool = True
    retryable_exceptions: Optional[List[type]] = None
@dataclass
class CircuitBreakerConfig:
    """Circuit breaker configuration.

    Declared as a dataclass so callers can override individual thresholds
    with keyword arguments; without the decorator only the no-argument
    constructor works.

    Attributes:
        failure_threshold: Consecutive failures in CLOSED state that open the
            breaker (a success in CLOSED resets the count).
        recovery_timeout: Seconds since the last failure before an OPEN
            breaker lets a trial call through (HALF_OPEN).
        success_threshold: Successful trial calls in HALF_OPEN needed to close
            the breaker again.
        timeout: NOTE(review): not read by CircuitBreaker in this module;
            presumably intended as a per-call timeout — confirm before relying
            on it.
    """
    failure_threshold: int = 5
    recovery_timeout: float = 60.0
    success_threshold: int = 2
    timeout: float = 30.0
@dataclass
class FallbackConfig:
    """Fallback configuration.

    Declared as a dataclass so ``fallback_providers`` can be supplied through
    the constructor; as a plain class the annotation alone never creates the
    attribute and keyword construction raises TypeError.

    Attributes:
        fallback_providers: Names of providers to fall back to (required).
        fallback_function: Optional callable to use as a fallback.
        enable_graceful_degradation: Flag for degraded-mode behaviour.

    NOTE(review): none of the recovery helpers visible in this module read
    this config — confirm where it is consumed.
    """
    fallback_providers: List[str]
    fallback_function: Optional[Callable] = None
    enable_graceful_degradation: bool = True
class CircuitBreaker:
    """Circuit breaker implementation.

    State machine:

    * CLOSED: calls pass through. ``config.failure_threshold`` consecutive
      failures open the breaker; any success resets the failure count.
    * OPEN: calls are rejected with ``AudioProcessingException`` until
      ``config.recovery_timeout`` seconds have passed since the last failure;
      the next call after that moves the breaker to HALF_OPEN.
    * HALF_OPEN: calls pass through as trials. ``config.success_threshold``
      successes close the breaker; a single failure re-opens it.

    NOTE(review): state and counters are mutated without locking — confirm
    whether instances are shared between threads.
    """

    def __init__(self, config: CircuitBreakerConfig, name: str = "default"):
        """
        Initialize circuit breaker in the CLOSED state with zeroed counters.

        Args:
            config: Circuit breaker configuration
            name: Circuit breaker name, used in log messages and the logger name
        """
        self.config = config
        self.name = name
        self.state = CircuitBreakerState.CLOSED
        self.failure_count = 0
        self.success_count = 0
        # time.time() of the most recent failure; 0.0 until the first failure.
        self.last_failure_time = 0.0
        self.logger = get_structured_logger(f"{__name__}.CircuitBreaker.{name}")

    def call(self, func: Callable[..., T], *args, **kwargs) -> T:
        """
        Call function through circuit breaker.

        Args:
            func: Function to call
            *args: Function arguments
            **kwargs: Function keyword arguments. A ``correlation_id`` entry,
                if present, is used for logging and is still forwarded to
                *func* along with everything else.

        Returns:
            T: Function result

        Raises:
            AudioProcessingException: If the circuit is OPEN and the recovery
                timeout has not elapsed yet.
            Exception: Whatever *func* raises (recorded as a failure first).
        """
        context = LogContext(
            correlation_id=kwargs.get('correlation_id', 'unknown'),
            component=f"circuit_breaker_{self.name}",
            # functools.partial objects and many callable instances have no
            # __name__; fall back to repr() rather than raising AttributeError
            # before the call is even attempted.
            operation=getattr(func, '__name__', repr(func))
        )

        if self.state == CircuitBreakerState.OPEN:
            if time.time() - self.last_failure_time < self.config.recovery_timeout:
                self.logger.warning(
                    f"Circuit breaker {self.name} is OPEN, rejecting call",
                    context=context
                )
                raise AudioProcessingException(f"Circuit breaker {self.name} is open")
            # Recovery timeout has elapsed: let this call through as a trial.
            self.state = CircuitBreakerState.HALF_OPEN
            self.success_count = 0
            self.logger.info(
                f"Circuit breaker {self.name} transitioning to HALF_OPEN",
                context=context
            )

        # Only the guarded call itself is inside the try, so bookkeeping
        # errors are never mistaken for failures of *func*.
        try:
            result = func(*args, **kwargs)
        except Exception as e:
            self._on_failure(e, context)
            raise
        self._on_success(context)
        return result

    def _on_success(self, context: LogContext) -> None:
        """Record a successful call and close the breaker when appropriate."""
        if self.state == CircuitBreakerState.HALF_OPEN:
            self.success_count += 1
            if self.success_count >= self.config.success_threshold:
                self.state = CircuitBreakerState.CLOSED
                self.failure_count = 0
                self.logger.info(
                    f"Circuit breaker {self.name} transitioning to CLOSED",
                    context=context
                )
        elif self.state == CircuitBreakerState.CLOSED:
            # Only consecutive failures count toward the threshold.
            self.failure_count = 0

    def _on_failure(self, exception: Exception, context: LogContext) -> None:
        """Record a failed call and open the breaker when appropriate."""
        self.failure_count += 1
        self.last_failure_time = time.time()

        if self.state == CircuitBreakerState.HALF_OPEN:
            # Any failure during a trial re-opens the breaker immediately.
            self.state = CircuitBreakerState.OPEN
            self.logger.warning(
                f"Circuit breaker {self.name} transitioning to OPEN after failure in HALF_OPEN",
                context=context
            )
        elif (self.state == CircuitBreakerState.CLOSED and
              self.failure_count >= self.config.failure_threshold):
            self.state = CircuitBreakerState.OPEN
            self.logger.warning(
                f"Circuit breaker {self.name} transitioning to OPEN after {self.failure_count} failures",
                context=context
            )
class RecoveryManager:
    """Manages error recovery and fallback mechanisms.

    Three independent strategies are offered:

    * ``retry_with_backoff`` re-invokes a callable, sleeping between attempts,
      while it raises a retryable exception.
    * ``execute_with_fallback`` tries a primary callable and then each fallback
      in order until one succeeds.
    * ``execute_with_circuit_breaker`` routes a call through a named
      ``CircuitBreaker`` that this manager creates on first use and then
      reuses, so breaker state persists across calls made through this manager.
    """

    def __init__(self):
        """Initialize recovery manager."""
        self.circuit_breakers: Dict[str, CircuitBreaker] = {}
        self.logger = get_structured_logger(__name__)

        # Default retry configurations for different exception types.
        # get_retry_config_for_exception returns the first entry (in insertion
        # order) whose key the exception is an instance of.
        self.default_retry_configs = {
            SpeechRecognitionException: RetryConfig(
                max_attempts=2,
                base_delay=2.0,
                retryable_exceptions=[SpeechRecognitionException]
            ),
            TranslationFailedException: RetryConfig(
                max_attempts=3,
                base_delay=1.0,
                retryable_exceptions=[TranslationFailedException]
            ),
            SpeechSynthesisException: RetryConfig(
                max_attempts=2,
                base_delay=1.5,
                retryable_exceptions=[SpeechSynthesisException]
            ),
            ConnectionError: RetryConfig(
                max_attempts=3,
                base_delay=2.0,
                exponential_backoff=True,
                retryable_exceptions=[ConnectionError, TimeoutError]
            )
        }

    @staticmethod
    def _callable_name(func: Callable[..., Any]) -> str:
        """Return a log-friendly name for *func*.

        ``functools.partial`` objects and many callable instances have no
        ``__name__``; fall back to ``repr`` instead of raising AttributeError.
        """
        return getattr(func, '__name__', repr(func))

    def get_circuit_breaker(self, name: str, config: Optional[CircuitBreakerConfig] = None) -> CircuitBreaker:
        """
        Get or create circuit breaker.

        Args:
            name: Circuit breaker name
            config: Configuration used only when the breaker is created;
                ignored if a breaker with this name already exists.
                ``None`` means ``CircuitBreakerConfig()`` defaults.

        Returns:
            CircuitBreaker: Circuit breaker instance
        """
        if name not in self.circuit_breakers:
            if config is None:
                config = CircuitBreakerConfig()
            self.circuit_breakers[name] = CircuitBreaker(config, name)

        return self.circuit_breakers[name]

    def retry_with_backoff(self, func: Callable[..., T],
                           config: Optional[RetryConfig] = None,
                           correlation_id: Optional[str] = None,
                           *args, **kwargs) -> T:
        """
        Execute function with retry and exponential backoff.

        Args:
            func: Function to execute
            config: Retry configuration (``RetryConfig()`` defaults if None)
            correlation_id: Correlation ID for logging only; it is not passed
                to *func*
            *args: Function arguments
            **kwargs: Function keyword arguments

        Returns:
            T: Function result

        Raises:
            ValueError: If ``config.max_attempts`` is less than 1.
            Exception: A non-retryable exception immediately, or the last
                exception once all attempts have failed.
        """
        if config is None:
            config = RetryConfig()
        if config.max_attempts < 1:
            # Otherwise the loop never runs and ``raise None`` fails with an
            # unrelated TypeError.
            raise ValueError(
                f"RetryConfig.max_attempts must be at least 1, got {config.max_attempts}"
            )

        func_name = self._callable_name(func)
        context = LogContext(
            correlation_id=correlation_id or 'unknown',
            component='retry_manager',
            operation=func_name
        )

        last_exception: Optional[Exception] = None

        for attempt in range(config.max_attempts):
            if attempt > 0:
                # ``attempt`` is the 0-based call index; _calculate_delay takes
                # a 0-based *retry* index, so the first retry waits base_delay.
                delay = self._calculate_delay(attempt - 1, config)
                self.logger.info(
                    f"Retrying {func_name} (attempt {attempt + 1}/{config.max_attempts}) after {delay:.2f}s delay",
                    context=context
                )
                time.sleep(delay)

            try:
                result = func(*args, **kwargs)
            except Exception as e:
                last_exception = e

                # Check if exception is retryable
                if not self._is_retryable_exception(e, config):
                    self.logger.error(
                        f"Non-retryable exception in {func_name}: {type(e).__name__}",
                        context=context,
                        exception=e
                    )
                    raise

                self.logger.warning(
                    f"Attempt {attempt + 1}/{config.max_attempts} failed for {func_name}: {e}",
                    context=context
                )
                continue

            if attempt > 0:
                self.logger.info(
                    f"Retry successful for {func_name} on attempt {attempt + 1}",
                    context=context
                )
            return result

        # All attempts failed
        self.logger.error(
            f"All {config.max_attempts} retry attempts failed for {func_name}",
            context=context,
            exception=last_exception
        )
        raise last_exception

    def execute_with_fallback(self, primary_func: Callable[..., T],
                              fallback_funcs: List[Callable[..., T]],
                              correlation_id: Optional[str] = None,
                              *args, **kwargs) -> T:
        """
        Execute function with fallback options.

        Each callable receives the same ``*args``/``**kwargs``; the first one
        that returns without raising wins.

        Args:
            primary_func: Primary function to execute
            fallback_funcs: Fallback functions, tried in order
            correlation_id: Correlation ID for logging only
            *args: Function arguments
            **kwargs: Function keyword arguments

        Returns:
            T: Function result

        Raises:
            Exception: The last exception raised if every function fails.
        """
        primary_name = self._callable_name(primary_func)
        context = LogContext(
            correlation_id=correlation_id or 'unknown',
            component='fallback_manager',
            operation=primary_name
        )

        # Try primary function first
        try:
            return primary_func(*args, **kwargs)
        except Exception as e:
            self.logger.warning(
                f"Primary function {primary_name} failed, trying fallbacks",
                context=context
            )
            last_exception = e

        # Try fallback functions
        for i, fallback_func in enumerate(fallback_funcs):
            fallback_name = self._callable_name(fallback_func)
            self.logger.info(
                f"Trying fallback {i + 1}/{len(fallback_funcs)}: {fallback_name}",
                context=context
            )
            try:
                result = fallback_func(*args, **kwargs)
            except Exception as fallback_e:
                last_exception = fallback_e
                self.logger.warning(
                    f"Fallback {fallback_name} failed: {fallback_e}",
                    context=context
                )
                continue

            self.logger.info(
                f"Fallback {fallback_name} succeeded",
                context=context
            )
            return result

        # All functions failed
        self.logger.error(
            f"All functions failed (primary + {len(fallback_funcs)} fallbacks)",
            context=context,
            exception=last_exception
        )
        raise last_exception

    def execute_with_circuit_breaker(self, func: Callable[..., T],
                                     circuit_breaker_name: str,
                                     config: Optional[CircuitBreakerConfig] = None,
                                     correlation_id: Optional[str] = None,
                                     *args, **kwargs) -> T:
        """
        Execute function with circuit breaker protection.

        Args:
            func: Function to execute
            circuit_breaker_name: Circuit breaker name
            config: Optional circuit breaker configuration (used only when the
                named breaker is first created)
            correlation_id: Correlation ID for the breaker's logging only; like
                the other strategies, it is not passed to *func*
            *args: Function arguments
            **kwargs: Function keyword arguments

        Returns:
            T: Function result

        Raises:
            AudioProcessingException: If the circuit is open.
            Exception: Whatever *func* raises.
        """
        circuit_breaker = self.get_circuit_breaker(circuit_breaker_name, config)

        def _guarded_call(**_log_kwargs: Any) -> T:
            # CircuitBreaker.call reads 'correlation_id' from its kwargs for
            # logging and forwards those kwargs to the callable it wraps.
            # Absorb them here so the ID is not injected into func's kwargs
            # (which raised TypeError for functions that don't accept it and
            # counted that as a breaker failure).
            return func(*args, **kwargs)

        _guarded_call.__name__ = self._callable_name(func)
        breaker_kwargs = {'correlation_id': correlation_id} if correlation_id else {}
        return circuit_breaker.call(_guarded_call, **breaker_kwargs)

    def _calculate_delay(self, attempt: int, config: RetryConfig) -> float:
        """
        Calculate retry delay with exponential backoff and jitter.

        Args:
            attempt: Retry index (0-based: 0 is the first retry)
            config: Retry configuration

        Returns:
            float: Delay in seconds
        """
        if config.exponential_backoff:
            delay = config.base_delay * (2 ** attempt)
        else:
            delay = config.base_delay

        # Apply maximum delay limit
        delay = min(delay, config.max_delay)

        # Add jitter to prevent thundering herd: scale by a factor in [0.5, 1.0).
        if config.jitter:
            import random
            delay *= (0.5 + random.random() * 0.5)

        return delay

    def _is_retryable_exception(self, exception: Exception, config: RetryConfig) -> bool:
        """
        Check if exception is retryable.

        Args:
            exception: Exception to check
            config: Retry configuration; ``retryable_exceptions=None`` selects
                the built-in default set below

        Returns:
            bool: True if exception is retryable
        """
        if config.retryable_exceptions is None:
            # Default retryable exceptions
            retryable_types = (
                ConnectionError,
                TimeoutError,
                SpeechRecognitionException,
                TranslationFailedException,
                SpeechSynthesisException
            )
            return isinstance(exception, retryable_types)

        return any(isinstance(exception, exc_type) for exc_type in config.retryable_exceptions)

    def get_retry_config_for_exception(self, exception: Exception) -> RetryConfig:
        """
        Get appropriate retry configuration for exception type.

        Args:
            exception: Exception to get config for

        Returns:
            RetryConfig: The first matching entry from
            ``default_retry_configs`` (the shared instance, not a copy), or a
            fresh ``RetryConfig()`` if none match.
        """
        for exc_type, config in self.default_retry_configs.items():
            if isinstance(exception, exc_type):
                return config

        # Return default config
        return RetryConfig()
def with_retry(config: Optional[RetryConfig] = None):
    """
    Decorator for automatic retry with backoff.

    The decorated function is re-invoked with exactly the arguments it was
    called with. If those include a ``correlation_id`` keyword, it is also
    used for the retry manager's log context.

    Args:
        config: Optional retry configuration; ``None`` lets
            ``RecoveryManager.retry_with_backoff`` use ``RetryConfig()``.

    Returns:
        Decorator that adds retry behaviour to a function.
    """
    def decorator(func: Callable[..., T]) -> Callable[..., T]:
        # One RecoveryManager per decorated function, created lazily on the
        # first call instead of on every call.
        recovery_manager = None

        @wraps(func)
        def wrapper(*args, **kwargs) -> T:
            nonlocal recovery_manager
            if recovery_manager is None:
                recovery_manager = RecoveryManager()
            correlation_id = kwargs.get('correlation_id')

            # Bind this call's arguments in a closure. Passing them through
            # *args/**kwargs next to a positional correlation_id raised
            # "got multiple values for argument 'correlation_id'" whenever
            # the caller supplied correlation_id as a keyword.
            @wraps(func)
            def invoke() -> T:
                return func(*args, **kwargs)

            return recovery_manager.retry_with_backoff(invoke, config, correlation_id)

        return wrapper

    return decorator
def with_circuit_breaker(name: str, config: Optional[CircuitBreakerConfig] = None):
    """
    Decorator for circuit breaker protection.

    The decorated function is called with exactly the arguments it was given.
    If those include a ``correlation_id`` keyword, it is also used for the
    breaker's log context.

    Breaker state is kept per decorated function and persists across its
    calls. Two functions decorated with the same *name* do not share a
    breaker.

    Args:
        name: Circuit breaker name
        config: Optional circuit breaker configuration; ``None`` uses
            ``CircuitBreakerConfig()`` defaults

    Returns:
        Decorator that routes calls through a circuit breaker.
    """
    def decorator(func: Callable[..., T]) -> Callable[..., T]:
        # Keep one manager (and therefore one breaker) per decorated function
        # so failures accumulate across calls. Building a new manager per call
        # always starts from a fresh CLOSED breaker that can never open.
        # Created lazily on the first call rather than at decoration time.
        recovery_manager = None

        @wraps(func)
        def wrapper(*args, **kwargs) -> T:
            nonlocal recovery_manager
            if recovery_manager is None:
                recovery_manager = RecoveryManager()
            correlation_id = kwargs.get('correlation_id')

            # Bind this call's arguments in a closure so correlation_id is not
            # passed both positionally and via **kwargs (a TypeError). Any
            # logging kwargs the breaker layer forwards are accepted and
            # ignored; func still receives the caller's own arguments.
            @wraps(func)
            def invoke(**_breaker_kwargs) -> T:
                return func(*args, **kwargs)

            return recovery_manager.execute_with_circuit_breaker(
                invoke, name, config, correlation_id
            )

        return wrapper

    return decorator