File size: 9,401 Bytes
14e32e0
 
 
 
 
 
2f58804
14e32e0
b8ca8ae
 
 
 
 
 
 
 
 
 
 
 
 
 
14e32e0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2f58804
14e32e0
 
 
 
 
 
 
 
 
 
2f58804
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
14e32e0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
import asyncio
import logging
from datetime import datetime
from typing import Optional, List
import os
import numpy as np
import json

# Import avec gestion d'erreurs pour compatibilité
try:
    from .vector_storage import HFVectorStorage
    from .tmdb_service import TMDBService
    from .embedding_service import EmbeddingService
except ImportError:
    try:
        from app.services.vector_storage import HFVectorStorage
        from app.services.tmdb_service import TMDBService
        from app.services.embedding_service import EmbeddingService
    except ImportError:
        from services.vector_storage import HFVectorStorage
        from services.tmdb_service import TMDBService
        from services.embedding_service import EmbeddingService

logger = logging.getLogger(__name__)

class VectorUpdater:
    """Orchestrates the movie-vector refresh pipeline.

    Fetches popular movies from TMDB, generates embeddings for them in
    batches, and persists the resulting vectors (plus metadata) to the
    Hugging Face Hub, with a local backup fallback when the Hub save fails.

    Attributes:
        storage: HFVectorStorage backend used to persist vectors.
        tmdb_service: TMDB client used to fetch popular movies.
        embedding_service: Service turning movie records into embeddings.
        is_updating: Re-entrancy guard; True while an update is running.
        last_update_result: Outcome dict of the most recent update, or None.
        update_logs: Rolling in-memory buffer of timestamped log lines.
    """

    # Maximum number of entries kept in the in-memory log buffer.
    MAX_LOGS = 100

    def __init__(self):
        self.storage = HFVectorStorage()
        self.tmdb_service = TMDBService()
        self.embedding_service = EmbeddingService()
        self.is_updating = False
        self.last_update_result = None
        self.update_logs = []

    def add_log(self, message: str, level: str = "INFO"):
        """Append a timestamped entry to the log buffer and mirror it to
        the standard logging system.

        Args:
            message: Human-readable log message.
            level: Logging level name ("INFO", "WARNING", "ERROR", ...).
                Unknown names degrade gracefully to INFO.
        """
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        self.update_logs.append(f"[{timestamp}] {level}: {message}")

        # Bound memory usage: keep only the most recent entries.
        if len(self.update_logs) > self.MAX_LOGS:
            self.update_logs = self.update_logs[-self.MAX_LOGS:]

        # Mirror into the standard logging system.
        log_level = getattr(logging, level.upper(), logging.INFO)
        logger.log(log_level, message)

    async def update_vectors_if_needed(self) -> bool:
        """Run an update only when all preconditions hold.

        Preconditions: no update already in progress, the storage backend
        reports the vectors as stale, and the AUTO_UPDATE_VECTORS
        environment variable is "true" (case-insensitive).

        Returns:
            True if an update ran and succeeded, False otherwise.
        """
        if self.is_updating:
            self.add_log("Update already in progress, skipping...", "WARNING")
            return False

        if not self.storage.check_update_needed():
            self.add_log("Vectors are up to date, no update needed", "INFO")
            return False

        if os.getenv('AUTO_UPDATE_VECTORS', 'false').lower() != 'true':
            self.add_log("Auto update disabled", "INFO")
            return False

        self.add_log("Auto update conditions met, starting update...", "INFO")
        return await self.force_update_vectors()

    async def force_update_vectors(self) -> bool:
        """Run the full update pipeline unconditionally.

        Fetches up to MAX_MOVIES_LIMIT popular movies, embeds them in
        BATCH_SIZE chunks (pausing 1s between batches to avoid rate
        limiting), stacks the embeddings, and saves them to the HF Hub.
        On a Hub save failure, a local backup is written to
        /tmp/karl_backup so the computed embeddings are not lost.

        Returns:
            True on a successful Hub save; False on any failure (details
            are recorded in self.last_update_result either way).
        """
        if self.is_updating:
            self.add_log("Update already in progress", "WARNING")
            return False

        self.is_updating = True
        self.add_log("Starting vector update process...", "INFO")

        try:
            # Tunables come from the environment with safe defaults.
            batch_size = int(os.getenv('BATCH_SIZE', 100))
            max_movies = int(os.getenv('MAX_MOVIES_LIMIT', 10000))

            # Fetch popular movies from TMDB.
            self.add_log("Fetching movies from TMDB...", "INFO")
            movies = await self.tmdb_service.get_popular_movies(limit=max_movies)

            if not movies:
                self.add_log("No movies fetched from TMDB", "ERROR")
                self.last_update_result = {"success": False, "error": "No movies fetched"}
                return False

            self.add_log(f"Processing {len(movies)} movies in batches of {batch_size}", "INFO")

            all_embeddings = []
            processed_movies = []
            id_map = {}

            # Process in batches to avoid timeouts and huge single requests.
            for i in range(0, len(movies), batch_size):
                batch = movies[i:i + batch_size]
                batch_num = i // batch_size + 1
                total_batches = (len(movies) - 1) // batch_size + 1

                self.add_log(f"Processing batch {batch_num}/{total_batches}", "INFO")

                # Generate the embeddings for this batch.
                batch_embeddings = await self.embedding_service.generate_batch_embeddings(batch)

                if batch_embeddings is not None:
                    # Keep embeddings and movie records strictly aligned:
                    # only index movies for which an embedding exists.
                    # (Previously every movie of the batch was appended even
                    # if fewer embeddings came back, which could desync
                    # id_map from the embedding matrix.)
                    if len(batch_embeddings) != len(batch):
                        self.add_log(
                            f"Batch {batch_num}: {len(batch_embeddings)} embeddings "
                            f"for {len(batch)} movies, truncating to match",
                            "WARNING"
                        )
                    for movie, embedding in zip(batch, batch_embeddings):
                        all_embeddings.append(embedding)
                        processed_movies.append(movie)
                        id_map[movie['id']] = len(processed_movies) - 1
                else:
                    self.add_log(f"Failed to generate embeddings for batch {batch_num}", "WARNING")

                # Pause between batches to avoid rate limiting.
                await asyncio.sleep(1)

            if not all_embeddings:
                self.add_log("No embeddings generated", "ERROR")
                self.last_update_result = {"success": False, "error": "No embeddings generated"}
                return False

            # Stack the per-movie embeddings into a single matrix.
            embeddings_array = np.array(all_embeddings)
            self.add_log(f"Generated {len(all_embeddings)} embeddings", "INFO")

            # Metadata stored alongside the vectors on HF Hub.
            metadata = {
                'update_timestamp': datetime.now().isoformat(),
                'total_movies': len(processed_movies),
                'embedding_model': getattr(self.embedding_service, 'model_name', 'unknown'),
                'tmdb_api_version': '3',
                'batch_size': batch_size,
                'max_movies_limit': max_movies
            }

            self.add_log("Saving vectors to HF Hub...", "INFO")
            try:
                success = self.storage.save_vectors(
                    embeddings_array,
                    processed_movies,
                    id_map,
                    metadata
                )

                if success:
                    self.add_log(f"Successfully updated {len(processed_movies)} movie vectors", "INFO")
                    self.last_update_result = {
                        "success": True,
                        "count": len(processed_movies),
                        "timestamp": datetime.now().isoformat()
                    }
                    return True

                # If HF Hub save fails, at least save locally so the
                # computed embeddings are not lost.
                self.add_log("HF Hub save failed, attempting local backup...", "WARNING")
                try:
                    backup_dir = "/tmp/karl_backup"
                    os.makedirs(backup_dir, exist_ok=True)

                    np.save(f"{backup_dir}/embeddings_backup.npy", embeddings_array)
                    with open(f"{backup_dir}/movies_backup.json", "w") as f:
                        json.dump(processed_movies, f)
                    with open(f"{backup_dir}/metadata_backup.json", "w") as f:
                        json.dump(metadata, f)

                    self.add_log(f"Backup saved locally to {backup_dir}", "INFO")
                    self.last_update_result = {
                        "success": False,
                        "error": "HF Hub save failed, local backup created",
                        "backup_location": backup_dir,
                        "count": len(processed_movies),
                        "timestamp": datetime.now().isoformat()
                    }
                    return False

                except Exception as backup_error:
                    self.add_log(f"Local backup also failed: {backup_error}", "ERROR")
                    self.last_update_result = {"success": False, "error": f"Both HF Hub and local backup failed: {backup_error}"}
                    return False

            except Exception as save_error:
                self.add_log(f"Error during save operation: {save_error}", "ERROR")
                self.last_update_result = {"success": False, "error": f"Save operation failed: {save_error}"}
                return False

        except Exception as e:
            self.add_log(f"Error during vector update: {e}", "ERROR")
            self.last_update_result = {"success": False, "error": str(e)}
            return False

        finally:
            # Always release the re-entrancy guard, even on failure.
            self.is_updating = False
            self.add_log("Vector update process completed", "INFO")

    def get_update_status(self) -> dict:
        """Return a snapshot of updater state and configuration.

        Returns:
            dict containing the in-progress flag, env-derived configuration
            (AUTO_UPDATE_VECTORS, UPDATE_INTERVAL_HOURS, BATCH_SIZE,
            MAX_MOVIES_LIMIT), the last update result, the log count, and
            whether HF Hub credentials (HF_TOKEN and HF_DATASET_REPO) are
            both set.
        """
        return {
            'is_updating': self.is_updating,
            'auto_update_enabled': os.getenv('AUTO_UPDATE_VECTORS', 'false').lower() == 'true',
            'update_interval_hours': int(os.getenv('UPDATE_INTERVAL_HOURS', 24)),
            'batch_size': int(os.getenv('BATCH_SIZE', 100)),
            'max_movies_limit': int(os.getenv('MAX_MOVIES_LIMIT', 10000)),
            'last_update_result': self.last_update_result,
            'logs_count': len(self.update_logs),
            'hf_configured': bool(os.getenv('HF_TOKEN') and os.getenv('HF_DATASET_REPO'))
        }

    def get_logs(self) -> List[str]:
        """Return a copy of the in-memory update logs (safe to mutate)."""
        return self.update_logs.copy()