"""Time-based task scheduling: task records plus a background task manager."""

import calendar
import json
import threading
import time
import uuid
from datetime import datetime, timedelta
from typing import Any, Callable, Dict, List, Optional, Union

import schedule

from utils.error_handling import AutomationError, handle_exceptions
from utils.logging import setup_logger
from utils.storage import load_data, save_data

# Initialize logger
logger = setup_logger(__name__)

# Time of day used when a schedule configuration omits "time".
_DEFAULT_TIME = "09:00"

# Day names mapped to datetime.weekday() numbers (0 = Monday, 6 = Sunday).
_WEEKDAY_NUMBERS = {
    "monday": 0,
    "tuesday": 1,
    "wednesday": 2,
    "thursday": 3,
    "friday": 4,
    "saturday": 5,
    "sunday": 6,
}


def _parse_time(time_str: str) -> "tuple[int, int]":
    """Split an ``"HH:MM"`` string into ``(hour, minute)`` integers.

    Raises:
        ValueError: If the string does not have exactly two integer fields.
    """
    hour, minute = map(int, time_str.split(":"))
    return hour, minute


def _month_day_at(year: int, month: int, day: int, hour: int, minute: int) -> datetime:
    """Build a datetime for ``day`` of the given month, clamped to a real day.

    A configured day that the month does not have (e.g. 31 in April, 30 in
    February) is moved to the last day of that month instead of raising
    ``ValueError``; values below 1 are moved to the first day.
    """
    last_day = calendar.monthrange(year, month)[1]
    return datetime(year, month, min(max(day, 1), last_day), hour, minute)


def _next_recurring_run(schedule_type: Optional[str],
                        schedule_config: Dict[str, Any],
                        now: datetime) -> Optional[datetime]:
    """Compute the next run time for a recurring schedule.

    Args:
        schedule_type: One of 'interval', 'daily', 'weekly', 'monthly'.
        schedule_config: Schedule configuration. Keys read here: "minutes"
            (interval), "time" as "HH:MM" (daily/weekly/monthly), "day"
            (weekday name for weekly, day-of-month number for monthly).
        now: Reference time the next occurrence is measured from.

    Returns:
        The next run time, or None when ``schedule_type`` is not one of the
        recurring types handled here.

    Raises:
        ValueError: If "time" is not a valid "HH:MM" value or "day" cannot be
            converted to an integer for a monthly schedule.
    """
    if schedule_type == "interval":
        interval_minutes = schedule_config.get("minutes", 60)
        return now + timedelta(minutes=interval_minutes)

    if schedule_type == "daily":
        hour, minute = _parse_time(schedule_config.get("time", _DEFAULT_TIME))
        candidate = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
        if candidate <= now:
            # Today's slot has already passed.
            candidate += timedelta(days=1)
        return candidate

    if schedule_type == "weekly":
        day_name = schedule_config.get("day", "monday").lower()
        hour, minute = _parse_time(schedule_config.get("time", _DEFAULT_TIME))
        # Unrecognised day names fall back to Monday.
        target_weekday = _WEEKDAY_NUMBERS.get(day_name, 0)
        days_ahead = (target_weekday - now.weekday()) % 7
        candidate = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
        candidate += timedelta(days=days_ahead)
        if candidate <= now:
            # Target weekday is today but the time has already passed.
            candidate += timedelta(days=7)
        return candidate

    if schedule_type == "monthly":
        day = int(schedule_config.get("day", 1))
        hour, minute = _parse_time(schedule_config.get("time", _DEFAULT_TIME))
        candidate = _month_day_at(now.year, now.month, day, hour, minute)
        if candidate <= now:
            # This month's slot has passed; roll over to the next month,
            # wrapping December into January of the following year.
            if now.month == 12:
                candidate = _month_day_at(now.year + 1, 1, day, hour, minute)
            else:
                candidate = _month_day_at(now.year, now.month + 1, day, hour, minute)
        return candidate

    return None


