# Page-header residue from the hosting site (Spaces status: Sleeping); not part of the program.
import streamlit as st | |
import json | |
import os | |
import boto3 | |
from PIL import Image | |
import firebase_admin | |
from firebase_admin import credentials, auth | |
import pandas as pd | |
import uuid | |
from datetime import datetime | |
from io import BytesIO | |
import streamlit_tags as st_tags | |
from dotenv import load_dotenv | |
# Load environment variables from a local .env file if one exists.
load_dotenv()

# AWS credentials and resource names, read from the environment (HF Secrets).
AWS_ACCESS_KEY = os.getenv("AWS_ACCESS_KEY")
AWS_SECRET_KEY = os.getenv("AWS_SECRET_KEY")
AWS_REGION = os.getenv("AWS_REGION", "us-east-1")
S3_BUCKET_NAME = os.getenv("S3_BUCKET_NAME", "food-image-crowdsourcing")
DYNAMODB_TABLE = os.getenv("DYNAMODB_TABLE", "image_metadata")

# Firebase credentials are supplied as a JSON string. A malformed (or empty)
# value must not crash the script before the demo-mode fallback below can be
# offered, so fall back to an empty config and let Firebase init report it.
try:
    FIREBASE_CONFIG = json.loads(os.getenv("FIREBASE_CONFIG", "{}"))
except json.JSONDecodeError as e:
    st.error(f"FIREBASE_CONFIG is not valid JSON: {e}")
    FIREBASE_CONFIG = {}

# Initialize Firebase Admin SDK (prevent multiple initialization: the script
# is re-executed, but the SDK's app registry survives within the process).
if not firebase_admin._apps:
    try:
        cred = credentials.Certificate(FIREBASE_CONFIG)
        firebase_admin.initialize_app(cred)
    except Exception as e:
        st.error(f"Firebase initialization error: {e}")
        # Only ask once: st.button() is True solely on the run triggered by
        # the click, so without this guard the very next rerun would hit
        # st.stop() and demo mode could never actually be used.
        if not st.session_state.get("demo_mode", False):
            # Explicit key: the AWS fallback below uses the same label.
            if st.button("Continue in Demo Mode", key="firebase_demo_mode"):
                st.session_state["demo_mode"] = True
            else:
                st.stop()

# Initialize AWS services (S3 & DynamoDB).
# NOTE: if this fails and the user continues in demo mode, `s3` and
# `metadata_table` stay undefined; the helpers that use them return early
# when demo mode is active.
try:
    s3 = boto3.client(
        "s3",
        aws_access_key_id=AWS_ACCESS_KEY,
        aws_secret_access_key=AWS_SECRET_KEY,
        region_name=AWS_REGION,
    )
    dynamodb = boto3.resource(
        "dynamodb",
        region_name=AWS_REGION,
        aws_access_key_id=AWS_ACCESS_KEY,
        aws_secret_access_key=AWS_SECRET_KEY,
    )
    metadata_table = dynamodb.Table(DYNAMODB_TABLE)
except Exception as e:
    st.error(f"AWS initialization error: {e}")
    if not st.session_state.get("demo_mode", False):
        if st.button("Continue in Demo Mode", key="aws_demo_mode"):
            st.session_state["demo_mode"] = True
        else:
            st.stop()
# Food name suggestions (a starter list spanning diverse cuisines).
# Kept as a list because it is concatenated with other lists when building
# dropdown options.
FOOD_SUGGESTIONS = [
    # Common items
    "Apple",
    "Banana",
    "Pizza",
    "Burger",
    "Pasta",
    "Sushi",
    "Tacos",
    "Salad",
    # Chicken, by preparation
    "Chicken (Baked)",
    "Chicken (Roasted)",
    "Chicken (Grilled)",
    "Chicken (Boiled)",
    # Fish, by preparation
    "Fish (Grilled)",
    "Fish (Fried)",
    "Fish (Steamed)",
    # Other meat dishes
    "Beef Steak",
    "Pork Chops",
    # Named dishes
    "Spaghetti Carbonara",
    "Lasagna",
    "Pad Thai",
    "Dim Sum",
    "Kimchi Fried Rice",
    "Biryani",
    "Croissant",
    "Baguette",
    "Miso Soup",
    "Ramen",
    "Pierogi",
    "Gyoza",
    "Schnitzel",
    "Creole Gumbo",
    "Jambalaya",
    "Tandoori Chicken",
    "Falafel",
    "Shawarma",
]

