# Source: Hugging Face file view of app.py (author: tdurzynski).
# Commit 75e97f4 (verified) - "Update app.py" - file size 37.9 kB.
import streamlit as st
import json
import os
import uuid
from datetime import datetime
from io import BytesIO
from decimal import Decimal # Add this import for DynamoDB float handling
# Third-party library imports
import boto3
from PIL import Image
import firebase_admin
from firebase_admin import credentials, auth
import pandas as pd
import streamlit_tags as st_tags
from dotenv import load_dotenv
# Debug mode is enabled simply by the presence of a "debug" query parameter.
debug_mode = "debug" in st.query_params

# Merge variables from a local .env file (if one exists) into the environment.
load_dotenv()

# AWS credentials and resource names; the last three fall back to defaults.
AWS_ACCESS_KEY = os.environ.get("AWS_ACCESS_KEY")
AWS_SECRET_KEY = os.environ.get("AWS_SECRET_KEY")
AWS_REGION = os.environ.get("AWS_REGION", "us-east-1")
S3_BUCKET_NAME = os.environ.get("S3_BUCKET_NAME", "food-image-crowdsourcing")
DYNAMODB_TABLE = os.environ.get("DYNAMODB_TABLE", "image_metadata")

# Firebase service-account settings arrive as a JSON string; an unset variable
# yields an empty dict.
_firebase_config_json = os.environ.get("FIREBASE_CONFIG", "{}")
FIREBASE_CONFIG = json.loads(_firebase_config_json)
# Initialize the Firebase Admin SDK once. Streamlit re-executes this script on
# every interaction, so skip initialization when an app is already registered.
if not firebase_admin._apps:
    try:
        cred = credentials.Certificate(FIREBASE_CONFIG)
        firebase_admin.initialize_app(cred)
    except Exception as e:
        st.error(f"Firebase initialization error: {e}")
        # st.button() is True only on the rerun triggered by the click itself.
        # Without consulting the remembered flag, the very next rerun would fall
        # into st.stop() again and demo mode could never actually be used.
        if not st.session_state.get("demo_mode", False):
            # Explicit key: the AWS section offers a button with the same label,
            # and identical key-less widgets collide when both are rendered.
            if st.button("Continue in Demo Mode", key="firebase_demo_mode_button"):
                st.session_state["demo_mode"] = True
            else:
                st.stop()
# Initialize AWS services (S3 & DynamoDB).
# Pre-bind the handles so the names exist even if client construction fails and
# the user continues in demo mode.
s3 = None
dynamodb = None
metadata_table = None
try:
    s3 = boto3.client(
        "s3",
        aws_access_key_id=AWS_ACCESS_KEY,
        aws_secret_access_key=AWS_SECRET_KEY,
        region_name=AWS_REGION,
    )
    dynamodb = boto3.resource(
        "dynamodb",
        region_name=AWS_REGION,
        aws_access_key_id=AWS_ACCESS_KEY,
        aws_secret_access_key=AWS_SECRET_KEY,
    )
    metadata_table = dynamodb.Table(DYNAMODB_TABLE)
except Exception as e:
    st.error(f"AWS initialization error: {e}")
    # st.button() is True only on the rerun triggered by the click, so honour a
    # previously stored demo-mode choice instead of stopping on every rerun.
    if not st.session_state.get("demo_mode", False):
        # Explicit key: the Firebase section offers a button with the same label.
        if st.button("Continue in Demo Mode", key="aws_demo_mode_button"):
            st.session_state["demo_mode"] = True
        else:
            st.stop()
