karl-movie-vector-backend / app /services /embedding_service.py
yonnel
Enhance OpenAI client initialization with version compatibility handling and update openai dependency to 1.12.0
945f885
import asyncio
import time
import logging
from typing import List, Optional, Dict
import os
logger = logging.getLogger(__name__)


class EmbeddingService:
    """Service that generates embeddings for movie metadata with OpenAI.

    Each movie dict is flattened into one descriptive string
    (``create_composite_text``), and those strings are sent to the OpenAI
    embeddings endpoint in batches.
    """

    # (movie_data key, label) pairs emitted verbatim at the start of the text.
    _LEADING_FIELDS = (
        ('title', 'Title'),
        ('tagline', 'Tagline'),
        ('overview', 'Overview'),
        ('release_date', 'Release Date'),
        ('original_language', 'Language'),
    )

    # (movie_data key, label) pairs emitted verbatim after the budget.
    _RATING_FIELDS = (
        ('popularity', 'Popularity'),
        ('vote_average', 'Vote Average'),
        ('vote_count', 'Vote Count'),
    )

    def __init__(self, model_name: str = "text-embedding-3-small"):
        """Create the service and its OpenAI client.

        Args:
            model_name: Embedding model passed to the API on every request.

        Raises:
            Exception: Whatever ``_create_openai_client`` raises when the
                client cannot be built.
        """
        self.model_name = model_name
        self.client = self._create_openai_client()

    def _create_openai_client(self):
        """Create OpenAI client with version compatibility handling.

        The standard constructor is tried first. If it fails with a
        ``TypeError`` mentioning ``proxies``, the client is rebuilt around an
        explicitly constructed ``httpx.Client`` (60 s timeout).
        NOTE(review): this looks like a workaround for an openai/httpx
        version mismatch -- confirm against the pinned dependency versions.

        Returns:
            An ``openai.OpenAI`` client using the ``OPENAI_API_KEY``
            environment variable.

        Raises:
            Exception: Any initialization error, re-raised after logging.
        """
        try:
            # Imported lazily so a missing/broken package is logged like any
            # other initialization failure.
            from openai import OpenAI

            api_key = os.getenv('OPENAI_API_KEY')
            # Try standard initialization first
            try:
                return OpenAI(api_key=api_key)
            except TypeError as e:
                if "proxies" not in str(e):
                    raise
                # Fallback for version compatibility issues
                logger.warning("OpenAI client compatibility issue: %s", e)
                logger.info("Using alternative OpenAI client initialization...")
                import httpx

                # Create a basic httpx client without proxies
                http_client = httpx.Client(timeout=60.0)
                return OpenAI(api_key=api_key, http_client=http_client)
        except Exception as e:
            logger.error("Failed to initialize OpenAI client: %s", e)
            raise

    @staticmethod
    def _collect(entries, key: str = 'name', limit: Optional[int] = None) -> List[str]:
        """Return the non-empty ``entry[key]`` values of a list of dicts.

        Anything that is not a list/tuple yields ``[]``; entries that are not
        dicts, or whose ``key`` is missing/empty, are skipped. ``limit``
        restricts the scan to the first ``limit`` entries.
        """
        if not isinstance(entries, (list, tuple)):
            return []
        if limit is not None:
            entries = entries[:limit]
        return [
            str(entry[key])
            for entry in entries
            if isinstance(entry, dict) and entry.get(key)
        ]

    def create_composite_text(self, movie_data: Dict) -> str:
        """Create composite text for embedding from movie data.

        Fields that are missing, empty or zero are omitted. Malformed nested
        entries (e.g. a genre without ``name``, or ``credits`` set to
        ``None``) are skipped instead of raising.

        Args:
            movie_data: Movie record; the keys read are the ones listed in
                ``_LEADING_FIELDS`` / ``_RATING_FIELDS`` plus
                ``spoken_languages``, ``genres``, ``production_companies``,
                ``production_countries``, ``budget`` and ``credits``.

        Returns:
            ``"Label: value"`` sections joined with ``" / "``; an empty
            string when no field is usable.
        """
        parts = []

        def add_list(label: str, values: List[str]) -> None:
            # Only emit a section when at least one usable value survived.
            if values:
                parts.append(f"{label}: {', '.join(values)}")

        for key, label in self._LEADING_FIELDS:
            if movie_data.get(key):
                parts.append(f"{label}: {movie_data[key]}")

        add_list("Spoken Languages",
                 self._collect(movie_data.get('spoken_languages'), key='iso_639_1'))
        add_list("Genres", self._collect(movie_data.get('genres')))
        add_list("Production Companies",
                 self._collect(movie_data.get('production_companies')))
        add_list("Production Countries",
                 self._collect(movie_data.get('production_countries')))

        # Budget (only if > 0)
        budget = movie_data.get('budget')
        if budget and budget > 0:
            parts.append(f"Budget: ${budget:,}")

        for key, label in self._RATING_FIELDS:
            if movie_data.get(key):
                parts.append(f"{label}: {movie_data[key]}")

        # ``credits`` may be present but null, so ``.get('credits', {})``
        # alone is not enough to guarantee a dict.
        credits = movie_data.get('credits')
        if not isinstance(credits, dict):
            credits = {}

        # Director(s)
        crew = credits.get('crew')
        if isinstance(crew, (list, tuple)):
            directors = [
                person for person in crew
                if isinstance(person, dict) and person.get('job') == 'Director'
            ]
            add_list("Director", self._collect(directors))

        # Top 5 cast: first five entries in the order given.
        add_list("Cast", self._collect(credits.get('cast'), limit=5))

        return " / ".join(parts)

    def get_embeddings_batch(self, texts: List[str], max_retries: int = 3) -> Optional[List[List[float]]]:
        """Get embeddings for a batch of texts with retry.

        This call is synchronous: it blocks on the HTTP request and on the
        back-off sleeps (1 s, 2 s, 4 s, ... between attempts).

        Args:
            texts: Strings to embed in a single API request.
            max_retries: Maximum number of attempts.

        Returns:
            One embedding per text, taken from ``response.data`` in the order
            returned; ``[]`` for empty input (no request is made); ``None``
            when every attempt failed.
        """
        if not texts:
            # Nothing to embed: avoid a request (and the whole retry/back-off
            # cycle) for an empty input list.
            return []

        for attempt in range(max_retries):
            try:
                response = self.client.embeddings.create(
                    model=self.model_name,
                    input=texts,
                )
                return [item.embedding for item in response.data]
            except Exception as e:
                # Deliberately broad: any failure of the remote call is
                # treated as retryable and reported to the caller as None.
                logger.error("OpenAI API error (attempt %d): %s", attempt + 1, e)
                if attempt < max_retries - 1:
                    time.sleep(2 ** attempt)
        return None

    async def generate_batch_embeddings(
        self,
        movies: List[Dict],
        batch_size: int = 100,
        batch_delay: float = 0.1,
    ) -> Optional[List[List[float]]]:
        """Generate embeddings for a batch of movies.

        The blocking ``get_embeddings_batch`` call runs in the event loop's
        default executor so other coroutines keep running while a request
        (or its retry back-off) is in flight.

        Args:
            movies: Movie records accepted by ``create_composite_text``.
            batch_size: Number of texts per API request; must be >= 1.
            batch_delay: Seconds to pause between consecutive requests to
                respect rate limits (no pause after the last batch).

        Returns:
            One embedding per movie, in input order, or ``None`` when any
            batch failed or an error occurred (details are logged).
        """
        try:
            if batch_size < 1:
                # range() would reject 0 and silently produce no batches for
                # negative values; report both the same way.
                raise ValueError(f"batch_size must be >= 1, got {batch_size}")

            # Create composite texts
            texts = [self.create_composite_text(movie) for movie in movies]

            loop = asyncio.get_running_loop()
            # Generate embeddings in smaller batches to avoid API limits
            all_embeddings = []
            for start in range(0, len(texts), batch_size):
                if start and batch_delay > 0:
                    # Small delay between batches to respect rate limits
                    await asyncio.sleep(batch_delay)

                batch_texts = texts[start:start + batch_size]
                logger.debug("Generating embeddings for batch %d", start // batch_size + 1)
                batch_embeddings = await loop.run_in_executor(
                    None, self.get_embeddings_batch, batch_texts
                )
                if batch_embeddings is None:
                    logger.error("Failed to generate embeddings for batch starting at %d", start)
                    return None
                all_embeddings.extend(batch_embeddings)
            return all_embeddings
        except Exception as e:
            logger.error("Error generating batch embeddings: %s", e)
            return None