File size: 6,155 Bytes
4611e20
5eab736
4611e20
 
 
 
 
 
 
 
1490129
ecfc393
5d7bc4a
ecfc393
 
4611e20
 
 
 
 
 
 
1490129
ecfc393
4611e20
d2a61cf
 
ecfc393
4611e20
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ecfc393
4611e20
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ecfc393
4611e20
ecfc393
4611e20
 
 
 
ac1b201
dc2c7e9
4611e20
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1490129
4611e20
 
 
 
1490129
4611e20
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1490129
4611e20
c676187
1471654
4611e20
 
1471654
 
4611e20
1471654
c676187
4611e20
06db179
 
4611e20
06db179
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
# The Complete and Final app.py for Hugging Face Space

import io
import json
import os
import tempfile
import urllib.error
import urllib.request
import uuid

import cv2
import gradio as gr
import numpy as np
import tensorflow as tf
import uvicorn
from fastapi import FastAPI, File, UploadFile
from fastapi.responses import JSONResponse
from huggingface_hub import hf_hub_download
from inference_sdk import InferenceHTTPClient
from PIL import Image

# --- 1. Configuration and Model Loading ---
# Note: Ensure ROBOFLOW_API_KEY is set as a secret in your Space settings
# The API key is read from the environment; it is None when the variable is
# unset, in which case the clients below are still constructed with that value.
ROBOFLOW_API_KEY = os.environ.get("ROBOFLOW_API_KEY")
ROBOFLOW_API_URL = "https://detect.roboflow.com"

# One client per detection stage; all three use the same endpoint and key.
CLIENT_FACE = InferenceHTTPClient(api_url=ROBOFLOW_API_URL, api_key=ROBOFLOW_API_KEY)
CLIENT_EYES = InferenceHTTPClient(api_url=ROBOFLOW_API_URL, api_key=ROBOFLOW_API_KEY)
CLIENT_IRIS = InferenceHTTPClient(api_url=ROBOFLOW_API_URL, api_key=ROBOFLOW_API_KEY)

# Download the classifier weights from the Hugging Face Hub and load them at
# import time. Any failure aborts start-up with a RuntimeError.
model = None
try:
    model_path = hf_hub_download("skibi11/leukolook-eye-detector", "MobileNetV1_best.keras")
    model = tf.keras.models.load_model(model_path)
    print("--- MODEL LOADED SUCCESSFULLY! ---")
except Exception as e:
    print(f"--- ERROR LOADING LEUKOCORIA MODEL: {e} ---")
    # Chain explicitly so the original failure is recorded as the cause.
    raise RuntimeError(f"Could not load leukocoria model: {e}") from e

# --- 2. All Helper Functions ---
def detect_faces_roboflow(image_path):
    """Run the Roboflow face model on the image at ``image_path``.

    Returns the value stored under ``"predictions"`` in the response, or an
    empty list when the response has no such key.
    """
    response = CLIENT_FACE.infer(image_path, model_id="face-detector-v4liw/2")
    face_predictions = response.get("predictions", [])
    return face_predictions

def detect_eyes_roboflow(image_path):
    """Detect eyes with Roboflow and return a crop of the image for each one.

    Args:
        image_path: Path of an image file to read with OpenCV and to send to
            the Roboflow eye-detection model.

    Returns:
        A ``(eye_crops, error)`` tuple. ``eye_crops`` is a list of image
        arrays sliced out of the source image, one per usable detection.
        ``error`` is ``"Could not read image"`` when OpenCV cannot load the
        file (``eye_crops`` is then empty) and ``None`` otherwise.
    """
    # Read the file first so an unreadable image is reported without spending
    # a remote inference call on it.
    raw_image = cv2.imread(image_path)
    if raw_image is None:
        return [], "Could not read image"
    img_h, img_w = raw_image.shape[:2]

    resp = CLIENT_EYES.infer(image_path, model_id="eye-detection-kso3d/3")
    eye_crops = []
    for p in resp.get("predictions", []):
        # x/y are used as the box centre, so the corners are half a
        # width/height away on each side.
        half_w = p['width'] / 2
        half_h = p['height'] / 2
        # Clamp to the image: a negative start index would otherwise wrap
        # around in the NumPy slice and yield a wrong or empty crop.
        x1 = max(0, int(p['x'] - half_w))
        y1 = max(0, int(p['y'] - half_h))
        x2 = min(img_w, int(p['x'] + half_w))
        y2 = min(img_h, int(p['y'] + half_h))
        if x2 <= x1 or y2 <= y1:
            # Zero-area box: there are no pixels to hand to later stages.
            continue
        eye_crops.append(raw_image[y1:y2, x1:x2])
    return eye_crops, None