# Food names offered in the "Food Name" dropdown, covering a range of cuisines.
# sorted() guarantees the case-insensitive alphabetical order that the dropdown
# relies on, regardless of where new entries are inserted below.
FOOD_SUGGESTIONS = sorted([
    "Ajvar", "Angel Wings", "Apple", "Apple Pie", "Apfelstrudel", "Arancini", "Asparagus", "Babka", "Bagel", "Baguette", "Baklava",
    "Banana", "Banana Bread", "Banh Mi", "Banitsa", "Barbecue Ribs", "BBQ Chicken", "BBQ Chicken Pizza", "BBQ Ribs", "Bean Burrito",
    "Bear Claw", "Beef Empanadas", "Beef Pho", "Beef Sirloin", "Beef Stroganoff", "Beer", "Beets", "Bell Pepper", "Biryani", "Bistecca alla Fiorentina",
    "Black Beans", "Black Forest Cake", "Black Olives", "Blini", "Borscht", "Bossam", "Brioche", "Broccoli", "Brown Rice",
    "Bruschetta", "Brussels Sprouts", "Buckwheat", "Buffalo Wings", "Burger", "Burrito", "Butter Chicken", "Cabbage",
    "Cabbage Rolls", "Calzone", "Cannoli", "Carrot", "Carrot Cake", "Cauliflower", "Cauliflower Soup", "Cevapi", "Ceviche", "Ceviche de Camaron",
    "Challah", "Char Siu", "Cheese Empanadas", "Cheesecake", "Chicken", "Chicken Broth", "Chicken Empanadas",
    "Chicken Wings", "Chickpeas", "Chiles en Nogada", "Chili Sauce", "Chimichurri Steak", "Chow Mein",
    "Clams", "Cold Beet Soup", "Corn", "Corn on the Cob", "Coxinha", "Crab Cakes", "Cream Cheese", "Creamy Mushroom Risotto",
    "Creme Brulee", "Creole Gumbo", "Croissant", "Croque Monsieur", "Cucumber", "Cucumber Soup", "Deep-fried",
    "Dim Sum", "Dolmades", "Doughnuts", "Duck", "Eggplant", "Eggplant Spread", "Eggs", "Enchiladas",
    "Encebollado", "Falafel", "Fanesca", "Fasolada", "Faworki", "Filet Mignon", "Fish", "Fish and Chips",
    "Fish Tacos", "Flatbread", "Flan", "Focaccia", "Four Cheese Pizza", "French Fries", "French Onion Soup",
    "Fresh Fruit", "Fruit Soup", "Garbanzo", "Garlic", "Gazpacho", "Gefilte Fish", "Gibanica", "Ginger Bread",
    "Goat Cheese", "Goulash", "Green Beans", "Green Fried Tomatoes", "Green Onion", "Gyoza", "Gyros", "Hawaiian Pizza",
    "Herbs", "Hoddeok", "Hot and Sour Soup", "Hot Pot", "Hummus", "Hunter's stew", "Ice Cream", "Japchae",
    "Jasmine Rice", "Jollof Rice", "Kabsa", "Kale", "Katsu Curry", "Kavarma", "Kebabs", "Kimchi Fried Rice", "Kisiel",
    "Kremowka", "Kreplach", "Kung Pao Chicken", "Kutia", "Lamb", "Lamb Chops", "Lasagna", "Layered Potato Casserole",
    "Lemon", "Lemon Pie", "Lentil Soup", "Lettuce", "Llapingachos", "Lobster", "Mac and Cheese", "Macarons", "Mahi Mahi",
    "Mansaf", "Mapo Tofu", "Margherita Pizza", "Marinated", "Marzipan", "Matzo Ball Soup", "Mazurek", "Meat Lover's Pizza",
    "Meat Patties", "Meatloaf", "Miso Soup", "Mixed Salad", "Mixed Vegetables", "Mooncake", "Moussaka", "Mozzarella", "Mushroom Pizza", "Mushroom Soup",
    "Mushrooms", "Napoleon Cake", "Neapolitan Pizza", "New York Strip Steak", "Nougat Candies", "Onion Rings", "Onion",
    "Osso Buco", "Oysters", "Pad Thai", "Paella", "Panna Cotta", "Pasta", "Pasta Carbonara", "Pavlova",
    "Peas", "Pecan Pie", "Peking Duck", "Pelmeni", "Pepperoni Pizza", "Pierogi", "Pineapple", "Pita Bread",
    "Pizza", "Pljeskavica", "Pork Chops", "Pork Knuckle", "Portobello Mushrooms", "Potato pancakes", "Potato Salad",
    "Poutine", "Poppy Seed Roll", "Pudding", "Pulled Pork", "Pumpkin", "Pumpkin Pie", "Radish", "Quesadillas", "Quiche", "Ramen", "Ratatouille",
    "Ravioli", "Red Pepper", "Ribeye Steak", "Ribollita", "Rich Stew", "Risotto alla Milanese", "Rugelach", "Rye Bread",
    "Sachertorte", "Saffron Rice", "Salad", "Salmon", "Sarma", "Sausage", "Sauerkraut", "Seafood Pasta",
    "Seco de Chivo", "Shashlik", "Shakshuka", "Shawarma", "Shepherd's Pie", "Shopska Salad", "Shrimp", "Shrimp Skewers",
    "Soft Egg Noodles", "Sopes", "Soup Dumplings", "Sour Rye Soup", "Souvlaki", "Spaghetti Carbonara", "Spinach", "Sponge Cake",
    "Spring Salad", "Spring Rolls", "Stuffed Cabbage", "Stuffed Grape Leaves", "Stuffed Mushrooms", "Stuffed Pepper", "Supreme Pizza", "Sushi",
    "Sweet and Sour Pork", "Sweet Potato", "Swordfish Steak", "Szarlotka", "T-bone Steak", "Tacos", "Tamales", "Tandoori Chicken", "Teriyaki", "Tarator",
    "Texas Style Brisket", "Tilapia", "Tiramisu", "Toast", "Tomato", "Tomato Soup", "Tostada", "Tteokbokki", "Tuna Steak",
    "Tzatziki", "Uszka", "Vareniki", "Veal", "Veggie Fries", "Veggie Pizza", "Wheat Bread", "White Bean Soup", "White Pizza",
    "Wiener Schnitzel", "Wild Mushroom Pasta", "Wine (Red)", "Wine (White)", "Wonton Soup", "Xiaolongbao", "Zeppelins", "Zucchini",
], key=str.casefold)
# Units selectable for a portion (weight, volume or count).
UNIT_OPTIONS = [
    "grams",
    "ounce(s)",
    "teaspoon(s)",
    "tablespoon(s)",
    "cup(s)",
    "slice(s)",
    "piece(s)",
]

