# HassanJalil's picture
# Rename recipe_rag_app.py to app.py
# c8beb63 verified
# raw
# history blame
# 18.8 kB
import ast
import json
import os
import pickle
import re
import tempfile
from typing import Any, Dict, List, Optional

import google.generativeai as genai
import numpy as np
import pandas as pd
import streamlit as st
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
# Page-level Streamlit settings: title/icon shown in the browser tab, a wide
# content area, and the sidebar hidden until the user opens it.
st.set_page_config(
    page_title="🍳 Enhanced AI Recipe Generator",
    page_icon="🍳",
    layout="wide",
    initial_sidebar_state="collapsed",
)
class EnhancedRecipeRAG:
    """Retrieval-augmented recipe generator with multiple dataset formats.

    Recipes are loaded from a CSV file (or a tiny built-in sample), indexed
    with TF-IDF, and the best matches for the user's ingredients are passed
    as context to a Gemini model that writes new recipes.
    """

    # Upper bound on CSV rows parsed per dataset, to keep loading responsive.
    MAX_RECIPES = 10000
    # TF-IDF matches at or below this cosine similarity are discarded.
    MIN_SIMILARITY = 0.1

    def __init__(self):
        """Create an unconfigured instance: no API key, model, or dataset."""
        self.api_key = None
        self.model = None
        self.recipe_database = []
        self.vectorizer = None
        self.recipe_vectors = None
        self.dataset_loaded = False

    def load_sample_recipes(self) -> List[Dict]:
        """Return the fallback sample recipes used when no dataset is loaded."""
        return [
            {
                "name": "Classic Scrambled Eggs",
                "ingredients": ["eggs", "butter", "salt", "pepper", "milk"],
                "category": "breakfast",
                "cuisine": "american",
                "instructions": ["Beat eggs with milk", "Heat butter in pan", "Add eggs and scramble gently"],
                "prep_time": 5,
                "cook_time": 5
            },
            # ... more sample recipes
        ]

    def load_dataset_from_csv(self, file_path: str, format_type: str = "auto") -> bool:
        """Load recipes from a CSV file and build the search index.

        Args:
            file_path: Path of the CSV file to read.
            format_type: "recipenlg", "foodcom", "epicurious", or "auto" to
                detect the format from the column names; any other value
                selects the generic parser.

        Returns:
            True when at least one recipe was parsed and indexed. On failure
            an error is shown via Streamlit, False is returned, and the
            previously loaded dataset (if any) is left untouched.
        """
        try:
            df = pd.read_csv(file_path)
            recipes = self._select_parser(df, format_type)(df)
        except Exception as e:
            st.error(f"Error loading dataset: {str(e)}")
            return False

        if not recipes:
            # Reporting success with zero recipes would mislead the user.
            st.error("Error loading dataset: no recipes with ingredients were found")
            return False

        # Swap in the new data, but roll back if indexing fails so the
        # database and its TF-IDF vectors never get out of sync.
        previous_state = (self.recipe_database, self.vectorizer, self.recipe_vectors)
        self.recipe_database = recipes
        try:
            self.build_search_index()
        except Exception as e:
            self.recipe_database, self.vectorizer, self.recipe_vectors = previous_state
            st.error(f"Error loading dataset: {str(e)}")
            return False

        self.dataset_loaded = True
        return True

    def _select_parser(self, df: pd.DataFrame, format_type: str):
        """Pick the parse_* method for an explicit or auto-detected format."""
        auto = format_type == "auto"
        if format_type == "recipenlg" or (auto and "title" in df.columns):
            return self.parse_recipenlg_format
        if format_type == "foodcom" or (auto and "name" in df.columns):
            return self.parse_foodcom_format
        if format_type == "epicurious" or (auto and "recipe_name" in df.columns):
            return self.parse_epicurious_format
        return self.parse_generic_format

    def _collect_recipes(self, df: pd.DataFrame, build_recipe) -> List[Dict]:
        """Apply build_recipe to the first MAX_RECIPES rows, best effort.

        Rows that raise while being converted, or that end up without any
        ingredients, are skipped.
        """
        recipes = []
        for _, row in df.head(self.MAX_RECIPES).iterrows():
            try:
                recipe = build_recipe(row)
            except Exception:
                # One malformed row must not abort the whole import.
                continue
            if recipe["ingredients"]:
                recipes.append(recipe)
        return recipes

    def parse_recipenlg_format(self, df: pd.DataFrame) -> List[Dict]:
        """Parse the RecipeNLG layout (title / ingredients / directions)."""
        def build(row):
            return {
                "name": self._value_or(row.get("title"), "Unknown Recipe"),
                "ingredients": self.parse_ingredients(row.get("ingredients", "")),
                "instructions": self.parse_instructions(row.get("directions", "")),
                "category": "unknown",
                "cuisine": "unknown",
                "source": "RecipeNLG"
            }
        return self._collect_recipes(df, build)

    def parse_foodcom_format(self, df: pd.DataFrame) -> List[Dict]:
        """Parse the Food.com layout (name / ingredients / steps / tags / minutes)."""
        def build(row):
            return {
                "name": self._value_or(row.get("name"), "Unknown Recipe"),
                "ingredients": self.parse_ingredients(row.get("ingredients", "")),
                "instructions": self.parse_instructions(row.get("steps", "")),
                "category": self._value_or(row.get("tags"), "unknown"),
                "prep_time": self._value_or(row.get("minutes"), 30),
                "source": "Food.com"
            }
        return self._collect_recipes(df, build)

    def parse_epicurious_format(self, df: pd.DataFrame) -> List[Dict]:
        """Parse the Epicurious layout (recipe_name / ingredients / course / cuisine)."""
        def build(row):
            return {
                "name": self._value_or(row.get("recipe_name"), "Unknown Recipe"),
                "ingredients": self.parse_ingredients(row.get("ingredients", "")),
                "instructions": [],  # Usually not included in ingredient-focused datasets
                "category": self._value_or(row.get("course"), "unknown"),
                "cuisine": self._value_or(row.get("cuisine"), "unknown"),
                "source": "Epicurious"
            }
        return self._collect_recipes(df, build)

    def parse_generic_format(self, df: pd.DataFrame) -> List[Dict]:
        """Parse an arbitrary CSV by locating a name and an ingredients column.

        Shows a Streamlit error and returns an empty list when either column
        cannot be found.
        """
        name_col = self.find_column(df, ["name", "title", "recipe_name", "recipe"])
        ingredients_col = self.find_column(df, ["ingredients", "ingredient_list"])
        # Compare against None: a valid column label may itself be falsy.
        if name_col is None or ingredients_col is None:
            st.error("Could not find required columns (name and ingredients) in CSV")
            return []

        def build(row):
            return {
                "name": self._value_or(row.get(name_col), "Unknown Recipe"),
                "ingredients": self.parse_ingredients(row.get(ingredients_col, "")),
                "instructions": [],
                "category": "unknown",
                "source": "Custom Dataset"
            }
        return self._collect_recipes(df, build)

    def find_column(self, df: pd.DataFrame, possible_names: List[str]) -> Optional[str]:
        """Return the first column whose label matches one of possible_names.

        Matching is case-insensitive and ignores surrounding whitespace.
        Returns None when no column matches.
        """
        wanted = {name.lower() for name in possible_names}
        for col in df.columns:
            # Labels are not guaranteed to be strings (e.g. integer headers).
            if str(col).strip().lower() in wanted:
                return col
        return None

    @staticmethod
    def _is_missing(value: Any) -> bool:
        """Return True for None / NaN-like scalars as judged by pandas."""
        if value is None:
            return True
        try:
            return bool(pd.isna(value))
        except (TypeError, ValueError):
            # Array-like values have no single truth value; treat as present.
            return False

    @classmethod
    def _value_or(cls, value: Any, default: Any) -> Any:
        """Return value, or default when the cell is missing (None / NaN)."""
        return default if cls._is_missing(value) else value

    @staticmethod
    def _clean_items(items) -> List[str]:
        """Convert items to stripped strings, dropping None and blanks."""
        cleaned = (str(item).strip() for item in items if item is not None)
        return [item for item in cleaned if item]

    def _parse_list_literal(self, text: str) -> Optional[List[str]]:
        """Decode text that looks like a JSON or Python list literal.

        JSON is tried first, then ast.literal_eval for Python-repr lists
        (single-quoted items). Returns None when neither yields a list.
        """
        for loader in (json.loads, ast.literal_eval):
            try:
                value = loader(text)
            except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError):
                continue
            if isinstance(value, list):
                return self._clean_items(value)
        return None

    def parse_ingredients(self, ingredients_text: str) -> List[str]:
        """Parse ingredients from various text formats.

        Accepts a JSON / Python list literal, or comma-, newline-, or
        whitespace-separated text (tried in that order). Missing values
        (None, NaN, empty) give an empty list; an actual list or tuple is
        cleaned and returned as strings.
        """
        if isinstance(ingredients_text, (list, tuple)):
            return self._clean_items(ingredients_text)
        if self._is_missing(ingredients_text):
            return []
        text = str(ingredients_text).strip()
        if not text:
            return []
        # Handle JSON / Python list literals
        if text.startswith('['):
            parsed = self._parse_list_literal(text)
            if parsed is not None:
                return parsed
        # Handle comma-separated, then newline-separated text
        for separator in (',', '\n'):
            if separator in text:
                return self._clean_items(text.split(separator))
        # Single ingredient or space-separated
        return self._clean_items(text.split())

    def parse_instructions(self, instructions_text: str) -> List[str]:
        """Parse cooking instructions into a list of steps.

        Accepts a JSON / Python list literal, or free text that is split on
        numbered-step markers ("1.", "2.", ...) and newlines. Missing values
        give an empty list.
        """
        if isinstance(instructions_text, (list, tuple)):
            return self._clean_items(instructions_text)
        if self._is_missing(instructions_text):
            return []
        text = str(instructions_text).strip()
        if not text:
            return []
        # Handle JSON / Python list literals
        if text.startswith('['):
            parsed = self._parse_list_literal(text)
            if parsed is not None:
                return parsed
        # Handle numbered steps or sentences
        return self._clean_items(re.split(r'\d+\.|\n', text))

    def build_search_index(self):
        """Build the TF-IDF search index over the current recipe database.

        With an empty database the index is cleared, so stale vectors can
        never be paired with a different set of recipes.
        """
        if not self.recipe_database:
            self.vectorizer = None
            self.recipe_vectors = None
            return

        # Create text representation for each recipe
        recipe_texts = []
        for recipe in self.recipe_database:
            parts = [str(recipe['name'])]
            parts.extend(str(ing) for ing in recipe['ingredients'])
            if recipe.get('category'):
                parts.append(str(recipe['category']))
            recipe_texts.append(' '.join(parts))

        # Build TF-IDF vectors; assign only after fitting succeeds so a
        # failure cannot leave a vectorizer without matching vectors.
        vectorizer = TfidfVectorizer(
            stop_words='english',
            ngram_range=(1, 2),
            max_features=5000
        )
        recipe_vectors = vectorizer.fit_transform(recipe_texts)
        self.vectorizer = vectorizer
        self.recipe_vectors = recipe_vectors

    def setup_gemini(self, api_key: str) -> bool:
        """Initialize the Gemini API client.

        Returns True on success; on failure shows a Streamlit error and
        returns False.
        """
        try:
            genai.configure(api_key=api_key)
            self.model = genai.GenerativeModel('gemini-pro')
            self.api_key = api_key
            return True
        except Exception as e:
            st.error(f"Failed to initialize Gemini API: {str(e)}")
            return False

    def retrieve_relevant_recipes(self, user_ingredients: List[str], top_k: int = 5) -> List[Dict]:
        """Return up to top_k recipes most similar to the given ingredients.

        Uses TF-IDF cosine similarity when a dataset index exists, otherwise
        falls back to basic_ingredient_matching. Indexed results are copies
        of the stored recipes with an added 'similarity_score'.
        """
        if top_k <= 0:
            # A slice of [-0:] would otherwise return every recipe.
            return []
        if not self.dataset_loaded or self.vectorizer is None or self.recipe_vectors is None:
            return self.basic_ingredient_matching(user_ingredients, top_k)

        # Create query vector
        query = ' '.join(str(ing) for ing in user_ingredients)
        query_vector = self.vectorizer.transform([query])
        # Calculate similarities
        similarities = cosine_similarity(query_vector, self.recipe_vectors).flatten()
        # Get top matches, best first
        top_indices = similarities.argsort()[::-1][:top_k]

        relevant_recipes = []
        for idx in top_indices:
            if similarities[idx] > self.MIN_SIMILARITY:
                recipe = self.recipe_database[idx].copy()
                recipe['similarity_score'] = float(similarities[idx])
                relevant_recipes.append(recipe)
        return relevant_recipes

    def basic_ingredient_matching(self, user_ingredients: List[str], top_k: int = 5) -> List[Dict]:
        """Fallback retrieval based on exact ingredient-name overlap.

        Each returned recipe is a copy with 'relevance_score' (share of the
        recipe's distinct ingredients the user has) and
        'matching_ingredients' (the overlap count), best score first.
        """
        if top_k <= 0:
            return []
        wanted = {str(ing).lower().strip() for ing in user_ingredients}
        wanted.discard("")

        relevant_recipes = []
        for recipe in (self.recipe_database or self.load_sample_recipes()):
            recipe_ingredients = {str(ing).lower().strip() for ing in recipe["ingredients"]}
            overlap = len(wanted & recipe_ingredients)
            if overlap > 0:
                relevant_recipes.append({
                    **recipe,
                    "relevance_score": overlap / len(recipe_ingredients),
                    "matching_ingredients": overlap
                })
        relevant_recipes.sort(key=lambda x: x["relevance_score"], reverse=True)
        return relevant_recipes[:top_k]

    def generate_recipes_with_gemini(self, user_ingredients: List[str], relevant_recipes: List[Dict]) -> List[Dict]:
        """Generate recipes with Gemini, using retrieved recipes as context.

        Returns the list of recipe dicts found under the "recipes" key of
        the model's JSON answer, or an empty list when the model is not
        configured, the call fails, or the answer cannot be parsed.
        """
        if self.model is None:
            st.error("Error generating recipes: Gemini model is not configured")
            return []

        try:
            prompt = self._build_prompt(user_ingredients, relevant_recipes)
            response = self.model.generate_content(prompt)
            response_text = response.text.strip()
            # The model may wrap the JSON in prose or code fences; take the
            # outermost {...} span.
            json_match = re.search(r'\{.*\}', response_text, re.DOTALL)
            if json_match:
                recipes_data = json.loads(json_match.group())
                recipes = recipes_data.get("recipes", [])
                if isinstance(recipes, list):
                    # Callers use dict access on every entry.
                    return [recipe for recipe in recipes if isinstance(recipe, dict)]
        except Exception as e:
            st.error(f"Error generating recipes: {str(e)}")
        return []

    def _build_prompt(self, user_ingredients: List[str], relevant_recipes: List[Dict]) -> str:
        """Assemble the generation prompt from ingredients and context recipes."""
        ingredients_text = ", ".join(str(ing) for ing in user_ingredients)

        # Create rich context from retrieved recipes
        context_text = "Similar recipes for context:\n"
        for i, recipe in enumerate(relevant_recipes[:3], 1):
            shown = ', '.join(str(ing) for ing in recipe['ingredients'][:8])
            context_text += f"{i}. {recipe['name']}: {shown}\n"
            if recipe.get('instructions'):
                context_text += f" Style: {str(recipe['instructions'][0])[:50]}...\n"

        prompt_lines = [
            "",
            f"Available ingredients: {ingredients_text}",
            context_text,
            "Based on the available ingredients and the style of similar recipes above, generate 4 complete, practical recipes. Each recipe should:",
            "1. Use primarily the available ingredients",
            "2. Be inspired by the context recipes' style",
            "3. Include realistic quantities and cooking steps",
            "Return as JSON:",
            "{",
            '"recipes": [',
            "{",
            '"name": "Recipe Name",',
            '"ingredients_with_quantities": ["2 eggs", "1 tbsp butter"],',
            '"instructions": ["Step 1", "Step 2"],',
            '"prep_time": 10,',
            '"cook_time": 15,',
            '"tip": "Cooking tip",',
            '"cuisine": "cuisine type"',
            "}",
            "]",
            "}",
            "",
        ]
        return "\n".join(prompt_lines)
