File size: 7,429 Bytes
536cfbe
 
 
 
1a6c907
 
536cfbe
af103fc
1a6c907
f27898a
1a6c907
f27898a
536cfbe
1a6c907
536cfbe
f27898a
1a6c907
 
 
 
 
f27898a
 
 
 
 
 
1a6c907
f27898a
 
1a6c907
f27898a
536cfbe
 
f27898a
1a6c907
 
 
 
 
f27898a
 
 
1a6c907
f27898a
1a6c907
f27898a
1a6c907
536cfbe
 
 
1a6c907
536cfbe
1a6c907
536cfbe
1a6c907
 
 
536cfbe
 
 
76aecb4
 
536cfbe
 
 
 
 
 
 
76aecb4
 
1a6c907
76aecb4
536cfbe
76aecb4
 
 
 
 
 
 
 
 
 
1a6c907
76aecb4
 
1a6c907
76aecb4
536cfbe
1a6c907
76aecb4
536cfbe
dbd9b9e
 
 
1a6c907
dbd9b9e
 
 
 
 
1a6c907
dbd9b9e
 
 
 
 
1a6c907
dbd9b9e
 
 
 
 
1a6c907
dbd9b9e
 
 
1a6c907
bea540e
077b164
 
1a6c907
bea540e
077b164
 
 
 
1a6c907
dbd9b9e
734adb5
dbd9b9e
1a6c907
 
dbd9b9e
 
 
76aecb4
 
 
536cfbe
76aecb4
 
 
 
 
 
 
536cfbe
1a6c907
9d0a3d7
 
 
 
 
 
 
740859e
 
76aecb4
 
1a6c907
536cfbe
 
 
 
 
1a6c907
536cfbe
 
1a6c907
536cfbe
 
1a6c907
536cfbe
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
import gradio as gr
import cv2
import numpy as np
from PIL import Image
from ultralytics import YOLO
import requests
import os
import time
from autogen import AssistantAgent, GroupChat, GroupChatManager

# Initialize YOLOv8 for multi-label food detection. Loaded once at import so
# every request reuses the same weights.
model = YOLO("yolov8n.pt")  # Nano model for speed, fine-tune on food data later

# Agent Functions (registered with AutoGen)
def recognize_foods(image):
    """Detect food items in an uploaded RGB image.

    Args:
        image: HxWx3 numpy array (RGB, as delivered by Gradio), or None.

    Returns:
        List of (label, confidence) tuples, one entry per distinct food label
        (highest confidence kept). Empty list when the image is missing/blank
        or no food-like classes are detected.
    """
    start = time.time()
    # Guard against missing or blank (all-white) uploads.
    if image is None or np.all(image == 255):
        print("Warning: Invalid or empty image detected.")
        return []  # Return empty list for invalid images
    # Resize to 640x640 (YOLO default) to reduce load and match model input
    pil_image = Image.fromarray(image).resize((640, 640))
    results = model(pil_image)
    # Non-"food"-named classes we still treat as food (expand this set).
    food_labels = {"pasta", "rice", "tomato", "potato", "bread", "curry"}
    foods = []
    for result in results:
        # Pair each detection's class with its own confidence. The original
        # boolean-mask lookup boxes.conf[boxes.cls == cls].item() raised
        # "only one element tensors can be converted to Python scalars"
        # whenever the same class appeared in more than one box.
        for cls, conf in zip(result.boxes.cls, result.boxes.conf):
            label = model.names[int(cls)]
            if "food" in label.lower() or label in food_labels:
                foods.append((label, float(conf)))
    # Deduplicate by label, keeping the best confidence per food. The old
    # list(set(...)) kept the same label at different confidences and
    # returned them in arbitrary order.
    best = {}
    for label, conf in foods:
        best[label] = max(conf, best.get(label, 0.0))
    foods = list(best.items())
    print(f"Recognition took {time.time() - start:.2f}s: Found foods {foods}")
    return foods

