"""Smart IF-THEN workflow automation.

Defines the ``Workflow`` model (triggers, conditions, actions) and the
``WorkflowManager`` that persists workflows through ``load_data`` /
``save_data``, evaluates their conditions, executes their actions, and runs
them on a schedule in a background thread.
"""

import json
import operator as _operator
import re
import threading
import time
import uuid
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional, Callable, Union

import schedule

from utils.logging import setup_logger
from utils.error_handling import handle_exceptions, AutomationError
from utils.storage import load_data, save_data

# Initialize logger
logger = setup_logger(__name__)

# Sentinel distinguishing "path not found" from a stored value of None.
_MISSING = object()

# Matches {{variable.path}} placeholders; group 1 is the raw path text.
_TEMPLATE_PATTERN = re.compile(r'\{\{([^}]+)\}\}')

# Comparison operators supported by "compare" conditions.
# Note: operator.contains(a, b) evaluates ``b in a``.
_COMPARATORS: Dict[str, Callable[[Any, Any], Any]] = {
    "==": _operator.eq,
    "!=": _operator.ne,
    "<": _operator.lt,
    "<=": _operator.le,
    ">": _operator.gt,
    ">=": _operator.ge,
    "contains": _operator.contains,
    "starts_with": lambda left, right: str(left).startswith(str(right)),
    "ends_with": lambda left, right: str(left).endswith(str(right)),
}

# Day names accepted by weekly schedules; each is an attribute of a schedule job.
_WEEKDAYS = (
    "monday",
    "tuesday",
    "wednesday",
    "thursday",
    "friday",
    "saturday",
    "sunday",
)


def _lookup_path(root: Any, path: str) -> Any:
    """Follow a dotted path through nested dictionaries

    Args:
        root: Object to start from (normally a dict)
        path: Dot-separated keys, e.g. 'data.user.name'

    Returns:
        The value found at the path, or ``_MISSING`` when any step is absent
        or an intermediate value is not a dict
    """
    current = root
    for part in path.split("."):
        if isinstance(current, dict) and part in current:
            current = current[part]
        else:
            return _MISSING
    return current


class Workflow:
    """Smart Workflow for IF-THEN automation"""

    def __init__(self, name: str, description: Optional[str] = None):
        """Initialize a workflow

        Args:
            name: Workflow name
            description: Workflow description (optional)
        """
        self.id = str(uuid.uuid4())
        self.name = name
        self.description = description or ""
        self.triggers: List[Dict[str, Any]] = []
        self.actions: List[Dict[str, Any]] = []
        self.conditions: List[Dict[str, Any]] = []
        self.enabled = True
        self.created_at = datetime.now().isoformat()
        self.updated_at = self.created_at
        self.last_run: Optional[str] = None
        self.run_count = 0

    def _touch(self) -> None:
        """Record the current time as the last modification time"""
        self.updated_at = datetime.now().isoformat()

    @handle_exceptions
    def add_trigger(self, trigger_type: str, trigger_config: Dict[str, Any]) -> None:
        """Add a trigger to the workflow

        Args:
            trigger_type: Type of trigger (e.g., 'time', 'event', 'data_change')
            trigger_config: Trigger configuration
        """
        self.triggers.append({
            "id": str(uuid.uuid4()),
            "type": trigger_type,
            "config": trigger_config,
        })
        self._touch()

    @handle_exceptions
    def add_condition(self, condition_type: str, condition_config: Dict[str, Any]) -> None:
        """Add a condition to the workflow

        Args:
            condition_type: Type of condition (e.g., 'compare', 'exists', 'contains')
            condition_config: Condition configuration
        """
        self.conditions.append({
            "id": str(uuid.uuid4()),
            "type": condition_type,
            "config": condition_config,
        })
        self._touch()

    @handle_exceptions
    def add_action(self, action_type: str, action_config: Dict[str, Any]) -> None:
        """Add an action to the workflow

        Args:
            action_type: Type of action (e.g., 'notification', 'data_update', 'api_call')
            action_config: Action configuration
        """
        self.actions.append({
            "id": str(uuid.uuid4()),
            "type": action_type,
            "config": action_config,
        })
        self._touch()

    @handle_exceptions
    def enable(self) -> None:
        """Enable the workflow"""
        self.enabled = True
        self._touch()

    @handle_exceptions
    def disable(self) -> None:
        """Disable the workflow"""
        self.enabled = False
        self._touch()

    @handle_exceptions
    def to_dict(self) -> Dict[str, Any]:
        """Convert workflow to dictionary

        Returns:
            Workflow as dictionary
        """
        return {
            "id": self.id,
            "name": self.name,
            "description": self.description,
            "triggers": self.triggers,
            "conditions": self.conditions,
            "actions": self.actions,
            "enabled": self.enabled,
            "created_at": self.created_at,
            "updated_at": self.updated_at,
            "last_run": self.last_run,
            "run_count": self.run_count,
        }

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'Workflow':
        """Create workflow from dictionary

        'name' and 'id' are required. Every other key is optional and falls
        back to the value a freshly constructed workflow would have, so
        records that lack optional keys still load.

        Args:
            data: Workflow data

        Returns:
            Workflow instance

        Raises:
            KeyError: If 'name' or 'id' is missing
        """
        workflow = cls(data["name"], data.get("description", ""))
        workflow.id = data["id"]
        # Copy the lists so later add_* calls do not mutate the caller's data.
        workflow.triggers = list(data.get("triggers", []))
        workflow.conditions = list(data.get("conditions", []))
        workflow.actions = list(data.get("actions", []))
        workflow.enabled = data.get("enabled", True)
        workflow.created_at = data.get("created_at", workflow.created_at)
        workflow.updated_at = data.get("updated_at", workflow.updated_at)
        workflow.last_run = data.get("last_run")
        workflow.run_count = data.get("run_count", 0)
        return workflow


