DigitalPal / digipal / core / error_handler.py
BladeSzaSza's picture
🥚 Initial DigiPal deployment to HuggingFace Spaces
🤖 Generated with [Claude Code](https://claude.ai/code)
Co-Authored-By: Claude <[email protected]>
4399e64
raw
history blame
28.1 kB
"""
Error handling utilities and decorators for DigiPal application.
This module provides comprehensive error handling, retry mechanisms,
and graceful degradation functionality.
"""
import logging
import functools
import asyncio
import time
from typing import Any, Callable, Dict, List, Optional, Type, Union, Tuple
from datetime import datetime, timedelta
from .exceptions import (
DigiPalException, ErrorSeverity, ErrorCategory,
AIModelError, StorageError, NetworkError, RecoveryError
)
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class RetryConfig:
    """Plain settings holder describing how a failing call should be retried."""

    def __init__(
        self,
        max_attempts: int = 3,
        base_delay: float = 1.0,
        max_delay: float = 60.0,
        exponential_backoff: bool = True,
        jitter: bool = True,
        retry_on: Optional[List[Type[Exception]]] = None
    ):
        """
        Store the retry settings on the instance.

        Args:
            max_attempts: Maximum number of retry attempts
            base_delay: Base delay between retries in seconds
            max_delay: Maximum delay between retries in seconds
            exponential_backoff: Whether to use exponential backoff
            jitter: Whether to add random jitter to delays
            retry_on: List of exception types to retry on; when omitted or
                empty, NetworkError and AIModelError are used
        """
        if not retry_on:
            # Both None and an empty list fall back to the default error types.
            retry_on = [NetworkError, AIModelError]

        self.retry_on = retry_on
        self.jitter = jitter
        self.exponential_backoff = exponential_backoff
        self.max_delay = max_delay
        self.base_delay = base_delay
        self.max_attempts = max_attempts
class CircuitBreakerConfig:
    """Plain settings holder for the circuit breaker pattern."""

    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: float = 60.0,
        expected_exception: Type[Exception] = Exception
    ):
        """
        Store the circuit breaker settings on the instance.

        Args:
            failure_threshold: Number of failures before opening circuit
            recovery_timeout: Time to wait before attempting recovery
            expected_exception: Exception type that triggers circuit breaker
        """
        self.expected_exception = expected_exception
        self.recovery_timeout = recovery_timeout
        self.failure_threshold = failure_threshold
class CircuitBreaker:
    """
    Circuit breaker for external service calls.

    Tracks consecutive failures of the configured exception type and moves
    between the "closed", "open" and "half-open" states accordingly.
    """

    def __init__(self, config: CircuitBreakerConfig):
        """Start in the closed state with no recorded failures."""
        self.config = config
        self.failure_count = 0
        self.last_failure_time = None
        self.state = "closed"  # closed, open, half-open

    def call(self, func: Callable, *args, **kwargs) -> Any:
        """
        Invoke ``func`` under circuit breaker protection.

        Args:
            func: Function to call
            *args: Function arguments
            **kwargs: Function keyword arguments

        Returns:
            Whatever ``func`` returns

        Raises:
            DigiPalException: If the circuit is open and the recovery timeout
                has not yet elapsed
            Exception: Whatever ``func`` raises; only exceptions matching
                ``config.expected_exception`` are counted as failures
        """
        if self.state == "open":
            if not self._should_attempt_reset():
                raise DigiPalException(
                    "Circuit breaker is open - service unavailable",
                    category=ErrorCategory.SYSTEM,
                    severity=ErrorSeverity.HIGH,
                    user_message="Service is temporarily unavailable. Please try again later."
                )
            # Timeout elapsed: let this one call through as a probe.
            self.state = "half-open"

        try:
            outcome = func(*args, **kwargs)
            self._on_success()
            return outcome
        except self.config.expected_exception as exc:
            self._on_failure()
            raise exc

    def _should_attempt_reset(self) -> bool:
        """Return True once more than ``recovery_timeout`` seconds have passed since the last failure."""
        last_failure = self.last_failure_time
        if last_failure is None:
            return True
        elapsed = datetime.now() - last_failure
        return elapsed.total_seconds() > self.config.recovery_timeout

    def _on_success(self):
        """Clear the failure counter and close the circuit."""
        self.failure_count = 0
        self.state = "closed"

    def _on_failure(self):
        """Record a failure and open the circuit once the threshold is reached."""
        self.failure_count += 1
        self.last_failure_time = datetime.now()
        threshold_reached = self.failure_count >= self.config.failure_threshold
        if threshold_reached:
            self.state = "open"
