Spaces:
Runtime error
Runtime error
File size: 13,271 Bytes
536cfbe 1a6c907 536cfbe af103fc 1a6c907 2a1c9dd f27898a 1a6c907 f27898a 536cfbe f7bbdac 536cfbe f27898a f7bbdac e7b49fa 1a6c907 e7b49fa f7bbdac e7b49fa f27898a e7b49fa f27898a f7bbdac f27898a e7b49fa f7bbdac e7b49fa 2a1c9dd 1a6c907 f27898a 536cfbe f27898a f7bbdac 1a6c907 e7b49fa 1a6c907 e7b49fa f7bbdac e7b49fa f27898a e7b49fa f7bbdac e7b49fa f7bbdac e7b49fa 1a6c907 536cfbe 1a6c907 f7bbdac 536cfbe 1a6c907 536cfbe 1a6c907 536cfbe 76aecb4 536cfbe f7bbdac 536cfbe 76aecb4 f7bbdac 76aecb4 536cfbe 76aecb4 1a6c907 76aecb4 1a6c907 76aecb4 536cfbe 1a6c907 76aecb4 536cfbe e7b49fa f7bbdac e7b49fa f7bbdac e7b49fa dbd9b9e f7bbdac dbd9b9e f7bbdac dbd9b9e f7bbdac dbd9b9e e7b49fa f7bbdac e7b49fa dbd9b9e f7bbdac dbd9b9e 1a6c907 bea540e e7b49fa 077b164 1a6c907 e7b49fa 077b164 e7b49fa dbd9b9e e7b49fa dbd9b9e e7b49fa 1a6c907 dbd9b9e 76aecb4 e7b49fa 76aecb4 f7bbdac 536cfbe e7b49fa 76aecb4 e7b49fa f7bbdac 76aecb4 536cfbe 1a6c907 9d0a3d7 f7bbdac 9d0a3d7 f7bbdac 9d0a3d7 740859e f7bbdac e7b49fa f7bbdac e7b49fa 1a6c907 536cfbe e7b49fa 536cfbe e7b49fa 536cfbe e7b49fa 536cfbe |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 |
import gradio as gr
import cv2
import numpy as np
from PIL import Image
from ultralytics import YOLO
import requests
import os
import time
from autogen import AssistantAgent, GroupChat, GroupChatManager
import openai
# Initialize YOLOv8 for multi-label food detection.
# NOTE(review): yolov8n.pt is the generic COCO-pretrained nano checkpoint —
# presumably chosen for speed; it is not food-specific, so detection relies
# on the label filtering done in recognize_foods. Fine-tune on food data later.
model = YOLO("yolov8n.pt") # Nano model for speed, fine-tune on food data later
# Agent Functions (registered with AutoGen, enhanced debugging)
def recognize_foods(image):
    """Detect food items in an uploaded photo with YOLOv8.

    Args:
        image: HxWx3 numpy array (RGB, as supplied by Gradio) or None.

    Returns:
        List of (label, confidence) tuples, one entry per distinct label
        (the highest-confidence detection is kept). Empty list on invalid
        input, preprocessing failure, or no detections.
    """
    start = time.time()
    print(f"Recognize_foods called with image shape: {image.shape if image is not None else 'None'}")
    # Check if image is valid (not None or empty)
    if image is None or image.size == 0:
        print("Warning: Invalid or empty image detected.")
        return []  # Return empty list for invalid images
    # Convert to RGB and resize to 640x640 (YOLOv8's standard input size)
    try:
        pil_image = Image.fromarray(image).convert('RGB').resize((640, 640))
        img_np = np.array(pil_image)
        print(f"Image converted to RGB, shape: {img_np.shape}, min RGB: {img_np.min()}, max RGB: {img_np.max()}")
    except Exception as e:
        print(f"Error processing image: {str(e)}")
        return []  # Return empty list on preprocessing failure
    # Run YOLOv8 detection
    results = model(pil_image)
    # Labels we treat as food (the generic COCO model has few food classes).
    known_foods = {"waffle fry", "lettuce", "cucumber", "tomato", "broccoli", "carrot",
                   "green bean", "chicken", "turkey", "pasta", "rice", "potato", "bread", "curry"}
    # BUG FIX: the original indexed conf with a boolean class mask and called
    # .item(), which raises when two detections share a class; iterate each
    # detection's own (box, cls, conf) triple instead.
    best = {}  # label -> highest confidence seen
    for result in results:
        for box, cls, conf_tensor in zip(result.boxes.xyxy, result.boxes.cls, result.boxes.conf):
            label = model.names[int(cls)]
            if "food" in label.lower() or label in known_foods:
                conf = conf_tensor.item()
                print(f"Detected: {label} with confidence {conf:.2f}, box: {box}")
                # Deterministic de-duplication: keep the best score per label
                # (the original list(set(...)) had arbitrary ordering).
                if label not in best or conf > best[label]:
                    best[label] = conf
    if not best:
        print("Warning: No food items detected in the image. Check YOLOv8 model or image quality.")
    foods = list(best.items())
    print(f"Recognition took {time.time() - start:.2f}s: Found foods {foods}")
    return foods
