# mona/utils/automation/data_sync.py
# mrradix's picture
# Upload 48 files
# 8e4018d verified
import uuid
from typing import Dict, List, Any, Optional, Callable, Union, Set
from datetime import datetime
import threading
import time
from utils.logging import setup_logger
from utils.error_handling import handle_exceptions, AutomationError
from utils.storage import load_data, save_data
# Initialize logger
# Module-level logger named after this module; SyncManager below uses it for
# all of its info/error messages.
logger = setup_logger(__name__)
class SyncRule:
    """Rule for data synchronization between features.

    A rule describes how items of ``source_type`` are turned into items of
    ``target_type``: ``mapping`` renames fields, ``conditions`` filters which
    source items qualify. The rule also carries bookkeeping (timestamps as
    ISO-8601 strings, sync/error counters, last error message).
    """

    def __init__(self, name: str, source_type: str, target_type: str,
                 mapping: Dict[str, str], conditions: Optional[Dict[str, Any]] = None,
                 description: Optional[str] = None, enabled: bool = True):
        """Initialize a sync rule

        Args:
            name: Rule name
            source_type: Source data type (tasks, notes, goals, etc.)
            target_type: Target data type (tasks, notes, goals, etc.)
            mapping: Field mapping from source to target
            conditions: Conditions for sync (optional)
            description: Rule description (optional)
            enabled: Whether rule is enabled
        """
        self.id = str(uuid.uuid4())
        self.name = name
        self.description = description or ""
        self.source_type = source_type
        self.target_type = target_type
        self.mapping = mapping
        self.conditions = conditions or {}
        self.enabled = enabled
        self.created_at = datetime.now().isoformat()
        self.updated_at = self.created_at
        # Sync statistics; nothing has run yet for a freshly created rule.
        self.last_sync = None
        self.sync_count = 0
        self.error_count = 0
        self.last_error = None

    @handle_exceptions
    def to_dict(self) -> Dict[str, Any]:
        """Convert sync rule to dictionary

        Returns:
            Sync rule as dictionary (same keys that ``from_dict`` reads)
        """
        return {
            "id": self.id,
            "name": self.name,
            "description": self.description,
            "source_type": self.source_type,
            "target_type": self.target_type,
            "mapping": self.mapping,
            "conditions": self.conditions,
            "enabled": self.enabled,
            "created_at": self.created_at,
            "updated_at": self.updated_at,
            "last_sync": self.last_sync,
            "sync_count": self.sync_count,
            "error_count": self.error_count,
            "last_error": self.last_error,
        }

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'SyncRule':
        """Create sync rule from dictionary

        ``name``, ``source_type``, ``target_type`` and ``mapping`` are
        required. All other keys are optional: a missing ``id`` or
        ``created_at`` keeps the value generated by the constructor, and a
        missing ``updated_at`` falls back to ``created_at``.

        Args:
            data: Sync rule data

        Returns:
            SyncRule instance

        Raises:
            KeyError: If one of the required keys is missing
        """
        rule = cls(
            name=data["name"],
            source_type=data["source_type"],
            target_type=data["target_type"],
            mapping=data["mapping"],
            conditions=data.get("conditions"),
            description=data.get("description", ""),
            enabled=data.get("enabled", True),
        )
        # Bookkeeping fields: prefer the stored values, but do not fail on
        # records that lack them -- the constructor already produced valid
        # defaults.
        rule.id = data.get("id", rule.id)
        rule.created_at = data.get("created_at", rule.created_at)
        rule.updated_at = data.get("updated_at", rule.created_at)
        rule.last_sync = data.get("last_sync")
        rule.sync_count = data.get("sync_count", 0)
        rule.error_count = data.get("error_count", 0)
        rule.last_error = data.get("last_error")
        return rule
