"""Rule-based automated cleanup of application data (tasks, notes, ...)."""

import uuid
from typing import Dict, List, Any, Optional, Callable, Union, Set
from datetime import datetime, timedelta
import threading
import time

from utils.logging import setup_logger
from utils.error_handling import handle_exceptions, AutomationError
from utils.storage import load_data, save_data

# Initialize logger
logger = setup_logger(__name__)


class CleanupRule:
    """Rule for automated cleanup.

    A rule names a data type, a set of conditions an item must satisfy and the
    action to apply to matching items. It also carries run statistics
    (``last_run``, ``run_count``, ``items_processed``, ``error_count``,
    ``last_error``) that ``CleanupManager.run_rule`` maintains.
    """

    def __init__(self, name: str, data_type: str, conditions: Dict[str, Any],
                 action: str, retention_period: Optional[int] = None,
                 description: Optional[str] = None, enabled: bool = True):
        """Initialize a cleanup rule

        Args:
            name: Rule name
            data_type: Type of data to clean up (tasks, notes, etc.)
            conditions: Conditions for cleanup
            action: Cleanup action (delete, archive, tag)
            retention_period: Retention period in days (optional)
            description: Rule description (optional)
            enabled: Whether rule is enabled
        """
        self.id = str(uuid.uuid4())
        self.name = name
        self.description = description or ""
        self.data_type = data_type
        self.conditions = conditions
        self.action = action
        self.retention_period = retention_period
        self.enabled = enabled
        # Timestamps are stored as ISO-8601 strings so the rule serializes
        # without further conversion.
        self.created_at = datetime.now().isoformat()
        self.updated_at = self.created_at
        # Run statistics; updated by CleanupManager.run_rule.
        self.last_run = None
        self.run_count = 0
        self.items_processed = 0
        self.error_count = 0
        self.last_error = None

    @handle_exceptions
    def to_dict(self) -> Dict[str, Any]:
        """Convert cleanup rule to dictionary

        Returns:
            Cleanup rule as dictionary
        """
        return {
            "id": self.id,
            "name": self.name,
            "description": self.description,
            "data_type": self.data_type,
            "conditions": self.conditions,
            "action": self.action,
            "retention_period": self.retention_period,
            "enabled": self.enabled,
            "created_at": self.created_at,
            "updated_at": self.updated_at,
            "last_run": self.last_run,
            "run_count": self.run_count,
            "items_processed": self.items_processed,
            "error_count": self.error_count,
            "last_error": self.last_error
        }

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'CleanupRule':
        """Create cleanup rule from dictionary

        Args:
            data: Cleanup rule data. ``name``, ``data_type``, ``conditions``,
                ``action``, ``id``, ``created_at`` and ``updated_at`` are
                required; the remaining keys fall back to defaults.

        Returns:
            CleanupRule instance

        Raises:
            KeyError: If a required key is missing from ``data``.
        """
        rule = cls(
            data["name"],
            data["data_type"],
            data["conditions"],
            data["action"],
            data.get("retention_period"),
            data.get("description", ""),
            data.get("enabled", True)
        )
        # The constructor generated a fresh id and timestamps; overwrite them
        # with the persisted identity and history.
        rule.id = data["id"]
        rule.created_at = data["created_at"]
        rule.updated_at = data["updated_at"]
        rule.last_run = data.get("last_run")
        rule.run_count = data.get("run_count", 0)
        rule.items_processed = data.get("items_processed", 0)
        rule.error_count = data.get("error_count", 0)
        rule.last_error = data.get("last_error")
        return rule


