import os
import sys
import time

import pandas as pd
from dotenv import load_dotenv
from pinecone import Pinecone, ServerlessSpec

# Make the project's src directory importable before loading the local modules below.
src_directory = os.path.abspath(os.path.join(os.path.dirname(__file__), "../..", "src"))
sys.path.append(src_directory)

from model.clip_model import ClipModel
from utils import logger

load_dotenv()
pinecone_api_key = os.environ.get("PINECONE_API_KEY")
logger = logger.get_logger()

clip_model = ClipModel()

def create_index(pinecone, index_name):
    # 512-dimensional vectors (the CLIP image embedding size), cosine similarity,
    # hosted on a serverless spec in AWS us-east-1.
    pinecone.create_index(
        name=index_name,
        dimension=512,
        metric="cosine",
        spec=ServerlessSpec(
            cloud="aws",
            region="us-east-1"
        )
    )

def wait_till_index_loaded(pinecone, index_name):
    while True:
        index = pinecone.describe_index(index_name)
        if index.status.get("ready", False):
            index = pinecone.Index(index_name)
            logger.info(f"Index '{index_name}' is ready and is now accessible.")
            return index
        else:
            logger.debug(f"Index '{index_name}' is not ready yet. Checking again in 1 second.")
            time.sleep(1)

def get_index():
    index = None
    try:
        pc = Pinecone(api_key=pinecone_api_key)
        index_name = "imagesearch"
        logger.info(f"Checking if the index '{index_name}' exists...")
        if not pc.has_index(index_name):
            logger.info(f"Index '{index_name}' does not exist. Creating a new index...")
            create_index(pc, index_name)
            logger.info(f"Index '{index_name}' creation initiated. Waiting for it to be ready...")
            index = wait_till_index_loaded(pc, index_name)
        else:
            index = pc.Index(index_name)
            logger.info(f"Index '{index_name}' already exists. Returning the existing index.")
    except Exception as e:
        logger.error(f"Error occurred while getting or creating the Pinecone index: {str(e)}", exc_info=True)
    return index
    
def process_and_upsert_data(index, data: pd.Series, url_key: str, id_key: str):
    """
    Processes a single row of data (a pandas Series) by extracting the URL and ID, generating an image embedding
    with the CLIP model, and upserting the resulting vector into the Pinecone index.

    This function handles:
    - Extracting the URL and ID from the provided `data` (a pandas Series) using the specified keys (`url_key` and `id_key`).
    - Using the `clip_model` to generate an embedding for the image found at the extracted URL.
    - Upserting the generated embedding, along with the photo ID and URL as metadata, into the Pinecone index via its `upsert` method.

    Args:
        index: The Pinecone index to upsert vectors into.
        data (pandas.Series): A single row of data from the DataFrame, containing the URL and ID.
        url_key (str): The column name in the Series that contains the URL of the image.
        id_key (str): The column name in the Series that contains the photo ID.

    Raises:
        ValueError: If `url_key` or `id_key` is missing from `data`.
    """
    # Validate if the required columns exist in the row (Series)
    if url_key not in data or id_key not in data:
        raise ValueError(f"Missing required keys: '{url_key}' or '{id_key}' in the data")

    try:
        logger.info("Started to process and upsert the data")
        url = data[url_key]
        photo_id = data[id_key]
        embeddings = clip_model.get_image_embedding(url)
        index.upsert(
            vectors=[{
                "id": photo_id,
                "values": embeddings,
                "metadata": {
                    "url": url,
                    "photo_id": photo_id
                }
            }],
            namespace="image-search-dataset",
        )
        logger.info(f"Successfully upserted data for photo_id {photo_id} with URL {url}")
    except ValueError as ve:
        logger.error(f"ValueError: {ve}")
    except Exception as e:
        logger.error(f"Error processing row with photo_id {data.get(id_key, 'unknown')}: {e}")