import asyncio
import logging
from datetime import datetime
from typing import Optional, List
import os
import numpy as np
import json

# Import with fallbacks so the module works from several entry points
try:
    from .vector_storage import HFVectorStorage
    from .tmdb_service import TMDBService
    from .embedding_service import EmbeddingService
except ImportError:
    try:
        from app.services.vector_storage import HFVectorStorage
        from app.services.tmdb_service import TMDBService
        from app.services.embedding_service import EmbeddingService
    except ImportError:
        from services.vector_storage import HFVectorStorage
        from services.tmdb_service import TMDBService
        from services.embedding_service import EmbeddingService

logger = logging.getLogger(__name__)

class VectorUpdater:
    def __init__(self):
        self.storage = HFVectorStorage()
        self.tmdb_service = TMDBService()
        self.embedding_service = EmbeddingService()
        self.is_updating = False
        self.last_update_result = None
        self.update_logs = []

    def add_log(self, message: str, level: str = "INFO"):
        """Append a timestamped entry to the in-memory update log."""
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        log_entry = f"[{timestamp}] {level}: {message}"
        self.update_logs.append(log_entry)

        # Keep only the 100 most recent log entries
        if len(self.update_logs) > 100:
            self.update_logs = self.update_logs[-100:]

        # Also forward the message to the standard logging system
        log_level = getattr(logging, level.upper(), logging.INFO)
        logger.log(log_level, message)

    async def update_vectors_if_needed(self) -> bool:
        """Update the vectors only if an update is due and auto-update is enabled."""
        if self.is_updating:
            self.add_log("Update already in progress, skipping...", "WARNING")
            return False

        if not self.storage.check_update_needed():
            self.add_log("Vectors are up to date, no update needed", "INFO")
            return False

        if os.getenv('AUTO_UPDATE_VECTORS', 'false').lower() != 'true':
            self.add_log("Auto update disabled", "INFO")
            return False

        self.add_log("Auto update conditions met, starting update...", "INFO")
        return await self.force_update_vectors()

    async def force_update_vectors(self) -> bool:
        """Force a full rebuild of the movie vectors."""
        if self.is_updating:
            self.add_log("Update already in progress", "WARNING")
            return False

        self.is_updating = True
        self.add_log("Starting vector update process...", "INFO")

        try:
            # Configuration parameters
            batch_size = int(os.getenv('BATCH_SIZE', 100))
            max_movies = int(os.getenv('MAX_MOVIES_LIMIT', 10000))

            # Fetch popular movies from TMDB
            self.add_log("Fetching movies from TMDB...", "INFO")
            movies = await self.tmdb_service.get_popular_movies(limit=max_movies)

            if not movies:
                self.add_log("No movies fetched from TMDB", "ERROR")
                self.last_update_result = {"success": False, "error": "No movies fetched"}
                return False

            self.add_log(f"Processing {len(movies)} movies in batches of {batch_size}", "INFO")

            all_embeddings = []
            processed_movies = []
            id_map = {}

            # Process in batches to avoid timeouts
            for i in range(0, len(movies), batch_size):
                batch = movies[i:i + batch_size]
                batch_num = i // batch_size + 1
                total_batches = (len(movies) - 1) // batch_size + 1
                self.add_log(f"Processing batch {batch_num}/{total_batches}", "INFO")

                # Generate the embeddings for this batch
                batch_embeddings = await self.embedding_service.generate_batch_embeddings(batch)

                if batch_embeddings is not None:
                    all_embeddings.extend(batch_embeddings)
                    for movie in batch:
                        processed_movies.append(movie)
                        id_map[movie['id']] = len(processed_movies) - 1
                else:
                    self.add_log(f"Failed to generate embeddings for batch {batch_num}", "WARNING")

                # Pause between batches to avoid rate limiting
                await asyncio.sleep(1)

            if not all_embeddings:
                self.add_log("No embeddings generated", "ERROR")
                self.last_update_result = {"success": False, "error": "No embeddings generated"}
                return False

            # Convert to a numpy array
            embeddings_array = np.array(all_embeddings)
            self.add_log(f"Generated {len(all_embeddings)} embeddings", "INFO")

            # Save to the HF Hub with improved error handling
            metadata = {
                'update_timestamp': datetime.now().isoformat(),
                'total_movies': len(processed_movies),
                'embedding_model': getattr(self.embedding_service, 'model_name', 'unknown'),
                'tmdb_api_version': '3',
                'batch_size': batch_size,
                'max_movies_limit': max_movies
            }
self.add_log("Saving vectors to HF Hub...", "INFO") | |
try: | |
success = self.storage.save_vectors( | |
embeddings_array, | |
processed_movies, | |
id_map, | |
metadata | |
) | |
if success: | |
self.add_log(f"Successfully updated {len(processed_movies)} movie vectors", "INFO") | |
self.last_update_result = { | |
"success": True, | |
"count": len(processed_movies), | |
"timestamp": datetime.now().isoformat() | |
} | |
return True | |
else: | |
# If HF Hub save fails, at least save locally | |
self.add_log("HF Hub save failed, attempting local backup...", "WARNING") | |
try: | |
# Save as backup files | |
backup_dir = "/tmp/karl_backup" | |
os.makedirs(backup_dir, exist_ok=True) | |
np.save(f"{backup_dir}/embeddings_backup.npy", embeddings_array) | |
with open(f"{backup_dir}/movies_backup.json", "w") as f: | |
json.dump(processed_movies, f) | |
with open(f"{backup_dir}/metadata_backup.json", "w") as f: | |
json.dump(metadata, f) | |
self.add_log(f"Backup saved locally to {backup_dir}", "INFO") | |
self.last_update_result = { | |
"success": False, | |
"error": "HF Hub save failed, local backup created", | |
"backup_location": backup_dir, | |
"count": len(processed_movies), | |
"timestamp": datetime.now().isoformat() | |
} | |
return False | |
except Exception as backup_error: | |
self.add_log(f"Local backup also failed: {backup_error}", "ERROR") | |
self.last_update_result = {"success": False, "error": f"Both HF Hub and local backup failed: {backup_error}"} | |
return False | |
except Exception as save_error: | |
self.add_log(f"Error during save operation: {save_error}", "ERROR") | |
self.last_update_result = {"success": False, "error": f"Save operation failed: {save_error}"} | |
return False | |

        except Exception as e:
            self.add_log(f"Error during vector update: {e}", "ERROR")
            self.last_update_result = {"success": False, "error": str(e)}
            return False
        finally:
            self.is_updating = False
            self.add_log("Vector update process completed", "INFO")

    def get_update_status(self) -> dict:
        """Return the current update status and configuration."""
        return {
            'is_updating': self.is_updating,
            'auto_update_enabled': os.getenv('AUTO_UPDATE_VECTORS', 'false').lower() == 'true',
            'update_interval_hours': int(os.getenv('UPDATE_INTERVAL_HOURS', 24)),
            'batch_size': int(os.getenv('BATCH_SIZE', 100)),
            'max_movies_limit': int(os.getenv('MAX_MOVIES_LIMIT', 10000)),
            'last_update_result': self.last_update_result,
            'logs_count': len(self.update_logs),
            'hf_configured': bool(os.getenv('HF_TOKEN') and os.getenv('HF_DATASET_REPO'))
        }

    def get_logs(self) -> List[str]:
        """Return a copy of the update logs."""
        return self.update_logs.copy()
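

# Illustrative usage sketch (an assumption, not part of the original module): shows
# how the updater might be driven, either as a one-shot manual run or on the
# UPDATE_INTERVAL_HOURS schedule that get_update_status() exposes. The function name
# run_periodic_updates is hypothetical; the project's actual scheduling may live elsewhere.
async def run_periodic_updates():
    """Hypothetical loop: re-check for a due update every UPDATE_INTERVAL_HOURS hours."""
    updater = VectorUpdater()
    interval_hours = int(os.getenv('UPDATE_INTERVAL_HOURS', 24))
    while True:
        await updater.update_vectors_if_needed()
        await asyncio.sleep(interval_hours * 3600)


if __name__ == "__main__":
    # One-shot manual run, e.g. for testing the update pipeline locally.
    logging.basicConfig(level=logging.INFO)
    asyncio.run(VectorUpdater().force_update_vectors())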