DocUA's picture
Єдиний коміт - очищення історії
4ad5efa
import os
import hashlib
import uuid
import json
import logging
import shutil
from pathlib import Path
from datetime import datetime, timedelta
import pandas as pd
logger = logging.getLogger(__name__)
class SessionManager:
"""
Менеджер сесій користувачів для управління даними в багатокористувацькому середовищі.
Забезпечує ізоляцію даних між користувачами та уникнення конфліктів.
"""
def __init__(self, base_dir="temp/sessions"):
"""
Ініціалізація менеджера сесій.
Args:
base_dir (str): Базова директорія для зберігання сесій
"""
self.base_dir = Path(base_dir)
self.base_dir.mkdir(exist_ok=True, parents=True)
# Очищення застарілих сесій при ініціалізації
self.cleanup_old_sessions()
def create_session(self, user_id=None):
"""
Створення нової сесії користувача.
Args:
user_id (str, optional): Ідентифікатор користувача. Якщо None, генерується випадковий.
Returns:
str: Ідентифікатор сесії
"""
# Якщо user_id не вказано, генеруємо випадковий
if not user_id:
user_id = str(uuid.uuid4())
# Генеруємо унікальний ідентифікатор сесії
session_id = f"{user_id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}_{uuid.uuid4().hex[:8]}"
# Створюємо директорію для сесії
session_dir = self.base_dir / session_id
session_dir.mkdir(exist_ok=True)
# Створюємо підпапки для різних типів даних
(session_dir / "data").mkdir(exist_ok=True) # Для CSV та DataFrame
(session_dir / "indices").mkdir(exist_ok=True) # Для індексів FAISS та BM25
(session_dir / "reports").mkdir(exist_ok=True) # Для звітів
(session_dir / "viz").mkdir(exist_ok=True) # Для візуалізацій
# Зберігаємо метадані сесії
metadata = {
"user_id": user_id,
"created_at": datetime.now().isoformat(),
"last_accessed": datetime.now().isoformat(),
"status": "active",
"data_files": []
}
self._save_session_metadata(session_id, metadata)
logger.info(f"Створено нову сесію: {session_id}")
return session_id
def get_session_dir(self, session_id):
"""
Отримання шляху до директорії сесії.
Args:
session_id (str): Ідентифікатор сесії
Returns:
Path: Шлях до директорії сесії або None, якщо сесія не існує
"""
session_dir = self.base_dir / session_id
if not session_dir.exists():
logger.warning(f"Сесія не знайдена: {session_id}")
return None
# Оновлюємо час останнього доступу
self._update_session_access_time(session_id)
return session_dir
def get_session_data_dir(self, session_id):
"""
Отримання шляху до директорії даних сесії.
Args:
session_id (str): Ідентифікатор сесії
Returns:
Path: Шлях до директорії даних або None, якщо сесія не існує
"""
session_dir = self.get_session_dir(session_id)
if not session_dir:
return None
return session_dir / "data"
def get_session_indices_dir(self, session_id):
"""
Отримання шляху до директорії індексів сесії.
Args:
session_id (str): Ідентифікатор сесії
Returns:
Path: Шлях до директорії індексів або None, якщо сесія не існує
"""
session_dir = self.get_session_dir(session_id)
if not session_dir:
return None
return session_dir / "indices"
def add_data_file(self, session_id, file_path, file_type="uploaded", description=None):
"""
Додавання інформації про файл даних до сесії.
Args:
session_id (str): Ідентифікатор сесії
file_path (str): Шлях до файлу
file_type (str): Тип файлу ("uploaded", "local", "merged")
description (str, optional): Опис файлу
Returns:
bool: True, якщо дані успішно додані, False у випадку помилки
"""
session_dir = self.get_session_dir(session_id)
if not session_dir:
return False
# Отримуємо поточні метадані сесії
metadata = self._get_session_metadata(session_id)
if not metadata:
return False
# Генеруємо хеш файлу для відстеження дублікатів
file_hash = self._generate_file_hash(file_path)
# Додаємо інформацію про файл
file_info = {
"path": str(file_path),
"filename": os.path.basename(file_path),
"type": file_type,
"hash": file_hash,
"size": os.path.getsize(file_path) if os.path.exists(file_path) else 0,
"added_at": datetime.now().isoformat(),
"description": description or ""
}
# Перевіряємо на дублікати
for existing_file in metadata.get("data_files", []):
if existing_file.get("hash") == file_hash:
logger.warning(f"Файл вже існує в сесії: {file_path}")
return True
# Додаємо файл до списку
metadata.setdefault("data_files", []).append(file_info)
# Оновлюємо метадані
self._save_session_metadata(session_id, metadata)
logger.info(f"Додано файл даних до сесії {session_id}: {file_path}")
return True
def remove_data_file(self, session_id, file_path_or_hash):
"""
Видалення інформації про файл даних із сесії.
Args:
session_id (str): Ідентифікатор сесії
file_path_or_hash (str): Шлях до файлу або його хеш
Returns:
bool: True, якщо дані успішно видалені, False у випадку помилки
"""
session_dir = self.get_session_dir(session_id)
if not session_dir:
return False
# Отримуємо поточні метадані сесії
metadata = self._get_session_metadata(session_id)
if not metadata:
return False
# Шукаємо файл за шляхом або хешем
file_found = False
updated_files = []
for file_info in metadata.get("data_files", []):
if file_info.get("path") == file_path_or_hash or file_info.get("hash") == file_path_or_hash:
file_found = True
# Файл може бути фізично видалений, якщо він знаходиться в директорії сесії
if file_info.get("path").startswith(str(session_dir)):
try:
os.remove(file_info.get("path"))
logger.info(f"Фізично видалено файл: {file_info.get('path')}")
except Exception as e:
logger.warning(f"Не вдалося видалити файл {file_info.get('path')}: {e}")
else:
updated_files.append(file_info)
if not file_found:
logger.warning(f"Файл не знайдено в сесії: {file_path_or_hash}")
return False
# Оновлюємо список файлів
metadata["data_files"] = updated_files
# Оновлюємо метадані
self._save_session_metadata(session_id, metadata)
logger.info(f"Видалено файл з сесії {session_id}: {file_path_or_hash}")
return True
def get_session_files(self, session_id):
"""
Отримання списку файлів даних сесії.
Args:
session_id (str): Ідентифікатор сесії
Returns:
list: Список інформації про файли або порожній список у випадку помилки
"""
# Отримуємо поточні метадані сесії
metadata = self._get_session_metadata(session_id)
if not metadata:
return []
return metadata.get("data_files", [])
def save_merged_data(self, session_id, merged_df, output_filename=None):
"""
Збереження об'єднаних даних у сесію.
Args:
session_id (str): Ідентифікатор сесії
merged_df (pandas.DataFrame): DataFrame з об'єднаними даними
output_filename (str, optional): Ім'я файлу для збереження. Якщо None, генерується автоматично.
Returns:
str: Шлях до збереженого файлу або None у випадку помилки
"""
session_data_dir = self.get_session_data_dir(session_id)
if not session_data_dir:
return None
try:
# Генеруємо ім'я файлу, якщо не вказано
if not output_filename:
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
output_filename = f"merged_data_{timestamp}.csv"
# Переконуємося, що файл має розширення .csv
if not output_filename.lower().endswith(".csv"):
output_filename += ".csv"
# Шлях для збереження
output_path = session_data_dir / output_filename
# Зберігаємо DataFrame у CSV
merged_df.to_csv(output_path, index=False)
# Додаємо інформацію про файл до сесії
self.add_data_file(
session_id,
str(output_path),
file_type="merged",
description="Об'єднані дані"
)
logger.info(f"Збережено об'єднані дані у сесії {session_id}: {output_path}")
return str(output_path)
except Exception as e:
logger.error(f"Помилка при збереженні об'єднаних даних: {e}")
return None
def cleanup_session(self, session_id):
"""
Очищення сесії (видалення всіх файлів і директорій).
Args:
session_id (str): Ідентифікатор сесії
Returns:
bool: True, якщо сесія успішно очищена, False у випадку помилки
"""
session_dir = self.base_dir / session_id
if not session_dir.exists():
logger.warning(f"Сесія не знайдена: {session_id}")
return False
try:
# Видаляємо всю директорію сесії
shutil.rmtree(session_dir)
logger.info(f"Сесію {session_id} успішно очищено")
return True
except Exception as e:
logger.error(f"Помилка при очищенні сесії {session_id}: {e}")
return False
def cleanup_old_sessions(self, max_age_hours=24):
"""
Очищення застарілих сесій.
Args:
max_age_hours (int): Максимальний вік сесії в годинах для збереження
Returns:
int: Кількість видалених сесій
"""
cutoff_time = datetime.now() - timedelta(hours=max_age_hours)
deleted_count = 0
# Перебираємо всі підпапки в базовій директорії
for session_dir in self.base_dir.iterdir():
if not session_dir.is_dir():
continue
# Перевіряємо час останнього доступу до сесії
metadata_file = session_dir / "metadata.json"
if not metadata_file.exists():
# Якщо немає метаданих, видаляємо директорію
try:
shutil.rmtree(session_dir)
deleted_count += 1
logger.info(f"Видалено сесію без метаданих: {session_dir.name}")
except Exception as e:
logger.error(f"Помилка при видаленні сесії {session_dir.name}: {e}")
continue
try:
with open(metadata_file, "r", encoding="utf-8") as f:
metadata = json.load(f)
last_accessed = datetime.fromisoformat(metadata.get("last_accessed", metadata.get("created_at")))
if last_accessed < cutoff_time:
# Сесія застаріла, видаляємо її
shutil.rmtree(session_dir)
deleted_count += 1
logger.info(f"Видалено застарілу сесію: {session_dir.name}, "
f"останній доступ: {last_accessed.isoformat()}")
except Exception as e:
logger.error(f"Помилка при перевірці сесії {session_dir.name}: {e}")
logger.info(f"Очищено {deleted_count} застарілих сесій")
return deleted_count
def _save_session_metadata(self, session_id, metadata):
"""
Збереження метаданих сесії.
Args:
session_id (str): Ідентифікатор сесії
metadata (dict): Метадані для збереження
Returns:
bool: True, якщо метадані успішно збережені, False у випадку помилки
"""
session_dir = self.base_dir / session_id
if not session_dir.exists():
logger.warning(f"Сесія не знайдена: {session_id}")
return False
metadata_file = session_dir / "metadata.json"
try:
with open(metadata_file, "w", encoding="utf-8") as f:
json.dump(metadata, f, ensure_ascii=False, indent=2)
return True
except Exception as e:
logger.error(f"Помилка при збереженні метаданих сесії {session_id}: {e}")
return False
def _get_session_metadata(self, session_id):
"""
Отримання метаданих сесії.
Args:
session_id (str): Ідентифікатор сесії
Returns:
dict: Метадані сесії або None у випадку помилки
"""
session_dir = self.base_dir / session_id
metadata_file = session_dir / "metadata.json"
if not metadata_file.exists():
logger.warning(f"Метадані сесії не знайдені: {session_id}")
return None
try:
with open(metadata_file, "r", encoding="utf-8") as f:
metadata = json.load(f)
return metadata
except Exception as e:
logger.error(f"Помилка при читанні метаданих сесії {session_id}: {e}")
return None
def _update_session_access_time(self, session_id):
"""
Оновлення часу останнього доступу до сесії.
Args:
session_id (str): Ідентифікатор сесії
Returns:
bool: True, якщо час доступу успішно оновлено, False у випадку помилки
"""
metadata = self._get_session_metadata(session_id)
if not metadata:
return False
metadata["last_accessed"] = datetime.now().isoformat()
return self._save_session_metadata(session_id, metadata)
@staticmethod
def _generate_file_hash(file_path):
"""
Генерує хеш для файлу на основі його вмісту або шляху.
Args:
file_path (str): Шлях до файлу
Returns:
str: Хеш файлу
"""
try:
if os.path.exists(file_path):
# Для невеликих файлів використовуємо вміст файлу
if os.path.getsize(file_path) < 10 * 1024 * 1024: # < 10 MB
sha256 = hashlib.sha256()
with open(file_path, "rb") as f:
for byte_block in iter(lambda: f.read(4096), b""):
sha256.update(byte_block)
return sha256.hexdigest()
else:
# Для великих файлів використовуємо шлях, розмір і час модифікації
file_stat = os.stat(file_path)
hash_input = f"{file_path}_{file_stat.st_size}_{file_stat.st_mtime}"
return hashlib.md5(hash_input.encode()).hexdigest()
else:
# Якщо файл не існує, повертаємо хеш шляху
return hashlib.md5(file_path.encode()).hexdigest()
except Exception as e:
logger.warning(f"Помилка при генерації хешу файлу {file_path}: {e}")
# У випадку помилки, повертаємо хеш шляху
return hashlib.md5(str(file_path).encode()).hexdigest()