# meal-tracker / app.py
import gradio as gr
import cv2
import numpy as np
from PIL import Image
from ultralytics import YOLO
import requests
import os
import time
from autogen import AssistantAgent, GroupChat, GroupChatManager
# Initialize YOLOv8 for multi-label food detection
model = YOLO("yolov8n.pt") # Nano model for speed, fine-tune on food data later
# Agent Functions (registered with AutoGen)
def recognize_foods(image):
    start = time.time()
    # Check if image is valid (not all 255s or empty)
    if image is None or np.all(image == 255):
        print("Warning: Invalid or empty image detected.")
        return []  # Return empty list for invalid images
    # Resize to 640x640 (YOLO default) to reduce load and match model input
    pil_image = Image.fromarray(image).resize((640, 640))
    results = model(pil_image)
    foods = []
    for result in results:
        # Iterate over individual detections so each box keeps its own confidence
        for box in result.boxes:
            label = model.names[int(box.cls)]
            if "food" in label.lower() or label in ["pasta", "rice", "tomato", "potato", "bread", "curry"]:  # Expand this list
                conf = float(box.conf)
                foods.append((label, conf))
    # Deduplicate by label, keeping the highest confidence per food
    best = {}
    for label, conf in foods:
        best[label] = max(conf, best.get(label, 0.0))
    foods = list(best.items())
    print(f"Recognition took {time.time() - start:.2f}s: Found foods {foods}")
    return foods
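# Illustrative return shape from recognize_foods above (labels and scores vary
# with the model and image; the values here are placeholders, not real output):
#   [("pizza", 0.91), ("carrot", 0.62)]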
def estimate_sizes(image, foods):
    start = time.time()
    if not foods:
        print("Warning: No foods to estimate sizes for.")
        return {}
    # Resize to match YOLO input size for consistency (use cv2.resize; NumPy
    # arrays have no PIL-style resize method)
    img_cv = cv2.resize(cv2.cvtColor(image, cv2.COLOR_RGB2BGR), (640, 640))
    sizes = {}
    total_area = img_cv.shape[0] * img_cv.shape[1]
    for food, _ in foods:
        # Dummy heuristic: split the frame area evenly across foods (refine with
        # food-specific weights or per-box areas later)
        area = total_area / len(foods)
        grams = min(500, int(area / (640 * 640) * 100))  # ~100g for a full 640x640 frame, capped at 500g
        sizes[food] = grams
    print(f"Size estimation took {time.time() - start:.2f}s: Estimated sizes {sizes}")
    return sizes
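# Worked example of the placeholder heuristic in estimate_sizes (illustrative
# only): with two detected foods, each gets 640*640/2 = 204,800 pixels, so
# grams = min(500, int(204800 / 409600 * 100)) = 50 for each food.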
def fetch_nutrition(foods_with_sizes, nutritionix_key):
    start = time.time()
    if not nutritionix_key:
        print("Warning: No Nutritionix API key provided.")
        return "Please provide a Nutritionix API key for nutrition data."
    if not foods_with_sizes:
        print("Warning: No foods to fetch nutrition for.")
        return {}
    url = "https://trackapi.nutritionix.com/v2/natural/nutrients"
    headers = {
        "x-app-id": os.getenv("NUTRITIONIX_APP_ID"),  # From HF Secrets
        "x-app-key": nutritionix_key,  # User's key
        "Content-Type": "application/json"
    }
    # Build query from foods and sizes
    query = "\n".join([f"{size}g {food}" for food, size in foods_with_sizes.items()])
    body = {"query": query}
    try:
        response = requests.post(url, headers=headers, json=body, timeout=10)
        if response.status_code != 200:
            print(f"Nutritionix API error: {response.text}")
            return f"Nutritionix API error: {response.text}"
        data = response.json().get("foods", [])
        nutrition_data = {}
        for item in data:
            food_name = item["food_name"]
            nutrition_data[food_name] = {
                "calories": item.get("nf_calories", 0),
                "protein": item.get("nf_protein", 0),
                "fat": item.get("nf_total_fat", 0),
                "carbs": item.get("nf_total_carbohydrate", 0)
            }
        print(f"Nutrition fetch took {time.time() - start:.2f}s: Fetched nutrition {nutrition_data}")
        return nutrition_data
    except requests.Timeout:
        print("Nutritionix API timed out.")
        return "Nutritionix API timed out."
    except Exception as e:
        print(f"Nutritionix error: {str(e)}")
        return f"Nutritionix error: {str(e)}"
# AutoGen Agent Definitions
food_recognizer = AssistantAgent(
    name="FoodRecognizer",
    system_message="Identify all food items in the image and return a list of (label, probability) pairs. Call recognize_foods with the image.",
    function_map={"recognize_foods": recognize_foods}
)
size_estimator = AssistantAgent(
    name="SizeEstimator",
    system_message="Estimate portion sizes in grams for each recognized food based on the image. Call estimate_sizes with the image and list of foods.",
    function_map={"estimate_sizes": estimate_sizes}
)
nutrition_fetcher = AssistantAgent(
    name="NutritionFetcher",
    system_message="Fetch nutritional data from the Nutritionix API using the user's key. Call fetch_nutrition with the foods and sizes dictionary and Nutritionix key.",
    function_map={"fetch_nutrition": fetch_nutrition}
)
orchestrator = AssistantAgent(
    name="Orchestrator",
    system_message="Coordinate the workflow, format the output, and return the final result as text. Start by asking FoodRecognizer to process the image, then SizeEstimator, then NutritionFetcher, and finally format the results.",
    function_map={}
)
# Custom speaker selection function (no LLM needed, updated for AutoGen 0.7.6)
def custom_select_speaker(last_speaker, groupchat):
"""Select the next speaker in a fixed order: FoodRecognizer β†’ SizeEstimator β†’ NutritionFetcher β†’ Orchestrator."""
if last_speaker is None:
return food_recognizer # Return the Agent object, not the name
order = [food_recognizer, size_estimator, nutrition_fetcher, orchestrator]
current_index = order.index(last_speaker)
next_index = (current_index + 1) % len(order)
return order[next_index]
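# The selector above cycles FoodRecognizer → SizeEstimator → NutritionFetcher →
# Orchestrator and wraps around; max_round=4 in the GroupChat below bounds the
# conversation to roughly one pass through that order.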
# Group Chat for Agent Coordination (no LLM, custom speaker selection method)
group_chat = GroupChat(
    agents=[food_recognizer, size_estimator, nutrition_fetcher, orchestrator],
    messages=[],
    max_round=4,  # Limit rounds to match agent order
    speaker_selection_method=custom_select_speaker  # Use correct parameter for AutoGen 0.7.6
)
manager = GroupChatManager(groupchat=group_chat)
# Orchestrator Logic (via AutoGen chat)
def orchestrate_workflow(image, nutritionix_key):
    start = time.time()
    # Initiate chat with Orchestrator, passing image and key as message
    message = f"Process this image: {image} with Nutritionix key: {nutritionix_key}"
    response = manager.initiate_chat(
        orchestrator,
        message=message,
        max_turns=10
    )
    # Extract and format the final response from the ChatResult
    if hasattr(response, 'chat_history') and response.chat_history:
        # Get the last message from chat history
        last_message = response.chat_history[-1]
        result = last_message.get("content", "No output from agents.")
    else:
        result = "No output from agents."
    if isinstance(result, dict):
        result = result.get("text", "No text output from agents.")
    print(f"Total time: {time.time() - start:.2f}s")
    return result
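# Gradio passes the uploaded photo (NumPy array) and the API-key textbox value
# straight into orchestrate_workflow; whatever string it returns is rendered in
# the "Nutrition Breakdown" textbox configured below.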
# Gradio Interface
interface = gr.Interface(
    fn=orchestrate_workflow,
    inputs=[
        gr.Image(type="numpy", label="Upload a Food Photo"),
        gr.Textbox(type="password", label="Your Nutritionix API Key (required)")
    ],
    outputs=[
        gr.Textbox(label="Nutrition Breakdown")
    ],
    title="Food Nutrition Analyzer",
    description="Upload a food photo and provide your Nutritionix API key for nutrition data."
)
if __name__ == "__main__":
    interface.launch()
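# Local run sketch (assumptions: dependencies pip-installed and
# NUTRITIONIX_APP_ID exported in the environment; not part of the original
# deployment config):
#   NUTRITIONIX_APP_ID=... python app.py
#   # then open the printed local URL, upload a food photo and enter your API key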