File size: 9,053 Bytes
42ecad8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
import os
import uvicorn
from fastapi import FastAPI, File, UploadFile
from fastapi.middleware.cors import CORSMiddleware
import google.generativeai as genai
from ultralytics import YOLO
from PIL import Image
import io
import json
import re
import pickle
import time
from datetime import datetime

# Application object carrying the API's title and description metadata.
app = FastAPI(
    title="Food Nutrition API",
    description="API for detecting food items and retrieving nutritional information",
)

# CORS is fully open: any origin, method and header is accepted.
# NOTE(review): wildcard origins combined with allow_credentials=True is very
# permissive -- restrict allow_origins before exposing this in production.
app.add_middleware(
    CORSMiddleware,
    allow_credentials=True,
    allow_headers=["*"],
    allow_methods=["*"],
    allow_origins=["*"],
)

# Load the YOLOv8 nano weights used for object detection.
model = YOLO("yolov8n.pt")

# Set up Google Gemini. The API key is read from the GOOGLE_API_KEY
# environment variable so that no credential lives in source control.
# NOTE(security): a key was previously committed in this file; it must be
# treated as compromised and revoked in the Google Cloud console.
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY", "")
if not GOOGLE_API_KEY:
    # Keep the service importable (health checks, cache endpoints) but make
    # the misconfiguration visible in the logs.
    print("Warning: GOOGLE_API_KEY is not set; Gemini requests will fail")
genai.configure(api_key=GOOGLE_API_KEY)
gemini_model = genai.GenerativeModel("gemini-pro")

# Cache configuration, overridable through environment variables.
# CACHE_DIR defaults to the empty string, in which case CACHE_FILE is a bare
# file name that is resolved against the process's working directory.
CACHE_DIR = os.getenv("CACHE_DIR", "")  # Directory holding the cache file
CACHE_FILE = os.path.join(CACHE_DIR, "food_nutrition_cache.pkl")
CACHE_EXPIRY_DAYS = int(os.getenv("CACHE_EXPIRY_DAYS", "30"))  # Entry lifetime in days; defaults to 30

def load_cache():
    """Load the nutrition cache from disk.

    Returns:
        The dict stored in ``CACHE_FILE``, or an empty dict when the file is
        missing, unreadable, corrupted, or does not contain a dict.
    """
    # NOTE(security): pickle can execute arbitrary code while loading. Only
    # point CACHE_FILE at a location untrusted parties cannot write to.
    try:
        # Open directly instead of checking os.path.exists first: this avoids
        # a race between the check and the open.
        with open(CACHE_FILE, 'rb') as f:
            cache = pickle.load(f)
    except FileNotFoundError:
        # First run: there is simply no cache yet.
        return {}
    except (pickle.PickleError, EOFError, OSError, AttributeError,
            ImportError, IndexError, KeyError, TypeError, ValueError):
        # Unpickling damaged data may raise much more than PickleError
        # (see the pickle docs); none of it should stop the service from
        # starting with an empty cache.
        print("Cache file corrupted, creating new cache")
        return {}

    if not isinstance(cache, dict):
        # A valid pickle of the wrong type is as useless as a corrupt one.
        print("Cache file corrupted, creating new cache")
        return {}
    return cache


def save_cache(cache):
    """Save the nutrition cache to disk.

    Args:
        cache: The cache dict to pickle into ``CACHE_FILE``.

    Saving is best effort: any failure is reported on stdout and otherwise
    ignored so that a disk problem never breaks request handling.
    """
    try:
        cache_dir = os.path.dirname(CACHE_FILE)
        # dirname is "" when CACHE_FILE is a bare file name (e.g. CACHE_DIR is
        # empty). os.makedirs("") raises FileNotFoundError, which previously
        # aborted every save before the file was even opened.
        if cache_dir:
            os.makedirs(cache_dir, exist_ok=True)
        with open(CACHE_FILE, 'wb') as f:
            pickle.dump(cache, f)
    except Exception as e:
        print(f"Error saving cache: {e}")


# Initialize the in-memory cache from disk when the module is imported.
# Entries written by get_food_info map a food item name to a dict holding
# 'data' (the response text) and 'timestamp' (time.time() at insertion).
nutrition_cache = load_cache()


def clean_expired_cache_entries():
    """Drop cache entries older than ``CACHE_EXPIRY_DAYS`` days.

    The cache is written back to disk only when at least one entry was
    actually removed.
    """
    now = time.time()
    max_age = CACHE_EXPIRY_DAYS * 24 * 60 * 60

    # Collect the stale keys first; the dict must not change size while it is
    # being iterated.
    stale = [
        name
        for name, record in nutrition_cache.items()
        if now - record['timestamp'] > max_age
    ]
    if not stale:
        return

    for name in stale:
        del nutrition_cache[name]

    save_cache(nutrition_cache)
    print(f"Removed {len(stale)} expired cache entries")


