# Commit 2f58804 (yonnel): Enhance HF cache setup and error handling in
# vector storage; add local backup functionality for vector saving.
import asyncio
import logging
from datetime import datetime
from typing import Optional, List
import os
import numpy as np
import json
# Import avec gestion d'erreurs pour compatibilité
try:
from .vector_storage import HFVectorStorage
from .tmdb_service import TMDBService
from .embedding_service import EmbeddingService
except ImportError:
try:
from app.services.vector_storage import HFVectorStorage
from app.services.tmdb_service import TMDBService
from app.services.embedding_service import EmbeddingService
except ImportError:
from services.vector_storage import HFVectorStorage
from services.tmdb_service import TMDBService
from services.embedding_service import EmbeddingService
logger = logging.getLogger(__name__)
class VectorUpdater:
    """Orchestrates the movie-vector refresh pipeline.

    Fetches popular movies from TMDB, generates embeddings batch by batch,
    and persists the resulting vectors to the Hugging Face Hub via
    ``HFVectorStorage``. If the Hub save fails, a local backup is written
    under ``/tmp`` so the work is not lost.
    """

    # Keep only this many of the most recent in-memory log entries.
    MAX_LOGS = 100

    def __init__(self):
        self.storage = HFVectorStorage()
        self.tmdb_service = TMDBService()
        self.embedding_service = EmbeddingService()
        # Re-entrancy guard: only one update may run at a time.
        self.is_updating = False
        # Result dict of the last completed (or failed) update, or None.
        self.last_update_result: Optional[dict] = None
        # Rolling buffer of human-readable log lines (capped at MAX_LOGS).
        self.update_logs: List[str] = []

    def add_log(self, message: str, level: str = "INFO") -> None:
        """Record a timestamped log entry and mirror it to the module logger.

        Args:
            message: Human-readable log text.
            level: Logging level name (e.g. "INFO", "WARNING", "ERROR");
                unknown names fall back to INFO for the module logger.
        """
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        self.update_logs.append(f"[{timestamp}] {level}: {message}")
        # Trim to the most recent entries to bound memory usage.
        if len(self.update_logs) > self.MAX_LOGS:
            self.update_logs = self.update_logs[-self.MAX_LOGS:]
        # Mirror into the standard logging system as well.
        log_level = getattr(logging, level.upper(), logging.INFO)
        logger.log(log_level, message)

    async def update_vectors_if_needed(self) -> bool:
        """Run an update only if one is due and auto-update is enabled.

        Returns:
            True if an update ran and succeeded, False otherwise (already
            running, up to date, auto-update disabled, or update failed).
        """
        if self.is_updating:
            self.add_log("Update already in progress, skipping...", "WARNING")
            return False
        if not self.storage.check_update_needed():
            self.add_log("Vectors are up to date, no update needed", "INFO")
            return False
        # Auto-update is opt-in via the AUTO_UPDATE_VECTORS env var.
        if not os.getenv('AUTO_UPDATE_VECTORS', 'false').lower() == 'true':
            self.add_log("Auto update disabled", "INFO")
            return False
        self.add_log("Auto update conditions met, starting update...", "INFO")
        return await self.force_update_vectors()

    async def _generate_all_embeddings(self, movies: list, batch_size: int):
        """Embed *movies* in batches of *batch_size*.

        Returns:
            Tuple of (embeddings list, processed movie dicts, id->index map).
            Movies whose batch failed to embed are skipped with a warning.
        """
        all_embeddings: list = []
        processed_movies: list = []
        id_map: dict = {}
        total_batches = (len(movies) - 1) // batch_size + 1
        for start in range(0, len(movies), batch_size):
            batch = movies[start:start + batch_size]
            batch_num = start // batch_size + 1
            self.add_log(f"Processing batch {batch_num}/{total_batches}", "INFO")
            batch_embeddings = await self.embedding_service.generate_batch_embeddings(batch)
            if batch_embeddings is not None:
                # Pair each movie with its embedding; zip() keeps id_map
                # indices aligned with all_embeddings even if the service
                # returned fewer embeddings than movies (the previous code
                # appended every movie regardless, misaligning the index).
                for movie, embedding in zip(batch, batch_embeddings):
                    all_embeddings.append(embedding)
                    processed_movies.append(movie)
                    id_map[movie['id']] = len(processed_movies) - 1
            else:
                self.add_log(f"Failed to generate embeddings for batch {batch_num}", "WARNING")
            # Brief pause between batches to avoid rate limiting.
            await asyncio.sleep(1)
        return all_embeddings, processed_movies, id_map

    def _save_local_backup(self, embeddings_array, processed_movies: list,
                           metadata: dict) -> str:
        """Write embeddings, movies, and metadata to /tmp as a fallback.

        Returns:
            The backup directory path. Raises on any I/O failure.
        """
        backup_dir = "/tmp/karl_backup"
        os.makedirs(backup_dir, exist_ok=True)
        np.save(f"{backup_dir}/embeddings_backup.npy", embeddings_array)
        with open(f"{backup_dir}/movies_backup.json", "w") as f:
            json.dump(processed_movies, f)
        with open(f"{backup_dir}/metadata_backup.json", "w") as f:
            json.dump(metadata, f)
        return backup_dir

    async def force_update_vectors(self) -> bool:
        """Run a full vector update regardless of schedule.

        Fetches movies from TMDB, embeds them, and saves to the HF Hub.
        On Hub-save failure a local backup is attempted. The outcome is
        recorded in ``self.last_update_result``.

        Returns:
            True only if the vectors were saved to the HF Hub.
        """
        if self.is_updating:
            self.add_log("Update already in progress", "WARNING")
            return False
        self.is_updating = True
        self.add_log("Starting vector update process...", "INFO")
        try:
            # Runtime configuration via environment variables.
            batch_size = int(os.getenv('BATCH_SIZE', 100))
            max_movies = int(os.getenv('MAX_MOVIES_LIMIT', 10000))

            self.add_log("Fetching movies from TMDB...", "INFO")
            movies = await self.tmdb_service.get_popular_movies(limit=max_movies)
            if not movies:
                self.add_log("No movies fetched from TMDB", "ERROR")
                self.last_update_result = {"success": False, "error": "No movies fetched"}
                return False

            self.add_log(f"Processing {len(movies)} movies in batches of {batch_size}", "INFO")
            all_embeddings, processed_movies, id_map = \
                await self._generate_all_embeddings(movies, batch_size)
            if not all_embeddings:
                self.add_log("No embeddings generated", "ERROR")
                self.last_update_result = {"success": False, "error": "No embeddings generated"}
                return False

            embeddings_array = np.array(all_embeddings)
            self.add_log(f"Generated {len(all_embeddings)} embeddings", "INFO")

            metadata = {
                'update_timestamp': datetime.now().isoformat(),
                'total_movies': len(processed_movies),
                'embedding_model': getattr(self.embedding_service, 'model_name', 'unknown'),
                'tmdb_api_version': '3',
                'batch_size': batch_size,
                'max_movies_limit': max_movies
            }

            self.add_log("Saving vectors to HF Hub...", "INFO")
            try:
                success = self.storage.save_vectors(
                    embeddings_array,
                    processed_movies,
                    id_map,
                    metadata
                )
                if success:
                    self.add_log(f"Successfully updated {len(processed_movies)} movie vectors", "INFO")
                    self.last_update_result = {
                        "success": True,
                        "count": len(processed_movies),
                        "timestamp": datetime.now().isoformat()
                    }
                    return True
                # Hub save failed: try to at least preserve the work locally.
                self.add_log("HF Hub save failed, attempting local backup...", "WARNING")
                try:
                    backup_dir = self._save_local_backup(
                        embeddings_array, processed_movies, metadata
                    )
                    self.add_log(f"Backup saved locally to {backup_dir}", "INFO")
                    self.last_update_result = {
                        "success": False,
                        "error": "HF Hub save failed, local backup created",
                        "backup_location": backup_dir,
                        "count": len(processed_movies),
                        "timestamp": datetime.now().isoformat()
                    }
                    return False
                except Exception as backup_error:
                    self.add_log(f"Local backup also failed: {backup_error}", "ERROR")
                    self.last_update_result = {"success": False, "error": f"Both HF Hub and local backup failed: {backup_error}"}
                    return False
            except Exception as save_error:
                self.add_log(f"Error during save operation: {save_error}", "ERROR")
                self.last_update_result = {"success": False, "error": f"Save operation failed: {save_error}"}
                return False
        except Exception as e:
            # Top-level boundary: record and report any unexpected failure.
            self.add_log(f"Error during vector update: {e}", "ERROR")
            self.last_update_result = {"success": False, "error": str(e)}
            return False
        finally:
            # Always release the re-entrancy guard.
            self.is_updating = False
            self.add_log("Vector update process completed", "INFO")

    def get_update_status(self) -> dict:
        """Return a snapshot of the updater's state and configuration."""
        return {
            'is_updating': self.is_updating,
            'auto_update_enabled': os.getenv('AUTO_UPDATE_VECTORS', 'false').lower() == 'true',
            'update_interval_hours': int(os.getenv('UPDATE_INTERVAL_HOURS', 24)),
            'batch_size': int(os.getenv('BATCH_SIZE', 100)),
            'max_movies_limit': int(os.getenv('MAX_MOVIES_LIMIT', 10000)),
            'last_update_result': self.last_update_result,
            'logs_count': len(self.update_logs),
            'hf_configured': bool(os.getenv('HF_TOKEN') and os.getenv('HF_DATASET_REPO'))
        }

    def get_logs(self) -> List[str]:
        """Return a defensive copy of the in-memory update logs."""
        return self.update_logs.copy()