def estimate_sizes(image, foods):
    """Estimate a portion size in grams for each recognized food.

    Args:
        image: HxWx3 numpy array (RGB).
        foods: list of (label, confidence) tuples from recognize_foods.

    Returns:
        Dict mapping food label -> estimated grams (int, capped at 500).
        Empty dict when no foods were recognized.
    """
    start = time.time()
    if not foods:
        print("Warning: No foods to estimate sizes for.")
        return {}
    # Resize with cv2.resize. The original chained ndarray.resize((640, 640)),
    # which is numpy's in-place reshape: it returns None (so total_area below
    # crashed with AttributeError) and raises on any size mismatch.
    img_cv = cv2.resize(cv2.cvtColor(image, cv2.COLOR_RGB2BGR), (640, 640))
    sizes = {}
    total_area = img_cv.shape[0] * img_cv.shape[1]
    for food, _ in foods:
        # Dummy heuristic: split the frame area evenly between detected foods
        # (refine with food-specific weights or bounding boxes later).
        area = total_area / len(foods)  # Even split for now
        # ~100g per full 640x640 frame, capped at 500g per item.
        grams = min(500, int(area / (640 * 640) * 100))
        sizes[food] = grams
    print(f"Size estimation took {time.time() - start:.2f}s: Estimated sizes {sizes}")
    return sizes

def fetch_nutrition(foods_with_sizes, nutritionix_key):
    """Fetch macro-nutrient data from the Nutritionix natural-language API.

    Args:
        foods_with_sizes: dict mapping food label -> portion size in grams.
        nutritionix_key: the user's Nutritionix application key.

    Returns:
        Dict of per-food macros on success, empty dict for empty input, or a
        human-readable error string when the key is missing or the call fails.
    """
    start = time.time()
    if not nutritionix_key:
        print("Warning: No Nutritionix API key provided.")
        return "Please provide a Nutritionix API key for nutrition data."
    if not foods_with_sizes:
        print("Warning: No foods to fetch nutrition for.")
        return {}

    # One natural-language line per food, e.g. "150g rice".
    query_lines = [f"{size}g {food}" for food, size in foods_with_sizes.items()]
    request_body = {"query": "\n".join(query_lines)}
    request_headers = {
        "x-app-id": os.getenv("NUTRITIONIX_APP_ID"),  # From HF Secrets
        "x-app-key": nutritionix_key,                  # User's key
        "Content-Type": "application/json"
    }
    endpoint = "https://trackapi.nutritionix.com/v2/natural/nutrients"

    try:
        resp = requests.post(endpoint, headers=request_headers, json=request_body, timeout=10)
        if resp.status_code != 200:
            print(f"Nutritionix API error: {resp.text}")
            return f"Nutritionix API error: {resp.text}"

        # Flatten the API response into a small per-food macro table.
        nutrition_data = {
            item["food_name"]: {
                "calories": item.get("nf_calories", 0),
                "protein": item.get("nf_protein", 0),
                "fat": item.get("nf_total_fat", 0),
                "carbs": item.get("nf_total_carbohydrate", 0),
            }
            for item in resp.json().get("foods", [])
        }
        print(f"Nutrition fetch took {time.time() - start:.2f}s: Fetched nutrition {nutrition_data}")
        return nutrition_data
    except requests.Timeout:
        print("Nutritionix API timed out.")
        return "Nutritionix API timed out."
    except Exception as e:
        print(f"Nutritionix error: {str(e)}")
        return f"Nutritionix error: {str(e)}"

# AutoGen Agent Definitions
# Each AssistantAgent below runs without an LLM config: its behavior comes
# from the system_message plus the locally registered Python function in
# function_map. The Orchestrator has no tools and only formats results.
food_recognizer = AssistantAgent(
    name="FoodRecognizer",
    system_message="Identify all food items in the image and return a list of (label, probability) pairs. Call recognize_foods with the image.",
    function_map={"recognize_foods": recognize_foods}
)

