mona / utils /storage.py
mrradix's picture
Update utils/storage.py
5abe59f verified
"""
Storage utilities for data persistence
"""
import json
import pickle
import os
from pathlib import Path
from typing import Any, Dict, Optional, Union
import pandas as pd
import streamlit as st
from utils.error_handling import handle_data_exceptions, DataError, ValidationError
# Default storage directories
DATA_DIR = Path("data")
CACHE_DIR = Path("cache")
# Create directories if they don't exist
DATA_DIR.mkdir(exist_ok=True)
CACHE_DIR.mkdir(exist_ok=True)
@handle_data_exceptions
def load_data(filename: str, data_dir: Optional[Path] = None) -> Optional[Dict[str, Any]]:
"""
Load JSON data from file
"""
if data_dir is None:
data_dir = DATA_DIR
filepath = data_dir / filename
if not filepath.exists():
raise DataError(f"File {filepath} does not exist")
try:
with open(filepath, 'r', encoding='utf-8') as f:
data = json.load(f)
return data
except json.JSONDecodeError as e:
raise DataError(f"Invalid JSON in file {filepath}: {str(e)}")
except Exception as e:
raise DataError(f"Error loading data from {filepath}: {str(e)}")
@handle_data_exceptions
def save_data(data: Dict[str, Any], filename: str, data_dir: Optional[Path] = None) -> bool:
"""
Save data to JSON file
"""
if data_dir is None:
data_dir = DATA_DIR
if not isinstance(data, dict):
raise ValidationError("Data must be a dictionary")
filepath = data_dir / filename
# Create directory if it doesn't exist
filepath.parent.mkdir(parents=True, exist_ok=True)
try:
with open(filepath, 'w', encoding='utf-8') as f:
json.dump(data, f, indent=2, ensure_ascii=False)
return True
except Exception as e:
raise DataError(f"Error saving data to {filepath}: {str(e)}")
@handle_data_exceptions
def load_dataframe(filename: str, data_dir: Optional[Path] = None) -> Optional[pd.DataFrame]:
"""
Load DataFrame from various file formats
"""
if data_dir is None:
data_dir = DATA_DIR
filepath = data_dir / filename
if not filepath.exists():
raise DataError(f"File {filepath} does not exist")
file_extension = filepath.suffix.lower()
try:
if file_extension == '.csv':
df = pd.read_csv(filepath)
elif file_extension == '.xlsx':
df = pd.read_excel(filepath)
elif file_extension == '.json':
df = pd.read_json(filepath)
elif file_extension == '.pkl':
df = pd.read_pickle(filepath)
else:
raise DataError(f"Unsupported file format: {file_extension}")
return df
except Exception as e:
raise DataError(f"Error loading dataframe from {filepath}: {str(e)}")
@handle_data_exceptions
def save_dataframe(df: pd.DataFrame, filename: str, data_dir: Optional[Path] = None,
file_format: str = 'csv') -> bool:
"""
Save DataFrame to file
"""
if data_dir is None:
data_dir = DATA_DIR
if not isinstance(df, pd.DataFrame):
raise ValidationError("Data must be a pandas DataFrame")
if df.empty:
raise ValidationError("DataFrame cannot be empty")
# Ensure filename has correct extension
if not filename.endswith(f'.{file_format}'):
filename = f"{filename}.{file_format}"
filepath = data_dir / filename
# Create directory if it doesn't exist
filepath.parent.mkdir(parents=True, exist_ok=True)
try:
if file_format == 'csv':
df.to_csv(filepath, index=False)
elif file_format == 'xlsx':
df.to_excel(filepath, index=False)
elif file_format == 'json':
df.to_json(filepath, orient='records', indent=2)
elif file_format == 'pkl':
df.to_pickle(filepath)
else:
raise DataError(f"Unsupported file format: {file_format}")
return True
except Exception as e:
raise DataError(f"Error saving dataframe to {filepath}: {str(e)}")
@handle_data_exceptions
def load_pickle(filename: str, data_dir: Optional[Path] = None) -> Optional[Any]:
"""
Load pickled data from file
"""
if data_dir is None:
data_dir = DATA_DIR
filepath = data_dir / filename
if not filepath.exists():
raise DataError(f"File {filepath} does not exist")
try:
with open(filepath, 'rb') as f:
data = pickle.load(f)
return data
except Exception as e:
raise DataError(f"Error loading pickle from {filepath}: {str(e)}")
@handle_data_exceptions
def save_pickle(data: Any, filename: str, data_dir: Optional[Path] = None) -> bool:
"""
Save data to pickle file
"""
if data_dir is None:
data_dir = DATA_DIR
if not filename.endswith('.pkl'):
filename = f"{filename}.pkl"
filepath = data_dir / filename
# Create directory if it doesn't exist
filepath.parent.mkdir(parents=True, exist_ok=True)
try:
with open(filepath, 'wb') as f:
pickle.dump(data, f)
return True
except Exception as e:
raise DataError(f"Error saving pickle to {filepath}: {str(e)}")
def list_files(data_dir: Optional[Path] = None, pattern: str = "*") -> list[Path]:
"""
List files in data directory
"""
if data_dir is None:
data_dir = DATA_DIR
if not data_dir.exists():
return []
return list(data_dir.glob(pattern))
def file_exists(filename: str, data_dir: Optional[Path] = None) -> bool:
"""
Check if file exists
"""
if data_dir is None:
data_dir = DATA_DIR
filepath = data_dir / filename
return filepath.exists()
def delete_file(filename: str, data_dir: Optional[Path] = None) -> bool:
"""
Delete file
"""
if data_dir is None:
data_dir = DATA_DIR
filepath = data_dir / filename
if filepath.exists():
try:
filepath.unlink()
return True
except Exception as e:
st.error(f"Error deleting file {filepath}: {str(e)}")
return False
return False
def get_file_size(filename: str, data_dir: Optional[Path] = None) -> Optional[int]:
"""
Get file size in bytes
"""
if data_dir is None:
data_dir = DATA_DIR
filepath = data_dir / filename
if filepath.exists():
return filepath.stat().st_size
return None
def create_backup(filename: str, data_dir: Optional[Path] = None) -> bool:
"""
Create backup of file
"""
if data_dir is None:
data_dir = DATA_DIR
filepath = data_dir / filename
if not filepath.exists():
return False
backup_filename = f"{filepath.stem}_backup{filepath.suffix}"
backup_filepath = data_dir / backup_filename
try:
import shutil
shutil.copy2(filepath, backup_filepath)
return True
except Exception as e:
st.error(f"Error creating backup: {str(e)}")
return False
# Session state management
def init_session_state(key: str, default_value: Any = None):
"""
Initialize session state variable if it doesn't exist
"""
if key not in st.session_state:
st.session_state[key] = default_value
def get_session_state(key: str, default_value: Any = None) -> Any:
"""
Get value from session state
"""
return st.session_state.get(key, default_value)
def set_session_state(key: str, value: Any):
"""
Set value in session state
"""
st.session_state[key] = value
def clear_session_state(key: Optional[str] = None):
"""
Clear session state (specific key or all)
"""
if key:
if key in st.session_state:
del st.session_state[key]
else:
st.session_state.clear()