"""Food Nutrition API.

Detects objects in an uploaded image with a YOLOv8 model and asks Google
Gemini for nutritional information about every detected label. Gemini
responses are cached in memory and persisted to disk as a pickle file.
"""

import io
import json
import os
import pickle
import re
import time
from datetime import datetime

import google.generativeai as genai
import uvicorn
from fastapi import FastAPI, File, UploadFile
from fastapi.middleware.cors import CORSMiddleware
from PIL import Image
from ultralytics import YOLO

# Initialize FastAPI app
app = FastAPI(
    title="Food Nutrition API",
    description="API for detecting food items and retrieving nutritional information",
)

# Add CORS middleware
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # Allows all origins, modify for production
    allow_credentials=True,
    allow_methods=["*"],  # Allows all methods
    allow_headers=["*"],  # Allows all headers
)

# Load YOLOv8 model
model = YOLO("yolov8n.pt")

# Set up Google Gemini API. The key is read from the environment so that no
# secret lives in source control.
# NOTE(security): a literal key used to be committed on this line; treat that
# key as compromised and rotate it.
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY", "")
if GOOGLE_API_KEY:
    genai.configure(api_key=GOOGLE_API_KEY)
else:
    print("GOOGLE_API_KEY is not set; Gemini requests will fail")
gemini_model = genai.GenerativeModel("gemini-pro")

# Cache configuration. An empty CACHE_DIR means "current working directory".
CACHE_DIR = os.getenv("CACHE_DIR", "")
CACHE_FILE = os.path.join(CACHE_DIR, "food_nutrition_cache.pkl")
CACHE_EXPIRY_DAYS = int(os.getenv("CACHE_EXPIRY_DAYS", "30"))  # Default to 30 days

# Matches a fenced code block (optionally tagged "json") in a Gemini reply and
# captures its content; tolerant of the whitespace around the fences.
_JSON_FENCE_RE = re.compile(r"```(?:json)?\s*(.*?)\s*```", re.DOTALL | re.IGNORECASE)


def _is_expired(entry: dict, now: float) -> bool:
    """Return True when a cache entry is older than CACHE_EXPIRY_DAYS."""
    return now - entry["timestamp"] > CACHE_EXPIRY_DAYS * 24 * 60 * 60


def load_cache() -> dict:
    """Load the nutrition cache from disk.

    Returns:
        The cached mapping of food label -> {'data': str, 'timestamp': float},
        or an empty dict when the file is missing, unreadable as a pickle, or
        does not contain a dict.
    """
    if not os.path.exists(CACHE_FILE):
        return {}
    try:
        with open(CACHE_FILE, "rb") as f:
            # NOTE(security): pickle.load can execute arbitrary code. This is
            # only acceptable while CACHE_FILE is written exclusively by this
            # service; consider switching the cache format to JSON.
            cache = pickle.load(f)
    except (pickle.PickleError, EOFError, AttributeError, ImportError, IndexError):
        # Unpickling a damaged file can raise more than PickleError/EOFError;
        # none of them should prevent the service from starting.
        print("Cache file corrupted, creating new cache")
        return {}
    if not isinstance(cache, dict):
        print("Cache file corrupted, creating new cache")
        return {}
    return cache


def save_cache(cache: dict) -> None:
    """Save the nutrition cache to disk (best effort; failures are logged).

    Args:
        cache: The mapping to persist to CACHE_FILE.
    """
    try:
        # Ensure cache directory exists. dirname is "" when CACHE_DIR is
        # empty, and os.makedirs("") raises, so only create a real directory.
        cache_dir = os.path.dirname(CACHE_FILE)
        if cache_dir:
            os.makedirs(cache_dir, exist_ok=True)
        with open(CACHE_FILE, "wb") as f:
            pickle.dump(cache, f)
    except Exception as e:
        # Persisting the cache is an optimisation; never fail a request on it.
        print(f"Error saving cache: {e}")


# Initialize the cache
nutrition_cache = load_cache()


