Spaces:
Running
Running
import os | |
import hashlib | |
import uuid | |
import json | |
import logging | |
import shutil | |
from pathlib import Path | |
from datetime import datetime, timedelta | |
import pandas as pd | |
logger = logging.getLogger(__name__) | |
class SessionManager: | |
""" | |
Менеджер сесій користувачів для управління даними в багатокористувацькому середовищі. | |
Забезпечує ізоляцію даних між користувачами та уникнення конфліктів. | |
""" | |
def __init__(self, base_dir="temp/sessions"): | |
""" | |
Ініціалізація менеджера сесій. | |
Args: | |
base_dir (str): Базова директорія для зберігання сесій | |
""" | |
self.base_dir = Path(base_dir) | |
self.base_dir.mkdir(exist_ok=True, parents=True) | |
# Очищення застарілих сесій при ініціалізації | |
self.cleanup_old_sessions() | |
def create_session(self, user_id=None): | |
""" | |
Створення нової сесії користувача. | |
Args: | |
user_id (str, optional): Ідентифікатор користувача. Якщо None, генерується випадковий. | |
Returns: | |
str: Ідентифікатор сесії | |
""" | |
# Якщо user_id не вказано, генеруємо випадковий | |
if not user_id: | |
user_id = str(uuid.uuid4()) | |
# Генеруємо унікальний ідентифікатор сесії | |
session_id = f"{user_id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}_{uuid.uuid4().hex[:8]}" | |
# Створюємо директорію для сесії | |
session_dir = self.base_dir / session_id | |
session_dir.mkdir(exist_ok=True) | |
# Створюємо підпапки для різних типів даних | |
(session_dir / "data").mkdir(exist_ok=True) # Для CSV та DataFrame | |
(session_dir / "indices").mkdir(exist_ok=True) # Для індексів FAISS та BM25 | |
(session_dir / "reports").mkdir(exist_ok=True) # Для звітів | |
(session_dir / "viz").mkdir(exist_ok=True) # Для візуалізацій | |
# Зберігаємо метадані сесії | |
metadata = { | |
"user_id": user_id, | |
"created_at": datetime.now().isoformat(), | |
"last_accessed": datetime.now().isoformat(), | |
"status": "active", | |
"data_files": [] | |
} | |
self._save_session_metadata(session_id, metadata) | |
logger.info(f"Створено нову сесію: {session_id}") | |
return session_id | |
def get_session_dir(self, session_id): | |
""" | |
Отримання шляху до директорії сесії. | |
Args: | |
session_id (str): Ідентифікатор сесії | |
Returns: | |
Path: Шлях до директорії сесії або None, якщо сесія не існує | |
""" | |
session_dir = self.base_dir / session_id | |
if not session_dir.exists(): | |
logger.warning(f"Сесія не знайдена: {session_id}") | |
return None | |
# Оновлюємо час останнього доступу | |
self._update_session_access_time(session_id) | |
return session_dir | |
def get_session_data_dir(self, session_id): | |
""" | |
Отримання шляху до директорії даних сесії. | |
Args: | |
session_id (str): Ідентифікатор сесії | |
Returns: | |
Path: Шлях до директорії даних або None, якщо сесія не існує | |
""" | |
session_dir = self.get_session_dir(session_id) | |
if not session_dir: | |
return None | |
return session_dir / "data" | |
def get_session_indices_dir(self, session_id): | |
""" | |
Отримання шляху до директорії індексів сесії. | |
Args: | |
session_id (str): Ідентифікатор сесії | |
Returns: | |
Path: Шлях до директорії індексів або None, якщо сесія не існує | |
""" | |
session_dir = self.get_session_dir(session_id) | |
if not session_dir: | |
return None | |
return session_dir / "indices" | |
def add_data_file(self, session_id, file_path, file_type="uploaded", description=None): | |
""" | |
Додавання інформації про файл даних до сесії. | |
Args: | |
session_id (str): Ідентифікатор сесії | |
file_path (str): Шлях до файлу | |
file_type (str): Тип файлу ("uploaded", "local", "merged") | |
description (str, optional): Опис файлу | |
Returns: | |
bool: True, якщо дані успішно додані, False у випадку помилки | |
""" | |
session_dir = self.get_session_dir(session_id) | |
if not session_dir: | |
return False | |
# Отримуємо поточні метадані сесії | |
metadata = self._get_session_metadata(session_id) | |
if not metadata: | |
return False | |
# Генеруємо хеш файлу для відстеження дублікатів | |
file_hash = self._generate_file_hash(file_path) | |
# Додаємо інформацію про файл | |
file_info = { | |
"path": str(file_path), | |
"filename": os.path.basename(file_path), | |
"type": file_type, | |
"hash": file_hash, | |
"size": os.path.getsize(file_path) if os.path.exists(file_path) else 0, | |
"added_at": datetime.now().isoformat(), | |
"description": description or "" | |
} | |
# Перевіряємо на дублікати | |
for existing_file in metadata.get("data_files", []): | |
if existing_file.get("hash") == file_hash: | |
logger.warning(f"Файл вже існує в сесії: {file_path}") | |
return True | |
# Додаємо файл до списку | |
metadata.setdefault("data_files", []).append(file_info) | |
# Оновлюємо метадані | |
self._save_session_metadata(session_id, metadata) | |
logger.info(f"Додано файл даних до сесії {session_id}: {file_path}") | |
return True | |
def remove_data_file(self, session_id, file_path_or_hash): | |
""" | |
Видалення інформації про файл даних із сесії. | |
Args: | |
session_id (str): Ідентифікатор сесії | |
file_path_or_hash (str): Шлях до файлу або його хеш | |
Returns: | |
bool: True, якщо дані успішно видалені, False у випадку помилки | |
""" | |
session_dir = self.get_session_dir(session_id) | |
if not session_dir: | |
return False | |
# Отримуємо поточні метадані сесії | |
metadata = self._get_session_metadata(session_id) | |
if not metadata: | |
return False | |
# Шукаємо файл за шляхом або хешем | |
file_found = False | |
updated_files = [] | |
for file_info in metadata.get("data_files", []): | |
if file_info.get("path") == file_path_or_hash or file_info.get("hash") == file_path_or_hash: | |
file_found = True | |
# Файл може бути фізично видалений, якщо він знаходиться в директорії сесії | |
if file_info.get("path").startswith(str(session_dir)): | |
try: | |
os.remove(file_info.get("path")) | |
logger.info(f"Фізично видалено файл: {file_info.get('path')}") | |
except Exception as e: | |
logger.warning(f"Не вдалося видалити файл {file_info.get('path')}: {e}") | |
else: | |
updated_files.append(file_info) | |
if not file_found: | |
logger.warning(f"Файл не знайдено в сесії: {file_path_or_hash}") | |
return False | |
# Оновлюємо список файлів | |
metadata["data_files"] = updated_files | |
# Оновлюємо метадані | |
self._save_session_metadata(session_id, metadata) | |
logger.info(f"Видалено файл з сесії {session_id}: {file_path_or_hash}") | |
return True | |
def get_session_files(self, session_id): | |
""" | |
Отримання списку файлів даних сесії. | |
Args: | |
session_id (str): Ідентифікатор сесії | |
Returns: | |
list: Список інформації про файли або порожній список у випадку помилки | |
""" | |
# Отримуємо поточні метадані сесії | |
metadata = self._get_session_metadata(session_id) | |
if not metadata: | |
return [] | |
return metadata.get("data_files", []) | |
def save_merged_data(self, session_id, merged_df, output_filename=None): | |
""" | |
Збереження об'єднаних даних у сесію. | |
Args: | |
session_id (str): Ідентифікатор сесії | |
merged_df (pandas.DataFrame): DataFrame з об'єднаними даними | |
output_filename (str, optional): Ім'я файлу для збереження. Якщо None, генерується автоматично. | |
Returns: | |
str: Шлях до збереженого файлу або None у випадку помилки | |
""" | |
session_data_dir = self.get_session_data_dir(session_id) | |
if not session_data_dir: | |
return None | |
try: | |
# Генеруємо ім'я файлу, якщо не вказано | |
if not output_filename: | |
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') | |
output_filename = f"merged_data_{timestamp}.csv" | |
# Переконуємося, що файл має розширення .csv | |
if not output_filename.lower().endswith(".csv"): | |
output_filename += ".csv" | |
# Шлях для збереження | |
output_path = session_data_dir / output_filename | |
# Зберігаємо DataFrame у CSV | |
merged_df.to_csv(output_path, index=False) | |
# Додаємо інформацію про файл до сесії | |
self.add_data_file( | |
session_id, | |
str(output_path), | |
file_type="merged", | |
description="Об'єднані дані" | |
) | |
logger.info(f"Збережено об'єднані дані у сесії {session_id}: {output_path}") | |
return str(output_path) | |
except Exception as e: | |
logger.error(f"Помилка при збереженні об'єднаних даних: {e}") | |
return None | |
def cleanup_session(self, session_id): | |
""" | |
Очищення сесії (видалення всіх файлів і директорій). | |
Args: | |
session_id (str): Ідентифікатор сесії | |
Returns: | |
bool: True, якщо сесія успішно очищена, False у випадку помилки | |
""" | |
session_dir = self.base_dir / session_id | |
if not session_dir.exists(): | |
logger.warning(f"Сесія не знайдена: {session_id}") | |
return False | |
try: | |
# Видаляємо всю директорію сесії | |
shutil.rmtree(session_dir) | |
logger.info(f"Сесію {session_id} успішно очищено") | |
return True | |
except Exception as e: | |
logger.error(f"Помилка при очищенні сесії {session_id}: {e}") | |
return False | |
def cleanup_old_sessions(self, max_age_hours=24): | |
""" | |
Очищення застарілих сесій. | |
Args: | |
max_age_hours (int): Максимальний вік сесії в годинах для збереження | |
Returns: | |
int: Кількість видалених сесій | |
""" | |
cutoff_time = datetime.now() - timedelta(hours=max_age_hours) | |
deleted_count = 0 | |
# Перебираємо всі підпапки в базовій директорії | |
for session_dir in self.base_dir.iterdir(): | |
if not session_dir.is_dir(): | |
continue | |
# Перевіряємо час останнього доступу до сесії | |
metadata_file = session_dir / "metadata.json" | |
if not metadata_file.exists(): | |
# Якщо немає метаданих, видаляємо директорію | |
try: | |
shutil.rmtree(session_dir) | |
deleted_count += 1 | |
logger.info(f"Видалено сесію без метаданих: {session_dir.name}") | |
except Exception as e: | |
logger.error(f"Помилка при видаленні сесії {session_dir.name}: {e}") | |
continue | |
try: | |
with open(metadata_file, "r", encoding="utf-8") as f: | |
metadata = json.load(f) | |
last_accessed = datetime.fromisoformat(metadata.get("last_accessed", metadata.get("created_at"))) | |
if last_accessed < cutoff_time: | |
# Сесія застаріла, видаляємо її | |
shutil.rmtree(session_dir) | |
deleted_count += 1 | |
logger.info(f"Видалено застарілу сесію: {session_dir.name}, " | |
f"останній доступ: {last_accessed.isoformat()}") | |
except Exception as e: | |
logger.error(f"Помилка при перевірці сесії {session_dir.name}: {e}") | |
logger.info(f"Очищено {deleted_count} застарілих сесій") | |
return deleted_count | |
def _save_session_metadata(self, session_id, metadata): | |
""" | |
Збереження метаданих сесії. | |
Args: | |
session_id (str): Ідентифікатор сесії | |
metadata (dict): Метадані для збереження | |
Returns: | |
bool: True, якщо метадані успішно збережені, False у випадку помилки | |
""" | |
session_dir = self.base_dir / session_id | |
if not session_dir.exists(): | |
logger.warning(f"Сесія не знайдена: {session_id}") | |
return False | |
metadata_file = session_dir / "metadata.json" | |
try: | |
with open(metadata_file, "w", encoding="utf-8") as f: | |
json.dump(metadata, f, ensure_ascii=False, indent=2) | |
return True | |
except Exception as e: | |
logger.error(f"Помилка при збереженні метаданих сесії {session_id}: {e}") | |
return False | |
def _get_session_metadata(self, session_id): | |
""" | |
Отримання метаданих сесії. | |
Args: | |
session_id (str): Ідентифікатор сесії | |
Returns: | |
dict: Метадані сесії або None у випадку помилки | |
""" | |
session_dir = self.base_dir / session_id | |
metadata_file = session_dir / "metadata.json" | |
if not metadata_file.exists(): | |
logger.warning(f"Метадані сесії не знайдені: {session_id}") | |
return None | |
try: | |
with open(metadata_file, "r", encoding="utf-8") as f: | |
metadata = json.load(f) | |
return metadata | |
except Exception as e: | |
logger.error(f"Помилка при читанні метаданих сесії {session_id}: {e}") | |
return None | |
def _update_session_access_time(self, session_id): | |
""" | |
Оновлення часу останнього доступу до сесії. | |
Args: | |
session_id (str): Ідентифікатор сесії | |
Returns: | |
bool: True, якщо час доступу успішно оновлено, False у випадку помилки | |
""" | |
metadata = self._get_session_metadata(session_id) | |
if not metadata: | |
return False | |
metadata["last_accessed"] = datetime.now().isoformat() | |
return self._save_session_metadata(session_id, metadata) | |
def _generate_file_hash(file_path): | |
""" | |
Генерує хеш для файлу на основі його вмісту або шляху. | |
Args: | |
file_path (str): Шлях до файлу | |
Returns: | |
str: Хеш файлу | |
""" | |
try: | |
if os.path.exists(file_path): | |
# Для невеликих файлів використовуємо вміст файлу | |
if os.path.getsize(file_path) < 10 * 1024 * 1024: # < 10 MB | |
sha256 = hashlib.sha256() | |
with open(file_path, "rb") as f: | |
for byte_block in iter(lambda: f.read(4096), b""): | |
sha256.update(byte_block) | |
return sha256.hexdigest() | |
else: | |
# Для великих файлів використовуємо шлях, розмір і час модифікації | |
file_stat = os.stat(file_path) | |
hash_input = f"{file_path}_{file_stat.st_size}_{file_stat.st_mtime}" | |
return hashlib.md5(hash_input.encode()).hexdigest() | |
else: | |
# Якщо файл не існує, повертаємо хеш шляху | |
return hashlib.md5(file_path.encode()).hexdigest() | |
except Exception as e: | |
logger.warning(f"Помилка при генерації хешу файлу {file_path}: {e}") | |
# У випадку помилки, повертаємо хеш шляху | |
return hashlib.md5(str(file_path).encode()).hexdigest() |