# Preparation methods selectable for a food item.
COOKING_METHODS = [
    "Baked", "Boiled", "Braised", "Breaded and fried", "Broiled",
    "Creamy", "Deep-fried", "Dried", "Fried", "Grilled",
    "Grilled minced", "Marinated", "Microwaved", "Pan-seared", "Poached",
    "Raw", "Roasted", "SautΓ©ed", "Slow-cooked", "Smoked",
    "Steamed", "Stewed", "Stir-fried", "Takeout/Restaurant", "Unknown",
]
# Helper functions
def resize_image(image, max_size=512, quality=85):
    """
    Shrink an image to fit within max_size and re-encode it as JPEG.

    Args:
        image: PIL Image object
        max_size: Maximum dimension (width or height) of the result
        quality: JPEG quality (0-100) used when re-encoding
    Returns:
        The input image itself (untouched) when it already fits within
        max_size; otherwise a new PIL Image decoded from the resized,
        JPEG-compressed data.
    """
    width, height = image.width, image.height

    # Images that already fit are passed through unchanged.
    if width <= max_size and height <= max_size:
        return image

    # Scale the longer side to max_size and the other side proportionally.
    # max(1, ...) keeps very elongated images from rounding down to a zero
    # dimension, which resize() cannot handle.
    if width > height:
        new_width = max_size
        new_height = max(1, int(height * (max_size / width)))
    else:
        new_height = max_size
        new_width = max(1, int(width * (max_size / height)))
    resized_img = image.resize((new_width, new_height), Image.LANCZOS)

    # JPEG cannot store alpha or palette data, so every mode other than the
    # JPEG-writable ones (not just RGBA) is converted to RGB before saving.
    if resized_img.mode not in ("RGB", "L", "CMYK"):
        resized_img = resized_img.convert("RGB")

    # Compress into an in-memory buffer and hand back the decoded result.
    buffer = BytesIO()
    resized_img.save(buffer, format="JPEG", quality=quality, optimize=True)
    buffer.seek(0)
    return Image.open(buffer)
def get_image_size_kb(image):
    """
    Return the size in KB the image occupies when encoded as JPEG.

    Args:
        image: PIL Image object
    Returns:
        Encoded size in kilobytes (float), measured with Pillow's default
        JPEG settings.
    """
    # JPEG cannot store alpha or palette data; uploaded PNGs are frequently
    # RGBA or P, so convert a copy instead of letting save() raise.
    if image.mode not in ("RGB", "L", "CMYK"):
        image = image.convert("RGB")
    buffer = BytesIO()
    image.save(buffer, format="JPEG")
    return buffer.tell() / 1024  # bytes -> KB
def upload_to_s3(image, user_id, folder=""):
    """
    Upload an image to the S3 bucket as a JPEG and return its object key.

    Args:
        image: PIL Image object
        user_id: User ID used as a path component of the key
        folder: Optional top-level prefix (e.g., "raw-uploads" or "processed-512x512")
    Returns:
        The S3 key on success, a placeholder key in demo mode (nothing is
        uploaded), or None if the upload failed (the error is shown in the UI).
    """
    if st.session_state.get("demo_mode", False):
        return f"demo/{user_id}/demo_image.jpg"
    try:
        # Timestamp plus UUID keeps keys unique and roughly time-ordered.
        image_id = str(uuid.uuid4())
        timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
        object_name = f"{timestamp}_{image_id}.jpg"
        if folder:
            s3_path = f"{folder}/{user_id}/{object_name}"
        else:
            s3_path = f"{user_id}/{object_name}"

        # JPEG cannot store alpha or palette data (common in uploaded PNGs);
        # convert so such images are uploaded instead of failing in save().
        if image.mode not in ("RGB", "L", "CMYK"):
            image = image.convert("RGB")

        # Higher quality for raw uploads, compressed for processed images.
        quality = 95 if folder == "raw-uploads" else 85
        buffer = BytesIO()
        image.save(buffer, format="JPEG", quality=quality, optimize=True)
        buffer.seek(0)

        if debug_mode:
            st.sidebar.info(f"S3 Upload Path: {s3_path}")

        s3.upload_fileobj(buffer, S3_BUCKET_NAME, s3_path)
        return s3_path
    except Exception as e:
        st.error(f"Failed to upload image: {e}")
        return None
def save_metadata(user_id, s3_path, food_name, portion_size, portion_unit, cooking_method, ingredients, tokens_awarded):
    """
    Store one food annotation as an item in the DynamoDB metadata table.

    In demo mode nothing is written; a notice is shown and True is returned.
    Returns True when the item was written, False if any step raised (the
    error is reported through st.error).
    """
    demo_active = st.session_state.get("demo_mode", False)
    if demo_active:
        st.success("Demo mode: Metadata would be saved to DynamoDB")
        return True

    try:
        # Fresh identifier and creation time for this database entry.
        record_id = str(uuid.uuid4())
        created_at = datetime.now().isoformat()

        # DynamoDB does not accept floats, so go through str() into Decimal.
        amount = portion_size if isinstance(portion_size, Decimal) else Decimal(str(portion_size))

        record = {
            'image_id': record_id,
            'user_id': user_id,
            'upload_timestamp': created_at,
            'food_name': food_name,
            'portion_size': amount,
            'portion_unit': portion_unit,
            'cooking_method': cooking_method,
            'ingredients': ingredients,
            's3_path': s3_path,
            'tokens_awarded': tokens_awarded
        }

        if debug_mode:
            # Surface the write in the sidebar and keep a trail in session
            # state for the debug results panel.
            st.sidebar.info(f"DynamoDB Save: {record_id} - {food_name}")
            if "test_debug_details" not in st.session_state:
                st.session_state["test_debug_details"] = []
            debug_entry = {
                "time": datetime.now().isoformat(),
                "image_id": record_id,
                "food": food_name,
                "s3_path": s3_path
            }
            st.session_state["test_debug_details"].append(debug_entry)

        metadata_table.put_item(Item=record)
    except Exception as e:
        st.error(f"Failed to save metadata: {e}")
        return False
    return True
