File size: 7,095 Bytes
ab24187
5eab736
4611e20
 
 
 
 
ab24187
 
ab43e06
4611e20
 
 
1490129
5d7bc4a
ecfc393
ab24187
ecfc393
4611e20
 
 
 
 
 
eadc82d
ecfc393
4611e20
eadc82d
 
ecfc393
eadc82d
4611e20
 
 
eadc82d
 
 
 
4611e20
eadc82d
4611e20
eadc82d
c7d17de
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ecfc393
eadc82d
65ede81
 
 
 
 
 
 
 
 
 
 
 
 
4611e20
 
a4d01ce
4611e20
eadc82d
 
 
ecfc393
eadc82d
 
 
 
 
 
ac1b201
dc2c7e9
6c7560e
 
eadc82d
4611e20
 
eadc82d
 
4611e20
6c7560e
4611e20
6c7560e
 
 
 
4611e20
 
 
c7d17de
 
1a510c6
6c7560e
eadc82d
 
6c7560e
eadc82d
6c7560e
 
eadc82d
1d4fbb5
6c7560e
 
 
 
 
 
eadc82d
 
 
 
 
 
 
 
 
6c7560e
 
 
 
 
 
 
4611e20
 
 
1490129
ab24187
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
06db179
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
# Final, Complete, and Working app.py for Hugging Face Space

import os
import cv2
import tempfile
import numpy as np
import uvicorn
import requests
import io
import base64
from PIL import Image
from inference_sdk import InferenceHTTPClient
from fastapi import FastAPI, File, UploadFile
from fastapi.responses import JSONResponse
import tensorflow as tf
from huggingface_hub import hf_hub_download
import gradio as gr

# --- 1. Configuration and Model Loading ---
# Roboflow hosted-inference clients; the API key comes from the Space's secrets.
# ROBOFLOW_API_KEY may be None if the secret is unset — requests would then fail at call time.
ROBOFLOW_API_KEY = os.environ.get("ROBOFLOW_API_KEY")
# Three separate clients, one per model stage (face -> eyes -> iris); all share the same endpoint/key.
CLIENT_FACE = InferenceHTTPClient(api_url="https://detect.roboflow.com", api_key=ROBOFLOW_API_KEY)
CLIENT_EYES = InferenceHTTPClient(api_url="https://detect.roboflow.com", api_key=ROBOFLOW_API_KEY)
CLIENT_IRIS = InferenceHTTPClient(api_url="https://detect.roboflow.com", api_key=ROBOFLOW_API_KEY)

# Download and load the leukocoria classifier from the Hugging Face Hub at startup.
# Failure is fatal: the app refuses to start rather than serve without its model.
leuko_model = None
try:
    model_path = hf_hub_download("skibi11/leukolook-eye-detector", "MobileNetV1_best.keras")
    leuko_model = tf.keras.models.load_model(model_path)
    print("--- LEUKOCORIA MODEL LOADED SUCCESSFULLY! ---")
except Exception as e:
    print(f"--- FATAL ERROR: COULD NOT LOAD LEUKOCORIA MODEL: {e} ---")
    raise RuntimeError(f"Could not load leukocoria model: {e}")

# --- 2. All Helper Functions ---
def enhance_image_unsharp_mask(image, strength=0.5, radius=5):
    """Sharpen *image* using an unsharp mask.

    The image is blurred with a Gaussian kernel and the blur is subtracted
    from a boosted copy of the original: out = (1 + strength) * image - strength * blur.

    Args:
        image: NumPy array (H, W[, C]) as produced by OpenCV/PIL.
        strength: Sharpening amount; 0 returns the image unchanged.
        radius: Gaussian kernel size. cv2.GaussianBlur requires an odd,
            positive kernel size, so even values are rounded up by one.

    Returns:
        Sharpened image, same shape and dtype behavior as cv2.addWeighted.
    """
    # GaussianBlur raises on an even kernel size; normalise to the next odd value.
    ksize = radius if radius % 2 == 1 else radius + 1
    blur = cv2.GaussianBlur(image, (ksize, ksize), 0)
    return cv2.addWeighted(image, 1.0 + strength, blur, -strength, 0)

def detect_faces_roboflow(image_path):
    """Run the Roboflow face-detection model on the image file at *image_path*.

    Returns the list of face predictions (empty when no face is found).
    """
    response = CLIENT_FACE.infer(image_path, model_id="face-detector-v4liw/2")
    return response.get("predictions", [])

