"""
Storage utilities for the MONA application.

Fixed version with proper error handling and data management.
"""

import csv
import json
import os
import pickle
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional, Union

from utils.error_handling import handle_data_exceptions, DataError, ValidationError
from utils.logging import get_logger, log_error, log_info, log_warning


class StorageManager:
    """Manages data storage and retrieval operations."""

    def __init__(self, base_path: str = "data"):
        self.base_path = Path(base_path)
        # parents=True lets nested base paths (e.g. "var/data") be created in one call.
        self.base_path.mkdir(parents=True, exist_ok=True)
        self.logger = get_logger(__name__)

    def get_file_path(self, filename: str, subfolder: Optional[str] = None) -> Path:
        """Return the full path for a filename, creating the subfolder if needed."""
        if subfolder:
            folder = self.base_path / subfolder
            folder.mkdir(parents=True, exist_ok=True)
            return folder / filename
        return self.base_path / filename


# Module-level manager shared by the helper functions below.
_storage_manager = StorageManager()
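
# Example path resolution (illustrative values; "todo.json" and "notes" are hypothetical):
#     _storage_manager.get_file_path("todo.json")           -> Path("data/todo.json")
#     _storage_manager.get_file_path("todo.json", "notes")  -> Path("data/notes/todo.json")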


@handle_data_exceptions
def save_data(data: Any, filename: str, format_type: str = "json", subfolder: Optional[str] = None) -> bool:
    """
    Save data to a file in the specified format.

    Args:
        data: Data to save
        filename: Name of the file to save
        format_type: Format to save in ('json', 'pickle', 'csv', 'txt')
        subfolder: Optional subfolder to save in

    Returns:
        bool: True if successful

    Raises:
        DataError: If saving fails
        ValidationError: If parameters are invalid
    """
    if not filename:
        raise ValidationError("Filename cannot be empty")

    if format_type not in ['json', 'pickle', 'csv', 'txt']:
        raise ValidationError(f"Unsupported format type: {format_type}")

    try:
        file_path = _storage_manager.get_file_path(filename, subfolder)

        if format_type == "json":
            with open(file_path, 'w', encoding='utf-8') as f:
                json.dump(data, f, indent=2, ensure_ascii=False, default=str)

        elif format_type == "pickle":
            with open(file_path, 'wb') as f:
                pickle.dump(data, f)

        elif format_type == "csv":
            if not isinstance(data, (list, tuple)):
                raise ValidationError("CSV format requires list or tuple data")

            with open(file_path, 'w', newline='', encoding='utf-8') as f:
                if data and isinstance(data[0], dict):
                    # Rows of dicts: write a header derived from the first row's keys.
                    writer = csv.DictWriter(f, fieldnames=data[0].keys())
                    writer.writeheader()
                    writer.writerows(data)
                else:
                    # Rows of sequences (or an empty payload): plain CSV writer.
                    writer = csv.writer(f)
                    writer.writerows(data)

        elif format_type == "txt":
            with open(file_path, 'w', encoding='utf-8') as f:
                if isinstance(data, str):
                    f.write(data)
                else:
                    f.write(str(data))

        log_info(f"Successfully saved data to {file_path}")
        return True

    except ValidationError:
        # Invalid CSV payloads should surface as ValidationError, not be wrapped as DataError.
        raise
    except Exception as e:
        raise DataError(f"Failed to save data to {filename}", details={"format": format_type, "error": str(e)})


@handle_data_exceptions
def load_data(filename: str, format_type: str = "json", subfolder: Optional[str] = None, default: Any = None) -> Any:
    """
    Load data from a file in the specified format.

    Args:
        filename: Name of the file to load
        format_type: Format to load from ('json', 'pickle', 'csv', 'txt')
        subfolder: Optional subfolder to load from
        default: Value returned if the file doesn't exist or loading fails

    Returns:
        Any: Loaded data or default value

    Raises:
        DataError: If loading fails
        ValidationError: If parameters are invalid
    """
    if not filename:
        raise ValidationError("Filename cannot be empty")

    if format_type not in ['json', 'pickle', 'csv', 'txt']:
        raise ValidationError(f"Unsupported format type: {format_type}")

    try:
        file_path = _storage_manager.get_file_path(filename, subfolder)

        if not file_path.exists():
            if default is not None:
                log_warning(f"File {file_path} not found, returning default value")
                return default
            raise DataError(f"File not found: {file_path}")

        if format_type == "json":
            with open(file_path, 'r', encoding='utf-8') as f:
                data = json.load(f)

        elif format_type == "pickle":
            # Note: unpickling can execute arbitrary code; only load files this application wrote.
            with open(file_path, 'rb') as f:
                data = pickle.load(f)

        elif format_type == "csv":
            data = []
            with open(file_path, 'r', newline='', encoding='utf-8') as f:
                # Sniff a sample to decide whether the file has a header row.
                sample = f.read(1024)
                f.seek(0)

                sniffer = csv.Sniffer()
                has_header = sniffer.has_header(sample)

                if has_header:
                    reader = csv.DictReader(f)
                    data = list(reader)
                else:
                    reader = csv.reader(f)
                    data = list(reader)

        elif format_type == "txt":
            with open(file_path, 'r', encoding='utf-8') as f:
                data = f.read()

        log_info(f"Successfully loaded data from {file_path}")
        return data

    except DataError:
        # The file-not-found case above is already a DataError; re-raise it unchanged.
        raise
    except Exception as e:
        if default is not None:
            log_warning(f"Failed to load {filename}, returning default: {str(e)}")
            return default
        raise DataError(f"Failed to load data from {filename}", details={"format": format_type, "error": str(e)})


@handle_data_exceptions
def delete_file(filename: str, subfolder: Optional[str] = None) -> bool:
    """
    Delete a file.

    Args:
        filename: Name of the file to delete
        subfolder: Optional subfolder

    Returns:
        bool: True if the file was deleted, False if it did not exist

    Raises:
        DataError: If deletion fails
    """
    try:
        file_path = _storage_manager.get_file_path(filename, subfolder)

        if file_path.exists():
            file_path.unlink()
            log_info(f"Successfully deleted {file_path}")
            return True

        log_warning(f"File {file_path} does not exist")
        return False

    except Exception as e:
        raise DataError(f"Failed to delete {filename}", details={"error": str(e)})


@handle_data_exceptions
def list_files(subfolder: Optional[str] = None, extension: Optional[str] = None) -> List[str]:
    """
    List files in the storage directory.

    Args:
        subfolder: Optional subfolder to list
        extension: Optional file extension filter, with or without a leading dot (e.g. ".json" or "json")

    Returns:
        List[str]: Sorted list of filenames

    Raises:
        DataError: If listing fails
    """
    try:
        if subfolder:
            folder_path = _storage_manager.base_path / subfolder
        else:
            folder_path = _storage_manager.base_path

        if not folder_path.exists():
            return []

        # Normalise the extension so both ".json" and "json" match Path.suffix.
        if extension and not extension.startswith('.'):
            extension = '.' + extension

        files = []
        for file_path in folder_path.iterdir():
            if file_path.is_file():
                if extension is None or file_path.suffix.lower() == extension.lower():
                    files.append(file_path.name)

        return sorted(files)

    except Exception as e:
        raise DataError("Failed to list files", details={"subfolder": subfolder, "error": str(e)})


@handle_data_exceptions
def file_exists(filename: str, subfolder: Optional[str] = None) -> bool:
    """
    Check whether a file exists.

    Args:
        filename: Name of the file to check
        subfolder: Optional subfolder

    Returns:
        bool: True if the file exists
    """
    try:
        file_path = _storage_manager.get_file_path(filename, subfolder)
        return file_path.exists()
    except Exception as e:
        log_error(f"Error checking file existence: {str(e)}", error=e)
        return False
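
# Example usage (hypothetical filename; a sketch only):
#     if file_exists("stale_report.json", subfolder="reports"):
#         delete_file("stale_report.json", subfolder="reports")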


@handle_data_exceptions
def get_file_info(filename: str, subfolder: Optional[str] = None) -> Dict:
    """
    Get information about a file.

    Args:
        filename: Name of the file
        subfolder: Optional subfolder

    Returns:
        Dict: File information including size, modified time, etc.

    Raises:
        DataError: If the file doesn't exist or info retrieval fails
    """
    try:
        file_path = _storage_manager.get_file_path(filename, subfolder)

        if not file_path.exists():
            raise DataError(f"File not found: {filename}")

        stat = file_path.stat()

        return {
            "name": file_path.name,
            "size": stat.st_size,
            "modified": datetime.fromtimestamp(stat.st_mtime).isoformat(),
            # st_ctime is creation time on Windows but metadata-change time on most Unix systems.
            "created": datetime.fromtimestamp(stat.st_ctime).isoformat(),
            "extension": file_path.suffix,
            "path": str(file_path)
        }

    except DataError:
        # The file-not-found case above is already a DataError; re-raise it unchanged.
        raise
    except Exception as e:
        raise DataError(f"Failed to get file info for {filename}", details={"error": str(e)})


class DataCache:
    """Simple in-memory cache with least-recently-used eviction."""

    def __init__(self, max_size: int = 100):
        self.cache: Dict[str, Any] = {}
        self.access_times: Dict[str, datetime] = {}
        self.max_size = max_size

    def get(self, key: str) -> Optional[Any]:
        """Get a value from the cache, or None if the key is absent."""
        if key in self.cache:
            self.access_times[key] = datetime.now()
            return self.cache[key]
        return None

    def set(self, key: str, value: Any) -> None:
        """Set a value in the cache, evicting the least recently used entry when full."""
        if len(self.cache) >= self.max_size and key not in self.cache:
            # Evict the least recently used key; updating an existing key never evicts.
            oldest_key = min(self.access_times, key=lambda k: self.access_times[k])
            del self.cache[oldest_key]
            del self.access_times[oldest_key]

        self.cache[key] = value
        self.access_times[key] = datetime.now()

    def clear(self) -> None:
        """Clear the entire cache."""
        self.cache.clear()
        self.access_times.clear()

    def remove(self, key: str) -> bool:
        """Remove a specific key from the cache; return True if it was present."""
        if key in self.cache:
            del self.cache[key]
            del self.access_times[key]
            return True
        return False


# Module-level cache shared by the helper functions below.
_data_cache = DataCache()


def get_cached_data(key: str) -> Optional[Any]:
    """Get data from the module-level cache."""
    return _data_cache.get(key)


def set_cached_data(key: str, value: Any) -> None:
    """Set data in the module-level cache."""
    _data_cache.set(key, value)


def clear_cache() -> None:
    """Clear all cached data."""
    _data_cache.clear()
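

# Minimal smoke test (a sketch, not a formal test suite; it assumes the helpers behave as
# documented above and writes a temporary file into the local "data/" folder).
if __name__ == "__main__":
    set_cached_data("greeting", {"text": "hello"})
    assert get_cached_data("greeting") == {"text": "hello"}

    save_data({"theme": "dark"}, "smoke_test_settings.json")
    assert load_data("smoke_test_settings.json", default={}) == {"theme": "dark"}

    delete_file("smoke_test_settings.json")
    clear_cache()
    print("storage smoke test passed")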