Spaces:
Running
Running
File size: 9,447 Bytes
ab24187 5eab736 4611e20 ab24187 ab43e06 4611e20 1490129 5d7bc4a ecfc393 ab24187 ecfc393 4611e20 eadc82d ecfc393 4611e20 eadc82d ecfc393 eadc82d 4611e20 eadc82d 4611e20 eadc82d 4611e20 eadc82d c7d17de ecfc393 6531628 eadc82d 65ede81 6531628 65ede81 6531628 65ede81 4611e20 a4d01ce 4611e20 eadc82d ecfc393 eadc82d ac1b201 dc2c7e9 eadc82d 4611e20 eadc82d 4611e20 6c7560e 4611e20 6c7560e 4611e20 afea313 7b3ffbf 0bc9984 7b3ffbf 381811c 7b3ffbf 381811c 76a0766 18f7e14 381811c 76a0766 9ecfee1 0bc9984 9ecfee1 18f7e14 7b3ffbf 381811c 18f7e14 76a0766 c7d17de 5788deb 18f7e14 5788deb eadc82d 381811c 76a0766 381811c 9ecfee1 0550be8 9ecfee1 eadc82d 6c7560e eadc82d 1d4fbb5 381811c 9ecfee1 6c7560e afea313 eadc82d 0bc9984 7b3ffbf 0550be8 381811c 0550be8 7b3ffbf 76a0766 6c7560e 76a0766 7b3ffbf 6c7560e 4611e20 096e485 ab24187 06db179 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 |
# app.py for the LeukoLook Hugging Face Space: a FastAPI detection endpoint (/detect/) with a Gradio demo UI mounted at "/".
import os
import cv2
import tempfile
import numpy as np
import uvicorn
import requests
import io
import base64
from PIL import Image
from inference_sdk import InferenceHTTPClient
from fastapi import FastAPI, File, UploadFile
from fastapi.responses import JSONResponse
import tensorflow as tf
from huggingface_hub import hf_hub_download
import gradio as gr
# --- 1. Configuration and Model Loading ---
# The Roboflow API key is read from the environment.
ROBOFLOW_API_KEY = os.environ.get("ROBOFLOW_API_KEY")
if not ROBOFLOW_API_KEY:
    # Startup is not aborted here; the warning only makes a missing key
    # visible in the logs before the first Roboflow request is attempted.
    print("--- WARNING: ROBOFLOW_API_KEY is not set; Roboflow requests will likely fail. ---")

# One client per detection stage; all three use the same endpoint and key.
CLIENT_FACE = InferenceHTTPClient(api_url="https://detect.roboflow.com", api_key=ROBOFLOW_API_KEY)
CLIENT_EYES = InferenceHTTPClient(api_url="https://detect.roboflow.com", api_key=ROBOFLOW_API_KEY)
CLIENT_IRIS = InferenceHTTPClient(api_url="https://detect.roboflow.com", api_key=ROBOFLOW_API_KEY)

# Leukocoria classifier, downloaded from the Hugging Face Hub at import time.
leuko_model = None
try:
    model_path = hf_hub_download("skibi11/leukolook-eye-detector", "MobileNetV1_best.keras")
    leuko_model = tf.keras.models.load_model(model_path)
    print("--- LEUKOCORIA MODEL LOADED SUCCESSFULLY! ---")
except Exception as e:
    print(f"--- FATAL ERROR: COULD NOT LOAD LEUKOCORIA MODEL: {e} ---")
    # Abort startup, keeping the original exception as the explicit cause.
    raise RuntimeError(f"Could not load leukocoria model: {e}") from e
# --- 2. All Helper Functions ---
def enhance_image_unsharp_mask(image, strength=0.5, radius=5):
    """Sharpen *image* with an unsharp mask.

    A Gaussian-blurred copy is subtracted from a boosted original:
    ``image * (1 + strength) - blur * strength``.

    Args:
        image: Image array accepted by ``cv2.GaussianBlur``.
        strength: Weight of the subtracted blur.
        radius: Side length of the square Gaussian kernel. OpenCV requires an
            odd, positive kernel size, so even values are bumped to the next
            odd number and values below 1 are treated as 1.

    Returns:
        The sharpened image produced by ``cv2.addWeighted``.
    """
    # `| 1` forces the lowest bit on, i.e. rounds an even size up to odd.
    ksize = max(1, int(radius)) | 1
    blur = cv2.GaussianBlur(image, (ksize, ksize), 0)
    return cv2.addWeighted(image, 1.0 + strength, blur, -strength, 0)
