import os import uuid from typing import Dict, List, Any, Optional, Callable, Union from datetime import datetime, timedelta import threading import time import shutil from utils.logging import setup_logger from utils.error_handling import handle_exceptions, AutomationError from utils.storage import load_data, save_data # Initialize logger logger = setup_logger(__name__) class BackupSchedule: """Schedule for automated backups""" def __init__(self, name: str, frequency: str, retention_count: int = 5, backup_types: Optional[List[str]] = None, include_ids: bool = False, include_timestamps: bool = True, description: Optional[str] = None, enabled: bool = True): """Initialize a backup schedule Args: name: Schedule name frequency: Backup frequency (daily, weekly, monthly) retention_count: Number of backups to retain backup_types: Types of data to backup (optional, defaults to all) include_ids: Whether to include IDs in backup include_timestamps: Whether to include timestamps in backup description: Schedule description (optional) enabled: Whether schedule is enabled """ self.id = str(uuid.uuid4()) self.name = name self.description = description or "" self.frequency = frequency self.retention_count = retention_count self.backup_types = backup_types or ["all"] self.include_ids = include_ids self.include_timestamps = include_timestamps self.enabled = enabled self.created_at = datetime.now().isoformat() self.updated_at = self.created_at self.last_backup = None self.next_backup = self._calculate_next_backup() self.backup_count = 0 self.error_count = 0 self.last_error = None def _calculate_next_backup(self) -> str: """Calculate next backup time based on frequency Returns: Next backup time as ISO format string """ now = datetime.now() if self.frequency == "daily": # Next day at midnight next_time = datetime(now.year, now.month, now.day) + timedelta(days=1) elif self.frequency == "weekly": # Next Monday at midnight days_ahead = 7 - now.weekday() if days_ahead == 0: days_ahead = 7 next_time = datetime(now.year, now.month, now.day) + timedelta(days=days_ahead) elif self.frequency == "monthly": # First day of next month at midnight if now.month == 12: next_time = datetime(now.year + 1, 1, 1) else: next_time = datetime(now.year, now.month + 1, 1) else: # Default to daily next_time = datetime(now.year, now.month, now.day) + timedelta(days=1) return next_time.isoformat() @handle_exceptions def update_next_backup(self) -> None: """Update next backup time after a backup""" self.last_backup = datetime.now().isoformat() self.next_backup = self._calculate_next_backup() self.backup_count += 1 self.updated_at = datetime.now().isoformat() @handle_exceptions def is_due(self) -> bool: """Check if backup is due Returns: True if backup is due, False otherwise """ if not self.enabled: return False now = datetime.now() next_backup = datetime.fromisoformat(self.next_backup) return now >= next_backup @handle_exceptions def to_dict(self) -> Dict[str, Any]: """Convert backup schedule to dictionary Returns: Backup schedule as dictionary """ return { "id": self.id, "name": self.name, "description": self.description, "frequency": self.frequency, "retention_count": self.retention_count, "backup_types": self.backup_types, "include_ids": self.include_ids, "include_timestamps": self.include_timestamps, "enabled": self.enabled, "created_at": self.created_at, "updated_at": self.updated_at, "last_backup": self.last_backup, "next_backup": self.next_backup, "backup_count": self.backup_count, "error_count": self.error_count, "last_error": self.last_error } @classmethod def from_dict(cls, data: Dict[str, Any]) -> 'BackupSchedule': """Create backup schedule from dictionary Args: data: Backup schedule data Returns: BackupSchedule instance """ schedule = cls( data["name"], data["frequency"], data.get("retention_count", 5), data.get("backup_types"), data.get("include_ids", False), data.get("include_timestamps", True), data.get("description", ""), data.get("enabled", True) ) schedule.id = data["id"] schedule.created_at = data["created_at"] schedule.updated_at = data["updated_at"] schedule.last_backup = data.get("last_backup") schedule.next_backup = data.get("next_backup", schedule._calculate_next_backup()) schedule.backup_count = data.get("backup_count", 0) schedule.error_count = data.get("error_count", 0) schedule.last_error = data.get("last_error") return schedule class BackupManager: """Manager for automated backups""" def __init__(self): """Initialize backup manager""" self.schedules = {} self.backup_dir = os.path.join(os.path.expanduser("~"), "mona_backups") self.backup_function = None self.auto_backup = False self.check_interval = 60 # 1 minute self.backup_thread = None self.stop_event = threading.Event() # Create backup directory if it doesn't exist os.makedirs(self.backup_dir, exist_ok=True) self.load_schedules() self._ensure_default_schedules() @handle_exceptions def load_schedules(self) -> None: """Load backup schedules from storage""" try: schedules_data = load_data("backup_schedules", default=[]) for schedule_data in schedules_data: schedule = BackupSchedule.from_dict(schedule_data) self.schedules[schedule.id] = schedule logger.info(f"Loaded {len(self.schedules)} backup schedules") except Exception as e: logger.error(f"Failed to load backup schedules: {str(e)}") @handle_exceptions def save_schedules(self) -> None: """Save backup schedules to storage""" try: schedules_data = [schedule.to_dict() for schedule in self.schedules.values()] save_data("backup_schedules", schedules_data) logger.info(f"Saved {len(self.schedules)} backup schedules") except Exception as e: logger.error(f"Failed to save backup schedules: {str(e)}") def _ensure_default_schedules(self) -> None: """Ensure default backup schedules exist""" # Check if we need to create default schedules if not self.schedules: self._create_default_daily_schedule() self._create_default_weekly_schedule() def _create_default_daily_schedule(self) -> None: """Create default daily backup schedule""" schedule = BackupSchedule( "Daily Backup", "daily", 3, # Retain 3 daily backups ["all"], False, True, "Automatic daily backup of all data", True ) self.schedules[schedule.id] = schedule self.save_schedules() def _create_default_weekly_schedule(self) -> None: """Create default weekly backup schedule""" schedule = BackupSchedule( "Weekly Backup", "weekly", 4, # Retain 4 weekly backups ["all"], True, # Include IDs for full backup True, "Automatic weekly backup of all data", True ) self.schedules[schedule.id] = schedule self.save_schedules() @handle_exceptions def set_backup_directory(self, directory: str) -> None: """Set backup directory Args: directory: Backup directory path """ if not os.path.exists(directory): os.makedirs(directory, exist_ok=True) self.backup_dir = directory logger.info(f"Set backup directory to {directory}") @handle_exceptions def register_backup_function(self, backup_function: Callable) -> None: """Register backup function Args: backup_function: Function that creates a backup """ self.backup_function = backup_function logger.info("Registered backup function") @handle_exceptions def create_schedule(self, name: str, frequency: str, retention_count: int = 5, backup_types: Optional[List[str]] = None, include_ids: bool = False, include_timestamps: bool = True, description: Optional[str] = None, enabled: bool = True) -> BackupSchedule: """Create a new backup schedule Args: name: Schedule name frequency: Backup frequency (daily, weekly, monthly) retention_count: Number of backups to retain backup_types: Types of data to backup (optional, defaults to all) include_ids: Whether to include IDs in backup include_timestamps: Whether to include timestamps in backup description: Schedule description (optional) enabled: Whether schedule is enabled Returns: Created backup schedule """ # Validate frequency if frequency not in ["daily", "weekly", "monthly"]: raise AutomationError(f"Invalid frequency: {frequency}") schedule = BackupSchedule( name, frequency, retention_count, backup_types, include_ids, include_timestamps, description, enabled ) self.schedules[schedule.id] = schedule self.save_schedules() return schedule @handle_exceptions def get_schedule(self, schedule_id: str) -> Optional[BackupSchedule]: """Get backup schedule by ID Args: schedule_id: Backup schedule ID Returns: BackupSchedule if found, None otherwise """ return self.schedules.get(schedule_id) @handle_exceptions def update_schedule(self, schedule: BackupSchedule) -> None: """Update backup schedule Args: schedule: Backup schedule to update """ if schedule.id in self.schedules: schedule.updated_at = datetime.now().isoformat() self.schedules[schedule.id] = schedule self.save_schedules() else: raise AutomationError(f"Backup schedule not found: {schedule.id}") @handle_exceptions def delete_schedule(self, schedule_id: str) -> None: """Delete backup schedule Args: schedule_id: Backup schedule ID """ if schedule_id in self.schedules: del self.schedules[schedule_id] self.save_schedules() else: raise AutomationError(f"Backup schedule not found: {schedule_id}") @handle_exceptions def get_all_schedules(self) -> List[BackupSchedule]: """Get all backup schedules Returns: List of all backup schedules """ return list(self.schedules.values()) @handle_exceptions def get_enabled_schedules(self) -> List[BackupSchedule]: """Get all enabled backup schedules Returns: List of enabled backup schedules """ return [schedule for schedule in self.schedules.values() if schedule.enabled] @handle_exceptions def enable_schedule(self, schedule_id: str) -> None: """Enable backup schedule Args: schedule_id: Backup schedule ID """ schedule = self.get_schedule(schedule_id) if not schedule: raise AutomationError(f"Backup schedule not found: {schedule_id}") schedule.enabled = True schedule.updated_at = datetime.now().isoformat() self.update_schedule(schedule) @handle_exceptions def disable_schedule(self, schedule_id: str) -> None: """Disable backup schedule Args: schedule_id: Backup schedule ID """ schedule = self.get_schedule(schedule_id) if not schedule: raise AutomationError(f"Backup schedule not found: {schedule_id}") schedule.enabled = False schedule.updated_at = datetime.now().isoformat() self.update_schedule(schedule) @handle_exceptions def run_backup(self, schedule_id: str) -> str: """Run backup according to schedule Args: schedule_id: Backup schedule ID Returns: Path to backup file """ schedule = self.get_schedule(schedule_id) if not schedule: raise AutomationError(f"Backup schedule not found: {schedule_id}") if not self.backup_function: error_msg = "No backup function registered" schedule.last_error = error_msg schedule.error_count += 1 self.update_schedule(schedule) raise AutomationError(error_msg) try: # Create backup filename timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") backup_filename = f"{schedule.name.replace(' ', '_')}_{timestamp}.zip" backup_path = os.path.join(self.backup_dir, backup_filename) # Run backup function self.backup_function( backup_path, schedule.backup_types, schedule.include_ids, schedule.include_timestamps ) # Update schedule schedule.update_next_backup() schedule.last_error = None self.update_schedule(schedule) # Clean up old backups self._cleanup_old_backups(schedule) logger.info(f"Created backup: {backup_path}") return backup_path except Exception as e: error_msg = f"Error creating backup for schedule {schedule.name}: {str(e)}" schedule.last_error = error_msg schedule.error_count += 1 self.update_schedule(schedule) logger.error(error_msg) raise AutomationError(error_msg) @handle_exceptions def _cleanup_old_backups(self, schedule: BackupSchedule) -> None: """Clean up old backups according to retention policy Args: schedule: Backup schedule """ # Get all backups for this schedule prefix = f"{schedule.name.replace(' ', '_')}_" backups = [] for filename in os.listdir(self.backup_dir): if filename.startswith(prefix) and filename.endswith(".zip"): file_path = os.path.join(self.backup_dir, filename) file_time = os.path.getmtime(file_path) backups.append((file_path, file_time)) # Sort by modification time (newest first) backups.sort(key=lambda x: x[1], reverse=True) # Delete old backups if len(backups) > schedule.retention_count: for file_path, _ in backups[schedule.retention_count:]: try: os.remove(file_path) logger.info(f"Deleted old backup: {file_path}") except Exception as e: logger.error(f"Error deleting old backup {file_path}: {str(e)}") @handle_exceptions def check_and_run_due_backups(self) -> List[str]: """Check and run all due backups Returns: List of paths to created backup files """ backup_paths = [] for schedule in self.get_enabled_schedules(): if schedule.is_due(): try: backup_path = self.run_backup(schedule.id) backup_paths.append(backup_path) except Exception as e: logger.error(f"Error running backup for schedule {schedule.name}: {str(e)}") return backup_paths @handle_exceptions def start_auto_backup(self) -> None: """Start automatic backup thread""" if self.auto_backup: logger.info("Auto backup already running") return self.auto_backup = True self.stop_event.clear() self.backup_thread = threading.Thread(target=self._auto_backup_thread) self.backup_thread.daemon = True self.backup_thread.start() logger.info("Started auto backup") @handle_exceptions def stop_auto_backup(self) -> None: """Stop automatic backup thread""" if not self.auto_backup: logger.info("Auto backup not running") return self.auto_backup = False self.stop_event.set() if self.backup_thread: self.backup_thread.join(timeout=1.0) self.backup_thread = None logger.info("Stopped auto backup") def _auto_backup_thread(self) -> None: """Thread function for automatic backup""" logger.info(f"Auto backup thread started with check interval {self.check_interval} seconds") while not self.stop_event.is_set(): try: # Check and run due backups self.check_and_run_due_backups() except Exception as e: logger.error(f"Error in auto backup: {str(e)}") # Wait for next check or stop event self.stop_event.wait(self.check_interval) logger.info("Auto backup thread stopped") # Create a global instance of the backup manager backup_manager = BackupManager()