import json
import os
from typing import List, Optional

import numpy as np
import uvicorn
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException
from openai import OpenAI
from pinecone import Pinecone, ServerlessSpec
from pydantic import BaseModel

# Load environment variables
load_dotenv()

# Get API keys from environment variables
PINECONE_API_KEY = os.getenv('PINECONE_API_KEY')
OPENAI_API_KEY = os.getenv('OPENAI_API_KEY')

if not PINECONE_API_KEY:
    raise ValueError("PINECONE_API_KEY environment variable not set")
if not OPENAI_API_KEY:
    raise ValueError("OPENAI_API_KEY environment variable not set")

# Create FastAPI app
app = FastAPI(
    title="E-Bikes Semantic Search API",
    description="API for finding similar e-bikes based on semantic search",
    version="1.0.0",
)


def build_filter(pt: Optional[str], cat: Optional[str]) -> Optional[dict]:
    """Build a Pinecone metadata filter from the optional request fields."""
    filt = {}
    if pt:
        filt["type"] = pt  # shorthand for {"type": {"$eq": pt}}
    if cat:
        filt["category"] = cat
    return filt or None


# Request and response models
class SearchRequest(BaseModel):
    description: str
    top_k: int = 3
    product_type: Optional[str] = None
    category: Optional[str] = None


class BikeMatch(BaseModel):
    id: str
    name: str
    type: str
    description: str
    score: float


class SearchResponse(BaseModel):
    matches: List[BikeMatch]


# Initialize OpenAI client
openai_client = OpenAI(api_key=OPENAI_API_KEY)


# Define the embedding model using OpenAI
class OpenAIEmbedder:
    def __init__(self, model_name="text-embedding-3-small"):
        self.model_name = model_name
        self.client = openai_client
        self.embedding_dimension = 1536  # Dimension of text-embedding-3-small

    def encode(self, texts):
        """Embed a string or list of strings and return a 2D numpy array."""
        if isinstance(texts, str):
            texts = [texts]

        # Get embeddings from OpenAI
        response = self.client.embeddings.create(
            input=texts,
            model=self.model_name,
        )

        # Extract embeddings from response
        embeddings = [item.embedding for item in response.data]
        return np.array(embeddings)


# Initialize Pinecone client
def initialize_pinecone():
    pc = Pinecone(api_key=PINECONE_API_KEY)

    # Define index name
    index_name = "ebikes-search"

    # Check if index already exists
    existing_indexes = pc.list_indexes().names()
    if index_name not in existing_indexes:
        # Create index with 1536 dimensions (matches text-embedding-3-small)
        pc.create_index(
            name=index_name,
            dimension=1536,
            metric="cosine",
            spec=ServerlessSpec(cloud="aws", region="us-east-1"),
        )
        print(f"Created new index: {index_name}")

    # Connect to the index
    try:
        index = pc.Index(index_name)
        return index
    except Exception as e:
        print(f"Error connecting to Pinecone index: {e}")
        raise


# Load the e-bikes data
def load_ebikes_data(file_path="data.json"):
    """Load bike records from the JSON file's top-level 'pogo-cycles-data' list."""
    try:
        with open(file_path, 'r') as f:
            data = json.load(f)
        return data.get('pogo-cycles-data', [])
    except Exception as e:
        print(f"Error loading e-bikes data: {e}")
        return []


# Create embeddings and upload to Pinecone
def create_and_upload_embeddings(ebikes_data, encoder, pinecone_index):
    # Prepare data for indexing
    ids = []
    descriptions = []
    metadata = []

    for bike in ebikes_data:
        ids.append(bike['id'])
        descriptions.append(bike['description'])
        metadata.append({
            'id': bike['id'],
            'name': bike['name'],
            'type': bike['product_type'],
            'description': bike['description'],
            'category': bike['category'],
        })

    # Create embeddings
    embeddings = encoder.encode(descriptions)

    # Prepare vectors for Pinecone
    vectors_to_upsert = []
    for i in range(len(ids)):
        vector = {
            'id': ids[i],
            'values': embeddings[i].tolist(),
            'metadata': metadata[i],
        }
        vectors_to_upsert.append(vector)

    # Upsert vectors to Pinecone
    pinecone_index.upsert(vectors=vectors_to_upsert)
    print(f"Uploaded {len(vectors_to_upsert)} embeddings to Pinecone")
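
# Sketch of the data.json layout the two functions above expect: a top-level
# "pogo-cycles-data" list whose entries carry the fields read during indexing
# (id, name, product_type, description, category). The values shown here are
# illustrative placeholders, not actual catalogue entries:
#
# {
#   "pogo-cycles-data": [
#     {
#       "id": "bike-001",
#       "name": "Example City Cruiser",
#       "product_type": "electric",
#       "description": "A lightweight commuter e-bike with a long-range battery.",
#       "category": "commuter"
#     }
#   ]
# }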

# Global variables for model and Pinecone index
encoder = None
pinecone_index = None


# Initialize data at startup
@app.on_event("startup")
async def startup_event():
    global encoder, pinecone_index

    print("Initializing OpenAI embedder...")
    encoder = OpenAIEmbedder()

    print("Connecting to Pinecone...")
    pinecone_index = initialize_pinecone()

    print("Loading e-bikes data...")
    ebikes_data = load_ebikes_data("data.json")
    if not ebikes_data:
        print("No e-bikes data found, skipping embedding creation")
        return

    print("Creating and uploading embeddings...")
    create_and_upload_embeddings(ebikes_data, encoder, pinecone_index)
    print("API startup completed successfully!")


@app.get("/health")
async def health_check():
    """Health check endpoint"""
    return {"status": "healthy"}


@app.post("/search", response_model=SearchResponse)
async def search_ebikes(request: SearchRequest):
    """
    Search for e-bikes similar to the provided description.

    This endpoint uses semantic search to find e-bikes that match the user's
    description, optionally filtered by product type and category.
    """
    try:
        # Create embedding for the query
        query_embedding = encoder.encode(request.description)[0]
        filter_payload = build_filter(request.product_type, request.category)

        # Query Pinecone, using the caller-supplied top_k
        results = pinecone_index.query(
            vector=query_embedding.tolist(),
            top_k=request.top_k,
            include_metadata=True,
            filter=filter_payload,
        )
        print("results", results)

        # Parse results
        matches = []
        for match in results.matches:
            bike_match = BikeMatch(
                id=match.metadata.get('id'),
                name=match.metadata.get('name'),
                type=match.metadata.get('type'),
                description=match.metadata.get('description'),
                score=float(match.score),
            )
            matches.append(bike_match)

        return SearchResponse(matches=matches)

    except Exception as e:
        print(f"Error during search: {e}")
        raise HTTPException(status_code=500, detail=f"Search failed: {str(e)}")


if __name__ == "__main__":
    uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True)
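
# A minimal usage sketch, assuming the server is running locally on port 8000
# and the `requests` package is installed (it is not a dependency of this
# module). The description, product_type, and category values are illustrative
# placeholders; only "description" is required by SearchRequest:
#
#   import requests
#   payload = {
#       "description": "lightweight commuter e-bike with a long-range battery",
#       "top_k": 3,
#       "product_type": "electric",
#       "category": "commuter",
#   }
#   resp = requests.post("http://localhost:8000/search", json=payload)
#   print(resp.json())  # SearchResponse: a "matches" list of id/name/type/description/score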