"""Rule-based data synchronization between features.

Defines ``SyncRule`` (a persistable field-mapping rule), ``SyncManager``
(rule storage, rule execution and an optional background sync thread) and a
module-level ``sync_manager`` instance created at import time.
"""

import uuid
from typing import Dict, List, Any, Optional, Callable, Union, Set
from datetime import datetime
import threading
import time

from utils.logging import setup_logger
from utils.error_handling import handle_exceptions, AutomationError
from utils.storage import load_data, save_data

# Initialize logger
logger = setup_logger(__name__)


class SyncRule:
    """Rule for data synchronization between features"""

    def __init__(self, name: str, source_type: str, target_type: str,
                 mapping: Dict[str, str],
                 conditions: Optional[Dict[str, Any]] = None,
                 description: Optional[str] = None,
                 enabled: bool = True):
        """Initialize a sync rule

        Args:
            name: Rule name
            source_type: Source data type (e.g. "task", "note")
            target_type: Target data type (e.g. "goal", "task")
            mapping: Field mapping from source field name to target field name
            conditions: Conditions an item must meet to be synced (optional)
            description: Rule description (optional)
            enabled: Whether rule is enabled
        """
        self.id = str(uuid.uuid4())
        self.name = name
        self.description = description or ""
        self.source_type = source_type
        self.target_type = target_type
        self.mapping = mapping
        self.conditions = conditions or {}
        self.enabled = enabled
        self.created_at = datetime.now().isoformat()
        self.updated_at = self.created_at
        # Sync statistics, maintained by SyncManager.sync_rule
        self.last_sync = None
        self.sync_count = 0
        self.error_count = 0
        self.last_error = None

    @handle_exceptions
    def to_dict(self) -> Dict[str, Any]:
        """Convert sync rule to dictionary

        Returns:
            Sync rule as dictionary
        """
        return {
            "id": self.id,
            "name": self.name,
            "description": self.description,
            "source_type": self.source_type,
            "target_type": self.target_type,
            "mapping": self.mapping,
            "conditions": self.conditions,
            "enabled": self.enabled,
            "created_at": self.created_at,
            "updated_at": self.updated_at,
            "last_sync": self.last_sync,
            "sync_count": self.sync_count,
            "error_count": self.error_count,
            "last_error": self.last_error,
        }

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'SyncRule':
        """Create sync rule from dictionary

        Args:
            data: Sync rule data, in the shape produced by ``to_dict``

        Returns:
            SyncRule instance

        Raises:
            KeyError: If ``name``, ``source_type``, ``target_type`` or
                ``mapping`` is missing from ``data``
        """
        rule = cls(
            data["name"],
            data["source_type"],
            data["target_type"],
            data["mapping"],
            data.get("conditions"),
            data.get("description", ""),
            data.get("enabled", True),
        )
        # The constructor already generated a valid id and timestamps, so a
        # record lacking them keeps those instead of failing with KeyError.
        rule.id = data.get("id", rule.id)
        rule.created_at = data.get("created_at", rule.created_at)
        rule.updated_at = data.get("updated_at", rule.created_at)
        rule.last_sync = data.get("last_sync")
        rule.sync_count = data.get("sync_count", 0)
        rule.error_count = data.get("error_count", 0)
        rule.last_error = data.get("last_error")
        return rule