def detect_iris_roboflow(eye_crop):
    """Find the largest iris in an eye crop and return that region.

    Args:
        eye_crop: Image array of a single eye (as sliced from a
            ``cv2.imread`` image).

    Returns:
        The sub-array of ``eye_crop`` covering the detection with the largest
        ``width * height``, or ``None`` when the crop is missing/empty, no
        iris is detected, or the detected box has no area inside the crop.
    """
    if eye_crop is None or eye_crop.size == 0:
        return None

    # The SDK's infer() takes the image as its first argument and accepts a
    # NumPy array directly, so no manual JPEG encoding is needed here.
    resp = CLIENT_IRIS.infer(eye_crop, model_id="iris_120_set/7")
    preds = resp.get("predictions", [])
    if not preds:
        return None

    largest = max(preds, key=lambda p: p["width"] * p["height"])
    crop_h, crop_w = eye_crop.shape[:2]
    half_w = largest['width'] / 2
    half_h = largest['height'] / 2
    # Clamp to the crop so a box overhanging the edge cannot produce a
    # negative index that wraps around in the slice below.
    x1 = max(0, int(largest['x'] - half_w))
    y1 = max(0, int(largest['y'] - half_h))
    x2 = min(crop_w, int(largest['x'] + half_w))
    y2 = min(crop_h, int(largest['y'] + half_h))
    if x2 <= x1 or y2 <= y1:
        return None
    return eye_crop[y1:y2, x1:x2]

def run_leukocoria_prediction(iris_crop):
    """Classify an iris crop with the loaded TensorFlow model.

    Args:
        iris_crop: Image array of the iris region; it is converted from BGR
            to RGB before being fed to the model.

    Returns:
        A dict mapping ``"Class_<i>"`` to the i-th score of the model output
        as a float, or ``{"error": ...}`` when the crop is missing/empty or
        the model is not loaded.
    """
    # An empty array would make cv2.cvtColor raise; report it the same way as
    # the "model not loaded" case instead of failing the whole request.
    if iris_crop is None or iris_crop.size == 0:
        return {"error": "Empty iris crop"}
    if model is None:
        return {"error": "Leukocoria model not loaded"}

    img_pil = Image.fromarray(cv2.cvtColor(iris_crop, cv2.COLOR_BGR2RGB))
    img = img_pil.resize((224, 224))
    # Scale pixel values to [0, 1] and add a leading batch axis of size 1.
    img_array = np.array(img) / 255.0
    img_array = np.expand_dims(img_array, axis=0)
    prediction = model.predict(img_array)
    return {f"Class_{i}": float(score) for i, score in enumerate(prediction[0])}

# --- 3. Create the FastAPI App and Main Endpoint ---
app = FastAPI()


@app.post("/api/detect/")
async def full_detection_pipeline(image: UploadFile = File(...)):
    """Run face -> eyes -> iris -> leukocoria classification on an upload.

    Args:
        image: The uploaded image file (multipart field ``image``).

    Returns:
        A JSON response. On success it maps ``"eye_1"`` and ``"eye_2"`` to
        ``{"status": ..., "prediction": ...}``. A 400 response with an
        ``"error"`` message is returned when no face is found, the image
        cannot be read, or the number of detected eyes is not exactly two.
    """
    payload = await image.read()

    # The detection helpers take a file path, so the upload is spooled to a
    # temporary file. Everything after its creation sits inside the try so the
    # file is removed even if writing it fails.
    tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".jpg")
    temp_image_path = tmp.name
    try:
        with tmp:
            tmp.write(payload)

        if not detect_faces_roboflow(temp_image_path):
            return JSONResponse(status_code=400, content={"error": "No face detected."})

        eye_crops, error_msg = detect_eyes_roboflow(temp_image_path)
        if error_msg:
            # Surface the helper's own message rather than a misleading
            # eye-count error.
            return JSONResponse(status_code=400, content={"error": error_msg})
        if len(eye_crops) != 2:
            return JSONResponse(status_code=400, content={"error": "Exactly two eyes not detected."})

        results = {}
        for i, eye_crop in enumerate(eye_crops):
            # Eyes are numbered in detection order; no left/right is inferred.
            side = f"eye_{i+1}"
            iris_crop = detect_iris_roboflow(eye_crop)
            if iris_crop is None:
                results[side] = {"status": "No iris detected", "prediction": None}
                continue

            prediction = run_leukocoria_prediction(iris_crop)
            results[side] = {"status": "Processed", "prediction": prediction}

        return JSONResponse(content=results)

    finally:
        os.remove(temp_image_path)

