|
import os |
|
import sys |
|
src_directory = os.path.abspath(os.path.join(os.path.dirname(__file__), "../..", "src")) |
|
sys.path.append(src_directory) |
|
from pinecone import Pinecone, ServerlessSpec |
|
import time |
|
from model.clip_model import ClipModel |
|
from dotenv import load_dotenv |
|
from utils import logger |
|
import pandas as pd |
|
|
|
load_dotenv() |
|
pincone_api_key = os.environ.get("PINECONE_API_KEY") |
|
logger = logger.get_logger() |
|
|
|
clip_model = ClipModel() |
|
|
|
def create_index(pinecone, index_name): |
|
pinecone.create_index( |
|
name=index_name, |
|
dimension=512, |
|
metric="cosine", |
|
spec=ServerlessSpec( |
|
cloud="aws", |
|
region="us-east-1" |
|
) |
|
) |
|
|
|
def wait_till_index_loaded(pinecone, index_name): |
|
while True: |
|
index = pinecone.describe_index(index_name) |
|
if index.status.get("ready", False): |
|
index = pinecone.Index(index_name) |
|
logger.info(f"Index '{index_name}' is ready and is now accessible.") |
|
return index |
|
else: |
|
logger.debug(f"Index '{index_name}' is not ready yet. Checking again in 1 second.") |
|
time.sleep(1) |
|
|
|
def get_index(): |
|
global index |
|
index = None |
|
try: |
|
pc = Pinecone(api_key=pincone_api_key) |
|
index_name = "imagesearch" |
|
logger.info(f"Checking if the index '{index_name}' exists...") |
|
if not pc.has_index(index_name): |
|
logger.info(f"Index '{index_name}' does not exist. Creating a new index...") |
|
create_index(pc,index_name) |
|
logger.info(f"Index '{index_name}' creation initiated. Waiting for it to be ready...") |
|
index = wait_till_index_loaded(index_name,pc) |
|
else: |
|
index = pc.Index(index_name) |
|
logger.info(f"Index '{index_name}' already exists. Returning the existing index.") |
|
except Exception as e: |
|
logger.info(f"Error occurred while getting or creating the Pinecone index: {str(e)}", exc_info=True) |
|
return index |
|
|
|
def process_and_upsert_data(index, data: pd.Series, url_key: str, id_key: str): |
|
""" |
|
Processes a single row of data (pandas Series) by extracting the URL and ID, generating image embeddings using |
|
a clip model, and then upserting the generated embeddings into a pinecone database index. |
|
|
|
This function handles: |
|
- Extracting the URL and ID from the provided `data` (a pandas Series) using the specified keys (`url_key` and `id_key`). |
|
- Using the `clip_model` to generate embeddings for the image found at the extracted URL. |
|
- Upserting the generated embeddings, along with the photo ID and URL, into the pinecone database index using the `upsert` method. |
|
|
|
Args: |
|
data (pandas.Series): A single row of data from the DataFrame, containing the URL and ID. |
|
url_key (str): The column name in the Series that contains the URL of the image. |
|
id_key (str): The column name in the Series that contains the photo ID. |
|
|
|
""" |
|
|
|
if url_key not in data or id_key not in data: |
|
raise ValueError(f"Missing required keys: '{url_key}' or '{id_key}' in the data") |
|
|
|
try: |
|
logger.info("Started to process and upsert the data") |
|
url = data[url_key] |
|
photo_id = data[id_key] |
|
embeddings = clip_model.get_image_embedding(url) |
|
index.upsert( |
|
vectors=[{ |
|
"id": photo_id, |
|
"values": embeddings, |
|
"metadata": { |
|
"url": url, |
|
"photo_id": photo_id |
|
} |
|
}], |
|
namespace="image-search-dataset", |
|
) |
|
logger.info(f"Successfully upserted data for photo_id {photo_id} with URL {url}") |
|
except ValueError as ve: |
|
logger.error(f"ValueError: {ve}") |
|
except Exception as e: |
|
logger.error(f"Error processing row with photo_id {data.get(id_key, 'unknown')}: {e}") |