import copy
import json
import threading
import uuid
from datetime import datetime
from typing import Any, Callable, Dict, List, Optional

from utils.logging import setup_logger
from utils.error_handling import handle_exceptions, AutomationError
from utils.storage import load_data, save_data

# Initialize logger
logger = setup_logger(__name__)


class BatchOperation:
    """Batch operation for mass processing"""

    def __init__(self, name: str, operation_type: str,
                 target_items: List[Dict[str, Any]],
                 operation_config: Dict[str, Any],
                 description: Optional[str] = None):
        """Initialize a batch operation

        Args:
            name: Operation name
            operation_type: Type of operation (update, delete, tag, categorize, etc.)
            target_items: List of items to process
            operation_config: Operation configuration
            description: Operation description (optional)
        """
        self.id = str(uuid.uuid4())
        self.name = name
        self.description = description or ""
        self.operation_type = operation_type
        self.target_items = target_items
        self.operation_config = operation_config
        self.created_at = datetime.now().isoformat()
        self.updated_at = self.created_at
        self.started_at = None
        self.completed_at = None
        self.status = "pending"  # pending, running, completed, failed, cancelled
        self.progress = 0  # 0-100
        self.results = {
            "total": len(target_items),
            "processed": 0,
            "succeeded": 0,
            "failed": 0,
            "skipped": 0,
            "errors": []
        }
        self.dry_run = False  # If True, simulate but don't actually perform changes

    @handle_exceptions
    def start(self) -> None:
        """Start the batch operation"""
        if self.status != "pending":
            raise AutomationError(f"Cannot start operation in {self.status} state")

        self.status = "running"
        self.started_at = datetime.now().isoformat()
        self.updated_at = self.started_at

    @handle_exceptions
    def complete(self) -> None:
        """Mark the batch operation as completed"""
        if self.status != "running":
            raise AutomationError(f"Cannot complete operation in {self.status} state")

        self.status = "completed"
        self.completed_at = datetime.now().isoformat()
        self.updated_at = self.completed_at
        self.progress = 100

    @handle_exceptions
    def fail(self, error: str) -> None:
        """Mark the batch operation as failed

        Args:
            error: Error message
        """
        self.status = "failed"
        self.updated_at = datetime.now().isoformat()
        self.results["errors"].append(error)

    @handle_exceptions
    def cancel(self) -> None:
        """Cancel the batch operation"""
        if self.status not in ["pending", "running"]:
            raise AutomationError(f"Cannot cancel operation in {self.status} state")

        self.status = "cancelled"
        self.updated_at = datetime.now().isoformat()

    @handle_exceptions
    def update_progress(self, processed: int, succeeded: int, failed: int, skipped: int) -> None:
        """Update operation progress

        Args:
            processed: Number of items processed
            succeeded: Number of items processed successfully
            failed: Number of items that failed
            skipped: Number of items skipped
        """
        self.results["processed"] = processed
        self.results["succeeded"] = succeeded
        self.results["failed"] = failed
        self.results["skipped"] = skipped

        total = self.results["total"]
        if total > 0:
            self.progress = min(100, int((processed / total) * 100))

        self.updated_at = datetime.now().isoformat()

    @handle_exceptions
    def add_error(self, error: str) -> None:
        """Add an error message

        Args:
            error: Error message
        """
        self.results["errors"].append(error)
        self.updated_at = datetime.now().isoformat()

    @handle_exceptions
    def set_dry_run(self, dry_run: bool) -> None:
        """Set dry run mode

        Args:
            dry_run: Whether to run in dry run mode
        """
        self.dry_run = dry_run
        self.updated_at = datetime.now().isoformat()

    @handle_exceptions
    def to_dict(self) -> Dict[str, Any]:
        """Convert operation to dictionary

        Returns:
            Operation as dictionary
        """
        return {
            "id": self.id,
            "name": self.name,
            "description": self.description,
            "operation_type": self.operation_type,
            "target_items": self.target_items,
            "operation_config": self.operation_config,
            "created_at": self.created_at,
            "updated_at": self.updated_at,
            "started_at": self.started_at,
            "completed_at": self.completed_at,
            "status": self.status,
            "progress": self.progress,
            "results": self.results,
            "dry_run": self.dry_run
        }

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'BatchOperation':
        """Create operation from dictionary

        Args:
            data: Operation data

        Returns:
            BatchOperation instance
        """
        operation = cls(
            data["name"],
            data["operation_type"],
            data["target_items"],
            data["operation_config"],
            data.get("description", "")
        )
        operation.id = data["id"]
        operation.created_at = data["created_at"]
        operation.updated_at = data["updated_at"]
        operation.started_at = data.get("started_at")
        operation.completed_at = data.get("completed_at")
        operation.status = data["status"]
        operation.progress = data["progress"]
        operation.results = data["results"]
        operation.dry_run = data.get("dry_run", False)
        return operation
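# Illustrative sketch (not executed): the typical BatchOperation lifecycle.
# Field values below are assumptions for the example, not fixed API names.
#
#   op = BatchOperation(
#       name="Close stale tasks",
#       operation_type="status_change",
#       target_items=[{"id": "task-1"}, {"id": "task-2"}],
#       operation_config={"data_type": "tasks", "status": "closed"},
#   )
#   op.start()                        # pending -> running
#   op.update_progress(2, 2, 0, 0)    # processed, succeeded, failed, skipped
#   op.complete()                     # running -> completed; progress pinned to 100
#   restored = BatchOperation.from_dict(op.to_dict())  # round-trips via to_dict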
class BatchProcessor:
    """Processor for batch operations"""

    def __init__(self):
        """Initialize batch processor"""
        self.operations = {}
        self.running_operations = {}
        self.operation_handlers: Dict[str, Callable[[BatchOperation], None]] = {
            "update": self._handle_update_operation,
            "delete": self._handle_delete_operation,
            "tag": self._handle_tag_operation,
            "categorize": self._handle_categorize_operation,
            "status_change": self._handle_status_change_operation,
            "priority_change": self._handle_priority_change_operation,
            "due_date_change": self._handle_due_date_change_operation,
            "assign": self._handle_assign_operation,
            "export": self._handle_export_operation,
            "import": self._handle_import_operation,
            "custom": self._handle_custom_operation
        }
        self.load_operations()

    @handle_exceptions
    def load_operations(self) -> None:
        """Load operations from storage"""
        try:
            operations_data = load_data("batch_operations", default=[])
            for operation_data in operations_data:
                operation = BatchOperation.from_dict(operation_data)
                self.operations[operation.id] = operation
            logger.info(f"Loaded {len(self.operations)} batch operations")
        except Exception as e:
            logger.error(f"Failed to load batch operations: {str(e)}")

    @handle_exceptions
    def save_operations(self) -> None:
        """Save operations to storage"""
        try:
            operations_data = [operation.to_dict() for operation in self.operations.values()]
            save_data("batch_operations", operations_data)
            logger.info(f"Saved {len(self.operations)} batch operations")
        except Exception as e:
            logger.error(f"Failed to save batch operations: {str(e)}")

    @handle_exceptions
    def create_operation(self, name: str, operation_type: str,
                         target_items: List[Dict[str, Any]],
                         operation_config: Dict[str, Any],
                         description: Optional[str] = None,
                         dry_run: bool = False) -> BatchOperation:
        """Create a new batch operation

        Args:
            name: Operation name
            operation_type: Type of operation
            target_items: List of items to process
            operation_config: Operation configuration
            description: Operation description (optional)
            dry_run: Whether to run in dry run mode

        Returns:
            Created operation
        """
        operation = BatchOperation(name, operation_type, target_items, operation_config, description)
        operation.set_dry_run(dry_run)
        self.operations[operation.id] = operation
        self.save_operations()
        return operation
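    # Usage sketch (item IDs and config shape are illustrative): operations are
    # created through create_operation() and run asynchronously via
    # execute_operation():
    #
    #   op = batch_processor.create_operation(
    #       name="Tag imported notes",
    #       operation_type="tag",
    #       target_items=[{"id": "note-1"}],
    #       operation_config={"data_type": "notes", "action": "add", "tags": ["imported"]},
    #       dry_run=True,
    #   )
    #   batch_processor.execute_operation(op.id)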
    @handle_exceptions
    def get_operation(self, operation_id: str) -> Optional[BatchOperation]:
        """Get operation by ID

        Args:
            operation_id: Operation ID

        Returns:
            Operation if found, None otherwise
        """
        return self.operations.get(operation_id)

    @handle_exceptions
    def update_operation(self, operation: BatchOperation) -> None:
        """Update operation

        Args:
            operation: Operation to update
        """
        if operation.id in self.operations:
            operation.updated_at = datetime.now().isoformat()
            self.operations[operation.id] = operation
            self.save_operations()
        else:
            raise AutomationError(f"Operation not found: {operation.id}")

    @handle_exceptions
    def delete_operation(self, operation_id: str) -> None:
        """Delete operation

        Args:
            operation_id: Operation ID
        """
        if operation_id in self.operations:
            del self.operations[operation_id]
            self.save_operations()
        else:
            raise AutomationError(f"Operation not found: {operation_id}")

    @handle_exceptions
    def get_all_operations(self) -> List[BatchOperation]:
        """Get all operations

        Returns:
            List of all operations
        """
        return list(self.operations.values())

    @handle_exceptions
    def execute_operation(self, operation_id: str) -> None:
        """Execute a batch operation

        Args:
            operation_id: Operation ID
        """
        operation = self.get_operation(operation_id)
        if not operation:
            raise AutomationError(f"Operation not found: {operation_id}")

        if operation.status != "pending":
            raise AutomationError(f"Cannot execute operation in {operation.status} state")

        # Start the operation
        operation.start()
        self.update_operation(operation)

        # Start a thread to execute the operation; register it before starting
        # so the worker can always find and remove its own entry when it finishes
        thread = threading.Thread(target=self._execute_operation_thread, args=(operation_id,))
        thread.daemon = True
        self.running_operations[operation_id] = thread
        thread.start()

    def _execute_operation_thread(self, operation_id: str) -> None:
        """Execute a batch operation in a separate thread

        Args:
            operation_id: Operation ID
        """
        operation = self.get_operation(operation_id)
        if not operation:
            return

        try:
            # Get the handler for this operation type
            handler = self.operation_handlers.get(operation.operation_type)
            if not handler:
                raise AutomationError(f"Unknown operation type: {operation.operation_type}")

            # Execute the operation
            handler(operation)

            # Mark as completed, unless the operation was cancelled mid-run
            if operation.status == "running":
                operation.complete()
            self.update_operation(operation)
        except Exception as e:
            # Mark as failed
            operation.fail(str(e))
            self.update_operation(operation)
            logger.error(f"Operation execution error: {str(e)}")

        # Remove from running operations
        if operation_id in self.running_operations:
            del self.running_operations[operation_id]

    @handle_exceptions
    def cancel_operation(self, operation_id: str) -> None:
        """Cancel a running operation

        Args:
            operation_id: Operation ID
        """
        operation = self.get_operation(operation_id)
        if not operation:
            raise AutomationError(f"Operation not found: {operation_id}")

        # Mark as cancelled; handler loops check this status and stop early
        operation.cancel()
        self.update_operation(operation)

    def _handle_update_operation(self, operation: BatchOperation) -> None:
        """Handle update operation

        Args:
            operation: Operation to handle
        """
        config = operation.operation_config
        data_type = config.get("data_type", "")
        updates = config.get("updates", {})

        # Load data
        data = load_data(data_type, {})

        # Process items
        processed = 0
        succeeded = 0
        failed = 0
        skipped = 0

        for item in operation.target_items:
            # Stop early if the operation was cancelled
            if operation.status == "cancelled":
                break

            processed += 1
            item_id = item.get("id")
            if not item_id or item_id not in data:
                skipped += 1
                continue

            try:
                # Apply updates
                if not operation.dry_run:
                    for key, value in updates.items():
                        data[item_id][key] = value
                succeeded += 1
            except Exception as e:
                failed += 1
                operation.add_error(f"Failed to update item {item_id}: {str(e)}")

            # Update progress
            operation.update_progress(processed, succeeded, failed, skipped)

        # Save data
        if not operation.dry_run:
            save_data(data_type, data)
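    # Example "update" config (illustrative field names): every key in
    # "updates" is written onto each matching target item.
    #
    #   {"data_type": "tasks", "updates": {"status": "review", "priority": "high"}}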
    def _handle_delete_operation(self, operation: BatchOperation) -> None:
        """Handle delete operation

        Args:
            operation: Operation to handle
        """
        config = operation.operation_config
        data_type = config.get("data_type", "")

        # Load data
        data = load_data(data_type, {})

        # Process items
        processed = 0
        succeeded = 0
        failed = 0
        skipped = 0

        for item in operation.target_items:
            # Stop early if the operation was cancelled
            if operation.status == "cancelled":
                break

            processed += 1
            item_id = item.get("id")
            if not item_id or item_id not in data:
                skipped += 1
                continue

            try:
                # Delete item
                if not operation.dry_run:
                    del data[item_id]
                succeeded += 1
            except Exception as e:
                failed += 1
                operation.add_error(f"Failed to delete item {item_id}: {str(e)}")

            # Update progress
            operation.update_progress(processed, succeeded, failed, skipped)

        # Save data
        if not operation.dry_run:
            save_data(data_type, data)

    def _handle_tag_operation(self, operation: BatchOperation) -> None:
        """Handle tag operation

        Args:
            operation: Operation to handle
        """
        config = operation.operation_config
        data_type = config.get("data_type", "")
        action = config.get("action", "add")  # add, remove, replace
        tags = config.get("tags", [])

        # Load data
        data = load_data(data_type, {})

        # Process items
        processed = 0
        succeeded = 0
        failed = 0
        skipped = 0

        for item in operation.target_items:
            # Stop early if the operation was cancelled
            if operation.status == "cancelled":
                break

            processed += 1
            item_id = item.get("id")
            if not item_id or item_id not in data:
                skipped += 1
                continue

            try:
                # Get current tags
                current_tags = data[item_id].get("tags", [])

                # Apply tag operation
                if action == "add":
                    # Add tags that don't already exist
                    new_tags = current_tags.copy()
                    for tag in tags:
                        if tag not in new_tags:
                            new_tags.append(tag)
                elif action == "remove":
                    # Remove specified tags
                    new_tags = [tag for tag in current_tags if tag not in tags]
                elif action == "replace":
                    # Replace all tags
                    new_tags = tags.copy()
                else:
                    raise AutomationError(f"Unknown tag action: {action}")

                # Update tags
                if not operation.dry_run:
                    data[item_id]["tags"] = new_tags
                succeeded += 1
            except Exception as e:
                failed += 1
                operation.add_error(f"Failed to update tags for item {item_id}: {str(e)}")

            # Update progress
            operation.update_progress(processed, succeeded, failed, skipped)

        # Save data
        if not operation.dry_run:
            save_data(data_type, data)

    def _handle_categorize_operation(self, operation: BatchOperation) -> None:
        """Handle categorize operation

        Args:
            operation: Operation to handle
        """
        config = operation.operation_config
        data_type = config.get("data_type", "")
        category = config.get("category", "")

        # Load data
        data = load_data(data_type, {})

        # Process items
        processed = 0
        succeeded = 0
        failed = 0
        skipped = 0

        for item in operation.target_items:
            # Stop early if the operation was cancelled
            if operation.status == "cancelled":
                break

            processed += 1
            item_id = item.get("id")
            if not item_id or item_id not in data:
                skipped += 1
                continue

            try:
                # Update category
                if not operation.dry_run:
                    data[item_id]["category"] = category
                succeeded += 1
            except Exception as e:
                failed += 1
                operation.add_error(f"Failed to categorize item {item_id}: {str(e)}")

            # Update progress
            operation.update_progress(processed, succeeded, failed, skipped)

        # Save data
        if not operation.dry_run:
            save_data(data_type, data)

    def _handle_status_change_operation(self, operation: BatchOperation) -> None:
        """Handle status change operation

        Args:
            operation: Operation to handle
        """
        config = operation.operation_config
        data_type = config.get("data_type", "")
        status = config.get("status", "")

        # Load data
        data = load_data(data_type, {})

        # Process items
        processed = 0
        succeeded = 0
        failed = 0
        skipped = 0

        for item in operation.target_items:
            # Stop early if the operation was cancelled
            if operation.status == "cancelled":
                break

            processed += 1
            item_id = item.get("id")
            if not item_id or item_id not in data:
                skipped += 1
                continue

            try:
                # Update status
                if not operation.dry_run:
                    data[item_id]["status"] = status
                succeeded += 1
            except Exception as e:
                failed += 1
                operation.add_error(f"Failed to change status for item {item_id}: {str(e)}")

            # Update progress
            operation.update_progress(processed, succeeded, failed, skipped)

        # Save data
        if not operation.dry_run:
            save_data(data_type, data)
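    # The status/priority/due-date/assign handlers all share one config shape:
    # a "data_type" plus the single field value to write. Illustrative configs:
    #
    #   {"data_type": "tasks", "status": "done"}              # status_change
    #   {"data_type": "tasks", "priority": "high"}            # priority_change
    #   {"data_type": "tasks", "due_date": "2024-12-31"}      # due_date_change
    #   {"data_type": "tasks", "assignee": "alice"}           # assign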
    def _handle_priority_change_operation(self, operation: BatchOperation) -> None:
        """Handle priority change operation

        Args:
            operation: Operation to handle
        """
        config = operation.operation_config
        data_type = config.get("data_type", "")
        priority = config.get("priority", "")

        # Load data
        data = load_data(data_type, {})

        # Process items
        processed = 0
        succeeded = 0
        failed = 0
        skipped = 0

        for item in operation.target_items:
            # Stop early if the operation was cancelled
            if operation.status == "cancelled":
                break

            processed += 1
            item_id = item.get("id")
            if not item_id or item_id not in data:
                skipped += 1
                continue

            try:
                # Update priority
                if not operation.dry_run:
                    data[item_id]["priority"] = priority
                succeeded += 1
            except Exception as e:
                failed += 1
                operation.add_error(f"Failed to change priority for item {item_id}: {str(e)}")

            # Update progress
            operation.update_progress(processed, succeeded, failed, skipped)

        # Save data
        if not operation.dry_run:
            save_data(data_type, data)

    def _handle_due_date_change_operation(self, operation: BatchOperation) -> None:
        """Handle due date change operation

        Args:
            operation: Operation to handle
        """
        config = operation.operation_config
        data_type = config.get("data_type", "")
        due_date = config.get("due_date", "")

        # Load data
        data = load_data(data_type, {})

        # Process items
        processed = 0
        succeeded = 0
        failed = 0
        skipped = 0

        for item in operation.target_items:
            # Stop early if the operation was cancelled
            if operation.status == "cancelled":
                break

            processed += 1
            item_id = item.get("id")
            if not item_id or item_id not in data:
                skipped += 1
                continue

            try:
                # Update due date
                if not operation.dry_run:
                    data[item_id]["due_date"] = due_date
                succeeded += 1
            except Exception as e:
                failed += 1
                operation.add_error(f"Failed to change due date for item {item_id}: {str(e)}")

            # Update progress
            operation.update_progress(processed, succeeded, failed, skipped)

        # Save data
        if not operation.dry_run:
            save_data(data_type, data)

    def _handle_assign_operation(self, operation: BatchOperation) -> None:
        """Handle assign operation

        Args:
            operation: Operation to handle
        """
        config = operation.operation_config
        data_type = config.get("data_type", "")
        assignee = config.get("assignee", "")

        # Load data
        data = load_data(data_type, {})

        # Process items
        processed = 0
        succeeded = 0
        failed = 0
        skipped = 0

        for item in operation.target_items:
            # Stop early if the operation was cancelled
            if operation.status == "cancelled":
                break

            processed += 1
            item_id = item.get("id")
            if not item_id or item_id not in data:
                skipped += 1
                continue

            try:
                # Update assignee
                if not operation.dry_run:
                    data[item_id]["assignee"] = assignee
                succeeded += 1
            except Exception as e:
                failed += 1
                operation.add_error(f"Failed to assign item {item_id}: {str(e)}")

            # Update progress
            operation.update_progress(processed, succeeded, failed, skipped)

        # Save data
        if not operation.dry_run:
            save_data(data_type, data)

    def _handle_export_operation(self, operation: BatchOperation) -> None:
        """Handle export operation

        Args:
            operation: Operation to handle
        """
        config = operation.operation_config
        data_type = config.get("data_type", "")
        export_format = config.get("format", "json")
        file_path = config.get("file_path", "")

        # Load data
        data = load_data(data_type, {})

        # Process items
        processed = 0
        succeeded = 0
        failed = 0
        skipped = 0

        # Filter data to only include target items
        export_data = {}
        for item in operation.target_items:
            # Stop early if the operation was cancelled
            if operation.status == "cancelled":
                break

            processed += 1
            item_id = item.get("id")
            if not item_id or item_id not in data:
                skipped += 1
                continue

            try:
                # Add to export data
                export_data[item_id] = data[item_id]
                succeeded += 1
            except Exception as e:
                failed += 1
                operation.add_error(f"Failed to export item {item_id}: {str(e)}")

            # Update progress
            operation.update_progress(processed, succeeded, failed, skipped)

        # Export data
        if not file_path:
            operation.add_error("No file path provided for export")
        elif not operation.dry_run:
            try:
                if export_format == "json":
                    with open(file_path, "w") as f:
                        json.dump(export_data, f, indent=2)
                elif export_format == "csv":
                    # Placeholder for CSV export
                    pass
                elif export_format == "markdown":
                    # Placeholder for Markdown export
                    pass
                else:
                    operation.add_error(f"Unknown export format: {export_format}")
            except Exception as e:
                operation.add_error(f"Failed to export data: {str(e)}")
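    # Example "export" config (path is illustrative); only "json" is
    # implemented, "csv" and "markdown" are placeholders:
    #
    #   {"data_type": "notes", "format": "json", "file_path": "exports/notes.json"}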
    def _handle_import_operation(self, operation: BatchOperation) -> None:
        """Handle import operation

        Args:
            operation: Operation to handle
        """
        config = operation.operation_config
        data_type = config.get("data_type", "")
        import_format = config.get("format", "json")
        file_path = config.get("file_path", "")
        merge_strategy = config.get("merge_strategy", "overwrite")  # overwrite, merge, skip

        # Load current data
        current_data = load_data(data_type, {})

        # Load import data
        import_data = {}
        if not file_path:
            operation.add_error("No file path provided for import")
            return
        try:
            if import_format == "json":
                with open(file_path, "r") as f:
                    import_data = json.load(f)
            elif import_format == "csv":
                # Placeholder for CSV import
                pass
            elif import_format == "markdown":
                # Placeholder for Markdown import
                pass
            else:
                operation.add_error(f"Unknown import format: {import_format}")
                return
        except Exception as e:
            operation.add_error(f"Failed to load import data: {str(e)}")
            return

        # Process items
        processed = 0
        succeeded = 0
        failed = 0
        skipped = 0

        for item_id, item_data in import_data.items():
            # Stop early if the operation was cancelled
            if operation.status == "cancelled":
                break

            processed += 1
            try:
                if item_id in current_data:
                    if merge_strategy == "overwrite":
                        # Overwrite existing item
                        if not operation.dry_run:
                            current_data[item_id] = item_data
                        succeeded += 1
                    elif merge_strategy == "merge":
                        # Merge with existing item
                        if not operation.dry_run:
                            for key, value in item_data.items():
                                current_data[item_id][key] = value
                        succeeded += 1
                    elif merge_strategy == "skip":
                        # Skip existing item
                        skipped += 1
                    else:
                        operation.add_error(f"Unknown merge strategy: {merge_strategy}")
                        failed += 1
                else:
                    # Add new item
                    if not operation.dry_run:
                        current_data[item_id] = item_data
                    succeeded += 1
            except Exception as e:
                failed += 1
                operation.add_error(f"Failed to import item {item_id}: {str(e)}")

            # Update progress
            operation.update_progress(processed, succeeded, failed, skipped)

        # Save data
        if not operation.dry_run:
            save_data(data_type, current_data)

    def _handle_custom_operation(self, operation: BatchOperation) -> None:
        """Handle custom operation

        Args:
            operation: Operation to handle
        """
        config = operation.operation_config
        custom_type = config.get("custom_type", "")

        # Process items based on custom type
        if custom_type == "archive":
            self._handle_archive_operation(operation)
        elif custom_type == "duplicate":
            self._handle_duplicate_operation(operation)
        elif custom_type == "merge":
            self._handle_merge_operation(operation)
        else:
            operation.add_error(f"Unknown custom operation type: {custom_type}")
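    # "custom" operations dispatch on "custom_type". Illustrative configs
    # (IDs and field names are examples, not fixed values):
    #
    #   {"custom_type": "archive", "data_type": "tasks"}
    #   {"custom_type": "duplicate", "data_type": "notes"}
    #   {"custom_type": "merge", "data_type": "notes",
    #    "target_id": "note-1", "merge_fields": ["content", "tags"]}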
    def _handle_archive_operation(self, operation: BatchOperation) -> None:
        """Handle archive operation

        Args:
            operation: Operation to handle
        """
        config = operation.operation_config
        data_type = config.get("data_type", "")

        # Load data
        data = load_data(data_type, {})
        archives = load_data(f"{data_type}_archives", {})

        # Process items
        processed = 0
        succeeded = 0
        failed = 0
        skipped = 0

        for item in operation.target_items:
            # Stop early if the operation was cancelled
            if operation.status == "cancelled":
                break

            processed += 1
            item_id = item.get("id")
            if not item_id or item_id not in data:
                skipped += 1
                continue

            try:
                # Move to archives
                if not operation.dry_run:
                    archives[item_id] = data[item_id]
                    archives[item_id]["archived_at"] = datetime.now().isoformat()
                    del data[item_id]
                succeeded += 1
            except Exception as e:
                failed += 1
                operation.add_error(f"Failed to archive item {item_id}: {str(e)}")

            # Update progress
            operation.update_progress(processed, succeeded, failed, skipped)

        # Save data
        if not operation.dry_run:
            save_data(data_type, data)
            save_data(f"{data_type}_archives", archives)

    def _handle_duplicate_operation(self, operation: BatchOperation) -> None:
        """Handle duplicate operation

        Args:
            operation: Operation to handle
        """
        config = operation.operation_config
        data_type = config.get("data_type", "")

        # Load data
        data = load_data(data_type, {})

        # Process items
        processed = 0
        succeeded = 0
        failed = 0
        skipped = 0

        for item in operation.target_items:
            # Stop early if the operation was cancelled
            if operation.status == "cancelled":
                break

            processed += 1
            item_id = item.get("id")
            if not item_id or item_id not in data:
                skipped += 1
                continue

            try:
                # Create duplicate
                if not operation.dry_run:
                    new_id = str(uuid.uuid4())
                    # Deep copy so nested fields aren't shared with the original
                    new_item = copy.deepcopy(data[item_id])
                    new_item["id"] = new_id
                    new_item["created_at"] = datetime.now().isoformat()
                    new_item["updated_at"] = datetime.now().isoformat()
                    new_item["title"] = f"Copy of {new_item.get('title', '')}"  # Add 'Copy of' prefix
                    data[new_id] = new_item
                succeeded += 1
            except Exception as e:
                failed += 1
                operation.add_error(f"Failed to duplicate item {item_id}: {str(e)}")

            # Update progress
            operation.update_progress(processed, succeeded, failed, skipped)

        # Save data
        if not operation.dry_run:
            save_data(data_type, data)

    def _handle_merge_operation(self, operation: BatchOperation) -> None:
        """Handle merge operation

        Args:
            operation: Operation to handle
        """
        config = operation.operation_config
        data_type = config.get("data_type", "")
        target_id = config.get("target_id", "")  # ID of the item to merge into
        merge_fields = config.get("merge_fields", [])  # Fields to merge

        # Load data
        data = load_data(data_type, {})

        # Check if target exists
        if not target_id or target_id not in data:
            operation.add_error(f"Target item not found: {target_id}")
            return

        # Process items
        processed = 0
        succeeded = 0
        failed = 0
        skipped = 0

        for item in operation.target_items:
            # Stop early if the operation was cancelled
            if operation.status == "cancelled":
                break

            processed += 1
            item_id = item.get("id")
            if not item_id or item_id not in data or item_id == target_id:
                skipped += 1
                continue

            try:
                # Merge fields
                if not operation.dry_run:
                    for field in merge_fields:
                        if field in data[item_id]:
                            # Handle different field types; unrecognized fields are left untouched
                            if field == "content" or field == "description":
                                # Append text content
                                target_content = data[target_id].get(field, "")
                                item_content = data[item_id].get(field, "")
                                data[target_id][field] = f"{target_content}\n\n{item_content}"
                            elif field == "tags":
                                # Merge tags
                                target_tags = set(data[target_id].get("tags", []))
                                item_tags = set(data[item_id].get("tags", []))
                                data[target_id]["tags"] = list(target_tags.union(item_tags))
                            elif field == "attachments":
                                # Merge attachments
                                target_attachments = data[target_id].get("attachments", [])
                                item_attachments = data[item_id].get("attachments", [])
                                data[target_id]["attachments"] = target_attachments + item_attachments
                            elif field == "comments":
                                # Merge comments
                                target_comments = data[target_id].get("comments", [])
                                item_comments = data[item_id].get("comments", [])
                                data[target_id]["comments"] = target_comments + item_comments
                    # Delete the merged item
                    del data[item_id]
                succeeded += 1
            except Exception as e:
                failed += 1
                operation.add_error(f"Failed to merge item {item_id}: {str(e)}")

            # Update progress
            operation.update_progress(processed, succeeded, failed, skipped)

        # Save data
        if not operation.dry_run:
            save_data(data_type, data)
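    # Worked merge example (illustrative) with merge_fields=["tags", "content"]:
    #
    #   target: {"tags": ["a"], "content": "Alpha"}
    #   source: {"tags": ["b"], "content": "Beta"}
    #   result: {"tags": ["a", "b"], "content": "Alpha\n\nBeta"}  # source deleted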
# Create a global instance of the batch processor
batch_processor = BatchProcessor()
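
if __name__ == "__main__":
    # Minimal dry-run sketch, assuming utils.storage is usable in this
    # environment. Note that creating the operation still persists its
    # metadata via save_operations(), even in dry-run mode.
    demo_op = batch_processor.create_operation(
        name="Demo status change",
        operation_type="status_change",
        target_items=[{"id": "task-1"}, {"id": "task-2"}],  # hypothetical IDs
        operation_config={"data_type": "tasks", "status": "done"},
        dry_run=True,  # simulate only; target data is not modified
    )
    batch_processor.execute_operation(demo_op.id)
    worker = batch_processor.running_operations.get(demo_op.id)
    if worker:
        worker.join()  # wait for the background thread to finish
    print(demo_op.results)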