def _load_uploaded_dataset(rag_system, uploaded_file, dataset_format):
    """Write the upload to a unique temp file, load it, and always clean up.

    Returns whatever rag_system.load_dataset_from_csv returns.
    """
    tmp_path = None
    try:
        # A unique name avoids collisions between concurrent sessions.
        with tempfile.NamedTemporaryFile(suffix=".csv", delete=False) as tmp:
            tmp_path = tmp.name
            tmp.write(uploaded_file.getbuffer())
        return rag_system.load_dataset_from_csv(tmp_path, dataset_format)
    finally:
        if tmp_path and os.path.exists(tmp_path):
            os.remove(tmp_path)


def _render_sidebar(rag_system):
    """Draw the configuration sidebar and return the entered API key."""
    with st.sidebar:
        st.header("🔧 Configuration")
        # API Key
        api_key = st.text_input("Google Gemini API Key", type="password")
        # Re-configure only when the key changed since the last success.
        if api_key and api_key != st.session_state.get('current_api_key'):
            if rag_system.setup_gemini(api_key):
                st.session_state.current_api_key = api_key
                st.success("✅ API configured!")
        st.markdown("---")

        # Dataset Management
        st.header("📊 Dataset Options")
        dataset_option = st.selectbox(
            "Choose Knowledge Base:",
            ["Built-in Sample", "Upload CSV Dataset", "Use Kaggle Dataset"]
        )
        if dataset_option == "Upload CSV Dataset":
            uploaded_file = st.file_uploader("Upload Recipe CSV", type=['csv'])
            if uploaded_file:
                dataset_format = st.selectbox(
                    "Dataset Format:",
                    ["auto", "recipenlg", "foodcom", "epicurious", "generic"]
                )
                if st.button("Load Dataset"):
                    with st.spinner("Loading dataset..."):
                        if _load_uploaded_dataset(rag_system, uploaded_file, dataset_format):
                            st.success(f"✅ Loaded {len(rag_system.recipe_database)} recipes!")
        elif dataset_option == "Use Kaggle Dataset":
            st.markdown(
                "\n**Popular Datasets:**\n"
                "- RecipeNLG: 2.2M recipes\n"
                "- Food.com: 500K recipes\n"
                "- Epicurious: 13K recipes\n"
                "Download from Kaggle and upload above!\n"
            )

        # Dataset status
        if rag_system.dataset_loaded:
            st.success(f"📊 Dataset: {len(rag_system.recipe_database)} recipes loaded")
        else:
            st.info("📊 Using built-in sample recipes")
    return api_key


