Spaces:
Sleeping
Sleeping
import streamlit as st | |
import json | |
import os | |
import uuid | |
from datetime import datetime | |
from io import BytesIO | |
from decimal import Decimal # Add this import for DynamoDB float handling | |
# Third-party library imports | |
import boto3 | |
from PIL import Image | |
import firebase_admin | |
from firebase_admin import credentials, auth | |
import pandas as pd | |
import streamlit_tags as st_tags | |
from dotenv import load_dotenv | |
# Load environment variables from .env file if it exists | |
load_dotenv() | |
# Load AWS credentials using correct HF Secrets | |
AWS_ACCESS_KEY = os.getenv("AWS_ACCESS_KEY") | |
AWS_SECRET_KEY = os.getenv("AWS_SECRET_KEY") | |
AWS_REGION = os.getenv("AWS_REGION", "us-east-1") | |
S3_BUCKET_NAME = os.getenv("S3_BUCKET_NAME", "food-image-crowdsourcing") | |
DYNAMODB_TABLE = os.getenv("DYNAMODB_TABLE", "image_metadata") | |
# Load Firebase credentials | |
FIREBASE_CONFIG = json.loads(os.getenv("FIREBASE_CONFIG", "{}")) | |
# Initialize Firebase Admin SDK (Prevent multiple initialization) | |
if not firebase_admin._apps: | |
try: | |
cred = credentials.Certificate(FIREBASE_CONFIG) | |
firebase_admin.initialize_app(cred) | |
except Exception as e: | |
st.error(f"Firebase initialization error: {e}") | |
if st.button("Continue in Demo Mode"): | |
st.session_state["demo_mode"] = True | |
else: | |
st.stop() | |
# Initialize AWS Services (S3 & DynamoDB) | |
try: | |
s3 = boto3.client( | |
"s3", | |
aws_access_key_id=AWS_ACCESS_KEY, | |
aws_secret_access_key=AWS_SECRET_KEY, | |
region_name=AWS_REGION | |
) | |
dynamodb = boto3.resource( | |
"dynamodb", | |
region_name=AWS_REGION, | |
aws_access_key_id=AWS_ACCESS_KEY, | |
aws_secret_access_key=AWS_SECRET_KEY, | |
) | |
metadata_table = dynamodb.Table(DYNAMODB_TABLE) | |
except Exception as e: | |
st.error(f"AWS initialization error: {e}") | |
if st.button("Continue in Demo Mode"): | |
st.session_state["demo_mode"] = True | |
else: | |
st.stop() | |
# Food Intellisense List | |
FOOD_SUGGESTIONS = [ | |
"Apple", "Banana", "Pizza", "Burger", "Pasta", "Sushi", "Tacos", "Salad", | |
"Chicken (Baked)", "Chicken (Roasted)", "Chicken (Grilled)", "Chicken (Boiled)", | |
"Fish (Grilled)", "Fish (Fried)", "Fish (Steamed)", "Beef Steak", "Pork Chops", | |
"Spaghetti Carbonara", "Lasagna", "Pad Thai", "Dim Sum", "Kimchi Fried Rice", | |
"Biryani", "Croissant", "Baguette", "Miso Soup", "Ramen", "Pierogi", "Gyoza", | |
"Schnitzel", "Creole Gumbo", "Jambalaya", "Tandoori Chicken", "Falafel", "Shawarma" | |
] # Extended with diverse cuisines | |
# Unit options for food weight/volume | |
UNIT_OPTIONS = ["grams", "ounces", "teaspoons", "tablespoons", "cups", "slices", "pieces"] | |
# Cooking methods | |
COOKING_METHODS = [ | |
"Baked", "Boiled", "Broiled", "Fried", "Grilled", "Microwaved", | |
"Pan-seared", "Poached", "Raw", "Roasted", "Sautéed", "Steamed", | |
"Stewed", "Stir-fried", "Takeout/Restaurant", "Unknown" | |
] | |
# Helper functions | |
def resize_image(image, max_size=512, quality=85): | |
""" | |
Resize image while preserving aspect ratio and reducing file size | |
Args: | |
image: PIL Image object | |
max_size: Maximum dimension (width or height) | |
quality: JPEG quality (0-100) | |
Returns: | |
Resized PIL Image | |
""" | |
# Calculate new dimensions | |
width, height = image.width, image.height | |
# Only resize if the image is larger than max_size | |
if width > max_size or height > max_size: | |
if width > height: | |
new_width = max_size | |
new_height = int(height * (max_size / width)) | |
else: | |
new_height = max_size | |
new_width = int(width * (max_size / height)) | |
# Resize the image | |
resized_img = image.resize((new_width, new_height), Image.LANCZOS) | |
else: | |
# If image is already smaller than max_size, don't resize | |
return image | |
# Convert to RGB if image has alpha channel (for JPEG conversion) | |
if resized_img.mode == 'RGBA': | |
resized_img = resized_img.convert('RGB') | |
# Compress the image | |
buffer = BytesIO() | |
resized_img.save(buffer, format="JPEG", quality=quality, optimize=True) | |
buffer.seek(0) | |
# Return the compressed image | |
return Image.open(buffer) | |
def get_image_size_kb(image): | |
"""Get image file size in KB""" | |
buffer = BytesIO() | |
image.save(buffer, format="JPEG") | |
size_bytes = buffer.tell() | |
return size_bytes / 1024 # Convert to KB | |
def upload_to_s3(image, user_id): | |
"""Upload image to S3 bucket and return the S3 path""" | |
if st.session_state.get("demo_mode", False): | |
return f"demo/{user_id}/demo_image.jpg" | |
try: | |
# Generate a unique ID for the image | |
image_id = str(uuid.uuid4()) | |
timestamp = datetime.now().strftime("%Y%m%d%H%M%S") | |
s3_path = f"{user_id}/{timestamp}_{image_id}.jpg" | |
# Convert PIL image to bytes | |
buffer = BytesIO() | |
image.save(buffer, format="JPEG", quality=85, optimize=True) | |
buffer.seek(0) | |
# Upload to S3 | |
s3.upload_fileobj(buffer, S3_BUCKET_NAME, s3_path) | |
return s3_path | |
except Exception as e: | |
st.error(f"Failed to upload image: {e}") | |
return None | |
def save_metadata(user_id, s3_path, food_name, portion_size, portion_unit, cooking_method, ingredients, tokens_awarded): | |
"""Save metadata to DynamoDB""" | |
if st.session_state.get("demo_mode", False): | |
st.success("Demo mode: Metadata would be saved to DynamoDB") | |
return True | |
try: | |
# Generate a unique ID for the database entry | |
image_id = str(uuid.uuid4()) | |
timestamp = datetime.now().isoformat() | |
# Ensure portion_size is a Decimal (DynamoDB doesn't support float) | |
if not isinstance(portion_size, Decimal): | |
portion_size = Decimal(str(portion_size)) | |
# Create item for DynamoDB | |
item = { | |
'image_id': image_id, | |
'user_id': user_id, | |
'upload_timestamp': timestamp, | |
'food_name': food_name, | |
'portion_size': portion_size, # Decimal type | |
'portion_unit': portion_unit, | |
'cooking_method': cooking_method, | |
'ingredients': ingredients, | |
's3_path': s3_path, | |
'tokens_awarded': tokens_awarded | |
} | |
# Save to DynamoDB | |
metadata_table.put_item(Item=item) | |
return True | |
except Exception as e: | |
st.error(f"Failed to save metadata: {e}") | |
return False | |
def calculate_tokens(image_quality, has_metadata, is_unique_category): | |
"""Calculate tokens based on various factors""" | |
tokens = 1 # Base token for upload | |
if image_quality == "high": | |
tokens += 1 | |
if has_metadata: | |
tokens += 1 | |
if is_unique_category: | |
tokens += 1 | |
return tokens | |
# Initialize session state for first-time users | |
if "tokens" not in st.session_state: | |
st.session_state["tokens"] = 0 | |
if "uploads_count" not in st.session_state: | |
st.session_state["uploads_count"] = 0 | |
# Initialize food items list for storing multiple annotations | |
if "food_items" not in st.session_state: | |
st.session_state["food_items"] = [] | |
# Initialize form input state variables | |
if "custom_food_name" not in st.session_state: | |
st.session_state["custom_food_name"] = "" | |
def reset_form_fields(): | |
"""Reset all form fields after adding an item""" | |
# Only reset custom food name, keep the dropdown at its current value | |
st.session_state["custom_food_name"] = "" | |
# We don't reset the dropdown selection as users might want to add multiple similar items | |
def add_food_item(food_name, portion_size, portion_unit, cooking_method, ingredients): | |
"""Add a food item to the session state""" | |
if food_name and portion_size and portion_unit and cooking_method: | |
# Add the food item to the session state | |
st.session_state["food_items"].append({ | |
"food_name": food_name, | |
"portion_size": portion_size, | |
"portion_unit": portion_unit, | |
"cooking_method": cooking_method, | |
"ingredients": ingredients | |
}) | |
st.success(f"✅ Added {food_name} to your submission") | |
reset_form_fields() | |
return True | |
else: | |
st.error("❌ Please fill in all required fields") | |
return False | |
# Streamlit Layout - Authentication Section | |
st.sidebar.title("🔑 User Authentication") | |
auth_option = st.sidebar.radio("Select an option", ["Login", "Sign Up", "Logout"]) | |
if auth_option == "Sign Up": | |
email = st.sidebar.text_input("Email") | |
password = st.sidebar.text_input("Password", type="password") | |
if st.sidebar.button("Sign Up"): | |
try: | |
if st.session_state.get("demo_mode", False): | |
st.sidebar.success("✅ Demo mode: User created successfully! Please log in.") | |
else: | |
user = auth.create_user(email=email, password=password) | |
st.sidebar.success("✅ User created successfully! Please log in.") | |
except Exception as e: | |
st.sidebar.error(f"Error: {e}") | |
if auth_option == "Login": | |
email = st.sidebar.text_input("Email") | |
password = st.sidebar.text_input("Password", type="password") | |
if st.sidebar.button("Login"): | |
try: | |
if st.session_state.get("demo_mode", False): | |
st.session_state["user_id"] = "demo_user_123" | |
st.session_state["tokens"] = 0 # Initialize token count | |
st.sidebar.success("✅ Demo mode: Logged in successfully!") | |
else: | |
user = auth.get_user_by_email(email) | |
st.session_state["user_id"] = user.uid | |
st.session_state["tokens"] = 0 # Initialize token count | |
st.sidebar.success("✅ Logged in successfully!") | |
except Exception as e: | |
st.sidebar.error(f"Login failed: {e}") | |
if auth_option == "Logout" and "user_id" in st.session_state: | |
del st.session_state["user_id"] | |
st.sidebar.success("✅ Logged out successfully!") | |
# Ensure user is logged in before uploading | |
if "user_id" not in st.session_state and not st.session_state.get("demo_mode", False): | |
st.warning("⚠️ Please log in to upload images.") | |
# Add links to guidelines and terms | |
st.markdown("### 📚 While You're Here") | |
st.markdown("Take a moment to read our guidelines and token system:") | |
# Use expanders instead of columns for better document display | |
with st.expander("📋 Participation Guidelines"): | |
try: | |
with open("PARTICIPATION_GUIDELINES.md", "r") as f: | |
guidelines = f.read() | |
st.markdown(guidelines, unsafe_allow_html=True) | |
except Exception as e: | |
st.error(f"Could not load guidelines: {e}") | |
with st.expander("🪙 Token Rewards System"): | |
try: | |
with open("TOKEN_REWARDS.md", "r") as f: | |
rewards = f.read() | |
st.markdown(rewards, unsafe_allow_html=True) | |
except Exception as e: | |
st.error(f"Could not load rewards information: {e}") | |
with st.expander("📜 Terms of Service"): | |
try: | |
with open("TERMS_OF_SERVICE.md", "r") as f: | |
terms = f.read() | |
st.markdown(terms, unsafe_allow_html=True) | |
except Exception as e: | |
st.error(f"Could not load terms: {e}") | |
st.stop() | |
# Streamlit Layout - Main App | |
st.title("🍽️ Food Image Review & Annotation") | |
# Compliance & Disclaimer Section | |
with st.expander("📜 Terms & Conditions", expanded=False): | |
st.markdown("### **Terms & Conditions**") | |
st.write( | |
"By uploading an image, you agree to transfer full copyright to the research team for AI training purposes." | |
" You are responsible for ensuring you own the image and it does not violate any copyright laws." | |
" We do not guarantee when tokens will be redeemable. Keep track of your user ID.") | |
terms_accepted = st.checkbox("I agree to the terms and conditions", key="terms_accepted") | |
if not terms_accepted: | |
st.warning("⚠️ You must agree to the terms before proceeding.") | |
st.stop() | |
# Upload Image | |
uploaded_file = st.file_uploader("Upload an image of your food", type=["jpg", "png", "jpeg"]) | |
if uploaded_file: | |
original_img = Image.open(uploaded_file) | |
st.session_state["original_image"] = original_img | |
# If an image has been uploaded, process and display it | |
if "original_image" in st.session_state: | |
original_img = st.session_state["original_image"] | |
# Process the image - resize and compress with more visible difference | |
processed_img = resize_image(original_img, max_size=512, quality=85) | |
st.session_state["processed_image"] = processed_img | |
# Calculate file sizes | |
original_size = get_image_size_kb(original_img) | |
processed_size = get_image_size_kb(processed_img) | |
size_reduction = ((original_size - processed_size) / original_size) * 100 if original_size > 0 else 0 | |
# Display images side by side with border to highlight differences | |
col1, col2 = st.columns(2) | |
with col1: | |
st.subheader("📷 Original Image") | |
st.markdown(f"<div style='border:2px solid red;padding:5px;'>", unsafe_allow_html=True) | |
st.image(original_img, caption=f"Original ({original_img.width}x{original_img.height} px, {original_size:.1f} KB)", use_container_width=True) | |
st.markdown("</div>", unsafe_allow_html=True) | |
with col2: | |
st.subheader("🖼️ Processed Image") | |
st.markdown(f"<div style='border:2px solid green;padding:5px;'>", unsafe_allow_html=True) | |
st.image(processed_img, caption=f"Processed ({processed_img.width}x{processed_img.height} px, {processed_size:.1f} KB)", use_container_width=True) | |
st.markdown("</div>", unsafe_allow_html=True) | |
# Show size reduction | |
if size_reduction > 5: # Only show if there's a meaningful reduction | |
st.success(f"✅ Image size reduced by {size_reduction:.1f}% for faster uploads and processing") | |
# Display existing food annotations if any | |
if st.session_state["food_items"]: | |
st.subheader("📋 Added Food Items") | |
for i, item in enumerate(st.session_state["food_items"]): | |
with st.expander(f"🍽️ {item['food_name']} ({item['portion_size']} {item['portion_unit']})"): | |
st.write(f"**Cooking Method:** {item['cooking_method']}") | |
st.write(f"**Ingredients:** {', '.join(item['ingredients'])}") | |
if st.button(f"Remove Item #{i+1}", key=f"remove_{i}"): | |
st.session_state["food_items"].pop(i) | |
st.rerun() | |
# Food metadata form | |
st.subheader("🍲 Add Food Details") | |
# Use Streamlit form to capture Enter key and provide a better UX | |
with st.form(key="food_item_form"): | |
food_selection = st.selectbox("Food Name", options=[""] + FOOD_SUGGESTIONS, index=0) | |
# Only show custom food name if the dropdown is empty | |
custom_food_name = "" | |
if food_selection == "": | |
custom_food_name = st.text_input("Or enter a custom food name", | |
value=st.session_state["custom_food_name"], | |
key="food_name_input") | |
# Determine the actual food name to use | |
food_name = food_selection if food_selection else custom_food_name | |
col1, col2 = st.columns(2) | |
with col1: | |
portion_size = st.number_input("Portion Size", min_value=0.1, step=0.1, format="%.2f") | |
with col2: | |
portion_unit = st.selectbox("Unit", options=UNIT_OPTIONS) | |
cooking_method = st.selectbox("Cooking Method", options=[""] + COOKING_METHODS) | |
ingredients = st_tags.st_tags( | |
label="Main Ingredients (Add up to 5)", | |
text="Press enter to add", | |
value=[], | |
suggestions=["Salt", "Pepper", "Olive Oil", "Butter", "Garlic", "Onion", "Tomato"], | |
maxtags=5 | |
) | |
# Submit button inside the form | |
submitted = st.form_submit_button(label="➕ Add This Food Item") | |
if submitted: | |
if add_food_item(food_name, portion_size, portion_unit, cooking_method, ingredients): | |
# Store custom food name for next reset | |
if custom_food_name: | |
st.session_state["custom_food_name"] = custom_food_name | |
st.rerun() | |
# Separate section for quick-add common foods | |
if "original_image" in st.session_state: | |
with st.expander("🚀 Quick Add Common Foods"): | |
st.info("Click to quickly add common food items with default values") | |
quick_add_cols = st.columns(3) | |
common_foods = [ | |
{"name": "French Fries", "portion": 100, "unit": "grams", "cooking": "Fried", "ingredients": ["Potatoes", "Salt", "Oil"]}, | |
{"name": "Hamburger", "portion": 1, "unit": "pieces", "cooking": "Grilled", "ingredients": ["Beef", "Bun", "Lettuce", "Tomato"]}, | |
{"name": "Salad", "portion": 150, "unit": "grams", "cooking": "Raw", "ingredients": ["Lettuce", "Tomato", "Cucumber"]} | |
] | |
for i, food in enumerate(common_foods): | |
with quick_add_cols[i % 3]: | |
if st.button(f"+ {food['name']}", key=f"quick_{i}"): | |
add_food_item( | |
food['name'], | |
food['portion'], | |
food['unit'], | |
food['cooking'], | |
food['ingredients'] | |
) | |
st.rerun() | |
# Divider before submit button | |
st.markdown("---") | |
# Submit all foods button - outside the form | |
if st.button("📤 Submit All Food Items", disabled=len(st.session_state["food_items"]) == 0): | |
if not st.session_state["food_items"]: | |
st.error("❌ Please add at least one food item before submitting") | |
else: | |
with st.spinner("Processing your submission..."): | |
all_saved = True | |
total_tokens = 0 | |
# Determine image quality (simplified version) | |
image_quality = "high" if original_img.width >= 1000 and original_img.height >= 1000 else "standard" | |
# Upload image to S3 once | |
s3_path = upload_to_s3(processed_img, st.session_state["user_id"]) | |
if s3_path: | |
# Save each food item with the same image | |
for food_item in st.session_state["food_items"]: | |
# Check if metadata is complete | |
has_metadata = True # Already validated | |
# Check if the food is in a unique category (simplified) | |
is_unique_category = food_item["food_name"] not in ["Pizza", "Burger", "Pasta", "Salad"] | |
# Calculate tokens for this item | |
tokens_awarded = calculate_tokens(image_quality, has_metadata, is_unique_category) | |
total_tokens += tokens_awarded | |
# Convert float to Decimal for DynamoDB | |
portion_size_decimal = Decimal(str(food_item["portion_size"])) | |
# Save metadata to DynamoDB | |
success = save_metadata( | |
st.session_state["user_id"], | |
s3_path, | |
food_item["food_name"], | |
portion_size_decimal, # Use Decimal type | |
food_item["portion_unit"], | |
food_item["cooking_method"], | |
food_item["ingredients"], | |
tokens_awarded | |
) | |
if not success: | |
all_saved = False | |
break | |
if all_saved: | |
st.session_state["tokens"] += total_tokens | |
st.session_state["uploads_count"] += 1 | |
st.success(f"✅ All food items uploaded successfully! You earned {total_tokens} tokens.") | |
# Clear the form and image for a new submission | |
st.session_state.pop("original_image", None) | |
st.session_state.pop("processed_image", None) | |
st.session_state["food_items"] = [] | |
st.rerun() | |
else: | |
st.error("Failed to save some items. Please try again.") | |
else: | |
st.error("Failed to upload image. Please try again.") | |
# Display earned tokens | |
st.sidebar.markdown("---") | |
st.sidebar.subheader("🏆 Your Statistics") | |
st.sidebar.info(f"🪙 Total Tokens: {st.session_state['tokens']}") | |
st.sidebar.info(f"📸 Total Uploads: {st.session_state.get('uploads_count', 0)}") | |
# Help and Documentation Links | |
st.sidebar.markdown("---") | |
st.sidebar.subheader("📚 Resources") | |
if st.sidebar.button("Participation Guidelines"): | |
with open("PARTICIPATION_GUIDELINES.md", "r") as f: | |
guidelines = f.read() | |
st.sidebar.markdown(guidelines) | |
if st.sidebar.button("Token Rewards System"): | |
with open("TOKEN_REWARDS.md", "r") as f: | |
rewards = f.read() | |
st.sidebar.markdown(rewards) | |
if st.sidebar.button("Terms of Service"): | |
with open("TERMS_OF_SERVICE.md", "r") as f: | |
terms = f.read() | |
st.sidebar.markdown(terms) | |