def calculate_tokens(image_quality, has_metadata, is_unique_category):
    """
    Work out the token reward for one uploaded food item.

    Every upload earns one base token; one more is added for each of: image
    quality equal to "high", metadata present, and a unique food category.
    """
    bonus_conditions = (
        image_quality == "high",
        has_metadata,
        is_unique_category,
    )
    base_tokens = 1
    return base_tokens + sum(1 for earned in bonus_conditions if earned)
# Seed session state on a user's first run. Each key is created only when
# absent, so values already present survive Streamlit reruns:
#   tokens / uploads_count - running totals, start at 0
#   food_items             - annotations collected for the current image
#   custom_food_name       - text backing the custom food name input
for _state_key, _make_default in (
    ("tokens", int),
    ("uploads_count", int),
    ("food_items", list),
    ("custom_food_name", str),
):
    if _state_key not in st.session_state:
        st.session_state[_state_key] = _make_default()
def reset_form_fields():
    """Clear the stored custom food name once an item has been added."""
    # The dropdown selection is left alone on purpose: users often add
    # several similar items in a row.
    st.session_state.custom_food_name = ""
def add_food_item(food_name, portion_size, portion_unit, cooking_method, ingredients):
    """
    Append a food item to the pending list kept in session state.

    The name, portion size, unit and cooking method must all be truthy;
    ingredients may be empty. Returns True when the item was added and False
    (after showing an error) when a required field is missing.
    """
    required_fields = (food_name, portion_size, portion_unit, cooking_method)
    if not all(required_fields):
        st.error("❌ Please fill in all required fields")
        return False

    new_entry = {
        "food_name": food_name,
        "portion_size": portion_size,
        "portion_unit": portion_unit,
        "cooking_method": cooking_method,
        "ingredients": ingredients
    }
    st.session_state["food_items"].append(new_entry)
    st.success(f"βœ… Added {food_name} to your submission")
    reset_form_fields()
    return True
# Streamlit Layout - Authentication Section
def _verify_firebase_password(email, password, api_key):
    """Check an email/password pair against Firebase Auth and return the UID.

    Uses the Identity Toolkit REST endpoint `accounts:signInWithPassword`,
    because the Admin SDK cannot verify passwords.

    Raises:
        ValueError: if Firebase rejects the credentials or the request.
    """
    import urllib.error
    import urllib.parse
    import urllib.request

    url = (
        "https://identitytoolkit.googleapis.com/v1/accounts:signInWithPassword?key="
        + urllib.parse.quote(api_key, safe="")
    )
    payload = json.dumps(
        {"email": email, "password": password, "returnSecureToken": True}
    ).encode("utf-8")
    request = urllib.request.Request(
        url, data=payload, headers={"Content-Type": "application/json"}
    )
    try:
        with urllib.request.urlopen(request, timeout=10) as response:
            return json.load(response)["localId"]
    except urllib.error.HTTPError as err:
        # Firebase reports the reason (e.g. INVALID_PASSWORD) in a JSON body.
        try:
            reason = json.load(err)["error"]["message"]
        except Exception:
            reason = f"HTTP {err.code}"
        raise ValueError(f"authentication rejected ({reason})") from err


st.sidebar.title("πŸ”‘ User Authentication")
auth_option = st.sidebar.radio("Select an option", ["Login", "Sign Up", "Logout"])

if auth_option == "Sign Up":
    email = st.sidebar.text_input("Email")
    password = st.sidebar.text_input("Password", type="password")
    if st.sidebar.button("Sign Up"):
        try:
            if st.session_state.get("demo_mode", False):
                st.sidebar.success("βœ… Demo mode: User created successfully! Please log in.")
            else:
                auth.create_user(email=email, password=password)
                st.sidebar.success("βœ… User created successfully! Please log in.")
        except Exception as e:
            st.sidebar.error(f"Error: {e}")

if auth_option == "Login":
    email = st.sidebar.text_input("Email")
    password = st.sidebar.text_input("Password", type="password")
    if st.sidebar.button("Login"):
        try:
            if st.session_state.get("demo_mode", False):
                st.session_state["user_id"] = "demo_user_123"
                st.session_state["tokens"] = 0  # Initialize token count
                st.sidebar.success("βœ… Demo mode: Logged in successfully!")
            else:
                # SECURITY: looking a user up by email proves nothing about the
                # password. Verify the credentials with Firebase whenever a web
                # API key is configured.
                web_api_key = os.getenv("FIREBASE_WEB_API_KEY")
                if web_api_key:
                    uid = _verify_firebase_password(email, password, web_api_key)
                else:
                    # Legacy behaviour, kept so existing deployments keep
                    # working until the key is configured.
                    st.sidebar.warning(
                        "Password verification is not configured "
                        "(FIREBASE_WEB_API_KEY is unset); the password was NOT checked."
                    )
                    uid = auth.get_user_by_email(email).uid
                st.session_state["user_id"] = uid
                st.session_state["tokens"] = 0  # Initialize token count
                st.sidebar.success("βœ… Logged in successfully!")
        except Exception as e:
            st.sidebar.error(f"Login failed: {e}")

if auth_option == "Logout" and "user_id" in st.session_state:
    del st.session_state["user_id"]
    st.sidebar.success("βœ… Logged out successfully!")