class CleanupManager:
    """Manager for automated cleanup.

    Holds the cleanup rules (keyed by rule id), the registered data providers
    (keyed by data type) and action handlers (keyed by action name), and can
    run the enabled rules on demand or periodically from a background thread.
    """

    # Keys inside a rule's ``conditions`` that only tune how another condition
    # is evaluated and must not be matched against item fields themselves.
    _MODIFIER_KEYS = frozenset({"use_updated_date"})

    def __init__(self):
        """Initialize cleanup manager"""
        self.rules: Dict[str, CleanupRule] = {}
        self.data_providers: Dict[str, Callable] = {}
        self.action_handlers: Dict[str, Callable] = {}
        self.auto_cleanup = False
        self.cleanup_interval = 86400  # 24 hours
        self.cleanup_thread = None
        self.stop_event = threading.Event()
        self.load_rules()
        self._ensure_default_rules()

    @handle_exceptions
    def load_rules(self) -> None:
        """Load cleanup rules from storage

        Records that cannot be turned into a rule are logged and skipped so a
        single malformed entry does not prevent the remaining rules from
        loading. Any other failure is logged and leaves the rules loaded so
        far in place.
        """
        try:
            rules_data = load_data("cleanup_rules", default=[])
            for rule_data in rules_data:
                try:
                    rule = CleanupRule.from_dict(rule_data)
                except (KeyError, TypeError, AttributeError) as e:
                    logger.error(f"Skipping malformed cleanup rule record: {e!r}")
                    continue
                self.rules[rule.id] = rule
            logger.info(f"Loaded {len(self.rules)} cleanup rules")
        except Exception as e:
            logger.error(f"Failed to load cleanup rules: {str(e)}")

    @handle_exceptions
    def save_rules(self) -> None:
        """Save cleanup rules to storage

        Failures are logged rather than raised.
        """
        try:
            rules_data = [rule.to_dict() for rule in self.rules.values()]
            save_data("cleanup_rules", rules_data)
            logger.info(f"Saved {len(self.rules)} cleanup rules")
        except Exception as e:
            logger.error(f"Failed to save cleanup rules: {str(e)}")

    def _ensure_default_rules(self) -> None:
        """Ensure default cleanup rules exist"""
        # NOTE(review): an empty rule set after a failed load looks the same
        # as a fresh install here, so defaults are created and saved in both
        # cases - confirm that overwriting storage in the failure case is
        # acceptable.
        if not self.rules:
            self._create_default_completed_tasks_rule()
            self._create_default_old_notes_rule()

    def _create_default_completed_tasks_rule(self) -> None:
        """Create default rule for completed tasks"""
        conditions = {
            "status": "completed",
            "age_days": 30
        }
        rule = CleanupRule(
            "Archive Completed Tasks",
            "task",
            conditions,
            "archive",
            30,
            "Archive tasks that have been completed for more than 30 days",
            True
        )
        self.rules[rule.id] = rule
        self.save_rules()

    def _create_default_old_notes_rule(self) -> None:
        """Create default rule for old notes"""
        conditions = {
            "age_days": 90,
            "tags_exclude": ["important", "keep"]
        }
        rule = CleanupRule(
            "Archive Old Notes",
            "note",
            conditions,
            "archive",
            90,
            "Archive notes older than 90 days that are not tagged as important or keep",
            True
        )
        self.rules[rule.id] = rule
        self.save_rules()

    @handle_exceptions
    def register_data_provider(self, data_type: str, provider: Callable) -> None:
        """Register a data provider function

        Args:
            data_type: Type of data provided
            provider: Function that returns data of the specified type; it is
                called without arguments and its result is iterated
        """
        self.data_providers[data_type] = provider
        logger.info(f"Registered data provider for {data_type}")

    @handle_exceptions
    def register_action_handler(self, action: str, handler: Callable) -> None:
        """Register an action handler function

        Args:
            action: Action name
            handler: Function that handles the action; it is called as
                ``handler(item, action)`` for every matching item
        """
        self.action_handlers[action] = handler
        logger.info(f"Registered action handler for {action}")

    @handle_exceptions
    def create_rule(self, name: str, data_type: str, conditions: Dict[str, Any],
                    action: str, retention_period: Optional[int] = None,
                    description: Optional[str] = None,
                    enabled: bool = True) -> CleanupRule:
        """Create a new cleanup rule

        Args:
            name: Rule name
            data_type: Type of data to clean up
            conditions: Conditions for cleanup
            action: Cleanup action
            retention_period: Retention period in days (optional)
            description: Rule description (optional)
            enabled: Whether rule is enabled

        Returns:
            Created cleanup rule

        Raises:
            AutomationError: If no data provider is registered for
                ``data_type`` or no action handler for ``action``.
        """
        # Validate data type
        if data_type not in self.data_providers:
            raise AutomationError(f"No data provider registered for {data_type}")

        # Validate action
        if action not in self.action_handlers:
            raise AutomationError(f"No action handler registered for {action}")

        rule = CleanupRule(name, data_type, conditions, action,
                           retention_period, description, enabled)
        self.rules[rule.id] = rule
        self.save_rules()
        return rule

    @handle_exceptions
    def get_rule(self, rule_id: str) -> Optional[CleanupRule]:
        """Get cleanup rule by ID

        Args:
            rule_id: Cleanup rule ID

        Returns:
            CleanupRule if found, None otherwise
        """
        return self.rules.get(rule_id)

    @handle_exceptions
    def update_rule(self, rule: CleanupRule) -> None:
        """Update cleanup rule

        Stamps ``updated_at`` with the current time, stores the rule under
        its id and saves all rules.

        Args:
            rule: Cleanup rule to update

        Raises:
            AutomationError: If no rule with ``rule.id`` is registered.
        """
        if rule.id in self.rules:
            rule.updated_at = datetime.now().isoformat()
            self.rules[rule.id] = rule
            self.save_rules()
        else:
            raise AutomationError(f"Cleanup rule not found: {rule.id}")

    @handle_exceptions
    def delete_rule(self, rule_id: str) -> None:
        """Delete cleanup rule

        Args:
            rule_id: Cleanup rule ID

        Raises:
            AutomationError: If no rule with ``rule_id`` is registered.
        """
        if rule_id in self.rules:
            del self.rules[rule_id]
            self.save_rules()
        else:
            raise AutomationError(f"Cleanup rule not found: {rule_id}")

    @handle_exceptions
    def get_all_rules(self) -> List[CleanupRule]:
        """Get all cleanup rules

        Returns:
            List of all cleanup rules
        """
        return list(self.rules.values())

    @handle_exceptions
    def get_enabled_rules(self) -> List[CleanupRule]:
        """Get all enabled cleanup rules

        Returns:
            List of enabled cleanup rules
        """
        return [rule for rule in self.rules.values() if rule.enabled]

    @handle_exceptions
    def get_rules_by_data_type(self, data_type: str) -> List[CleanupRule]:
        """Get cleanup rules by data type

        Args:
            data_type: Data type

        Returns:
            List of cleanup rules for the specified data type
        """
        return [
            rule for rule in self.rules.values()
            if rule.data_type == data_type
        ]

    @handle_exceptions
    def get_rules_by_action(self, action: str) -> List[CleanupRule]:
        """Get cleanup rules by action

        Args:
            action: Action

        Returns:
            List of cleanup rules for the specified action
        """
        return [
            rule for rule in self.rules.values()
            if rule.action == action
        ]

    @handle_exceptions
    def enable_rule(self, rule_id: str) -> None:
        """Enable cleanup rule

        Args:
            rule_id: Cleanup rule ID

        Raises:
            AutomationError: If no rule with ``rule_id`` is registered.
        """
        rule = self.get_rule(rule_id)
        if not rule:
            raise AutomationError(f"Cleanup rule not found: {rule_id}")

        rule.enabled = True
        # update_rule stamps updated_at and saves the rules.
        self.update_rule(rule)

    @handle_exceptions
    def disable_rule(self, rule_id: str) -> None:
        """Disable cleanup rule

        Args:
            rule_id: Cleanup rule ID

        Raises:
            AutomationError: If no rule with ``rule_id`` is registered.
        """
        rule = self.get_rule(rule_id)
        if not rule:
            raise AutomationError(f"Cleanup rule not found: {rule_id}")

        rule.enabled = False
        # update_rule stamps updated_at and saves the rules.
        self.update_rule(rule)

    @handle_exceptions
    def _check_conditions(self, item: Dict[str, Any],
                          conditions: Dict[str, Any]) -> bool:
        """Check if item meets conditions

        All conditions must hold. Supported condition keys:

        * ``age_days``: the item must be at least that many days old, judged
          by ``created_at``, or by ``updated_at`` when ``use_updated_date``
          is truthy and the item has that field. A missing or unparsable
          date fails the check.
        * ``use_updated_date``: modifier for ``age_days`` only; it is never
          compared against item fields.
        * ``tags_include``: the item must have ``tags`` sharing at least one
          entry with the condition.
        * ``tags_exclude``: the item must share no tag with the condition;
          items without ``tags`` pass.
        * any other key: the item must have that field. A list condition
          matches when the item value is in the list (or, for a list-valued
          field, intersects it); any other condition must compare equal.

        Args:
            item: Data item
            conditions: Conditions to check

        Returns:
            True if conditions are met (including when ``conditions`` is
            empty), False otherwise
        """
        for field, condition in conditions.items():
            # Modifier keys tune other checks; treating them as ordinary
            # field conditions would reject every item lacking such a field.
            if field in self._MODIFIER_KEYS:
                continue

            # Special case for age_days
            if field == "age_days":
                date_field = "created_at"
                if "updated_at" in item and conditions.get("use_updated_date", False):
                    date_field = "updated_at"

                if date_field not in item:
                    return False

                # Parse date and check age
                try:
                    item_date = datetime.fromisoformat(item[date_field])
                    # Take "now" in the timestamp's own timezone (None for
                    # naive values) so aware timestamps can be subtracted
                    # instead of raising TypeError.
                    now = datetime.now(item_date.tzinfo)
                    if (now - item_date).days < condition:
                        return False
                except (TypeError, ValueError):
                    # Unparsable date or non-numeric threshold: no match.
                    return False

                continue

            # Special case for tags_include
            if field == "tags_include":
                if "tags" not in item:
                    return False

                # Check if any tag in condition is in item tags
                if not set(condition).intersection(item.get("tags", [])):
                    return False

                continue

            # Special case for tags_exclude
            if field == "tags_exclude":
                if "tags" not in item:
                    continue

                # Check if any tag in condition is in item tags
                if set(condition).intersection(item.get("tags", [])):
                    return False

                continue

            # Regular field check
            if field not in item:
                return False

            value = item[field]
            if isinstance(condition, list):
                if isinstance(value, list):
                    # Check for intersection between lists
                    if not set(condition).intersection(value):
                        return False
                elif value not in condition:
                    # Item value must be one of the listed alternatives
                    return False
            elif value != condition:
                # Direct comparison
                return False

        return True

    def _record_rule_error(self, rule: CleanupRule, error_msg: str) -> None:
        """Store ``error_msg`` on the rule, bump its error count and persist it."""
        rule.last_error = error_msg
        rule.error_count += 1
        self.update_rule(rule)

    @handle_exceptions
    def run_rule(self, rule_id: str) -> int:
        """Run cleanup rule

        Fetches the rule's data from its provider, applies the action handler
        to every item that satisfies the rule's conditions and updates the
        rule's run statistics.

        Args:
            rule_id: Cleanup rule ID

        Returns:
            Number of items processed (0 for a disabled rule)

        Raises:
            AutomationError: If the rule does not exist, has no registered
                data provider or action handler, or if the provider or
                handler fails (the original exception is chained as the
                cause).
        """
        rule = self.get_rule(rule_id)
        if not rule:
            raise AutomationError(f"Cleanup rule not found: {rule_id}")

        if not rule.enabled:
            logger.info(f"Skipping disabled cleanup rule: {rule.name}")
            return 0

        # Get data provider and action handler
        provider = self.data_providers.get(rule.data_type)
        handler = self.action_handlers.get(rule.action)

        if not provider:
            error_msg = f"No data provider registered for {rule.data_type}"
            self._record_rule_error(rule, error_msg)
            raise AutomationError(error_msg)

        if not handler:
            error_msg = f"No action handler registered for {rule.action}"
            self._record_rule_error(rule, error_msg)
            raise AutomationError(error_msg)

        try:
            # Filter and process items
            processed_count = 0
            for item in provider():
                if self._check_conditions(item, rule.conditions):
                    handler(item, rule.action)
                    processed_count += 1

            # Update rule stats
            rule.last_run = datetime.now().isoformat()
            rule.run_count += 1
            rule.items_processed += processed_count
            rule.last_error = None
            self.update_rule(rule)

            logger.info(f"Processed {processed_count} items with rule: {rule.name}")
            return processed_count

        except Exception as e:
            error_msg = f"Error running cleanup rule {rule.name}: {str(e)}"
            self._record_rule_error(rule, error_msg)
            logger.error(error_msg)
            # Chain the cause so the original traceback is not lost.
            raise AutomationError(error_msg) from e

    @handle_exceptions
    def run_all_rules(self) -> Dict[str, int]:
        """Run all enabled cleanup rules

        A rule that fails is logged and reported with a count of 0; the
        remaining rules still run.

        Returns:
            Dictionary mapping rule IDs to number of items processed
        """
        results = {}

        for rule in self.get_enabled_rules():
            try:
                results[rule.id] = self.run_rule(rule.id)
            except Exception as e:
                logger.error(f"Error running cleanup rule {rule.name}: {str(e)}")
                results[rule.id] = 0

        return results

    @handle_exceptions
    def run_rules_by_data_type(self, data_type: str) -> Dict[str, int]:
        """Run cleanup rules for a specific data type

        Only enabled rules are run. A rule that fails is logged and reported
        with a count of 0.

        Args:
            data_type: Data type

        Returns:
            Dictionary mapping rule IDs to number of items processed
        """
        results = {}

        for rule in self.get_rules_by_data_type(data_type):
            if not rule.enabled:
                continue
            try:
                results[rule.id] = self.run_rule(rule.id)
            except Exception as e:
                logger.error(f"Error running cleanup rule {rule.name}: {str(e)}")
                results[rule.id] = 0

        return results

    @handle_exceptions
    def set_cleanup_interval(self, interval: int) -> None:
        """Set cleanup interval in seconds

        Args:
            interval: Cleanup interval in seconds

        Raises:
            AutomationError: If ``interval`` is below 3600 seconds.
        """
        if interval < 3600:  # 1 hour
            raise AutomationError("Cleanup interval must be at least 3600 seconds (1 hour)")

        self.cleanup_interval = interval
        logger.info(f"Set cleanup interval to {interval} seconds")

    @handle_exceptions
    def start_auto_cleanup(self) -> None:
        """Start automatic cleanup thread

        Does nothing when auto cleanup is already running.
        """
        if self.auto_cleanup:
            logger.info("Auto cleanup already running")
            return

        self.auto_cleanup = True
        # Give every worker its own stop event. Clearing a shared event would
        # revive a previous worker that was told to stop but had not exited
        # yet (stop_auto_cleanup only waits one second for it).
        self.stop_event = threading.Event()
        self.cleanup_thread = threading.Thread(
            target=self._auto_cleanup_thread,
            args=(self.stop_event,)
        )
        self.cleanup_thread.daemon = True
        self.cleanup_thread.start()
        logger.info("Started auto cleanup")

    @handle_exceptions
    def stop_auto_cleanup(self) -> None:
        """Stop automatic cleanup thread

        Signals the worker to stop and waits up to one second for it to
        finish. Does nothing when auto cleanup is not running.
        """
        if not self.auto_cleanup:
            logger.info("Auto cleanup not running")
            return

        self.auto_cleanup = False
        self.stop_event.set()
        if self.cleanup_thread:
            self.cleanup_thread.join(timeout=1.0)
            self.cleanup_thread = None
        logger.info("Stopped auto cleanup")

    def _auto_cleanup_thread(self, stop_event: Optional[threading.Event] = None) -> None:
        """Thread function for automatic cleanup

        Runs all enabled rules, then waits ``cleanup_interval`` seconds or
        until the stop event is set, whichever comes first, and repeats.

        Args:
            stop_event: Event that ends this worker when set; defaults to
                the manager's current ``stop_event``
        """
        if stop_event is None:
            stop_event = self.stop_event

        logger.info(f"Auto cleanup thread started with interval {self.cleanup_interval} seconds")

        while not stop_event.is_set():
            try:
                # Run all enabled rules
                self.run_all_rules()
            except Exception as e:
                logger.error(f"Error in auto cleanup: {str(e)}")

            # Wait for next cleanup or stop event
            stop_event.wait(self.cleanup_interval)

        logger.info("Auto cleanup thread stopped")


# Create a global instance of the cleanup manager
cleanup_manager = CleanupManager()