def estimate_sizes(image, foods):
    """Estimate portion sizes in grams for each recognized food.

    Args:
        image: HxWx3 numpy array (RGB) of the meal photo.
        foods: List of (label, confidence) tuples from recognize_foods.

    Returns:
        Dict mapping food label -> estimated grams (capped at 500g).
        Empty dict when there are no foods or the image cannot be resized.
    """
    start = time.time()
    print(f"Estimate_sizes called with foods: {foods}")
    if not foods:
        print("Warning: No foods to estimate sizes for.")
        return {}
    # Resize to match YOLO output for consistency.
    # BUG FIX: the original called numpy's ndarray.resize((640, 640)) on the
    # cvtColor result — that method mutates in place and returns None (and
    # here always raised/fell into the except, so the function returned {}).
    # cv2.resize takes (width, height) and returns the resized image.
    try:
        img_cv = cv2.resize(cv2.cvtColor(image, cv2.COLOR_RGB2BGR), (640, 640))
        print(f"Image resized to shape: {img_cv.shape}")
    except Exception as e:
        print(f"Error resizing image for size estimation: {str(e)}")
        return {}
    sizes = {}
    total_area = img_cv.shape[0] * img_cv.shape[1]
    food_labels = {food for food, _ in foods}  # hoisted: O(1) membership tests
    # Use YOLO bounding boxes for more accurate sizing (if available)
    pil_image = Image.fromarray(image).convert('RGB').resize((640, 640))
    results = model(pil_image)
    for result in results:
        for box, cls in zip(result.boxes.xyxy, result.boxes.cls):
            label = model.names[int(cls)]
            if label in food_labels:
                box_area = (box[2] - box[0]) * (box[3] - box[1])  # Width * Height
                # Simple heuristic: scale box area to grams (tune this based on data)
                grams = min(500, int((box_area / (640 * 640)) * 500))  # Cap at 500g
                sizes[label] = grams
                print(f"Estimated size for {label}: {grams}g (via bounding box)")
    # Fallback: even split if no boxes found
    if not sizes:
        for food, _ in foods:
            area = total_area / len(foods)  # Even split for now
            grams = min(500, int(area / (640 * 640) * 100))  # 100g per ~640k pixels, capped at 500g
            sizes[food] = grams
            print(f"Estimated size for {food}: {grams}g (via fallback)")
    print(f"Size estimation took {time.time() - start:.2f}s: Estimated sizes {sizes}")
    return sizes