# Debug mode indicator in the sidebar.
# test_mode is bound unconditionally so later code can read it without relying
# on `debug_mode and test_mode` short-circuiting to avoid a NameError.
test_mode = False
if debug_mode:
    st.sidebar.warning("⚠️ DEBUG MODE ACTIVE")
    test_mode = st.sidebar.checkbox("Mark submissions as tests")
    if test_mode:
        st.sidebar.info("Test mode enabled - all submissions will be prefixed with 'TEST_'")
# Ensure a user identity exists before the upload UI is shown.
if "user_id" not in st.session_state:
    if st.session_state.get("demo_mode", False):
        # Demo mode skips the login wall, but the upload and save code reads
        # st.session_state["user_id"] unconditionally; use the same demo
        # identity that a demo login would assign.
        st.session_state["user_id"] = "demo_user_123"
    else:
        st.warning("⚠️ Please log in to upload images.")
        # Offer the guidelines and terms while the visitor is logged out.
        st.markdown("### πŸ“š While You're Here")
        st.markdown("Take a moment to read our guidelines and token system:")
        # (expander title, markdown file, subject used in the error message)
        _documents = (
            ("πŸ“‹ Participation Guidelines", "PARTICIPATION_GUIDELINES.md", "guidelines"),
            ("πŸͺ™ Token Rewards System", "TOKEN_REWARDS.md", "rewards information"),
            ("πŸ“œ Terms of Service", "TERMS_OF_SERVICE.md", "terms"),
        )
        for _title, _path, _subject in _documents:
            with st.expander(_title):
                try:
                    # Explicit UTF-8: the platform default encoding may not be
                    # able to decode these documents.
                    with open(_path, "r", encoding="utf-8") as f:
                        _text = f.read()
                    st.markdown(_text, unsafe_allow_html=True)
                except Exception as e:
                    st.error(f"Could not load {_subject}: {e}")
        st.stop()
# Streamlit Layout - Main App
st.title("🍽️ Food Image Review & Annotation")

# Compliance & disclaimer: the terms text sits in a collapsed expander.
_terms_text = (
    "By uploading an image, you agree to transfer full copyright to the research team for AI training purposes."
    " You are responsible for ensuring you own the image and it does not violate any copyright laws."
    " We do not guarantee when tokens will be redeemable. Keep track of your user ID."
)
with st.expander("πŸ“œ Terms & Conditions", expanded=False):
    st.markdown("### **Terms & Conditions**")
    st.write(_terms_text)

# NOTE(review): the source's indentation was lost; the checkbox is assumed to
# sit outside the expander so it is visible without expanding - confirm.
terms_accepted = st.checkbox("I agree to the terms and conditions", key="terms_accepted")

# Nothing below this point runs until the terms have been accepted.
if not terms_accepted:
    st.warning("⚠️ You must agree to the terms before proceeding.")
    st.stop()
# Upload Image
# The widget key embeds the completed-upload counter: the uploader keeps its
# file across reruns, so without a fresh key the image cleared after a
# successful submission would be loaded straight back in on the next rerun.
uploaded_file = st.file_uploader(
    "Upload an image of your food",
    type=["jpg", "png", "jpeg"],
    key=f"food_image_uploader_{st.session_state.get('uploads_count', 0)}",
)
if uploaded_file:
    try:
        original_img = Image.open(uploaded_file)
    except OSError as e:
        # Corrupt or mislabelled file: report it instead of crashing, and drop
        # any previously loaded image so stale data is not submitted.
        st.error(f"Could not read the uploaded image: {e}")
        st.session_state.pop("original_image", None)
        st.session_state.pop("processed_image", None)
    else:
        st.session_state["original_image"] = original_img

# If an image has been uploaded, process and display it
if "original_image" in st.session_state:
    original_img = st.session_state["original_image"]

    # Resize and compress; the result is kept for the upload step.
    processed_img = resize_image(original_img, max_size=512, quality=85)
    st.session_state["processed_image"] = processed_img

    # Compare the JPEG-encoded sizes of both versions.
    original_size = get_image_size_kb(original_img)
    processed_size = get_image_size_kb(processed_img)
    size_reduction = ((original_size - processed_size) / original_size) * 100 if original_size > 0 else 0

    # Display images side by side with a border to highlight differences
    col1, col2 = st.columns(2)
    with col1:
        st.subheader("πŸ“· Original Image")
        st.markdown("<div style='border:2px solid red;padding:5px;'>", unsafe_allow_html=True)
        st.image(original_img, caption=f"Original ({original_img.width}x{original_img.height} px, {original_size:.1f} KB)", use_container_width=True)
        st.markdown("</div>", unsafe_allow_html=True)
    with col2:
        st.subheader("πŸ–ΌοΈ Processed Image")
        st.markdown("<div style='border:2px solid green;padding:5px;'>", unsafe_allow_html=True)
        st.image(processed_img, caption=f"Processed ({processed_img.width}x{processed_img.height} px, {processed_size:.1f} KB)", use_container_width=True)
        st.markdown("</div>", unsafe_allow_html=True)

    # Only mention the reduction when it is meaningful (more than 5%).
    if size_reduction > 5:
        st.success(f"βœ… Image size reduced by {size_reduction:.1f}% for faster uploads and processing")