class SyncManager:
    """Manager for data synchronization.

    Keeps the sync rules (persisted through ``load_data``/``save_data`` under
    the ``"sync_rules"`` key), the provider/consumer callables registered per
    data type, and an optional background thread that runs all enabled rules
    every ``sync_interval`` seconds.
    """

    def __init__(self):
        """Initialize sync manager, load stored rules and ensure defaults exist"""
        self.rules = {}
        self.data_providers = {}
        self.data_consumers = {}
        self.sync_interval = 300  # 5 minutes
        self.auto_sync = False
        self.sync_thread = None
        self.stop_event = threading.Event()
        self.load_rules()
        self._ensure_default_rules()

    # ------------------------------------------------------------------
    # Persistence
    # ------------------------------------------------------------------

    @handle_exceptions
    def load_rules(self) -> None:
        """Load sync rules from storage

        A stored entry that cannot be converted into a ``SyncRule`` is logged
        and skipped, so one bad record does not prevent the remaining rules
        from being loaded (and later being written back by ``save_rules``).
        """
        try:
            rules_data = load_data("sync_rules", default=[])
            for rule_data in rules_data:
                try:
                    rule = SyncRule.from_dict(rule_data)
                except (KeyError, TypeError, AttributeError) as e:
                    logger.error(f"Skipping invalid sync rule: {str(e)}")
                    continue
                self.rules[rule.id] = rule
            logger.info(f"Loaded {len(self.rules)} sync rules")
        except Exception as e:
            logger.error(f"Failed to load sync rules: {str(e)}")

    @handle_exceptions
    def save_rules(self) -> None:
        """Save sync rules to storage"""
        try:
            rules_data = [rule.to_dict() for rule in self.rules.values()]
            save_data("sync_rules", rules_data)
            logger.info(f"Saved {len(self.rules)} sync rules")
        except Exception as e:
            logger.error(f"Failed to save sync rules: {str(e)}")

    # ------------------------------------------------------------------
    # Default rules
    # ------------------------------------------------------------------

    def _ensure_default_rules(self) -> None:
        """Ensure default sync rules exist"""
        # A default is only created when no rule at all uses that source type.
        if not self.get_rules_by_source_type("task"):
            self._create_default_task_to_goal_rule()
        if not self.get_rules_by_source_type("note"):
            self._create_default_note_to_task_rule()

    def _create_default_task_to_goal_rule(self) -> None:
        """Create default task to goal sync rule"""
        mapping = {
            "title": "title",
            "description": "description",
            "status": "status",
            "due_date": "target_date",
            "tags": "tags",
            "category": "category",
        }
        conditions = {
            "tags": ["goal-related"],
        }
        rule = SyncRule(
            "Task to Goal",
            "task",
            "goal",
            mapping,
            conditions,
            "Sync tasks with 'goal-related' tag to goals",
            True,
        )
        self.rules[rule.id] = rule
        self.save_rules()

    def _create_default_note_to_task_rule(self) -> None:
        """Create default note to task sync rule"""
        mapping = {
            "title": "title",
            "content": "description",
            "tags": "tags",
            "category": "category",
        }
        conditions = {
            "tags": ["action-item"],
        }
        rule = SyncRule(
            "Note to Task",
            "note",
            "task",
            mapping,
            conditions,
            "Sync notes with 'action-item' tag to tasks",
            True,
        )
        self.rules[rule.id] = rule
        self.save_rules()

    # ------------------------------------------------------------------
    # Provider / consumer registration
    # ------------------------------------------------------------------

    @handle_exceptions
    def register_data_provider(self, data_type: str, provider: Callable) -> None:
        """Register a data provider function

        Args:
            data_type: Type of data provided
            provider: Function that returns data of the specified type
                (called without arguments; its result is iterated item by item)
        """
        self.data_providers[data_type] = provider
        logger.info(f"Registered data provider for {data_type}")

    @handle_exceptions
    def register_data_consumer(self, data_type: str, consumer: Callable) -> None:
        """Register a data consumer function

        Args:
            data_type: Type of data consumed
            consumer: Function that consumes data of the specified type
                (called once per mapped item)
        """
        self.data_consumers[data_type] = consumer
        logger.info(f"Registered data consumer for {data_type}")

    # ------------------------------------------------------------------
    # Rule CRUD
    # ------------------------------------------------------------------

    @handle_exceptions
    def create_rule(self, name: str, source_type: str, target_type: str,
                    mapping: Dict[str, str], conditions: Optional[Dict[str, Any]] = None,
                    description: Optional[str] = None, enabled: bool = True) -> SyncRule:
        """Create a new sync rule

        Args:
            name: Rule name
            source_type: Source data type
            target_type: Target data type
            mapping: Field mapping from source to target
            conditions: Conditions for sync (optional)
            description: Rule description (optional)
            enabled: Whether rule is enabled

        Returns:
            Created sync rule

        Raises:
            AutomationError: If no provider is registered for ``source_type``
                or no consumer is registered for ``target_type``
        """
        # Validate source and target types
        if source_type not in self.data_providers:
            raise AutomationError(f"No data provider registered for {source_type}")
        if target_type not in self.data_consumers:
            raise AutomationError(f"No data consumer registered for {target_type}")
        rule = SyncRule(name, source_type, target_type, mapping, conditions, description, enabled)
        self.rules[rule.id] = rule
        self.save_rules()
        return rule

    @handle_exceptions
    def get_rule(self, rule_id: str) -> Optional[SyncRule]:
        """Get sync rule by ID

        Args:
            rule_id: Sync rule ID

        Returns:
            SyncRule if found, None otherwise
        """
        return self.rules.get(rule_id)

    @handle_exceptions
    def update_rule(self, rule: SyncRule) -> None:
        """Update sync rule

        Stamps ``updated_at`` with the current time, stores the rule under its
        ID and persists all rules.

        Args:
            rule: Sync rule to update

        Raises:
            AutomationError: If no rule with this ID is known
        """
        if rule.id not in self.rules:
            raise AutomationError(f"Sync rule not found: {rule.id}")
        rule.updated_at = datetime.now().isoformat()
        self.rules[rule.id] = rule
        self.save_rules()

    @handle_exceptions
    def delete_rule(self, rule_id: str) -> None:
        """Delete sync rule

        Args:
            rule_id: Sync rule ID

        Raises:
            AutomationError: If no rule with this ID is known
        """
        if rule_id not in self.rules:
            raise AutomationError(f"Sync rule not found: {rule_id}")
        del self.rules[rule_id]
        self.save_rules()

    @handle_exceptions
    def get_all_rules(self) -> List[SyncRule]:
        """Get all sync rules

        Returns:
            List of all sync rules
        """
        return list(self.rules.values())

    @handle_exceptions
    def get_enabled_rules(self) -> List[SyncRule]:
        """Get all enabled sync rules

        Returns:
            List of enabled sync rules
        """
        return [rule for rule in self.rules.values() if rule.enabled]

    @handle_exceptions
    def get_rules_by_source_type(self, source_type: str) -> List[SyncRule]:
        """Get sync rules by source type

        Args:
            source_type: Source data type

        Returns:
            List of sync rules with the specified source type
        """
        return [
            rule for rule in self.rules.values()
            if rule.source_type == source_type
        ]

    @handle_exceptions
    def get_rules_by_target_type(self, target_type: str) -> List[SyncRule]:
        """Get sync rules by target type

        Args:
            target_type: Target data type

        Returns:
            List of sync rules with the specified target type
        """
        return [
            rule for rule in self.rules.values()
            if rule.target_type == target_type
        ]

    @handle_exceptions
    def enable_rule(self, rule_id: str) -> None:
        """Enable sync rule

        Args:
            rule_id: Sync rule ID

        Raises:
            AutomationError: If no rule with this ID is known
        """
        rule = self.get_rule(rule_id)
        if not rule:
            raise AutomationError(f"Sync rule not found: {rule_id}")
        rule.enabled = True
        # update_rule stamps updated_at and persists the change.
        self.update_rule(rule)

    @handle_exceptions
    def disable_rule(self, rule_id: str) -> None:
        """Disable sync rule

        Args:
            rule_id: Sync rule ID

        Raises:
            AutomationError: If no rule with this ID is known
        """
        rule = self.get_rule(rule_id)
        if not rule:
            raise AutomationError(f"Sync rule not found: {rule_id}")
        rule.enabled = False
        # update_rule stamps updated_at and persists the change.
        self.update_rule(rule)

    # ------------------------------------------------------------------
    # Filtering and mapping
    # ------------------------------------------------------------------

    @handle_exceptions
    def _check_conditions(self, item: Dict[str, Any], conditions: Dict[str, Any]) -> bool:
        """Check if item meets conditions

        Every condition must hold. A list condition matches when the item's
        value is one of the listed values or, if the item's value is itself a
        list, when the two lists share at least one element. Any other
        condition is compared for equality. A field missing from the item
        fails the check.

        Args:
            item: Data item
            conditions: Conditions to check

        Returns:
            True if conditions are met, False otherwise
        """
        for field, condition in conditions.items():
            if field not in item:
                return False
            value = item[field]
            if isinstance(condition, list):
                if isinstance(value, list):
                    # Check for intersection between lists
                    if not set(condition).intersection(set(value)):
                        return False
                elif value not in condition:
                    # Check if item value is in condition list
                    return False
            elif value != condition:
                # Direct comparison
                return False
        return True

    @handle_exceptions
    def _map_item(self, item: Dict[str, Any], mapping: Dict[str, str]) -> Dict[str, Any]:
        """Map item fields according to mapping

        Source fields that are absent from the item are left out of the
        result; fields not named in the mapping are dropped.

        Args:
            item: Source data item
            mapping: Field mapping from source to target

        Returns:
            Mapped item
        """
        return {
            target_field: item[source_field]
            for source_field, target_field in mapping.items()
            if source_field in item
        }

    # ------------------------------------------------------------------
    # Synchronization
    # ------------------------------------------------------------------

    def _record_failure(self, rule: SyncRule, error_msg: str) -> None:
        """Store a failed sync attempt on the rule and persist it"""
        rule.last_error = error_msg
        rule.error_count += 1
        self.update_rule(rule)

    @handle_exceptions
    def sync_rule(self, rule_id: str) -> int:
        """Sync data according to rule

        Fetches all items from the rule's provider, keeps those matching the
        rule's conditions, maps them and hands each one to the consumer. On
        success the rule's ``last_sync``/``sync_count`` are updated and
        ``last_error`` is cleared; on failure ``last_error``/``error_count``
        are updated instead.

        Args:
            rule_id: Sync rule ID

        Returns:
            Number of items synced (0 for a disabled rule)

        Raises:
            AutomationError: If the rule is unknown, its provider or consumer
                is not registered, or the sync itself fails (the original
                exception is chained as ``__cause__``)
        """
        rule = self.get_rule(rule_id)
        if not rule:
            raise AutomationError(f"Sync rule not found: {rule_id}")
        if not rule.enabled:
            logger.info(f"Skipping disabled sync rule: {rule.name}")
            return 0

        # Get data provider and consumer
        provider = self.data_providers.get(rule.source_type)
        consumer = self.data_consumers.get(rule.target_type)
        if not provider:
            error_msg = f"No data provider registered for {rule.source_type}"
            self._record_failure(rule, error_msg)
            raise AutomationError(error_msg)
        if not consumer:
            error_msg = f"No data consumer registered for {rule.target_type}"
            self._record_failure(rule, error_msg)
            raise AutomationError(error_msg)

        try:
            # Get source data
            source_data = provider()
            # Filter and map items
            synced_count = 0
            for item in source_data:
                if rule.conditions and not self._check_conditions(item, rule.conditions):
                    continue
                consumer(self._map_item(item, rule.mapping))
                synced_count += 1
            # Update rule stats
            rule.last_sync = datetime.now().isoformat()
            rule.sync_count += synced_count
            rule.last_error = None
            self.update_rule(rule)
            logger.info(f"Synced {synced_count} items with rule: {rule.name}")
            return synced_count
        except Exception as e:
            error_msg = f"Error syncing with rule {rule.name}: {str(e)}"
            self._record_failure(rule, error_msg)
            logger.error(error_msg)
            # Chain the cause so the original traceback is not lost.
            raise AutomationError(error_msg) from e

    def _sync_rules(self, rules) -> Dict[str, int]:
        """Run ``sync_rule`` for each given rule, isolating failures per rule

        Args:
            rules: Iterable of sync rules to run

        Returns:
            Dictionary mapping rule IDs to number of items synced; a rule
            whose sync raised is logged and reported as 0
        """
        results = {}
        for rule in rules:
            try:
                results[rule.id] = self.sync_rule(rule.id)
            except Exception as e:
                logger.error(f"Error syncing rule {rule.name}: {str(e)}")
                results[rule.id] = 0
        return results

    @handle_exceptions
    def sync_all(self) -> Dict[str, int]:
        """Sync data according to all enabled rules

        Returns:
            Dictionary mapping rule IDs to number of items synced
        """
        return self._sync_rules(self.get_enabled_rules())

    @handle_exceptions
    def sync_by_source_type(self, source_type: str) -> Dict[str, int]:
        """Sync data according to rules with specified source type

        Args:
            source_type: Source data type

        Returns:
            Dictionary mapping rule IDs to number of items synced
            (disabled rules are left out)
        """
        candidates = self.get_rules_by_source_type(source_type)
        return self._sync_rules(rule for rule in candidates if rule.enabled)

    @handle_exceptions
    def sync_by_target_type(self, target_type: str) -> Dict[str, int]:
        """Sync data according to rules with specified target type

        Args:
            target_type: Target data type

        Returns:
            Dictionary mapping rule IDs to number of items synced
            (disabled rules are left out)
        """
        candidates = self.get_rules_by_target_type(target_type)
        return self._sync_rules(rule for rule in candidates if rule.enabled)

    # ------------------------------------------------------------------
    # Automatic sync
    # ------------------------------------------------------------------

    @handle_exceptions
    def set_sync_interval(self, interval: int) -> None:
        """Set sync interval in seconds

        Args:
            interval: Sync interval in seconds

        Raises:
            AutomationError: If ``interval`` is below 60 seconds
        """
        if interval < 60:
            raise AutomationError("Sync interval must be at least 60 seconds")
        self.sync_interval = interval
        logger.info(f"Set sync interval to {interval} seconds")

    @handle_exceptions
    def start_auto_sync(self) -> None:
        """Start automatic sync thread"""
        if self.auto_sync:
            logger.info("Auto sync already running")
            return
        self.auto_sync = True
        # Each worker gets its own stop event. stop_auto_sync only waits one
        # second for the worker, so a previous worker may still be finishing a
        # sync; clearing a shared event here would let it keep running next to
        # the new one. With a fresh event the old worker still sees its own
        # (set) event and exits.
        self.stop_event = threading.Event()
        self.sync_thread = threading.Thread(
            target=self._auto_sync_thread,
            args=(self.stop_event,),
            daemon=True,
        )
        self.sync_thread.start()
        logger.info("Started auto sync")

    @handle_exceptions
    def stop_auto_sync(self) -> None:
        """Stop automatic sync thread

        Signals the worker to stop and waits up to one second for it; a worker
        that is in the middle of a sync exits once that sync returns.
        """
        if not self.auto_sync:
            logger.info("Auto sync not running")
            return
        self.auto_sync = False
        self.stop_event.set()
        if self.sync_thread:
            self.sync_thread.join(timeout=1.0)
            self.sync_thread = None
        logger.info("Stopped auto sync")

    def _auto_sync_thread(self, stop_event: Optional[threading.Event] = None) -> None:
        """Thread function for automatic sync

        Args:
            stop_event: Event that ends this worker when set; defaults to the
                manager's current ``stop_event``
        """
        if stop_event is None:
            stop_event = self.stop_event
        logger.info(f"Auto sync thread started with interval {self.sync_interval} seconds")
        while not stop_event.is_set():
            try:
                # Sync all enabled rules
                self.sync_all()
            except Exception as e:
                logger.error(f"Error in auto sync: {str(e)}")
            # Wait for next sync or stop event (wakes up early when stopped)
            stop_event.wait(self.sync_interval)
        logger.info("Auto sync thread stopped")
# Create a global instance of the sync manager
# NOTE: this runs SyncManager.__init__ at import time, which loads the stored
# rules and, when no "task"/"note" source rules exist, creates and saves the
# default rules.
sync_manager = SyncManager()