# Unit options for food weight/volume.
UNIT_OPTIONS = [
    "grams",
    "ounces",
    "teaspoons",
    "tablespoons",
    "cups",
    "slices",
    "pieces",
]

# Cooking methods (alphabetical, with "Unknown" as the catch-all last entry).
COOKING_METHODS = [
    "Baked",
    "Boiled",
    "Broiled",
    "Fried",
    "Grilled",
    "Microwaved",
    "Pan-seared",
    "Poached",
    "Raw",
    "Roasted",
    "Sautéed",
    "Steamed",
    "Stewed",
    "Stir-fried",
    "Takeout/Restaurant",
    "Unknown",
]
# Helper functions | |
def resize_image(image, max_size=512, quality=85):
    """
    Resize image while preserving aspect ratio and reducing file size.

    Images that already fit within ``max_size`` on both sides are returned
    unchanged (no resize, no re-encoding, no mode conversion). Larger images
    are scaled so their longer side equals ``max_size``, converted to a
    JPEG-compatible mode if necessary, and round-tripped through JPEG
    compression.

    Args:
        image: PIL Image object
        max_size: Maximum dimension (width or height) in pixels
        quality: JPEG quality (0-100)

    Returns:
        The original image if no resize was needed, otherwise a new PIL Image
        opened from the resized, JPEG-compressed data.
    """
    width, height = image.width, image.height

    # Already small enough: hand the caller's image back untouched.
    if width <= max_size and height <= max_size:
        return image

    # Pin the longer side to max_size and scale the other proportionally.
    # max(1, ...) stops extreme aspect ratios from truncating a side to 0 px,
    # which cannot be resized to or encoded.
    if width > height:
        new_width = max_size
        new_height = max(1, int(height * (max_size / width)))
    else:
        new_height = max_size
        new_width = max(1, int(width * (max_size / height)))

    resized_img = image.resize((new_width, new_height), Image.LANCZOS)

    # JPEG cannot store alpha or palette data. Convert every mode the JPEG
    # writer does not accept (RGBA, LA, P, ...), not just RGBA.
    if resized_img.mode not in ("1", "L", "RGB", "RGBX", "CMYK", "YCbCr"):
        resized_img = resized_img.convert("RGB")

    # Compress into an in-memory JPEG and reopen it so the returned image
    # reflects the compressed data.
    buffer = BytesIO()
    resized_img.save(buffer, format="JPEG", quality=quality, optimize=True)
    buffer.seek(0)
    return Image.open(buffer)
def get_image_size_kb(image):
    """Get image file size in KB when encoded as a JPEG.

    The image is encoded into an in-memory buffer with the default JPEG
    settings; the caller's image object is not modified.

    Args:
        image: PIL Image object (any mode).

    Returns:
        Size of the JPEG encoding in kilobytes (bytes / 1024) as a float.
    """
    # JPEG cannot store alpha or palette data; without this, measuring an
    # RGBA or palette PNG raises instead of returning a size. convert()
    # returns a new image, so the caller's object is left as-is.
    if image.mode not in ("1", "L", "RGB", "RGBX", "CMYK", "YCbCr"):
        image = image.convert("RGB")

    buffer = BytesIO()
    image.save(buffer, format="JPEG")
    size_bytes = buffer.tell()  # write position == number of bytes written
    return size_bytes / 1024  # Convert to KB
def upload_to_s3(image, user_id):
    """Upload image to S3 bucket and return the S3 path.

    In demo mode nothing is uploaded and a placeholder path is returned.
    Otherwise the image is JPEG-encoded in memory and stored under
    ``<user_id>/<timestamp>_<uuid>.jpg`` in ``S3_BUCKET_NAME``.

    Args:
        image: PIL Image object to upload.
        user_id: Identifier used as the key prefix in the bucket.

    Returns:
        The S3 object key on success, or None if encoding or upload failed
        (the error is shown to the user via ``st.error``).
    """
    if st.session_state.get("demo_mode", False):
        return f"demo/{user_id}/demo_image.jpg"
    try:
        # Timestamp plus UUID keeps keys unique and roughly time-ordered
        # within a user's prefix.
        image_id = str(uuid.uuid4())
        timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
        s3_path = f"{user_id}/{timestamp}_{image_id}.jpg"

        # JPEG cannot store alpha or palette data; images that were small
        # enough to skip resizing may still be RGBA/P and would fail to save.
        if image.mode not in ("1", "L", "RGB", "RGBX", "CMYK", "YCbCr"):
            image = image.convert("RGB")

        # Convert PIL image to bytes
        buffer = BytesIO()
        image.save(buffer, format="JPEG", quality=85, optimize=True)
        buffer.seek(0)  # rewind so the upload reads from the start

        # Upload to S3
        s3.upload_fileobj(buffer, S3_BUCKET_NAME, s3_path)
        return s3_path
    except Exception as e:
        st.error(f"Failed to upload image: {e}")
        return None
