Spaces:
Sleeping
Sleeping
| import json | |
| import os | |
| from typing import List, Dict, Any, Optional | |
| from pydantic import BaseModel | |
| import uvicorn | |
| from fastapi import FastAPI, HTTPException | |
| from pinecone import Pinecone , ServerlessSpec | |
| import numpy as np | |
| from openai import OpenAI | |
| # Load environment variables | |
| from dotenv import load_dotenv | |
| load_dotenv() | |
| # Get API keys from environment variables | |
| PINECONE_API_KEY = os.getenv('PINECONE_API_KEY') | |
| OPENAI_API_KEY = os.getenv('OPENAI_API_KEY') | |
| if not PINECONE_API_KEY: | |
| raise ValueError("PINECONE_API_KEY environment variable not set") | |
| if not OPENAI_API_KEY: | |
| raise ValueError("OPENAI_API_KEY environment variable not set") | |
| # Create FastAPI app | |
| app = FastAPI(title="E-Bikes Semantic Search API", | |
| description="API for finding similar e-bikes based on semantic search", | |
| version="1.0.0") | |
| def build_filter(pt: Optional[str], cat: Optional[str]) -> dict | None: | |
| filt = {} | |
| if pt: | |
| filt["product_type"] = pt # shorthand $eq | |
| if cat: | |
| filt["category"] = cat | |
| return filt or None | |
| # Request and response models | |
| class SearchRequest(BaseModel): | |
| description: str | |
| top_k: int = 3 | |
| product_type: Optional[str] = None # "ebike" or "escooter" | |
| category: Optional[str] = None # e.g. "mountain" | |
| class BikeMatch(BaseModel): | |
| id: str | |
| name: str | |
| type: str | |
| description: str | |
| score: float | |
| class SearchResponse(BaseModel): | |
| matches: List[BikeMatch] | |
| # Initialize OpenAI client | |
| openai_client = OpenAI(api_key=OPENAI_API_KEY) | |
| # Define the embedding model using OpenAI | |
| class OpenAIEmbedder: | |
| def __init__(self, model_name="text-embedding-3-small"): | |
| self.model_name = model_name | |
| self.client = openai_client | |
| self.embedding_dimension = 1536 # Dimension of text-embedding-3-small | |
| def encode(self, texts): | |
| if isinstance(texts, str): | |
| texts = [texts] | |
| # Get embeddings from OpenAI | |
| response = self.client.embeddings.create( | |
| input=texts, | |
| model=self.model_name | |
| ) | |
| # Extract embeddings from response | |
| embeddings = [item.embedding for item in response.data] | |
| return np.array(embeddings) | |
| # Initialize Pinecone client | |
| def initialize_pinecone(): | |
| pc = Pinecone(api_key=PINECONE_API_KEY) | |
| # Define index name | |
| index_name = "ebikes-search" | |
| # Check if index already exists | |
| existing_indexes = pc.list_indexes().names() | |
| if index_name not in existing_indexes: | |
| # Create index with 1536 dimensions (matches text-embedding-3-small) | |
| pc.create_index( | |
| name=index_name, | |
| dimension=1536, | |
| metric="cosine", | |
| spec=ServerlessSpec(cloud="aws", region="us-east-1") | |
| ) | |
| print(f"Created new index: {index_name}") | |
| # Connect to the index | |
| try: | |
| index = pc.Index(index_name) | |
| return index | |
| except Exception as e: | |
| print(f"Error connecting to Pinecone index: {e}") | |
| raise | |
| # Load the e-bikes data | |
| def load_ebikes_data(file_path="data.json"): | |
| try: | |
| with open(file_path, 'r') as f: | |
| data = json.load(f) | |
| return data.get('pogo-cycles-data', []) | |
| except Exception as e: | |
| print(f"Error loading e-bikes data: {e}") | |
| return [] | |
| # Create embeddings and upload to Pinecone | |
| def create_and_upload_embeddings(ebikes_data, encoder, pinecone_index): | |
| # Prepare data for indexing | |
| ids = [] | |
| descriptions = [] | |
| metadata = [] | |
| for bike in ebikes_data: | |
| ids.append(bike['id']) | |
| descriptions.append(bike['description']) | |
| metadata.append({ | |
| 'id': bike['id'], | |
| 'name': bike['name'], | |
| 'type': bike['product_type'], | |
| 'description': bike['description'], | |
| 'category': bike['category'] | |
| }) | |
| # Create embeddings | |
| embeddings = encoder.encode(descriptions) | |
| # Prepare vectors for Pinecone | |
| vectors_to_upsert = [] | |
| for i in range(len(ids)): | |
| vector = { | |
| 'id': ids[i], | |
| 'values': embeddings[i].tolist(), | |
| 'metadata': metadata[i] | |
| } | |
| vectors_to_upsert.append(vector) | |
| # Upsert vectors to Pinecone | |
| pinecone_index.upsert(vectors=vectors_to_upsert) | |
| print(f"Uploaded {len(vectors_to_upsert)} embeddings to Pinecone") | |
| # Global variables for model and Pinecone index | |
| encoder = None | |
| pinecone_index = None | |
| # Initialize data at startup | |
| async def startup_event(): | |
| global encoder, pinecone_index | |
| print("Initializing OpenAI embedder...") | |
| encoder = OpenAIEmbedder() | |
| print("Connecting to Pinecone...") | |
| pinecone_index = initialize_pinecone() | |
| print("Loading e-bikes data...") | |
| ebikes_data = load_ebikes_data("data.json") | |
| if not ebikes_data: | |
| print("No e-bikes data found, skipping embedding creation") | |
| return | |
| print("Creating and uploading embeddings...") | |
| create_and_upload_embeddings(ebikes_data, encoder, pinecone_index) | |
| print("API startup completed successfully!") | |
| async def health_check(): | |
| """Health check endpoint""" | |
| return {"status": "healthy"} | |
| async def search_ebikes(request:SearchRequest): | |
| """ | |
| Search for e-bikes similar to the provided description | |
| This endpoint uses semantic search to find e-bikes that match the user's description. | |
| """ | |
| try: | |
| # Create embedding for the query | |
| query_embedding = encoder.encode(request.description)[0] | |
| filter_payload = build_filter(request.filters.get("product_type"), request.filters.get("category")) | |
| # Query Pinecone | |
| results = pinecone_index.query( | |
| vector=query_embedding.tolist(), | |
| top_k=3, | |
| include_metadata=True, | |
| filter=filter_payload | |
| ) | |
| # Parse results | |
| matches = [] | |
| for match in results.matches: | |
| bike_match = BikeMatch( | |
| id=match.metadata.get('id'), | |
| name=match.metadata.get('name'), | |
| type=match.metadata.get('type'), | |
| description=match.metadata.get('description'), | |
| score=float(match.score) | |
| ) | |
| matches.append(bike_match) | |
| return SearchResponse(matches=matches) | |
| except Exception as e: | |
| print(f"Error during search: {e}") | |
| raise HTTPException(status_code=500, detail=f"Search failed: {str(e)}") | |
| if __name__ == "__main__": | |
| uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True) |