def fetch_nutrition(foods_with_sizes, nutritionix_key):
    """Fetch nutrition facts for each food from the Nutritionix API.

    Args:
        foods_with_sizes: Dict mapping food label -> grams.
        nutritionix_key: The user's Nutritionix app key (may be None/empty).

    Returns:
        Dict mapping food name -> {calories, protein, fat, carbs} on
        success; a human-readable error string on missing key or API
        failure; {} when there are no foods to look up.
    """
    start = time.time()
    # BUG FIX: slice the key only when present — the original unconditional
    # `nutritionix_key[:5]` raised TypeError for a None key, before the guard.
    key_preview = nutritionix_key[:5] if nutritionix_key else "None"
    print(f"Fetch_nutrition called with foods_with_sizes: {foods_with_sizes}, key: {key_preview}... (partial)")
    if not nutritionix_key:
        print("Warning: No Nutritionix API key provided.")
        return "Please provide a Nutritionix API key for nutrition data."
    if not foods_with_sizes:
        print("Warning: No foods to fetch nutrition for.")
        return {}
    url = "https://trackapi.nutritionix.com/v2/natural/nutrients"
    headers = {
        "x-app-id": os.getenv("NUTRITIONIX_APP_ID"),  # From HF Secrets
        "x-app-key": nutritionix_key,  # User's key
        "Content-Type": "application/json"
    }
    # Build a natural-language query: one "<grams>g <food>" line per item.
    query = "\n".join(f"{size}g {food}" for food, size in foods_with_sizes.items())
    print(f"Nutritionix query: {query}")
    body = {"query": query}
    try:
        response = requests.post(url, headers=headers, json=body, timeout=10)
        if response.status_code != 200:
            print(f"Nutritionix API error (status {response.status_code}): {response.text}")
            return f"Nutritionix API error: {response.text}"
        nutrition_data = {}
        for item in response.json().get("foods", []):
            # Missing fields default to 0 so the formatter downstream never KeyErrors.
            nutrition_data[item["food_name"]] = {
                "calories": item.get("nf_calories", 0),
                "protein": item.get("nf_protein", 0),
                "fat": item.get("nf_total_fat", 0),
                "carbs": item.get("nf_total_carbohydrate", 0)
            }
        print(f"Nutrition fetch took {time.time() - start:.2f}s: Fetched nutrition {nutrition_data}")
        return nutrition_data
    except requests.Timeout:
        print("Nutritionix API timed out.")
        return "Nutritionix API timed out."
    except Exception as e:
        print(f"Nutritionix error: {str(e)}")
        return f"Nutritionix error: {str(e)}"
def get_nutrition_advice(nutrition_data, openai_key):
    """Generate a short dietary tip from nutrition data via OpenAI.

    Args:
        nutrition_data: Dict mapping food -> {calories, protein, fat, carbs}.
        openai_key: The user's OpenAI API key (optional; may be None/empty).

    Returns:
        Advice text on success, otherwise an explanatory fallback or error string.
    """
    start = time.time()
    # BUG FIX: slice the key only when present — the original `openai_key[:5]`
    # raised TypeError for a None key (the default orchestrate_workflow passes
    # when the user supplies no OpenAI key).
    key_preview = openai_key[:5] if openai_key else "None"
    print(f"Get_nutrition_advice called with nutrition_data: {nutrition_data}, key: {key_preview}... (partial)")
    if not openai_key:
        print("Warning: No OpenAI API key provided—skipping advice.")
        return "No OpenAI key provided—skipping advice."
    if not nutrition_data:
        print("Warning: No nutrition data to advise on.")
        return "No nutrition data available for advice."
    try:
        openai.api_key = openai_key
        prompt = "Given this nutritional data, suggest a short dietary tip (max 50 words):\n" + "\n".join(
            [f"- {food}: {data['calories']} cal, {data['protein']}g protein, {data['fat']}g fat, {data['carbs']}g carbs"
             for food, data in nutrition_data.items()]
        )
        print(f"OpenAI prompt: {prompt}")
        # NOTE(review): openai.Completion + text-davinci-003 is a legacy API
        # removed in openai>=1.0 — confirm the pinned openai package version
        # still supports it, or migrate to the chat completions API.
        response = openai.Completion.create(
            model="text-davinci-003",
            prompt=prompt,
            max_tokens=50,
            temperature=0.7,
            timeout=5
        )
        advice = response.choices[0].text.strip()
        print(f"Advice took {time.time() - start:.2f}s: {advice}")
        return advice
    except Exception as e:
        # Any API/auth failure is reported back to the UI rather than raised.
        print(f"LLM error: {str(e)}")
        return f"Error with OpenAI key: {str(e)}"
