|
import uuid |
|
from typing import Dict, List, Any, Optional, Callable, Union, Set |
|
from datetime import datetime |
|
import threading |
|
import time |
|
|
|
from utils.logging import setup_logger |
|
from utils.error_handling import handle_exceptions, AutomationError |
|
from utils.storage import load_data, save_data |
|
|
|
|
|
# Module-level logger, obtained from the project's logging helper and named
# after this module.
logger = setup_logger(__name__)
|
|
|
class SyncRule:
    """Rule for data synchronization between features

    A rule names a source data type, a target data type, the field mapping
    between them and optional conditions an item has to meet. It also carries
    bookkeeping fields (timestamps, sync/error counters, last error message)
    that start out empty here and are filled in by whoever runs the rule.
    """

    def __init__(self, name: str, source_type: str, target_type: str,
                 mapping: Dict[str, str], conditions: Optional[Dict[str, Any]] = None,
                 description: Optional[str] = None, enabled: bool = True):
        """Initialize a sync rule

        Args:
            name: Rule name
            source_type: Source data type (tasks, notes, goals, etc.)
            target_type: Target data type (tasks, notes, goals, etc.)
            mapping: Field mapping from source to target
            conditions: Conditions for sync (optional)
            description: Rule description (optional)
            enabled: Whether rule is enabled
        """
        self.id = str(uuid.uuid4())
        self.name = name
        self.description = description or ""
        self.source_type = source_type
        self.target_type = target_type
        self.mapping = mapping
        self.conditions = conditions or {}
        self.enabled = enabled

        # Timestamps are stored as ISO-8601 strings (local, naive time).
        self.created_at = datetime.now().isoformat()
        self.updated_at = self.created_at

        # Bookkeeping; nothing has been synced yet.
        self.last_sync = None
        self.sync_count = 0
        self.error_count = 0
        self.last_error = None

    @handle_exceptions
    def to_dict(self) -> Dict[str, Any]:
        """Convert sync rule to dictionary

        Returns:
            Sync rule as dictionary, using the same keys that ``from_dict``
            reads back.
        """
        return {
            "id": self.id,
            "name": self.name,
            "description": self.description,
            "source_type": self.source_type,
            "target_type": self.target_type,
            "mapping": self.mapping,
            "conditions": self.conditions,
            "enabled": self.enabled,
            "created_at": self.created_at,
            "updated_at": self.updated_at,
            "last_sync": self.last_sync,
            "sync_count": self.sync_count,
            "error_count": self.error_count,
            "last_error": self.last_error,
        }

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'SyncRule':
        """Create sync rule from dictionary

        Args:
            data: Sync rule data. ``name``, ``source_type``, ``target_type``
                and ``mapping`` are required; every other key is optional.

        Returns:
            SyncRule instance

        Raises:
            KeyError: If one of the required keys is missing.
        """
        rule = cls(
            data["name"],
            data["source_type"],
            data["target_type"],
            data["mapping"],
            data.get("conditions"),
            data.get("description", ""),
            data.get("enabled", True),
        )

        # Identity and timestamps fall back to the values the constructor just
        # generated, so a record lacking them loads like the other optional
        # bookkeeping fields instead of raising KeyError.
        rule.id = data.get("id", rule.id)
        rule.created_at = data.get("created_at", rule.created_at)
        rule.updated_at = data.get("updated_at", rule.created_at)

        rule.last_sync = data.get("last_sync")
        rule.sync_count = data.get("sync_count", 0)
        rule.error_count = data.get("error_count", 0)
        rule.last_error = data.get("last_error")
        return rule
