import gradio as gr
import cv2
import numpy as np
from PIL import Image
from ultralytics import YOLO
import requests
import os
import time
from autogen import AssistantAgent, GroupChat, GroupChatManager
import openai

# Initialize YOLOv8 for multi-label food detection
model = YOLO("yolov8n.pt")  # Nano model for speed, fine-tune on food data later
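# A minimal sketch of swapping in fine-tuned weights once a food dataset is trained
# (hypothetical checkpoint path; YOLO() accepts any .pt weights file):
# model = YOLO("runs/detect/food_finetune/weights/best.pt")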

# Agent Functions (registered with AutoGen, enhanced debugging)
def recognize_foods(image):
    start = time.time()
    print(f"Recognize_foods called with image shape: {image.shape if image is not None else 'None'}")
    # Check if image is valid (not None or empty)
    if image is None or image.size == 0:
        print("Warning: Invalid or empty image detected.")
        return []  # Return empty list for invalid images
    
    # Convert to RGB and resize to 640x640
    try:
        pil_image = Image.fromarray(image).convert('RGB').resize((640, 640))
        img_np = np.array(pil_image)
        print(f"Image converted to RGB, shape: {img_np.shape}, min RGB: {img_np.min()}, max RGB: {img_np.max()}")
    except Exception as e:
        print(f"Error processing image: {str(e)}")
        return []  # Return empty list on preprocessing failure
    
    # Run YOLOv8 detection
    results = model(pil_image)
    foods = []
    detected = False
    for result in results:
        # Iterate boxes, classes, and confidences in lockstep so each detection
        # keeps its own score, even when the same class appears more than once
        for box, cls, conf in zip(result.boxes.xyxy, result.boxes.cls, result.boxes.conf):
            label = model.names[int(cls)]
            if "food" in label.lower() or label in ["waffle fry", "lettuce", "cucumber", "tomato", "broccoli", "carrot", "green bean", "chicken", "turkey", "pasta", "rice", "potato", "bread", "curry"]:  # Expanded list
                foods.append((label, conf.item()))
                detected = True
                print(f"Detected: {label} with confidence {conf.item():.2f}, box: {box.tolist()}")
    if not detected:
        print("Warning: No food items detected in the image. Check YOLOv8 model or image quality.")
    print(f"Recognition took {time.time() - start:.2f}s: Found foods {foods}")
    # Deduplicate by label, keeping the highest confidence per food
    # (a plain set() would keep the same label twice under different confidences)
    best = {}
    for label, conf in foods:
        if label not in best or conf > best[label]:
            best[label] = conf
    return list(best.items())

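# Illustrative call (outputs hypothetical):
#   recognize_foods(np.array(Image.open("meal.jpg"))) -> [("broccoli", 0.91), ("carrot", 0.84)]
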
def estimate_sizes(image, foods):
    start = time.time()
    print(f"Estimate_sizes called with foods: {foods}")
    if not foods:
        print("Warning: No foods to estimate sizes for.")
        return {}
    
    # Resize to match YOLO output for consistency
    try:
        img_cv = cv2.resize(cv2.cvtColor(image, cv2.COLOR_RGB2BGR), (640, 640))  # use cv2.resize; numpy arrays have no PIL-style .resize()
        print(f"Image resized to shape: {img_cv.shape}")
    except Exception as e:
        print(f"Error resizing image for size estimation: {str(e)}")
        return {}
    
    sizes = {}
    total_area = img_cv.shape[0] * img_cv.shape[1]
    
    # Use YOLO bounding boxes for more accurate sizing (if available)
    pil_image = Image.fromarray(image).convert('RGB').resize((640, 640))
    results = model(pil_image)
    for result in results:
        for box, cls in zip(result.boxes.xyxy, result.boxes.cls):
            label = model.names[int(cls)]
            if label in [food for food, _ in foods]:
                box_area = (box[2] - box[0]) * (box[3] - box[1])  # Width * Height
                # Simple heuristic: scale box area to grams (tune this based on data)
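                # e.g. a box spanning half the width and half the height covers 1/4 of
                # the 640x640 frame, giving min(500, int(0.25 * 500)) = 125g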
                grams = min(500, int((box_area / (640 * 640)) * 500))  # Cap at 500g
                sizes[label] = grams
                print(f"Estimated size for {label}: {grams}g (via bounding box)")
    
    # Fallback: even split if no boxes found
    if not sizes:
        for food, _ in foods:
            area = total_area / len(foods)  # Even split for now
            grams = min(500, int(area / (640 * 640) * 100))  # 100g per ~640k pixels, capped at 500g
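            # e.g. with two detected foods, each gets int((409600 / 2) / 409600 * 100) = 50g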
            sizes[food] = grams
            print(f"Estimated size for {food}: {grams}g (via fallback)")
    
    print(f"Size estimation took {time.time() - start:.2f}s: Estimated sizes {sizes}")
    return sizes

def fetch_nutrition(foods_with_sizes, nutritionix_key):
    start = time.time()
    print(f"Fetch_nutrition called with foods_with_sizes: {foods_with_sizes}, key: {nutritionix_key[:5]}... (partial)")
    if not nutritionix_key:
        print("Warning: No Nutritionix API key provided.")
        return "Please provide a Nutritionix API key for nutrition data."
    if not foods_with_sizes:
        print("Warning: No foods to fetch nutrition for.")
        return {}
    
    url = "https://trackapi.nutritionix.com/v2/natural/nutrients"
    headers = {
        "x-app-id": os.getenv("NUTRITIONIX_APP_ID"),  # From HF Secrets
        "x-app-key": nutritionix_key,                  # User's key
        "Content-Type": "application/json"
    }
    # Build query from foods and sizes
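    # e.g. sizes {"chicken": 150, "rice": 200} become the query "150g chicken\n200g rice",
    # one food per line, which the natural/nutrients endpoint parses as natural language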
    query = "\n".join([f"{size}g {food}" for food, size in foods_with_sizes.items()])
    print(f"Nutritionix query: {query}")
    body = {"query": query}
    
    try:
        response = requests.post(url, headers=headers, json=body, timeout=10)
        if response.status_code != 200:
            print(f"Nutritionix API error (status {response.status_code}): {response.text}")
            return f"Nutritionix API error: {response.text}"
        
        data = response.json().get("foods", [])
        nutrition_data = {}
        for item in data:
            food_name = item["food_name"]
            nutrition_data[food_name] = {
                "calories": item.get("nf_calories", 0),
                "protein": item.get("nf_protein", 0),
                "fat": item.get("nf_total_fat", 0),
                "carbs": item.get("nf_total_carbohydrate", 0)
            }
        print(f"Nutrition fetch took {time.time() - start:.2f}s: Fetched nutrition {nutrition_data}")
        return nutrition_data
    except requests.Timeout:
        print("Nutritionix API timed out.")
        return "Nutritionix API timed out."
    except Exception as e:
        print(f"Nutritionix error: {str(e)}")
        return f"Nutritionix error: {str(e)}"

def get_nutrition_advice(nutrition_data, openai_key):
    start = time.time()
    print(f"Get_nutrition_advice called with nutrition_data: {nutrition_data}, key: {openai_key[:5]}... (partial)")
    if not openai_key:
        print("Warning: No OpenAI API key provided—skipping advice.")
        return "No OpenAI key provided—skipping advice."
    if not nutrition_data:
        print("Warning: No nutrition data to advise on.")
        return "No nutrition data available for advice."
    
    try:
        openai.api_key = openai_key
        prompt = "Given this nutritional data, suggest a short dietary tip (max 50 words):\n" + "\n".join(
            [f"- {food}: {data['calories']} cal, {data['protein']}g protein, {data['fat']}g fat, {data['carbs']}g carbs" 
             for food, data in nutrition_data.items()]
        )
        print(f"OpenAI prompt: {prompt}")
        # text-davinci-003 has been retired; gpt-3.5-turbo-instruct is the closest
        # completion-style replacement (assumes the pre-1.0 openai SDK, which the
        # openai.api_key usage above implies; it takes request_timeout, not timeout)
        response = openai.Completion.create(
            model="gpt-3.5-turbo-instruct",
            prompt=prompt,
            max_tokens=50,
            temperature=0.7,
            request_timeout=5
        )
        advice = response.choices[0].text.strip()
        print(f"Advice took {time.time() - start:.2f}s: {advice}")
        return advice
    except Exception as e:
        print(f"LLM error: {str(e)}")
        return f"Error with OpenAI key: {str(e)}"

# AutoGen Agent Definitions
food_recognizer = AssistantAgent(
    name="FoodRecognizer",
    system_message="Identify all food items in the image and return a list of (label, probability) pairs. Parse the message for the image data (e.g., 'Process this image: <numpy_array>') and call recognize_foods with it.",
    function_map={"recognize_foods": recognize_foods}
)

size_estimator = AssistantAgent(
    name="SizeEstimator",
    system_message="Estimate portion sizes in grams for each recognized food based on the image. Parse the previous message for the list of foods (e.g., '[(\"food1\", 0.85), ...]') and call estimate_sizes with the image and foods from the message history.",
    function_map={"estimate_sizes": estimate_sizes}
)

nutrition_fetcher = AssistantAgent(
    name="NutritionFetcher",
    system_message="Fetch nutritional data from the Nutritionix API using the user's key. Parse the previous message for the foods and sizes dictionary (e.g., {'food1': 150, ...}) and the initial message for the Nutritionix key (e.g., 'with Nutritionix key: <key>'), then call fetch_nutrition.",
    function_map={"fetch_nutrition": fetch_nutrition}
)

advice_agent = AssistantAgent(
    name="NutritionAdvisor",
    system_message="Provide basic nutrition advice based on the food data using the user's OpenAI key. Parse the previous message for the nutrition data (e.g., {'food1': {'calories': 200, ...}}) and the initial message for the OpenAI key (e.g., 'with OpenAI key: <key>'), then call get_nutrition_advice.",
    function_map={"get_nutrition_advice": get_nutrition_advice}
)

orchestrator = AssistantAgent(
    name="Orchestrator",
    system_message="Coordinate the workflow, format the output, and return the final result as text. Parse the initial message for the image, Nutritionix key, and OpenAI key. Start by asking FoodRecognizer to process the image, then SizeEstimator, then NutritionFetcher, then NutritionAdvisor (if OpenAI key provided), and finally format the results into 'Food Analysis:\\n- food1 (size1g, prob1% confidence): calories1 cal, protein1g protein, fat1g fat, carbs1g carbs\\n...' for each food, followed by '\\nNutrition Advice:\\n' and the advice if available.",
    function_map={}
)

# Custom speaker selection function (no LLM needed, updated for AutoGen 0.7.6)
def custom_select_speaker(last_speaker, groupchat):
    """Select the next speaker in a fixed order: FoodRecognizer → SizeEstimator → NutritionFetcher → NutritionAdvisor → Orchestrator."""
    order = [food_recognizer, size_estimator, nutrition_fetcher, advice_agent, orchestrator]
    if last_speaker not in order:  # First turn, or a speaker outside the rotation (e.g., the manager)
        return food_recognizer  # Return the Agent object, not the name
    next_index = (order.index(last_speaker) + 1) % len(order)
    return order[next_index]

# Group Chat for Agent Coordination (no LLM for selection, custom speaker selection method)
group_chat = GroupChat(
    agents=[food_recognizer, size_estimator, nutrition_fetcher, advice_agent, orchestrator],
    messages=[],
    max_round=5,  # Increase for advice agent
    speaker_selection_method=custom_select_speaker  # Use correct parameter for AutoGen 0.7.6
)
manager = GroupChatManager(groupchat=group_chat)

# Orchestrator Logic (via AutoGen chat)
def orchestrate_workflow(image, nutritionix_key, openai_key=None):
    start = time.time()
    print(f"Orchestrate_workflow called with image shape: {image.shape if image is not None else 'None'}, "
          f"Nutritionix key: {nutritionix_key[:5]}..., OpenAI key: {openai_key[:5]}... (partial)")
    
    # Initiate chat with Orchestrator, passing image and keys as message
    message = f"Process this image: {image} with Nutritionix key: {nutritionix_key}"
    if openai_key:
        message += f" and OpenAI key: {openai_key}"
    print(f"Starting chat with message: {message[:100]}...")  # Truncate for readability
    response = manager.initiate_chat(
        orchestrator,
        message=message,
        max_turns=10
    )
    
    # Extract and format the final response from the ChatResult
    if hasattr(response, 'chat_history') and response.chat_history:
        # Get the last message from chat history
        last_message = response.chat_history[-1]
        result = last_message.get("content", "No output from agents.")
        print(f"Chat history last message: {result}")
    else:
        result = "No output from agents."
        print("Warning: No chat history in response.")
    
    if isinstance(result, dict):
        result = result.get("text", "No text output from agents.")
        print(f"Result is dict, extracted text: {result}")
    
    # Split result into nutrition and advice if OpenAI key was provided
    if openai_key and isinstance(result, str) and "\nNutrition Advice:\n" in result:
        parts = result.split("\nNutrition Advice:\n", 1)
        nutrition = parts[0] if parts[0] else "No nutrition data."
        advice = parts[1] if len(parts) > 1 else "No advice available."
    else:
        nutrition = result if result != "No output from agents." else "No nutrition data."
        advice = "No advice available (OpenAI key required)."
    
    print(f"Total time: {time.time() - start:.2f}s, Nutrition: {nutrition[:50]}..., Advice: {advice[:50]}...")
    return nutrition, advice

# Gradio Interface
interface = gr.Interface(
    fn=orchestrate_workflow,
    inputs=[
        gr.Image(type="numpy", label="Upload a Food Photo"),
        gr.Textbox(type="password", label="Your Nutritionix API Key (required)"),
        gr.Textbox(type="password", label="Your OpenAI API Key (optional for advice)")
    ],
    outputs=[
        gr.Textbox(label="Nutrition Breakdown"),
        gr.Textbox(label="Nutrition Advice")
    ],
    title="Food Nutrition Analyzer",
    description="Upload a food photo and provide your Nutritionix API key. Add an OpenAI key for nutrition advice."
)

if __name__ == "__main__":
    interface.launch()
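    # Direct test without the UI, as a sketch (hypothetical sample image and env-var names):
    # img = np.array(Image.open("sample_meal.jpg").convert("RGB"))
    # print(orchestrate_workflow(img, os.getenv("NUTRITIONIX_API_KEY"), os.getenv("OPENAI_API_KEY")))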