# Debug-only panel: manually exercise S3 uploads into both folder prefixes.
if debug_mode and "processed_image" in st.session_state:
    with st.expander("πŸ” DEBUG: S3 Upload Tests", expanded=True):
        st.warning("⚠️ Debug Mode: Test S3 functionality with correct folder structure")
        s3_test_col1, s3_test_col2 = st.columns(2)
        # (column, label, session key of the image, S3 folder, session key
        # under which the resulting path is remembered)
        _upload_tests = (
            (s3_test_col1, "Raw", "original_image", "raw-uploads", "test_raw_s3_path"),
            (s3_test_col2, "Processed", "processed_image", "processed-512x512", "test_processed_s3_path"),
        )
        for _column, _label, _image_key, _folder, _result_key in _upload_tests:
            with _column:
                st.subheader(f"{_label} Image Upload")
                _clicked = st.button(f"Test {_label} Upload")
                if not (_clicked and _image_key in st.session_state):
                    continue
                _uploaded_path = upload_to_s3(
                    st.session_state[_image_key],
                    st.session_state["user_id"],
                    folder=_folder,
                )
                if _uploaded_path:
                    st.success(f"βœ… {_label} upload successful!")
                    st.code(f"s3://{S3_BUCKET_NAME}/{_uploaded_path}", language="text")
                    st.session_state[_result_key] = _uploaded_path
                else:
                    st.error(f"❌ {_label} upload failed!")
# Debug-only panel: run the complete flow (S3 uploads plus one DynamoDB item
# per food) using throwaway items whose names are prefixed with "TEST_".
if debug_mode and "processed_image" in st.session_state:
    with st.expander("πŸ” DEBUG: Complete Food Annotation Test", expanded=True):
        st.warning("⚠️ Debug Mode: Test Complete Flow with Multiple Food Items")

        # Test items live in their own list, separate from real submissions.
        if "test_food_items" not in st.session_state:
            st.session_state["test_food_items"] = []

        # Form for adding test food items
        with st.form(key="test_food_form"):
            test_food = st.text_input("Food Name", "Test Pizza")
            test_portion = st.number_input("Portion Size", value=1.0)
            test_unit = st.selectbox("Unit", UNIT_OPTIONS)
            test_cooking = st.selectbox("Cooking Method", COOKING_METHODS)
            test_ingredients = st_tags.st_tags(
                label="Ingredients",
                text="Press enter to add",
                value=["Test Ingredient"],
                maxtags=5
            )
            test_add_button = st.form_submit_button("Add Test Food Item")

        if test_add_button:
            st.session_state["test_food_items"].append({
                "food_name": f"TEST_{test_food}",
                "portion_size": test_portion,
                "portion_unit": test_unit,
                "cooking_method": test_cooking,
                "ingredients": test_ingredients
            })
            st.success(f"Added test food item: {test_food}")
            st.rerun()

    # NOTE(review): the source's indentation was lost. The per-item expanders
    # are kept outside the panel expander above because Streamlit has
    # historically rejected an expander nested inside another - confirm.
    if st.session_state["test_food_items"]:
        st.subheader("Test Food Items")
        for i, item in enumerate(st.session_state["test_food_items"]):
            with st.expander(f"Item #{i+1}: {item['food_name']}"):
                st.write(f"**Portion:** {item['portion_size']} {item['portion_unit']}")
                st.write(f"**Cooking Method:** {item['cooking_method']}")
                st.write(f"**Ingredients:** {', '.join(item['ingredients'])}")
                if st.button(f"Remove Test Item #{i+1}", key=f"remove_test_{i}"):
                    # Safe to mutate mid-loop: st.rerun() aborts this run.
                    st.session_state["test_food_items"].pop(i)
                    st.rerun()

    # Button to run the full test
    test_full_button = st.button("Run Complete Annotation Test")
    if test_full_button and st.session_state["test_food_items"]:
        with st.spinner("Testing complete annotation flow..."):
            # Upload both original and processed images
            raw_s3_path = upload_to_s3(original_img, st.session_state["user_id"], folder="raw-uploads")
            processed_s3_path = upload_to_s3(processed_img, st.session_state["user_id"], folder="processed-512x512")
            all_saved = True
            saved_items = 0
            if raw_s3_path and processed_s3_path:
                # Placeholder for a per-item error message.
                results_area = st.empty()
                # Save each food item, stopping at the first failure.
                for item in st.session_state["test_food_items"]:
                    try:
                        # Converted here so a bad value is reported per item.
                        portion_size_decimal = Decimal(str(item["portion_size"]))
                        success = save_metadata(
                            st.session_state["user_id"],
                            processed_s3_path,
                            item["food_name"],
                            portion_size_decimal,
                            item["portion_unit"],
                            item["cooking_method"],
                            item["ingredients"],
                            1  # Test token value
                        )
                    except Exception as e:
                        results_area.error(f"Error saving {item['food_name']}: {e}")
                        all_saved = False
                        break
                    if success:
                        saved_items += 1
                    else:
                        all_saved = False
                        break
                if all_saved:
                    st.success(f"βœ… Successfully saved {saved_items} food items!")
                    st.json({
                        "user_id": st.session_state["user_id"],
                        "s3_paths": {
                            "raw": raw_s3_path,
                            "processed": processed_s3_path
                        },
                        "food_items": st.session_state["test_food_items"]
                    })
                else:
                    st.error("❌ Failed to save all food items")
            else:
                st.error("❌ Failed to upload images to S3")
    elif not st.session_state["test_food_items"]:
        # Only prompt for items when there are none; previously this hint was
        # also shown while items existed and the button simply was not pressed.
        st.info("Add test food items above to run a complete annotation test")