|
|
|
|
|
class SyncManager:
    """Manager for data synchronization

    Keeps the sync rules (persisted under the ``"sync_rules"`` storage key),
    the provider/consumer callables registered per data type, and an optional
    background thread that runs every enabled rule at a fixed interval.

    NOTE(review): most methods are wrapped in ``handle_exceptions`` from
    ``utils.error_handling``. How that decorator treats a raised error is not
    visible from this file; confirm before relying on the ``Raises`` sections.
    """

    def __init__(self):
        """Initialize sync manager

        Loads the persisted rules and then makes sure the default rules exist.
        """
        self.rules: Dict[str, SyncRule] = {}
        self.data_providers: Dict[str, Callable] = {}
        self.data_consumers: Dict[str, Callable] = {}
        self.sync_interval = 300  # seconds between automatic sync passes
        self.auto_sync = False
        self.sync_thread: Optional[threading.Thread] = None
        self.stop_event = threading.Event()
        self.load_rules()
        self._ensure_default_rules()

    # ------------------------------------------------------------------
    # Persistence
    # ------------------------------------------------------------------

    @handle_exceptions
    def load_rules(self) -> None:
        """Load sync rules from storage

        A record that cannot be turned into a ``SyncRule`` is logged and
        skipped, so one bad entry does not stop the remaining rules from
        loading. Skipped records are not kept in memory and will therefore be
        absent from the next ``save_rules`` call.
        """
        try:
            rules_data = load_data("sync_rules", default=[])
            for rule_data in rules_data:
                try:
                    rule = SyncRule.from_dict(rule_data)
                except (KeyError, TypeError, AttributeError) as e:
                    logger.error(f"Skipping malformed sync rule: {e!r}")
                    continue
                self.rules[rule.id] = rule
            logger.info(f"Loaded {len(self.rules)} sync rules")
        except Exception as e:
            logger.error(f"Failed to load sync rules: {str(e)}")

    @handle_exceptions
    def save_rules(self) -> None:
        """Save sync rules to storage

        Failures are logged, not raised.
        """
        try:
            rules_data = [rule.to_dict() for rule in self.rules.values()]
            save_data("sync_rules", rules_data)
            logger.info(f"Saved {len(self.rules)} sync rules")
        except Exception as e:
            logger.error(f"Failed to save sync rules: {str(e)}")

    # ------------------------------------------------------------------
    # Default rules
    # ------------------------------------------------------------------

    def _ensure_default_rules(self) -> None:
        """Ensure default sync rules exist

        A default is created only when no rule at all exists for its source
        type ("task" and "note" respectively).
        """
        if not self.get_rules_by_source_type("task"):
            self._create_default_task_to_goal_rule()

        if not self.get_rules_by_source_type("note"):
            self._create_default_note_to_task_rule()

    def _create_default_task_to_goal_rule(self) -> None:
        """Create default task to goal sync rule"""
        mapping = {
            "title": "title",
            "description": "description",
            "status": "status",
            "due_date": "target_date",
            "tags": "tags",
            "category": "category",
        }
        conditions = {"tags": ["goal-related"]}

        rule = SyncRule(
            "Task to Goal",
            "task",
            "goal",
            mapping,
            conditions,
            "Sync tasks with 'goal-related' tag to goals",
            True,
        )

        # Stored directly (not via create_rule) because no provider/consumer
        # is registered yet while the manager is being constructed.
        self.rules[rule.id] = rule
        self.save_rules()

    def _create_default_note_to_task_rule(self) -> None:
        """Create default note to task sync rule"""
        mapping = {
            "title": "title",
            "content": "description",
            "tags": "tags",
            "category": "category",
        }
        conditions = {"tags": ["action-item"]}

        rule = SyncRule(
            "Note to Task",
            "note",
            "task",
            mapping,
            conditions,
            "Sync notes with 'action-item' tag to tasks",
            True,
        )

        self.rules[rule.id] = rule
        self.save_rules()

    # ------------------------------------------------------------------
    # Provider / consumer registration
    # ------------------------------------------------------------------

    @handle_exceptions
    def register_data_provider(self, data_type: str, provider: Callable) -> None:
        """Register a data provider function

        Args:
            data_type: Type of data provided
            provider: Function that returns data of the specified type; it is
                called without arguments and its result is iterated.
        """
        self.data_providers[data_type] = provider
        logger.info(f"Registered data provider for {data_type}")

    @handle_exceptions
    def register_data_consumer(self, data_type: str, consumer: Callable) -> None:
        """Register a data consumer function

        Args:
            data_type: Type of data consumed
            consumer: Function that consumes data of the specified type; it is
                called once per mapped item.
        """
        self.data_consumers[data_type] = consumer
        logger.info(f"Registered data consumer for {data_type}")

    # ------------------------------------------------------------------
    # Rule CRUD
    # ------------------------------------------------------------------

    @handle_exceptions
    def create_rule(self, name: str, source_type: str, target_type: str,
                    mapping: Dict[str, str], conditions: Optional[Dict[str, Any]] = None,
                    description: Optional[str] = None, enabled: bool = True) -> SyncRule:
        """Create a new sync rule

        Args:
            name: Rule name
            source_type: Source data type
            target_type: Target data type
            mapping: Field mapping from source to target
            conditions: Conditions for sync (optional)
            description: Rule description (optional)
            enabled: Whether rule is enabled

        Returns:
            Created sync rule

        Raises:
            AutomationError: If no provider is registered for ``source_type``
                or no consumer is registered for ``target_type``.
        """
        if source_type not in self.data_providers:
            raise AutomationError(f"No data provider registered for {source_type}")

        if target_type not in self.data_consumers:
            raise AutomationError(f"No data consumer registered for {target_type}")

        rule = SyncRule(name, source_type, target_type, mapping, conditions, description, enabled)
        self.rules[rule.id] = rule
        self.save_rules()
        return rule

    @handle_exceptions
    def get_rule(self, rule_id: str) -> Optional[SyncRule]:
        """Get sync rule by ID

        Args:
            rule_id: Sync rule ID

        Returns:
            SyncRule if found, None otherwise
        """
        return self.rules.get(rule_id)

    @handle_exceptions
    def update_rule(self, rule: SyncRule) -> None:
        """Update sync rule

        Refreshes ``rule.updated_at``, stores the rule under its ID and
        persists all rules.

        Args:
            rule: Sync rule to update

        Raises:
            AutomationError: If no rule with ``rule.id`` is known.
        """
        if rule.id not in self.rules:
            raise AutomationError(f"Sync rule not found: {rule.id}")

        rule.updated_at = datetime.now().isoformat()
        self.rules[rule.id] = rule
        self.save_rules()

    @handle_exceptions
    def delete_rule(self, rule_id: str) -> None:
        """Delete sync rule

        Args:
            rule_id: Sync rule ID

        Raises:
            AutomationError: If no rule with ``rule_id`` is known.
        """
        if rule_id not in self.rules:
            raise AutomationError(f"Sync rule not found: {rule_id}")

        del self.rules[rule_id]
        self.save_rules()

    # ------------------------------------------------------------------
    # Rule queries
    # ------------------------------------------------------------------

    @handle_exceptions
    def get_all_rules(self) -> List[SyncRule]:
        """Get all sync rules

        Returns:
            List of all sync rules
        """
        return list(self.rules.values())

    @handle_exceptions
    def get_enabled_rules(self) -> List[SyncRule]:
        """Get all enabled sync rules

        Returns:
            List of enabled sync rules
        """
        return [rule for rule in self.rules.values() if rule.enabled]

    @handle_exceptions
    def get_rules_by_source_type(self, source_type: str) -> List[SyncRule]:
        """Get sync rules by source type

        Args:
            source_type: Source data type

        Returns:
            List of sync rules with the specified source type
        """
        return [
            rule for rule in self.rules.values()
            if rule.source_type == source_type
        ]

    @handle_exceptions
    def get_rules_by_target_type(self, target_type: str) -> List[SyncRule]:
        """Get sync rules by target type

        Args:
            target_type: Target data type

        Returns:
            List of sync rules with the specified target type
        """
        return [
            rule for rule in self.rules.values()
            if rule.target_type == target_type
        ]

    # ------------------------------------------------------------------
    # Enable / disable
    # ------------------------------------------------------------------

    @handle_exceptions
    def enable_rule(self, rule_id: str) -> None:
        """Enable sync rule

        Args:
            rule_id: Sync rule ID

        Raises:
            AutomationError: If no rule with ``rule_id`` is known.
        """
        rule = self.get_rule(rule_id)
        if not rule:
            raise AutomationError(f"Sync rule not found: {rule_id}")

        rule.enabled = True
        # update_rule refreshes updated_at and persists the change.
        self.update_rule(rule)

    @handle_exceptions
    def disable_rule(self, rule_id: str) -> None:
        """Disable sync rule

        Args:
            rule_id: Sync rule ID

        Raises:
            AutomationError: If no rule with ``rule_id`` is known.
        """
        rule = self.get_rule(rule_id)
        if not rule:
            raise AutomationError(f"Sync rule not found: {rule_id}")

        rule.enabled = False
        # update_rule refreshes updated_at and persists the change.
        self.update_rule(rule)

    # ------------------------------------------------------------------
    # Item filtering and mapping
    # ------------------------------------------------------------------

    @handle_exceptions
    def _check_conditions(self, item: Dict[str, Any], conditions: Dict[str, Any]) -> bool:
        """Check if item meets conditions

        Every condition must hold. For each ``field -> condition`` pair:

        * the field must be present in the item;
        * a list condition against a list value requires at least one common
          element;
        * a list condition against a scalar value requires the value to be
          one of the listed elements;
        * any other condition requires equality.

        Args:
            item: Data item
            conditions: Conditions to check

        Returns:
            True if conditions are met, False otherwise
        """
        for field, condition in conditions.items():
            if field not in item:
                return False

            value = item[field]
            if isinstance(condition, list):
                if isinstance(value, list):
                    # Membership test instead of set intersection so that
                    # unhashable elements (dicts, lists) do not raise.
                    if not any(candidate in value for candidate in condition):
                        return False
                elif value not in condition:
                    return False
            elif value != condition:
                return False

        return True

    @handle_exceptions
    def _map_item(self, item: Dict[str, Any], mapping: Dict[str, str]) -> Dict[str, Any]:
        """Map item fields according to mapping

        Source fields that are absent from the item are left out of the
        result.

        Args:
            item: Source data item
            mapping: Field mapping from source to target

        Returns:
            Mapped item
        """
        return {
            target_field: item[source_field]
            for source_field, target_field in mapping.items()
            if source_field in item
        }

    # ------------------------------------------------------------------
    # Syncing
    # ------------------------------------------------------------------

    def _record_failure(self, rule: SyncRule, error_msg: str) -> None:
        """Store a failure on the rule and persist it."""
        rule.last_error = error_msg
        rule.error_count += 1
        self.update_rule(rule)

    @handle_exceptions
    def sync_rule(self, rule_id: str) -> int:
        """Sync data according to rule

        Calls the provider registered for the rule's source type, keeps the
        items that meet the rule's conditions, maps each one and hands it to
        the consumer registered for the rule's target type. The outcome is
        recorded on the rule (``last_sync``/``sync_count`` on success,
        ``last_error``/``error_count`` on failure) and persisted.

        Args:
            rule_id: Sync rule ID

        Returns:
            Number of items synced (0 for a disabled rule)

        Raises:
            AutomationError: If the rule is unknown, if its provider or
                consumer is not registered, or if syncing fails.
        """
        rule = self.get_rule(rule_id)
        if not rule:
            raise AutomationError(f"Sync rule not found: {rule_id}")

        if not rule.enabled:
            logger.info(f"Skipping disabled sync rule: {rule.name}")
            return 0

        provider = self.data_providers.get(rule.source_type)
        consumer = self.data_consumers.get(rule.target_type)

        if not provider:
            error_msg = f"No data provider registered for {rule.source_type}"
            self._record_failure(rule, error_msg)
            raise AutomationError(error_msg)

        if not consumer:
            error_msg = f"No data consumer registered for {rule.target_type}"
            self._record_failure(rule, error_msg)
            raise AutomationError(error_msg)

        try:
            source_data = provider()

            synced_count = 0
            for item in source_data:
                if rule.conditions and not self._check_conditions(item, rule.conditions):
                    continue

                mapped_item = self._map_item(item, rule.mapping)
                consumer(mapped_item)
                synced_count += 1

            rule.last_sync = datetime.now().isoformat()
            rule.sync_count += synced_count
            rule.last_error = None
            self.update_rule(rule)

            logger.info(f"Synced {synced_count} items with rule: {rule.name}")
            return synced_count

        except Exception as e:
            error_msg = f"Error syncing with rule {rule.name}: {str(e)}"
            self._record_failure(rule, error_msg)
            logger.error(error_msg)
            # Chain the original exception so its traceback is not lost.
            raise AutomationError(error_msg) from e

    def _sync_rules(self, rules: List[SyncRule]) -> Dict[str, int]:
        """Run every enabled rule in ``rules``; a failing rule counts as 0."""
        results: Dict[str, int] = {}

        for rule in rules:
            if not rule.enabled:
                continue
            try:
                results[rule.id] = self.sync_rule(rule.id)
            except Exception as e:
                logger.error(f"Error syncing rule {rule.name}: {str(e)}")
                results[rule.id] = 0

        return results

    @handle_exceptions
    def sync_all(self) -> Dict[str, int]:
        """Sync data according to all enabled rules

        Returns:
            Dictionary mapping rule IDs to number of items synced
        """
        return self._sync_rules(self.get_enabled_rules())

    @handle_exceptions
    def sync_by_source_type(self, source_type: str) -> Dict[str, int]:
        """Sync data according to rules with specified source type

        Args:
            source_type: Source data type

        Returns:
            Dictionary mapping rule IDs to number of items synced
        """
        return self._sync_rules(self.get_rules_by_source_type(source_type))

    @handle_exceptions
    def sync_by_target_type(self, target_type: str) -> Dict[str, int]:
        """Sync data according to rules with specified target type

        Args:
            target_type: Target data type

        Returns:
            Dictionary mapping rule IDs to number of items synced
        """
        return self._sync_rules(self.get_rules_by_target_type(target_type))

    # ------------------------------------------------------------------
    # Automatic sync
    # ------------------------------------------------------------------

    @handle_exceptions
    def set_sync_interval(self, interval: int) -> None:
        """Set sync interval in seconds

        Args:
            interval: Sync interval in seconds

        Raises:
            AutomationError: If ``interval`` is below 60 seconds.
        """
        if interval < 60:
            raise AutomationError("Sync interval must be at least 60 seconds")

        self.sync_interval = interval
        logger.info(f"Set sync interval to {interval} seconds")

    @handle_exceptions
    def start_auto_sync(self) -> None:
        """Start automatic sync thread

        Does nothing when auto sync is already running.
        """
        if self.auto_sync:
            logger.info("Auto sync already running")
            return

        self.auto_sync = True
        # Each worker gets its own stop event. A previous worker that was
        # still busy when stop_auto_sync gave up waiting keeps its already-set
        # event and exits, instead of being revived by clearing a shared one.
        self.stop_event = threading.Event()
        self.sync_thread = threading.Thread(
            target=self._auto_sync_thread,
            args=(self.stop_event,),
        )
        self.sync_thread.daemon = True
        self.sync_thread.start()
        logger.info("Started auto sync")

    @handle_exceptions
    def stop_auto_sync(self) -> None:
        """Stop automatic sync thread

        Signals the worker and waits up to one second for it to finish. A
        worker in the middle of a sync pass ends on its own once that pass
        completes.
        """
        if not self.auto_sync:
            logger.info("Auto sync not running")
            return

        self.auto_sync = False
        self.stop_event.set()

        worker = self.sync_thread
        if worker:
            worker.join(timeout=1.0)
            if worker.is_alive():
                logger.warning("Auto sync thread is still finishing its current sync")
            self.sync_thread = None
        logger.info("Stopped auto sync")

    def _auto_sync_thread(self, stop_event: Optional[threading.Event] = None) -> None:
        """Thread function for automatic sync

        Runs ``sync_all`` and then waits ``sync_interval`` seconds (or until
        stopped), repeating until the stop event is set.

        Args:
            stop_event: Event that ends the loop; defaults to the manager's
                current ``stop_event``.
        """
        if stop_event is None:
            stop_event = self.stop_event

        logger.info(f"Auto sync thread started with interval {self.sync_interval} seconds")

        while not stop_event.is_set():
            try:
                self.sync_all()
            except Exception as e:
                logger.error(f"Error in auto sync: {str(e)}")

            # wait() doubles as an interruptible sleep: it returns early as
            # soon as the event is set.
            stop_event.wait(self.sync_interval)

        logger.info("Auto sync thread stopped")
|
|
|
|
|
# Shared module-level manager instance. Constructing it loads the persisted
# rules and creates the default rules when none exist for their source type.
sync_manager = SyncManager()