""" Storage utilities for data persistence """ import json import pickle import os from pathlib import Path from typing import Any, Dict, Optional, Union import pandas as pd import streamlit as st from utils.error_handling import handle_data_exceptions, DataError, ValidationError # Default storage directories DATA_DIR = Path("data") CACHE_DIR = Path("cache") # Create directories if they don't exist DATA_DIR.mkdir(exist_ok=True) CACHE_DIR.mkdir(exist_ok=True) @handle_data_exceptions def load_data(filename: str, data_dir: Optional[Path] = None) -> Optional[Dict[str, Any]]: """ Load JSON data from file """ if data_dir is None: data_dir = DATA_DIR filepath = data_dir / filename if not filepath.exists(): raise DataError(f"File {filepath} does not exist") try: with open(filepath, 'r', encoding='utf-8') as f: data = json.load(f) return data except json.JSONDecodeError as e: raise DataError(f"Invalid JSON in file {filepath}: {str(e)}") except Exception as e: raise DataError(f"Error loading data from {filepath}: {str(e)}") @handle_data_exceptions def save_data(data: Dict[str, Any], filename: str, data_dir: Optional[Path] = None) -> bool: """ Save data to JSON file """ if data_dir is None: data_dir = DATA_DIR if not isinstance(data, dict): raise ValidationError("Data must be a dictionary") filepath = data_dir / filename # Create directory if it doesn't exist filepath.parent.mkdir(parents=True, exist_ok=True) try: with open(filepath, 'w', encoding='utf-8') as f: json.dump(data, f, indent=2, ensure_ascii=False) return True except Exception as e: raise DataError(f"Error saving data to {filepath}: {str(e)}") @handle_data_exceptions def load_dataframe(filename: str, data_dir: Optional[Path] = None) -> Optional[pd.DataFrame]: """ Load DataFrame from various file formats """ if data_dir is None: data_dir = DATA_DIR filepath = data_dir / filename if not filepath.exists(): raise DataError(f"File {filepath} does not exist") file_extension = filepath.suffix.lower() try: if file_extension == '.csv': df = pd.read_csv(filepath) elif file_extension == '.xlsx': df = pd.read_excel(filepath) elif file_extension == '.json': df = pd.read_json(filepath) elif file_extension == '.pkl': df = pd.read_pickle(filepath) else: raise DataError(f"Unsupported file format: {file_extension}") return df except Exception as e: raise DataError(f"Error loading dataframe from {filepath}: {str(e)}") @handle_data_exceptions def save_dataframe(df: pd.DataFrame, filename: str, data_dir: Optional[Path] = None, file_format: str = 'csv') -> bool: """ Save DataFrame to file """ if data_dir is None: data_dir = DATA_DIR if not isinstance(df, pd.DataFrame): raise ValidationError("Data must be a pandas DataFrame") if df.empty: raise ValidationError("DataFrame cannot be empty") # Ensure filename has correct extension if not filename.endswith(f'.{file_format}'): filename = f"{filename}.{file_format}" filepath = data_dir / filename # Create directory if it doesn't exist filepath.parent.mkdir(parents=True, exist_ok=True) try: if file_format == 'csv': df.to_csv(filepath, index=False) elif file_format == 'xlsx': df.to_excel(filepath, index=False) elif file_format == 'json': df.to_json(filepath, orient='records', indent=2) elif file_format == 'pkl': df.to_pickle(filepath) else: raise DataError(f"Unsupported file format: {file_format}") return True except Exception as e: raise DataError(f"Error saving dataframe to {filepath}: {str(e)}") @handle_data_exceptions def load_pickle(filename: str, data_dir: Optional[Path] = None) -> Optional[Any]: """ Load pickled data from file """ if data_dir is None: data_dir = DATA_DIR filepath = data_dir / filename if not filepath.exists(): raise DataError(f"File {filepath} does not exist") try: with open(filepath, 'rb') as f: data = pickle.load(f) return data except Exception as e: raise DataError(f"Error loading pickle from {filepath}: {str(e)}") @handle_data_exceptions def save_pickle(data: Any, filename: str, data_dir: Optional[Path] = None) -> bool: """ Save data to pickle file """ if data_dir is None: data_dir = DATA_DIR if not filename.endswith('.pkl'): filename = f"{filename}.pkl" filepath = data_dir / filename # Create directory if it doesn't exist filepath.parent.mkdir(parents=True, exist_ok=True) try: with open(filepath, 'wb') as f: pickle.dump(data, f) return True except Exception as e: raise DataError(f"Error saving pickle to {filepath}: {str(e)}") def list_files(data_dir: Optional[Path] = None, pattern: str = "*") -> list[Path]: """ List files in data directory """ if data_dir is None: data_dir = DATA_DIR if not data_dir.exists(): return [] return list(data_dir.glob(pattern)) def file_exists(filename: str, data_dir: Optional[Path] = None) -> bool: """ Check if file exists """ if data_dir is None: data_dir = DATA_DIR filepath = data_dir / filename return filepath.exists() def delete_file(filename: str, data_dir: Optional[Path] = None) -> bool: """ Delete file """ if data_dir is None: data_dir = DATA_DIR filepath = data_dir / filename if filepath.exists(): try: filepath.unlink() return True except Exception as e: st.error(f"Error deleting file {filepath}: {str(e)}") return False return False def get_file_size(filename: str, data_dir: Optional[Path] = None) -> Optional[int]: """ Get file size in bytes """ if data_dir is None: data_dir = DATA_DIR filepath = data_dir / filename if filepath.exists(): return filepath.stat().st_size return None def create_backup(filename: str, data_dir: Optional[Path] = None) -> bool: """ Create backup of file """ if data_dir is None: data_dir = DATA_DIR filepath = data_dir / filename if not filepath.exists(): return False backup_filename = f"{filepath.stem}_backup{filepath.suffix}" backup_filepath = data_dir / backup_filename try: import shutil shutil.copy2(filepath, backup_filepath) return True except Exception as e: st.error(f"Error creating backup: {str(e)}") return False # Session state management def init_session_state(key: str, default_value: Any = None): """ Initialize session state variable if it doesn't exist """ if key not in st.session_state: st.session_state[key] = default_value def get_session_state(key: str, default_value: Any = None) -> Any: """ Get value from session state """ return st.session_state.get(key, default_value) def set_session_state(key: str, value: Any): """ Set value in session state """ st.session_state[key] = value def clear_session_state(key: Optional[str] = None): """ Clear session state (specific key or all) """ if key: if key in st.session_state: del st.session_state[key] else: st.session_state.clear()