File size: 9,053 Bytes
42ecad8 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 |
import os
import uvicorn
from fastapi import FastAPI, File, UploadFile
from fastapi.middleware.cors import CORSMiddleware
import google.generativeai as genai
from ultralytics import YOLO
from PIL import Image
import io
import json
import re
import pickle
import time
from datetime import datetime
# Initialize FastAPI app
app = FastAPI(
    title="Food Nutrition API",
    description="API for detecting food items and retrieving nutritional information",
)

# Add CORS middleware
# Allowed origins are read from the comma-separated CORS_ALLOW_ORIGINS
# environment variable; when it is unset or blank, every origin is allowed.
_cors_origins = [
    origin.strip()
    for origin in os.getenv("CORS_ALLOW_ORIGINS", "*").split(",")
    if origin.strip()
] or ["*"]

app.add_middleware(
    CORSMiddleware,
    allow_origins=_cors_origins,
    # A wildcard origin must not be combined with credentialed requests, so
    # credentials are only enabled once explicit origins are configured.
    allow_credentials="*" not in _cors_origins,
    allow_methods=["*"],  # Allows all methods
    allow_headers=["*"],  # Allows all headers
)
# Load YOLOv8 model
model = YOLO("yolov8n.pt")

# Set up Google Gemini API - the key is read from the GOOGLE_API_KEY
# environment variable and must never be committed to source control.
# NOTE(review): a literal key used to be embedded here; treat it as leaked
# and revoke/rotate it.
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY", "")
if not GOOGLE_API_KEY:
    print("GOOGLE_API_KEY is not set; Gemini requests will fail")
genai.configure(api_key=GOOGLE_API_KEY)
gemini_model = genai.GenerativeModel("gemini-pro")
# Cache configuration
# CACHE_DIR is the directory that holds the cache file. An unset or empty
# value falls back to the current directory (".") so that
# os.path.dirname(CACHE_FILE) is never "" -- os.makedirs("") raises
# FileNotFoundError.
CACHE_DIR = os.getenv("CACHE_DIR") or "."
CACHE_FILE = os.path.join(CACHE_DIR, "food_nutrition_cache.pkl")
# Cache entries expire after this many days (30 by default).
CACHE_EXPIRY_DAYS = int(os.getenv("CACHE_EXPIRY_DAYS", "30"))
def load_cache():
    """Load the nutrition cache from disk.

    Returns:
        The dict stored in CACHE_FILE, or an empty dict when the file is
        missing, unreadable, corrupted, or does not contain a dict.
    """
    # SECURITY NOTE: pickle.load can execute arbitrary code while loading.
    # CACHE_FILE must only ever be written by this application.
    try:
        with open(CACHE_FILE, 'rb') as f:
            cache = pickle.load(f)
    except FileNotFoundError:
        # No cache has been written yet.
        return {}
    except OSError as e:
        print(f"Could not read cache file: {e}")
        return {}
    except (pickle.PickleError, EOFError, AttributeError, ImportError, IndexError):
        # Unpickling damaged data can raise more than PickleError/EOFError.
        print("Cache file corrupted, creating new cache")
        return {}

    # The rest of the module indexes the cache as a dict of entries.
    if not isinstance(cache, dict):
        print("Cache file corrupted, creating new cache")
        return {}
    return cache
def save_cache(cache):
    """Save the nutrition cache to disk.

    Args:
        cache: The object to pickle into CACHE_FILE.

    Saving is best effort: any failure is printed and otherwise ignored.
    """
    try:
        # Ensure cache directory exists. dirname() is "" when CACHE_FILE has
        # no directory component, and os.makedirs("") raises
        # FileNotFoundError, so only create the directory when there is one.
        cache_dir = os.path.dirname(CACHE_FILE)
        if cache_dir:
            os.makedirs(cache_dir, exist_ok=True)
        with open(CACHE_FILE, 'wb') as f:
            pickle.dump(cache, f)
    except Exception as e:
        print(f"Error saving cache: {e}")
# Initialize the cache
# Module-level cache, populated once at import time from load_cache() and
# shared by the cache helpers and endpoints in this module.
nutrition_cache: dict = load_cache()
def clean_expired_cache_entries():
    """Drop cache entries older than CACHE_EXPIRY_DAYS and persist the result.

    Nothing is written to disk when no entry has expired.
    """
    now = time.time()
    max_age_seconds = CACHE_EXPIRY_DAYS * 24 * 60 * 60

    # Collect stale keys first so the dict is not mutated while iterating it.
    stale_keys = [
        name
        for name, record in nutrition_cache.items()
        if now - record['timestamp'] > max_age_seconds
    ]
    if not stale_keys:
        return

    for name in stale_keys:
        del nutrition_cache[name]
    save_cache(nutrition_cache)
    print(f"Removed {len(stale_keys)} expired cache entries")
