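"""Vector storage on the Hugging Face Dataset Hub.

Persists movie embeddings, TMDB metadata and an id map to a private HF
dataset repository and loads them back. Configuration is read from the
HF_TOKEN and HF_DATASET_REPO environment variables; UPDATE_INTERVAL_HOURS
controls how often a refresh is considered due.
"""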
from datasets import Dataset, load_dataset
from huggingface_hub import HfApi, create_repo, hf_hub_download
import numpy as np
import json
import logging
from typing import Dict, List, Tuple, Optional
import os
from datetime import datetime
import tempfile
logger = logging.getLogger(__name__)
class HFVectorStorage:
def __init__(self):
self.hf_token = os.getenv('HF_TOKEN')
self.repo_name = os.getenv('HF_DATASET_REPO')
# Configure HF cache directory to a writable location
self._setup_hf_cache()
if self.hf_token and self.repo_name:
self.api = HfApi(token=self.hf_token)
            # Create the repo if it does not exist yet
try:
create_repo(
repo_id=self.repo_name,
repo_type="dataset",
token=self.hf_token,
private=True,
exist_ok=True
)
except Exception as e:
logger.warning(f"Repo creation warning: {e}")
else:
self.api = None
logger.warning("HF_TOKEN or HF_DATASET_REPO not configured")
def _setup_hf_cache(self):
"""Setup HF cache directory to avoid permission issues"""
try:
# Try to use a writable cache directory
cache_dirs = [
os.getenv('HF_HOME'),
os.getenv('XDG_CACHE_HOME'),
os.path.expanduser('~/.cache/huggingface'),
'/tmp/hf_cache',
                os.path.join(tempfile.gettempdir(), 'hf_cache')
]
for cache_dir in cache_dirs:
if cache_dir:
try:
os.makedirs(cache_dir, exist_ok=True)
# Test write permission
test_file = os.path.join(cache_dir, 'test_write')
with open(test_file, 'w') as f:
f.write('test')
os.remove(test_file)
# Set environment variables for HF
os.environ['HF_HOME'] = cache_dir
os.environ['HUGGINGFACE_HUB_CACHE'] = cache_dir
logger.info(f"Using HF cache directory: {cache_dir}")
return
except (OSError, PermissionError):
continue
logger.warning("Could not find writable cache directory, using default")
except Exception as e:
logger.warning(f"Error setting up HF cache: {e}")
def save_vectors(self, embeddings: np.ndarray, movies_data: List[Dict],
id_map: Dict, metadata: Dict) -> bool:
"""Sauvegarde les vecteurs sur HF Dataset Hub"""
try:
if not self.hf_token or not self.repo_name:
logger.error("HF_TOKEN or HF_DATASET_REPO not configured")
return False
            # Prepare the data for the dataset
dataset_dict = {
'movie_id': [movie['id'] for movie in movies_data],
'title': [movie['title'] for movie in movies_data],
'overview': [movie.get('overview', '') for movie in movies_data],
'genres': [movie.get('genres', []) for movie in movies_data],
'release_date': [movie.get('release_date', '') for movie in movies_data],
'embedding': embeddings.tolist(),
'tmdb_data': [json.dumps(movie) for movie in movies_data]
}
            # Build the dataset
            dataset = Dataset.from_dict(dataset_dict)
            # Upload to the HF Hub
dataset.push_to_hub(
self.repo_name,
token=self.hf_token,
commit_message=f"Update vectors - {datetime.now().isoformat()}"
)
            # Save the metadata through a temporary file in a writable directory
metadata_with_timestamp = {
**metadata,
'last_updated': datetime.now().isoformat(),
'total_movies': len(movies_data)
}
            # Use a temporary directory with appropriate permissions
temp_file = os.path.join(tempfile.gettempdir(), f'karl_metadata_{os.getpid()}.json')
try:
with open(temp_file, 'w') as f:
json.dump(metadata_with_timestamp, f, indent=2)
self.api.upload_file(
path_or_fileobj=temp_file,
path_in_repo='metadata.json',
repo_id=self.repo_name,
repo_type='dataset',
token=self.hf_token,
commit_message=f"Update metadata - {datetime.now().isoformat()}"
)
logger.info(f"Successfully saved {len(movies_data)} movie vectors to HF Hub")
return True
finally:
                # Clean up the temporary file
try:
if os.path.exists(temp_file):
os.remove(temp_file)
except Exception as cleanup_error:
logger.warning(f"Could not remove temp file {temp_file}: {cleanup_error}")
except Exception as e:
logger.error(f"Error saving vectors to HF Hub: {e}")
return False
def load_vectors(self) -> Optional[Tuple[np.ndarray, List[Dict], Dict, Dict]]:
"""Charge les vecteurs depuis HF Dataset Hub"""
try:
if not self.hf_token or not self.repo_name:
logger.error("HF_TOKEN or HF_DATASET_REPO not configured")
return None
            # Load the dataset
dataset = load_dataset(self.repo_name, token=self.hf_token)['train']
            # Extract the data
embeddings = np.array(dataset['embedding'])
movies_data = []
id_map = {}
for i, movie_id in enumerate(dataset['movie_id']):
movie_data = json.loads(dataset['tmdb_data'][i])
movies_data.append(movie_data)
id_map[movie_id] = i
            # Load the metadata
try:
                metadata_file = hf_hub_download(
repo_id=self.repo_name,
filename='metadata.json',
repo_type='dataset',
token=self.hf_token
)
with open(metadata_file, 'r') as f:
metadata = json.load(f)
            except Exception:
metadata = {'last_updated': None}
logger.info(f"Successfully loaded {len(movies_data)} movie vectors from HF Hub")
return embeddings, movies_data, id_map, metadata
except Exception as e:
logger.error(f"Error loading vectors from HF Hub: {e}")
return None
def check_update_needed(self) -> bool:
"""Vérifie si une mise à jour est nécessaire"""
try:
update_interval = int(os.getenv('UPDATE_INTERVAL_HOURS', 24))
            # Load the current metadata
result = self.load_vectors()
if not result:
return True
_, _, _, metadata = result
if not metadata.get('last_updated'):
return True
last_update = datetime.fromisoformat(metadata['last_updated'])
hours_since_update = (datetime.now() - last_update).total_seconds() / 3600
return hours_since_update >= update_interval
except Exception as e:
logger.error(f"Error checking update status: {e}")
return True |
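

# --- Usage sketch. Assumes HF_TOKEN and HF_DATASET_REPO are set in the
# environment; the movie record, embedding matrix, id map and metadata below
# are illustrative placeholders, not part of the class, and would normally
# come from an upstream embedding step.
if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)
    storage = HFVectorStorage()
    if storage.check_update_needed():
        # Hypothetical data: one movie with a 3-dimensional embedding.
        movies = [{'id': 1, 'title': 'Example', 'overview': '', 'genres': [], 'release_date': ''}]
        embeddings = np.zeros((1, 3), dtype=np.float32)
        id_map = {1: 0}
        meta = {'embedding_dim': 3}
        storage.save_vectors(embeddings, movies, id_map, meta)
    loaded = storage.load_vectors()
    if loaded:
        vectors, movies_data, id_map, metadata = loaded
        print(f"Loaded {len(movies_data)} movies, embedding shape {vectors.shape}")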