class SyncManager:
    """Manager for data synchronization"""

    def __init__(self):
        """Initialize sync manager

        Loads stored rules and creates the default rules when no rule exists
        for the "task" or "note" source types.
        """
        self.rules: Dict[str, SyncRule] = {}
        self.data_providers: Dict[str, Callable] = {}
        self.data_consumers: Dict[str, Callable] = {}
        self.sync_interval = 300  # 5 minutes
        self.auto_sync = False
        self.sync_thread: Optional[threading.Thread] = None
        self.stop_event = threading.Event()

        self.load_rules()
        self._ensure_default_rules()

    @handle_exceptions
    def load_rules(self) -> None:
        """Load sync rules from storage

        A malformed record is logged and skipped so that it does not prevent
        the remaining records from being loaded.
        """
        try:
            rules_data = load_data("sync_rules", default=[])
            for rule_data in rules_data:
                try:
                    rule = SyncRule.from_dict(rule_data)
                except (KeyError, TypeError, AttributeError) as e:
                    logger.error(f"Skipping malformed sync rule record: {e!r}")
                    continue
                self.rules[rule.id] = rule
            logger.info(f"Loaded {len(self.rules)} sync rules")
        except Exception as e:
            logger.error(f"Failed to load sync rules: {e}")

    @handle_exceptions
    def save_rules(self) -> None:
        """Save sync rules to storage"""
        try:
            rules_data = [rule.to_dict() for rule in self.rules.values()]
            save_data("sync_rules", rules_data)
            logger.info(f"Saved {len(self.rules)} sync rules")
        except Exception as e:
            logger.error(f"Failed to save sync rules: {e}")

    def _ensure_default_rules(self) -> None:
        """Ensure default sync rules exist"""
        # Only create a default when no rule at all exists for that source type
        if not self.get_rules_by_source_type("task"):
            self._create_default_task_to_goal_rule()

        if not self.get_rules_by_source_type("note"):
            self._create_default_note_to_task_rule()

    def _create_default_task_to_goal_rule(self) -> None:
        """Create default task to goal sync rule"""
        mapping = {
            "title": "title",
            "description": "description",
            "status": "status",
            "due_date": "target_date",
            "tags": "tags",
            "category": "category",
        }
        conditions = {
            "tags": ["goal-related"],
        }
        rule = SyncRule(
            "Task to Goal",
            "task",
            "goal",
            mapping,
            conditions,
            "Sync tasks with 'goal-related' tag to goals",
            True,
        )
        self.rules[rule.id] = rule
        self.save_rules()

    def _create_default_note_to_task_rule(self) -> None:
        """Create default note to task sync rule"""
        mapping = {
            "title": "title",
            "content": "description",
            "tags": "tags",
            "category": "category",
        }
        conditions = {
            "tags": ["action-item"],
        }
        rule = SyncRule(
            "Note to Task",
            "note",
            "task",
            mapping,
            conditions,
            "Sync notes with 'action-item' tag to tasks",
            True,
        )
        self.rules[rule.id] = rule
        self.save_rules()

    @handle_exceptions
    def register_data_provider(self, data_type: str, provider: Callable) -> None:
        """Register a data provider function

        Args:
            data_type: Type of data provided
            provider: Function called with no arguments that returns an
                iterable of items of the specified type
        """
        self.data_providers[data_type] = provider
        logger.info(f"Registered data provider for {data_type}")

    @handle_exceptions
    def register_data_consumer(self, data_type: str, consumer: Callable) -> None:
        """Register a data consumer function

        Args:
            data_type: Type of data consumed
            consumer: Function called with one mapped item at a time
        """
        self.data_consumers[data_type] = consumer
        logger.info(f"Registered data consumer for {data_type}")

    @handle_exceptions
    def create_rule(self, name: str, source_type: str, target_type: str,
                    mapping: Dict[str, str],
                    conditions: Optional[Dict[str, Any]] = None,
                    description: Optional[str] = None,
                    enabled: bool = True) -> SyncRule:
        """Create a new sync rule

        Args:
            name: Rule name
            source_type: Source data type
            target_type: Target data type
            mapping: Field mapping from source to target
            conditions: Conditions for sync (optional)
            description: Rule description (optional)
            enabled: Whether rule is enabled

        Returns:
            Created sync rule

        Raises:
            AutomationError: If no provider is registered for ``source_type``
                or no consumer is registered for ``target_type``
        """
        # Validate source and target types
        if source_type not in self.data_providers:
            raise AutomationError(f"No data provider registered for {source_type}")

        if target_type not in self.data_consumers:
            raise AutomationError(f"No data consumer registered for {target_type}")

        rule = SyncRule(name, source_type, target_type, mapping,
                        conditions, description, enabled)
        self.rules[rule.id] = rule
        self.save_rules()
        return rule

    @handle_exceptions
    def get_rule(self, rule_id: str) -> Optional[SyncRule]:
        """Get sync rule by ID

        Args:
            rule_id: Sync rule ID

        Returns:
            SyncRule if found, None otherwise
        """
        return self.rules.get(rule_id)

    @handle_exceptions
    def update_rule(self, rule: SyncRule) -> None:
        """Update sync rule

        Refreshes ``updated_at``, stores the rule and saves all rules.

        Args:
            rule: Sync rule to update

        Raises:
            AutomationError: If no rule with ``rule.id`` exists
        """
        if rule.id in self.rules:
            rule.updated_at = datetime.now().isoformat()
            self.rules[rule.id] = rule
            self.save_rules()
        else:
            raise AutomationError(f"Sync rule not found: {rule.id}")

    @handle_exceptions
    def delete_rule(self, rule_id: str) -> None:
        """Delete sync rule

        Args:
            rule_id: Sync rule ID

        Raises:
            AutomationError: If no rule with ``rule_id`` exists
        """
        if rule_id in self.rules:
            del self.rules[rule_id]
            self.save_rules()
        else:
            raise AutomationError(f"Sync rule not found: {rule_id}")

    @handle_exceptions
    def get_all_rules(self) -> List[SyncRule]:
        """Get all sync rules

        Returns:
            List of all sync rules
        """
        return list(self.rules.values())

    @handle_exceptions
    def get_enabled_rules(self) -> List[SyncRule]:
        """Get all enabled sync rules

        Returns:
            List of enabled sync rules
        """
        return [rule for rule in self.rules.values() if rule.enabled]

    @handle_exceptions
    def get_rules_by_source_type(self, source_type: str) -> List[SyncRule]:
        """Get sync rules by source type

        Args:
            source_type: Source data type

        Returns:
            List of sync rules with the specified source type
        """
        return [
            rule for rule in self.rules.values()
            if rule.source_type == source_type
        ]

    @handle_exceptions
    def get_rules_by_target_type(self, target_type: str) -> List[SyncRule]:
        """Get sync rules by target type

        Args:
            target_type: Target data type

        Returns:
            List of sync rules with the specified target type
        """
        return [
            rule for rule in self.rules.values()
            if rule.target_type == target_type
        ]

    def _set_rule_enabled(self, rule_id: str, enabled: bool) -> None:
        """Set the enabled flag of a rule and persist the change."""
        rule = self.get_rule(rule_id)
        if not rule:
            raise AutomationError(f"Sync rule not found: {rule_id}")

        rule.enabled = enabled
        # update_rule refreshes updated_at and saves the rules
        self.update_rule(rule)

    @handle_exceptions
    def enable_rule(self, rule_id: str) -> None:
        """Enable sync rule

        Args:
            rule_id: Sync rule ID

        Raises:
            AutomationError: If no rule with ``rule_id`` exists
        """
        self._set_rule_enabled(rule_id, True)

    @handle_exceptions
    def disable_rule(self, rule_id: str) -> None:
        """Disable sync rule

        Args:
            rule_id: Sync rule ID

        Raises:
            AutomationError: If no rule with ``rule_id`` exists
        """
        self._set_rule_enabled(rule_id, False)

    @handle_exceptions
    def _check_conditions(self, item: Dict[str, Any],
                          conditions: Dict[str, Any]) -> bool:
        """Check if item meets conditions

        Every condition must hold. A list condition matches when the item
        value is one of the listed values or, if the item value is itself a
        list, when the two lists share at least one value. Any other
        condition is compared to the item value for equality.

        Args:
            item: Data item
            conditions: Conditions to check, keyed by item field name

        Returns:
            True if conditions are met, False otherwise
        """
        for field, condition in conditions.items():
            if field not in item:
                return False

            value = item[field]
            if isinstance(condition, list):
                if isinstance(value, list):
                    # Membership test instead of a set intersection so that
                    # unhashable elements do not raise TypeError.
                    if not any(element in condition for element in value):
                        return False
                elif value not in condition:
                    return False
            elif value != condition:
                return False

        return True

    @handle_exceptions
    def _map_item(self, item: Dict[str, Any],
                  mapping: Dict[str, str]) -> Dict[str, Any]:
        """Map item fields according to mapping

        Source fields that are absent from the item are left out of the
        result.

        Args:
            item: Source data item
            mapping: Field mapping from source to target

        Returns:
            Mapped item
        """
        return {
            target_field: item[source_field]
            for source_field, target_field in mapping.items()
            if source_field in item
        }

    def _record_rule_error(self, rule: SyncRule, error_msg: str) -> None:
        """Store a sync failure on the rule and persist it."""
        rule.last_error = error_msg
        rule.error_count += 1
        self.update_rule(rule)

    @handle_exceptions
    def sync_rule(self, rule_id: str) -> int:
        """Sync data according to rule

        Args:
            rule_id: Sync rule ID

        Returns:
            Number of items synced (0 when the rule is disabled)

        Raises:
            AutomationError: If the rule does not exist, its provider or
                consumer is not registered, or syncing fails
        """
        rule = self.get_rule(rule_id)
        if not rule:
            raise AutomationError(f"Sync rule not found: {rule_id}")

        if not rule.enabled:
            logger.info(f"Skipping disabled sync rule: {rule.name}")
            return 0

        # Get data provider and consumer
        provider = self.data_providers.get(rule.source_type)
        consumer = self.data_consumers.get(rule.target_type)

        if not provider:
            error_msg = f"No data provider registered for {rule.source_type}"
            self._record_rule_error(rule, error_msg)
            raise AutomationError(error_msg)

        if not consumer:
            error_msg = f"No data consumer registered for {rule.target_type}"
            self._record_rule_error(rule, error_msg)
            raise AutomationError(error_msg)

        try:
            source_data = provider()

            # Filter and map items
            synced_count = 0
            for item in source_data:
                if rule.conditions and not self._check_conditions(item, rule.conditions):
                    continue

                consumer(self._map_item(item, rule.mapping))
                synced_count += 1

            # Update rule stats
            rule.last_sync = datetime.now().isoformat()
            rule.sync_count += synced_count
            rule.last_error = None
            self.update_rule(rule)

            logger.info(f"Synced {synced_count} items with rule: {rule.name}")
            return synced_count

        except Exception as e:
            error_msg = f"Error syncing with rule {rule.name}: {e}"
            self._record_rule_error(rule, error_msg)
            logger.error(error_msg)
            # Chain the cause so the original traceback is not lost
            raise AutomationError(error_msg) from e

    def _sync_rules(self, rules: List[SyncRule]) -> Dict[str, int]:
        """Run sync_rule for each given rule, recording 0 for rules that fail."""
        results = {}
        for rule in rules:
            try:
                results[rule.id] = self.sync_rule(rule.id)
            except Exception as e:
                logger.error(f"Error syncing rule {rule.name}: {e}")
                results[rule.id] = 0
        return results

    @handle_exceptions
    def sync_all(self) -> Dict[str, int]:
        """Sync data according to all enabled rules

        Returns:
            Dictionary mapping rule IDs to number of items synced
        """
        return self._sync_rules(self.get_enabled_rules())

    @handle_exceptions
    def sync_by_source_type(self, source_type: str) -> Dict[str, int]:
        """Sync data according to enabled rules with specified source type

        Args:
            source_type: Source data type

        Returns:
            Dictionary mapping rule IDs to number of items synced
        """
        return self._sync_rules([
            rule for rule in self.get_rules_by_source_type(source_type)
            if rule.enabled
        ])

    @handle_exceptions
    def sync_by_target_type(self, target_type: str) -> Dict[str, int]:
        """Sync data according to enabled rules with specified target type

        Args:
            target_type: Target data type

        Returns:
            Dictionary mapping rule IDs to number of items synced
        """
        return self._sync_rules([
            rule for rule in self.get_rules_by_target_type(target_type)
            if rule.enabled
        ])

    @handle_exceptions
    def set_sync_interval(self, interval: int) -> None:
        """Set sync interval in seconds

        Args:
            interval: Sync interval in seconds

        Raises:
            AutomationError: If ``interval`` is below 60 seconds
        """
        if interval < 60:
            raise AutomationError("Sync interval must be at least 60 seconds")

        self.sync_interval = interval
        logger.info(f"Set sync interval to {interval} seconds")

    @handle_exceptions
    def start_auto_sync(self) -> None:
        """Start automatic sync thread"""
        if self.auto_sync:
            logger.info("Auto sync already running")
            return

        self.auto_sync = True
        # Use a fresh event per run rather than clearing the shared one: a
        # previous worker that outlived the join timeout in stop_auto_sync
        # keeps its own (still set) event and exits instead of resuming.
        self.stop_event = threading.Event()
        self.sync_thread = threading.Thread(
            target=self._auto_sync_thread,
            args=(self.stop_event,),
        )
        self.sync_thread.daemon = True
        self.sync_thread.start()
        logger.info("Started auto sync")

    @handle_exceptions
    def stop_auto_sync(self) -> None:
        """Stop automatic sync thread

        Signals the worker to stop and waits up to one second for it to exit.
        """
        if not self.auto_sync:
            logger.info("Auto sync not running")
            return

        self.auto_sync = False
        self.stop_event.set()
        if self.sync_thread:
            self.sync_thread.join(timeout=1.0)
            self.sync_thread = None
        logger.info("Stopped auto sync")

    def _auto_sync_thread(self, stop_event: Optional[threading.Event] = None) -> None:
        """Thread function for automatic sync

        Args:
            stop_event: Event that ends the loop once set; defaults to
                ``self.stop_event`` as it is when the thread function starts
        """
        if stop_event is None:
            stop_event = self.stop_event

        logger.info(f"Auto sync thread started with interval {self.sync_interval} seconds")

        while not stop_event.is_set():
            try:
                # Sync all enabled rules
                self.sync_all()
            except Exception as e:
                logger.error(f"Error in auto sync: {e}")

            # Wait for next sync or stop event
            stop_event.wait(self.sync_interval)

        logger.info("Auto sync thread stopped")


# Create a global instance of the sync manager
sync_manager = SyncManager()