File size: 6,335 Bytes
4611e20
5eab736
4611e20
 
 
 
 
31e7c35
 
 
4611e20
 
 
1490129
ecfc393
5d7bc4a
ecfc393
 
4611e20
 
 
 
 
 
 
1490129
ecfc393
4611e20
d2a61cf
 
ecfc393
4611e20
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ecfc393
4611e20
 
 
 
b9bd60d
4611e20
 
 
 
 
 
 
 
 
 
 
ecfc393
4611e20
ecfc393
4611e20
 
 
 
ac1b201
dc2c7e9
4611e20
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
afbf135
 
 
 
 
 
 
31e7c35
 
 
4611e20
 
 
 
 
 
 
 
 
 
 
 
1490129
31e7c35
 
4611e20
1490129
31e7c35
4611e20
 
 
 
 
 
 
 
 
31e7c35
4611e20
 
 
 
 
1490129
4611e20
c676187
1471654
4611e20
31e7c35
 
1471654
31e7c35
1471654
c676187
06db179
 
31e7c35
06db179
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
# The Complete and Final app.py for Hugging Face Space

import os
import cv2
import tempfile
import numpy as np
import uvicorn
import requests
import base64
import io
from PIL import Image
from inference_sdk import InferenceHTTPClient
from fastapi import FastAPI, File, UploadFile
from fastapi.responses import JSONResponse
import gradio as gr
import tensorflow as tf
from huggingface_hub import hf_hub_download

# --- 1. Configuration and Model Loading ---
# Note: Ensure ROBOFLOW_API_KEY is set as a secret in your Space settings
ROBOFLOW_API_KEY = os.environ.get("ROBOFLOW_API_KEY")
CLIENT_FACE = InferenceHTTPClient(api_url="https://detect.roboflow.com", api_key=ROBOFLOW_API_KEY)
CLIENT_EYES = InferenceHTTPClient(api_url="https://detect.roboflow.com", api_key=ROBOFLOW_API_KEY)
CLIENT_IRIS = InferenceHTTPClient(api_url="https://detect.roboflow.com", api_key=ROBOFLOW_API_KEY)

model = None
try:
    model_path = hf_hub_download("skibi11/leukolook-eye-detector", "MobileNetV1_best.keras")
    model = tf.keras.models.load_model(model_path)
    print("--- MODEL LOADED SUCCESSFULLY! ---")
except Exception as e:
    print(f"--- ERROR LOADING LEUKOCORIA MODEL: {e} ---")
    raise RuntimeError(f"Could not load leukocoria model: {e}")

# --- 2. All Helper Functions ---
def detect_faces_roboflow(image_path):
    """Calls Roboflow to find faces in the image."""
    resp = CLIENT_FACE.infer(image_path, model_id="face-detector-v4liw/2")
    return resp.get("predictions", [])

def detect_eyes_roboflow(image_path):
    """Calls Roboflow to find eyes and returns cropped images of them."""
    resp = CLIENT_EYES.infer(image_path, model_id="eye-detection-kso3d/3")
    raw_image = cv2.imread(image_path)
    if raw_image is None: return [], "Could not read image"
    eye_crops = []
    for p in resp.get("predictions", []):
        x1 = int(p['x'] - p['width'] / 2)
        y1 = int(p['y'] - p['height'] / 2)
        x2 = int(p['x'] + p['width'] / 2)
        y2 = int(p['y'] + p['height'] / 2)
        eye_crops.append(raw_image[y1:y2, x1:x2])
    return eye_crops, None

def detect_iris_roboflow(eye_crop):
    """Calls Roboflow to find the largest iris in an eye crop."""
    is_success, buffer = cv2.imencode(".jpg", eye_crop)
    if not is_success: return None
    resp = CLIENT_IRIS.infer(buffer, model_id="iris_120_set/7")
    preds = resp.get("predictions", [])
    if not preds: return None
    largest = max(preds, key=lambda p: p["width"] * p["height"])
    x1, y1 = int(largest['x'] - largest['width'] / 2), int(largest['y'] - largest['height'] / 2)
    x2, y2 = int(largest['x'] + largest['width'] / 2), int(largest['y'] + largest['height'] / 2)
    return eye_crop[y1:y2, x1:x2]