# --- 4. Create the Gradio UI for the homepage ---
# This UI will call our own FastAPI endpoint, ensuring consistent logic.
# Address of this app's own detection endpoint; the port is the one passed to
# uvicorn.run() at the bottom of this file.
_LOCAL_API_URL = "http://127.0.0.1:7860/api/detect/"


def _build_multipart_image_body(jpeg_bytes, boundary):
    """Return a multipart/form-data body holding one JPEG part named ``image``."""
    return b"".join([
        f"--{boundary}\r\n".encode("ascii"),
        b'Content-Disposition: form-data; name="image"; filename="image.jpg"\r\n',
        b"Content-Type: image/jpeg\r\n\r\n",
        jpeg_bytes,
        f"\r\n--{boundary}--\r\n".encode("ascii"),
    ])


def gradio_wrapper(image):
    """Send the Gradio image to this app's own ``/api/detect/`` endpoint.

    Args:
        image: The image from the Gradio input as a NumPy array, or ``None``
            when nothing was supplied.

    Returns:
        The decoded JSON body when the endpoint answers 200; otherwise a dict
        with ``"error"`` (and ``"details"`` holding the response text for
        non-200 answers). Any exception raised along the way is reported as
        ``{"error": str(exception)}`` rather than propagated.
    """
    if image is None:
        return {"error": "No image provided."}

    try:
        # Encode in memory: no temporary file to clean up. JPEG has no alpha
        # channel, so the image is converted to RGB before saving.
        jpeg_buffer = io.BytesIO()
        Image.fromarray(image).convert("RGB").save(jpeg_buffer, format="JPEG")

        boundary = uuid.uuid4().hex
        request = urllib.request.Request(
            _LOCAL_API_URL,
            data=_build_multipart_image_body(jpeg_buffer.getvalue(), boundary),
            headers={"Content-Type": f"multipart/form-data; boundary={boundary}"},
            method="POST",
        )

        try:
            with urllib.request.urlopen(request) as response:
                status_code = response.status
                raw_body = response.read()
        except urllib.error.HTTPError as http_err:
            # urllib raises on 4xx/5xx; the endpoint's JSON error body is
            # still available on the exception object.
            status_code = http_err.code
            raw_body = http_err.read()

        text = raw_body.decode("utf-8", errors="replace")
        if status_code == 200:
            return json.loads(text)
        return {"error": f"API Error {status_code}", "details": text}

    except Exception as e:
        # UI boundary: show the failure in the JSON output instead of raising.
        return {"error": str(e)}

# Input and output components of the demo page, built up front so the
# Interface call below reads as a plain wiring step.
_demo_image_input = gr.Image(type="numpy", label="Upload an eye image to test")
_demo_json_output = gr.JSON(label="Prediction Results")

gradio_ui = gr.Interface(
    fn=gradio_wrapper,
    inputs=_demo_image_input,
    outputs=_demo_json_output,
    title="LeukoLook Eye Detector",
    description="A demonstration of the LeukoLook detection model. This UI calls the same API endpoint that the main application uses.",
)

# --- 5. Serve the Gradio UI from the FastAPI app's root path ---
# mount_gradio_app returns the app object that is served below.
app = gr.mount_gradio_app(app, gradio_ui, path="/")

# --- 6. Start the server when executed as a script ---
if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=7860)