def get_food_info(food_item):
    """Fetch nutritional information for a food item, with caching.

    Args:
        food_item: Name of the food to look up; it is also the cache key.

    Returns:
        The response text from the Gemini model (the prompt asks for a JSON
        document inside a ```json fence), the string "No data found" when the
        model returned nothing, or a JSON string with an "error" key when the
        request raised.
    """
    # Serve from the cache only while the entry is younger than the expiry
    # window; an expired entry is treated like a miss and refreshed below.
    entry = nutrition_cache.get(food_item)
    if entry is not None:
        age_seconds = time.time() - entry.get('timestamp', 0)
        if age_seconds <= CACHE_EXPIRY_DAYS * 24 * 60 * 60:
            print(f"Cache hit for {food_item}")
            return entry['data']

    print(f"Cache miss for {food_item}, fetching from API")
    # If not in cache, fetch from Gemini API
    prompt = f"""
    Provide the nutritional information for "{food_item}" in JSON format with the following structure:

    {{
      "food_item": "{food_item}",
      "nutritional_info": {{
        "calories_kcal": value,
        "protein_g": value,
        "fiber_g": value,
        "vitamins": {{
          "Vitamin A_mcg": value,
          "Vitamin C_mg": value,
          "Vitamin D_mcg": value,
          "Vitamin E_mg": value,
          "Vitamin K_mcg": value,
          "Vitamin B1_mg": value,
          "Vitamin B2_mg": value,
          "Vitamin B3_mg": value,
          "Vitamin B6_mg": value,
          "Vitamin B12_mcg": value,
          "Folate_mcg": value
        }}
      }},
      "health_benefits": [
        "Brief description of health benefit 1",
        "Brief description of health benefit 2"
      ]
    }}

    Ensure the response is **valid JSON** inside triple backticks (```json ... ```).
    """

    try:
        response = gemini_model.generate_content(prompt)
        # Accessing .text stays inside the try block so that any error it
        # raises is reported through the error payload below.
        result = response.text if response else ""
        if not result:
            # Nothing usable came back: report it, but do not cache the
            # placeholder, otherwise it would be served until it expires.
            return "No data found"

        # Store in cache with timestamp
        nutrition_cache[food_item] = {
            'data': result,
            'timestamp': time.time()
        }

        # Save updated cache
        save_cache(nutrition_cache)

        return result
    except Exception as e:
        # Boundary with an external service: never let a lookup failure
        # propagate; hand the caller a JSON error document instead.
        print(f"Error fetching from Gemini API: {e}")
        return json.dumps({"error": f"Failed to retrieve data: {str(e)}"})


# Matches a ```json fenced block. Tolerates CRLF line endings, extra
# whitespace around the payload and a differently-cased language tag.
_JSON_FENCE_RE = re.compile(r"```json\s*(.*?)\s*```", re.DOTALL | re.IGNORECASE)


def parse_nutrition_response(response_dict):
    """Parse raw model responses into dictionaries.

    Args:
        response_dict: Mapping of food name to the raw response string. The
            string is expected to contain a JSON document inside a ```json
            fence, but a bare JSON object is accepted as well.

    Returns:
        A dict mapping each food name to the decoded JSON value, or to
        ``{"error": "Invalid JSON format"}`` when a fenced block does not
        decode, or to ``{"error": "JSON not found in response"}`` when the
        text contains neither a fence nor a bare JSON object.
    """
    parsed_data = {}

    for food, raw_text in response_dict.items():
        fence = _JSON_FENCE_RE.search(raw_text)
        if fence:
            try:
                parsed_data[food] = json.loads(fence.group(1))
            except json.JSONDecodeError:
                parsed_data[food] = {"error": "Invalid JSON format"}
            continue

        # No fence. get_food_info returns an unfenced JSON error document when
        # the API call fails; decode it so the real error reaches the client
        # instead of being masked as "JSON not found".
        try:
            bare = json.loads(raw_text)
        except json.JSONDecodeError:
            bare = None

        if isinstance(bare, dict):
            parsed_data[food] = bare
        else:
            parsed_data[food] = {"error": "JSON not found in response"}

    return parsed_data


# Food classes of the COCO label set. The stock "yolov8n.pt" weights are a
# general-purpose COCO detector, so most of their classes (person, car,
# chair, ...) are not food and must not be sent for nutrition lookup.
_COCO_FOOD_LABELS = frozenset({
    "banana",
    "apple",
    "sandwich",
    "orange",
    "broccoli",
    "carrot",
    "hot dog",
    "pizza",
    "donut",
    "cake",
})