def run_leukocoria_prediction(iris_crop):
    """Runs the loaded TensorFlow model to predict leukocoria."""
    if model is None: return {"error": "Leukocoria model not loaded"}
    img_pil = Image.fromarray(cv2.cvtColor(iris_crop, cv2.COLOR_BGR2RGB))
    img = img_pil.resize((224, 224))
    img_array = np.array(img) / 255.0
    img_array = np.expand_dims(img_array, axis=0)
    prediction = model.predict(img_array)
    return {f"Class_{i}": float(score) for i, score in enumerate(prediction[0])}

# --- 3. Create the FastAPI App and Main Endpoint ---
app = FastAPI()

@app.post("/api/detect/")
async def full_detection_pipeline(image: UploadFile = File(...)):
    """The main API endpoint that runs the full detection pipeline."""
    with tempfile.NamedTemporaryFile(delete=False, suffix=".jpg") as tmp:
        tmp.write(await image.read())
        temp_image_path = tmp.name

    try:
        if not detect_faces_roboflow(temp_image_path):
            return JSONResponse(status_code=400, content={"error": "No face detected."})

        eye_crops, error_msg = detect_eyes_roboflow(temp_image_path)
        if error_msg or len(eye_crops) != 2:
            return JSONResponse(status_code=400, content={"error": "Exactly two eyes not detected."})
        
        results = {}
       # Convert each crop to grayscale before finding the bounding box for sorting
        def get_x_coordinate(crop):
            gray_crop = cv2.cvtColor(crop, cv2.COLOR_BGR2GRAY)
            x, _, _, _ = cv2.boundingRect(gray_crop)
            return x
        
        sorted_eye_crops = sorted(eye_crops, key=get_x_coordinate)
        
        for i, eye_crop in enumerate(sorted_eye_crops):
            side = "left_eye" if i == 0 else "right_eye"
            iris_crop = detect_iris_roboflow(eye_crop)
            if iris_crop is None:
                results[side] = {"status": "No iris detected", "prediction": None}
                continue
            
            prediction = run_leukocoria_prediction(iris_crop)
            results[side] = {"status": "Processed", "prediction": prediction}
        
        return JSONResponse(content=results)

    finally:
        os.remove(temp_image_path)

# --- 4. Create and Mount the Gradio UI for a professional homepage ---
def gradio_wrapper(image_array):
    """A wrapper function to call our own FastAPI endpoint from the Gradio UI."""
    try:
        pil_image = Image.fromarray(image_array.astype('uint8'), 'RGB')
        with tempfile.NamedTemporaryFile(mode="wb", suffix=".jpg", delete=False) as tmp:
            pil_image.save(tmp, format="JPEG")
            tmp_path = tmp.name

        with open(tmp_path, "rb") as f:
            files = {'image': ('image.jpg', f, 'image/jpeg')}
            # The API is running on the same server, so we call it locally
            response = requests.post("http://127.0.0.1:7860/api/detect/", files=files)
        
        os.remove(tmp_path)
        
        if response.status_code == 200:
            return response.json()
        else:
            return {"error": f"API Error {response.status_code}", "details": response.text}
    except Exception as e:
        return {"error": str(e)}

gradio_ui = gr.Interface(
    fn=gradio_wrapper,
    inputs=gr.Image(type="numpy", label="Upload an eye image to test the full pipeline"),
    outputs=gr.JSON(label="Analysis Results"),
    title="LeukoLook Eye Detector",
    description="A demonstration of the LeukoLook detection model pipeline."
)

app = gr.mount_gradio_app(app, gradio_ui, path="/")

# --- 5. Run the server ---
if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=7860)