# List the food items collected so far, one collapsible entry per item.
_pending_items = st.session_state["food_items"]
if _pending_items:
    st.subheader("πŸ“‹ Added Food Items")
    for _number, _entry in enumerate(_pending_items, start=1):
        _title = f"🍽️ {_entry['food_name']} ({_entry['portion_size']} {_entry['portion_unit']})"
        with st.expander(_title):
            st.write(f"**Cooking Method:** {_entry['cooking_method']}")
            _ingredient_text = ', '.join(_entry['ingredients'])
            st.write(f"**Ingredients:** {_ingredient_text}")
            _remove_clicked = st.button(f"Remove Item #{_number}", key=f"remove_{_number - 1}")
            if _remove_clicked:
                # Drop this entry and redraw; st.rerun() aborts the current run.
                st.session_state["food_items"].pop(_number - 1)
                st.rerun()
# Food metadata form
st.subheader("🍲 Add Food Details")
# A Streamlit form batches the inputs and submits on Enter or the button.
with st.form(key="food_item_form"):
    food_selection = st.selectbox("Food Name", options=[""] + FOOD_SUGGESTIONS, index=0)

    # The free-text name is only offered while the dropdown is left empty.
    custom_food_name = ""
    if food_selection == "":
        custom_food_name = st.text_input("Or enter a custom food name",
                                         value=st.session_state["custom_food_name"],
                                         key="food_name_input")

    # The dropdown wins; otherwise use the custom name. It is stripped so a
    # whitespace-only entry is rejected by the required-field check.
    food_name = food_selection if food_selection else custom_food_name.strip()

    col1, col2 = st.columns(2)
    with col1:
        portion_size = st.number_input("Portion Size", min_value=0.1, step=0.1, format="%.2f")
    with col2:
        portion_unit = st.selectbox("Unit", options=UNIT_OPTIONS)

    cooking_method = st.selectbox("Cooking Method", options=[""] + COOKING_METHODS)
    ingredients = st_tags.st_tags(
        label="Main Ingredients (Add up to 5)",
        text="Press enter to add",
        value=[],
        suggestions=["Salt", "Pepper", "Olive Oil", "Butter", "Garlic", "Onion", "Tomato"],
        maxtags=5
    )

    # Submit button inside the form
    submitted = st.form_submit_button(label="βž• Add This Food Item")

if submitted:
    # add_food_item() clears the stored custom name on success. The name is
    # deliberately not written back afterwards: that undid the reset and
    # pre-filled the next entry with the previous item's name.
    if add_food_item(food_name, portion_size, portion_unit, cooking_method, ingredients):
        st.rerun()
# Quick-add shortcuts for a few common foods (only once an image is loaded).
if "original_image" in st.session_state:
    with st.expander("πŸš€ Quick Add Common Foods"):
        st.info("Click to quickly add common food items with default values")
        quick_add_cols = st.columns(3)
        # Units must be values from UNIT_OPTIONS so quick-added items match
        # what the form can produce (the hamburger previously used "pieces",
        # which is not one of the options).
        common_foods = [
            {"name": "French Fries", "portion": 100, "unit": "grams", "cooking": "Fried", "ingredients": ["Potatoes", "Salt", "Oil"]},
            {"name": "Hamburger", "portion": 1, "unit": "piece(s)", "cooking": "Grilled", "ingredients": ["Beef", "Bun", "Lettuce", "Tomato"]},
            {"name": "Salad", "portion": 150, "unit": "grams", "cooking": "Raw", "ingredients": ["Lettuce", "Tomato", "Cucumber"]},
        ]
        for i, food in enumerate(common_foods):
            # Spread the buttons across the three columns.
            with quick_add_cols[i % 3]:
                if st.button(f"+ {food['name']}", key=f"quick_{i}"):
                    add_food_item(
                        food['name'],
                        food['portion'],
                        food['unit'],
                        food['cooking'],
                        food['ingredients']
                    )
                    # Redraw so the new item shows up in the list above.
                    st.rerun()
# Divider before submit button
st.markdown("---")