def detect_food(image: Image.Image, food_labels=_COCO_FOOD_LABELS):
    """Detect food items in an image using YOLOv8.

    Args:
        image: Image to run through the module-level ``model``.
        food_labels: Collection of class names that count as food; detections
            with any other label are dropped. Pass ``None`` to keep every
            detected class, e.g. when ``model`` is swapped for weights that
            only know food classes.

    Returns:
        The distinct detected food names, sorted alphabetically so the result
        is stable between calls.
    """
    results = model(image)
    labels = {
        model.names[int(box.cls)]
        for result in results
        for box in result.boxes
    }
    if food_labels is not None:
        labels = {label for label in labels if label in food_labels}
    return sorted(labels)

@app.get("/")
def read_root():
    """Root route: return a short message showing the service is running."""
    greeting = {"message": "FastAPI is running on Render!"}
    return greeting

@app.head("/")
async def root_head():
    """Handle HEAD requests on the root path by returning an empty dict."""
    empty_payload = {}
    return empty_payload

@app.post("/analyze")
async def analyze_image(file: UploadFile = File(...)):
    """API endpoint to analyze food from an image and return nutritional info.

    Args:
        file: The uploaded image.

    Returns:
        A dict with "detected_foods", "nutrition_info" and "cache_stats";
        ``{"message": "No food detected"}`` when detection finds nothing; or
        ``{"error": ...}`` when processing raised.
    """
    # Drop expired entries before serving so stale data is not returned.
    clean_expired_cache_entries()

    try:
        # Process the image
        # NOTE(review): detection and the Gemini lookups below are blocking
        # calls made from an async handler -- confirm this is acceptable for
        # the expected load.
        contents = await file.read()
        image = Image.open(io.BytesIO(contents))
        detected_foods = detect_food(image)

        if not detected_foods:
            return {"message": "No food detected"}

        # Record which foods are already cached BEFORE the lookups.
        # get_food_info inserts an entry on every successful fetch, so
        # checking afterwards would report every lookup as a cache hit.
        already_cached = {food for food in detected_foods if food in nutrition_cache}

        # Get nutrition info (from cache if available)
        raw_nutrition_info = {food: get_food_info(food) for food in detected_foods}
        parsed_nutrition_info = parse_nutrition_response(raw_nutrition_info)

        # Newest entry timestamp; falls back to "now" when the cache is empty.
        last_update = max(
            (entry['timestamp'] for entry in nutrition_cache.values()),
            default=time.time(),
        )

        # Add cache statistics to the response
        cache_stats = {
            "cache_size": len(nutrition_cache),
            "cache_hits": len(already_cached),
            "cache_misses": sum(1 for food in detected_foods if food not in already_cached),
            "last_cache_update": datetime.fromtimestamp(last_update).isoformat()
        }

        return {
            "detected_foods": detected_foods,
            "nutrition_info": parsed_nutrition_info,
            "cache_stats": cache_stats
        }
    except Exception as e:
        # Endpoint boundary: report the failure in the response body.
        return {"error": f"An error occurred: {str(e)}"}


# Add endpoints to manage the cache
@app.get("/cache/stats")
async def get_cache_stats():
    """Report the size, keys and entry-age range of the nutrition cache."""
    if not nutrition_cache:
        return {"message": "Cache is empty"}

    # Gather the timestamps once and derive both extremes from the same list.
    stamps = [record['timestamp'] for record in nutrition_cache.values()]
    oldest = datetime.fromtimestamp(min(stamps)).isoformat()
    newest = datetime.fromtimestamp(max(stamps)).isoformat()

    return {
        "cache_size": len(nutrition_cache),
        "foods_cached": list(nutrition_cache.keys()),
        "oldest_entry": oldest,
        "newest_entry": newest,
    }


@app.delete("/cache/clear")
async def clear_cache():
    """Replace the nutrition cache with an empty one and persist it."""
    global nutrition_cache
    emptied = {}
    nutrition_cache = emptied
    save_cache(emptied)
    return {"message": "Cache cleared successfully"}


@app.delete("/cache/item/{food_item}")
async def delete_cache_item(food_item: str):
    """Remove one food item from the cache, if it is present."""
    # Guard clause: nothing to delete, nothing to persist.
    if food_item not in nutrition_cache:
        return {"message": f"{food_item} not found in cache"}

    del nutrition_cache[food_item]
    save_cache(nutrition_cache)
    return {"message": f"Removed {food_item} from cache"}


# Add health check endpoint for production deployments
@app.get("/health")
async def health_check():
    """Report service status: cache size and whether model and API key are set."""
    model_ready = model is not None
    key_present = GOOGLE_API_KEY != ""
    return {
        "status": "healthy",
        "cache_size": len(nutrition_cache),
        "yolo_model_loaded": model_ready,
        "gemini_api_configured": key_present,
    }

import os

if __name__ == "__main__":
    # The listening port comes from the PORT environment variable (Render
    # assigns it dynamically); 8000 is used when it is unset.
    listen_port = int(os.environ.get("PORT", 8000))
    uvicorn.run(app, host="0.0.0.0", port=listen_port)