def detect_eyes_roboflow(image_path, raw_image):
    """Call Roboflow to find eyes and return cropped images of them.

    Args:
        image_path: Path of the image file sent to the Roboflow API.
        raw_image: The same image already decoded by cv2.imread (BGR array),
            used as the source for cropping.

    Returns:
        (crops, error): a list of non-empty eye crops and None on success,
        or ([], message) when the Roboflow call fails.
    """
    try:
        resp = CLIENT_EYES.infer(image_path, model_id="eye-detection-kso3d/3")
        height, width = raw_image.shape[:2]
        crops = []
        for p in resp.get("predictions", []):
            # Roboflow returns center + size; convert to corner coordinates.
            # Clamp to the image bounds: for eyes near the frame edge the raw
            # corners can go negative, and negative NumPy indices silently
            # slice from the END of the array, yielding wrong or empty crops.
            x1 = max(0, int(p['x'] - p['width'] / 2))
            y1 = max(0, int(p['y'] - p['height'] / 2))
            x2 = min(width, int(p['x'] + p['width'] / 2))
            y2 = min(height, int(p['y'] + p['height'] / 2))
            crop = raw_image[y1:y2, x1:x2]
            if crop.size > 0:
                crops.append(crop)
        # On success, return the crops and None for the error message
        return crops, None
    except Exception as e:
        # If Roboflow fails, return an empty list and the error message
        print(f"Error in Roboflow eye detection: {e}")
        return [], str(e)

def get_largest_iris_prediction(eye_crop):
    """Detect irises in *eye_crop* via Roboflow and return the biggest one.

    The crop is written to a temporary JPEG first because file-path inference
    is more reliable than sending the array directly. Returns the prediction
    dict with the largest bounding-box area, or None when nothing is detected.
    """
    # Persist the crop to disk; delete=False so the path survives the `with`.
    with tempfile.NamedTemporaryFile(delete=False, suffix=".jpg") as handle:
        cv2.imwrite(handle.name, eye_crop)
        iris_path = handle.name

    try:
        response = CLIENT_IRIS.infer(iris_path, model_id="iris_120_set/7")
        candidates = response.get("predictions", [])
        if not candidates:
            return None
        return max(candidates, key=lambda box: box["width"] * box["height"])
    finally:
        # Always clean up the temporary file, even if inference raised.
        os.remove(iris_path)

def run_leukocoria_prediction(iris_crop):
    """Classify an iris crop for leukocoria with the loaded Keras model.

    Args:
        iris_crop: BGR image array (an iris region cropped by the pipeline).

    Returns:
        (has_leukocoria, confidence) where confidence is the raw sigmoid
        output in [0, 1] and has_leukocoria is True when it exceeds 0.5.
        Returns ({"error": ...}, 0.0) if the model failed to load at startup.
    """
    if leuko_model is None:
        return {"error": "Leukocoria model not loaded"}, 0.0
    # OpenCV crops are BGR; the model expects RGB input.
    # (The original PIL round-trip here was redundant — cvtColor already
    # yields the RGB array we need.)
    rgb = cv2.cvtColor(iris_crop, cv2.COLOR_BGR2RGB)
    enhanced = enhance_image_unsharp_mask(rgb)
    resized = cv2.resize(enhanced, (224, 224))
    # Scale to [0, 1] and add the batch dimension expected by Keras.
    img_array = np.expand_dims(resized / 255.0, axis=0)
    prediction = leuko_model.predict(img_array)
    confidence = float(prediction[0][0])
    return confidence > 0.5, confidence

# --- 3. FastAPI Application ---
# REST backend; the Gradio demo UI is mounted onto this same app at "/".
app = FastAPI()