def get_food_info(food_item):
    """Fetch nutritional information for a food item, with caching.

    Args:
        food_item: Name of the food. It is used as the cache key and is
            interpolated into the Gemini prompt.

    Returns:
        The raw response text (the prompt asks for a fenced JSON block), the
        string "No data found" when the API returns nothing, or a JSON string
        with an "error" key when the request raises.
    """
    # Check if the food item is in cache and not expired
    cached_entry = nutrition_cache.get(food_item)
    if cached_entry is not None:
        age_seconds = time.time() - cached_entry['timestamp']
        if age_seconds <= CACHE_EXPIRY_DAYS * 24 * 60 * 60:
            print(f"Cache hit for {food_item}")
            return cached_entry['data']
    print(f"Cache miss for {food_item}, fetching from API")
    # If not in cache, fetch from Gemini API
    prompt = f"""
Provide the nutritional information for "{food_item}" in JSON format with the following structure:
{{
"food_item": "{food_item}",
"nutritional_info": {{
"calories_kcal": value,
"protein_g": value,
"fiber_g": value,
"vitamins": {{
"Vitamin A_mcg": value,
"Vitamin C_mg": value,
"Vitamin D_mcg": value,
"Vitamin E_mg": value,
"Vitamin K_mcg": value,
"Vitamin B1_mg": value,
"Vitamin B2_mg": value,
"Vitamin B3_mg": value,
"Vitamin B6_mg": value,
"Vitamin B12_mcg": value,
"Folate_mcg": value
}}
}},
"health_benefits": [
"Brief description of health benefit 1",
"Brief description of health benefit 2"
]
}}
Ensure the response is **valid JSON** inside triple backticks (```json ... ```).
"""
    try:
        response = gemini_model.generate_content(prompt)
        result = response.text if response else "No data found"
        # Only cache real answers: storing the placeholder (or empty text)
        # would keep serving it until the entry expires.
        if response and result:
            # Store in cache with timestamp
            nutrition_cache[food_item] = {
                'data': result,
                'timestamp': time.time()
            }
            # Save updated cache
            save_cache(nutrition_cache)
        return result
    except Exception as e:
        print(f"Error fetching from Gemini API: {e}")
        return json.dumps({"error": f"Failed to retrieve data: {str(e)}"})
def parse_nutrition_response(response_dict):
    """Parse raw model responses that embed JSON inside a text reply.

    Args:
        response_dict: Mapping of food name to the raw response text.

    Returns:
        Mapping of food name to the decoded JSON value. When decoding fails
        the value is ``{"error": "Invalid JSON format"}`` (a fenced block was
        found but is not valid JSON) or
        ``{"error": "JSON not found in response"}`` (no JSON could be located).
    """
    # Accept ```json / ```JSON / bare ``` fences and any whitespace (including
    # \r\n) between the fence markers and the payload.
    fence_pattern = re.compile(r"```(?:json)?\s*(.*?)\s*```", re.DOTALL | re.IGNORECASE)

    parsed_data = {}
    for food, raw_json in response_dict.items():
        if not isinstance(raw_json, str):
            # Nothing to search in (e.g. None); report it like a missing payload.
            parsed_data[food] = {"error": "JSON not found in response"}
            continue

        fence_match = fence_pattern.search(raw_json)
        if fence_match:
            try:
                parsed_data[food] = json.loads(fence_match.group(1))
            except json.JSONDecodeError:
                parsed_data[food] = {"error": "Invalid JSON format"}
            continue

        # No fenced block: the text may itself be JSON, such as an
        # unfenced {"error": ...} payload; keep it instead of discarding it.
        try:
            parsed_data[food] = json.loads(raw_json)
        except json.JSONDecodeError:
            parsed_data[food] = {"error": "JSON not found in response"}
    return parsed_data
def detect_food(image: Image.Image):
    """Run the YOLOv8 model on an image and return the distinct labels found.

    Each detected box's class index is mapped to a name through
    ``model.names``. Duplicates are collapsed through a set, so the order of
    the returned list is unspecified.
    """
    # NOTE(review): nothing here restricts labels to food; the result is
    # whatever classes the loaded model reports -- confirm the weights in use.
    labels = set()
    for prediction in model(image):
        for box in prediction.boxes:
            labels.add(model.names[int(box.cls)])
    return list(labels)
