# NOTE: page-scrape residue ("Spaces: Sleeping"); not part of the program.
import base64
import hashlib
import io
import json
import os
import time
import urllib.error
import urllib.parse
import urllib.request

import boto3
import firebase_admin
import streamlit as st
from firebase_admin import credentials, auth, firestore
from PIL import Image
# Runtime configuration, read from environment variables (Hugging Face Secrets).


def load_firebase_config(raw):
    """Parse the FIREBASE_CONFIG secret into a service-account dict.

    Args:
        raw: JSON text of the service-account key, or None / "" when unset.

    Returns:
        The decoded dict, or an empty dict when the value is missing, is not
        valid JSON, or does not decode to a JSON object. The empty dict is
        falsy, so a plain truth test tells whether Firebase is configured.
    """
    if not raw:
        return {}
    try:
        parsed = json.loads(raw)
    except json.JSONDecodeError:
        # A malformed secret must not crash the app at import time.
        return {}
    return parsed if isinstance(parsed, dict) else {}


FIREBASE_CONFIG = load_firebase_config(os.getenv("FIREBASE_CONFIG"))
AWS_ACCESS_KEY = os.getenv("AWS_ACCESS_KEY")
AWS_SECRET_KEY = os.getenv("AWS_SECRET_KEY")
S3_BUCKET_NAME = os.getenv("S3_BUCKET_NAME", "food-image-crowdsourcing")
DYNAMODB_TABLE = os.getenv("DYNAMODB_TABLE", "image_metadata")
# Initialize the Firebase Admin SDK exactly once per process.
# Streamlit re-executes this script on every interaction, and calling
# initialize_app() a second time for the default app raises ValueError, so
# reuse the existing app when there is one.
if FIREBASE_CONFIG:
    try:
        firebase_admin.get_app()
    except ValueError:
        # No default app yet: this is the first run in this process.
        cred = credentials.Certificate(FIREBASE_CONFIG)
        firebase_admin.initialize_app(cred)
else:
    st.error("β οΈ Firebase configuration is missing! Please check HF Secrets.")
# AWS handles: an S3 client, a DynamoDB resource and the metadata table.
# Both services are created from the same static key pair.
_aws_credentials = {
    "aws_access_key_id": AWS_ACCESS_KEY,
    "aws_secret_access_key": AWS_SECRET_KEY,
}
s3 = boto3.client("s3", **_aws_credentials)
dynamodb = boto3.resource("dynamodb", **_aws_credentials)
metadata_table = dynamodb.Table(DYNAMODB_TABLE)
# Seed per-session values on first run only; later reruns keep what is stored.
for _state_key, _initial_value in (("tokens", 0), ("user_email", None)):
    if _state_key not in st.session_state:
        st.session_state[_state_key] = _initial_value
# Page heading and short pitch shown in the main area.
st.title("π· Food Crowdsourcing Research App")
st.write("Help our research by uploading food images! Earn tokens for future app use.")

# Sidebar: the user picks which authentication action to perform.
_auth_options = ["Login", "Sign Up", "Logout"]
st.sidebar.header("π User Authentication")
auth_choice = st.sidebar.radio("Select an option", _auth_options)
# Sign-up: create a Firebase user from the e-mail/password entered in the form.
if auth_choice == "Sign Up":
    with st.sidebar.form("signup_form"):
        email = st.text_input("Email")
        password = st.text_input("Password", type="password")
        submit_button = st.form_submit_button("Sign Up")
        if submit_button:
            email = email.strip()
            if not email or not password:
                # Reject blank input locally instead of sending it to Firebase.
                st.error("Please enter both an email address and a password.")
            else:
                try:
                    auth.create_user(email=email, password=password)
                except Exception as e:
                    # UI boundary: surface the failure to the user rather
                    # than crashing the script.
                    st.error(f"β οΈ {str(e)}")
                else:
                    st.success(f"β Account created! Now login with {email}.")
# Login: verify the e-mail/password pair before marking the session signed in.
#
# Looking the user up by e-mail only proves the account exists; it does not
# check the password. Credentials are therefore verified against the Firebase
# Auth REST endpoint `accounts:signInWithPassword`, which needs the project's
# Web API key in the FIREBASE_WEB_API_KEY environment variable. Without that
# key login is refused (fail closed) rather than accepted unverified.
if auth_choice == "Login":
    with st.sidebar.form("login_form"):
        email = st.text_input("Email")
        password = st.text_input("Password", type="password")
        submit_button = st.form_submit_button("Login")
        if submit_button:
            email = email.strip()
            web_api_key = os.getenv("FIREBASE_WEB_API_KEY")
            if not email or not password:
                st.error("Please enter both an email address and a password.")
            elif not web_api_key:
                st.error(
                    "Login is unavailable: FIREBASE_WEB_API_KEY is not configured."
                )
            else:
                sign_in_url = (
                    "https://identitytoolkit.googleapis.com/v1/"
                    "accounts:signInWithPassword?"
                    + urllib.parse.urlencode({"key": web_api_key})
                )
                sign_in_request = urllib.request.Request(
                    sign_in_url,
                    data=json.dumps(
                        {
                            "email": email,
                            "password": password,
                            "returnSecureToken": True,
                        }
                    ).encode("utf-8"),
                    headers={"Content-Type": "application/json"},
                    method="POST",
                )
                try:
                    with urllib.request.urlopen(sign_in_request, timeout=10) as resp:
                        sign_in_result = json.load(resp)
                except urllib.error.HTTPError as e:
                    # Rejected credentials come back as an HTTP error whose
                    # JSON body carries a short reason code.
                    try:
                        reason = json.load(e)["error"]["message"]
                    except (ValueError, KeyError, TypeError):
                        reason = f"HTTP {e.code}"
                    st.error(f"Login failed: {reason}")
                except (OSError, ValueError) as e:
                    # Network failure, timeout, or an unreadable response.
                    st.error(f"β οΈ {str(e)}")
                else:
                    # Prefer the address as returned by Firebase; fall back
                    # to what the user typed.
                    st.session_state.user_email = sign_in_result.get("email", email)
                    st.success(f"β Welcome back, {email}!")