# AutoGen Agent Definitions
# Each agent wraps exactly one of the functions above via function_map; the
# system_message instructs the agent how to parse prior chat messages into
# that function's arguments.
# Step 1: detect foods in the uploaded image.
food_recognizer = AssistantAgent(
    name="FoodRecognizer",
    system_message="Identify all food items in the image and return a list of (label, probability) pairs. Parse the message for the image data (e.g., 'Process this image: <numpy_array>') and call recognize_foods with it.",
    function_map={"recognize_foods": recognize_foods}
)
# Step 2: estimate grams per detected food.
size_estimator = AssistantAgent(
    name="SizeEstimator",
    system_message="Estimate portion sizes in grams for each recognized food based on the image. Parse the previous message for the list of foods (e.g., '[(\"food1\", 0.85), ...]') and call estimate_sizes with the image and foods from the message history.",
    function_map={"estimate_sizes": estimate_sizes}
)
# Step 3: look up nutrition facts via the Nutritionix API.
nutrition_fetcher = AssistantAgent(
    name="NutritionFetcher",
    system_message="Fetch nutritional data from the Nutritionix API using the user's key. Parse the previous message for the foods and sizes dictionary (e.g., {'food1': 150, ...}) and the initial message for the Nutritionix key (e.g., 'with Nutritionix key: <key>'), then call fetch_nutrition.",
    function_map={"fetch_nutrition": fetch_nutrition}
)
# Step 4 (optional): generate a dietary tip with the user's OpenAI key.
advice_agent = AssistantAgent(
    name="NutritionAdvisor",
    system_message="Provide basic nutrition advice based on the food data using the user's OpenAI key. Parse the previous message for the nutrition data (e.g., {'food1': {'calories': 200, ...}}) and the initial message for the OpenAI key (e.g., 'with OpenAI key: <key>'), then call get_nutrition_advice.",
    function_map={"get_nutrition_advice": get_nutrition_advice}
)
# Step 5: coordinate the pipeline and format the final text; no function_map —
# it works purely through messages.
orchestrator = AssistantAgent(
    name="Orchestrator",
    system_message="Coordinate the workflow, format the output, and return the final result as text. Parse the initial message for the image, Nutritionix key, and OpenAI key. Start by asking FoodRecognizer to process the image, then SizeEstimator, then NutritionFetcher, then NutritionAdvisor (if OpenAI key provided), and finally format the results into 'Food Analysis:\\n- food1 (size1g, prob1% confidence): calories1 cal, protein1g protein, fat1g fat, carbs1g carbs\\n...' for each food, followed by '\\nNutrition Advice:\\n' and the advice if available.",
    function_map={}
)
# Custom speaker selection function (no LLM needed, updated for AutoGen 0.7.6)
def custom_select_speaker(last_speaker, groupchat):
    """Select the next speaker in a fixed round-robin order.

    Order: FoodRecognizer → SizeEstimator → NutritionFetcher →
    NutritionAdvisor → Orchestrator → (wraps back to FoodRecognizer).

    Args:
        last_speaker: Agent that spoke last, or None at the start of the chat.
        groupchat: The GroupChat instance (unused; required by AutoGen's
            speaker_selection_method callable signature).

    Returns:
        The Agent object (not its name) that should speak next.
    """
    order = [food_recognizer, size_estimator, nutrition_fetcher, advice_agent, orchestrator]
    # BUG FIX: the original called order.index(last_speaker) unconditionally,
    # which raises ValueError when the last speaker is not in the pipeline
    # (e.g. the chat manager itself); start the pipeline from the top instead.
    if last_speaker is None or last_speaker not in order:
        return food_recognizer  # Return the Agent object, not the name
    return order[(order.index(last_speaker) + 1) % len(order)]