@app.get("/")
def read_root():
    """Return a short message confirming that the service is running."""
    status_payload = {"message": "FastAPI is running on Render!"}
    return status_payload
@app.head("/")
async def root_head():
    """Answer HEAD requests on the root path with an empty payload."""
    empty_payload = dict()
    return empty_payload
@app.post("/analyze")
async def analyze_image(file: UploadFile = File(...)):
    """API endpoint to analyze food from an image and return nutritional info.

    Args:
        file: The uploaded image.

    Returns:
        A dict with the detected labels, their parsed nutrition data and cache
        statistics; ``{"message": "No food detected"}`` when nothing is
        detected; or ``{"error": ...}`` when processing raises.
    """
    # Periodically clean expired cache entries
    clean_expired_cache_entries()
    try:
        # Process the image
        contents = await file.read()
        image = Image.open(io.BytesIO(contents))
        detected_foods = detect_food(image)
        if not detected_foods:
            return {"message": "No food detected"}

        # Count hits BEFORE fetching: get_food_info() stores each successful
        # lookup in the cache, so counting afterwards would report misses as hits.
        cache_hits = sum(1 for food in detected_foods if food in nutrition_cache)
        cache_misses = len(detected_foods) - cache_hits

        # Get nutrition info (from cache if available)
        raw_nutrition_info = {food: get_food_info(food) for food in detected_foods}
        parsed_nutrition_info = parse_nutrition_response(raw_nutrition_info)

        # Add cache statistics to the response
        timestamps = [entry['timestamp'] for entry in nutrition_cache.values()]
        last_update = max(timestamps) if timestamps else time.time()
        cache_stats = {
            "cache_size": len(nutrition_cache),
            "cache_hits": cache_hits,
            "cache_misses": cache_misses,
            "last_cache_update": datetime.fromtimestamp(last_update).isoformat(),
        }
        return {
            "detected_foods": detected_foods,
            "nutrition_info": parsed_nutrition_info,
            "cache_stats": cache_stats,
        }
    except Exception as e:
        return {"error": f"An error occurred: {str(e)}"}
# Cache management endpoints
@app.get("/cache/stats")
async def get_cache_stats():
    """Report the cache size, the cached keys, and the oldest/newest entry times."""
    if not nutrition_cache:
        return {"message": "Cache is empty"}

    timestamps = [record['timestamp'] for record in nutrition_cache.values()]
    oldest = datetime.fromtimestamp(min(timestamps))
    newest = datetime.fromtimestamp(max(timestamps))
    return {
        "cache_size": len(nutrition_cache),
        "foods_cached": list(nutrition_cache.keys()),
        "oldest_entry": oldest.isoformat(),
        "newest_entry": newest.isoformat(),
    }
@app.delete("/cache/clear")
async def clear_cache():
    """Replace the module-level cache with an empty dict and persist it."""
    global nutrition_cache
    nutrition_cache = dict()
    save_cache(nutrition_cache)
    confirmation = {"message": "Cache cleared successfully"}
    return confirmation
@app.delete("/cache/item/{food_item}")
async def delete_cache_item(food_item: str):
    """Remove one food item from the cache and persist the change.

    Returns a message saying whether the item was removed or was not cached.
    """
    # Guard clause: nothing to delete, and nothing to write back to disk.
    if food_item not in nutrition_cache:
        return {"message": f"{food_item} not found in cache"}

    del nutrition_cache[food_item]
    save_cache(nutrition_cache)
    return {"message": f"Removed {food_item} from cache"}
# Health check endpoint for production deployments
@app.get("/health")
async def health_check():
    """Return a status payload for monitoring.

    Reports the cache size, whether the YOLO model object is set, and whether
    the Gemini API key is a non-empty value.
    """
    model_ready = model is not None
    key_present = GOOGLE_API_KEY != ""
    return {
        "status": "healthy",
        "cache_size": len(nutrition_cache),
        "yolo_model_loaded": model_ready,
        "gemini_api_configured": key_present,
    }
import os
if __name__ == "__main__":
    # Render assigns PORT dynamically; fall back to 8000 when it is not set.
    port = int(os.getenv("PORT", "8000"))
    # Bind to all interfaces so the server is reachable from outside the host.
    uvicorn.run(app, host="0.0.0.0", port=port)