def save_metadata(user_id, s3_path, food_name, portion_size, portion_unit, cooking_method, ingredients, tokens_awarded):
    """Save metadata to DynamoDB.

    In demo mode nothing is written; a notice is shown and True is returned.

    Args:
        user_id: Identifier of the uploading user.
        s3_path: S3 object key of the uploaded image.
        food_name: Name of the food shown in the image.
        portion_size: Numeric portion size (floats are stored as Decimal).
        portion_unit: Unit for ``portion_size``.
        cooking_method: Selected cooking method.
        ingredients: List of ingredient names.
        tokens_awarded: Number of tokens granted for this upload.

    Returns:
        True if the item was written (or demo mode is active), False if the
        write failed (the error is shown to the user via ``st.error``).
    """
    from decimal import Decimal

    if st.session_state.get("demo_mode", False):
        st.success("Demo mode: Metadata would be saved to DynamoDB")
        return True
    try:
        # boto3's DynamoDB serializer rejects Python floats and requires
        # Decimal for non-integer numbers. Going through str() keeps the
        # short decimal form the user entered (0.1 -> Decimal("0.1")).
        if isinstance(portion_size, float):
            portion_size = Decimal(str(portion_size))

        # Generate a unique ID for the database entry
        # NOTE: independent of the UUID embedded in the S3 key.
        image_id = str(uuid.uuid4())
        timestamp = datetime.now().isoformat()

        # Create item for DynamoDB
        item = {
            'image_id': image_id,
            'user_id': user_id,
            'upload_timestamp': timestamp,
            'food_name': food_name,
            'portion_size': portion_size,
            'portion_unit': portion_unit,
            'cooking_method': cooking_method,
            'ingredients': ingredients,
            's3_path': s3_path,
            'tokens_awarded': tokens_awarded,
        }

        # Save to DynamoDB
        metadata_table.put_item(Item=item)
        return True
    except Exception as e:
        st.error(f"Failed to save metadata: {e}")
        return False
def calculate_tokens(image_quality, has_metadata, is_unique_category):
    """Calculate tokens based on various factors.

    Every upload earns one base token, plus one bonus token for each of:
    ``image_quality`` being exactly ``"high"``, ``has_metadata`` being truthy,
    and ``is_unique_category`` being truthy.

    Args:
        image_quality: Quality label; only the string "high" earns a bonus.
        has_metadata: Whether the submission's metadata is complete.
        is_unique_category: Whether the food counts as a unique category.

    Returns:
        Total tokens awarded, an int from 1 to 4.
    """
    tokens = 1  # Base token for upload
    if image_quality == "high":
        tokens += 1
    if has_metadata:
        tokens += 1
    if is_unique_category:
        tokens += 1
    return tokens
def _show_markdown_file(path, container=st):
    """Render the Markdown file at ``path`` into ``container``; warn instead of crashing if it cannot be read."""
    try:
        with open(path, "r", encoding="utf-8") as f:
            container.markdown(f.read())
    except OSError as e:
        container.warning(f"Could not load {path}: {e}")


# Initialize session state for first-time users
if "tokens" not in st.session_state:
    st.session_state["tokens"] = 0
if "uploads_count" not in st.session_state:
    st.session_state["uploads_count"] = 0
# Counter baked into the uploader's widget key; bumping it after a successful
# submission is what actually empties the file uploader.
if "uploader_key" not in st.session_state:
    st.session_state["uploader_key"] = 0

# Streamlit Layout - Authentication Section
st.sidebar.title("🔑 User Authentication")
auth_option = st.sidebar.radio("Select an option", ["Login", "Sign Up", "Logout"])

