File size: 9,447 Bytes
ab24187
5eab736
4611e20
 
 
 
 
ab24187
 
ab43e06
4611e20
 
 
1490129
5d7bc4a
ecfc393
ab24187
ecfc393
4611e20
 
 
 
 
 
eadc82d
ecfc393
4611e20
eadc82d
 
ecfc393
eadc82d
4611e20
 
 
eadc82d
 
 
 
4611e20
eadc82d
4611e20
eadc82d
c7d17de
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ecfc393
6531628
 
eadc82d
65ede81
6531628
 
 
 
65ede81
6531628
 
65ede81
 
 
 
 
 
 
 
 
 
4611e20
 
a4d01ce
4611e20
eadc82d
 
 
ecfc393
eadc82d
 
 
 
 
 
ac1b201
dc2c7e9
eadc82d
4611e20
 
eadc82d
 
4611e20
6c7560e
4611e20
6c7560e
 
 
 
4611e20
 
afea313
7b3ffbf
0bc9984
7b3ffbf
381811c
7b3ffbf
381811c
76a0766
18f7e14
381811c
76a0766
9ecfee1
0bc9984
9ecfee1
18f7e14
 
 
 
7b3ffbf
381811c
18f7e14
 
76a0766
c7d17de
5788deb
18f7e14
5788deb
 
eadc82d
381811c
 
 
76a0766
 
381811c
 
 
9ecfee1
0550be8
 
 
 
9ecfee1
eadc82d
6c7560e
eadc82d
1d4fbb5
381811c
9ecfee1
6c7560e
 
 
afea313
eadc82d
 
 
 
 
 
 
 
0bc9984
7b3ffbf
0550be8
 
381811c
0550be8
7b3ffbf
76a0766
 
 
 
6c7560e
 
 
76a0766
7b3ffbf
6c7560e
4611e20
 
 
096e485
ab24187
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
06db179
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
# Final, Complete, and Working app.py for Hugging Face Space

import os
import cv2
import tempfile
import numpy as np
import uvicorn
import requests
import io
import base64
from PIL import Image
from inference_sdk import InferenceHTTPClient
from fastapi import FastAPI, File, UploadFile
from fastapi.responses import JSONResponse
import tensorflow as tf
from huggingface_hub import hf_hub_download
import gradio as gr

# --- 1. Configuration and Model Loading ---
# All three Roboflow stages (face, eyes, iris) share one endpoint and API key.
ROBOFLOW_API_KEY = os.environ.get("ROBOFLOW_API_KEY")

def _make_roboflow_client():
    """Build an inference client against the shared Roboflow detect endpoint."""
    return InferenceHTTPClient(api_url="https://detect.roboflow.com", api_key=ROBOFLOW_API_KEY)

CLIENT_FACE = _make_roboflow_client()
CLIENT_EYES = _make_roboflow_client()
CLIENT_IRIS = _make_roboflow_client()

leuko_model = None
try:
    # Pull the trained weights from the Hub and load them into Keras.
    _weights_path = hf_hub_download("skibi11/leukolook-eye-detector", "MobileNetV1_best.keras")
    leuko_model = tf.keras.models.load_model(_weights_path)
    print("--- LEUKOCORIA MODEL LOADED SUCCESSFULLY! ---")
except Exception as e:
    # The classifier is essential; abort startup loudly rather than run half-broken.
    print(f"--- FATAL ERROR: COULD NOT LOAD LEUKOCORIA MODEL: {e} ---")
    raise RuntimeError(f"Could not load leukocoria model: {e}")

# --- 2. All Helper Functions ---
def enhance_image_unsharp_mask(image, strength=0.5, radius=5):
    """Sharpen *image* using an unsharp mask.

    Args:
        image: Image as a NumPy array (BGR or grayscale).
        strength: Blend weight of the subtracted blur; 0 leaves the image unchanged.
        radius: Gaussian kernel size. cv2.GaussianBlur requires an odd, positive
            kernel size, so even values are bumped up by one.

    Returns:
        The sharpened image, same shape and dtype as the input.
    """
    # cv2.GaussianBlur raises on even kernel sizes; make the kernel odd.
    ksize = radius if radius % 2 == 1 else radius + 1
    blur = cv2.GaussianBlur(image, (ksize, ksize), 0)
    # Classic unsharp-mask blend: out = (1 + s) * image - s * blur.
    return cv2.addWeighted(image, 1.0 + strength, blur, -strength, 0)