def detect_faces_roboflow(image_path):
    """Run the Roboflow face detector on the image file at *image_path*.

    Returns the ``"predictions"`` entry of the response, or an empty list when
    the response has no such key. Exceptions raised by the client are not
    caught here.
    """
    response = CLIENT_FACE.infer(image_path, model_id="face-detector-v4liw/2")
    face_predictions = response.get("predictions", [])
    return face_predictions
def detect_eyes_roboflow(image_path, raw_image):
    """Detect eyes with Roboflow and return their crops from *raw_image*.

    Args:
        image_path: Path of the image file sent to Roboflow.
        raw_image: Image array the crops are sliced from, indexed
            ``[row, column]``. Assumed to be the same picture as
            *image_path* so the returned boxes line up with it.

    Returns:
        A ``(crops, error)`` tuple. On success ``crops`` holds one non-empty
        crop per detection, ordered left-to-right by box centre, and
        ``error`` is ``None``. If anything raises, the result is
        ``([], str(exception))``.
    """
    try:
        resp = CLIENT_EYES.infer(image_path, model_id="eye-detection-kso3d/3")
        img_h, img_w = raw_image.shape[:2]
        # Order by horizontal centre so callers that label eyes by position
        # receive a deterministic left-to-right sequence.
        predictions = sorted(resp.get("predictions", []), key=lambda p: p['x'])
        crops = []
        for p in predictions:
            # Clamp to the image: a negative index would wrap around in NumPy
            # slicing and silently select the wrong region.
            x1 = max(0, min(img_w, int(p['x'] - p['width'] / 2)))
            y1 = max(0, min(img_h, int(p['y'] - p['height'] / 2)))
            x2 = max(0, min(img_w, int(p['x'] + p['width'] / 2)))
            y2 = max(0, min(img_h, int(p['y'] + p['height'] / 2)))
            crop = raw_image[y1:y2, x1:x2]
            if crop.size > 0:
                crops.append(crop)
        # On success, return the crops and None for the error message.
        return crops, None
    except Exception as e:
        # If Roboflow fails, return an empty list and the error message.
        print(f"Error in Roboflow eye detection: {e}")
        return [], str(e)
def get_largest_iris_prediction(eye_crop):
    """Return the largest iris detection Roboflow finds in *eye_crop*.

    The crop is sharpened, written to a temporary JPEG and sent to the iris
    model by file path.

    Args:
        eye_crop: Image array of a single eye.

    Returns:
        The prediction dict with the greatest ``width * height``, or ``None``
        when there are no predictions or the temporary image could not be
        written.
    """
    enhanced_eye_crop = enhance_image_unsharp_mask(eye_crop)
    # Only reserve the path here; the handle is closed before OpenCV writes
    # to it, and all later work sits inside the try so the file is removed
    # even if writing or inference fails.
    with tempfile.NamedTemporaryFile(delete=False, suffix=".jpg") as tmp:
        temp_iris_path = tmp.name
    try:
        # Save the enhanced version, not the original.
        if not cv2.imwrite(temp_iris_path, enhanced_eye_crop):
            print("Error: could not write temporary iris image.")
            return None
        resp = CLIENT_IRIS.infer(temp_iris_path, model_id="iris_120_set/7")
        preds = resp.get("predictions", [])
        return max(preds, key=lambda p: p["width"] * p["height"]) if preds else None
    finally:
        # Ensure the temporary file is always deleted.
        os.remove(temp_iris_path)
def run_leukocoria_prediction(iris_crop):
    """Classify an iris crop with the loaded leukocoria model.

    The crop is converted from BGR to RGB, sharpened, resized to 224x224,
    divided by 255 and given a leading batch axis before prediction.

    Returns:
        ``(has_leuko, confidence)`` where ``confidence`` is the first value of
        the model output and ``has_leuko`` is ``confidence > 0.5``. When no
        model is loaded the result is an error dict paired with ``0.0``.
    """
    if leuko_model is None:
        return {"error": "Leukocoria model not loaded"}, 0.0

    rgb_pixels = cv2.cvtColor(iris_crop, cv2.COLOR_BGR2RGB)
    pil_view = Image.fromarray(rgb_pixels)
    sharpened = enhance_image_unsharp_mask(np.array(pil_view))
    resized = cv2.resize(sharpened, (224, 224))

    scaled = np.array(resized) / 255.0
    batch = np.expand_dims(scaled, axis=0)

    scores = leuko_model.predict(batch)
    confidence = float(scores[0][0])
    return confidence > 0.5, confidence
