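"""Food Nutrition Analyzer (Gradio app).

Pipeline: a YOLOv8 nano model recognizes food items in an uploaded photo,
portion sizes are roughly estimated from image area, nutrition facts are
fetched from the Nutritionix API, and AutoGen agents coordinate the steps.
"""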
import gradio as gr
import cv2
import numpy as np
import requests
from PIL import Image
from autogen import AssistantAgent, GroupChat, GroupChatManager
import os
import time
#import openai
from ultralytics import YOLO

model = YOLO("yolov8n.pt")  # Nano model for speed; fine-tune on food data later
# Agent Functions
def recognize_foods(image):
    start = time.time()
    # Resize to 640x640 (YOLO default)
    pil_image = Image.fromarray(image).resize((640, 640))
    results = model(pil_image)
    foods = []
    for result in results:
        # Iterate class ids and confidences together so each detection keeps its own score
        for cls, conf in zip(result.boxes.cls, result.boxes.conf):
            label = model.names[int(cls)]
            if "food" in label.lower() or label in ["pasta", "rice", "tomato", "potato", "bread"]:  # Expand this list
                foods.append((label, float(conf)))
    print(f"Recognition took {time.time() - start:.2f}s")
    # Deduplicate by label, keeping the highest confidence per food
    best = {}
    for label, conf in foods:
        best[label] = max(conf, best.get(label, 0.0))
    return list(best.items())
def estimate_sizes(image, foods):
    start = time.time()
    # Resize with cv2.resize (numpy's ndarray.resize returns None); match YOLO size
    img_cv = cv2.resize(cv2.cvtColor(image, cv2.COLOR_RGB2BGR), (640, 640))
    sizes = {}
    if not foods:
        return sizes
    total_area = img_cv.shape[0] * img_cv.shape[1]
    for food, _ in foods:
        # Dummy: assume area proportion (refine with food-specific weights later)
        area = total_area / len(foods)  # Even split for now
        grams = min(500, int(area / (640 * 640) * 100))  # 100g per ~640k pixels
        sizes[food] = grams
    print(f"Size estimation took {time.time() - start:.2f}s")
    return sizes
def fetch_nutrition(foods_with_sizes, nutritionix_key):
    if not nutritionix_key:
        return "Please provide a Nutritionix API key for nutrition data."
    start = time.time()
    url = "https://trackapi.nutritionix.com/v2/natural/nutrients"
    headers = {
        "x-app-id": os.getenv("NUTRITIONIX_APP_ID"),  # From HF Secrets
        "x-app-key": nutritionix_key,  # User's key
        "Content-Type": "application/json"
    }
    # Build a natural-language query from foods and sizes, one item per line
    query = "\n".join([f"{size}g {food}" for food, size in foods_with_sizes.items()])
    body = {"query": query}
    try:
        response = requests.post(url, headers=headers, json=body, timeout=10)
        if response.status_code != 200:
            return f"Nutritionix API error: {response.text}"
        data = response.json().get("foods", [])
        nutrition_data = {}
        for item in data:
            food_name = item["food_name"]
            nutrition_data[food_name] = {
                "calories": item.get("nf_calories", 0),
                "protein": item.get("nf_protein", 0),
                "fat": item.get("nf_total_fat", 0),
                "carbs": item.get("nf_total_carbohydrate", 0)
            }
        print(f"Nutrition fetch took {time.time() - start:.2f}s")
        return nutrition_data
    except requests.Timeout:
        return "Nutritionix API timed out."
    except Exception as e:
        return f"Nutritionix error: {str(e)}"
#def get_nutrition_advice(nutrition_data, llm_key):
#    if not llm_key:
#        return "No OpenAI/Grok key provided; skipping advice."
#    try:
#        openai.api_key = llm_key
#        prompt = "Given this nutritional data, suggest a dietary tip:\n"
#        for food, data in nutrition_data.items():
#            prompt += f"- {food}: {data['calories']} cal, {data['protein']}g protein, {data['fat']}g fat, {data['carbs']}g carbs\n"
#
#        response = openai.Completion.create(
#            model="text-davinci-003",  # Swap for Grok if xAI API is available
#            prompt=prompt,
#            max_tokens=50
#        )
#        return response.choices[0].text.strip()
#    except Exception as e:
#        return f"Error with LLM key: {str(e)}"
# AutoGen Agent Definitions
food_recognizer = AssistantAgent(
    name="FoodRecognizer",
    system_message="Identify all food items in the image and return a list of (label, probability) pairs.",
    function_map={"recognize_foods": recognize_foods}
)

size_estimator = AssistantAgent(
    name="SizeEstimator",
    system_message="Estimate portion sizes in grams for each recognized food based on the image.",
    function_map={"estimate_sizes": estimate_sizes}
)

nutrition_fetcher = AssistantAgent(
    name="NutritionFetcher",
    system_message="Fetch nutritional data from the Nutritionix API using the user's key.",
    function_map={"fetch_nutrition": fetch_nutrition}
)

##advice_agent = AssistantAgent(
##    name="NutritionAdvisor",
##    system_message="Provide basic nutrition advice using the user's OpenAI/Grok key."
##)

orchestrator = AssistantAgent(
    name="Orchestrator",
    system_message="Coordinate the workflow, format the output, and return the final result as text.",
    function_map={}
)

group_chat = GroupChat(
    agents=[food_recognizer, size_estimator, nutrition_fetcher, orchestrator],
    messages=[],
    max_round=10
)
manager = GroupChatManager(groupchat=group_chat)
# Orchestrator Logic (via AutoGen chat)
def orchestrate_workflow(image, nutritionix_key):
    start = time.time()
    # Initiate chat with the Orchestrator, passing the image and key in the opening message
    message = f"Process this image: {image} with Nutritionix key: {nutritionix_key}"
    response = manager.initiate_chat(
        orchestrator,
        message=message,
        max_turns=10
    )
    # initiate_chat returns a ChatResult, not a list; read the last message from its chat history
    chat_history = getattr(response, "chat_history", None) or []
    result = chat_history[-1].get("content", "No output from agents.") if chat_history else "No output from agents."
    print(f"Total time: {time.time() - start:.2f}s")
    return result
# Gradio Interface
interface = gr.Interface(
    fn=orchestrate_workflow,
    inputs=[
        gr.Image(type="numpy", label="Upload a Food Photo"),
        gr.Textbox(type="password", label="Your Nutritionix API Key (required)"),
        #gr.Textbox(type="password", label="Your OpenAI/Grok API Key (optional for advice)")
    ],
    outputs=[
        gr.Textbox(label="Nutrition Breakdown"),
        #gr.Textbox(label="Nutrition Advice")
    ],
    title="Food Nutrition Analyzer",
    description="Upload a food photo and provide your Nutritionix API key. Add an OpenAI/Grok key for advice."
)

if __name__ == "__main__":
    interface.launch()