import asyncio
import json
import logging
import os
from datetime import datetime
from typing import List, Optional

import numpy as np

# Import with fallbacks for package-layout compatibility: the module may be
# loaded as a relative package, via the ``app`` package root, or directly
# from ``services`` (e.g. when run as a script).
try:
    from .vector_storage import HFVectorStorage
    from .tmdb_service import TMDBService
    from .embedding_service import EmbeddingService
except ImportError:
    try:
        from app.services.vector_storage import HFVectorStorage
        from app.services.tmdb_service import TMDBService
        from app.services.embedding_service import EmbeddingService
    except ImportError:
        from services.vector_storage import HFVectorStorage
        from services.tmdb_service import TMDBService
        from services.embedding_service import EmbeddingService

logger = logging.getLogger(__name__)


class VectorUpdater:
    """Orchestrate (re)generation of movie embedding vectors.

    Fetches popular movies from TMDB, embeds them in batches, and persists
    the resulting vectors plus metadata to the HF Hub via ``HFVectorStorage``,
    falling back to a local ``/tmp`` backup when the remote save fails.
    """

    # Bound on the rolling in-memory log buffer.
    MAX_LOGS = 100

    def __init__(self):
        self.storage = HFVectorStorage()
        self.tmdb_service = TMDBService()
        self.embedding_service = EmbeddingService()
        self.is_updating = False          # re-entrancy guard for update runs
        self.last_update_result = None    # dict describing the last run, or None
        self.update_logs = []             # rolling human-readable log buffer

    def add_log(self, message: str, level: str = "INFO"):
        """Append a timestamped entry to the rolling log buffer.

        Also mirrors the message to the standard ``logging`` system at the
        corresponding level; unknown level names fall back to INFO.
        """
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        self.update_logs.append(f"[{timestamp}] {level}: {message}")
        # Keep only the most recent entries so the buffer never grows unbounded.
        if len(self.update_logs) > self.MAX_LOGS:
            self.update_logs = self.update_logs[-self.MAX_LOGS:]
        logger.log(getattr(logging, level.upper(), logging.INFO), message)

    async def update_vectors_if_needed(self) -> bool:
        """Run an update only when enabled, due, and not already running.

        Returns True when an update was started and succeeded, False in every
        other case (already running, auto-update disabled, vectors current,
        or update failure).
        """
        if self.is_updating:
            self.add_log("Update already in progress, skipping...", "WARNING")
            return False

        # Cheap env-var gate first: avoids the (potentially remote)
        # check_update_needed() call entirely when auto-update is off.
        if os.getenv('AUTO_UPDATE_VECTORS', 'false').lower() != 'true':
            self.add_log("Auto update disabled", "INFO")
            return False

        if not self.storage.check_update_needed():
            self.add_log("Vectors are up to date, no update needed", "INFO")
            return False

        self.add_log("Auto update conditions met, starting update...", "INFO")
        return await self.force_update_vectors()

    def _write_local_backup(self, embeddings_array, processed_movies, metadata) -> str:
        """Write embeddings/movies/metadata to /tmp as a last-resort backup.

        Returns the backup directory path; raises on any filesystem error
        (the caller logs and reports the failure).
        """
        backup_dir = "/tmp/karl_backup"
        os.makedirs(backup_dir, exist_ok=True)
        np.save(f"{backup_dir}/embeddings_backup.npy", embeddings_array)
        with open(f"{backup_dir}/movies_backup.json", "w") as f:
            json.dump(processed_movies, f)
        with open(f"{backup_dir}/metadata_backup.json", "w") as f:
            json.dump(metadata, f)
        return backup_dir

    async def force_update_vectors(self) -> bool:
        """Fetch movies, embed them in batches, and persist the vectors.

        Always records the outcome in ``self.last_update_result`` and clears
        ``self.is_updating`` on exit. Returns True only on a successful
        HF Hub save.
        """
        if self.is_updating:
            self.add_log("Update already in progress", "WARNING")
            return False

        self.is_updating = True
        self.add_log("Starting vector update process...", "INFO")

        try:
            # Runtime-tunable configuration.
            batch_size = int(os.getenv('BATCH_SIZE', 100))
            max_movies = int(os.getenv('MAX_MOVIES_LIMIT', 10000))

            self.add_log("Fetching movies from TMDB...", "INFO")
            movies = await self.tmdb_service.get_popular_movies(limit=max_movies)
            if not movies:
                self.add_log("No movies fetched from TMDB", "ERROR")
                self.last_update_result = {"success": False, "error": "No movies fetched"}
                return False

            self.add_log(f"Processing {len(movies)} movies in batches of {batch_size}", "INFO")

            all_embeddings = []
            processed_movies = []
            id_map = {}  # TMDB movie id -> row index in the embedding matrix

            # Batch the work to avoid timeouts and rate limits.
            total_batches = (len(movies) - 1) // batch_size + 1
            for start in range(0, len(movies), batch_size):
                batch = movies[start:start + batch_size]
                batch_num = start // batch_size + 1
                self.add_log(f"Processing batch {batch_num}/{total_batches}", "INFO")

                batch_embeddings = await self.embedding_service.generate_batch_embeddings(batch)

                # Only accept the batch when one embedding came back per
                # movie; a partial result would desync id_map indices from
                # the embedding rows.
                if batch_embeddings is not None and len(batch_embeddings) == len(batch):
                    all_embeddings.extend(batch_embeddings)
                    for movie in batch:
                        processed_movies.append(movie)
                        id_map[movie['id']] = len(processed_movies) - 1
                else:
                    self.add_log(f"Failed to generate embeddings for batch {batch_num}", "WARNING")

                # Pause between batches (not after the last) to avoid rate limiting.
                if start + batch_size < len(movies):
                    await asyncio.sleep(1)

            if not all_embeddings:
                self.add_log("No embeddings generated", "ERROR")
                self.last_update_result = {"success": False, "error": "No embeddings generated"}
                return False

            embeddings_array = np.array(all_embeddings)
            self.add_log(f"Generated {len(all_embeddings)} embeddings", "INFO")

            metadata = {
                'update_timestamp': datetime.now().isoformat(),
                'total_movies': len(processed_movies),
                'embedding_model': getattr(self.embedding_service, 'model_name', 'unknown'),
                'tmdb_api_version': '3',
                'batch_size': batch_size,
                'max_movies_limit': max_movies,
            }

            self.add_log("Saving vectors to HF Hub...", "INFO")
            try:
                success = self.storage.save_vectors(
                    embeddings_array, processed_movies, id_map, metadata
                )
                if success:
                    self.add_log(f"Successfully updated {len(processed_movies)} movie vectors", "INFO")
                    self.last_update_result = {
                        "success": True,
                        "count": len(processed_movies),
                        "timestamp": datetime.now().isoformat(),
                    }
                    return True

                # HF Hub save failed: try a best-effort local backup so the
                # expensive embedding work is not lost.
                self.add_log("HF Hub save failed, attempting local backup...", "WARNING")
                try:
                    backup_dir = self._write_local_backup(
                        embeddings_array, processed_movies, metadata
                    )
                    self.add_log(f"Backup saved locally to {backup_dir}", "INFO")
                    self.last_update_result = {
                        "success": False,
                        "error": "HF Hub save failed, local backup created",
                        "backup_location": backup_dir,
                        "count": len(processed_movies),
                        "timestamp": datetime.now().isoformat(),
                    }
                    return False
                except Exception as backup_error:
                    self.add_log(f"Local backup also failed: {backup_error}", "ERROR")
                    self.last_update_result = {
                        "success": False,
                        "error": f"Both HF Hub and local backup failed: {backup_error}",
                    }
                    return False
            except Exception as save_error:
                self.add_log(f"Error during save operation: {save_error}", "ERROR")
                self.last_update_result = {
                    "success": False,
                    "error": f"Save operation failed: {save_error}",
                }
                return False

        except Exception as e:
            self.add_log(f"Error during vector update: {e}", "ERROR")
            self.last_update_result = {"success": False, "error": str(e)}
            return False
        finally:
            # Always release the re-entrancy guard, even on failure.
            self.is_updating = False
            self.add_log("Vector update process completed", "INFO")

    def get_update_status(self) -> dict:
        """Return a snapshot of updater state and configuration."""
        return {
            'is_updating': self.is_updating,
            'auto_update_enabled': os.getenv('AUTO_UPDATE_VECTORS', 'false').lower() == 'true',
            'update_interval_hours': int(os.getenv('UPDATE_INTERVAL_HOURS', 24)),
            'batch_size': int(os.getenv('BATCH_SIZE', 100)),
            'max_movies_limit': int(os.getenv('MAX_MOVIES_LIMIT', 10000)),
            'last_update_result': self.last_update_result,
            'logs_count': len(self.update_logs),
            'hf_configured': bool(os.getenv('HF_TOKEN') and os.getenv('HF_DATASET_REPO')),
        }

    def get_logs(self) -> List[str]:
        """Return a copy of the rolling update log buffer."""
        return self.update_logs.copy()