# --- 3. FastAPI Application ---
app = FastAPI()


@app.post("/detect/")
async def full_detection_pipeline(image: UploadFile = File(...)):
    """Run face, eye, iris and leukocoria detection on an uploaded image.

    The upload is saved to a temporary file, checked for a face, and searched
    for exactly two eyes; if that fails the search is retried on a
    horizontally mirrored copy. Each eye is then searched for an iris, and
    the iris crop is classified.

    Args:
        image: The uploaded image file.

    Returns:
        A ``JSONResponse``. Status 400 with an ``"error"`` message when the
        image cannot be read, no face is found, or exactly two eyes cannot be
        found. Otherwise a body with ``"leukocoria"`` (per-side flag, ``None``
        when no usable iris was found), ``"warnings"``, ``"two_eyes"``
        (base64 JPEG data URLs per side) and ``"analyzed_image"`` (base64
        JPEG data URL of the image that was processed).
    """
    # Read the upload before creating the temp file so a failed read cannot
    # leave an orphaned file behind (delete=False means nothing else removes it).
    contents = await image.read()
    with tempfile.NamedTemporaryFile(delete=False, suffix=".jpg") as tmp:
        tmp.write(contents)
        temp_image_path = tmp.name
    try:
        raw_image = cv2.imread(temp_image_path)
        if raw_image is None:
            return JSONResponse(status_code=400, content={"error": "Could not read uploaded image."})
        if not detect_faces_roboflow(temp_image_path):
            return JSONResponse(status_code=400, content={"error": "No face detected."})

        image_to_process = raw_image
        was_mirrored = False
        print("--- 1. Attempting detection on original image... ---")
        eye_crops, error_msg = detect_eyes_roboflow(temp_image_path, image_to_process)
        print(f"--- 2. Found {len(eye_crops)} eyes in original image. ---")

        if len(eye_crops) != 2:
            print("--- 3. Original failed. Attempting detection on mirrored image... ---")
            mirrored_image = cv2.flip(raw_image, 1)
            image_to_process = mirrored_image
            was_mirrored = True
            with tempfile.NamedTemporaryFile(delete=False, suffix=".jpg") as tmp_mirrored:
                cv2.imwrite(tmp_mirrored.name, mirrored_image)
                temp_mirrored_image_path = tmp_mirrored.name
            try:
                eye_crops, error_msg = detect_eyes_roboflow(temp_mirrored_image_path, image_to_process)
                print(f"--- 4. Found {len(eye_crops)} eyes in mirrored image. ---")
            finally:
                os.remove(temp_mirrored_image_path)

        if error_msg or len(eye_crops) != 2:
            return JSONResponse(
                status_code=400,
                content={"error": "Could not detect exactly two eyes. Please try another photo."}
            )

        # NOTE(review): cv2.boundingRect on a grayscale crop measures the extent
        # of its non-zero pixels, not the crop's position in the source image,
        # so this key is usually 0 for every crop and the stable sort keeps the
        # order returned by detect_eyes_roboflow. Confirm the intended ordering.
        initial_boxes = [cv2.boundingRect(cv2.cvtColor(c, cv2.COLOR_BGR2GRAY)) for c in eye_crops]
        print(f"--- 5. Initial eye coordinates (x,y,w,h): {initial_boxes} ---")
        eye_crops.sort(key=lambda c: cv2.boundingRect(cv2.cvtColor(c, cv2.COLOR_BGR2GRAY))[0])
        sorted_boxes = [cv2.boundingRect(cv2.cvtColor(c, cv2.COLOR_BGR2GRAY)) for c in eye_crops]
        print(f"--- 6. Sorted eye coordinates (x,y,w,h): {sorted_boxes} ---")

        if was_mirrored:
            print("--- 7. Image was mirrored, reversing eye order for correct labeling. ---")
            eye_crops.reverse()
            reversed_boxes = [cv2.boundingRect(cv2.cvtColor(c, cv2.COLOR_BGR2GRAY)) for c in eye_crops]
            print(f"--- 8. Reversed eye coordinates (x,y,w,h): {reversed_boxes} ---")

        flags = {}
        eye_images_b64 = {}
        for i, eye_crop in enumerate(eye_crops):
            side = "right" if i == 0 else "left"
            print(f"--- 9. Processing loop index {i}, assigning to: {side} eye. ---")
            is_success, buffer = cv2.imencode(".jpg", eye_crop)
            if is_success:
                eye_images_b64[side] = "data:image/jpeg;base64," + base64.b64encode(buffer).decode("utf-8")

            # Default to "no result"; overwritten only when a usable iris crop exists.
            flags[side] = None
            pred = get_largest_iris_prediction(eye_crop)
            if pred:
                crop_h, crop_w = eye_crop.shape[:2]
                # Clamp the iris box to the eye crop: negative indices would wrap
                # around in NumPy slicing, and an empty crop would make the
                # colour conversion in the classifier raise.
                x1 = max(0, min(crop_w, int(pred['x'] - pred['width'] / 2)))
                y1 = max(0, min(crop_h, int(pred['y'] - pred['height'] / 2)))
                x2 = max(0, min(crop_w, int(pred['x'] + pred['width'] / 2)))
                y2 = max(0, min(crop_h, int(pred['y'] + pred['height'] / 2)))
                iris_crop = eye_crop[y1:y2, x1:x2]
                if iris_crop.size > 0:
                    has_leuko, _confidence = run_leukocoria_prediction(iris_crop)
                    flags[side] = has_leuko

        # Everything below runs once, after both eyes have been processed.
        print("--- 10. Final generated flags:", flags, "---")
        is_success_main, buffer_main = cv2.imencode(".jpg", image_to_process)
        analyzed_image_b64 = ""
        if is_success_main:
            analyzed_image_b64 = "data:image/jpeg;base64," + base64.b64encode(buffer_main).decode("utf-8")

        return JSONResponse(content={
            "leukocoria": flags,
            "warnings": [],
            "two_eyes": eye_images_b64,
            "analyzed_image": analyzed_image_b64
        })
    finally:
        # Always remove the uploaded temp file, on every return path.
        os.remove(temp_image_path)