# Logout: forget the signed-in e-mail, but only if someone is signed in.
if auth_choice == "Logout" and st.session_state.user_email:
    st.session_state.user_email = None
    st.success("β Logged out successfully!")

# Gate: nothing below this point runs for anonymous visitors.
_is_logged_in = bool(st.session_state.user_email)
if not _is_logged_in:
    st.warning("β οΈ Please log in to upload images.")
    st.stop()
# File picker, restricted by extension to JPEG/PNG.
uploaded_file = st.file_uploader(
    "πΈ Upload a food image",
    type=["jpg", "png", "jpeg"],
)

# Terms the user must accept; the checkbox result is stored in
# st.session_state under the key "terms_accepted".
st.markdown("### **Terms & Conditions**")
_terms_text = (
    "By uploading an image, you agree to transfer full copyright to the research team for AI training purposes. "
    "You are responsible for ensuring you own the image and it does not violate any copyright laws. "
    "We do not guarantee when tokens will be redeemable. Keep track of your user ID."
)
st.write(_terms_text)
st.checkbox("I agree to the terms and conditions", key="terms_accepted")
# Process the selected image and store it once per (user, file content).
MAX_SIZE = (1024, 1024)  # bounding box passed to Image.thumbnail before storage

# Streamlit re-executes this script on every widget interaction while the
# uploader keeps returning the same file. Remember what was already stored so
# a rerun does not create duplicate S3 objects or award extra tokens.
if "processed_uploads" not in st.session_state:
    st.session_state.processed_uploads = set()

if uploaded_file and st.session_state.terms_accepted:
    upload_key = (
        st.session_state.user_email,
        hashlib.sha256(uploaded_file.getvalue()).hexdigest(),
    )

    try:
        image = Image.open(uploaded_file)
        image.load()  # decode now so a corrupt file fails here, not later
    except (OSError, ValueError, Image.DecompressionBombError) as e:
        # The uploader filters by extension only, so the content may not be
        # a readable image at all.
        image = None
        st.error(f"β οΈ Failed to upload image: {str(e)}")

    if image is not None:
        st.image(image, caption="Uploaded Image", use_column_width=True)
        st.write(f"π Original size: {image.size}")

        if upload_key in st.session_state.processed_uploads:
            st.info("This image has already been uploaded in this session.")
        else:
            # JPEG cannot store palette or alpha modes. PNGs are always
            # converted (as before); other formats only when their mode
            # would make the JPEG save fail.
            if image.format == "PNG" or image.mode not in ("RGB", "L", "CMYK"):
                image = image.convert("RGB")

            # Shrink in place to fit MAX_SIZE, keeping the aspect ratio.
            image.thumbnail(MAX_SIZE)

            # Re-encode at reduced quality to save storage space.
            img_bytes = io.BytesIO()
            image.save(img_bytes, format="JPEG", quality=80)

            # Sample the clock once so the object key and the metadata
            # record always carry the same timestamp.
            upload_time = int(time.time())
            file_name = f"raw-uploads/{st.session_state.user_email}_{upload_time}.jpg"

            try:
                s3.put_object(
                    Bucket=S3_BUCKET_NAME,
                    Key=file_name,
                    Body=img_bytes.getvalue(),
                    ContentType="image/jpeg",
                )
                st.success("β Image uploaded successfully to S3!")

                metadata_table.put_item(
                    Item={
                        "user_email": st.session_state.user_email,
                        "file_name": file_name,
                        "upload_time": upload_time,
                    }
                )

                # Mark as processed and reward only after both writes succeed,
                # so a failed attempt can be retried.
                st.session_state.processed_uploads.add(upload_key)
                st.session_state.tokens += 1
                st.write("π You earned **1 token** for uploading!")
                st.success(f"π’ Total tokens: {st.session_state.tokens}")
            except Exception as e:
                # UI boundary: report storage failures instead of crashing.
                st.error(f"β οΈ Failed to upload image: {str(e)}")
# TODO: Future work - annotation for an additional token.
# If the user provides an annotation for the image, reward another token.