class WorkflowManager:
    """Manager for smart workflows"""

    def __init__(self):
        """Initialize workflow manager"""
        self.workflows: Dict[str, Workflow] = {}
        self.event_handlers: Dict[str, List[Callable]] = {}
        self.data_change_handlers: Dict[str, List[Callable]] = {}
        self.scheduler = schedule.Scheduler()
        self.scheduler_thread: Optional[threading.Thread] = None
        self.running = False
        self.load_workflows()

    # ------------------------------------------------------------------
    # Persistence and CRUD
    # ------------------------------------------------------------------

    @handle_exceptions
    def load_workflows(self) -> None:
        """Load workflows from storage

        A malformed record is logged and skipped so that it does not prevent
        the remaining workflows from loading.
        """
        try:
            workflows_data = load_data("workflows", default=[])
            for workflow_data in workflows_data:
                try:
                    workflow = Workflow.from_dict(workflow_data)
                except (KeyError, TypeError, AttributeError) as e:
                    logger.error(f"Skipping malformed workflow record: {str(e)}")
                    continue
                self.workflows[workflow.id] = workflow
            logger.info(f"Loaded {len(self.workflows)} workflows")
        except Exception as e:
            logger.error(f"Failed to load workflows: {str(e)}")

    @handle_exceptions
    def save_workflows(self) -> None:
        """Save workflows to storage"""
        try:
            workflows_data = [workflow.to_dict() for workflow in self.workflows.values()]
            save_data("workflows", workflows_data)
            logger.info(f"Saved {len(self.workflows)} workflows")
        except Exception as e:
            logger.error(f"Failed to save workflows: {str(e)}")

    @handle_exceptions
    def create_workflow(self, name: str, description: Optional[str] = None) -> Workflow:
        """Create a new workflow

        Args:
            name: Workflow name
            description: Workflow description (optional)

        Returns:
            Created workflow
        """
        workflow = Workflow(name, description)
        self.workflows[workflow.id] = workflow
        self.save_workflows()
        return workflow

    @handle_exceptions
    def get_workflow(self, workflow_id: str) -> Optional[Workflow]:
        """Get workflow by ID

        Args:
            workflow_id: Workflow ID

        Returns:
            Workflow if found, None otherwise
        """
        return self.workflows.get(workflow_id)

    @handle_exceptions
    def update_workflow(self, workflow: Workflow) -> None:
        """Update workflow

        Args:
            workflow: Workflow to update

        Raises:
            AutomationError: If no workflow with this ID is registered
        """
        if workflow.id not in self.workflows:
            raise AutomationError(f"Workflow not found: {workflow.id}")
        workflow.updated_at = datetime.now().isoformat()
        self.workflows[workflow.id] = workflow
        self.save_workflows()

    @handle_exceptions
    def delete_workflow(self, workflow_id: str) -> None:
        """Delete workflow

        Args:
            workflow_id: Workflow ID

        Raises:
            AutomationError: If no workflow with this ID is registered
        """
        if workflow_id not in self.workflows:
            raise AutomationError(f"Workflow not found: {workflow_id}")
        del self.workflows[workflow_id]
        self.save_workflows()

    @handle_exceptions
    def get_all_workflows(self) -> List[Workflow]:
        """Get all workflows

        Returns:
            List of all workflows
        """
        return list(self.workflows.values())

    # ------------------------------------------------------------------
    # Events and data changes
    # ------------------------------------------------------------------

    @handle_exceptions
    def register_event_handler(self, event_type: str, handler: Callable) -> None:
        """Register event handler

        Args:
            event_type: Type of event
            handler: Event handler function
        """
        self.event_handlers.setdefault(event_type, []).append(handler)

    @handle_exceptions
    def register_data_change_handler(self, data_type: str, handler: Callable) -> None:
        """Register data change handler

        Args:
            data_type: Type of data
            handler: Data change handler function
        """
        self.data_change_handlers.setdefault(data_type, []).append(handler)

    @handle_exceptions
    def trigger_event(self, event_type: str, event_data: Dict[str, Any]) -> None:
        """Trigger an event

        Calls every handler registered for the event type (a failing handler
        is logged and does not stop the others), then runs each enabled
        workflow that has a matching 'event' trigger. A workflow runs at most
        once per event, even if several of its triggers match.

        Args:
            event_type: Type of event
            event_data: Event data
        """
        # Call event handlers
        for handler in self.event_handlers.get(event_type, []):
            try:
                handler(event_data)
            except Exception as e:
                logger.error(f"Event handler error: {str(e)}")

        # Process workflows with event triggers
        self._run_matching_workflows("event", "event_type", event_type, event_data)

    @handle_exceptions
    def notify_data_change(self, data_type: str, data_id: str, data: Dict[str, Any]) -> None:
        """Notify of data change

        Calls every handler registered for the data type (a failing handler
        is logged and does not stop the others), then runs each enabled
        workflow that has a matching 'data_change' trigger. A workflow runs
        at most once per notification, even if several of its triggers match.

        Args:
            data_type: Type of data
            data_id: Data ID
            data: Changed data
        """
        # Call data change handlers
        for handler in self.data_change_handlers.get(data_type, []):
            try:
                handler(data_id, data)
            except Exception as e:
                logger.error(f"Data change handler error: {str(e)}")

        # Process workflows with data change triggers
        self._run_matching_workflows(
            "data_change",
            "data_type",
            data_type,
            {"data_type": data_type, "data_id": data_id, "data": data},
        )

    def _run_matching_workflows(self, trigger_type: str, config_key: str,
                                expected: Any, context: Dict[str, Any]) -> None:
        """Run each enabled workflow that has at least one matching trigger

        Args:
            trigger_type: Trigger type to match ('event' or 'data_change')
            config_key: Key in the trigger config holding the value to match
            expected: Value the trigger config must hold under config_key
            context: Context passed to the workflow
        """
        for workflow in self.workflows.values():
            if not workflow.enabled:
                continue
            # any() rather than a loop over triggers: duplicate triggers for
            # the same event must not execute the actions more than once.
            if any(
                trigger["type"] == trigger_type
                and trigger["config"].get(config_key) == expected
                for trigger in workflow.triggers
            ):
                self._process_workflow(workflow, context)

    # ------------------------------------------------------------------
    # Workflow execution
    # ------------------------------------------------------------------

    def _process_workflow(self, workflow: Workflow, context: Dict[str, Any]) -> None:
        """Process workflow

        Checks the workflow's conditions and, if they all hold, executes its
        actions in order, updates the run statistics and persists them via
        update_workflow. Any exception raised along the way is logged and not
        propagated.

        Args:
            workflow: Workflow to process
            context: Processing context
        """
        try:
            # Check conditions
            if not self._check_conditions(workflow, context):
                return

            # Execute actions
            for action in workflow.actions:
                self._execute_action(action, context)

            # Update workflow stats
            workflow.last_run = datetime.now().isoformat()
            workflow.run_count += 1
            self.update_workflow(workflow)

            logger.info(f"Workflow executed: {workflow.name}")
        except Exception as e:
            logger.error(f"Workflow execution error: {str(e)}")

    def _check_conditions(self, workflow: Workflow, context: Dict[str, Any]) -> bool:
        """Check workflow conditions

        Conditions of an unrecognised type are ignored (treated as met).

        Args:
            workflow: Workflow to check
            context: Processing context

        Returns:
            True if all conditions are met, False otherwise
        """
        for condition in workflow.conditions:
            condition_type = condition["type"]
            config = condition["config"]

            if condition_type == "compare":
                # Compare two values
                left_value = self._get_value(
                    config.get("left_value"), config.get("left_type"), context)
                right_value = self._get_value(
                    config.get("right_value"), config.get("right_type"), context)
                if not self._compare_values(
                        left_value, right_value, config.get("operator", "==")):
                    return False

            elif condition_type == "exists":
                # Check if a value exists
                if not self._check_exists(
                        config.get("path", ""), config.get("data_type", "context"), context):
                    return False

            elif condition_type == "contains":
                # Check if a value contains another value
                container = self._get_value(
                    config.get("container"), config.get("container_type"), context)
                value = self._get_value(
                    config.get("value"), config.get("value_type"), context)
                if not self._check_contains(container, value):
                    return False

        # Also reached when the workflow has no conditions at all.
        return True

    def _lookup_stored(self, path: Any) -> Any:
        """Resolve a '<data_type>.<data_id>.<field path>' reference in storage

        Args:
            path: Reference string; the field path may itself be dotted

        Returns:
            The stored value, or ``_MISSING`` when the reference is not a
            string, has fewer than three segments, or does not resolve
        """
        if not isinstance(path, str):
            return _MISSING
        parts = path.split(".", 2)
        if len(parts) != 3:
            return _MISSING
        data_type, data_id, field = parts
        data = load_data(data_type, {})
        if data_id not in data:
            return _MISSING
        return _lookup_path(data[data_id], field)

    def _get_value(self, value: Any, value_type: str, context: Dict[str, Any]) -> Any:
        """Get value based on type

        Args:
            value: Value or path
            value_type: Type of value ('literal', 'context', 'data')
            context: Processing context

        Returns:
            Resolved value; None when the path does not resolve, the path is
            not a string, or value_type is not recognised
        """
        if value_type == "literal":
            return value

        if value_type == "context":
            # A missing path in the condition config arrives here as None.
            found = _lookup_path(context, value) if isinstance(value, str) else _MISSING
        elif value_type == "data":
            found = self._lookup_stored(value)
        else:
            return None

        return None if found is _MISSING else found

    def _compare_values(self, left_value: Any, right_value: Any, operator: str) -> bool:
        """Compare two values

        Args:
            left_value: Left value
            right_value: Right value
            operator: Comparison operator ('==', '!=', '<', '<=', '>', '>=',
                'contains', 'starts_with', 'ends_with')

        Returns:
            Comparison result; False for an unknown operator or when the
            operands cannot be compared with it (e.g. None < 5)
        """
        compare = _COMPARATORS.get(operator)
        if compare is None:
            return False
        try:
            return compare(left_value, right_value)
        except TypeError:
            # Unresolved paths yield None, which cannot be ordered or searched.
            logger.warning(
                f"Cannot apply operator {operator!r} to "
                f"{type(left_value).__name__} and {type(right_value).__name__}")
            return False

    def _check_exists(self, path: str, data_type: str, context: Dict[str, Any]) -> bool:
        """Check if a value exists

        A key that is present with the value None counts as existing.

        Args:
            path: Path to value
            data_type: Type of data ('context', 'data')
            context: Processing context

        Returns:
            True if value exists, False otherwise
        """
        if data_type == "context":
            return isinstance(path, str) and _lookup_path(context, path) is not _MISSING
        if data_type == "data":
            return self._lookup_stored(path) is not _MISSING
        return False

    def _check_contains(self, container: Any, value: Any) -> bool:
        """Check if container contains value

        For a dict, both keys and values are searched. For a string, the
        value is converted with str() and searched as a substring.

        Args:
            container: Container value
            value: Value to check

        Returns:
            True if container contains value, False otherwise
        """
        if container is None or value is None:
            return False

        if isinstance(container, (list, tuple, set)):
            return value in container
        if isinstance(container, dict):
            return value in container or value in container.values()
        if isinstance(container, str):
            return str(value) in container
        return False

    def _execute_action(self, action: Dict[str, Any], context: Dict[str, Any]) -> None:
        """Execute workflow action

        Actions of an unrecognised type are ignored.

        Args:
            action: Action to execute
            context: Processing context
        """
        action_type = action["type"]
        config = action["config"]

        if action_type == "notification":
            self._send_notification(config, context)
        elif action_type == "data_update":
            self._apply_data_update(config, context)
        elif action_type == "api_call":
            # Make API call (placeholder): only logs. 'headers' and 'body'
            # from the config are not used yet.
            url = self._resolve_template(config.get("url", ""), context)
            method = config.get("method", "GET")
            logger.info(f"API call: {method} {url}")
        elif action_type == "function_call":
            # Call function (placeholder): only logs. 'args' from the config
            # is not used yet.
            function_name = config.get("function", "")
            logger.info(f"Function call: {function_name}")

    def _send_notification(self, config: Dict[str, Any], context: Dict[str, Any]) -> None:
        """Log a notification (all notification channels are placeholders)

        Args:
            config: Notification action configuration
            context: Processing context used to resolve title and message
        """
        notification_type = config.get("notification_type", "app")
        title = self._resolve_template(config.get("title", ""), context)
        message = self._resolve_template(config.get("message", ""), context)

        if notification_type == "app":
            # App notification (placeholder)
            logger.info(f"App notification: {title} - {message}")
        elif notification_type == "email":
            # Email notification (placeholder)
            recipient = config.get("recipient", "")
            logger.info(f"Email notification to {recipient}: {title} - {message}")
        elif notification_type == "telegram":
            # Telegram notification (placeholder)
            chat_id = config.get("chat_id", "")
            logger.info(f"Telegram notification to {chat_id}: {title} - {message}")

    def _apply_data_update(self, config: Dict[str, Any], context: Dict[str, Any]) -> None:
        """Apply field updates to one stored record

        String values in the updates are resolved as templates. Nothing is
        saved when the target record does not exist.

        Args:
            config: Data update action configuration
            context: Processing context used to resolve templates
        """
        data_type = config.get("data_type", "")
        data_id = self._resolve_template(config.get("data_id", ""), context)
        updates = config.get("updates", {})

        # Resolve template values in updates
        resolved_updates = {
            key: self._resolve_template(value, context) if isinstance(value, str) else value
            for key, value in updates.items()
        }

        data = load_data(data_type, {})
        if data_id not in data:
            logger.warning(f"Cannot update {data_type} data: {data_id} not found")
            return

        data[data_id].update(resolved_updates)
        save_data(data_type, data)
        logger.info(f"Updated {data_type} data: {data_id}")

    def _resolve_template(self, template: str, context: Dict[str, Any]) -> str:
        """Resolve template with context values

        Replaces each {{dotted.path}} placeholder with str() of the value
        found at that path in the context. Placeholders whose path is missing
        or resolves to None are left untouched. Substitution is a single
        pass, so placeholder-like text inside a substituted value is not
        expanded again.

        Args:
            template: Template string
            context: Processing context

        Returns:
            Resolved template (the input itself if it is empty or not a string)
        """
        if not template or not isinstance(template, str):
            return template

        def substitute(match):
            found = _lookup_path(context, match.group(1).strip())
            if found is _MISSING or found is None:
                return match.group(0)
            return str(found)

        return _TEMPLATE_PATTERN.sub(substitute, template)

    # ------------------------------------------------------------------
    # Scheduling
    # ------------------------------------------------------------------

    @handle_exceptions
    def start_scheduler(self) -> None:
        """Start the scheduler thread

        Does nothing if the scheduler thread is already alive. The thread is
        a daemon thread.
        """
        if self.scheduler_thread is not None and self.scheduler_thread.is_alive():
            return

        self.running = True
        self.scheduler_thread = threading.Thread(target=self._scheduler_loop, daemon=True)
        self.scheduler_thread.start()
        logger.info("Workflow scheduler started")

    @handle_exceptions
    def stop_scheduler(self) -> None:
        """Stop the scheduler thread

        Waits up to one second for the thread to finish.
        """
        self.running = False
        if self.scheduler_thread is not None:
            self.scheduler_thread.join(timeout=1.0)
            self.scheduler_thread = None
        logger.info("Workflow scheduler stopped")

    def _scheduler_loop(self) -> None:
        """Scheduler thread loop"""
        while self.running:
            try:
                self.scheduler.run_pending()
            except Exception as e:
                logger.error(f"Scheduler error: {str(e)}")
            # Sleep outside the try block so that a job which fails on every
            # run cannot turn this loop into a busy spin.
            time.sleep(1)

    @handle_exceptions
    def schedule_workflow(self, workflow_id: str, schedule_type: str,
                          schedule_config: Dict[str, Any]) -> None:
        """Schedule a workflow

        The scheduled job looks the workflow up by ID each time it fires: a
        disabled workflow is skipped, and a deleted workflow cancels the job.
        Monthly schedules fire daily and only execute when the day of month
        matches, so days 29-31 do not run in months that lack them.

        Args:
            workflow_id: Workflow ID
            schedule_type: Type of schedule ('interval', 'daily', 'weekly', 'monthly')
            schedule_config: Schedule configuration. Keys read: 'minutes'
                (interval, default 60); 'time' (daily/weekly/monthly, default
                '09:00'); 'day' (weekly: day name, default 'monday'; monthly:
                day of month, default 1)

        Raises:
            AutomationError: If the workflow does not exist, the schedule
                type is unknown, or 'day' is invalid for the schedule type
        """
        workflow = self.get_workflow(workflow_id)
        if not workflow:
            raise AutomationError(f"Workflow not found: {workflow_id}")

        # Create a job function
        def job():
            current = self.workflows.get(workflow_id)
            if current is None:
                logger.info(f"Cancelling schedule for deleted workflow: {workflow_id}")
                # Returning CancelJob makes the scheduler drop this job.
                return schedule.CancelJob
            if not current.enabled:
                return None
            self._process_workflow(
                current, {"trigger": "schedule", "schedule_type": schedule_type})
            return None

        # Schedule based on type
        if schedule_type == "interval":
            interval_minutes = schedule_config.get("minutes", 60)
            self.scheduler.every(interval_minutes).minutes.do(job)
            logger.info(
                f"Scheduled workflow {workflow.name} to run every {interval_minutes} minutes")

        elif schedule_type == "daily":
            time_str = schedule_config.get("time", "09:00")
            self.scheduler.every().day.at(time_str).do(job)
            logger.info(f"Scheduled workflow {workflow.name} to run daily at {time_str}")

        elif schedule_type == "weekly":
            day = str(schedule_config.get("day", "monday")).lower()
            time_str = schedule_config.get("time", "09:00")
            if day not in _WEEKDAYS:
                raise AutomationError(f"Invalid day of week: {day}")
            # Each weekday name is an attribute of the job returned by every().
            getattr(self.scheduler.every(), day).at(time_str).do(job)
            logger.info(
                f"Scheduled workflow {workflow.name} to run weekly on {day} at {time_str}")

        elif schedule_type == "monthly":
            raw_day = schedule_config.get("day", 1)
            time_str = schedule_config.get("time", "09:00")
            try:
                # Configs loaded from JSON or forms may carry the day as a string.
                day = int(raw_day)
            except (TypeError, ValueError) as err:
                raise AutomationError(f"Invalid day of month: {raw_day!r}") from err
            if not 1 <= day <= 31:
                raise AutomationError(f"Invalid day of month: {raw_day!r}")

            # For monthly, we need to check the day in the job
            def monthly_job():
                if datetime.now().day == day:
                    return job()
                return None

            # Schedule to run daily at the specified time, but only execute
            # on the specified day
            self.scheduler.every().day.at(time_str).do(monthly_job)
            logger.info(
                f"Scheduled workflow {workflow.name} to run monthly on day {day} at {time_str}")

        else:
            raise AutomationError(f"Unknown schedule type: {schedule_type}")

        # Start scheduler if not already running
        self.start_scheduler()


# Create a global instance of the workflow manager
workflow_manager = WorkflowManager()