class ErrorHandler:
    """
    Central error handler for DigiPal application.

    Converts arbitrary exceptions into DigiPalException instances, owns the
    named circuit breakers, logs errors at a level matching their severity and
    keeps in-memory counters and timestamps used for error-rate queries.
    """

    def __init__(self):
        """Create empty circuit-breaker, counter and timestamp registries."""
        self.circuit_breakers: Dict[str, CircuitBreaker] = {}
        self.error_counts: Dict[str, int] = {}
        self.last_errors: Dict[str, datetime] = {}
        self.error_patterns: Dict[str, List[datetime]] = {}
        self.critical_error_threshold = 5  # Number of critical errors before emergency mode
        self.error_rate_window = 300  # 5 minutes window for error rate calculation

    def get_circuit_breaker(self, name: str, config: Optional[CircuitBreakerConfig] = None) -> CircuitBreaker:
        """
        Return the circuit breaker registered under ``name``, creating it on first use.

        Args:
            name: Service name
            config: Circuit breaker configuration; only used when the breaker
                is created, ignored if one already exists for ``name``

        Returns:
            CircuitBreaker instance
        """
        try:
            return self.circuit_breakers[name]
        except KeyError:
            breaker = CircuitBreaker(config or CircuitBreakerConfig())
            self.circuit_breakers[name] = breaker
            return breaker

    def handle_error(self, error: Exception, context: Optional[Dict[str, Any]] = None) -> DigiPalException:
        """
        Normalise ``error`` into a DigiPalException and record its occurrence.

        Args:
            error: Original exception
            context: Additional context information

        Returns:
            The same object (with ``context`` merged in) when ``error`` is
            already a DigiPalException, otherwise a newly built one
        """
        context = context or {}

        if isinstance(error, DigiPalException):
            # Already in the right shape: only enrich its context.
            error.context.update(context)
            converted = error
        else:
            converted = self._convert_to_digipal_exception(error, context)

        self._track_error_pattern(converted)
        return converted

    def _convert_to_digipal_exception(self, error: Exception, context: Dict[str, Any]) -> DigiPalException:
        """Map a standard exception onto the matching DigiPal exception; first matching rule wins."""
        detail = str(error)

        if isinstance(error, (ConnectionError, TimeoutError)):
            converted = NetworkError(
                f"Network error: {detail}",
                context=context,
                error_code="NET_001"
            )
        elif isinstance(error, FileNotFoundError):
            converted = StorageError(
                f"File not found: {detail}",
                context=context,
                error_code="STOR_001"
            )
        elif isinstance(error, PermissionError):
            converted = StorageError(
                f"Permission denied: {detail}",
                context=context,
                error_code="STOR_002"
            )
        elif isinstance(error, MemoryError):
            converted = AIModelError(
                f"Memory error: {detail}",
                context=context,
                error_code="AI_MEM_001"
            )
        elif isinstance(error, ImportError):
            converted = DigiPalException(
                f"Import error: {detail}",
                category=ErrorCategory.SYSTEM,
                severity=ErrorSeverity.HIGH,
                context=context,
                error_code="SYS_IMP_001"
            )
        elif isinstance(error, ValueError):
            converted = DigiPalException(
                f"Invalid value: {detail}",
                category=ErrorCategory.VALIDATION,
                severity=ErrorSeverity.LOW,
                context=context,
                error_code="VAL_001"
            )
        elif isinstance(error, KeyError):
            converted = DigiPalException(
                f"Missing key: {detail}",
                category=ErrorCategory.VALIDATION,
                severity=ErrorSeverity.MEDIUM,
                context=context,
                error_code="VAL_KEY_001"
            )
        elif isinstance(error, AttributeError):
            converted = DigiPalException(
                f"Attribute error: {detail}",
                category=ErrorCategory.SYSTEM,
                severity=ErrorSeverity.MEDIUM,
                context=context,
                error_code="SYS_ATTR_001"
            )
        else:
            # Anything unrecognised is reported as a high-severity system error.
            converted = DigiPalException(
                f"Unexpected error: {detail}",
                category=ErrorCategory.SYSTEM,
                severity=ErrorSeverity.HIGH,
                context=context,
                error_code="SYS_001"
            )

        return converted

    def _track_error_pattern(self, error: DigiPalException):
        """Append the current time to the error's history and drop entries older than 24 hours."""
        now = datetime.now()
        key = f"{error.category.value}:{error.error_code}"

        history = self.error_patterns.setdefault(key, [])
        history.append(now)

        oldest_allowed = now - timedelta(hours=24)
        self.error_patterns[key] = [stamp for stamp in history if stamp > oldest_allowed]

    def get_error_rate(self, error_category: Optional[str] = None, window_minutes: int = 5) -> float:
        """
        Compute how many tracked errors occurred per minute in the recent window.

        Args:
            error_category: Specific error category (None for all)
            window_minutes: Time window in minutes

        Returns:
            Errors per minute, or 0 when ``window_minutes`` is not positive
        """
        oldest_allowed = datetime.now() - timedelta(minutes=window_minutes)
        prefix = f"{error_category}:"

        recent_total = sum(
            sum(1 for stamp in stamps if stamp > oldest_allowed)
            for key, stamps in self.error_patterns.items()
            if not error_category or key.startswith(prefix)
        )

        if window_minutes > 0:
            return recent_total / window_minutes
        return 0

    def is_error_storm_detected(self) -> bool:
        """
        Report whether the overall error rate indicates an error storm.

        Returns:
            True if more than 10 errors per minute were tracked over the last 5 minutes
        """
        return self.get_error_rate(window_minutes=5) > 10  # More than 10 errors per minute

    def get_most_frequent_errors(self, limit: int = 5) -> List[Tuple[str, int]]:
        """
        List the error keys with the most tracked occurrences.

        Args:
            limit: Maximum number of errors to return

        Returns:
            List of (error_key, count) tuples, highest count first
        """
        tallies = [(key, len(stamps)) for key, stamps in self.error_patterns.items()]
        tallies.sort(key=lambda pair: pair[1], reverse=True)
        return tallies[:limit]

    def log_error(self, error: DigiPalException, extra_context: Optional[Dict[str, Any]] = None):
        """
        Log ``error`` at a level matching its severity and update the frequency counters.

        Args:
            error: DigiPal exception to log
            extra_context: Additional context for logging
        """
        merged_context = {**error.context, **(extra_context or {})}
        log_data = {
            'error_code': error.error_code,
            'category': error.category.value,
            'severity': error.severity.value,
            'user_message': error.user_message,
            'context': merged_context
        }

        # Pick the logger method and message prefix from the severity.
        if error.severity == ErrorSeverity.CRITICAL:
            emit, text = logger.critical, f"CRITICAL ERROR: {str(error)}"
        elif error.severity == ErrorSeverity.HIGH:
            emit, text = logger.error, f"HIGH SEVERITY: {str(error)}"
        elif error.severity == ErrorSeverity.MEDIUM:
            emit, text = logger.warning, f"MEDIUM SEVERITY: {str(error)}"
        else:
            emit, text = logger.info, f"LOW SEVERITY: {str(error)}"
        emit(text, extra=log_data)

        # Track error frequency
        key = f"{error.category.value}:{error.error_code}"
        self.error_counts[key] = self.error_counts.get(key, 0) + 1
        self.last_errors[key] = datetime.now()