def clean_expired_cache_entries() -> None:
    """Remove expired entries from the cache and persist it if any were removed."""
    now = time.time()
    expired_keys = [
        food_item
        for food_item, entry in nutrition_cache.items()
        if _is_expired(entry, now)
    ]
    for key in expired_keys:
        del nutrition_cache[key]

    if expired_keys:
        save_cache(nutrition_cache)
        print(f"Removed {len(expired_keys)} expired cache entries")


def get_food_info(food_item: str) -> str:
    """Fetch nutritional information with caching.

    Args:
        food_item: Label of the food to look up.

    Returns:
        The raw Gemini response text (expected to contain a fenced JSON
        block), "No data found" when Gemini returned nothing, or a JSON string
        of the form {"error": ...} when the request failed.
    """
    # Check if the food item is in cache and not expired
    entry = nutrition_cache.get(food_item)
    if entry is not None and not _is_expired(entry, time.time()):
        print(f"Cache hit for {food_item}")
        return entry["data"]

    print(f"Cache miss for {food_item}, fetching from API")

    # If not in cache, fetch from Gemini API
    prompt = f"""
    Provide the nutritional information for "{food_item}" in JSON format with the following structure:
    {{
        "food_item": "{food_item}",
        "nutritional_info": {{
            "calories_kcal": value,
            "protein_g": value,
            "fiber_g": value,
            "vitamins": {{
                "Vitamin A_mcg": value,
                "Vitamin C_mg": value,
                "Vitamin D_mcg": value,
                "Vitamin E_mg": value,
                "Vitamin K_mcg": value,
                "Vitamin B1_mg": value,
                "Vitamin B2_mg": value,
                "Vitamin B3_mg": value,
                "Vitamin B6_mg": value,
                "Vitamin B12_mcg": value,
                "Folate_mcg": value
            }}
        }},
        "health_benefits": [
            "Brief description of health benefit 1",
            "Brief description of health benefit 2"
        ]
    }}
    Ensure the response is **valid JSON** inside triple backticks (```json ... ```).
    """

    try:
        response = gemini_model.generate_content(prompt)
        if not response:
            # Do not cache the placeholder: a later request should retry.
            return "No data found"
        result = response.text

        # Store in cache with timestamp
        nutrition_cache[food_item] = {"data": result, "timestamp": time.time()}

        # Save updated cache
        save_cache(nutrition_cache)

        return result
    except Exception as e:
        # Boundary around a remote call: report the failure to the caller
        # instead of failing the whole request. Errors are never cached.
        print(f"Error fetching from Gemini API: {e}")
        return json.dumps({"error": f"Failed to retrieve data: {str(e)}"})


def parse_nutrition_response(response_dict: dict) -> dict:
    """Parse raw responses that embed JSON inside a fenced code block.

    Args:
        response_dict: Mapping of food label -> raw response string as
            returned by get_food_info.

    Returns:
        Mapping of food label -> parsed JSON value, or {"error": ...} when no
        JSON could be extracted for that label.
    """
    parsed_data = {}
    for food, raw_json in response_dict.items():
        # Extract JSON part from the response using regex
        json_match = _JSON_FENCE_RE.search(raw_json)
        if json_match:
            try:
                parsed_data[food] = json.loads(json_match.group(1))
            except json.JSONDecodeError:
                parsed_data[food] = {"error": "Invalid JSON format"}
            continue

        # No fenced block: the text may itself be a JSON object, e.g. the
        # {"error": ...} payload get_food_info returns on failure. Surface it
        # rather than masking it with a generic message.
        try:
            unfenced = json.loads(raw_json)
        except json.JSONDecodeError:
            unfenced = None
        if isinstance(unfenced, dict):
            parsed_data[food] = unfenced
        else:
            parsed_data[food] = {"error": "JSON not found in response"}
    return parsed_data


