# Final, Complete, and Working app.py for Hugging Face Space import os import cv2 import tempfile import numpy as np import uvicorn import requests import io import base64 from PIL import Image from inference_sdk import InferenceHTTPClient from fastapi import FastAPI, File, UploadFile from fastapi.responses import JSONResponse import tensorflow as tf from huggingface_hub import hf_hub_download import gradio as gr # --- 1. Configuration and Model Loading --- ROBOFLOW_API_KEY = os.environ.get("ROBOFLOW_API_KEY") CLIENT_FACE = InferenceHTTPClient(api_url="https://detect.roboflow.com", api_key=ROBOFLOW_API_KEY) CLIENT_EYES = InferenceHTTPClient(api_url="https://detect.roboflow.com", api_key=ROBOFLOW_API_KEY) CLIENT_IRIS = InferenceHTTPClient(api_url="https://detect.roboflow.com", api_key=ROBOFLOW_API_KEY) leuko_model = None try: model_path = hf_hub_download("skibi11/leukolook-eye-detector", "MobileNetV1_best.keras") leuko_model = tf.keras.models.load_model(model_path) print("--- LEUKOCORIA MODEL LOADED SUCCESSFULLY! ---") except Exception as e: print(f"--- FATAL ERROR: COULD NOT LOAD LEUKOCORIA MODEL: {e} ---") raise RuntimeError(f"Could not load leukocoria model: {e}") # --- 2. All Helper Functions --- def enhance_image_unsharp_mask(image, strength=0.5, radius=5): blur = cv2.GaussianBlur(image, (radius, radius), 0) return cv2.addWeighted(image, 1.0 + strength, blur, -strength, 0) def detect_faces_roboflow(image_path): return CLIENT_FACE.infer(image_path, model_id="face-detector-v4liw/2").get("predictions", []) def detect_eyes_roboflow(image_path, raw_image): """Calls Roboflow to find eyes and returns cropped images of them.""" try: resp = CLIENT_EYES.infer(image_path, model_id="eye-detection-kso3d/3") crops = [] for p in resp.get("predictions", []): x1 = int(p['x'] - p['width'] / 2) y1 = int(p['y'] - p['height'] / 2) x2 = int(p['x'] + p['width'] / 2) y2 = int(p['y'] + p['height'] / 2) crop = raw_image[y1:y2, x1:x2] if crop.size > 0: crops.append(crop) # On success, return the crops and None for the error message return crops, None except Exception as e: # If Roboflow fails, return an empty list and the error message print(f"Error in Roboflow eye detection: {e}") return [], str(e) # In app.py, replace the existing function with this one def get_largest_iris_prediction(eye_crop): "Calls Roboflow to find the largest iris using a temporary file for reliability." # --- NEW: Enhance the eye crop before saving it --- enhanced_eye_crop = enhance_image_unsharp_mask(eye_crop) with tempfile.NamedTemporaryFile(delete=False, suffix=".jpg") as tmp: # Save the ENHANCED version, not the original cv2.imwrite(tmp.name, enhanced_eye_crop) temp_iris_path = tmp.name try: # Use the file path for inference, which is more robust resp = CLIENT_IRIS.infer(temp_iris_path, model_id="iris_120_set/7") preds = resp.get("predictions", []) return max(preds, key=lambda p: p["width"] * p["height"]) if preds else None finally: # Ensure the temporary file is always deleted os.remove(temp_iris_path) def run_leukocoria_prediction(iris_crop): if leuko_model is None: return {"error": "Leukocoria model not loaded"}, 0.0 img_pil = Image.fromarray(cv2.cvtColor(iris_crop, cv2.COLOR_BGR2RGB)) enh = enhance_image_unsharp_mask(np.array(img_pil)) enh_rs = cv2.resize(enh, (224, 224)) img_array = np.array(enh_rs) / 255.0 img_array = np.expand_dims(img_array, axis=0) prediction = leuko_model.predict(img_array) confidence = float(prediction[0][0]) has_leuko = confidence > 0.5 return has_leuko, confidence # --- 3. FastAPI Application --- app = FastAPI() @app.post("/detect/") async def full_detection_pipeline(image: UploadFile = File(...)): with tempfile.NamedTemporaryFile(delete=False, suffix=".jpg") as tmp: contents = await image.read() tmp.write(contents) temp_image_path = tmp.name try: raw_image = cv2.imread(temp_image_path) if raw_image is None: return JSONResponse(status_code=400, content={"error": "Could not read uploaded image."}) if not detect_faces_roboflow(temp_image_path): return JSONResponse(status_code=400, content={"error": "No face detected."}) image_to_process = raw_image was_mirrored = False print("--- 1. Attempting detection on original image... ---") eye_crops, error_msg = detect_eyes_roboflow(temp_image_path, image_to_process) print(f"--- 2. Found {len(eye_crops)} eyes in original image. ---") if len(eye_crops) != 2: print("--- 3. Original failed. Attempting detection on mirrored image... ---") mirrored_image = cv2.flip(raw_image, 1) image_to_process = mirrored_image was_mirrored = True with tempfile.NamedTemporaryFile(delete=False, suffix=".jpg") as tmp_mirrored: cv2.imwrite(tmp_mirrored.name, mirrored_image) temp_mirrored_image_path = tmp_mirrored.name try: eye_crops, error_msg = detect_eyes_roboflow(temp_mirrored_image_path, image_to_process) print(f"--- 4. Found {len(eye_crops)} eyes in mirrored image. ---") finally: os.remove(temp_mirrored_image_path) if error_msg or len(eye_crops) != 2: return JSONResponse( status_code=400, content={"error": "Could not detect exactly two eyes. Please try another photo."} ) initial_boxes = [cv2.boundingRect(cv2.cvtColor(c, cv2.COLOR_BGR2GRAY)) for c in eye_crops] print(f"--- 5. Initial eye coordinates (x,y,w,h): {initial_boxes} ---") eye_crops.sort(key=lambda c: cv2.boundingRect(cv2.cvtColor(c, cv2.COLOR_BGR2GRAY))[0]) sorted_boxes = [cv2.boundingRect(cv2.cvtColor(c, cv2.COLOR_BGR2GRAY)) for c in eye_crops] print(f"--- 6. Sorted eye coordinates (x,y,w,h): {sorted_boxes} ---") if was_mirrored: print("--- 7. Image was mirrored, reversing eye order for correct labeling. ---") eye_crops.reverse() reversed_boxes = [cv2.boundingRect(cv2.cvtColor(c, cv2.COLOR_BGR2GRAY)) for c in eye_crops] print(f"--- 8. Reversed eye coordinates (x,y,w,h): {reversed_boxes} ---") flags = {} eye_images_b64 = {} for i, eye_crop in enumerate(eye_crops): side = "right" if i == 0 else "left" print(f"--- 9. Processing loop index {i}, assigning to: {side} eye. ---") is_success, buffer = cv2.imencode(".jpg", eye_crop) if is_success: eye_images_b64[side] = "data:image/jpeg;base64," + base64.b64encode(buffer).decode("utf-8") pred = get_largest_iris_prediction(eye_crop) if pred: x1, y1 = int(pred['x'] - pred['width'] / 2), int(pred['y'] - pred['height'] / 2) x2, y2 = int(pred['x'] + pred['width'] / 2), int(pred['y'] + pred['height'] / 2) iris_crop = eye_crop[y1:y2, x1:x2] has_leuko, confidence = run_leukocoria_prediction(iris_crop) flags[side] = has_leuko else: flags[side] = None # --- THIS BLOCK IS NOW CORRECTLY UN-INDENTED --- # It runs AFTER the 'for' loop is complete. print("--- 10. Final generated flags:", flags, "---") is_success_main, buffer_main = cv2.imencode(".jpg", image_to_process) analyzed_image_b64 = "" if is_success_main: analyzed_image_b64 = "data:image/jpeg;base64," + base64.b64encode(buffer_main).decode("utf-8") return JSONResponse(content={ "leukocoria": flags, "warnings": [], "two_eyes": eye_images_b64, "analyzed_image": analyzed_image_b64 }) finally: os.remove(temp_image_path) # --- 4. Create and Mount the Gradio UI for a professional homepage --- def gradio_wrapper(image_array): """A wrapper function to call our own FastAPI endpoint from the Gradio UI.""" try: pil_image = Image.fromarray(image_array) with io.BytesIO() as buffer: pil_image.save(buffer, format="JPEG") files = {'image': ('image.jpg', buffer.getvalue(), 'image/jpeg')} response = requests.post("http://127.0.0.1:7860/detect/", files=files) if response.status_code == 200: return response.json() else: return {"error": f"API Error {response.status_code}", "details": response.text} except Exception as e: return {"error": str(e)} gradio_ui = gr.Interface( fn=gradio_wrapper, inputs=gr.Image(type="numpy", label="Upload an eye image to test the full pipeline"), outputs=gr.JSON(label="Analysis Results"), title="LeukoLook Eye Detector", description="A demonstration of the LeukoLook detection model pipeline." ) app = gr.mount_gradio_app(app, gradio_ui, path="/") # --- 5. Run the server --- if __name__ == "__main__": uvicorn.run(app, host="0.0.0.0", port=7860)