|
import uuid |
|
from typing import Dict, List, Any, Optional, Callable, Union, Set |
|
from datetime import datetime, timedelta |
|
import threading |
|
import time |
|
|
|
from utils.logging import setup_logger |
|
from utils.error_handling import handle_exceptions, AutomationError |
|
from utils.storage import load_data, save_data |
|
|
|
|
|
# Module-level logger for this cleanup module, created via the project's setup_logger helper.
logger = setup_logger(__name__)
|
|
|
class CleanupRule:
    """Rule for automated cleanup

    A rule pairs a data type (used by the manager to pick a data provider),
    a dict of matching conditions, and an action name (used to pick an
    action handler), together with bookkeeping counters that are updated
    each time the rule runs.
    """

    def __init__(self, name: str, data_type: str, conditions: Dict[str, Any],
                 action: str, retention_period: Optional[int] = None,
                 description: Optional[str] = None, enabled: bool = True):
        """Initialize a cleanup rule

        Args:
            name: Rule name
            data_type: Type of data to clean up (e.g. "task", "note")
            conditions: Conditions for cleanup
            action: Cleanup action name (e.g. "archive")
            retention_period: Retention period in days (optional)
            description: Rule description (optional, defaults to "")
            enabled: Whether rule is enabled
        """
        self.id = str(uuid.uuid4())
        self.name = name
        self.description = description or ""
        self.data_type = data_type
        self.conditions = conditions
        self.action = action
        self.retention_period = retention_period
        self.enabled = enabled
        # Timestamps are stored as ISO-8601 strings so the rule serializes as-is.
        self.created_at = datetime.now().isoformat()
        self.updated_at = self.created_at
        # Run statistics, maintained by the manager.
        self.last_run = None
        self.run_count = 0
        self.items_processed = 0
        self.error_count = 0
        self.last_error = None

    @handle_exceptions
    def to_dict(self) -> Dict[str, Any]:
        """Convert cleanup rule to dictionary

        Returns:
            Cleanup rule as dictionary (the inverse of ``from_dict``)
        """
        return {
            "id": self.id,
            "name": self.name,
            "description": self.description,
            "data_type": self.data_type,
            "conditions": self.conditions,
            "action": self.action,
            "retention_period": self.retention_period,
            "enabled": self.enabled,
            "created_at": self.created_at,
            "updated_at": self.updated_at,
            "last_run": self.last_run,
            "run_count": self.run_count,
            "items_processed": self.items_processed,
            "error_count": self.error_count,
            "last_error": self.last_error
        }

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'CleanupRule':
        """Create cleanup rule from dictionary

        ``name``, ``data_type``, ``conditions`` and ``action`` are required.
        Every other field is optional. A missing ``id``/``created_at`` keeps the
        freshly generated value, and a missing ``updated_at`` falls back to
        ``created_at``, so older or partial records still load.

        Args:
            data: Cleanup rule data

        Returns:
            CleanupRule instance

        Raises:
            KeyError: If a required field is missing
        """
        rule = cls(
            data["name"],
            data["data_type"],
            data["conditions"],
            data["action"],
            data.get("retention_period"),
            data.get("description", ""),
            data.get("enabled", True)
        )
        rule.id = data.get("id") or rule.id
        rule.created_at = data.get("created_at") or rule.created_at
        rule.updated_at = data.get("updated_at") or rule.created_at
        rule.last_run = data.get("last_run")
        rule.run_count = data.get("run_count", 0)
        rule.items_processed = data.get("items_processed", 0)
        rule.error_count = data.get("error_count", 0)
        rule.last_error = data.get("last_error")
        return rule