def detect_faces_roboflow(image_path):
    """Run the Roboflow face detector on *image_path*; return its predictions list."""
    response = CLIENT_FACE.infer(image_path, model_id="face-detector-v4liw/2")
    return response.get("predictions", [])

def detect_eyes_roboflow(image_path, raw_image):
    """Calls Roboflow to find eyes and returns cropped images of them.

    Args:
        image_path: Path of the image file sent to Roboflow.
        raw_image: The same image already loaded as a BGR NumPy array;
            crops are sliced out of this array.

    Returns:
        (crops, error): list of eye-crop arrays and None on success, or an
        empty list and the error message string if the Roboflow call failed.
    """
    try:
        resp = CLIENT_EYES.infer(image_path, model_id="eye-detection-kso3d/3")
        img_h, img_w = raw_image.shape[:2]
        crops = []
        for p in resp.get("predictions", []):
            # Roboflow returns center + size; convert to corner coordinates and
            # clamp to the image bounds so a detection at the frame edge can't
            # produce negative indices (which wrap around in NumPy slicing).
            x1 = max(0, int(p['x'] - p['width'] / 2))
            y1 = max(0, int(p['y'] - p['height'] / 2))
            x2 = min(img_w, int(p['x'] + p['width'] / 2))
            y2 = min(img_h, int(p['y'] + p['height'] / 2))
            crop = raw_image[y1:y2, x1:x2]
            if crop.size > 0:
                crops.append(crop)
        # On success, return the crops and None for the error message
        return crops, None
    except Exception as e:
        # If Roboflow fails, return an empty list and the error message
        print(f"Error in Roboflow eye detection: {e}")
        return [], str(e)

def get_largest_iris_prediction(eye_crop):
    """Calls Roboflow to find the largest iris using a temporary file for reliability.

    Args:
        eye_crop: BGR NumPy array containing a single cropped eye.

    Returns:
        The prediction dict with the largest bounding-box area, or None when
        Roboflow reports no iris.
    """
    # Sharpen the crop first — small irises detect better after unsharp masking.
    enhanced_eye_crop = enhance_image_unsharp_mask(eye_crop)

    # The SDK is more robust with a file path than an in-memory array,
    # so write the ENHANCED crop out to a temp file for the inference call.
    with tempfile.NamedTemporaryFile(delete=False, suffix=".jpg") as tmp:
        cv2.imwrite(tmp.name, enhanced_eye_crop)
        temp_iris_path = tmp.name

    try:
        resp = CLIENT_IRIS.infer(temp_iris_path, model_id="iris_120_set/7")
        preds = resp.get("predictions", [])
        return max(preds, key=lambda p: p["width"] * p["height"]) if preds else None
    finally:
        # Ensure the temporary file is always deleted, even if inference raises.
        os.remove(temp_iris_path)

def run_leukocoria_prediction(iris_crop):
    """Classify an iris crop for leukocoria with the loaded Keras model.

    Args:
        iris_crop: BGR NumPy array of the iris region.

    Returns:
        (has_leukocoria, confidence): bool flag (raw output > 0.5) and the raw
        model output as a float. If the model failed to load, returns an error
        dict and 0.0 instead.
    """
    if leuko_model is None: return {"error": "Leukocoria model not loaded"}, 0.0
    # The model expects RGB; OpenCV supplies BGR. (The original also bounced
    # through PIL here — the cvtColor result is already the array we need.)
    rgb = cv2.cvtColor(iris_crop, cv2.COLOR_BGR2RGB)
    enhanced = enhance_image_unsharp_mask(rgb)
    # MobileNet input size, scaled to [0, 1], with a leading batch axis.
    resized = cv2.resize(enhanced, (224, 224))
    img_array = np.expand_dims(resized / 255.0, axis=0)
    prediction = leuko_model.predict(img_array)
    confidence = float(prediction[0][0])
    has_leuko = confidence > 0.5
    return has_leuko, confidence