size_estimator = AssistantAgent(
    name="SizeEstimator",
    system_message="Estimate portion sizes in grams for each recognized food based on the image. Call estimate_sizes with the image and list of foods.",
    function_map={"estimate_sizes": estimate_sizes}
)

nutrition_fetcher = AssistantAgent(
    name="NutritionFetcher",
    system_message="Fetch nutritional data from the Nutritionix API using the user's key. Call fetch_nutrition with the foods and sizes dictionary and Nutritionix key.",
    function_map={"fetch_nutrition": fetch_nutrition}
)

# Coordinator agent: no registered functions, formatting/driving only.
orchestrator = AssistantAgent(
    name="Orchestrator",
    system_message="Coordinate the workflow, format the output, and return the final result as text. Start by asking FoodRecognizer to process the image, then SizeEstimator, then NutritionFetcher, and finally format the results.",
    function_map={}
)

# Custom speaker selection function (no LLM needed, updated for AutoGen 0.7.6)
def custom_select_speaker(last_speaker, groupchat):
    """Rotate speakers through a fixed pipeline (no LLM-based selection).

    Order: FoodRecognizer -> SizeEstimator -> NutritionFetcher -> Orchestrator,
    wrapping back to the start. Returns the Agent object itself, not its name,
    as required by AutoGen 0.7.6.
    """
    pipeline = [food_recognizer, size_estimator, nutrition_fetcher, orchestrator]
    # First turn of the chat: hand the floor to the recognizer.
    if last_speaker is None:
        return pipeline[0]
    # Otherwise advance one step, wrapping around at the end of the pipeline.
    return pipeline[(pipeline.index(last_speaker) + 1) % len(pipeline)]

# Group Chat for Agent Coordination (no LLM, custom speaker selection method)
group_chat = GroupChat(
    agents=[food_recognizer, size_estimator, nutrition_fetcher, orchestrator],
    messages=[],  # start each session with an empty transcript
    max_round=4,  # Limit rounds to match agent order
    speaker_selection_method=custom_select_speaker  # Use correct parameter for AutoGen 0.7.6
)
# Manager that drives the fixed round-robin chat defined above.
manager = GroupChatManager(groupchat=group_chat)

# Orchestrator Logic (via AutoGen chat)
def orchestrate_workflow(image, nutritionix_key):
    """Run the full agent pipeline for one uploaded photo.

    Args:
        image: numpy image array from the Gradio upload widget.
        nutritionix_key: user-supplied Nutritionix API key.

    Returns:
        The final text produced by the agents, or a fallback notice when the
        chat yields no output.
    """
    start = time.time()

    # Kick off the group chat; the image and key travel inside the message
    # text. NOTE(review): the numpy array is stringified here — confirm the
    # downstream agents can actually recover it.
    prompt = f"Process this image: {image} with Nutritionix key: {nutritionix_key}"
    chat_result = manager.initiate_chat(
        orchestrator,
        message=prompt,
        max_turns=10
    )

    # Pull the final message out of the ChatResult's history, if any.
    history = getattr(chat_result, 'chat_history', None)
    if history:
        result = history[-1].get("content", "No output from agents.")
    else:
        result = "No output from agents."

    # Some agents wrap their reply in a dict; unwrap the text field.
    if isinstance(result, dict):
        result = result.get("text", "No text output from agents.")
    print(f"Total time: {time.time() - start:.2f}s")
    return result

# Gradio Interface
interface = gr.Interface(
    fn=orchestrate_workflow,  # single entry point: (image, key) -> text report
    inputs=[
        gr.Image(type="numpy", label="Upload a Food Photo"),
        gr.Textbox(type="password", label="Your Nutritionix API Key (required)")
    ],
    outputs=[
        gr.Textbox(label="Nutrition Breakdown")
    ],
    title="Food Nutrition Analyzer",
    description="Upload a food photo and provide your Nutritionix API key for nutrition data."
)

# Launch the Gradio app only when run as a script, not on import.
if __name__ == "__main__":
    interface.launch()