mona / utils /storage.py
mrradix's picture
Update utils/storage.py
052657c verified
raw
history blame
11 kB
"""
Storage utilities for the MONA application
Fixed version with proper error handling and data management
"""
import json
import os
import pickle
import csv
from pathlib import Path
from typing import Any, Dict, List, Optional, Union
from datetime import datetime
from utils.error_handling import handle_data_exceptions, DataError, ValidationError
from utils.logging import get_logger, log_error, log_info, log_warning
class StorageManager:
"""Manages data storage and retrieval operations"""
def __init__(self, base_path: str = "data"):
self.base_path = Path(base_path)
self.base_path.mkdir(exist_ok=True)
self.logger = get_logger(__name__)
def get_file_path(self, filename: str, subfolder: str = None) -> Path:
"""Get the full file path for a given filename"""
if subfolder:
folder = self.base_path / subfolder
folder.mkdir(exist_ok=True)
return folder / filename
return self.base_path / filename
# Global storage manager instance
_storage_manager = StorageManager()
@handle_data_exceptions
def save_data(data: Any, filename: str, format_type: str = "json", subfolder: str = None) -> bool:
"""
Save data to file in specified format
Args:
data: Data to save
filename: Name of the file to save
format_type: Format to save in ('json', 'pickle', 'csv', 'txt')
subfolder: Optional subfolder to save in
Returns:
bool: True if successful
Raises:
DataError: If saving fails
ValidationError: If parameters are invalid
"""
if not filename:
raise ValidationError("Filename cannot be empty")
if format_type not in ['json', 'pickle', 'csv', 'txt']:
raise ValidationError(f"Unsupported format type: {format_type}")
try:
file_path = _storage_manager.get_file_path(filename, subfolder)
if format_type == "json":
with open(file_path, 'w', encoding='utf-8') as f:
json.dump(data, f, indent=2, ensure_ascii=False, default=str)
elif format_type == "pickle":
with open(file_path, 'wb') as f:
pickle.dump(data, f)
elif format_type == "csv":
if not isinstance(data, (list, tuple)):
raise ValidationError("CSV format requires list or tuple data")
with open(file_path, 'w', newline='', encoding='utf-8') as f:
if data and isinstance(data[0], dict):
# Dictionary data
writer = csv.DictWriter(f, fieldnames=data[0].keys())
writer.writeheader()
writer.writerows(data)
else:
# List data
writer = csv.writer(f)
writer.writerows(data)
elif format_type == "txt":
with open(file_path, 'w', encoding='utf-8') as f:
if isinstance(data, str):
f.write(data)
else:
f.write(str(data))
log_info(f"Successfully saved data to {file_path}")
return True
except Exception as e:
raise DataError(f"Failed to save data to {filename}", details={"format": format_type, "error": str(e)})
@handle_data_exceptions
def load_data(filename: str, format_type: str = "json", subfolder: str = None, default: Any = None) -> Any:
"""
Load data from file in specified format
Args:
filename: Name of the file to load
format_type: Format to load from ('json', 'pickle', 'csv', 'txt')
subfolder: Optional subfolder to load from
default: Default value if file doesn't exist
Returns:
Any: Loaded data or default value
Raises:
DataError: If loading fails
ValidationError: If parameters are invalid
"""
if not filename:
raise ValidationError("Filename cannot be empty")
if format_type not in ['json', 'pickle', 'csv', 'txt']:
raise ValidationError(f"Unsupported format type: {format_type}")
try:
file_path = _storage_manager.get_file_path(filename, subfolder)
if not file_path.exists():
if default is not None:
log_warning(f"File {file_path} not found, returning default value")
return default
else:
raise DataError(f"File not found: {file_path}")
if format_type == "json":
with open(file_path, 'r', encoding='utf-8') as f:
data = json.load(f)
elif format_type == "pickle":
with open(file_path, 'rb') as f:
data = pickle.load(f)
elif format_type == "csv":
data = []
with open(file_path, 'r', encoding='utf-8') as f:
# Try to detect if first row is header
sample = f.read(1024)
f.seek(0)
sniffer = csv.Sniffer()
has_header = sniffer.has_header(sample)
if has_header:
reader = csv.DictReader(f)
data = list(reader)
else:
reader = csv.reader(f)
data = list(reader)
elif format_type == "txt":
with open(file_path, 'r', encoding='utf-8') as f:
data = f.read()
log_info(f"Successfully loaded data from {file_path}")
return data
except Exception as e:
if default is not None:
log_warning(f"Failed to load {filename}, returning default: {str(e)}")
return default
raise DataError(f"Failed to load data from {filename}", details={"format": format_type, "error": str(e)})
@handle_data_exceptions
def delete_file(filename: str, subfolder: str = None) -> bool:
"""
Delete a file
Args:
filename: Name of the file to delete
subfolder: Optional subfolder
Returns:
bool: True if successful
Raises:
DataError: If deletion fails
"""
try:
file_path = _storage_manager.get_file_path(filename, subfolder)
if file_path.exists():
file_path.unlink()
log_info(f"Successfully deleted {file_path}")
return True
else:
log_warning(f"File {file_path} does not exist")
return False
except Exception as e:
raise DataError(f"Failed to delete {filename}", details={"error": str(e)})
@handle_data_exceptions
def list_files(subfolder: str = None, extension: str = None) -> List[str]:
"""
List files in the storage directory
Args:
subfolder: Optional subfolder to list
extension: Optional file extension filter
Returns:
List[str]: List of filenames
Raises:
DataError: If listing fails
"""
try:
if subfolder:
folder_path = _storage_manager.base_path / subfolder
else:
folder_path = _storage_manager.base_path
if not folder_path.exists():
return []
files = []
for file_path in folder_path.iterdir():
if file_path.is_file():
if extension is None or file_path.suffix.lower() == extension.lower():
files.append(file_path.name)
return sorted(files)
except Exception as e:
raise DataError("Failed to list files", details={"subfolder": subfolder, "error": str(e)})
@handle_data_exceptions
def file_exists(filename: str, subfolder: str = None) -> bool:
"""
Check if a file exists
Args:
filename: Name of the file to check
subfolder: Optional subfolder
Returns:
bool: True if file exists
"""
try:
file_path = _storage_manager.get_file_path(filename, subfolder)
return file_path.exists()
except Exception as e:
log_error(f"Error checking file existence: {str(e)}", error=e)
return False
@handle_data_exceptions
def get_file_info(filename: str, subfolder: str = None) -> Dict:
"""
Get information about a file
Args:
filename: Name of the file
subfolder: Optional subfolder
Returns:
Dict: File information including size, modified time, etc.
Raises:
DataError: If file doesn't exist or info retrieval fails
"""
try:
file_path = _storage_manager.get_file_path(filename, subfolder)
if not file_path.exists():
raise DataError(f"File not found: {filename}")
stat = file_path.stat()
return {
"name": file_path.name,
"size": stat.st_size,
"modified": datetime.fromtimestamp(stat.st_mtime).isoformat(),
"created": datetime.fromtimestamp(stat.st_ctime).isoformat(),
"extension": file_path.suffix,
"path": str(file_path)
}
except Exception as e:
raise DataError(f"Failed to get file info for {filename}", details={"error": str(e)})
# Cache management
class DataCache:
"""Simple in-memory data cache"""
def __init__(self, max_size: int = 100):
self.cache: Dict[str, Any] = {}
self.access_times: Dict[str, datetime] = {}
self.max_size = max_size
def get(self, key: str) -> Optional[Any]:
"""Get value from cache"""
if key in self.cache:
self.access_times[key] = datetime.now()
return self.cache[key]
return None
def set(self, key: str, value: Any) -> None:
"""Set value in cache"""
if len(self.cache) >= self.max_size:
# Remove oldest item
oldest_key = min(self.access_times.keys(), key=lambda k: self.access_times[k])
del self.cache[oldest_key]
del self.access_times[oldest_key]
self.cache[key] = value
self.access_times[key] = datetime.now()
def clear(self) -> None:
"""Clear all cache"""
self.cache.clear()
self.access_times.clear()
def remove(self, key: str) -> bool:
"""Remove specific key from cache"""
if key in self.cache:
del self.cache[key]
del self.access_times[key]
return True
return False
# Global cache instance
_data_cache = DataCache()
def get_cached_data(key: str) -> Optional[Any]:
"""Get data from cache"""
return _data_cache.get(key)
def set_cached_data(key: str, value: Any) -> None:
"""Set data in cache"""
_data_cache.set(key, value)
def clear_cache() -> None:
"""Clear all cached data"""
_data_cache.clear()