# --- 3. FastAPI Application ---
# NOTE: `app` is later rebound by gr.mount_gradio_app; keep the name `app` so
# uvicorn (and Spaces) can find the ASGI entry point.
app = FastAPI()

@app.post("/detect/")
async def full_detection_pipeline(image: UploadFile = File(...)):
    """End-to-end pipeline: face check -> eye detection (with mirrored retry)
    -> per-eye iris detection -> leukocoria classification.

    Returns JSON with per-eye leukocoria flags, base64 eye crops, and the
    (possibly mirrored) analyzed image; 400 on any detection failure.
    """
    # Persist the upload to disk: the Roboflow helpers take a file path.
    with tempfile.NamedTemporaryFile(delete=False, suffix=".jpg") as tmp:
        contents = await image.read()
        tmp.write(contents)
        temp_image_path = tmp.name

    try:
        raw_image = cv2.imread(temp_image_path)
        if raw_image is None:
            return JSONResponse(status_code=400, content={"error": "Could not read uploaded image."})

        # Reject images with no detectable face before the heavier eye/iris steps.
        if not detect_faces_roboflow(temp_image_path):
            return JSONResponse(status_code=400, content={"error": "No face detected."})

        image_to_process = raw_image
        was_mirrored = False
        
        print("--- 1. Attempting detection on original image... ---")
        eye_crops, error_msg = detect_eyes_roboflow(temp_image_path, image_to_process)
        print(f"--- 2. Found {len(eye_crops)} eyes in original image. ---")

        # Fallback: retry on a horizontally mirrored copy when we didn't get
        # exactly two eyes from the original orientation.
        if len(eye_crops) != 2:
            print("--- 3. Original failed. Attempting detection on mirrored image... ---")
            mirrored_image = cv2.flip(raw_image, 1)
            image_to_process = mirrored_image
            was_mirrored = True
            
            with tempfile.NamedTemporaryFile(delete=False, suffix=".jpg") as tmp_mirrored:
                cv2.imwrite(tmp_mirrored.name, mirrored_image)
                temp_mirrored_image_path = tmp_mirrored.name
            try:
                eye_crops, error_msg = detect_eyes_roboflow(temp_mirrored_image_path, image_to_process)
                print(f"--- 4. Found {len(eye_crops)} eyes in mirrored image. ---")
            finally:
                os.remove(temp_mirrored_image_path)

        if error_msg or len(eye_crops) != 2:
            return JSONResponse(
                status_code=400,
                content={"error": "Could not detect exactly two eyes. Please try another photo."}
            )

        # NOTE(review): cv2.boundingRect is applied to the grayscale CROP, so it
        # returns the bounding box of the crop's non-zero pixels in crop-local
        # coordinates — it does NOT recover the eye's position in the original
        # image. The sort below therefore may not be a true left-to-right
        # ordering; confirm against real outputs (the printed boxes help).
        initial_boxes = [cv2.boundingRect(cv2.cvtColor(c, cv2.COLOR_BGR2GRAY)) for c in eye_crops]
        print(f"--- 5. Initial eye coordinates (x,y,w,h): {initial_boxes} ---")

        eye_crops.sort(key=lambda c: cv2.boundingRect(cv2.cvtColor(c, cv2.COLOR_BGR2GRAY))[0])
        
        sorted_boxes = [cv2.boundingRect(cv2.cvtColor(c, cv2.COLOR_BGR2GRAY)) for c in eye_crops]
        print(f"--- 6. Sorted eye coordinates (x,y,w,h): {sorted_boxes} ---")
        
        if was_mirrored:
            # Mirroring swaps left/right, so undo the ordering before labeling.
            print("--- 7. Image was mirrored, reversing eye order for correct labeling. ---")
            eye_crops.reverse()
            reversed_boxes = [cv2.boundingRect(cv2.cvtColor(c, cv2.COLOR_BGR2GRAY)) for c in eye_crops]
            print(f"--- 8. Reversed eye coordinates (x,y,w,h): {reversed_boxes} ---")

        flags = {}            # side -> True / False / None (None = no iris found)
        eye_images_b64 = {}   # side -> base64 data-URI of the eye crop
        for i, eye_crop in enumerate(eye_crops):
            # Index 0 is labeled the subject's right eye (image-left side).
            side = "right" if i == 0 else "left"
            print(f"--- 9. Processing loop index {i}, assigning to: {side} eye. ---")
            
            is_success, buffer = cv2.imencode(".jpg", eye_crop)
            if is_success:
                eye_images_b64[side] = "data:image/jpeg;base64," + base64.b64encode(buffer).decode("utf-8")

            pred = get_largest_iris_prediction(eye_crop)
            if pred:
                # NOTE(review): these corners are not clamped to the crop — a
                # negative x1/y1 wraps in NumPy slicing and can yield an empty
                # or wrong iris crop; verify with eyes near the frame edge.
                x1, y1 = int(pred['x'] - pred['width'] / 2), int(pred['y'] - pred['height'] / 2)
                x2, y2 = int(pred['x'] + pred['width'] / 2), int(pred['y'] + pred['height'] / 2)
                iris_crop = eye_crop[y1:y2, x1:x2]
                has_leuko, confidence = run_leukocoria_prediction(iris_crop)
                flags[side] = has_leuko
            else:
                # No iris found: report "unknown" rather than guessing.
                flags[side] = None
        
        # --- THIS BLOCK IS NOW CORRECTLY UN-INDENTED ---
        # It runs AFTER the 'for' loop is complete.
        print("--- 10. Final generated flags:", flags, "---")
        
        is_success_main, buffer_main = cv2.imencode(".jpg", image_to_process)
        analyzed_image_b64 = ""
        if is_success_main:
            analyzed_image_b64 = "data:image/jpeg;base64," + base64.b64encode(buffer_main).decode("utf-8")

        return JSONResponse(content={
            "leukocoria": flags,
            "warnings": [],
            "two_eyes": eye_images_b64,
            "analyzed_image": analyzed_image_b64
        })

    finally:
        os.remove(temp_image_path)
        