if auth_option == "Sign Up":
    email = st.sidebar.text_input("Email")
    password = st.sidebar.text_input("Password", type="password")
    if st.sidebar.button("Sign Up"):
        try:
            if st.session_state.get("demo_mode", False):
                st.sidebar.success("✅ Demo mode: User created successfully! Please log in.")
            else:
                auth.create_user(email=email, password=password)
                st.sidebar.success("✅ User created successfully! Please log in.")
        except Exception as e:
            st.sidebar.error(f"Error: {e}")

if auth_option == "Login":
    email = st.sidebar.text_input("Email")
    password = st.sidebar.text_input("Password", type="password")
    if st.sidebar.button("Login"):
        try:
            if st.session_state.get("demo_mode", False):
                st.session_state["user_id"] = "demo_user_123"
                st.session_state["tokens"] = 0  # Initialize token count
                st.sidebar.success("✅ Demo mode: Logged in successfully!")
            else:
                # NOTE(review): SECURITY - this only looks the account up by
                # email; `password` is never checked, so anyone who knows an
                # email address can log in as that user. Verifying the
                # password needs a client-side Firebase sign-in, which is
                # outside what this file currently imports.
                user = auth.get_user_by_email(email)
                st.session_state["user_id"] = user.uid
                st.session_state["tokens"] = 0  # Initialize token count
                st.sidebar.success("✅ Logged in successfully!")
        except Exception as e:
            st.sidebar.error(f"Login failed: {e}")

if auth_option == "Logout" and "user_id" in st.session_state:
    del st.session_state["user_id"]
    st.sidebar.success("✅ Logged out successfully!")

# Ensure user is logged in before uploading
if "user_id" not in st.session_state and not st.session_state.get("demo_mode", False):
    st.warning("⚠️ Please log in to upload images.")
    # Add links to guidelines and terms
    st.markdown("### 📚 While You're Here")
    st.markdown("Take a moment to read our guidelines and token system:")
    col1, col2, col3 = st.columns(3)
    with col1:
        if st.button("Participation Guidelines"):
            _show_markdown_file("PARTICIPATION_GUIDELINES.md")
    with col2:
        if st.button("Token Rewards"):
            _show_markdown_file("TOKEN_REWARDS.md")
    with col3:
        if st.button("Terms of Service"):
            _show_markdown_file("TERMS_OF_SERVICE.md")
    st.stop()

# Streamlit Layout - Main App
st.title("🍽️ Food Image Review & Annotation")

# Compliance & Disclaimer Section
with st.expander("📜 Terms & Conditions", expanded=False):
    st.markdown("### **Terms & Conditions**")
    st.write(
        "By uploading an image, you agree to transfer full copyright to the research team for AI training purposes."
        " You are responsible for ensuring you own the image and it does not violate any copyright laws."
        " We do not guarantee when tokens will be redeemable. Keep track of your user ID.")
# The checkbox sits outside the collapsed expander so it is always visible.
terms_accepted = st.checkbox("I agree to the terms and conditions", key="terms_accepted")
if not terms_accepted:
    st.warning("⚠️ You must agree to the terms before proceeding.")
    st.stop()

# Show the confirmation stored by the previous run's successful submission;
# a message rendered right before a rerun would never be seen.
submission_message = st.session_state.pop("submission_message", None)
if submission_message:
    st.success(submission_message)

# Upload Image
uploaded_file = st.file_uploader(
    "Upload an image of your food",
    type=["jpg", "png", "jpeg"],
    key=f"food_image_uploader_{st.session_state['uploader_key']}",
)
if uploaded_file:
    original_img = Image.open(uploaded_file)
    st.session_state["original_image"] = original_img