def _next_once_run(schedule_config: Dict[str, Any], now: datetime) -> Optional[datetime]:
    """Compute the run time for a one-time schedule.

    Args:
        schedule_config: Schedule configuration; reads "date" ("YYYY-MM-DD")
            and "time" ("HH:MM", defaulting to 09:00).
        now: Reference time; run times at or before it are rejected.

    Returns:
        The configured run time when it lies in the future, otherwise None
        (missing date, unparsable date/time, or a moment already in the past).
    """
    date_str = schedule_config.get("date", "")
    time_str = schedule_config.get("time", _DEFAULT_TIME)
    if not date_str:
        return None

    try:
        date_obj = datetime.strptime(date_str, "%Y-%m-%d")
        hour, minute = _parse_time(time_str)
        run_at = date_obj.replace(hour=hour, minute=minute)
    except (ValueError, TypeError, AttributeError) as exc:
        # Malformed configuration: the task simply never becomes due.
        logger.error(f"Invalid one-time schedule {date_str!r} {time_str!r}: {exc}")
        return None

    return run_at if run_at > now else None


class ScheduledTask:
    """Scheduled task for time-based triggers"""

    def __init__(self, name: str, task_type: str, task_config: Dict[str, Any],
                 description: Optional[str] = None):
        """Initialize a scheduled task

        Args:
            name: Task name
            task_type: Type of task (e.g., 'notification', 'data_update', 'api_call')
            task_config: Task configuration
            description: Task description (optional)
        """
        self.id = str(uuid.uuid4())
        self.name = name
        self.description = description or ""
        self.task_type = task_type
        self.task_config = task_config
        self.schedule_type: Optional[str] = None
        self.schedule_config: Dict[str, Any] = {}
        self.enabled = True
        # Timestamps are stored as ISO-8601 strings produced by datetime.isoformat().
        self.created_at = datetime.now().isoformat()
        self.updated_at = self.created_at
        self.last_run: Optional[str] = None
        self.next_run: Optional[str] = None
        self.run_count = 0
        self.status = "pending"  # pending, running, completed, failed
        self.last_result: Any = None

    @handle_exceptions
    def set_schedule(self, schedule_type: str, schedule_config: Dict[str, Any]) -> None:
        """Set task schedule

        Args:
            schedule_type: Type of schedule ('interval', 'daily', 'weekly', 'monthly', 'once')
            schedule_config: Schedule configuration
        """
        self.schedule_type = schedule_type
        self.schedule_config = schedule_config
        self.updated_at = datetime.now().isoformat()

        # Calculate next run time
        self._calculate_next_run()

    def _calculate_next_run(self) -> None:
        """Calculate the next run time based on schedule

        Sets ``next_run`` to an ISO-8601 string, or to None when the schedule
        type is unknown or a one-time schedule is missing, invalid or in the
        past. The ``enabled`` flag is not consulted here.
        """
        now = datetime.now()
        if self.schedule_type == "once":
            next_run = _next_once_run(self.schedule_config, now)
        else:
            next_run = _next_recurring_run(self.schedule_type, self.schedule_config, now)

        self.next_run = next_run.isoformat() if next_run is not None else None

    @handle_exceptions
    def enable(self) -> None:
        """Enable the task and recompute its next run time"""
        self.enabled = True
        self.updated_at = datetime.now().isoformat()
        self._calculate_next_run()

    @handle_exceptions
    def disable(self) -> None:
        """Disable the task and clear its next run time"""
        self.enabled = False
        self.updated_at = datetime.now().isoformat()
        self.next_run = None

    @handle_exceptions
    def to_dict(self) -> Dict[str, Any]:
        """Convert task to dictionary

        Returns:
            Task as dictionary
        """
        return {
            "id": self.id,
            "name": self.name,
            "description": self.description,
            "task_type": self.task_type,
            "task_config": self.task_config,
            "schedule_type": self.schedule_type,
            "schedule_config": self.schedule_config,
            "enabled": self.enabled,
            "created_at": self.created_at,
            "updated_at": self.updated_at,
            "last_run": self.last_run,
            "next_run": self.next_run,
            "run_count": self.run_count,
            "status": self.status,
            "last_result": self.last_result,
        }

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'ScheduledTask':
        """Create task from dictionary

        Args:
            data: Task data. "name", "task_type", "task_config", "id",
                "enabled", "created_at" and "updated_at" are required; the
                remaining fields fall back to defaults.

        Returns:
            ScheduledTask instance

        Raises:
            KeyError: If a required field is missing.
        """
        task = cls(data["name"], data["task_type"], data["task_config"],
                   data.get("description", ""))
        task.id = data["id"]
        task.schedule_type = data.get("schedule_type")
        task.schedule_config = data.get("schedule_config", {})
        task.enabled = data["enabled"]
        task.created_at = data["created_at"]
        task.updated_at = data["updated_at"]
        task.last_run = data.get("last_run")
        task.next_run = data.get("next_run")
        task.run_count = data.get("run_count", 0)
        task.status = data.get("status", "pending")
        task.last_result = data.get("last_result")
        return task


