pattern / model /search_script.py
azaher1215
add dark mode, fix issues
cb0d4a7
raw
history blame
9.55 kB
import torch
import numpy as np
from transformers import BertTokenizer, BertModel
import pickle
import json
import gdown
import os
import sys
sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..'))
from config import GOOGLE_DRIVE_FILES
class RecipeSearchSystem:
    """Semantic recipe search over precomputed recipe embeddings.

    On construction the system downloads (if missing) and loads a BERT model
    checkpoint, a matrix of recipe embeddings, the recipes DataFrame and a
    mapping of per-recipe rating statistics. Queries are embedded with the
    same model and ranked against the recipes by cosine similarity.

    Attributes set by ``__init__``:
        is_ready: ``True`` only when every asset was loaded successfully.
        recipe_embeddings: array loaded from the ``.npy`` embeddings file.
        recipes_df: unpickled recipes table, read positionally via ``iloc``.
        recipe_stats: unpickled mapping ``recipe_id -> (avg_rating,
            num_ratings, unique_users)``.
    """

    def __init__(self, max_recipes=231630):
        """Download any missing assets and load them into memory.

        Errors are not propagated: they are printed and recorded by leaving
        ``self.is_ready`` as ``False``.

        Args:
            max_recipes: recipe count embedded in the local asset file names.
        """
        # Set first so the attribute exists no matter where loading fails.
        self.is_ready = False
        try:
            self.max_recipes = max_recipes
            # NOTE(review): the Drive keys are hard-coded for 231630 while the
            # local file names follow max_recipes -- confirm other values are
            # actually supported.
            file_paths = {
                'recipe_embeddings': GOOGLE_DRIVE_FILES['assets/nlp/advanced_recipe_embeddings_231630.npy'],
                'recipes_df': GOOGLE_DRIVE_FILES['assets/nlp/advanced_filtered_recipes_231630.pkl'],
                'recipe_stats': GOOGLE_DRIVE_FILES['assets/nlp/recipe_statistics_231630.pkl'],
                'model': GOOGLE_DRIVE_FILES['assets/nlp/tag_based_bert_model.pth'],
            }
            output_path = "assets/nlp/"

            # Download files from Google Drive when they are not cached locally.
            self.ensure_files_exist(file_paths, output_path)
            local_files = {
                key: os.path.join(output_path, filename)
                for key, filename in self._local_filenames().items()
            }

            # Set up device.
            self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

            # Load tokenizer.
            self.tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')

            # Load the trained model weights on top of the base architecture.
            self.model = BertModel.from_pretrained('bert-base-uncased')
            self.model.load_state_dict(torch.load(local_files['model'], map_location=self.device))
            self.model.to(self.device)
            self.model.eval()

            # Load recipe embeddings.
            self.recipe_embeddings = np.load(local_files['recipe_embeddings'])

            # NOTE(security): pickle.load can execute arbitrary code contained
            # in the file; these files come from Google Drive, so the
            # configured links must point to trusted data only.
            with open(local_files['recipes_df'], 'rb') as f:
                self.recipes_df = pickle.load(f)
            with open(local_files['recipe_stats'], 'rb') as f:
                self.recipe_stats = pickle.load(f)

            self.is_ready = True
        except Exception as e:
            print(f"Error initializing search system: {e}")
            self.is_ready = False

    def _local_filenames(self):
        """Return the local file name of each asset, keyed like ``file_paths``."""
        return {
            'recipe_embeddings': f'advanced_recipe_embeddings_{self.max_recipes}.npy',
            'recipes_df': f'advanced_filtered_recipes_{self.max_recipes}.pkl',
            'recipe_stats': f'recipe_statistics_{self.max_recipes}.pkl',
            'model': 'tag_based_bert_model.pth',
        }

    def ensure_files_exist(self, file_paths, output_path):
        """Download every asset that is not already present in ``output_path``.

        Args:
            file_paths: mapping of asset key to the Google Drive URL to fetch.
            output_path: local directory for the assets; created if missing.

        Raises:
            FileNotFoundError: if a download finished without producing the
                expected local file.
        """
        os.makedirs(output_path, exist_ok=True)

        for key, local_filename in self._local_filenames().items():
            local_path = os.path.join(output_path, local_filename)
            if os.path.exists(local_path):
                print(f"{local_filename} already exists, skipping download")
                continue

            print(f"Downloading {local_filename}...")
            gdown.download(file_paths[key], local_path, quiet=False, fuzzy=True)
            # Do not report success unless the file is really there; otherwise
            # the failure only surfaces later as a confusing load error.
            if not os.path.exists(local_path):
                raise FileNotFoundError(
                    f"Download of {local_filename} failed: {local_path} was not created"
                )
            print(f"Downloaded {local_filename}")

    def create_query_embedding(self, user_query):
        """Embed a free-text query with the loaded BERT model.

        The query is lower-cased and prefixed with ``"anchor: "`` before
        tokenization.

        Args:
            user_query: the search text entered by the user.

        Returns:
            A flat NumPy array holding the hidden state at sequence position 0
            (the [CLS] token) of the model's last layer.
        """
        structured_query = f"anchor: {user_query.lower()}"

        # Tokenize the query and move all tensors to the model's device once.
        tokenized_query = self.tokenizer(
            structured_query,
            return_tensors='pt',
            truncation=True,
            max_length=128,
            padding='max_length',
        ).to(self.device)

        # Inference only: no gradients needed.
        with torch.no_grad():
            anchor_outputs = self.model(
                tokenized_query['input_ids'],
                tokenized_query['attention_mask'],
            )
            anchor_embedding = anchor_outputs.last_hidden_state[:, 0, :]

        # Move to CPU and convert to a flat numpy vector.
        return anchor_embedding.cpu().numpy().flatten()

    def calculate_similarities(self, query_embedding):
        """Compute the cosine similarity between the query and every recipe.

        Cosine similarity = (a . b) / (||a|| * ||b||). Whenever either vector
        has zero norm the similarity is defined as 0.0 instead of dividing by
        zero.

        Args:
            query_embedding: 1-D query vector.

        Returns:
            A list of floats, one per row of ``self.recipe_embeddings``, in
            row order. Assumes the embeddings form a 2-D (recipes x dim) array.
        """
        if len(self.recipe_embeddings) == 0:
            return []

        embeddings = np.asarray(self.recipe_embeddings)
        query = np.asarray(query_embedding)

        # One matrix-vector product replaces the per-recipe Python loop, and
        # the query norm is computed once instead of once per recipe.
        dot_products = embeddings @ query
        denominators = np.linalg.norm(embeddings, axis=1) * np.linalg.norm(query)

        # Entries with a zero denominator keep the pre-filled 0.0.
        similarities = np.zeros(len(embeddings), dtype=float)
        np.divide(dot_products, denominators, out=similarities, where=denominators > 0)
        return similarities.tolist()

    def filter_recipes_by_quality(self, min_rating=3.0, min_num_ratings=5):
        """Return positions of recipes that meet the rating thresholds.

        Recipes without an entry in ``self.recipe_stats`` are excluded.

        Args:
            min_rating: minimum average rating (inclusive).
            min_num_ratings: minimum number of ratings (inclusive).

        Returns:
            A list of positional row indices into ``self.recipes_df``.
        """
        filtered_recipe_indices = []
        # Read only the id column; materializing each full row with iloc just
        # to get the id is needlessly slow on a large table.
        for position, recipe_id in enumerate(self.recipes_df['id']):
            if recipe_id not in self.recipe_stats:
                continue
            avg_rating, num_ratings, _ = self.recipe_stats[recipe_id]
            if avg_rating >= min_rating and num_ratings >= min_num_ratings:
                filtered_recipe_indices.append(position)
        return filtered_recipe_indices

    def rank_recipes_by_similarity_and_rating(self, similarities, recipe_indices):
        """Collect the similarity and average rating of the given recipes.

        Despite the name, no sorting happens here; the caller sorts the
        returned records.

        Args:
            similarities: per-recipe similarity scores indexed by row position.
            recipe_indices: positional row indices of the recipes to score.

        Returns:
            A list of dicts with keys ``recipe_index``, ``recipe_id``,
            ``similarity_score`` and ``avg_rating``, in ``recipe_indices``
            order.
        """
        recipe_ids = self.recipes_df['id'].to_numpy()
        recipe_scores = []
        for recipe_index in recipe_indices:
            recipe_id = recipe_ids[recipe_index]
            # A recipe with no ratings is assumed to be a poor choice, so its
            # rating defaults to 1.0.
            if recipe_id in self.recipe_stats:
                avg_rating, _, _ = self.recipe_stats[recipe_id]
            else:
                avg_rating = 1.0
            recipe_scores.append({
                'recipe_index': recipe_index,
                'recipe_id': recipe_id,
                'similarity_score': similarities[recipe_index],
                'avg_rating': avg_rating,
            })
        return recipe_scores

    def create_recipe_result(self, recipe_index, scores_info):
        """Build the result dictionary for one recipe.

        Args:
            recipe_index: positional row index into ``self.recipes_df``.
            scores_info: dict providing at least ``similarity_score``.

        Returns:
            A dict of plain Python values describing the recipe, its
            similarity score and its rating statistics. A recipe missing from
            ``self.recipe_stats`` gets the same 1.0 fallback rating used when
            ranking, with zero ratings and zero users.
        """
        recipe = self.recipes_df.iloc[recipe_index]
        recipe_id = recipe['id']

        if recipe_id in self.recipe_stats:
            avg_rating, num_ratings, unique_users = self.recipe_stats[recipe_id]
        else:
            # Same fallback as the ranking step, instead of a KeyError.
            avg_rating, num_ratings, unique_users = 1.0, 0, 0

        return {
            'recipe_id': int(recipe_id),
            'name': recipe['name'],
            'ingredients': recipe['ingredients'],
            'tags': recipe['tags'],
            'minutes': int(recipe['minutes']),
            'n_steps': int(recipe['n_steps']),
            'description': recipe.get('description', ''),
            'similarity_score': float(scores_info['similarity_score']),
            'avg_rating': float(avg_rating),
            'num_ratings': int(num_ratings),
            'unique_users': int(unique_users),
        }

    def search_recipes(self, user_query, top_k=5, min_rating=3.0, min_num_ratings=5):
        """Return the best-matching recipes for a free-text query.

        Args:
            user_query: the search text entered by the user.
            top_k: maximum number of results; values <= 0 yield ``[]``.
            min_rating: minimum average rating a recipe must have.
            min_num_ratings: minimum number of ratings a recipe must have.

        Returns:
            A list of result dicts (see ``create_recipe_result``) ordered by
            descending similarity, ties broken by descending average rating.

        Raises:
            RuntimeError: if initialization failed (``is_ready`` is not true).
        """
        # Fail with a clear message rather than an AttributeError on whichever
        # asset happened not to load.
        if not getattr(self, 'is_ready', False):
            raise RuntimeError(
                "Recipe search system is not initialized; see the initialization error output"
            )
        # A negative top_k would otherwise slice from the end of the ranking.
        if top_k <= 0:
            return []

        # Create embedding for user query.
        query_embedding = self.create_query_embedding(user_query)
        # Calculate similarities between query and all recipes.
        similarities = self.calculate_similarities(query_embedding)
        # Keep only recipes that pass the quality thresholds.
        filtered_recipe_indices = self.filter_recipes_by_quality(min_rating, min_num_ratings)
        # Attach similarity and rating to each remaining recipe.
        recipe_scores = self.rank_recipes_by_similarity_and_rating(similarities, filtered_recipe_indices)
        # Sort by semantic similarity, then by average rating.
        recipe_scores.sort(key=lambda x: (x['similarity_score'], x['avg_rating']), reverse=True)

        return [
            self.create_recipe_result(score_info['recipe_index'], score_info)
            for score_info in recipe_scores[:top_k]
        ]
def search_for_recipes():
    """Build and return a ``RecipeSearchSystem`` using its default settings."""
    search_system = RecipeSearchSystem()
    return search_system
if __name__ == "__main__":
    # Manual smoke test: run a few sample queries and print the raw results.
    search_system = RecipeSearchSystem()

    # The constructor reports failures only through is_ready, so stop here
    # with a clear message instead of crashing inside the first search.
    if not search_system.is_ready:
        sys.exit("Recipe search system failed to initialize; aborting test run.")

    test_queries = [
        # "chicken pasta italian quick dinner",
        # "chocolate cake dessert brownie baked healthy",
        # "healthy vegetarian salad tomato basil",
        # "quick easy dinner",
        # "beef steak",
        "beef pasta",
        "beef",
    ]
    for query in test_queries:
        print(f"Testing query: '{query}'")
        results = search_system.search_recipes(
            user_query=query,
            top_k=3,
            min_rating=3.5,
            min_num_ratings=10,
        )
        print(results)
    print("Recipe search system testing complete!")