# app.py for the LeukoLook Hugging Face Space
import os
import cv2
import tempfile
import numpy as np
import uvicorn
import requests
import io
import base64
from PIL import Image
from inference_sdk import InferenceHTTPClient
from fastapi import FastAPI, File, UploadFile
from fastapi.responses import JSONResponse
import tensorflow as tf
from huggingface_hub import hf_hub_download
import gradio as gr
# --- 1. Configuration and Model Loading ---
ROBOFLOW_API_KEY = os.environ.get("ROBOFLOW_API_KEY")
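# One hosted-inference client per pipeline stage (face, eyes, iris), all using the same Roboflow endpoint and API key.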
CLIENT_FACE = InferenceHTTPClient(api_url="https://detect.roboflow.com", api_key=ROBOFLOW_API_KEY)
CLIENT_EYES = InferenceHTTPClient(api_url="https://detect.roboflow.com", api_key=ROBOFLOW_API_KEY)
CLIENT_IRIS = InferenceHTTPClient(api_url="https://detect.roboflow.com", api_key=ROBOFLOW_API_KEY)
leuko_model = None
try:
    model_path = hf_hub_download("skibi11/leukolook-eye-detector", "MobileNetV1_best.keras")
    leuko_model = tf.keras.models.load_model(model_path)
    print("--- LEUKOCORIA MODEL LOADED SUCCESSFULLY! ---")
except Exception as e:
    print(f"--- FATAL ERROR: COULD NOT LOAD LEUKOCORIA MODEL: {e} ---")
    raise RuntimeError(f"Could not load leukocoria model: {e}")
# --- 2. All Helper Functions ---
def enhance_image_unsharp_mask(image, strength=0.5, radius=5):
    """Sharpens an image with a simple unsharp mask (original minus Gaussian blur)."""
    blur = cv2.GaussianBlur(image, (radius, radius), 0)
    return cv2.addWeighted(image, 1.0 + strength, blur, -strength, 0)
def detect_faces_roboflow(image_path):
    """Calls Roboflow to detect faces; returns the list of predictions."""
    return CLIENT_FACE.infer(image_path, model_id="face-detector-v4liw/2").get("predictions", [])
def detect_eyes_roboflow(image_path, raw_image):
    """Calls Roboflow to find eyes and returns cropped images of them, ordered left-to-right."""
    try:
        resp = CLIENT_EYES.infer(image_path, model_id="eye-detection-kso3d/3")
        crops = []
        # Sort detections by x-centre so the crops keep their left-to-right order in the image
        for p in sorted(resp.get("predictions", []), key=lambda pred: pred['x']):
            x1 = max(0, int(p['x'] - p['width'] / 2))
            y1 = max(0, int(p['y'] - p['height'] / 2))
            x2 = int(p['x'] + p['width'] / 2)
            y2 = int(p['y'] + p['height'] / 2)
            crop = raw_image[y1:y2, x1:x2]
            if crop.size > 0:
                crops.append(crop)
        # On success, return the crops and None for the error message
        return crops, None
    except Exception as e:
        # If Roboflow fails, return an empty list and the error message
        print(f"Error in Roboflow eye detection: {e}")
        return [], str(e)
def get_largest_iris_prediction(eye_crop):
    """Calls Roboflow to find the largest iris using a temporary file for reliability."""
    with tempfile.NamedTemporaryFile(delete=False, suffix=".jpg") as tmp:
        cv2.imwrite(tmp.name, eye_crop)
        temp_iris_path = tmp.name
    try:
        # Use the file path for inference, which is more robust
        resp = CLIENT_IRIS.infer(temp_iris_path, model_id="iris_120_set/7")
        preds = resp.get("predictions", [])
        return max(preds, key=lambda p: p["width"] * p["height"]) if preds else None
    finally:
        # Ensure the temporary file is always deleted
        os.remove(temp_iris_path)
def run_leukocoria_prediction(iris_crop):
    """Classifies an iris crop with the MobileNet model; returns (has_leukocoria, confidence)."""
    if leuko_model is None:
        # Defensive guard; model-loading failures raise at startup, so this should not trigger.
        return None, 0.0
    img_pil = Image.fromarray(cv2.cvtColor(iris_crop, cv2.COLOR_BGR2RGB))
    enh = enhance_image_unsharp_mask(np.array(img_pil))
    enh_rs = cv2.resize(enh, (224, 224))
    img_array = np.array(enh_rs) / 255.0
    img_array = np.expand_dims(img_array, axis=0)
    prediction = leuko_model.predict(img_array)
    confidence = float(prediction[0][0])
    has_leuko = confidence > 0.5
    return has_leuko, confidence
# --- 3. FastAPI Application ---
app = FastAPI()
@app.post("/detect/")
async def full_detection_pipeline(image: UploadFile = File(...)):
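    """Full pipeline: face check, eye detection (with mirrored-image retry), iris crop, and leukocoria classification."""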
    with tempfile.NamedTemporaryFile(delete=False, suffix=".jpg") as tmp:
        contents = await image.read()
        tmp.write(contents)
        temp_image_path = tmp.name
    try:
        raw_image = cv2.imread(temp_image_path)
        if raw_image is None:
            return JSONResponse(status_code=400, content={"error": "Could not read uploaded image."})
        if not detect_faces_roboflow(temp_image_path):
            return JSONResponse(status_code=400, content={"error": "No face detected."})
        # --- Eye detection with a mirrored-image fallback ---
        image_to_process = raw_image
        was_mirrored = False  # Track whether we flipped the image
        print("--- Attempting detection on original image... ---")
        eye_crops, error_msg = detect_eyes_roboflow(temp_image_path, image_to_process)
        if len(eye_crops) != 2:
            print("--- Original failed. Attempting detection on mirrored image... ---")
            mirrored_image = cv2.flip(raw_image, 1)
            image_to_process = mirrored_image
            was_mirrored = True
            with tempfile.NamedTemporaryFile(delete=False, suffix=".jpg") as tmp_mirrored:
                cv2.imwrite(tmp_mirrored.name, mirrored_image)
                temp_mirrored_image_path = tmp_mirrored.name
            try:
                eye_crops, error_msg = detect_eyes_roboflow(temp_mirrored_image_path, image_to_process)
            finally:
                os.remove(temp_mirrored_image_path)
        # Final check after both attempts
        if error_msg or len(eye_crops) != 2:
            return JSONResponse(
                status_code=400,
                content={"error": "Could not detect exactly two eyes. Please try another photo."}
            )
        # detect_eyes_roboflow returns the crops sorted left-to-right in the processed image.
        # If the image was mirrored, reverse the order so the person's right eye
        # is always first in the list.
        if was_mirrored:
            print("--- Image was mirrored, reversing eye order for correct labeling. ---")
            eye_crops.reverse()
        flags = {}
        eye_images_b64 = {}
        for i, eye_crop in enumerate(eye_crops):
            # Because of the left-to-right ordering (and optional reverse), i == 0 is always the person's right eye
            side = "right" if i == 0 else "left"
            is_success, buffer = cv2.imencode(".jpg", eye_crop)
            if is_success:
                eye_images_b64[side] = "data:image/jpeg;base64," + base64.b64encode(buffer).decode("utf-8")
            pred = get_largest_iris_prediction(eye_crop)
            if pred:
                # Clamp the iris box to the eye crop so slicing never uses negative indices
                x1, y1 = max(0, int(pred['x'] - pred['width'] / 2)), max(0, int(pred['y'] - pred['height'] / 2))
                x2, y2 = int(pred['x'] + pred['width'] / 2), int(pred['y'] + pred['height'] / 2)
                iris_crop = eye_crop[y1:y2, x1:x2]
                if iris_crop.size > 0:
                    has_leuko, confidence = run_leukocoria_prediction(iris_crop)
                    flags[side] = has_leuko
                else:
                    flags[side] = None
            else:
                flags[side] = None
        is_success_main, buffer_main = cv2.imencode(".jpg", image_to_process)
        analyzed_image_b64 = ""
        if is_success_main:
            analyzed_image_b64 = "data:image/jpeg;base64," + base64.b64encode(buffer_main).decode("utf-8")
        return JSONResponse(content={
            "leukocoria": flags,
            "warnings": [],
            "two_eyes": eye_images_b64,
            "analyzed_image": analyzed_image_b64
        })
    finally:
        os.remove(temp_image_path)
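# Illustrative client call for /detect/ (not part of the app; "<space-url>" and "photo.jpg" are placeholders):
#   import requests
#   with open("photo.jpg", "rb") as f:
#       r = requests.post("https://<space-url>/detect/", files={"image": ("photo.jpg", f, "image/jpeg")})
#   r.json()  # -> {"leukocoria": {...}, "warnings": [...], "two_eyes": {...}, "analyzed_image": "data:image/jpeg;base64,..."}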
# --- 4. Create and Mount the Gradio UI for a professional homepage ---
def gradio_wrapper(image_array):
    """A wrapper function to call our own FastAPI endpoint from the Gradio UI."""
    try:
        pil_image = Image.fromarray(image_array)
        with io.BytesIO() as buffer:
            pil_image.save(buffer, format="JPEG")
            files = {'image': ('image.jpg', buffer.getvalue(), 'image/jpeg')}
        response = requests.post("http://127.0.0.1:7860/detect/", files=files)
        if response.status_code == 200:
            return response.json()
        else:
            return {"error": f"API Error {response.status_code}", "details": response.text}
    except Exception as e:
        return {"error": str(e)}
gradio_ui = gr.Interface(
    fn=gradio_wrapper,
    inputs=gr.Image(type="numpy", label="Upload an eye image to test the full pipeline"),
    outputs=gr.JSON(label="Analysis Results"),
    title="LeukoLook Eye Detector",
    description="A demonstration of the LeukoLook detection model pipeline."
)
app = gr.mount_gradio_app(app, gradio_ui, path="/")
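# The Gradio demo is mounted at the root path, so the Space homepage shows the test UI
# while the JSON API remains available at /detect/.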
# --- 5. Run the server ---
if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=7860)