class TaskManager:
    """Manager for scheduled tasks"""

    def __init__(self):
        """Initialize task manager and load previously stored tasks"""
        self.tasks: Dict[str, ScheduledTask] = {}
        # No jobs are registered on this scheduler within this file; it is
        # only polled from the scheduler loop.
        self.scheduler = schedule.Scheduler()
        self.scheduler_thread: Optional[threading.Thread] = None
        self.running = False
        # Dispatch table: task_type -> handler returning a result dictionary.
        self.task_handlers = {
            "notification": self._handle_notification_task,
            "data_update": self._handle_data_update_task,
            "api_call": self._handle_api_call_task,
            "backup": self._handle_backup_task,
            "cleanup": self._handle_cleanup_task,
            "reminder": self._handle_reminder_task,
            "batch_process": self._handle_batch_process_task,
            "sync": self._handle_sync_task,
        }
        self.load_tasks()

    @handle_exceptions
    def load_tasks(self) -> None:
        """Load tasks from storage"""
        try:
            tasks_data = load_data("scheduled_tasks", default=[])
            for task_data in tasks_data:
                task = ScheduledTask.from_dict(task_data)
                self.tasks[task.id] = task
            logger.info(f"Loaded {len(self.tasks)} scheduled tasks")
        except Exception as e:
            logger.error(f"Failed to load scheduled tasks: {str(e)}")

    @handle_exceptions
    def save_tasks(self) -> None:
        """Save tasks to storage"""
        try:
            tasks_data = [task.to_dict() for task in self.tasks.values()]
            save_data("scheduled_tasks", tasks_data)
            logger.info(f"Saved {len(self.tasks)} scheduled tasks")
        except Exception as e:
            logger.error(f"Failed to save scheduled tasks: {str(e)}")

    @handle_exceptions
    def create_task(self, name: str, task_type: str, task_config: Dict[str, Any],
                    description: Optional[str] = None) -> ScheduledTask:
        """Create a new scheduled task

        Args:
            name: Task name
            task_type: Type of task
            task_config: Task configuration
            description: Task description (optional)

        Returns:
            Created task
        """
        task = ScheduledTask(name, task_type, task_config, description)
        self.tasks[task.id] = task
        self.save_tasks()
        return task

    @handle_exceptions
    def get_task(self, task_id: str) -> Optional[ScheduledTask]:
        """Get task by ID

        Args:
            task_id: Task ID

        Returns:
            Task if found, None otherwise
        """
        return self.tasks.get(task_id)

    @handle_exceptions
    def update_task(self, task: ScheduledTask) -> None:
        """Update task

        Args:
            task: Task to update

        Raises:
            AutomationError: If no task with this ID is registered.
        """
        if task.id not in self.tasks:
            raise AutomationError(f"Task not found: {task.id}")

        task.updated_at = datetime.now().isoformat()
        self.tasks[task.id] = task
        self.save_tasks()

    @handle_exceptions
    def delete_task(self, task_id: str) -> None:
        """Delete task

        Args:
            task_id: Task ID

        Raises:
            AutomationError: If no task with this ID is registered.
        """
        if task_id not in self.tasks:
            raise AutomationError(f"Task not found: {task_id}")

        del self.tasks[task_id]
        self.save_tasks()

    @handle_exceptions
    def get_all_tasks(self) -> List[ScheduledTask]:
        """Get all tasks

        Returns:
            List of all tasks
        """
        return list(self.tasks.values())

    @handle_exceptions
    def get_pending_tasks(self) -> List[ScheduledTask]:
        """Get pending tasks

        Returns:
            List of enabled tasks whose next run time has been reached
        """
        # next_run values are isoformat() strings from naive datetimes, so
        # they order chronologically under plain string comparison.
        now = datetime.now().isoformat()
        return [
            task for task in self.tasks.values()
            if task.enabled and task.next_run and task.next_run <= now
        ]

    @handle_exceptions
    def start_scheduler(self) -> None:
        """Start the scheduler thread (no-op if it is already running)"""
        if self.scheduler_thread is not None and self.scheduler_thread.is_alive():
            return

        self.running = True
        self.scheduler_thread = threading.Thread(target=self._scheduler_loop)
        self.scheduler_thread.daemon = True
        self.scheduler_thread.start()
        logger.info("Task scheduler started")

    @handle_exceptions
    def stop_scheduler(self) -> None:
        """Stop the scheduler thread"""
        self.running = False
        if self.scheduler_thread is not None:
            # Wait briefly for the loop to notice the flag, then drop the handle.
            self.scheduler_thread.join(timeout=1.0)
            self.scheduler_thread = None
        logger.info("Task scheduler stopped")

    def _scheduler_loop(self) -> None:
        """Scheduler thread loop

        Polls once per second until ``running`` is cleared. The sleep happens
        on every iteration, including after an error, so a persistent failure
        cannot turn the loop into a busy spin.
        """
        while self.running:
            try:
                # Check for pending tasks. The `or []` guards against the
                # decorated call yielding None -- presumably possible if
                # handle_exceptions swallows errors; confirm against utils.
                for task in self.get_pending_tasks() or []:
                    self._execute_task(task)

                # Run scheduled jobs
                self.scheduler.run_pending()
            except Exception as e:
                logger.error(f"Scheduler error: {str(e)}")

            # Sleep for a short time
            time.sleep(1)

    def _execute_task(self, task: ScheduledTask) -> None:
        """Execute a scheduled task

        Runs the handler registered for the task's type, records the result
        and status, counts the attempt and reschedules the task.

        Args:
            task: Task to execute
        """
        failure: Optional[Exception] = None
        try:
            # Update task status
            task.status = "running"
            task.last_run = datetime.now().isoformat()
            self.update_task(task)

            # Execute task based on type
            handler = self.task_handlers.get(task.task_type)
            if handler:
                task.last_result = handler(task)
                task.status = "completed"
            else:
                task.last_result = {"error": f"Unknown task type: {task.task_type}"}
                task.status = "failed"
        except Exception as e:
            # Update task on error
            failure = e
            task.status = "failed"
            task.last_result = {"error": str(e)}

        # Count failed attempts too: otherwise a one-time task whose handler
        # raises keeps its past next_run and is re-run on every loop pass.
        task.run_count += 1
        try:
            self._calculate_next_run(task)
        except Exception as e:
            # A schedule that cannot be evaluated must not leave an already
            # due next_run behind, nor abort the remaining pending tasks.
            task.next_run = None
            logger.error(f"Failed to reschedule task {task.name}: {e}")
        self.update_task(task)

        if failure is None:
            logger.info(f"Task executed: {task.name}")
        else:
            logger.error(f"Task execution error: {str(failure)}")

    def _calculate_next_run(self, task: ScheduledTask) -> None:
        """Calculate the next run time for a task

        Disabled or unscheduled tasks get no next run. A one-time task that
        has run at least once is cleared and disabled; one that has not run
        keeps its current ``next_run``. Unknown schedule types leave
        ``next_run`` untouched.

        Args:
            task: Task to update
        """
        if not task.enabled or not task.schedule_type:
            task.next_run = None
            return

        if task.schedule_type == "once":
            # For one-time tasks, if they've run, don't schedule again
            if task.run_count > 0:
                task.next_run = None
                task.enabled = False
            return

        next_run = _next_recurring_run(task.schedule_type, task.schedule_config,
                                       datetime.now())
        if next_run is not None:
            task.next_run = next_run.isoformat()

    def _handle_notification_task(self, task: ScheduledTask) -> Dict[str, Any]:
        """Handle notification task

        Currently only logs the notification; nothing is actually sent.

        Args:
            task: Task to handle

        Returns:
            Task result
        """
        config = task.task_config
        notification_type = config.get("notification_type", "app")
        title = config.get("title", "")
        message = config.get("message", "")

        # Placeholder for actual notification sending
        if notification_type == "app":
            # App notification
            logger.info(f"App notification: {title} - {message}")
        elif notification_type == "email":
            # Email notification
            recipient = config.get("recipient", "")
            logger.info(f"Email notification to {recipient}: {title} - {message}")
        elif notification_type == "telegram":
            # Telegram notification
            chat_id = config.get("chat_id", "")
            logger.info(f"Telegram notification to {chat_id}: {title} - {message}")

        return {"status": "success", "message": f"Sent {notification_type} notification"}

    def _handle_data_update_task(self, task: ScheduledTask) -> Dict[str, Any]:
        """Handle data update task

        Loads the collection named by "data_type", applies "updates" to the
        entry keyed by "data_id" and saves the collection back.

        Args:
            task: Task to handle

        Returns:
            Task result (status "error" when the entry does not exist)
        """
        config = task.task_config
        data_type = config.get("data_type", "")
        data_id = config.get("data_id", "")
        updates = config.get("updates", {})

        # Load data
        data = load_data(data_type, {})

        if data_id not in data:
            return {"status": "error", "message": f"Data not found: {data_type}/{data_id}"}

        # Update data
        for key, value in updates.items():
            data[data_id][key] = value

        # Save data
        save_data(data_type, data)
        logger.info(f"Updated {data_type} data: {data_id}")
        return {"status": "success", "message": f"Updated {data_type} data: {data_id}"}

    def _handle_api_call_task(self, task: ScheduledTask) -> Dict[str, Any]:
        """Handle API call task

        Currently only logs the call; no request is made.

        Args:
            task: Task to handle

        Returns:
            Task result
        """
        config = task.task_config
        url = config.get("url", "")
        method = config.get("method", "GET")
        # Read but not yet used: reserved for the real request implementation.
        headers = config.get("headers", {})
        body = config.get("body", {})

        # Placeholder for actual API call
        logger.info(f"API call: {method} {url}")
        return {"status": "success", "message": f"Made API call: {method} {url}"}

    def _handle_backup_task(self, task: ScheduledTask) -> Dict[str, Any]:
        """Handle backup task

        Currently only logs the request; no backup is created.

        Args:
            task: Task to handle

        Returns:
            Task result
        """
        config = task.task_config
        backup_type = config.get("backup_type", "all")
        destination = config.get("destination", "local")

        # Placeholder for actual backup
        logger.info(f"Backup: {backup_type} to {destination}")
        return {"status": "success",
                "message": f"Created backup: {backup_type} to {destination}"}

    def _handle_cleanup_task(self, task: ScheduledTask) -> Dict[str, Any]:
        """Handle cleanup task

        Currently only logs the request; nothing is removed.

        Args:
            task: Task to handle

        Returns:
            Task result
        """
        config = task.task_config
        cleanup_type = config.get("cleanup_type", "")
        older_than_days = config.get("older_than_days", 30)

        # Placeholder for actual cleanup
        logger.info(f"Cleanup: {cleanup_type} older than {older_than_days} days")
        return {"status": "success", "message": f"Performed cleanup: {cleanup_type}"}

    def _handle_reminder_task(self, task: ScheduledTask) -> Dict[str, Any]:
        """Handle reminder task

        Currently only logs the reminder; nothing is delivered.

        Args:
            task: Task to handle

        Returns:
            Task result
        """
        config = task.task_config
        reminder_type = config.get("reminder_type", "task")
        item_id = config.get("item_id", "")
        message = config.get("message", "")

        # Placeholder for actual reminder
        logger.info(f"Reminder: {reminder_type} {item_id} - {message}")
        return {"status": "success", "message": f"Sent reminder: {reminder_type} {item_id}"}

    def _handle_batch_process_task(self, task: ScheduledTask) -> Dict[str, Any]:
        """Handle batch process task

        Currently only logs the request; no items are processed.

        Args:
            task: Task to handle

        Returns:
            Task result
        """
        config = task.task_config
        process_type = config.get("process_type", "")
        items = config.get("items", [])
        action = config.get("action", "")

        # Placeholder for actual batch processing
        logger.info(f"Batch process: {action} on {len(items)} {process_type} items")
        return {"status": "success",
                "message": f"Processed {len(items)} {process_type} items"}

    def _handle_sync_task(self, task: ScheduledTask) -> Dict[str, Any]:
        """Handle sync task

        Currently only logs the request; no data is synchronised.

        Args:
            task: Task to handle

        Returns:
            Task result
        """
        config = task.task_config
        sync_type = config.get("sync_type", "")
        source = config.get("source", "")
        destination = config.get("destination", "")

        # Placeholder for actual sync
        logger.info(f"Sync: {sync_type} from {source} to {destination}")
        return {"status": "success", "message": f"Synced {sync_type} data"}


# Create a global instance of the task manager
task_manager = TaskManager()