Spaces:
Runtime error
Runtime error

🥚 Initial DigiPal deployment to HuggingFace Spaces. 🤖 Generated with [Claude Code](https://claude.ai/code). Co-Authored-By: Claude <[email protected]>
4399e64
""" | |
Recovery strategies for DigiPal error handling system. | |
This module implements specific recovery strategies for different types of errors, | |
providing automated recovery mechanisms and fallback procedures. | |
""" | |
import logging | |
import time | |
import shutil | |
import sqlite3 | |
from datetime import datetime, timedelta | |
from pathlib import Path | |
from typing import Dict, List, Optional, Any, Callable, Tuple | |
from dataclasses import dataclass | |
from .exceptions import ( | |
DigiPalException, StorageError, AIModelError, NetworkError, | |
AuthenticationError, ImageGenerationError, SpeechProcessingError, | |
PetLifecycleError, MCPProtocolError, RecoveryError, ErrorSeverity | |
) | |
from .error_handler import recovery_manager | |
from ..storage.backup_recovery import BackupRecoveryManager | |
# Module-level logger, named after this module; the recovery strategies below log through it.
logger = logging.getLogger(__name__)
@dataclass
class RecoveryResult:
    """Result of a recovery attempt.

    Declared as a dataclass so that instances can be built with keyword
    arguments (``RecoveryResult(success=..., strategy_used=..., ...)``).

    Attributes:
        success: Whether the recovery attempt succeeded.
        strategy_used: Name of the strategy that produced this result.
        message: Human-readable summary of the outcome.
        context: Context dictionary attached to the result.
        recovery_time_seconds: Duration of the recovery attempt, in seconds.
    """

    success: bool
    strategy_used: str
    message: str
    context: Dict[str, Any]
    recovery_time_seconds: float
class StorageRecoveryStrategy:
    """Recovery strategies for storage-related errors."""

    # Number of most recent backups that survive a disk-space cleanup.
    _BACKUPS_TO_KEEP = 3

    # Fallback data directory offered by recover_permission_error(). Its name
    # matches the "digipal_*" temp-file pattern, so recover_disk_space_issue()
    # has to skip it explicitly or it would delete live data.
    _TMP_STORAGE_DIR = Path("/tmp") / "digipal_data"

    def __init__(self, backup_manager: Optional[BackupRecoveryManager] = None):
        """
        Initialize storage recovery strategy.

        Args:
            backup_manager: Backup manager used to list, restore and delete
                backups; backup-based recovery is unavailable when None
        """
        self.backup_manager = backup_manager

    def recover_corrupted_database(self, error: StorageError) -> bool:
        """
        Recover from database corruption by restoring the first listed backup.

        Args:
            error: Storage error that occurred

        Returns:
            True if recovery successful; False when there is no backup
            manager, no backup, the restore fails, or any exception is raised
        """
        try:
            logger.info("Attempting database corruption recovery")
            if not self.backup_manager:
                logger.error("No backup manager available for recovery")
                return False

            backups = self.backup_manager.list_backups()
            if not backups:
                logger.error("No backups available for recovery")
                return False

            # Assumes list_backups() returns the newest backup first --
            # TODO confirm against BackupRecoveryManager.
            latest_backup = backups[0]

            if self.backup_manager.restore_backup(latest_backup.backup_id):
                logger.info(f"Database recovered from backup: {latest_backup.backup_id}")
                return True

            logger.error("Failed to restore from backup")
            return False
        except Exception as e:
            logger.error(f"Database recovery failed: {e}")
            return False

    def recover_disk_space_issue(self, error: StorageError) -> bool:
        """
        Recover from disk space issues by pruning old backups and temp files.

        Deletion is best-effort: a backup or temp entry that cannot be
        removed is logged and skipped so the remaining cleanup still runs.

        Args:
            error: Storage error that occurred

        Returns:
            True if the cleanup pass completed, False if it was aborted by an
            unexpected exception
        """
        try:
            logger.info("Attempting disk space recovery")

            # Clean up old backups, keeping only the first _BACKUPS_TO_KEEP entries.
            if self.backup_manager:
                old_backups = self.backup_manager.list_backups()
                for backup in old_backups[self._BACKUPS_TO_KEEP:]:
                    try:
                        self.backup_manager.delete_backup(backup.backup_id)
                    except Exception as e:
                        logger.warning(f"Failed to delete old backup {backup.backup_id}: {e}")
                        continue
                    logger.info(f"Deleted old backup: {backup.backup_id}")

            # Clean up temporary files
            temp_dirs = [Path("/tmp"), Path.cwd() / "temp", Path.cwd() / "cache"]
            for temp_dir in temp_dirs:
                if not temp_dir.exists():
                    continue
                for temp_file in temp_dir.glob("digipal_*"):
                    if temp_file == self._TMP_STORAGE_DIR:
                        # This is the fallback data location, not a temp
                        # artifact; removing it would destroy stored data.
                        continue
                    try:
                        if temp_file.is_file():
                            temp_file.unlink()
                        elif temp_file.is_dir():
                            shutil.rmtree(temp_file)
                    except Exception as e:
                        logger.warning(f"Failed to delete temp file {temp_file}: {e}")

            logger.info("Disk space cleanup completed")
            return True
        except Exception as e:
            logger.error(f"Disk space recovery failed: {e}")
            return False

    def recover_permission_error(self, error: StorageError) -> bool:
        """
        Recover from permission errors by finding a writable storage location.

        Candidate directories are tried in order; the first one that can be
        created and written to is stored in
        ``error.context['alternative_storage_path']``.

        Args:
            error: Storage error that occurred

        Returns:
            True if recovery successful (a writable location was found)
        """
        try:
            logger.info("Attempting permission error recovery")

            # Try to create alternative storage location
            alternative_paths = [
                Path.home() / ".digipal" / "data",
                self._TMP_STORAGE_DIR,
                Path.cwd() / "digipal_data_alt",
            ]
            for alt_path in alternative_paths:
                try:
                    alt_path.mkdir(parents=True, exist_ok=True)

                    # Test write access
                    test_file = alt_path / "test_write.tmp"
                    test_file.write_text("test")
                    test_file.unlink()

                    logger.info(f"Alternative storage location available: {alt_path}")
                    # Store the alternative path in error context for later use
                    error.context['alternative_storage_path'] = str(alt_path)
                    return True
                except Exception as e:
                    logger.debug(f"Alternative path {alt_path} not accessible: {e}")
                    continue

            logger.error("No alternative storage locations available")
            return False
        except Exception as e:
            logger.error(f"Permission error recovery failed: {e}")
            return False
class AIModelRecoveryStrategy:
    """Recovery strategies for AI model-related errors."""

    def __init__(self):
        """Initialize AI model recovery strategy."""
        # Ordered fallback options per model kind.
        self.model_fallback_hierarchy = {
            'language_model': ['qwen3-0.6b', 'simple_responses', 'static_responses'],
            'speech_processing': ['kyutai', 'basic_speech', 'text_only'],
            'image_generation': ['flux', 'stable_diffusion', 'default_images']
        }

    @staticmethod
    def _release_model_memory(synchronize: bool = False) -> bool:
        """
        Best-effort release of memory held by models.

        Runs the garbage collector and, when torch and CUDA are available,
        empties the CUDA cache. A missing torch installation or a CUDA
        runtime error is logged and swallowed so the caller can still apply
        its fallback settings.

        Args:
            synchronize: Also call ``torch.cuda.synchronize()`` after
                emptying the cache

        Returns:
            True if the CUDA cache was emptied, False otherwise
        """
        import gc

        gc.collect()
        try:
            import torch
        except ImportError:
            logger.debug("torch is not importable; skipping CUDA cache cleanup")
            return False

        try:
            if not torch.cuda.is_available():
                return False
            torch.cuda.empty_cache()
            if synchronize:
                torch.cuda.synchronize()
            return True
        except Exception as e:
            logger.warning(f"CUDA cache cleanup failed: {e}")
            return False

    def recover_model_loading_failure(self, error: AIModelError) -> bool:
        """
        Recover from model loading failures.

        Frees cached memory where possible, then sets the
        ``use_reduced_precision`` and ``use_cpu_only`` flags in
        ``error.context``.

        Args:
            error: AI model error that occurred

        Returns:
            True if recovery successful
        """
        try:
            logger.info("Attempting AI model loading recovery")

            # Memory cleanup is optional: the fallback flags below must be set
            # even when torch is missing or CUDA is in a bad state.
            if self._release_model_memory():
                logger.info("Cleared CUDA cache")

            # Try loading with reduced precision
            error.context['use_reduced_precision'] = True
            error.context['use_cpu_only'] = True
            logger.info("Set fallback options for model loading")
            return True
        except Exception as e:
            logger.error(f"Model loading recovery failed: {e}")
            return False

    def recover_model_inference_failure(self, error: AIModelError) -> bool:
        """
        Recover from model inference failures.

        Sets ``use_fallback_responses`` and ``degradation_level`` in
        ``error.context``.

        Args:
            error: AI model error that occurred

        Returns:
            True if recovery successful
        """
        try:
            logger.info("Attempting AI model inference recovery")

            # Switch to fallback response mode
            error.context['use_fallback_responses'] = True
            error.context['degradation_level'] = 'basic_responses'
            logger.info("Switched to fallback response mode")
            return True
        except Exception as e:
            logger.error(f"Model inference recovery failed: {e}")
            return False

    def recover_memory_error(self, error: AIModelError) -> bool:
        """
        Recover from memory-related errors.

        Frees cached memory where possible, then sets the
        ``reduce_batch_size``, ``use_fp16`` and ``offload_to_cpu`` flags in
        ``error.context``.

        Args:
            error: AI model error that occurred

        Returns:
            True if recovery successful
        """
        try:
            logger.info("Attempting memory error recovery")

            # Aggressive memory cleanup (best-effort, see helper).
            self._release_model_memory(synchronize=True)

            # Reduce batch sizes and model precision
            error.context['reduce_batch_size'] = True
            error.context['use_fp16'] = True
            error.context['offload_to_cpu'] = True
            logger.info("Applied memory optimization settings")
            return True
        except Exception as e:
            logger.error(f"Memory error recovery failed: {e}")
            return False
class NetworkRecoveryStrategy:
    """Strategies that react to network failures by flagging degraded modes."""

    def __init__(self):
        """Set up the progressive retry delay schedule."""
        # Delays in seconds, shortest first.
        self.retry_delays = [1, 2, 5, 10, 30]

    def recover_connection_timeout(self, error: NetworkError) -> bool:
        """
        Handle a connection timeout by flagging offline mode.

        Args:
            error: Network error whose context receives the flags

        Returns:
            True once the flags are set, False if setting them raised
        """
        try:
            logger.info("Attempting connection timeout recovery")
            offline_settings = {
                "enable_offline_mode": True,
                "use_cached_data": True,
            }
            for key, value in offline_settings.items():
                error.context[key] = value
            logger.info("Switched to offline mode")
        except Exception as e:
            logger.error(f"Connection timeout recovery failed: {e}")
            return False
        return True

    def recover_dns_resolution_failure(self, error: NetworkError) -> bool:
        """
        Handle a DNS resolution failure by recording alternative DNS servers.

        Args:
            error: Network error whose context receives the settings

        Returns:
            True once the settings are stored, False if storing them raised
        """
        try:
            logger.info("Attempting DNS resolution recovery")
            dns_settings = {
                "alternative_dns_servers": [
                    "8.8.8.8",         # Google DNS
                    "1.1.1.1",         # Cloudflare DNS
                    "208.67.222.222",  # OpenDNS
                ],
                "use_ip_addresses": True,
            }
            for key, value in dns_settings.items():
                error.context[key] = value
            logger.info("Set alternative DNS configuration")
        except Exception as e:
            logger.error(f"DNS resolution recovery failed: {e}")
            return False
        return True

    def recover_rate_limit_error(self, error: NetworkError) -> bool:
        """
        Handle rate limiting by requesting backoff and cached responses.

        Args:
            error: Network error whose context receives the settings

        Returns:
            True once the settings are stored, False if storing them raised
        """
        try:
            logger.info("Attempting rate limit recovery")
            throttle_settings = {
                # Backoff parameters
                "implement_backoff": True,
                "backoff_multiplier": 2.0,
                "max_backoff_seconds": 300,
                # Temporary switch to cached responses
                "use_cached_responses": True,
                "cache_duration_hours": 1,
            }
            for key, value in throttle_settings.items():
                error.context[key] = value
            logger.info("Applied rate limiting recovery measures")
        except Exception as e:
            logger.error(f"Rate limit recovery failed: {e}")
            return False
        return True
class AuthenticationRecoveryStrategy:
    """Strategies that react to authentication failures via context flags."""

    def recover_token_expiry(self, error: AuthenticationError) -> bool:
        """
        Handle an expired token by flagging offline authentication.

        Args:
            error: Authentication error whose context receives the flags

        Returns:
            True once the flags are set, False if setting them raised
        """
        try:
            logger.info("Attempting token expiry recovery")
            # Offline mode with cached authentication
            for flag in ("use_offline_auth", "extend_session_timeout"):
                error.context[flag] = True
            logger.info("Switched to offline authentication mode")
        except Exception as e:
            logger.error(f"Token expiry recovery failed: {e}")
            return False
        else:
            return True

    def recover_invalid_credentials(self, error: AuthenticationError) -> bool:
        """
        Handle invalid credentials by flagging guest mode.

        Args:
            error: Authentication error whose context receives the flags

        Returns:
            True once the flags are set, False if setting them raised
        """
        try:
            logger.info("Attempting invalid credentials recovery")
            # Guest mode with limited functionality
            for flag in ("enable_guest_mode", "limited_functionality"):
                error.context[flag] = True
            logger.info("Enabled guest mode for limited functionality")
        except Exception as e:
            logger.error(f"Invalid credentials recovery failed: {e}")
            return False
        else:
            return True
class PetLifecycleRecoveryStrategy:
    """Strategies that react to pet lifecycle failures."""

    def __init__(self, backup_manager: Optional[BackupRecoveryManager] = None):
        """Remember the backup manager used for restore-based recovery."""
        self.backup_manager = backup_manager

    def recover_corrupted_pet_data(self, error: PetLifecycleError) -> bool:
        """
        Restore corrupted pet data from a backup.

        Looks first for backups whose ``pet_id`` matches the ``pet_id`` in
        ``error.context``; if none restores, falls back to backups whose
        ``user_id`` matches the ``user_id`` in the context. Only the first
        matching backup of each kind is tried.

        Args:
            error: Pet lifecycle error carrying ``pet_id`` / ``user_id``

        Returns:
            True if a backup was restored, False otherwise
        """
        try:
            logger.info("Attempting corrupted pet data recovery")

            manager = self.backup_manager
            if not manager:
                logger.error("No backup manager available for pet recovery")
                return False

            details = error.context
            pet_id = details.get('pet_id')
            user_id = details.get('user_id')

            # First choice: a backup taken for this specific pet.
            if pet_id:
                for_pet = list(filter(lambda b: b.pet_id == pet_id, manager.list_backups()))
                if for_pet:
                    candidate = for_pet[0]
                    if manager.restore_backup(candidate.backup_id):
                        logger.info(f"Pet data recovered from backup: {candidate.backup_id}")
                        return True

            # Second choice: any backup belonging to the user.
            if user_id:
                for_user = list(filter(lambda b: b.user_id == user_id, manager.list_backups()))
                if for_user:
                    candidate = for_user[0]
                    if manager.restore_backup(candidate.backup_id):
                        logger.info(f"User data recovered from backup: {candidate.backup_id}")
                        return True

            logger.error("No suitable backups found for pet recovery")
            return False
        except Exception as e:
            logger.error(f"Pet data recovery failed: {e}")
            return False

    def recover_evolution_failure(self, error: PetLifecycleError) -> bool:
        """
        Handle an evolution failure by flagging a safe evolution retry.

        Args:
            error: Pet lifecycle error whose context receives the flags

        Returns:
            True once the flags are set, False if setting them raised
        """
        try:
            logger.info("Attempting evolution failure recovery")
            safe_evolution_flags = (
                'reset_evolution_state',
                'use_safe_evolution',
                'skip_complex_calculations',
            )
            for flag in safe_evolution_flags:
                error.context[flag] = True
            logger.info("Set evolution recovery parameters")
        except Exception as e:
            logger.error(f"Evolution failure recovery failed: {e}")
            return False
        return True
class SystemRecoveryOrchestrator:
    """Orchestrates recovery strategies across all system components."""

    # Generic advice per error category value, appended after the
    # error-specific suggestions by get_recovery_recommendations().
    _CATEGORY_RECOMMENDATIONS = {
        'storage': [
            "Check available disk space",
            "Verify file permissions",
            "Run database integrity check",
            "Consider restoring from backup"
        ],
        'ai_model': [
            "Check available memory",
            "Verify model files are not corrupted",
            "Try restarting the application",
            "Consider using reduced model precision"
        ],
        'network': [
            "Check internet connection",
            "Verify firewall settings",
            "Try using offline mode",
            "Check for service outages"
        ],
        'authentication': [
            "Verify credentials are correct",
            "Check token expiration",
            "Try logging out and back in",
            "Consider using offline mode"
        ],
        'pet_lifecycle': [
            "Check pet data integrity",
            "Verify backup availability",
            "Try reloading pet data",
            "Consider restoring from backup"
        ]
    }

    def __init__(self, backup_manager: Optional[BackupRecoveryManager] = None):
        """
        Initialize system recovery orchestrator.

        Builds one strategy object per error category and registers all of
        their recovery methods with the module-level ``recovery_manager``.

        Args:
            backup_manager: Backup manager shared with the storage and pet
                lifecycle strategies; may be None
        """
        self.backup_manager = backup_manager

        # Initialize recovery strategies
        self.storage_recovery = StorageRecoveryStrategy(backup_manager)
        self.ai_model_recovery = AIModelRecoveryStrategy()
        self.network_recovery = NetworkRecoveryStrategy()
        self.auth_recovery = AuthenticationRecoveryStrategy()
        self.pet_lifecycle_recovery = PetLifecycleRecoveryStrategy(backup_manager)

        # Register recovery strategies
        self._register_recovery_strategies()

    def _register_recovery_strategies(self):
        """Register all recovery strategies with the recovery manager."""
        # (category, strategy) pairs, registered in this order.
        registrations = (
            ('storage', self.storage_recovery.recover_corrupted_database),
            ('storage', self.storage_recovery.recover_disk_space_issue),
            ('storage', self.storage_recovery.recover_permission_error),
            ('ai_model', self.ai_model_recovery.recover_model_loading_failure),
            ('ai_model', self.ai_model_recovery.recover_model_inference_failure),
            ('ai_model', self.ai_model_recovery.recover_memory_error),
            ('network', self.network_recovery.recover_connection_timeout),
            ('network', self.network_recovery.recover_dns_resolution_failure),
            ('network', self.network_recovery.recover_rate_limit_error),
            ('authentication', self.auth_recovery.recover_token_expiry),
            ('authentication', self.auth_recovery.recover_invalid_credentials),
            ('pet_lifecycle', self.pet_lifecycle_recovery.recover_corrupted_pet_data),
            ('pet_lifecycle', self.pet_lifecycle_recovery.recover_evolution_failure),
        )
        for category, strategy in registrations:
            recovery_manager.register_recovery_strategy(category, strategy)
        logger.info("All recovery strategies registered")

    def _create_pre_recovery_backup(self, error: DigiPalException) -> None:
        """
        Create a best-effort backup before recovering from a severe error.

        Only HIGH and CRITICAL errors trigger a backup, and only when a
        backup manager is configured. A backup that raises is logged and
        ignored, the same way a falsy backup id is, so that a failing backup
        never prevents the recovery attempt itself.
        """
        if error.severity not in (ErrorSeverity.HIGH, ErrorSeverity.CRITICAL):
            return
        if not self.backup_manager:
            return

        backup_details = {
            "error_type": error.category.value,
            "error_code": error.error_code,
        }
        try:
            backup_id = self.backup_manager.create_pre_operation_backup(
                "error_recovery", backup_details
            )
        except Exception as backup_error:
            logger.warning(
                f"Pre-recovery backup failed, continuing with recovery: {backup_error}"
            )
            return
        if backup_id:
            logger.info(f"Pre-recovery backup created: {backup_id}")

    def execute_comprehensive_recovery(self, error: DigiPalException) -> RecoveryResult:
        """
        Execute comprehensive recovery for any DigiPal error.

        Args:
            error: The error to recover from

        Returns:
            RecoveryResult with recovery outcome. If the recovery machinery
            itself raises, the result has ``success=False`` and its context
            holds the stringified exception under ``recovery_error``.
        """
        # perf_counter is monotonic, so the measured duration cannot be
        # skewed by wall-clock adjustments.
        start_time = time.perf_counter()
        try:
            logger.info(f"Starting comprehensive recovery for error: {error.category.value}")

            # Create pre-recovery backup if critical error
            self._create_pre_recovery_backup(error)

            # Attempt recovery using registered strategies
            recovery_success = recovery_manager.attempt_recovery(error)
            recovery_time = time.perf_counter() - start_time

            if recovery_success:
                message = "Recovery completed successfully"
            else:
                message = "Recovery failed - manual intervention required"
            return RecoveryResult(
                success=bool(recovery_success),
                strategy_used="comprehensive_recovery",
                message=message,
                context=error.context,
                recovery_time_seconds=recovery_time
            )
        except Exception as recovery_error:
            recovery_time = time.perf_counter() - start_time
            logger.error(f"Comprehensive recovery failed: {recovery_error}")
            return RecoveryResult(
                success=False,
                strategy_used="comprehensive_recovery",
                message=f"Recovery failed with error: {str(recovery_error)}",
                context={"recovery_error": str(recovery_error)},
                recovery_time_seconds=recovery_time
            )

    def get_recovery_recommendations(self, error: DigiPalException) -> List[str]:
        """
        Get recovery recommendations for a specific error.

        Args:
            error: The error to get recommendations for

        Returns:
            List of recovery recommendations: the error's own suggestions
            first, then the generic ones for its category, with duplicates
            removed and first-seen order preserved
        """
        # Error-specific recommendations; tolerate an error without any.
        recommendations = list(error.recovery_suggestions or ())

        # Category-specific recommendations
        recommendations.extend(
            self._CATEGORY_RECOMMENDATIONS.get(error.category.value, [])
        )

        # dict keys are unique and keep insertion order, which removes
        # duplicates while preserving order.
        return list(dict.fromkeys(recommendations))
# Global system recovery orchestrator.
# Stays None until initialize_system_recovery() assigns an instance;
# read it through get_system_recovery_orchestrator().
system_recovery_orchestrator: Optional[SystemRecoveryOrchestrator] = None
def initialize_system_recovery(backup_manager: Optional[BackupRecoveryManager] = None):
    """
    Create the global system recovery orchestrator.

    Each call builds a new SystemRecoveryOrchestrator and replaces whatever
    the module-level ``system_recovery_orchestrator`` held before.

    Args:
        backup_manager: Backup manager instance handed to the orchestrator
    """
    global system_recovery_orchestrator

    orchestrator = SystemRecoveryOrchestrator(backup_manager)
    system_recovery_orchestrator = orchestrator
    logger.info("System recovery orchestrator initialized")
def get_system_recovery_orchestrator() -> Optional[SystemRecoveryOrchestrator]:
    """
    Return the global system recovery orchestrator.

    Returns:
        The current value of the module-level ``system_recovery_orchestrator``
        (None if nothing has been assigned to it yet)
    """
    current = system_recovery_orchestrator
    return current