import gradio as gr
import cv2
import numpy as np
import requests
from PIL import Image
from autogen import AssistantAgent, GroupChat, GroupChatManager
import os
import time
from ultralytics import YOLO

# Nano model for speed; fine-tune on food data later.
model = YOLO("yolov8n.pt")


# Agent Functions
def recognize_foods(image):
    """Detect food items in an RGB image array.

    Args:
        image: RGB image as a numpy array (as delivered by gr.Image).

    Returns:
        A list of (label, confidence) pairs with one entry per distinct
        label; when a label is detected more than once, the highest
        confidence is kept.
    """
    start = time.time()
    # Resize to 640x640 (YOLO default input size).
    pil_image = Image.fromarray(image).resize((640, 640))
    results = model(pil_image)
    # Dedupe by label while tracking the best confidence. The original
    # list(set(...)) on (label, conf) tuples did not collapse repeated
    # labels with differing confidences, and indexing conf with a class
    # mask then calling .item() crashed whenever the same class was
    # detected more than once (multi-element tensor).
    best = {}
    for result in results:
        for cls_id, conf in zip(result.boxes.cls.tolist(), result.boxes.conf.tolist()):
            label = model.names[int(cls_id)]
            # COCO has few food classes; expand this allow-list later.
            if "food" in label.lower() or label in {"pasta", "rice", "tomato", "potato", "bread"}:
                best[label] = max(conf, best.get(label, 0.0))
    print(f"Recognition took {time.time() - start:.2f}s")
    return list(best.items())


def estimate_sizes(image, foods):
    """Estimate a portion size in grams for each recognized food.

    Placeholder heuristic: the 640x640 frame area is split evenly across
    the foods and converted to grams (100g per full frame, capped at 500g).

    Args:
        image: RGB image as a numpy array.
        foods: list of (label, confidence) pairs from recognize_foods().

    Returns:
        Dict mapping food label -> estimated grams; empty dict when no
        foods were recognized.
    """
    start = time.time()
    # Guard: the original divided by len(foods) and crashed on [].
    if not foods:
        return {}
    # np.ndarray.resize() resizes in place and returns None, so the
    # original chained .resize((640, 640)) left img_cv as None and the
    # .shape access below raised; cv2.resize is the correct way to
    # rescale pixel data to match the YOLO input size.
    img_cv = cv2.resize(cv2.cvtColor(image, cv2.COLOR_RGB2BGR), (640, 640))
    sizes = {}
    total_area = img_cv.shape[0] * img_cv.shape[1]
    for food, _ in foods:
        # Dummy: assume even area split (refine with food-specific weights later).
        area = total_area / len(foods)
        grams = min(500, int(area / (640 * 640) * 100))  # 100g per ~640k pixels.
        sizes[food] = grams
    print(f"Size estimation took {time.time() - start:.2f}s")
    return sizes


def fetch_nutrition(foods_with_sizes, nutritionix_key):
    """Look up nutrition facts for each food via the Nutritionix API.

    Args:
        foods_with_sizes: dict mapping food name -> grams.
        nutritionix_key: the user's Nutritionix application key.

    Returns:
        On success, a dict {food_name: {calories, protein, fat, carbs}}.
        On any failure, a human-readable error string — callers must
        check the returned type.
    """
    if not nutritionix_key:
        return "Please provide a Nutritionix API key for nutrition data."
    start = time.time()
    url = "https://trackapi.nutritionix.com/v2/natural/nutrients"
    headers = {
        "x-app-id": os.getenv("NUTRITIONIX_APP_ID"),  # From HF Secrets.
        "x-app-key": nutritionix_key,  # User's key.
        "Content-Type": "application/json",
    }
    # Build a natural-language query from foods and sizes, e.g. "150g rice".
    query = "\n".join(f"{size}g {food}" for food, size in foods_with_sizes.items())
    body = {"query": query}
    try:
        response = requests.post(url, headers=headers, json=body, timeout=10)
        if response.status_code != 200:
            return f"Nutritionix API error: {response.text}"
        data = response.json().get("foods", [])
        nutrition_data = {}
        for item in data:
            food_name = item["food_name"]
            nutrition_data[food_name] = {
                "calories": item.get("nf_calories", 0),
                "protein": item.get("nf_protein", 0),
                "fat": item.get("nf_total_fat", 0),
                "carbs": item.get("nf_total_carbohydrate", 0),
            }
        print(f"Nutrition fetch took {time.time() - start:.2f}s")
        return nutrition_data
    except requests.Timeout:
        return "Nutritionix API timed out."
    except Exception as e:
        return f"Nutritionix error: {str(e)}"
# AutoGen Agent Definitions
food_recognizer = AssistantAgent(
    name="FoodRecognizer",
    system_message="Identify all food items in the image and return a list of (label, probability) pairs.",
    function_map={"recognize_foods": recognize_foods},
)

size_estimator = AssistantAgent(
    name="SizeEstimator",
    system_message="Estimate portion sizes in grams for each recognized food based on the image.",
    function_map={"estimate_sizes": estimate_sizes},
)

nutrition_fetcher = AssistantAgent(
    name="NutritionFetcher",
    system_message="Fetch nutritional data from the Nutritionix API using the user's key.",
    function_map={"fetch_nutrition": fetch_nutrition},
)

orchestrator = AssistantAgent(
    name="Orchestrator",
    system_message="Coordinate the workflow, format the output, and return the final result as text.",
    function_map={},
)

group_chat = GroupChat(
    agents=[food_recognizer, size_estimator, nutrition_fetcher, orchestrator],
    messages=[],
    max_round=10,
)
manager = GroupChatManager(groupchat=group_chat)


# Orchestrator Logic (via AutoGen chat)
def orchestrate_workflow(image, nutritionix_key):
    """Run the multi-agent pipeline for one uploaded image.

    Args:
        image: RGB numpy array from the Gradio image input.
        nutritionix_key: the user's Nutritionix API key.

    Returns:
        The final chat message content as text, or a fallback message
        when the agents produced no output.
    """
    start = time.time()
    # NOTE(review): interpolating a raw numpy array into the prompt only
    # embeds its repr, not the pixels — the agents cannot actually see
    # the image this way; consider passing the array to the tool
    # functions directly. Left as-is pending a redesign — TODO confirm.
    message = f"Process this image: {image} with Nutritionix key: {nutritionix_key}"
    response = manager.initiate_chat(
        orchestrator,
        message=message,
        max_turns=10,
    )
    # Recent autogen versions return a ChatResult (with .chat_history)
    # rather than a plain list of message dicts; the original
    # response[-1] indexing raised TypeError on ChatResult. Handle both
    # shapes, plus the empty-history case.
    history = getattr(response, "chat_history", response)
    if history:
        result = history[-1].get("content") or "No output from agents."
    else:
        result = "No output from agents."
    print(f"Total time: {time.time() - start:.2f}s")
    return result


# Gradio Interface
interface = gr.Interface(
    fn=orchestrate_workflow,
    inputs=[
        gr.Image(type="numpy", label="Upload a Food Photo"),
        gr.Textbox(type="password", label="Your Nutritionix API Key (required)"),
    ],
    outputs=[
        gr.Textbox(label="Nutrition Breakdown"),
    ],
    title="Food Nutrition Analyzer",
    description="Upload a food photo and provide your Nutritionix API key. Add an OpenAI/Grok key for advice.",
)

if __name__ == "__main__":
    interface.launch()