# Global error handler instance, created at import time. The decorators and
# get_error_statistics() in this module operate on this shared object.
error_handler = ErrorHandler()
def with_error_handling(
    fallback_value: Any = None,
    log_errors: bool = True,
    raise_on_critical: bool = True,
    context: Optional[Dict[str, Any]] = None
):
    """
    Build a decorator that routes exceptions through the global error handler.

    Any exception raised by the wrapped function is converted with
    ``error_handler.handle_error``. It is re-raised when it is critical and
    ``raise_on_critical`` is set, or when it is high/critical and no
    fallback value was supplied; otherwise ``fallback_value`` is returned.

    Args:
        fallback_value: Value to return on error
        log_errors: Whether to log errors
        raise_on_critical: Whether to raise critical errors
        context: Additional context for error handling
    """
    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except Exception as exc:
                converted = error_handler.handle_error(exc, context)

                if log_errors:
                    error_handler.log_error(converted)

                critical_must_raise = (
                    raise_on_critical
                    and converted.severity == ErrorSeverity.CRITICAL
                )
                severe_without_fallback = (
                    converted.severity in [ErrorSeverity.HIGH, ErrorSeverity.CRITICAL]
                    and fallback_value is None
                )
                if critical_must_raise or severe_without_fallback:
                    raise converted

                return fallback_value

        return wrapper

    return decorator
