# Spaces: Sleeping  (page-status residue captured along with the source; not part of the program)
import ast
import json
import os
import pickle
import re
import tempfile
from typing import Any, Callable, Dict, List, Optional

import google.generativeai as genai
import numpy as np
import pandas as pd
import streamlit as st
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
# Configure page (Streamlit requires this to be the first Streamlit call).
# The icon/title use the real cooking emoji rather than mis-decoded bytes.
st.set_page_config(
    page_title="🍳 Enhanced AI Recipe Generator",
    page_icon="🍳",
    layout="wide",
    initial_sidebar_state="collapsed",
)
class EnhancedRecipeRAG:
    """Recipe retrieval-augmented generation helper with multi-dataset support.

    Recipes are loaded from CSV files in several known layouts, indexed with
    TF-IDF for similarity search, and then used as context when asking a
    Gemini model to generate new recipes.
    """

    # Upper bound on the number of dataset rows read and indexed (performance).
    MAX_RECIPES = 10000

    # Splits instruction text on newlines and on step numbers such as "2.".
    # The look-arounds keep decimals ("1.5 cups") and tokens like "v2." intact.
    _STEP_SPLIT_RE = re.compile(r'(?<![\w.])\d+\.(?!\d)|\n')

    def __init__(self):
        # Gemini credentials / client; populated by setup_gemini().
        self.api_key = None
        self.model = None
        # Parsed recipes (list of dicts) currently serving as knowledge base.
        self.recipe_database = []
        # TF-IDF vectorizer and matrix built by build_search_index().
        self.vectorizer = None
        self.recipe_vectors = None
        # True once a dataset has been loaded and indexed successfully.
        self.dataset_loaded = False

    def load_sample_recipes(self) -> List[Dict]:
        """Return the fallback sample recipes used when no dataset is loaded."""
        return [
            {
                "name": "Classic Scrambled Eggs",
                "ingredients": ["eggs", "butter", "salt", "pepper", "milk"],
                "category": "breakfast",
                "cuisine": "american",
                "instructions": [
                    "Beat eggs with milk",
                    "Heat butter in pan",
                    "Add eggs and scramble gently",
                ],
                "prep_time": 5,
                "cook_time": 5,
            },
            # ... more sample recipes
        ]

    def load_dataset_from_csv(self, file_path: str, format_type: str = "auto") -> bool:
        """Load recipes from a CSV dataset and rebuild the search index.

        Args:
            file_path: Path of the CSV file to read.
            format_type: One of "auto", "recipenlg", "foodcom", "epicurious";
                any other value selects the generic parser. "auto" picks a
                parser from the column names.

        Returns:
            True when at least one recipe was parsed and indexed. On failure
            an error is shown, False is returned, and the previously loaded
            database and index are left untouched.
        """
        try:
            # Only the first MAX_RECIPES rows are ever used, so do not read more.
            df = pd.read_csv(file_path, nrows=self.MAX_RECIPES)
            recipes = self._select_parser(df, format_type)(df)
            if not recipes:
                st.error("No usable recipes were found in the dataset")
                return False

            previous_state = (self.recipe_database, self.vectorizer, self.recipe_vectors)
            try:
                self.recipe_database = recipes
                self.build_search_index()
            except Exception:
                # Never leave a database paired with a missing or stale index.
                self.recipe_database, self.vectorizer, self.recipe_vectors = previous_state
                raise

            self.dataset_loaded = True
            return True
        except Exception as e:
            st.error(f"Error loading dataset: {str(e)}")
            return False

    def _select_parser(self, df: pd.DataFrame, format_type: str) -> Callable[[pd.DataFrame], List[Dict]]:
        """Choose the parser for an explicit format or by column sniffing."""
        auto = format_type == "auto"
        if format_type == "recipenlg" or (auto and "title" in df.columns):
            return self.parse_recipenlg_format
        if format_type == "foodcom" or (auto and "name" in df.columns):
            return self.parse_foodcom_format
        if format_type == "epicurious" or (auto and "recipe_name" in df.columns):
            return self.parse_epicurious_format
        return self.parse_generic_format

    def _collect_recipes(self, df: pd.DataFrame, build_recipe: Callable[[Any], Dict]) -> List[Dict]:
        """Build one recipe per row, skipping malformed rows and rows without ingredients."""
        recipes = []
        for _, row in df.head(self.MAX_RECIPES).iterrows():
            try:
                recipe = build_recipe(row)
            except Exception:
                # Best effort: one bad row must not abort the whole import.
                continue
            if recipe["ingredients"]:
                recipes.append(recipe)
        return recipes

    def parse_recipenlg_format(self, df: pd.DataFrame) -> List[Dict]:
        """Parse the RecipeNLG layout (title / ingredients / directions columns)."""
        def build(row):
            return {
                "name": self._text_or_default(row.get("title"), "Unknown Recipe"),
                "ingredients": self.parse_ingredients(row.get("ingredients", "")),
                "instructions": self.parse_instructions(row.get("directions", "")),
                "category": "unknown",
                "cuisine": "unknown",
                "source": "RecipeNLG",
            }

        return self._collect_recipes(df, build)

    def parse_foodcom_format(self, df: pd.DataFrame) -> List[Dict]:
        """Parse the Food.com layout (name / ingredients / steps / tags / minutes columns)."""
        def build(row):
            return {
                "name": self._text_or_default(row.get("name"), "Unknown Recipe"),
                "ingredients": self.parse_ingredients(row.get("ingredients", "")),
                "instructions": self.parse_instructions(row.get("steps", "")),
                "category": self._text_or_default(row.get("tags"), "unknown"),
                "cuisine": "unknown",
                "prep_time": self._value_or_default(row.get("minutes"), 30),
                "source": "Food.com",
            }

        return self._collect_recipes(df, build)

    def parse_epicurious_format(self, df: pd.DataFrame) -> List[Dict]:
        """Parse the Epicurious layout (recipe_name / ingredients / course / cuisine columns)."""
        def build(row):
            return {
                "name": self._text_or_default(row.get("recipe_name"), "Unknown Recipe"),
                "ingredients": self.parse_ingredients(row.get("ingredients", "")),
                "instructions": [],  # Usually not included in ingredient-focused datasets
                "category": self._text_or_default(row.get("course"), "unknown"),
                "cuisine": self._text_or_default(row.get("cuisine"), "unknown"),
                "source": "Epicurious",
            }

        return self._collect_recipes(df, build)

    def parse_generic_format(self, df: pd.DataFrame) -> List[Dict]:
        """Parse an arbitrary CSV by locating a name column and an ingredients column.

        Shows an error and returns an empty list when either column is missing.
        """
        name_col = self.find_column(df, ["name", "title", "recipe_name", "recipe"])
        ingredients_col = self.find_column(df, ["ingredients", "ingredient_list"])
        # Compare against None: a falsy-but-valid label (e.g. 0) must not count as missing.
        if name_col is None or ingredients_col is None:
            st.error("Could not find required columns (name and ingredients) in CSV")
            return []

        def build(row):
            return {
                "name": self._text_or_default(row.get(name_col), "Unknown Recipe"),
                "ingredients": self.parse_ingredients(row.get(ingredients_col, "")),
                "instructions": [],
                "category": "unknown",
                "cuisine": "unknown",
                "source": "Custom Dataset",
            }

        return self._collect_recipes(df, build)

    def find_column(self, df: pd.DataFrame, possible_names: List[str]) -> Optional[str]:
        """Return the first DataFrame column whose name matches case-insensitively.

        Columns are checked in DataFrame order; non-string column labels are
        compared through their string form. Returns None when nothing matches.
        """
        wanted = {name.lower() for name in possible_names}
        for col in df.columns:
            if str(col).lower() in wanted:
                return col
        return None

    @staticmethod
    def _is_missing(value: Any) -> bool:
        """Report whether a cell value is None/NaN (containers are never missing)."""
        if value is None:
            return True
        if isinstance(value, (list, tuple, dict, set)):
            return False
        try:
            return bool(pd.isna(value))
        except (TypeError, ValueError):
            # Array-likes have no single truth value; treat them as present.
            return False

    def _text_or_default(self, value: Any, default: str) -> str:
        """Return the stripped string form of a cell, or the default when missing/blank."""
        if self._is_missing(value):
            return default
        return str(value).strip() or default

    def _value_or_default(self, value: Any, default: Any) -> Any:
        """Return the cell value unchanged, or the default when it is missing."""
        return default if self._is_missing(value) else value

    @staticmethod
    def _clean_items(items) -> List[str]:
        """Stringify and strip every item, dropping the ones that end up empty."""
        stripped = (str(item).strip() for item in items)
        return [item for item in stripped if item]

    def _parse_list_literal(self, text: str) -> Optional[List[str]]:
        """Parse a bracketed list written as JSON or as a Python literal.

        JSON is tried first; ast.literal_eval then covers single-quoted Python
        list reprs, including items containing apostrophes. Returns None when
        the text is not a list in either notation.
        """
        for loader in (json.loads, ast.literal_eval):
            try:
                parsed = loader(text)
            except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError):
                continue
            if isinstance(parsed, (list, tuple)):
                return self._clean_items(parsed)
        return None

    def parse_ingredients(self, ingredients_text: str) -> List[str]:
        """Parse ingredients from various text formats.

        Accepts an actual list, a JSON/Python list literal, or comma-,
        newline- or whitespace-separated text (checked in that order).
        Missing or blank input yields an empty list.
        """
        if isinstance(ingredients_text, (list, tuple)):
            return self._clean_items(ingredients_text)
        if self._is_missing(ingredients_text):
            return []
        text = str(ingredients_text).strip()
        if not text:
            return []
        if text.startswith('['):
            parsed = self._parse_list_literal(text)
            if parsed is not None:
                return parsed
        for separator in (',', '\n'):
            if separator in text:
                return self._clean_items(text.split(separator))
        # Single ingredient or space-separated
        return self._clean_items(text.split())

    def parse_instructions(self, instructions_text: str) -> List[str]:
        """Parse cooking instructions into a list of steps.

        Accepts an actual list, a JSON/Python list literal, or free text that
        is split on newlines and on step numbers ("1.", "2.", ...). Missing
        or blank input yields an empty list.
        """
        if isinstance(instructions_text, (list, tuple)):
            return self._clean_items(instructions_text)
        if self._is_missing(instructions_text):
            return []
        text = str(instructions_text).strip()
        if not text:
            return []
        if text.startswith('['):
            parsed = self._parse_list_literal(text)
            if parsed is not None:
                return parsed
        return self._clean_items(self._STEP_SPLIT_RE.split(text))

    def build_search_index(self):
        """Build the TF-IDF search index over the current recipe database.

        Any previous index is discarded first, so an empty database never
        stays paired with vectors computed for an older dataset.
        """
        self.vectorizer = None
        self.recipe_vectors = None
        if not self.recipe_database:
            return

        # One text document per recipe: name, ingredients and category.
        recipe_texts = []
        for recipe in self.recipe_database:
            ingredients = ' '.join(str(ing) for ing in recipe['ingredients'])
            text = f"{recipe['name']} {ingredients}"
            if recipe.get('category'):
                text += f" {recipe['category']}"
            recipe_texts.append(text)

        vectorizer = TfidfVectorizer(
            stop_words='english',
            ngram_range=(1, 2),
            max_features=5000,
        )
        # Publish the vectorizer only after fitting succeeded.
        self.recipe_vectors = vectorizer.fit_transform(recipe_texts)
        self.vectorizer = vectorizer

    def setup_gemini(self, api_key: str, model_name: str = 'gemini-pro') -> bool:
        """Configure the Gemini client and create the generative model.

        Args:
            api_key: Google Gemini API key.
            model_name: Model identifier passed to genai.GenerativeModel.

        Returns:
            True on success; on failure an error is shown and False returned.
        """
        try:
            genai.configure(api_key=api_key)
            self.model = genai.GenerativeModel(model_name)
            self.api_key = api_key
            return True
        except Exception as e:
            st.error(f"Failed to initialize Gemini API: {str(e)}")
            return False

    def retrieve_relevant_recipes(
        self,
        user_ingredients: List[str],
        top_k: int = 5,
        min_similarity: float = 0.1,
    ) -> List[Dict]:
        """Retrieve the recipes most similar to the user's ingredients.

        Uses TF-IDF cosine similarity when a dataset index exists, otherwise
        falls back to basic_ingredient_matching().

        Args:
            user_ingredients: Ingredient names supplied by the user.
            top_k: Maximum number of recipes to return.
            min_similarity: Matches must score strictly above this value.

        Returns:
            Copies of the matching recipes, best first, each carrying a
            'similarity_score' (or the fallback's 'relevance_score').
        """
        if top_k <= 0:
            # Guard: a slice of [-0:] would otherwise select every recipe.
            return []
        if not self.dataset_loaded or self.vectorizer is None or self.recipe_vectors is None:
            return self.basic_ingredient_matching(user_ingredients, top_k)

        query = ' '.join(user_ingredients)
        query_vector = self.vectorizer.transform([query])
        similarities = cosine_similarity(query_vector, self.recipe_vectors).flatten()

        # Indices of the top_k highest scores, best first.
        top_indices = similarities.argsort()[-top_k:][::-1]
        relevant_recipes = []
        for idx in top_indices:
            score = float(similarities[idx])
            if score > min_similarity:
                recipe = self.recipe_database[idx].copy()
                recipe['similarity_score'] = score
                relevant_recipes.append(recipe)
        return relevant_recipes

    def basic_ingredient_matching(self, user_ingredients: List[str], top_k: int = 5) -> List[Dict]:
        """Fallback retrieval based on exact (case-insensitive) ingredient overlap.

        Each match carries 'matching_ingredients' (overlap count) and
        'relevance_score' (overlap divided by the recipe's distinct
        ingredient count). At most top_k matches are returned, best first.
        """
        wanted = {str(ing).lower().strip() for ing in user_ingredients}
        wanted.discard("")
        if not wanted or top_k <= 0:
            return []

        relevant_recipes = []
        for recipe in (self.recipe_database or self.load_sample_recipes()):
            recipe_ingredients = {str(ing).lower().strip() for ing in recipe["ingredients"]}
            overlap = len(wanted & recipe_ingredients)
            if overlap > 0:
                relevant_recipes.append({
                    **recipe,
                    "relevance_score": overlap / len(recipe_ingredients),
                    "matching_ingredients": overlap,
                })
        relevant_recipes.sort(key=lambda match: match["relevance_score"], reverse=True)
        return relevant_recipes[:top_k]

    def _build_generation_prompt(self, user_ingredients: List[str], relevant_recipes: List[Dict]) -> str:
        """Compose the Gemini prompt from the ingredients and up to three context recipes."""
        ingredients_text = ", ".join(str(ing) for ing in user_ingredients)

        context_text = "Similar recipes for context:\n"
        for i, recipe in enumerate(relevant_recipes[:3], 1):
            listed = ', '.join(str(ing) for ing in recipe.get('ingredients', [])[:8])
            context_text += f"{i}. {recipe.get('name', 'Unknown Recipe')}: {listed}\n"
            instructions = recipe.get('instructions')
            if instructions:
                # The first step (truncated) hints at the recipe's cooking style.
                context_text += f" Style: {str(instructions[0])[:50]}...\n"

        return (
            "\n"
            f"Available ingredients: {ingredients_text}\n"
            f"{context_text}\n"
            "Based on the available ingredients and the style of similar recipes above, "
            "generate 4 complete, practical recipes. Each recipe should:\n"
            "1. Use primarily the available ingredients\n"
            "2. Be inspired by the context recipes' style\n"
            "3. Include realistic quantities and cooking steps\n"
            "Return as JSON:\n"
            "{\n"
            '"recipes": [\n'
            "{\n"
            '"name": "Recipe Name",\n'
            '"ingredients_with_quantities": ["2 eggs", "1 tbsp butter"],\n'
            '"instructions": ["Step 1", "Step 2"],\n'
            '"prep_time": 10,\n'
            '"cook_time": 15,\n'
            '"tip": "Cooking tip",\n'
            '"cuisine": "cuisine type"\n'
            "}\n"
            "]\n"
            "}\n"
        )

    @staticmethod
    def _extract_recipes(response_text: str) -> List[Dict]:
        """Extract the 'recipes' list from the first-to-last brace span of a model reply.

        Raises:
            ValueError: If the brace-delimited span is not valid JSON.
        """
        json_match = re.search(r'\{.*\}', response_text, re.DOTALL)
        if not json_match:
            return []
        recipes_data = json.loads(json_match.group())
        recipes = recipes_data.get("recipes", []) if isinstance(recipes_data, dict) else []
        if not isinstance(recipes, list):
            return []
        # Keep only well-formed recipe objects so callers can rely on dict access.
        return [recipe for recipe in recipes if isinstance(recipe, dict)]

    def generate_recipes_with_gemini(self, user_ingredients: List[str], relevant_recipes: List[Dict]) -> List[Dict]:
        """Generate recipes with Gemini, using the retrieved recipes as context.

        Returns:
            The generated recipe dicts, or an empty list when the model is
            not configured, the request fails, or the reply has no usable JSON.
        """
        if self.model is None:
            st.error("Gemini model is not configured. Please add a valid API key.")
            return []
        try:
            prompt = self._build_generation_prompt(user_ingredients, relevant_recipes)
            response = self.model.generate_content(prompt)
            return self._extract_recipes(response.text.strip())
        except Exception as e:
            st.error(f"Error generating recipes: {str(e)}")
        return []