def detect_food(image: Image.Image) -> list:
    """Detect items in an image using YOLOv8.

    Args:
        image: The PIL image to run the model on.

    Returns:
        The distinct class names the model reported, sorted so the output
        order is deterministic.
    """
    results = model(image)
    detected_foods = {
        model.names[int(box.cls)] for result in results for box in result.boxes
    }
    return sorted(detected_foods)


@app.get("/")
def read_root():
    """Return a simple liveness message."""
    return {"message": "FastAPI is running on Render!"}


@app.head("/")
async def root_head():
    """Answer HEAD requests on the root path."""
    return {}  # Empty response for HEAD requests


@app.post("/analyze")
async def analyze_image(file: UploadFile = File(...)):
    """API endpoint to analyze food from an image and return nutritional info.

    Returns a dict with the detected labels, the parsed nutrition data per
    label and cache statistics; {"message": "No food detected"} when the model
    finds nothing; or {"error": ...} when processing fails.
    """
    # Periodically clean expired cache entries
    clean_expired_cache_entries()

    try:
        # Process the image
        contents = await file.read()
        image = Image.open(io.BytesIO(contents))
        detected_foods = detect_food(image)

        if not detected_foods:
            return {"message": "No food detected"}

        # Count hits BEFORE fetching: get_food_info fills the cache, so
        # counting afterwards would report every lookup as a hit.
        cache_hits = sum(1 for food in detected_foods if food in nutrition_cache)
        cache_misses = len(detected_foods) - cache_hits

        # Get nutrition info (from cache if available)
        raw_nutrition_info = {food: get_food_info(food) for food in detected_foods}
        parsed_nutrition_info = parse_nutrition_response(raw_nutrition_info)

        # Add cache statistics to the response
        last_update = max(
            (entry["timestamp"] for entry in nutrition_cache.values()),
            default=time.time(),
        )
        cache_stats = {
            "cache_size": len(nutrition_cache),
            "cache_hits": cache_hits,
            "cache_misses": cache_misses,
            "last_cache_update": datetime.fromtimestamp(last_update).isoformat(),
        }

        return {
            "detected_foods": detected_foods,
            "nutrition_info": parsed_nutrition_info,
            "cache_stats": cache_stats,
        }
    except Exception as e:
        # Top-level request boundary: report the failure in the response body.
        return {"error": f"An error occurred: {str(e)}"}


# Add endpoints to manage the cache
@app.get("/cache/stats")
async def get_cache_stats():
    """Get statistics about the nutrition cache."""
    if not nutrition_cache:
        return {"message": "Cache is empty"}

    timestamps = [entry["timestamp"] for entry in nutrition_cache.values()]
    return {
        "cache_size": len(nutrition_cache),
        "foods_cached": list(nutrition_cache.keys()),
        "oldest_entry": datetime.fromtimestamp(min(timestamps)).isoformat(),
        "newest_entry": datetime.fromtimestamp(max(timestamps)).isoformat(),
    }


@app.delete("/cache/clear")
async def clear_cache():
    """Clear the nutrition cache, both in memory and on disk."""
    # Clearing in place keeps every reference to the module-level dict valid.
    nutrition_cache.clear()
    save_cache(nutrition_cache)
    return {"message": "Cache cleared successfully"}


@app.delete("/cache/item/{food_item}")
async def delete_cache_item(food_item: str):
    """Delete a specific food item from the cache."""
    if food_item in nutrition_cache:
        del nutrition_cache[food_item]
        save_cache(nutrition_cache)
        return {"message": f"Removed {food_item} from cache"}
    return {"message": f"{food_item} not found in cache"}


# Add health check endpoint for production deployments
@app.get("/health")
async def health_check():
    """Health check endpoint for monitoring."""
    return {
        "status": "healthy",
        "cache_size": len(nutrition_cache),
        "yolo_model_loaded": model is not None,
        "gemini_api_configured": bool(GOOGLE_API_KEY),
    }


if __name__ == "__main__":
    port = int(os.getenv("PORT", 8000))  # Render assigns PORT dynamically
    uvicorn.run(app, host="0.0.0.0", port=port)