@app.post("/detect/")
async def full_detection_pipeline(image: UploadFile = File(...)):
    """Run the full face -> eyes -> iris -> leukocoria pipeline on an upload.

    Returns JSON with per-eye leukocoria flags, warnings, and base64 JPEG
    crops of both eyes. Responds 400 for unreadable images or no face,
    and 200 with a warning when exactly two eyes are not found.
    """
    # Persist the upload to a temp file: the Roboflow helpers take a file path.
    with tempfile.NamedTemporaryFile(delete=False, suffix=".jpg") as tmp:
        contents = await image.read()
        tmp.write(contents)
        temp_image_path = tmp.name

    try:
        raw_image = cv2.imread(temp_image_path)
        if raw_image is None:
            return JSONResponse(status_code=400, content={"error": "Could not read uploaded image."})

        # Gate on face detection before the more expensive eye/iris stages.
        if not detect_faces_roboflow(temp_image_path):
            return JSONResponse(status_code=400, content={"error": "No face detected."})

        eye_crops, error_msg = detect_eyes_roboflow(temp_image_path, raw_image)
        if error_msg or len(eye_crops) != 2:
            return JSONResponse(status_code=200, content={"warnings": ["Exactly two eyes not detected."]})

        # NOTE(review): this looks intended to order the eyes left-to-right,
        # but the crops carry no position info — cv2.boundingRect on a
        # grayscale CROP does not recover its location in the original frame.
        # Verify the resulting "right"/"left" labels against real inputs.
        eye_crops.sort(key=lambda c: cv2.boundingRect(cv2.cvtColor(c, cv2.COLOR_BGR2GRAY))[0])

        # Prepare to store all our results
        flags = {}            # side -> True/False/None (None = no iris found)
        eye_images_b64 = {}   # side -> data-URI JPEG of the eye crop

        for i, eye_crop in enumerate(eye_crops):
            # Assumes index 0 is the subject's right eye — see NOTE above.
            side = "right" if i == 0 else "left"

            # --- NEW: Encode the cropped eye image to Base64 ---
            is_success, buffer = cv2.imencode(".jpg", eye_crop)
            if is_success:
                eye_images_b64[side] = "data:image/jpeg;base64," + base64.b64encode(buffer).decode("utf-8")

            pred = get_largest_iris_prediction(eye_crop)
            if pred:
                # NOTE(review): these corners are not clamped; near the crop
                # edge x1/y1 can go negative and silently slice from the end.
                x1, y1 = int(pred['x'] - pred['width'] / 2), int(pred['y'] - pred['height'] / 2)
                x2, y2 = int(pred['x'] + pred['width'] / 2), int(pred['y'] + pred['height'] / 2)
                iris_crop = eye_crop[y1:y2, x1:x2]
                has_leuko, confidence = run_leukocoria_prediction(iris_crop)
                flags[side] = has_leuko
            else:
                flags[side] = None

        # --- NEW: Include the images in the final response ---
        return JSONResponse(content={
            "leukocoria": flags,
            "warnings": [],
            "two_eyes": eye_images_b64 # Add the eye images here
        })

    finally:
        # Always remove the uploaded temp file, success or failure.
        os.remove(temp_image_path)

# --- 4. Create and Mount the Gradio UI for a professional homepage ---
def gradio_wrapper(image_array):
    """Proxy a Gradio image upload to this app's own /detect/ endpoint.

    Args:
        image_array: NumPy image from gr.Image(type="numpy"), or None when
            the user submits without an image.

    Returns:
        The endpoint's JSON response, or an {"error": ...} dict on failure.
    """
    try:
        if image_array is None:
            return {"error": "No image provided."}
        # Gradio may deliver RGBA (or grayscale) arrays; JPEG cannot store an
        # alpha channel and PIL raises on save — normalise to RGB first.
        pil_image = Image.fromarray(image_array).convert("RGB")
        with io.BytesIO() as buffer:
            pil_image.save(buffer, format="JPEG")
            files = {'image': ('image.jpg', buffer.getvalue(), 'image/jpeg')}
            # 127.0.0.1:7860 is this same process — see uvicorn.run below.
            response = requests.post("http://127.0.0.1:7860/detect/", files=files)

        if response.status_code == 200:
            return response.json()
        return {"error": f"API Error {response.status_code}", "details": response.text}
    except Exception as e:
        return {"error": str(e)}

# Demo UI: a single image input that round-trips through the REST endpoint,
# so the homepage exercises exactly the same code path as API clients.
gradio_ui = gr.Interface(
    fn=gradio_wrapper,
    inputs=gr.Image(type="numpy", label="Upload an eye image to test the full pipeline"),
    outputs=gr.JSON(label="Analysis Results"),
    title="LeukoLook Eye Detector",
    description="A demonstration of the LeukoLook detection model pipeline."
)

# Mount the Gradio UI at "/" so the Space homepage shows the demo while
# /detect/ remains available for programmatic access.
app = gr.mount_gradio_app(app, gradio_ui, path="/")

# --- 5. Run the server ---
if __name__ == "__main__":
    # 7860 is the port Hugging Face Spaces expects; 0.0.0.0 binds all interfaces.
    uvicorn.run(app, host="0.0.0.0", port=7860)