# The Complete and Final app.py for a Hugging Face Space
import os
import cv2
import tempfile
import numpy as np
import requests
import uvicorn
from PIL import Image
from inference_sdk import InferenceHTTPClient
from fastapi import FastAPI, File, UploadFile
from fastapi.responses import JSONResponse
import gradio as gr
import tensorflow as tf
from huggingface_hub import hf_hub_download
# --- 1. Configuration and Model Loading ---
# Note: Ensure ROBOFLOW_API_KEY is set as a secret in your Space settings
ROBOFLOW_API_KEY = os.environ.get("ROBOFLOW_API_KEY")
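# Optional guard: warn loudly at startup if the secret is missing, since every
# Roboflow call below depends on it and would otherwise fail with an opaque auth error.
if not ROBOFLOW_API_KEY:
    print("--- WARNING: ROBOFLOW_API_KEY is not set; Roboflow inference will fail. ---")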
CLIENT_FACE = InferenceHTTPClient(api_url="https://detect.roboflow.com", api_key=ROBOFLOW_API_KEY)
CLIENT_EYES = InferenceHTTPClient(api_url="https://detect.roboflow.com", api_key=ROBOFLOW_API_KEY)
CLIENT_IRIS = InferenceHTTPClient(api_url="https://detect.roboflow.com", api_key=ROBOFLOW_API_KEY)
model = None
try:
    model_path = hf_hub_download("skibi11/leukolook-eye-detector", "MobileNetV1_best.keras")
    model = tf.keras.models.load_model(model_path)
    print("--- MODEL LOADED SUCCESSFULLY! ---")
except Exception as e:
    print(f"--- ERROR LOADING LEUKOCORIA MODEL: {e} ---")
    raise RuntimeError(f"Could not load leukocoria model: {e}")

# --- 2. All Helper Functions ---
def detect_faces_roboflow(image_path):
    """Calls Roboflow to find faces in the image."""
    resp = CLIENT_FACE.infer(image_path, model_id="face-detector-v4liw/2")
    return resp.get("predictions", [])
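
# For reference, Roboflow detection responses have roughly this shape
# (values illustrative): {"predictions": [{"x": 312.0, "y": 241.5,
#   "width": 90.0, "height": 88.0, "confidence": 0.93, "class": "face"}]}
# x/y are box centres, which is why the crops below subtract width/2 and height/2.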
def detect_eyes_roboflow(image_path):
    """Calls Roboflow to find eyes and returns cropped images of them."""
    resp = CLIENT_EYES.infer(image_path, model_id="eye-detection-kso3d/3")
    raw_image = cv2.imread(image_path)
    if raw_image is None:
        return [], "Could not read image"
    eye_crops = []
    h, w = raw_image.shape[:2]
    for p in resp.get("predictions", []):
        # Convert centre coordinates to corner coordinates and clamp to the
        # image bounds so the slice below is always valid.
        x1 = max(0, int(p['x'] - p['width'] / 2))
        y1 = max(0, int(p['y'] - p['height'] / 2))
        x2 = min(w, int(p['x'] + p['width'] / 2))
        y2 = min(h, int(p['y'] + p['height'] / 2))
        eye_crops.append(raw_image[y1:y2, x1:x2])
    return eye_crops, None

def detect_iris_roboflow(eye_crop):
    """Calls Roboflow to find the largest iris in an eye crop."""
    # The inference SDK accepts a numpy array directly, so no manual JPEG
    # encoding is needed here.
    resp = CLIENT_IRIS.infer(eye_crop, model_id="iris_120_set/7")
    preds = resp.get("predictions", [])
    if not preds:
        return None
    largest = max(preds, key=lambda p: p["width"] * p["height"])
    h, w = eye_crop.shape[:2]
    x1 = max(0, int(largest['x'] - largest['width'] / 2))
    y1 = max(0, int(largest['y'] - largest['height'] / 2))
    x2 = min(w, int(largest['x'] + largest['width'] / 2))
    y2 = min(h, int(largest['y'] + largest['height'] / 2))
    return eye_crop[y1:y2, x1:x2]

def run_leukocoria_prediction(iris_crop):
    """Runs the loaded TensorFlow model to predict leukocoria."""
    if model is None:
        return {"error": "Leukocoria model not loaded"}
    # Convert BGR (OpenCV) to RGB, resize to the model's 224x224 input,
    # and scale pixel values to [0, 1] before predicting.
    img_pil = Image.fromarray(cv2.cvtColor(iris_crop, cv2.COLOR_BGR2RGB))
    img = img_pil.resize((224, 224))
    img_array = np.expand_dims(np.array(img) / 255.0, axis=0)
    prediction = model.predict(img_array)
    return {f"Class_{i}": float(score) for i, score in enumerate(prediction[0])}
# --- 3. Create the FastAPI App and Main Endpoint ---
app = FastAPI()
@app.post("/api/detect/")
async def full_detection_pipeline(image: UploadFile = File(...)):
    """The main API endpoint that runs the full detection pipeline."""
    # Persist the upload to a temp file because the Roboflow helpers expect a path.
    with tempfile.NamedTemporaryFile(delete=False, suffix=".jpg") as tmp:
        tmp.write(await image.read())
        temp_image_path = tmp.name
    try:
        if not detect_faces_roboflow(temp_image_path):
            return JSONResponse(status_code=400, content={"error": "No face detected."})
        eye_crops, error_msg = detect_eyes_roboflow(temp_image_path)
        if error_msg or len(eye_crops) != 2:
            return JSONResponse(status_code=400, content={"error": "Exactly two eyes not detected."})
        results = {}
        for i, eye_crop in enumerate(eye_crops):
            side = f"eye_{i+1}"
            iris_crop = detect_iris_roboflow(eye_crop)
            if iris_crop is None:
                results[side] = {"status": "No iris detected", "prediction": None}
                continue
            prediction = run_leukocoria_prediction(iris_crop)
            results[side] = {"status": "Processed", "prediction": prediction}
        return JSONResponse(content=results)
    finally:
        os.remove(temp_image_path)
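
# Hypothetical smoke test for the endpoint above, assuming the app is running
# locally on port 7860 and "photo.jpg" is a placeholder image file:
#   curl -X POST http://127.0.0.1:7860/api/detect/ -F "image=@photo.jpg"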
# --- 4. Create the Gradio UI for the homepage ---
# This UI will call our own FastAPI endpoint, ensuring consistent logic.
def gradio_wrapper(image):
    """A wrapper function to call our own FastAPI endpoint from the Gradio UI."""
    try:
        # Save the numpy array from Gradio to a temporary file to send to our API
        pil_image = Image.fromarray(image)
        with tempfile.NamedTemporaryFile(mode="wb", suffix=".jpg", delete=False) as tmp:
            pil_image.save(tmp, format="JPEG")
            tmp_path = tmp.name
        try:
            with open(tmp_path, "rb") as f:
                files = {'image': ('image.jpg', f, 'image/jpeg')}
                # The API is running on the same server, so we call it locally
                response = requests.post("http://127.0.0.1:7860/api/detect/", files=files)
        finally:
            os.remove(tmp_path)  # Clean up the temp file even if the request fails
        if response.status_code == 200:
            return response.json()
        return {"error": f"API Error {response.status_code}", "details": response.text}
    except Exception as e:
        return {"error": str(e)}

gradio_ui = gr.Interface(
    fn=gradio_wrapper,
    inputs=gr.Image(type="numpy", label="Upload an eye image to test"),
    outputs=gr.JSON(label="Prediction Results"),
    title="LeukoLook Eye Detector",
    description="A demonstration of the LeukoLook detection model. This UI calls the same API endpoint that the main application uses.",
)
# --- 5. Mount the Gradio UI onto the FastAPI app's root ---
app = gr.mount_gradio_app(app, gradio_ui, path="/")
# --- 6. Run the server ---
if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=7860)
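
# A sketch of the dependencies the imports above imply for the Space's
# requirements.txt (pin exact versions in the actual file):
#   fastapi
#   uvicorn
#   gradio
#   tensorflow
#   opencv-python-headless
#   pillow
#   numpy
#   requests
#   inference-sdk
#   huggingface-hub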