# Submit all foods button - outside the form
if st.button("πŸ“€ Submit All Food Items", disabled=not st.session_state["food_items"]):
    # Read the images from session state: the module-level names only exist
    # when an image was loaded during this run, and food items can be added
    # without one.
    original_img = st.session_state.get("original_image")
    processed_img = st.session_state.get("processed_image")
    # Short-circuits before test_mode, which may be unbound outside debug mode.
    mark_as_test = bool(debug_mode and test_mode)

    if not st.session_state["food_items"]:
        st.error("❌ Please add at least one food item before submitting")
    elif original_img is None or processed_img is None:
        st.error("Please upload an image before submitting.")
    else:
        if mark_as_test:
            st.info("ℹ️ Test mode: Data will be marked as test data")
        with st.spinner("Processing your submission..."):
            all_saved = True
            total_tokens = 0

            # Determine image quality (simplified version)
            image_quality = "high" if original_img.width >= 1000 and original_img.height >= 1000 else "standard"

            # Upload both original and processed images to their S3 folders
            raw_s3_path = upload_to_s3(original_img, st.session_state["user_id"], folder="raw-uploads")
            processed_s3_path = upload_to_s3(processed_img, st.session_state["user_id"], folder="processed-512x512")

            if raw_s3_path and processed_s3_path:
                # Save each food item with the processed image path
                for food_item in st.session_state["food_items"]:
                    base_name = food_item["food_name"]
                    # The TEST_ prefix is applied to the stored name only. The
                    # session items are left untouched so a failed submission
                    # can be retried without stacking prefixes.
                    if mark_as_test and not base_name.startswith("TEST_"):
                        stored_name = f"TEST_{base_name}"
                    else:
                        stored_name = base_name

                    has_metadata = True  # Validated when the item was added

                    # Category check (simplified) uses the unprefixed name so
                    # test submissions are scored like real ones.
                    is_unique_category = base_name not in ["Pizza", "Burger", "Pasta", "Salad"]

                    tokens_awarded = calculate_tokens(image_quality, has_metadata, is_unique_category)
                    total_tokens += tokens_awarded

                    # DynamoDB does not accept floats
                    portion_size_decimal = Decimal(str(food_item["portion_size"]))

                    success = save_metadata(
                        st.session_state["user_id"],
                        processed_s3_path,  # Use the processed image path
                        stored_name,
                        portion_size_decimal,
                        food_item["portion_unit"],
                        food_item["cooking_method"],
                        food_item["ingredients"],
                        tokens_awarded
                    )
                    if not success:
                        all_saved = False
                        break

                if all_saved:
                    st.session_state["tokens"] += total_tokens
                    st.session_state["uploads_count"] += 1
                    st.success(f"βœ… All food items uploaded successfully! You earned {total_tokens} tokens.")
                    # Clear the form and image for a new submission
                    st.session_state.pop("original_image", None)
                    st.session_state.pop("processed_image", None)
                    st.session_state["food_items"] = []
                    st.rerun()
                else:
                    st.error("Failed to save some items. Please try again.")
            else:
                st.error("Failed to upload images. Please try again.")
# Display earned tokens
st.sidebar.markdown("---")
st.sidebar.subheader("πŸ† Your Statistics")
st.sidebar.info(f"πŸͺ™ Total Tokens: {st.session_state['tokens']}")
st.sidebar.info(f"πŸ“Έ Total Uploads: {st.session_state.get('uploads_count', 0)}")

# Help and Documentation Links
st.sidebar.markdown("---")
st.sidebar.subheader("πŸ“š Resources")
# Each button renders the matching markdown document in the sidebar.
for _button_label, _doc_path in (
    ("Participation Guidelines", "PARTICIPATION_GUIDELINES.md"),
    ("Token Rewards System", "TOKEN_REWARDS.md"),
    ("Terms of Service", "TERMS_OF_SERVICE.md"),
):
    if st.sidebar.button(_button_label):
        try:
            # Explicit UTF-8: the platform default encoding may not be able to
            # decode these documents.
            with open(_doc_path, "r", encoding="utf-8") as f:
                st.sidebar.markdown(f.read())
        except (OSError, UnicodeError) as e:
            # Report a missing or unreadable file instead of crashing the app,
            # matching how the logged-out view handles the same documents.
            st.sidebar.error(f"Could not load {_button_label}: {e}")
# Debug-only cleanup: delete every DynamoDB record whose food_name starts with
# "TEST_", together with the S3 object each record points to.
if debug_mode and st.sidebar.checkbox("Show Cleanup Tools"):
    st.sidebar.markdown("---")
    st.sidebar.subheader("🧹 Debug Cleanup Tools")
    with st.sidebar.expander("⚠️ Cleanup Test Data"):
        # Plain st.* calls inside the `with` block render inside the expander;
        # st.sidebar.* would place them next to it and leave it empty.
        st.warning("This will delete all test records from DynamoDB and S3")
        if st.button("Delete All Test Records"):
            try:
                from boto3.dynamodb.conditions import Attr

                total_records = 0
                deleted_records = 0
                s3_paths = set()  # a set, since several records can share an image

                # A single scan call returns at most one page of results, so
                # keep following LastEvaluatedKey until the table is exhausted.
                scan_kwargs = {"FilterExpression": Attr('food_name').begins_with('TEST_')}
                while True:
                    response = metadata_table.scan(**scan_kwargs)
                    for item in response.get('Items', []):
                        total_records += 1
                        try:
                            metadata_table.delete_item(Key={'image_id': item['image_id']})
                            deleted_records += 1
                            if item.get('s3_path'):
                                s3_paths.add(item['s3_path'])
                        except Exception as e:
                            st.error(f"Failed to delete record {item['image_id']}: {e}")
                    last_key = response.get("LastEvaluatedKey")
                    if not last_key:
                        break
                    scan_kwargs["ExclusiveStartKey"] = last_key

                # Delete the S3 objects referenced by the removed records.
                for s3_path in s3_paths:
                    try:
                        s3.delete_object(Bucket=S3_BUCKET_NAME, Key=s3_path)
                    except Exception as e:
                        st.error(f"Failed to delete S3 object {s3_path}: {e}")

                st.success(f"Deleted {deleted_records}/{total_records} test records and their S3 objects")
            except Exception as e:
                st.error(f"Cleanup error: {e}")
# Debug-only panel at the bottom of the page listing the DynamoDB writes
# recorded during this session; it is hidden while the history is empty.
_has_debug_history = (
    debug_mode
    and "test_debug_details" in st.session_state
    and bool(st.session_state["test_debug_details"])
)
if _has_debug_history:
    with st.expander("πŸ” DEBUG: Last Test Results"):
        st.json(st.session_state["test_debug_details"])
        _clear_clicked = st.button("Clear Debug History")
        if _clear_clicked:
            # Reset the history and redraw so the panel disappears.
            st.session_state["test_debug_details"] = []
            st.rerun()