# If an image has been uploaded, process and display it
if "original_image" in st.session_state:
    original_img = st.session_state["original_image"]

    # Process the image - resize and compress
    processed_img = resize_image(original_img, max_size=512, quality=85)
    st.session_state["processed_image"] = processed_img

    # Calculate file sizes
    original_size = get_image_size_kb(original_img)
    processed_size = get_image_size_kb(processed_img)
    size_reduction = ((original_size - processed_size) / original_size) * 100 if original_size > 0 else 0

    # Display images side by side
    col1, col2 = st.columns(2)
    with col1:
        st.subheader("📷 Original Image")
        st.image(original_img, caption=f"Original ({original_img.width}x{original_img.height} px, {original_size:.1f} KB)", use_container_width=True)
    with col2:
        st.subheader("🖼️ Processed Image")
        st.image(processed_img, caption=f"Processed ({processed_img.width}x{processed_img.height} px, {processed_size:.1f} KB)", use_container_width=True)

    # Show size reduction
    if size_reduction > 5:  # Only show if there's a meaningful reduction
        st.success(f"✅ Image size reduced by {size_reduction:.1f}% for faster uploads and processing")

    # Food metadata form
    st.subheader("🍲 Food Details")
    food_name = st.selectbox("Food Name", options=[""] + FOOD_SUGGESTIONS, index=0)
    if food_name == "":
        food_name = st.text_input("Or enter a custom food name")
    col1, col2 = st.columns(2)
    with col1:
        portion_size = st.number_input("Portion Size", min_value=0.1, step=0.1)
    with col2:
        portion_unit = st.selectbox("Unit", options=UNIT_OPTIONS)
    cooking_method = st.selectbox("Cooking Method", options=[""] + COOKING_METHODS)
    ingredients = st_tags.st_tags(
        label="Main Ingredients (Add up to 5)",
        text="Press enter to add",
        value=[],
        suggestions=["Salt", "Pepper", "Olive Oil", "Butter", "Garlic", "Onion", "Tomato"],
        maxtags=5
    )

    # Submit button
    if st.button("📤 Submit Food Image"):
        with st.spinner("Processing your submission..."):
            # Demo mode can reach this point without a login, so there may be
            # no "user_id" in the session; fall back to the demo identity
            # rather than raising KeyError.
            user_id = st.session_state.get("user_id", "demo_user_123")

            # Determine image quality (simplified version)
            image_quality = "high" if original_img.width >= 1000 and original_img.height >= 1000 else "standard"
            # Check if metadata is complete
            has_metadata = bool(food_name and portion_size and portion_unit and cooking_method)
            # Check if the food is in a unique category (simplified)
            is_unique_category = food_name not in ["Pizza", "Burger", "Pasta", "Salad"]
            # Calculate tokens
            tokens_awarded = calculate_tokens(image_quality, has_metadata, is_unique_category)

            # Upload image to S3
            s3_path = upload_to_s3(processed_img, user_id)
            if s3_path:
                # Save metadata to DynamoDB
                success = save_metadata(
                    user_id,
                    s3_path,
                    food_name,
                    float(portion_size),
                    portion_unit,
                    cooking_method,
                    ingredients,
                    tokens_awarded
                )
                if success:
                    st.session_state["tokens"] += tokens_awarded
                    st.session_state["uploads_count"] += 1
                    # Stored for display after the rerun below.
                    st.session_state["submission_message"] = (
                        f"✅ Food image uploaded successfully! You earned {tokens_awarded} tokens."
                    )
                    # Clear the form and image for a new submission. Changing
                    # the uploader key is required as well: otherwise the
                    # uploader still holds the file and the same image is
                    # reloaded on the next run.
                    st.session_state.pop("original_image", None)
                    st.session_state.pop("processed_image", None)
                    st.session_state["uploader_key"] += 1
                    # st.experimental_rerun() has been removed from Streamlit;
                    # st.rerun() is its replacement.
                    st.rerun()
                else:
                    st.error("Failed to save metadata. Please try again.")
            else:
                st.error("Failed to upload image. Please try again.")

# Display earned tokens
st.sidebar.markdown("---")
st.sidebar.subheader("🏆 Your Statistics")
st.sidebar.info(f"🪙 Total Tokens: {st.session_state['tokens']}")
st.sidebar.info(f"📸 Total Uploads: {st.session_state.get('uploads_count', 0)}")

# Help and Documentation Links
st.sidebar.markdown("---")
st.sidebar.subheader("📚 Resources")
if st.sidebar.button("Participation Guidelines"):
    _show_markdown_file("PARTICIPATION_GUIDELINES.md", st.sidebar)
if st.sidebar.button("Token Rewards System"):
    _show_markdown_file("TOKEN_REWARDS.md", st.sidebar)
if st.sidebar.button("Terms of Service"):
    _show_markdown_file("TERMS_OF_SERVICE.md", st.sidebar)