# --- 4. Create and Mount the Gradio UI for a professional homepage ---
def gradio_wrapper(image_array):
    """Send an image from the Gradio UI to this app's own ``/detect/`` endpoint.

    Args:
        image_array: Image as an array accepted by ``PIL.Image.fromarray``,
            or ``None`` (presumably what Gradio passes when nothing was
            uploaded - confirm against the Gradio version in use).

    Returns:
        The endpoint's JSON body on HTTP 200; otherwise a dict with an
        ``"error"`` key (plus ``"details"`` for non-200 responses). Exceptions
        are reported as ``{"error": str(exception)}`` rather than raised.
    """
    if image_array is None:
        return {"error": "No image provided."}
    try:
        # JPEG cannot store an alpha channel, so normalise the mode first.
        pil_image = Image.fromarray(image_array).convert("RGB")
        with io.BytesIO() as buffer:
            pil_image.save(buffer, format="JPEG")
            files = {'image': ('image.jpg', buffer.getvalue(), 'image/jpeg')}
        # A timeout keeps a stalled pipeline from hanging the UI indefinitely.
        response = requests.post("http://127.0.0.1:7860/detect/", files=files, timeout=120)
        if response.status_code == 200:
            return response.json()
        return {"error": f"API Error {response.status_code}", "details": response.text}
    except Exception as e:
        return {"error": str(e)}
# Build the input and output components first, then assemble the interface.
_image_input = gr.Image(type="numpy", label="Upload an eye image to test the full pipeline")
_json_output = gr.JSON(label="Analysis Results")

gradio_ui = gr.Interface(
    title="LeukoLook Eye Detector",
    description="A demonstration of the LeukoLook detection model pipeline.",
    fn=gradio_wrapper,
    inputs=_image_input,
    outputs=_json_output,
)

# Serve the Gradio UI at the root path of the existing FastAPI app.
app = gr.mount_gradio_app(app, gradio_ui, path="/")
# --- 5. Run the server ---
# Start uvicorn only when this file is executed directly, not when imported.
if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=7860)