|
|
|
|
|
class CleanupManager:
    """Manager for automated cleanup

    Holds the cleanup rules (persisted under the "cleanup_rules" storage key),
    the registered data providers and action handlers that rules depend on,
    and an optional background thread that periodically runs all enabled rules.
    """

    # Keys in a rule's ``conditions`` that modify how other conditions are
    # evaluated instead of matching a field of the item.
    _CONTROL_CONDITION_KEYS = frozenset({"use_updated_date"})

    def __init__(self):
        """Initialize cleanup manager

        Loads persisted rules and creates the default rules when none exist.
        """
        self.rules = {}  # rule id -> CleanupRule
        self.data_providers = {}  # data type -> callable returning an iterable of items
        self.action_handlers = {}  # action name -> callable(item, action)
        self.auto_cleanup = False
        self.cleanup_interval = 86400  # seconds (24 hours)
        self.cleanup_thread = None
        self.stop_event = threading.Event()
        self._load_failed = False  # set by load_rules when storage could not be read
        self.load_rules()
        self._ensure_default_rules()

    @handle_exceptions
    def load_rules(self) -> None:
        """Load cleanup rules from storage

        Each stored record is parsed on its own, so a malformed record is
        logged and skipped instead of aborting the load of all later rules.
        """
        self._load_failed = False
        try:
            rules_data = load_data("cleanup_rules", default=[])
        except Exception as e:
            # Remember the failure so defaults are not saved over the stored rules.
            self._load_failed = True
            logger.error(f"Failed to load cleanup rules: {str(e)}")
            return

        for rule_data in rules_data or []:
            try:
                rule = CleanupRule.from_dict(rule_data)
            except Exception as e:
                logger.error(f"Skipping invalid cleanup rule record: {str(e)}")
                continue
            self.rules[rule.id] = rule

        logger.info(f"Loaded {len(self.rules)} cleanup rules")

    @handle_exceptions
    def save_rules(self) -> None:
        """Save cleanup rules to storage (errors are logged, not raised)"""
        try:
            rules_data = [rule.to_dict() for rule in self.rules.values()]
            save_data("cleanup_rules", rules_data)
            logger.info(f"Saved {len(self.rules)} cleanup rules")
        except Exception as e:
            logger.error(f"Failed to save cleanup rules: {str(e)}")

    def _ensure_default_rules(self) -> None:
        """Ensure default cleanup rules exist

        Defaults are only created when no rules are loaded AND storage was
        read successfully. Otherwise saving them would overwrite rules that
        merely failed to load.
        """
        if self.rules:
            return
        if getattr(self, "_load_failed", False):
            logger.warning("Not creating default cleanup rules because stored rules could not be loaded")
            return
        self._create_default_completed_tasks_rule()
        self._create_default_old_notes_rule()

    def _create_default_completed_tasks_rule(self) -> None:
        """Create (and persist) the default rule archiving completed tasks"""
        conditions = {
            "status": "completed",
            "age_days": 30
        }
        rule = CleanupRule(
            "Archive Completed Tasks",
            "task",
            conditions,
            "archive",
            30,
            "Archive tasks that have been completed for more than 30 days",
            True
        )
        self.rules[rule.id] = rule
        self.save_rules()

    def _create_default_old_notes_rule(self) -> None:
        """Create (and persist) the default rule archiving old untagged notes"""
        conditions = {
            "age_days": 90,
            "tags_exclude": ["important", "keep"]
        }
        rule = CleanupRule(
            "Archive Old Notes",
            "note",
            conditions,
            "archive",
            90,
            "Archive notes older than 90 days that are not tagged as important or keep",
            True
        )
        self.rules[rule.id] = rule
        self.save_rules()

    @handle_exceptions
    def register_data_provider(self, data_type: str, provider: Callable) -> None:
        """Register a data provider function

        Args:
            data_type: Type of data provided
            provider: Zero-argument function returning the items of that type
        """
        self.data_providers[data_type] = provider
        logger.info(f"Registered data provider for {data_type}")

    @handle_exceptions
    def register_action_handler(self, action: str, handler: Callable) -> None:
        """Register an action handler function

        Args:
            action: Action name
            handler: Function called as ``handler(item, action)`` for each matching item
        """
        self.action_handlers[action] = handler
        logger.info(f"Registered action handler for {action}")

    @handle_exceptions
    def create_rule(self, name: str, data_type: str, conditions: Dict[str, Any],
                    action: str, retention_period: Optional[int] = None,
                    description: Optional[str] = None, enabled: bool = True) -> CleanupRule:
        """Create and persist a new cleanup rule

        Args:
            name: Rule name
            data_type: Type of data to clean up (must have a registered provider)
            conditions: Conditions for cleanup
            action: Cleanup action (must have a registered handler)
            retention_period: Retention period in days (optional)
            description: Rule description (optional)
            enabled: Whether rule is enabled

        Returns:
            Created cleanup rule

        Raises:
            AutomationError: If no provider or handler is registered
        """
        if data_type not in self.data_providers:
            raise AutomationError(f"No data provider registered for {data_type}")
        if action not in self.action_handlers:
            raise AutomationError(f"No action handler registered for {action}")

        rule = CleanupRule(name, data_type, conditions, action, retention_period, description, enabled)
        self.rules[rule.id] = rule
        self.save_rules()
        return rule

    @handle_exceptions
    def get_rule(self, rule_id: str) -> Optional[CleanupRule]:
        """Return the cleanup rule with ``rule_id``, or None if unknown"""
        return self.rules.get(rule_id)

    @handle_exceptions
    def update_rule(self, rule: CleanupRule) -> None:
        """Stamp ``updated_at`` on a known rule and persist all rules

        Raises:
            AutomationError: If the rule is not managed by this manager
        """
        if rule.id not in self.rules:
            raise AutomationError(f"Cleanup rule not found: {rule.id}")
        rule.updated_at = datetime.now().isoformat()
        self.rules[rule.id] = rule
        self.save_rules()

    @handle_exceptions
    def delete_rule(self, rule_id: str) -> None:
        """Delete a cleanup rule and persist the change

        Raises:
            AutomationError: If the rule does not exist
        """
        if rule_id not in self.rules:
            raise AutomationError(f"Cleanup rule not found: {rule_id}")
        del self.rules[rule_id]
        self.save_rules()

    @handle_exceptions
    def get_all_rules(self) -> List[CleanupRule]:
        """Return a list of all cleanup rules"""
        return list(self.rules.values())

    @handle_exceptions
    def get_enabled_rules(self) -> List[CleanupRule]:
        """Return a list of all enabled cleanup rules"""
        return [rule for rule in self.rules.values() if rule.enabled]

    @handle_exceptions
    def get_rules_by_data_type(self, data_type: str) -> List[CleanupRule]:
        """Return the cleanup rules whose ``data_type`` equals ``data_type``"""
        return [rule for rule in self.rules.values() if rule.data_type == data_type]

    @handle_exceptions
    def get_rules_by_action(self, action: str) -> List[CleanupRule]:
        """Return the cleanup rules whose ``action`` equals ``action``"""
        return [rule for rule in self.rules.values() if rule.action == action]

    def _set_enabled(self, rule_id: str, enabled: bool) -> None:
        """Set a rule's ``enabled`` flag and persist it (shared by enable/disable)."""
        rule = self.get_rule(rule_id)
        if not rule:
            raise AutomationError(f"Cleanup rule not found: {rule_id}")
        rule.enabled = enabled
        self.update_rule(rule)  # also refreshes updated_at

    @handle_exceptions
    def enable_rule(self, rule_id: str) -> None:
        """Enable a cleanup rule

        Raises:
            AutomationError: If the rule does not exist
        """
        self._set_enabled(rule_id, True)

    @handle_exceptions
    def disable_rule(self, rule_id: str) -> None:
        """Disable a cleanup rule

        Raises:
            AutomationError: If the rule does not exist
        """
        self._set_enabled(rule_id, False)

    @staticmethod
    def _meets_age_condition(item: Dict[str, Any], min_days: Any, use_updated_date: bool) -> bool:
        """Return True if the item's reference date is at least ``min_days`` days old.

        The reference date is ``created_at``, or ``updated_at`` when
        ``use_updated_date`` is truthy and the item has it. Dates may be ISO
        strings or ``datetime`` objects. Missing or unparseable dates never match.
        """
        date_field = "updated_at" if use_updated_date and "updated_at" in item else "created_at"
        if date_field not in item:
            return False

        raw_date = item[date_field]
        try:
            item_date = raw_date if isinstance(raw_date, datetime) else datetime.fromisoformat(raw_date)
            # Take "now" in the item's own timezone: tzinfo None gives naive local
            # time, so aware and naive timestamps are both compared without TypeError.
            age_days = (datetime.now(item_date.tzinfo) - item_date).days
            return age_days >= min_days
        except (TypeError, ValueError, OverflowError):
            return False

    @handle_exceptions
    def _check_conditions(self, item: Dict[str, Any], conditions: Dict[str, Any]) -> bool:
        """Check if item meets all conditions

        Supported condition keys:
            age_days: minimum age in days (see ``use_updated_date``)
            use_updated_date: measure age from ``updated_at`` instead of ``created_at``
            tags_include: item must carry at least one of these tags
            tags_exclude: item must carry none of these tags
            any other key: item[key] must equal the value or, for a list value,
                be contained in it (list fields must share at least one element)

        Args:
            item: Data item
            conditions: Conditions to check

        Returns:
            True if conditions are met, False otherwise
        """
        for field, condition in conditions.items():
            if field in self._CONTROL_CONDITION_KEYS:
                # Modifier consumed by the age check, not a field of the item.
                continue

            if field == "age_days":
                use_updated = conditions.get("use_updated_date", False)
                if not self._meets_age_condition(item, condition, use_updated):
                    return False
                continue

            if field == "tags_include":
                if "tags" not in item:
                    return False
                if not set(condition).intersection(item.get("tags") or []):
                    return False
                continue

            if field == "tags_exclude":
                if "tags" not in item:
                    continue
                if set(condition).intersection(item.get("tags") or []):
                    return False
                continue

            if field not in item:
                return False

            value = item[field]
            if isinstance(condition, list):
                if isinstance(value, list):
                    if not set(condition).intersection(value):
                        return False
                elif value not in condition:
                    return False
            elif value != condition:
                return False

        return True

    def _record_error(self, rule: CleanupRule, error_msg: str) -> None:
        """Store ``error_msg`` as the rule's last error, bump its error count and persist."""
        rule.last_error = error_msg
        rule.error_count += 1
        self.update_rule(rule)

    @handle_exceptions
    def run_rule(self, rule_id: str) -> int:
        """Run a cleanup rule against the items from its data provider

        Args:
            rule_id: Cleanup rule ID

        Returns:
            Number of items processed (0 for a disabled rule)

        Raises:
            AutomationError: If the rule is unknown, its provider/handler is not
                registered, or an error occurs while processing items
        """
        rule = self.get_rule(rule_id)
        if not rule:
            raise AutomationError(f"Cleanup rule not found: {rule_id}")

        if not rule.enabled:
            logger.info(f"Skipping disabled cleanup rule: {rule.name}")
            return 0

        provider = self.data_providers.get(rule.data_type)
        handler = self.action_handlers.get(rule.action)

        if not provider:
            error_msg = f"No data provider registered for {rule.data_type}"
            self._record_error(rule, error_msg)
            raise AutomationError(error_msg)

        if not handler:
            error_msg = f"No action handler registered for {rule.action}"
            self._record_error(rule, error_msg)
            raise AutomationError(error_msg)

        processed_count = 0
        try:
            data = provider()
            if data is None:  # treat "nothing returned" as "no items"
                data = []
            for item in data:
                if self._check_conditions(item, rule.conditions):
                    handler(item, rule.action)
                    processed_count += 1
        except Exception as e:
            error_msg = f"Error running cleanup rule {rule.name}: {str(e)}"
            # Items handled before the failure were really processed; keep the tally accurate.
            rule.items_processed += processed_count
            self._record_error(rule, error_msg)
            logger.error(error_msg)
            raise AutomationError(error_msg) from e

        rule.last_run = datetime.now().isoformat()
        rule.run_count += 1
        rule.items_processed += processed_count
        rule.last_error = None
        self.update_rule(rule)

        logger.info(f"Processed {processed_count} items with rule: {rule.name}")
        return processed_count

    def _run_rules(self, rules: List[CleanupRule]) -> Dict[str, int]:
        """Run each rule in turn; a failing rule is logged and recorded as 0."""
        results = {}
        for rule in rules:
            try:
                results[rule.id] = self.run_rule(rule.id)
            except Exception as e:
                logger.error(f"Error running cleanup rule {rule.name}: {str(e)}")
                results[rule.id] = 0
        return results

    @handle_exceptions
    def run_all_rules(self) -> Dict[str, int]:
        """Run all enabled cleanup rules

        Returns:
            Dictionary mapping rule IDs to number of items processed
        """
        return self._run_rules(self.get_enabled_rules())

    @handle_exceptions
    def run_rules_by_data_type(self, data_type: str) -> Dict[str, int]:
        """Run the enabled cleanup rules for a specific data type

        Args:
            data_type: Data type

        Returns:
            Dictionary mapping rule IDs to number of items processed
        """
        rules = [rule for rule in self.get_rules_by_data_type(data_type) if rule.enabled]
        return self._run_rules(rules)

    @handle_exceptions
    def set_cleanup_interval(self, interval: int) -> None:
        """Set cleanup interval in seconds

        A running auto-cleanup thread picks up the new value after its
        current wait finishes.

        Args:
            interval: Cleanup interval in seconds (minimum 3600)

        Raises:
            AutomationError: If the interval is below one hour
        """
        if interval < 3600:
            raise AutomationError("Cleanup interval must be at least 3600 seconds (1 hour)")
        self.cleanup_interval = interval
        logger.info(f"Set cleanup interval to {interval} seconds")

    @handle_exceptions
    def start_auto_cleanup(self) -> None:
        """Start automatic cleanup thread (no-op if already running)"""
        if self.auto_cleanup:
            logger.info("Auto cleanup already running")
            return

        self.auto_cleanup = True
        # Give each thread its own stop event. A previous thread that outlived
        # the join timeout in stop_auto_cleanup() keeps seeing its own (set)
        # event and exits, instead of being revived by clearing a shared one.
        self.stop_event = threading.Event()
        self.cleanup_thread = threading.Thread(
            target=self._auto_cleanup_thread,
            args=(self.stop_event,),
            daemon=True,
        )
        self.cleanup_thread.start()
        logger.info("Started auto cleanup")

    @handle_exceptions
    def stop_auto_cleanup(self) -> None:
        """Signal the automatic cleanup thread to stop (no-op if not running)"""
        if not self.auto_cleanup:
            logger.info("Auto cleanup not running")
            return

        self.auto_cleanup = False
        self.stop_event.set()
        if self.cleanup_thread:
            # Bounded wait: a rule run in progress is not interrupted. The thread
            # exits on its own once that run finishes.
            self.cleanup_thread.join(timeout=1.0)
        self.cleanup_thread = None
        logger.info("Stopped auto cleanup")

    def _auto_cleanup_thread(self, stop_event: Optional[threading.Event] = None) -> None:
        """Thread body: run all enabled rules, then sleep until the interval elapses or stop is signalled

        Args:
            stop_event: Event that ends this thread; defaults to ``self.stop_event``
        """
        if stop_event is None:
            stop_event = self.stop_event

        logger.info(f"Auto cleanup thread started with interval {self.cleanup_interval} seconds")

        while not stop_event.is_set():
            try:
                self.run_all_rules()
            except Exception as e:
                logger.error(f"Error in auto cleanup: {str(e)}")
            # Returns early as soon as the event is set.
            stop_event.wait(self.cleanup_interval)

        logger.info("Auto cleanup thread stopped")
|
|
|
|
|
|
|
# Shared module-level CleanupManager instance. Creating it here runs
# CleanupManager.__init__ at import time, which loads rules from storage and
# may save default rules back to storage when none are found.
cleanup_manager = CleanupManager()