def _load_uploaded_dataset(rag_system, uploaded_file, dataset_format):
    """Write the upload to a private temp file, load it, and always clean up.

    A unique temporary file is used instead of a fixed name in the working
    directory, so concurrent sessions cannot overwrite each other's upload.
    Returns whatever rag_system.load_dataset_from_csv() returns.
    """
    tmp = tempfile.NamedTemporaryFile(suffix=".csv", delete=False)
    temp_path = tmp.name
    try:
        with tmp:
            tmp.write(uploaded_file.getbuffer())
        # The file is closed at this point, so it can be reopened by path.
        return rag_system.load_dataset_from_csv(temp_path, dataset_format)
    finally:
        if os.path.exists(temp_path):
            os.remove(temp_path)


def _render_sidebar(rag_system):
    """Render API-key and dataset controls in the sidebar; return the entered API key."""
    with st.sidebar:
        st.header("🔧 Configuration")

        # API Key: only (re)configure Gemini when the key actually changed.
        api_key = st.text_input("Google Gemini API Key", type="password")
        if api_key and api_key != st.session_state.get('current_api_key'):
            if rag_system.setup_gemini(api_key):
                st.session_state.current_api_key = api_key
                st.success("✅ API configured!")

        st.markdown("---")

        # Dataset Management
        st.header("📊 Dataset Options")
        dataset_option = st.selectbox(
            "Choose Knowledge Base:",
            ["Built-in Sample", "Upload CSV Dataset", "Use Kaggle Dataset"],
        )

        if dataset_option == "Upload CSV Dataset":
            uploaded_file = st.file_uploader("Upload Recipe CSV", type=['csv'])
            if uploaded_file:
                dataset_format = st.selectbox(
                    "Dataset Format:",
                    ["auto", "recipenlg", "foodcom", "epicurious", "generic"],
                )
                if st.button("Load Dataset"):
                    with st.spinner("Loading dataset..."):
                        if _load_uploaded_dataset(rag_system, uploaded_file, dataset_format):
                            st.success(f"✅ Loaded {len(rag_system.recipe_database)} recipes!")
        elif dataset_option == "Use Kaggle Dataset":
            st.markdown(
                "**Popular Datasets:**\n"
                "- RecipeNLG: 2.2M recipes\n"
                "- Food.com: 500K recipes\n"
                "- Epicurious: 13K recipes\n"
                "\n"
                "Download from Kaggle and upload above!"
            )

        # Dataset status
        if rag_system.dataset_loaded:
            st.success(f"📚 Dataset: {len(rag_system.recipe_database)} recipes loaded")
        else:
            st.info("📝 Using built-in sample recipes")

    return api_key