def with_retry(config: Optional[RetryConfig] = None):
    """
    Decorator for retry functionality.

    The wrapped function is called up to ``config.max_attempts`` times (and
    always at least once). Only exceptions that are instances of a type in
    ``config.retry_on`` trigger another attempt; any other exception, or the
    exception from the final attempt, propagates to the caller unchanged.
    Between attempts the wrapper sleeps for ``base_delay`` seconds, doubled
    per attempt when ``exponential_backoff`` is set, capped at ``max_delay``
    and scaled by a random factor in [0.5, 1.0) when ``jitter`` is set.

    Args:
        config: Retry configuration; a default RetryConfig is used when omitted
    """
    # Imported once here rather than on every retry iteration.
    import random

    config = config or RetryConfig()

    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # Guard against max_attempts < 1: without it the loop body would
            # never run and there would be no result or exception to report.
            attempts = max(1, config.max_attempts)

            for attempt in range(attempts):
                try:
                    return func(*args, **kwargs)
                except Exception as exc:
                    # Check if we should retry on this exception
                    should_retry = isinstance(exc, tuple(config.retry_on))
                    if not should_retry or attempt == attempts - 1:
                        # Bare raise keeps the original traceback intact.
                        raise

                    # Calculate delay
                    delay = config.base_delay
                    if config.exponential_backoff:
                        delay *= (2 ** attempt)
                    delay = min(delay, config.max_delay)
                    if config.jitter:
                        delay *= (0.5 + random.random() * 0.5)

                    logger.info(f"Retrying {func.__name__} in {delay:.2f}s (attempt {attempt + 1}/{config.max_attempts})")
                    time.sleep(delay)
            # Not reached: the last attempt either returns or re-raises.

        return wrapper

    return decorator
def with_circuit_breaker(service_name: str, config: Optional[CircuitBreakerConfig] = None):
    """
    Build a decorator that runs the wrapped function through a named circuit breaker.

    The breaker is looked up on the global error handler at every call, so
    all functions decorated with the same ``service_name`` share one breaker.

    Args:
        service_name: Name of the service for circuit breaker
        config: Circuit breaker configuration
    """
    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            breaker = error_handler.get_circuit_breaker(service_name, config)
            guarded_result = breaker.call(func, *args, **kwargs)
            return guarded_result

        return wrapper

    return decorator
