import gradio as gr
import cv2
import numpy as np
import requests
from PIL import Image
from autogen import AssistantAgent, GroupChat, GroupChatManager
import os
import time
#import openai
from ultralytics import YOLO
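
# Pipeline: YOLOv8 detects food items in the photo, a heuristic estimates portion
# sizes, and the Nutritionix API supplies nutrition facts; AutoGen agents coordinate
# the steps and Gradio provides the UI.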

model = YOLO("yolov8n.pt")  # Nano model for speed, fine-tune on food data later

# Agent Functions
def recognize_foods(image):
    start = time.time()
    # Resize to 640x640 (YOLO default)
    pil_image = Image.fromarray(image).resize((640, 640))
    results = model(pil_image)
    foods = {}
    for result in results:
        # Iterate detections pairwise so each box keeps its own confidence
        # (boolean indexing + .item() breaks when a class appears more than once)
        for box_cls, box_conf in zip(result.boxes.cls, result.boxes.conf):
            label = model.names[int(box_cls)]
            # Stock COCO weights only cover a few foods (banana, pizza, cake, ...);
            # expand this list or fine-tune the model for broader coverage
            if "food" in label.lower() or label in ["pasta", "rice", "tomato", "potato", "bread"]:
                # Keep the highest confidence seen for each label (deduplicates)
                foods[label] = max(float(box_conf), foods.get(label, 0.0))
    print(f"Recognition took {time.time() - start:.2f}s")
    return list(foods.items())  # [(label, confidence), ...] without duplicates

def estimate_sizes(image, foods):
    start = time.time()
    # ndarray.resize() reshapes in place and returns None; use cv2.resize to match the YOLO size
    img_cv = cv2.resize(cv2.cvtColor(image, cv2.COLOR_RGB2BGR), (640, 640))
    sizes = {}
    if not foods:
        print(f"Size estimation took {time.time() - start:.2f}s")
        return sizes
    total_area = img_cv.shape[0] * img_cv.shape[1]
    for food, _ in foods:
        # Dummy: split the frame area evenly among foods (refine with food-specific weights later)
        area = total_area / len(foods)
        grams = min(500, int(area / (640 * 640) * 100))  # ~100 g per full 640x640 frame
        sizes[food] = grams
    print(f"Size estimation took {time.time() - start:.2f}s")
    return sizes
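
# Note: the even split above is only a placeholder; a finer estimate could weight
# each food by its detection's bounding-box area (result.boxes.xywh in YOLO) and a
# food-specific density factor.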

def fetch_nutrition(foods_with_sizes, nutritionix_key):
    if not nutritionix_key:
        return "Please provide a Nutritionix API key for nutrition data."
    
    start = time.time()
    url = "https://trackapi.nutritionix.com/v2/natural/nutrients"
    headers = {
        "x-app-id": os.getenv("NUTRITIONIX_APP_ID"),  # From HF Secrets
        "x-app-key": nutritionix_key,                  # User's key
        "Content-Type": "application/json"
    }
    # Build query from foods and sizes
    query = "\n".join([f"{size}g {food}" for food, size in foods_with_sizes.items()])
    body = {"query": query}
    
    try:
        response = requests.post(url, headers=headers, json=body, timeout=10)
        if response.status_code != 200:
            return f"Nutritionix API error: {response.text}"
        
        data = response.json().get("foods", [])
        nutrition_data = {}
        for item in data:
            food_name = item["food_name"]
            nutrition_data[food_name] = {
                "calories": item.get("nf_calories", 0),
                "protein": item.get("nf_protein", 0),
                "fat": item.get("nf_total_fat", 0),
                "carbs": item.get("nf_total_carbohydrate", 0)
            }
        print(f"Nutrition fetch took {time.time() - start:.2f}s")
        return nutrition_data
    except requests.Timeout:
        return "Nutritionix API timed out."
    except Exception as e:
        return f"Nutritionix error: {str(e)}"

#def get_nutrition_advice(nutrition_data, llm_key):
#    if not llm_key:
#        return "No OpenAI/Grok key provided—skipping advice."
#    try:
#        openai.api_key = llm_key
#        prompt = "Given this nutritional data, suggest a dietary tip:\n"
#        for food, data in nutrition_data.items():
#            prompt += f"- {food}: {data['calories']} cal, {data['protein']}g protein, {data['fat']}g fat, {data['carbs']}g carbs\n"
#        
#        response = openai.Completion.create(
#            model="text-davinci-003",  # Swap for Grok if xAI API is available
#            prompt=prompt,
#            max_tokens=50
#        )
#        return response.choices[0].text.strip()
#    except Exception as e:
#        return f"Error with LLM key: {str(e)}"
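
# Hedged sketch (not wired into the agent chat): a plain formatter the Orchestrator
# could use to turn fetch_nutrition() output into display text. Assumes the dict
# structure built in fetch_nutrition above.
def format_nutrition_report(nutrition_data, sizes=None):
    if isinstance(nutrition_data, str):  # error messages pass through unchanged
        return nutrition_data
    lines = []
    for food, data in nutrition_data.items():
        portion = f" ({sizes[food]}g)" if sizes and food in sizes else ""
        lines.append(
            f"{food}{portion}: {data['calories']} kcal, "
            f"{data['protein']}g protein, {data['fat']}g fat, {data['carbs']}g carbs"
        )
    return "\n".join(lines) if lines else "No nutrition data found."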


# AutoGen Agent Definitions
food_recognizer = AssistantAgent(
    name="FoodRecognizer",
    system_message="Identify all food items in the image and return a list of (label, probability) pairs.",
    function_map={"recognize_foods": recognize_foods}
)

size_estimator = AssistantAgent(
    name="SizeEstimator",
    system_message="Estimate portion sizes in grams for each recognized food based on the image.",
    function_map={"estimate_sizes": estimate_sizes}
)

nutrition_fetcher = AssistantAgent(
    name="NutritionFetcher",
    system_message="Fetch nutritional data from the Nutritionix API using the user's key.",
    function_map={"fetch_nutrition": fetch_nutrition}
)

##advice_agent = AssistantAgent(
##    name="NutritionAdvisor",
##    system_message="Provide basic nutrition advice using the user's OpenAI/Grok key."
##)

orchestrator = AssistantAgent(
    name="Orchestrator",
    system_message="Coordinate the workflow, format the output, and return the final result as text.",
    function_map={}
)

group_chat = GroupChat(
    agents=[food_recognizer, size_estimator, nutrition_fetcher, orchestrator],
    messages=[],
    max_round=10
)
manager = GroupChatManager(groupchat=group_chat)


# Orchestrator Logic (via AutoGen chat)
def orchestrate_workflow(image, nutritionix_key):
    start = time.time()
    
    # Initiate chat with the Orchestrator, passing the image and key in the opening message
    message = f"Process this image: {image} with Nutritionix key: {nutritionix_key}"
    response = manager.initiate_chat(
        orchestrator,
        message=message,
        max_turns=10
    )

    # initiate_chat returns a ChatResult, not a list; read the last message from its chat history
    chat_history = getattr(response, "chat_history", None) or []
    result = chat_history[-1].get("content", "No output from agents.") if chat_history else "No output from agents."
    print(f"Total time: {time.time() - start:.2f}s")
    return result
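
# Hedged sketch (not part of the original flow): chain the three tool functions
# directly, bypassing the AutoGen group chat. Handy for debugging the vision and
# API steps in isolation.
def run_pipeline_directly(image, nutritionix_key):
    foods = recognize_foods(image)
    if not foods:
        return "No recognizable food items were detected."
    sizes = estimate_sizes(image, foods)
    nutrition = fetch_nutrition(sizes, nutritionix_key)
    return format_nutrition_report(nutrition, sizes)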

    
# Gradio Interface
interface = gr.Interface(
    fn=orchestrate_workflow,
    inputs=[
        gr.Image(type="numpy", label="Upload a Food Photo"),
        gr.Textbox(type="password", label="Your Nutritionix API Key (required)"),
        #gr.Textbox(type="password", label="Your OpenAI/Grok API Key (optional for advice)")
    ],
    outputs=[
        gr.Textbox(label="Nutrition Breakdown"),
        #gr.Textbox(label="Nutrition Advice")
    ],
    title="Food Nutrition Analyzer",
    description="Upload a food photo and provide your Nutritionix API key to get a nutrition breakdown."
)

if __name__ == "__main__":
    interface.launch()