def _render_generated_recipe(index, recipe):
    """Render one generated recipe inside an expander (first one opened)."""
    with st.expander(f"📖 Recipe {index}: {recipe.get('name', 'Delicious Recipe')}", expanded=index == 1):
        # Times and cuisine
        prep_col, cook_col, cuisine_col = st.columns(3)
        with prep_col:
            st.write(f"**⏱️ Prep:** {recipe.get('prep_time', 10)} mins")
        with cook_col:
            st.write(f"**🔥 Cook:** {recipe.get('cook_time', 15)} mins")
        with cuisine_col:
            cuisine = recipe.get('cuisine', 'International')
            st.write(f"**🌍 Cuisine:** {cuisine}")
        # Ingredients
        st.markdown("#### 🛒 Ingredients:")
        for ing in recipe.get('ingredients_with_quantities', []):
            st.write(f"• {ing}")
        # Instructions
        st.markdown("#### 👨‍🍳 Instructions:")
        for j, instruction in enumerate(recipe.get('instructions', []), 1):
            st.write(f"**{j}.** {instruction}")
        # Tip
        if recipe.get('tip'):
            st.info(f"💡 **Tip:** {recipe['tip']}")


def main():
    """Streamlit entry point: configure, take ingredients, show recipes.

    The RAG system is kept in st.session_state so the loaded dataset and the
    configured model survive Streamlit's script reruns.
    """
    st.markdown('<h1 style="text-align: center; color: #2E86AB;">🍳 Enhanced AI Recipe Generator</h1>', unsafe_allow_html=True)
    st.markdown("### Powered by Large Recipe Datasets + Google Gemini Pro")

    # Initialize enhanced RAG system
    if 'enhanced_rag_system' not in st.session_state:
        st.session_state.enhanced_rag_system = EnhancedRecipeRAG()
    rag_system = st.session_state.enhanced_rag_system

    # Sidebar configuration
    api_key = _render_sidebar(rag_system)

    # Main interface
    input_col, button_col = st.columns([3, 1])
    with input_col:
        ingredients_input = st.text_input(
            "🥕 Enter Your Ingredients:",
            placeholder="onion, tomato, garlic, eggs, cheese",
            help="Separate ingredients with commas"
        )
    with button_col:
        generate_button = st.button("🚀 Generate Recipes", type="primary", use_container_width=True)

    # Generation logic
    if not generate_button:
        return
    # A key may have been typed but rejected, so also require a live model.
    if not api_key or rag_system.model is None:
        st.error("⚠️ Please add your Gemini API key!")
        return
    if not ingredients_input.strip():
        st.error("⚠️ Please enter some ingredients!")
        return

    user_ingredients = [ing.strip() for ing in ingredients_input.split(',') if ing.strip()]
    with st.spinner("🤖 Searching database and generating recipes..."):
        # RAG process
        relevant_recipes = rag_system.retrieve_relevant_recipes(user_ingredients)
        generated_recipes = rag_system.generate_recipes_with_gemini(user_ingredients, relevant_recipes)

    # Display results; entries that are not dicts cannot be rendered.
    generated_recipes = [recipe for recipe in generated_recipes if isinstance(recipe, dict)]
    if not generated_recipes:
        return

    st.markdown("## 🍽️ Your Personalized Recipes")
    # Show retrieval context
    if relevant_recipes:
        with st.expander("🔍 Similar recipes found in database"):
            for recipe in relevant_recipes[:3]:
                score = recipe.get('similarity_score', recipe.get('relevance_score', 0))
                st.write(f"**{recipe['name']}** (Match: {score:.2f})")
                shown = ', '.join(str(ing) for ing in recipe['ingredients'][:5])
                st.write(f"Ingredients: {shown}...")

    # Display generated recipes
    for i, recipe in enumerate(generated_recipes, 1):
        _render_generated_recipe(i, recipe)
# Run the app only when this file is executed as a script.
if __name__ == "__main__":
    main()