def with_async_error_handling(
    fallback_value: Any = None,
    log_errors: bool = True,
    raise_on_critical: bool = True,
    context: Optional[Dict[str, Any]] = None
):
    """
    Async decorator for comprehensive error handling.

    This factory is a plain function: it must return the decorator directly
    so that ``@with_async_error_handling(...)`` works. (Declared as
    ``async def`` it would return a coroutine object instead of a decorator.)
    Only the wrapper around the decorated coroutine function is async.

    Any exception raised by the awaited function is converted with
    ``error_handler.handle_error``. It is re-raised when it is critical and
    ``raise_on_critical`` is set, or when it is high/critical and no
    fallback value was supplied; otherwise ``fallback_value`` is returned.

    Args:
        fallback_value: Value to return on error
        log_errors: Whether to log errors
        raise_on_critical: Whether to raise critical errors
        context: Additional context for error handling
    """
    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            try:
                return await func(*args, **kwargs)
            except Exception as e:
                digipal_error = error_handler.handle_error(e, context)

                if log_errors:
                    error_handler.log_error(digipal_error)

                if raise_on_critical and digipal_error.severity == ErrorSeverity.CRITICAL:
                    raise digipal_error

                if digipal_error.severity in [ErrorSeverity.HIGH, ErrorSeverity.CRITICAL] and fallback_value is None:
                    raise digipal_error

                return fallback_value

        return wrapper

    return decorator
def with_async_retry(config: Optional[RetryConfig] = None):
    """
    Async decorator for retry functionality.

    This factory is a plain function: it must return the decorator directly
    so that ``@with_async_retry(...)`` works. (Declared as ``async def`` it
    would return a coroutine object instead of a decorator.) Only the wrapper
    around the decorated coroutine function is async.

    The wrapped coroutine function is awaited up to ``config.max_attempts``
    times (and always at least once). Only exceptions that are instances of a
    type in ``config.retry_on`` trigger another attempt; any other exception,
    or the exception from the final attempt, propagates unchanged. Between
    attempts the wrapper awaits ``asyncio.sleep`` for ``base_delay`` seconds,
    doubled per attempt when ``exponential_backoff`` is set, capped at
    ``max_delay`` and scaled by a random factor in [0.5, 1.0) when ``jitter``
    is set.

    Args:
        config: Retry configuration; a default RetryConfig is used when omitted
    """
    # Imported once here rather than on every retry iteration.
    import random

    config = config or RetryConfig()

    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            # Guard against max_attempts < 1: without it the loop body would
            # never run and there would be no result or exception to report.
            attempts = max(1, config.max_attempts)

            for attempt in range(attempts):
                try:
                    return await func(*args, **kwargs)
                except Exception as exc:
                    # Check if we should retry on this exception
                    should_retry = isinstance(exc, tuple(config.retry_on))
                    if not should_retry or attempt == attempts - 1:
                        # Bare raise keeps the original traceback intact.
                        raise

                    # Calculate delay
                    delay = config.base_delay
                    if config.exponential_backoff:
                        delay *= (2 ** attempt)
                    delay = min(delay, config.max_delay)
                    if config.jitter:
                        delay *= (0.5 + random.random() * 0.5)

                    logger.info(f"Retrying {func.__name__} in {delay:.2f}s (attempt {attempt + 1}/{config.max_attempts})")
                    await asyncio.sleep(delay)
            # Not reached: the last attempt either returns or re-raises.

        return wrapper

    return decorator
