|
import os |
|
import uvicorn |
|
from fastapi import FastAPI, File, UploadFile |
|
from fastapi.middleware.cors import CORSMiddleware |
|
import google.generativeai as genai |
|
from ultralytics import YOLO |
|
from PIL import Image |
|
import io |
|
import json |
|
import re |
|
import pickle |
|
import time |
|
from datetime import datetime |
|
|
|
|
|
# FastAPI application object; title and description are passed straight to
# the FastAPI constructor.
app = FastAPI(
    title="Food Nutrition API",
    description="API for detecting food items and retrieving nutritional information",
)

# Origins allowed to call the API from a browser, as a comma-separated list in
# the CORS_ALLOW_ORIGINS environment variable. When the variable is unset or
# empty the previous behaviour is kept: every origin ("*") is allowed.
_cors_origins = [
    origin.strip()
    for origin in os.getenv("CORS_ALLOW_ORIGINS", "*").split(",")
    if origin.strip()
] or ["*"]

# NOTE(review): a wildcard origin combined with allow_credentials=True is a
# permissive setup; set CORS_ALLOW_ORIGINS to the real front-end origin(s) in
# production. Credentials stay enabled here so existing clients keep working.
app.add_middleware(
    CORSMiddleware,
    allow_origins=_cors_origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
|
|
|
|
|
# YOLOv8 detector, loaded once at import time and shared by every request.
# NOTE(review): "yolov8n.pt" is presumably the stock pretrained checkpoint,
# whose label set is not limited to food - confirm this is intended.
model = YOLO("yolov8n.pt")

# The Gemini API key is read from the environment instead of being committed
# to source control. An empty string means "not configured".
# SECURITY: a literal key used to be hard-coded at this spot; treat that key
# as leaked and revoke/rotate it.
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY", "")
if not GOOGLE_API_KEY:
    print("Warning: GOOGLE_API_KEY is not set; Gemini requests will fail")
genai.configure(api_key=GOOGLE_API_KEY)

# The model name can be overridden with GEMINI_MODEL; the default matches the
# previously hard-coded value.
gemini_model = genai.GenerativeModel(os.getenv("GEMINI_MODEL", "gemini-pro"))
|
|
|
|
|
|
|
|
|
|
|
|
|
# Cache settings, each overridable through an environment variable.
# CACHE_EXPIRY_DAYS: entry lifetime in days (default 30).
CACHE_EXPIRY_DAYS = int(os.environ.get("CACHE_EXPIRY_DAYS", "30"))

# CACHE_DIR: directory holding the pickle file. The default is the empty
# string, in which case CACHE_FILE is just the bare file name.
CACHE_DIR = os.environ.get("CACHE_DIR", "")
CACHE_FILE = os.path.join(CACHE_DIR, "food_nutrition_cache.pkl")
|
|
|
def load_cache():
    """Load the nutrition cache from disk.

    Returns:
        dict: The mapping stored in ``CACHE_FILE``. An empty dict is returned
        when the file does not exist, cannot be read, cannot be unpickled, or
        does not contain a dict.
    """
    # SECURITY: unpickling can execute arbitrary code. CACHE_FILE must only
    # ever be written by this application and must not be writable by
    # untrusted parties.
    try:
        with open(CACHE_FILE, 'rb') as f:
            cache = pickle.load(f)
    except FileNotFoundError:
        # First run: there is simply no cache yet.
        return {}
    except OSError as e:
        print(f"Could not read cache file: {e}")
        return {}
    except Exception:
        # Besides PickleError/EOFError, unpickling damaged data may raise
        # AttributeError, ImportError, IndexError and others; none of them
        # should prevent the service from starting.
        print("Cache file corrupted, creating new cache")
        return {}

    # The rest of the module indexes the cache as a dict of entries.
    if not isinstance(cache, dict):
        print("Cache file corrupted, creating new cache")
        return {}
    return cache
|
|
|
|
|
def save_cache(cache):
    """Save the nutrition cache to disk.

    Saving is best-effort: any failure is printed and swallowed so that a
    cache problem never fails the calling request.

    Args:
        cache: The object to pickle into ``CACHE_FILE``.
    """
    try:
        cache_dir = os.path.dirname(CACHE_FILE)
        # dirname is "" when CACHE_FILE is a bare file name (the default when
        # CACHE_DIR is unset). os.makedirs("") raises FileNotFoundError, which
        # would abort the save, so only create a directory when one is named.
        if cache_dir:
            os.makedirs(cache_dir, exist_ok=True)
        with open(CACHE_FILE, 'wb') as f:
            pickle.dump(cache, f)
    except Exception as e:
        print(f"Error saving cache: {e}")
|
|
|
|
|
|
|
# In-memory nutrition cache, filled from disk once at import time. It maps a
# food name to a dict with 'data' (raw model reply) and 'timestamp' (seconds
# since the epoch), as written by get_food_info().
nutrition_cache = load_cache()
|
|
|
|
|
def clean_expired_cache_entries():
    """Drop cache entries older than ``CACHE_EXPIRY_DAYS`` days.

    The cache file is rewritten only when at least one entry was removed.
    """
    now = time.time()
    max_age = CACHE_EXPIRY_DAYS * 24 * 60 * 60

    # Collect the names first so the dict is not modified while iterating.
    stale = [
        name
        for name, record in nutrition_cache.items()
        if now - record['timestamp'] > max_age
    ]
    if not stale:
        return

    for name in stale:
        del nutrition_cache[name]

    save_cache(nutrition_cache)
    print(f"Removed {len(stale)} expired cache entries")
|
|
|
|
|
def _build_nutrition_prompt(food_item):
    """Return the prompt asking Gemini for fenced-JSON nutrition data."""
    return f"""
    Provide the nutritional information for "{food_item}" in JSON format with the following structure:

    {{
        "food_item": "{food_item}",
        "nutritional_info": {{
            "calories_kcal": value,
            "protein_g": value,
            "fiber_g": value,
            "vitamins": {{
                "Vitamin A_mcg": value,
                "Vitamin C_mg": value,
                "Vitamin D_mcg": value,
                "Vitamin E_mg": value,
                "Vitamin K_mcg": value,
                "Vitamin B1_mg": value,
                "Vitamin B2_mg": value,
                "Vitamin B3_mg": value,
                "Vitamin B6_mg": value,
                "Vitamin B12_mcg": value,
                "Folate_mcg": value
            }}
        }},
        "health_benefits": [
            "Brief description of health benefit 1",
            "Brief description of health benefit 2"
        ]
    }}

    Ensure the response is **valid JSON** inside triple backticks (```json ... ```).
    """


def get_food_info(food_item):
    """Fetch nutritional information with caching.

    Args:
        food_item: Name of the food; used as the cache key and inserted into
            the prompt.

    Returns:
        str: The cached or freshly fetched raw model reply. ``"No data found"``
        is returned when the model gives no response object, and a JSON string
        of the form ``{"error": "..."}`` when the request raises. Neither of
        those two results is cached.
    """
    if food_item in nutrition_cache:
        print(f"Cache hit for {food_item}")
        return nutrition_cache[food_item]['data']

    print(f"Cache miss for {food_item}, fetching from API")

    try:
        response = gemini_model.generate_content(_build_nutrition_prompt(food_item))
        result = response.text if response else None
    except Exception as e:
        print(f"Error fetching from Gemini API: {e}")
        return json.dumps({"error": f"Failed to retrieve data: {str(e)}"})

    if result is None:
        # Return the placeholder without caching it, so the next request for
        # this item retries instead of serving the placeholder until expiry.
        return "No data found"

    # Only replies with actual content are worth remembering.
    if result.strip():
        nutrition_cache[food_item] = {
            'data': result,
            'timestamp': time.time(),
        }
        save_cache(nutrition_cache)

    return result
|
|
|
|
|
# Patterns tried in order to find the JSON payload in a model reply.
_FENCED_JSON_PATTERNS = (
    # Exact layout requested in the prompt.
    re.compile(r"```json\n(.*?)\n```", re.DOTALL),
    # Tolerant variant: CRLF line endings, extra spaces, different casing.
    re.compile(r"```\s*json\s*(.*?)\s*```", re.DOTALL | re.IGNORECASE),
)


def _extract_json_text(raw_text):
    """Return the JSON portion of *raw_text*, or None when none is found."""
    for pattern in _FENCED_JSON_PATTERNS:
        match = pattern.search(raw_text)
        if match:
            return match.group(1)

    # Unfenced JSON object, e.g. the {"error": ...} string produced by
    # get_food_info() when the API call fails.
    stripped = raw_text.strip()
    if stripped.startswith("{") and stripped.endswith("}"):
        return stripped
    return None


def parse_nutrition_response(response_dict):
    """Parse raw model replies that embed JSON into Python objects.

    Args:
        response_dict: Mapping of food name to raw reply text.

    Returns:
        dict: Mapping of food name to the decoded JSON value, or to
        ``{"error": "JSON not found in response"}`` when no JSON could be
        located (including non-string replies), or to
        ``{"error": "Invalid JSON format"}`` when the located text does not
        decode.
    """
    parsed_data = {}

    for food, raw_json in response_dict.items():
        json_text = _extract_json_text(raw_json) if isinstance(raw_json, str) else None

        if json_text is None:
            parsed_data[food] = {"error": "JSON not found in response"}
            continue

        try:
            parsed_data[food] = json.loads(json_text)
        except json.JSONDecodeError:
            parsed_data[food] = {"error": "Invalid JSON format"}

    return parsed_data
|
|
|
|
|
def detect_food(image: Image.Image):
    """Detect food items in an image using YOLOv8.

    Args:
        image: The picture to run the detector on.

    Returns:
        list: The distinct label names of all detected boxes, sorted so the
        order is the same on every run (iterating a set of strings directly
        can give a different order in each process).
    """
    results = model(image)

    labels = set()
    for result in results:
        for box in result.boxes:
            # box.cls holds the class index; model.names maps it to a label.
            labels.add(model.names[int(box.cls)])

    return sorted(labels)
|
|
|
@app.get("/")
def read_root():
    """Root route: return a fixed message showing the service is reachable."""
    payload = {"message": "FastAPI is running on Render!"}
    return payload
|
|
|
@app.head("/")
async def root_head():
    """Answer HEAD requests on the root path with an empty payload."""
    return dict()
|
|
|
@app.post("/analyze")
async def analyze_image(file: UploadFile = File(...)):
    """API endpoint to analyze food from an image and return nutritional info.

    Args:
        file: The uploaded image.

    Returns:
        dict: ``detected_foods``, ``nutrition_info`` and ``cache_stats`` on
        success; ``{"message": "No food detected"}`` when nothing is detected;
        ``{"error": "..."}`` when processing raises.
    """
    clean_expired_cache_entries()

    try:
        contents = await file.read()
        image = Image.open(io.BytesIO(contents))
        # NOTE(review): detect_food() and get_food_info() are called
        # synchronously from this async handler, so the event loop is
        # presumably blocked while they run - consider a thread pool.
        detected_foods = detect_food(image)

        if not detected_foods:
            return {"message": "No food detected"}

        # Count hits BEFORE fetching: get_food_info() stores every successful
        # lookup in the cache, so counting afterwards reports misses as hits.
        cache_hits = sum(1 for food in detected_foods if food in nutrition_cache)
        cache_misses = len(detected_foods) - cache_hits

        raw_nutrition_info = {food: get_food_info(food) for food in detected_foods}
        parsed_nutrition_info = parse_nutrition_response(raw_nutrition_info)

        # Timestamp of the newest entry, or "now" when the cache is empty.
        last_update = max(
            (entry['timestamp'] for entry in nutrition_cache.values()),
            default=time.time(),
        )

        cache_stats = {
            "cache_size": len(nutrition_cache),
            "cache_hits": cache_hits,
            "cache_misses": cache_misses,
            "last_cache_update": datetime.fromtimestamp(last_update).isoformat(),
        }

        return {
            "detected_foods": detected_foods,
            "nutrition_info": parsed_nutrition_info,
            "cache_stats": cache_stats,
        }
    except Exception as e:
        # Failures are reported in the response body rather than raised.
        return {"error": f"An error occurred: {str(e)}"}
|
|
|
|
|
|
|
@app.get("/cache/stats")
async def get_cache_stats():
    """Report size, cached food names and oldest/newest entry times of the cache."""
    if not nutrition_cache:
        return {"message": "Cache is empty"}

    # Gather the timestamps once and derive both extremes from the same list.
    stamps = [record['timestamp'] for record in nutrition_cache.values()]
    oldest = datetime.fromtimestamp(min(stamps)).isoformat()
    newest = datetime.fromtimestamp(max(stamps)).isoformat()

    return {
        "cache_size": len(nutrition_cache),
        "foods_cached": list(nutrition_cache.keys()),
        "oldest_entry": oldest,
        "newest_entry": newest,
    }
|
|
|
|
|
@app.delete("/cache/clear")
async def clear_cache():
    """Empty the nutrition cache in memory and on disk."""
    global nutrition_cache

    # Rebind the module-level name to a brand-new dict, then persist it.
    emptied = dict()
    nutrition_cache = emptied
    save_cache(emptied)

    return {"message": "Cache cleared successfully"}
|
|
|
|
|
@app.delete("/cache/item/{food_item}")
async def delete_cache_item(food_item: str):
    """Remove one food item from the cache and persist the change.

    Returns a message saying whether the item was removed or was not cached.
    """
    # Guard clause: nothing to do when the item is not cached.
    if food_item not in nutrition_cache:
        return {"message": f"{food_item} not found in cache"}

    nutrition_cache.pop(food_item)
    save_cache(nutrition_cache)
    return {"message": f"Removed {food_item} from cache"}
|
|
|
|
|
|
|
@app.get("/health")
async def health_check():
    """Health check endpoint: report cache size and model/API-key presence."""
    entries = len(nutrition_cache)
    detector_ready = model is not None
    # The key counts as configured when it differs from the empty string.
    key_present = GOOGLE_API_KEY != ""

    return {
        "status": "healthy",
        "cache_size": entries,
        "yolo_model_loaded": detector_ready,
        "gemini_api_configured": key_present,
    }
|
|
|
import os |
|
|
|
# Script entry point: serve the app on all interfaces, on the port given by
# the PORT environment variable (8000 when unset).
if __name__ == "__main__":
    listen_port = int(os.environ.get("PORT", 8000))
    uvicorn.run(app, host="0.0.0.0", port=listen_port)