# Group Chat for Agent Coordination (no LLM for selection, custom speaker selection method)
group_chat = GroupChat(
    agents=[food_recognizer, size_estimator, nutrition_fetcher, advice_agent, orchestrator],
    messages=[],  # start with an empty conversation history
    max_round=5, # Increase for advice agent
    speaker_selection_method=custom_select_speaker # Use correct parameter for AutoGen 0.7.6
)
# The manager drives the round-robin conversation defined above.
manager = GroupChatManager(groupchat=group_chat)
# Orchestrator Logic (via AutoGen chat)
def orchestrate_workflow(image, nutritionix_key, openai_key=None):
    """Run the multi-agent pipeline on an uploaded food photo.

    This is the Gradio entry point: it serializes the inputs into a chat
    message, lets the AutoGen group chat run, then splits the final message
    into the two output textboxes.

    Args:
        image: HxWx3 numpy array from Gradio, or None.
        nutritionix_key: Required Nutritionix API key.
        openai_key: Optional OpenAI key; enables the advice step.

    Returns:
        (nutrition_text, advice_text) tuple for the two Gradio outputs.
    """
    start = time.time()
    # BUG FIX: slice the keys only when present — the original `openai_key[:5]`
    # raised TypeError whenever the optional key was left at its None default.
    nix_preview = nutritionix_key[:5] if nutritionix_key else "None"
    oai_preview = openai_key[:5] if openai_key else "None"
    print(f"Orchestrate_workflow called with image shape: {image.shape if image is not None else 'None'}, "
          f"Nutritionix key: {nix_preview}..., OpenAI key: {oai_preview}... (partial)")
    # Initiate chat with Orchestrator, passing image and keys as message
    message = f"Process this image: {image} with Nutritionix key: {nutritionix_key}"
    if openai_key:
        message += f" and OpenAI key: {openai_key}"
    print(f"Starting chat with message: {message[:100]}...")  # Truncate for readability
    response = manager.initiate_chat(
        orchestrator,
        message=message,
        max_turns=10
    )
    # Extract and format the final response from the ChatResult
    if hasattr(response, 'chat_history') and response.chat_history:
        # Get the last message from chat history
        last_message = response.chat_history[-1]
        result = last_message.get("content", "No output from agents.")
        print(f"Chat history last message: {result}")
    else:
        result = "No output from agents."
        print("Warning: No chat history in response.")
    if isinstance(result, dict):
        result = result.get("text", "No text output from agents.")
        print(f"Result is dict, extracted text: {result}")
    # Split result into nutrition and advice if OpenAI key was provided
    if openai_key and isinstance(result, str) and "\nNutrition Advice:\n" in result:
        parts = result.split("\nNutrition Advice:\n", 1)
        nutrition = parts[0] if parts[0] else "No nutrition data."
        advice = parts[1] if len(parts) > 1 else "No advice available."
    else:
        nutrition = result if result != "No output from agents." else "No nutrition data."
        advice = "No advice available (OpenAI key required)."
    print(f"Total time: {time.time() - start:.2f}s, Nutrition: {nutrition[:50]}..., Advice: {advice[:50]}...")
    return nutrition, advice
# Gradio Interface
# Password-type textboxes keep the user-supplied API keys masked in the UI;
# the image arrives as a numpy array, matching recognize_foods' expected input.
interface = gr.Interface(
    fn=orchestrate_workflow,
    inputs=[
        gr.Image(type="numpy", label="Upload a Food Photo"),
        gr.Textbox(type="password", label="Your Nutritionix API Key (required)"),
        gr.Textbox(type="password", label="Your OpenAI API Key (optional for advice)")
    ],
    outputs=[
        gr.Textbox(label="Nutrition Breakdown"),
        gr.Textbox(label="Nutrition Advice")
    ],
    title="Food Nutrition Analyzer",
    description="Upload a food photo and provide your Nutritionix API key. Add an OpenAI key for nutrition advice."
)
# Launch only when run as a script (Hugging Face Spaces executes this module).
# FIX: removed a stray trailing "|" artifact that broke the file's syntax.
if __name__ == "__main__":
    interface.launch()