# --- 4. Create and Mount the Gradio UI for a professional homepage ---
def gradio_wrapper(image_array):
    """A wrapper function to call our own FastAPI endpoint from the Gradio UI.

    Args:
        image_array: RGB NumPy array from the Gradio image widget, or None
            when the user submits without uploading an image.

    Returns:
        The /detect/ endpoint's JSON payload, or an error dict.
    """
    # Gradio passes None when no image was supplied; fail fast with a clear message
    # instead of letting Image.fromarray(None) blow up.
    if image_array is None:
        return {"error": "No image provided."}
    try:
        pil_image = Image.fromarray(image_array)
        with io.BytesIO() as buffer:
            pil_image.save(buffer, format="JPEG")
            files = {'image': ('image.jpg', buffer.getvalue(), 'image/jpeg')}
            # Loop back into our own server on the local port.
            response = requests.post("http://127.0.0.1:7860/detect/", files=files)
        
        if response.status_code == 200:
            return response.json()
        else:
            return {"error": f"API Error {response.status_code}", "details": response.text}
    except Exception as e:
        return {"error": str(e)}

# Simple single-image demo UI that round-trips through the /detect/ endpoint.
gradio_ui = gr.Interface(
    fn=gradio_wrapper,
    inputs=gr.Image(type="numpy", label="Upload an eye image to test the full pipeline"),
    outputs=gr.JSON(label="Analysis Results"),
    title="LeukoLook Eye Detector",
    description="A demonstration of the LeukoLook detection model pipeline."
)

# Serve the Gradio demo at "/" while keeping the FastAPI routes (e.g. /detect/).
app = gr.mount_gradio_app(app, gradio_ui, path="/")

# --- 5. Run the server ---
if __name__ == "__main__":
    # 0.0.0.0:7860 — presumably the Hugging Face Spaces default port; confirm
    # before deploying elsewhere.
    uvicorn.run(app, host="0.0.0.0", port=7860)