""" Storage utilities for the MONA application Fixed version with proper error handling and data management """ import json import os import pickle import csv from pathlib import Path from typing import Any, Dict, List, Optional, Union from datetime import datetime from utils.error_handling import handle_data_exceptions, DataError, ValidationError from utils.logging import get_logger, log_error, log_info, log_warning class StorageManager: """Manages data storage and retrieval operations""" def __init__(self, base_path: str = "data"): self.base_path = Path(base_path) self.base_path.mkdir(exist_ok=True) self.logger = get_logger(__name__) def get_file_path(self, filename: str, subfolder: str = None) -> Path: """Get the full file path for a given filename""" if subfolder: folder = self.base_path / subfolder folder.mkdir(exist_ok=True) return folder / filename return self.base_path / filename # Global storage manager instance _storage_manager = StorageManager() @handle_data_exceptions def save_data(data: Any, filename: str, format_type: str = "json", subfolder: str = None) -> bool: """ Save data to file in specified format Args: data: Data to save filename: Name of the file to save format_type: Format to save in ('json', 'pickle', 'csv', 'txt') subfolder: Optional subfolder to save in Returns: bool: True if successful Raises: DataError: If saving fails ValidationError: If parameters are invalid """ if not filename: raise ValidationError("Filename cannot be empty") if format_type not in ['json', 'pickle', 'csv', 'txt']: raise ValidationError(f"Unsupported format type: {format_type}") try: file_path = _storage_manager.get_file_path(filename, subfolder) if format_type == "json": with open(file_path, 'w', encoding='utf-8') as f: json.dump(data, f, indent=2, ensure_ascii=False, default=str) elif format_type == "pickle": with open(file_path, 'wb') as f: pickle.dump(data, f) elif format_type == "csv": if not isinstance(data, (list, tuple)): raise ValidationError("CSV format requires list or tuple data") with open(file_path, 'w', newline='', encoding='utf-8') as f: if data and isinstance(data[0], dict): # Dictionary data writer = csv.DictWriter(f, fieldnames=data[0].keys()) writer.writeheader() writer.writerows(data) else: # List data writer = csv.writer(f) writer.writerows(data) elif format_type == "txt": with open(file_path, 'w', encoding='utf-8') as f: if isinstance(data, str): f.write(data) else: f.write(str(data)) log_info(f"Successfully saved data to {file_path}") return True except Exception as e: raise DataError(f"Failed to save data to {filename}", details={"format": format_type, "error": str(e)}) @handle_data_exceptions def load_data(filename: str, format_type: str = "json", subfolder: str = None, default: Any = None) -> Any: """ Load data from file in specified format Args: filename: Name of the file to load format_type: Format to load from ('json', 'pickle', 'csv', 'txt') subfolder: Optional subfolder to load from default: Default value if file doesn't exist Returns: Any: Loaded data or default value Raises: DataError: If loading fails ValidationError: If parameters are invalid """ if not filename: raise ValidationError("Filename cannot be empty") if format_type not in ['json', 'pickle', 'csv', 'txt']: raise ValidationError(f"Unsupported format type: {format_type}") try: file_path = _storage_manager.get_file_path(filename, subfolder) if not file_path.exists(): if default is not None: log_warning(f"File {file_path} not found, returning default value") return default else: raise DataError(f"File not found: {file_path}") if format_type == "json": with open(file_path, 'r', encoding='utf-8') as f: data = json.load(f) elif format_type == "pickle": with open(file_path, 'rb') as f: data = pickle.load(f) elif format_type == "csv": data = [] with open(file_path, 'r', encoding='utf-8') as f: # Try to detect if first row is header sample = f.read(1024) f.seek(0) sniffer = csv.Sniffer() has_header = sniffer.has_header(sample) if has_header: reader = csv.DictReader(f) data = list(reader) else: reader = csv.reader(f) data = list(reader) elif format_type == "txt": with open(file_path, 'r', encoding='utf-8') as f: data = f.read() log_info(f"Successfully loaded data from {file_path}") return data except Exception as e: if default is not None: log_warning(f"Failed to load {filename}, returning default: {str(e)}") return default raise DataError(f"Failed to load data from {filename}", details={"format": format_type, "error": str(e)}) @handle_data_exceptions def delete_file(filename: str, subfolder: str = None) -> bool: """ Delete a file Args: filename: Name of the file to delete subfolder: Optional subfolder Returns: bool: True if successful Raises: DataError: If deletion fails """ try: file_path = _storage_manager.get_file_path(filename, subfolder) if file_path.exists(): file_path.unlink() log_info(f"Successfully deleted {file_path}") return True else: log_warning(f"File {file_path} does not exist") return False except Exception as e: raise DataError(f"Failed to delete {filename}", details={"error": str(e)}) @handle_data_exceptions def list_files(subfolder: str = None, extension: str = None) -> List[str]: """ List files in the storage directory Args: subfolder: Optional subfolder to list extension: Optional file extension filter Returns: List[str]: List of filenames Raises: DataError: If listing fails """ try: if subfolder: folder_path = _storage_manager.base_path / subfolder else: folder_path = _storage_manager.base_path if not folder_path.exists(): return [] files = [] for file_path in folder_path.iterdir(): if file_path.is_file(): if extension is None or file_path.suffix.lower() == extension.lower(): files.append(file_path.name) return sorted(files) except Exception as e: raise DataError("Failed to list files", details={"subfolder": subfolder, "error": str(e)}) @handle_data_exceptions def file_exists(filename: str, subfolder: str = None) -> bool: """ Check if a file exists Args: filename: Name of the file to check subfolder: Optional subfolder Returns: bool: True if file exists """ try: file_path = _storage_manager.get_file_path(filename, subfolder) return file_path.exists() except Exception as e: log_error(f"Error checking file existence: {str(e)}", error=e) return False @handle_data_exceptions def get_file_info(filename: str, subfolder: str = None) -> Dict: """ Get information about a file Args: filename: Name of the file subfolder: Optional subfolder Returns: Dict: File information including size, modified time, etc. Raises: DataError: If file doesn't exist or info retrieval fails """ try: file_path = _storage_manager.get_file_path(filename, subfolder) if not file_path.exists(): raise DataError(f"File not found: {filename}") stat = file_path.stat() return { "name": file_path.name, "size": stat.st_size, "modified": datetime.fromtimestamp(stat.st_mtime).isoformat(), "created": datetime.fromtimestamp(stat.st_ctime).isoformat(), "extension": file_path.suffix, "path": str(file_path) } except Exception as e: raise DataError(f"Failed to get file info for {filename}", details={"error": str(e)}) # Cache management class DataCache: """Simple in-memory data cache""" def __init__(self, max_size: int = 100): self.cache: Dict[str, Any] = {} self.access_times: Dict[str, datetime] = {} self.max_size = max_size def get(self, key: str) -> Optional[Any]: """Get value from cache""" if key in self.cache: self.access_times[key] = datetime.now() return self.cache[key] return None def set(self, key: str, value: Any) -> None: """Set value in cache""" if len(self.cache) >= self.max_size: # Remove oldest item oldest_key = min(self.access_times.keys(), key=lambda k: self.access_times[k]) del self.cache[oldest_key] del self.access_times[oldest_key] self.cache[key] = value self.access_times[key] = datetime.now() def clear(self) -> None: """Clear all cache""" self.cache.clear() self.access_times.clear() def remove(self, key: str) -> bool: """Remove specific key from cache""" if key in self.cache: del self.cache[key] del self.access_times[key] return True return False # Global cache instance _data_cache = DataCache() def get_cached_data(key: str) -> Optional[Any]: """Get data from cache""" return _data_cache.get(key) def set_cached_data(key: str, value: Any) -> None: """Set data in cache""" _data_cache.set(key, value) def clear_cache() -> None: """Clear all cached data""" _data_cache.clear()