def _render_retrieved_context(relevant_recipes):
    """Show up to three retrieved recipes together with their match score."""
    with st.expander("🔍 Similar recipes found in database"):
        for recipe in relevant_recipes[:3]:
            # TF-IDF retrieval sets similarity_score; the fallback sets relevance_score.
            score = recipe.get('similarity_score', recipe.get('relevance_score', 0))
            st.write(f"**{recipe['name']}** (Match: {score:.2f})")
            st.write(f"Ingredients: {', '.join(recipe['ingredients'][:5])}...")


def _render_generated_recipe(index, recipe):
    """Render one generated recipe inside an expander (the first one starts expanded)."""
    title = f"📖 Recipe {index}: {recipe.get('name', 'Delicious Recipe')}"
    with st.expander(title, expanded=index == 1):
        # Times and cuisine
        prep_col, cook_col, cuisine_col = st.columns(3)
        with prep_col:
            st.write(f"**⏱️ Prep:** {recipe.get('prep_time', 10)} mins")
        with cook_col:
            st.write(f"**🔥 Cook:** {recipe.get('cook_time', 15)} mins")
        with cuisine_col:
            cuisine = recipe.get('cuisine', 'International')
            st.write(f"**🌍 Cuisine:** {cuisine}")

        # Ingredients
        st.markdown("#### 📋 Ingredients:")
        for ing in recipe.get('ingredients_with_quantities', []):
            st.write(f"• {ing}")

        # Instructions
        st.markdown("#### 👨‍🍳 Instructions:")
        for step_number, instruction in enumerate(recipe.get('instructions', []), 1):
            st.write(f"**{step_number}.** {instruction}")

        # Tip
        if recipe.get('tip'):
            st.info(f"💡 **Tip:** {recipe['tip']}")


