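"""Vector updater service for the movie recommendation Space.

VectorUpdater fetches popular movies from TMDB, generates embeddings in
batches, and saves the resulting vectors to the Hugging Face Hub, falling
back to a local backup when the Hub save fails. Behaviour is controlled by
the environment variables AUTO_UPDATE_VECTORS, UPDATE_INTERVAL_HOURS,
BATCH_SIZE, MAX_MOVIES_LIMIT, HF_TOKEN and HF_DATASET_REPO.
"""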
import asyncio
import json
import logging
import os
from datetime import datetime
from typing import List

import numpy as np

# Import with error handling for compatibility: resolve the service modules
# whether this file is imported as part of a package (relative import), via
# the "app" package, or directly from the "services" directory.
try:
    from .vector_storage import HFVectorStorage
    from .tmdb_service import TMDBService
    from .embedding_service import EmbeddingService
except ImportError:
    try:
        from app.services.vector_storage import HFVectorStorage
        from app.services.tmdb_service import TMDBService
        from app.services.embedding_service import EmbeddingService
    except ImportError:
        from services.vector_storage import HFVectorStorage
        from services.tmdb_service import TMDBService
        from services.embedding_service import EmbeddingService

logger = logging.getLogger(__name__)


class VectorUpdater:
    def __init__(self):
        self.storage = HFVectorStorage()
        self.tmdb_service = TMDBService()
        self.embedding_service = EmbeddingService()
        self.is_updating = False
        self.last_update_result = None
        self.update_logs = []

    def add_log(self, message: str, level: str = "INFO"):
        """Append a timestamped entry to the in-memory update log."""
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        log_entry = f"[{timestamp}] {level}: {message}"
        self.update_logs.append(log_entry)
        # Keep only the 100 most recent entries
        if len(self.update_logs) > 100:
            self.update_logs = self.update_logs[-100:]
        # Also forward to the standard logging system
        log_level = getattr(logging, level.upper(), logging.INFO)
        logger.log(log_level, message)

    async def update_vectors_if_needed(self) -> bool:
        """Update the vectors only when an update is due and auto-update is enabled."""
        if self.is_updating:
            self.add_log("Update already in progress, skipping...", "WARNING")
            return False
        if not self.storage.check_update_needed():
            self.add_log("Vectors are up to date, no update needed", "INFO")
            return False
        if os.getenv('AUTO_UPDATE_VECTORS', 'false').lower() != 'true':
            self.add_log("Auto update disabled", "INFO")
            return False
        self.add_log("Auto update conditions met, starting update...", "INFO")
        return await self.force_update_vectors()

    async def force_update_vectors(self) -> bool:
        """Force a full rebuild of the movie vectors."""
        if self.is_updating:
            self.add_log("Update already in progress", "WARNING")
            return False

        self.is_updating = True
        self.add_log("Starting vector update process...", "INFO")
        try:
            # Configuration parameters
            batch_size = int(os.getenv('BATCH_SIZE', '100'))
            max_movies = int(os.getenv('MAX_MOVIES_LIMIT', '10000'))

            # Fetch popular movies from TMDB
            self.add_log("Fetching movies from TMDB...", "INFO")
            movies = await self.tmdb_service.get_popular_movies(limit=max_movies)
            if not movies:
                self.add_log("No movies fetched from TMDB", "ERROR")
                self.last_update_result = {"success": False, "error": "No movies fetched"}
                return False

            self.add_log(f"Processing {len(movies)} movies in batches of {batch_size}", "INFO")
            all_embeddings = []
            processed_movies = []
            id_map = {}

            # Process in batches to avoid timeouts
            for i in range(0, len(movies), batch_size):
                batch = movies[i:i + batch_size]
                batch_num = i // batch_size + 1
                total_batches = (len(movies) - 1) // batch_size + 1
                self.add_log(f"Processing batch {batch_num}/{total_batches}", "INFO")

                # Generate embeddings for the batch
                batch_embeddings = await self.embedding_service.generate_batch_embeddings(batch)
                if batch_embeddings is not None:
                    all_embeddings.extend(batch_embeddings)
                    for movie in batch:
                        processed_movies.append(movie)
                        id_map[movie['id']] = len(processed_movies) - 1
                else:
                    self.add_log(f"Failed to generate embeddings for batch {batch_num}", "WARNING")

                # Pause between batches to avoid rate limiting
                await asyncio.sleep(1)

            if not all_embeddings:
                self.add_log("No embeddings generated", "ERROR")
                self.last_update_result = {"success": False, "error": "No embeddings generated"}
                return False

            # Convert to a numpy array for storage
            embeddings_array = np.array(all_embeddings)
            self.add_log(f"Generated {len(all_embeddings)} embeddings", "INFO")

            # Save to the HF Hub with improved error handling
            metadata = {
                'update_timestamp': datetime.now().isoformat(),
                'total_movies': len(processed_movies),
                'embedding_model': getattr(self.embedding_service, 'model_name', 'unknown'),
                'tmdb_api_version': '3',
                'batch_size': batch_size,
                'max_movies_limit': max_movies
            }
self.add_log("Saving vectors to HF Hub...", "INFO")
try:
success = self.storage.save_vectors(
embeddings_array,
processed_movies,
id_map,
metadata
)
if success:
self.add_log(f"Successfully updated {len(processed_movies)} movie vectors", "INFO")
self.last_update_result = {
"success": True,
"count": len(processed_movies),
"timestamp": datetime.now().isoformat()
}
return True
else:
# If HF Hub save fails, at least save locally
self.add_log("HF Hub save failed, attempting local backup...", "WARNING")
try:
# Save as backup files
backup_dir = "/tmp/karl_backup"
os.makedirs(backup_dir, exist_ok=True)
np.save(f"{backup_dir}/embeddings_backup.npy", embeddings_array)
with open(f"{backup_dir}/movies_backup.json", "w") as f:
json.dump(processed_movies, f)
with open(f"{backup_dir}/metadata_backup.json", "w") as f:
json.dump(metadata, f)
self.add_log(f"Backup saved locally to {backup_dir}", "INFO")
self.last_update_result = {
"success": False,
"error": "HF Hub save failed, local backup created",
"backup_location": backup_dir,
"count": len(processed_movies),
"timestamp": datetime.now().isoformat()
}
return False
except Exception as backup_error:
self.add_log(f"Local backup also failed: {backup_error}", "ERROR")
self.last_update_result = {"success": False, "error": f"Both HF Hub and local backup failed: {backup_error}"}
return False
except Exception as save_error:
self.add_log(f"Error during save operation: {save_error}", "ERROR")
self.last_update_result = {"success": False, "error": f"Save operation failed: {save_error}"}
return False
except Exception as e:
self.add_log(f"Error during vector update: {e}", "ERROR")
self.last_update_result = {"success": False, "error": str(e)}
return False
finally:
self.is_updating = False
self.add_log("Vector update process completed", "INFO")

    def get_update_status(self) -> dict:
        """Return the current update status and configuration."""
        return {
            'is_updating': self.is_updating,
            'auto_update_enabled': os.getenv('AUTO_UPDATE_VECTORS', 'false').lower() == 'true',
            'update_interval_hours': int(os.getenv('UPDATE_INTERVAL_HOURS', '24')),
            'batch_size': int(os.getenv('BATCH_SIZE', '100')),
            'max_movies_limit': int(os.getenv('MAX_MOVIES_LIMIT', '10000')),
            'last_update_result': self.last_update_result,
            'logs_count': len(self.update_logs),
            'hf_configured': bool(os.getenv('HF_TOKEN') and os.getenv('HF_DATASET_REPO'))
        }

    def get_logs(self) -> List[str]:
        """Return a copy of the update logs."""
        return self.update_logs.copy()
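

# Minimal usage sketch (assumption: HF_TOKEN, HF_DATASET_REPO and whatever
# TMDB credentials TMDBService expects are set in the environment; this
# "__main__" entry point is illustrative only, not part of the Space).
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    updater = VectorUpdater()
    # Run one forced update, then print the outcome and the collected status.
    updated = asyncio.run(updater.force_update_vectors())
    print(f"Update succeeded: {updated}")
    print(json.dumps(updater.get_update_status(), indent=2, default=str))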