def create_fallback_response(error: DigiPalException, context: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """
    Build the standard failure payload describing ``error``.

    Args:
        error: DigiPal exception
        context: Additional context

    Returns:
        Dictionary with ``success`` set to False, an ``error`` section copied
        from the exception's attributes, the given context (or an empty
        dict) and the current time as an ISO-8601 string
    """
    error_section = {
        'message': error.user_message,
        'category': error.category.value,
        'severity': error.severity.value,
        'recovery_suggestions': error.recovery_suggestions,
        'error_code': error.error_code
    }

    response: Dict[str, Any] = {'success': False}
    response['error'] = error_section
    response['context'] = context or {}
    response['timestamp'] = datetime.now().isoformat()
    return response
def get_error_statistics() -> Dict[str, Any]:
    """
    Snapshot the global error handler's counters for monitoring and debugging.

    Returns:
        Dictionary with a copy of the per-key error counts, the time each key
        was last logged (ISO-8601 strings) and, for every registered circuit
        breaker, its state, failure count and last failure time
    """
    counts_snapshot = dict(error_handler.error_counts)

    last_seen = {}
    for key, stamp in error_handler.last_errors.items():
        last_seen[key] = stamp.isoformat()

    breaker_states = {}
    for name, breaker in error_handler.circuit_breakers.items():
        breaker_states[name] = {
            'state': breaker.state,
            'failure_count': breaker.failure_count,
            'last_failure': breaker.last_failure_time.isoformat() if breaker.last_failure_time else None
        }

    return {
        'error_counts': counts_snapshot,
        'last_errors': last_seen,
        'circuit_breaker_states': breaker_states
    }
class HealthChecker:
    """
    Health checker for system components.

    Components register a zero-argument callable that returns True when the
    component is healthy. The checker runs those callables on demand and
    keeps the latest result and the time of the last completed check.
    """

    def __init__(self):
        """Initialize health checker with empty registries."""
        # Single initializer: the class previously defined __init__ twice and
        # only the later definition (the one that also creates
        # _health_check_functions) ever took effect.
        self.component_health: Dict[str, bool] = {}
        self.last_health_check: Dict[str, datetime] = {}
        self.health_check_interval = 300  # 5 minutes; not consulted by the methods of this class
        self._health_check_functions: Dict[str, Callable[[], bool]] = {}

    def register_component(self, component_name: str, health_check_func: Callable[[], bool]):
        """
        Register a component for health checking.

        The component is recorded as healthy until its first check runs.

        Args:
            component_name: Name of the component
            health_check_func: Function that returns True if component is healthy
        """
        self.component_health[component_name] = True
        self._health_check_functions[component_name] = health_check_func

    def check_component_health(self, component_name: str) -> bool:
        """
        Check health of a specific component.

        Args:
            component_name: Name of component to check

        Returns:
            The result of the component's health function; False if the
            component is not registered or its health function raises
        """
        if component_name not in self._health_check_functions:
            return False

        try:
            health_func = self._health_check_functions[component_name]
            is_healthy = health_func()
            self.component_health[component_name] = is_healthy
            self.last_health_check[component_name] = datetime.now()
            return is_healthy
        except Exception as e:
            # A failing health function marks the component unhealthy; the
            # last-check timestamp is left untouched in this case.
            logger.error(f"Health check failed for {component_name}: {e}")
            self.component_health[component_name] = False
            return False

    def get_system_health(self) -> Dict[str, Any]:
        """
        Re-check every registered component and summarise the results.

        The overall status is "critical" when no component is healthy (which
        includes the case where nothing is registered), "unhealthy" when
        fewer than half are healthy, "healthy" when all are healthy and
        "degraded" otherwise.

        Returns:
            Dictionary with the overall status, healthy/total counts, the
            per-component status and the last check times as ISO-8601 strings
        """
        # Check all components
        for component in self._health_check_functions:
            self.check_component_health(component)

        healthy_components = sum(1 for health in self.component_health.values() if health)
        total_components = len(self.component_health)

        if healthy_components == 0:
            overall_health = "critical"
        elif healthy_components < total_components * 0.5:
            overall_health = "unhealthy"
        elif healthy_components == total_components:
            overall_health = "healthy"
        else:
            overall_health = "degraded"

        return {
            'overall_health': overall_health,
            'healthy_components': healthy_components,
            'total_components': total_components,
            'component_status': dict(self.component_health),
            'last_checks': {k: v.isoformat() for k, v in self.last_health_check.items()}
        }
# Global health checker instance, created at import time with no components registered.
health_checker = HealthChecker()
class RecoveryManager:
    """
    Manages recovery operations for various system failures.

    Recovery strategies are callables registered per error type (the value
    of an error's category). They are tried in registration order until one
    reports success, and every attempt is recorded in an in-memory history.
    """

    def __init__(self):
        """Initialize recovery manager with no strategies and an empty history."""
        self.recovery_strategies: Dict[str, List[Callable]] = {}
        self.recovery_history: List[Dict[str, Any]] = []

    def register_recovery_strategy(self, error_type: str, recovery_func: Callable):
        """
        Register a recovery strategy for an error type.

        Args:
            error_type: Type of error (e.g., 'storage_error', 'ai_model_error')
            recovery_func: Function to attempt recovery; it is called with the
                error and its return value is treated as a success flag
        """
        self.recovery_strategies.setdefault(error_type, []).append(recovery_func)

    def attempt_recovery(self, error: DigiPalException) -> bool:
        """
        Attempt to recover from an error.

        Strategies registered for ``error.category.value`` are run in order
        until one returns a truthy value. A strategy that raises is logged
        and recorded as failed, and the next strategy is tried.

        Args:
            error: The error to recover from

        Returns:
            True if recovery was successful; False if every strategy failed
            or none is registered (in which case nothing is added to the history)
        """
        error_type = error.category.value

        if error_type not in self.recovery_strategies:
            logger.warning(f"No recovery strategies for error type: {error_type}")
            return False

        recovery_attempt = {
            'timestamp': datetime.now(),
            'error_type': error_type,
            'error_code': error.error_code,
            'strategies_attempted': [],
            'success': False
        }

        for strategy in self.recovery_strategies[error_type]:
            # Callables such as functools.partial objects have no __name__;
            # resolve the label once, outside the try, so a missing attribute
            # can neither abort this strategy nor escape from the handler below.
            strategy_name = getattr(strategy, '__name__', repr(strategy))

            try:
                logger.info(f"Attempting recovery strategy: {strategy_name}")
                success = strategy(error)
            except Exception as recovery_error:
                logger.error(f"Recovery strategy {strategy_name} failed: {recovery_error}")
                recovery_attempt['strategies_attempted'].append({
                    'strategy': strategy_name,
                    'success': False,
                    'error': str(recovery_error)
                })
                continue

            recovery_attempt['strategies_attempted'].append({
                'strategy': strategy_name,
                'success': success
            })

            if success:
                recovery_attempt['success'] = True
                logger.info(f"Recovery successful using strategy: {strategy_name}")
                break

        self.recovery_history.append(recovery_attempt)
        return recovery_attempt['success']

    def get_recovery_statistics(self) -> Dict[str, Any]:
        """
        Get recovery statistics.

        Returns:
            Dictionary with the total number of recorded attempts, the number
            and rate of successful ones, per-error-type attempt/success counts
            and the last 10 history entries. The same keys are present when
            the history is empty (zero counts, empty containers).
        """
        history = self.recovery_history
        total_attempts = len(history)
        successful_recoveries = sum(1 for attempt in history if attempt['success'])

        error_types: Dict[str, Dict[str, int]] = {}
        for attempt in history:
            bucket = error_types.setdefault(attempt['error_type'], {'attempts': 0, 'successes': 0})
            bucket['attempts'] += 1
            if attempt['success']:
                bucket['successes'] += 1

        return {
            'total_attempts': total_attempts,
            'successful_recoveries': successful_recoveries,
            'success_rate': successful_recoveries / total_attempts if total_attempts > 0 else 0.0,
            'error_types': error_types,
            'recent_attempts': history[-10:]  # Last 10 attempts
        }
# Global recovery manager instance, created at import time with no strategies registered.
recovery_manager = RecoveryManager()