def main():
    """Streamlit entry point: configure the app, collect ingredients, show generated recipes."""
    st.markdown(
        '<h1 style="text-align: center; color: #2E86AB;">🍳 Enhanced AI Recipe Generator</h1>',
        unsafe_allow_html=True,
    )
    st.markdown("### Powered by Large Recipe Datasets + Google Gemini Pro")

    # Keep one RAG system per session so the dataset and index survive reruns.
    if 'enhanced_rag_system' not in st.session_state:
        st.session_state.enhanced_rag_system = EnhancedRecipeRAG()
    rag_system = st.session_state.enhanced_rag_system

    api_key = _render_sidebar(rag_system)

    # Main interface
    input_col, button_col = st.columns([3, 1])
    with input_col:
        ingredients_input = st.text_input(
            "🥘 Enter Your Ingredients:",
            placeholder="onion, tomato, garlic, eggs, cheese",
            help="Separate ingredients with commas",
        )
    with button_col:
        generate_button = st.button("🚀 Generate Recipes", type="primary", use_container_width=True)

    if not generate_button:
        return

    # Generation logic
    if not api_key:
        st.error("⚠️ Please add your Gemini API key!")
        return
    if rag_system.model is None:
        # A key was entered but the Gemini client could not be initialised.
        st.error("⚠️ Gemini API is not configured. Please check your API key!")
        return
    if not ingredients_input.strip():
        st.error("⚠️ Please enter some ingredients!")
        return

    user_ingredients = [ing.strip() for ing in ingredients_input.split(',') if ing.strip()]

    with st.spinner("🤖 Searching database and generating recipes..."):
        # RAG process: retrieve context first, then generate with it.
        relevant_recipes = rag_system.retrieve_relevant_recipes(user_ingredients)
        generated_recipes = rag_system.generate_recipes_with_gemini(user_ingredients, relevant_recipes)

    # The model's JSON is untrusted; keep only well-formed recipe objects.
    generated_recipes = [recipe for recipe in (generated_recipes or []) if isinstance(recipe, dict)]
    if not generated_recipes:
        return

    # Display results
    st.markdown("## 🍽️ Your Personalized Recipes")
    if relevant_recipes:
        _render_retrieved_context(relevant_recipes)
    for i, recipe in enumerate(generated_recipes, 1):
        _render_generated_recipe(i, recipe)


if __name__ == "__main__":
    main()