Spaces:
Running
Running
used the old views.py logic as guide
Browse files
app.py
CHANGED
@@ -1,4 +1,4 @@
|
|
1 |
-
#
|
2 |
|
3 |
import os
|
4 |
import cv2
|
@@ -17,101 +17,100 @@ import tensorflow as tf
|
|
17 |
from huggingface_hub import hf_hub_download
|
18 |
|
19 |
# --- 1. Configuration and Model Loading ---
|
20 |
-
# Note: Ensure ROBOFLOW_API_KEY is set as a secret in your Space settings
|
21 |
ROBOFLOW_API_KEY = os.environ.get("ROBOFLOW_API_KEY")
|
22 |
CLIENT_FACE = InferenceHTTPClient(api_url="https://detect.roboflow.com", api_key=ROBOFLOW_API_KEY)
|
23 |
CLIENT_EYES = InferenceHTTPClient(api_url="https://detect.roboflow.com", api_key=ROBOFLOW_API_KEY)
|
24 |
CLIENT_IRIS = InferenceHTTPClient(api_url="https://detect.roboflow.com", api_key=ROBOFLOW_API_KEY)
|
25 |
|
26 |
-
|
27 |
try:
|
28 |
model_path = hf_hub_download("skibi11/leukolook-eye-detector", "MobileNetV1_best.keras")
|
29 |
-
|
30 |
-
print("--- MODEL LOADED SUCCESSFULLY! ---")
|
31 |
except Exception as e:
|
32 |
-
print(f"--- ERROR
|
33 |
raise RuntimeError(f"Could not load leukocoria model: {e}")
|
34 |
|
35 |
# --- 2. All Helper Functions ---
|
|
|
|
|
|
|
|
|
36 |
def detect_faces_roboflow(image_path):
|
37 |
-
""
|
38 |
-
resp = CLIENT_FACE.infer(image_path, model_id="face-detector-v4liw/2")
|
39 |
-
return resp.get("predictions", [])
|
40 |
|
41 |
-
def detect_eyes_roboflow(image_path):
|
42 |
-
"""Calls Roboflow to find eyes and returns cropped images of them."""
|
43 |
resp = CLIENT_EYES.infer(image_path, model_id="eye-detection-kso3d/3")
|
44 |
-
|
45 |
-
if raw_image is None: return [], "Could not read image"
|
46 |
-
eye_crops = []
|
47 |
for p in resp.get("predictions", []):
|
48 |
x1 = int(p['x'] - p['width'] / 2)
|
49 |
y1 = int(p['y'] - p['height'] / 2)
|
50 |
x2 = int(p['x'] + p['width'] / 2)
|
51 |
y2 = int(p['y'] + p['height'] / 2)
|
52 |
-
|
53 |
-
|
|
|
|
|
54 |
|
55 |
-
|
56 |
-
|
57 |
is_success, buffer = cv2.imencode(".jpg", eye_crop)
|
58 |
if not is_success: return None
|
59 |
-
resp = CLIENT_IRIS.infer(buffer, model_id="iris_120_set/7")
|
60 |
preds = resp.get("predictions", [])
|
61 |
-
|
62 |
-
largest = max(preds, key=lambda p: p["width"] * p["height"])
|
63 |
-
x1, y1 = int(largest['x'] - largest['width'] / 2), int(largest['y'] - largest['height'] / 2)
|
64 |
-
x2, y2 = int(largest['x'] + largest['width'] / 2), int(largest['y'] + largest['height'] / 2)
|
65 |
-
return eye_crop[y1:y2, x1:x2]
|
66 |
|
67 |
def run_leukocoria_prediction(iris_crop):
|
68 |
-
"""
|
69 |
-
|
70 |
img_pil = Image.fromarray(cv2.cvtColor(iris_crop, cv2.COLOR_BGR2RGB))
|
71 |
-
|
72 |
-
|
|
|
73 |
img_array = np.expand_dims(img_array, axis=0)
|
74 |
-
|
75 |
-
|
76 |
-
|
77 |
-
|
|
|
|
|
|
|
|
|
78 |
app = FastAPI()
|
79 |
|
80 |
-
@app.post("/
|
81 |
async def full_detection_pipeline(image: UploadFile = File(...)):
|
82 |
-
"""The main API endpoint that runs the full detection pipeline."""
|
83 |
with tempfile.NamedTemporaryFile(delete=False, suffix=".jpg") as tmp:
|
84 |
-
|
|
|
85 |
temp_image_path = tmp.name
|
86 |
-
|
87 |
try:
|
88 |
if not detect_faces_roboflow(temp_image_path):
|
89 |
return JSONResponse(status_code=400, content={"error": "No face detected."})
|
90 |
|
91 |
-
|
92 |
-
|
|
|
|
|
93 |
return JSONResponse(status_code=400, content={"error": "Exactly two eyes not detected."})
|
94 |
|
95 |
-
|
96 |
-
|
97 |
-
|
98 |
-
|
99 |
-
|
100 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
101 |
|
102 |
-
|
103 |
-
|
104 |
-
for i, eye_crop in enumerate(sorted_eye_crops):
|
105 |
-
side = "left_eye" if i == 0 else "right_eye"
|
106 |
-
iris_crop = detect_iris_roboflow(eye_crop)
|
107 |
-
if iris_crop is None:
|
108 |
-
results[side] = {"status": "No iris detected", "prediction": None}
|
109 |
-
continue
|
110 |
-
|
111 |
-
prediction = run_leukocoria_prediction(iris_crop)
|
112 |
-
results[side] = {"status": "Processed", "prediction": prediction}
|
113 |
-
|
114 |
-
return JSONResponse(content=results)
|
115 |
|
116 |
finally:
|
117 |
os.remove(temp_image_path)
|
@@ -120,17 +119,11 @@ async def full_detection_pipeline(image: UploadFile = File(...)):
|
|
120 |
def gradio_wrapper(image_array):
|
121 |
"""A wrapper function to call our own FastAPI endpoint from the Gradio UI."""
|
122 |
try:
|
123 |
-
pil_image = Image.fromarray(image_array
|
124 |
-
with
|
125 |
-
pil_image.save(
|
126 |
-
|
127 |
-
|
128 |
-
with open(tmp_path, "rb") as f:
|
129 |
-
files = {'image': ('image.jpg', f, 'image/jpeg')}
|
130 |
-
# The API is running on the same server, so we call it locally
|
131 |
-
response = requests.post("http://127.0.0.1:7860/api/detect/", files=files)
|
132 |
-
|
133 |
-
os.remove(tmp_path)
|
134 |
|
135 |
if response.status_code == 200:
|
136 |
return response.json()
|
|
|
1 |
+
# Final, Complete, and Corrected app.py for Hugging Face Space
|
2 |
|
3 |
import os
|
4 |
import cv2
|
|
|
17 |
from huggingface_hub import hf_hub_download
|
18 |
|
19 |
# --- 1. Configuration and Model Loading ---
|
|
|
20 |
ROBOFLOW_API_KEY = os.environ.get("ROBOFLOW_API_KEY")
|
21 |
CLIENT_FACE = InferenceHTTPClient(api_url="https://detect.roboflow.com", api_key=ROBOFLOW_API_KEY)
|
22 |
CLIENT_EYES = InferenceHTTPClient(api_url="https://detect.roboflow.com", api_key=ROBOFLOW_API_KEY)
|
23 |
CLIENT_IRIS = InferenceHTTPClient(api_url="https://detect.roboflow.com", api_key=ROBOFLOW_API_KEY)
|
24 |
|
25 |
+
leuko_model = None
|
26 |
try:
|
27 |
model_path = hf_hub_download("skibi11/leukolook-eye-detector", "MobileNetV1_best.keras")
|
28 |
+
leuko_model = tf.keras.models.load_model(model_path)
|
29 |
+
print("--- LEUKOCORIA MODEL LOADED SUCCESSFULLY! ---")
|
30 |
except Exception as e:
|
31 |
+
print(f"--- FATAL ERROR: COULD NOT LOAD LEUKOCORIA MODEL: {e} ---")
|
32 |
raise RuntimeError(f"Could not load leukocoria model: {e}")
|
33 |
|
34 |
# --- 2. All Helper Functions ---
|
35 |
+
def enhance_image_unsharp_mask(image, strength=0.5, radius=5):
|
36 |
+
blur = cv2.GaussianBlur(image, (radius, radius), 0)
|
37 |
+
return cv2.addWeighted(image, 1.0 + strength, blur, -strength, 0)
|
38 |
+
|
39 |
def detect_faces_roboflow(image_path):
|
40 |
+
return CLIENT_FACE.infer(image_path, model_id="face-detector-v4liw/2").get("predictions", [])
|
|
|
|
|
41 |
|
42 |
+
def detect_eyes_roboflow(image_path, raw_image):
|
|
|
43 |
resp = CLIENT_EYES.infer(image_path, model_id="eye-detection-kso3d/3")
|
44 |
+
crops = []
|
|
|
|
|
45 |
for p in resp.get("predictions", []):
|
46 |
x1 = int(p['x'] - p['width'] / 2)
|
47 |
y1 = int(p['y'] - p['height'] / 2)
|
48 |
x2 = int(p['x'] + p['width'] / 2)
|
49 |
y2 = int(p['y'] + p['height'] / 2)
|
50 |
+
crop = raw_image[y1:y2, x1:x2]
|
51 |
+
if crop.size > 0:
|
52 |
+
crops.append(crop)
|
53 |
+
return crops
|
54 |
|
55 |
+
# --- ADDED MISSING FUNCTION ---
|
56 |
+
def get_largest_iris_prediction(eye_crop):
|
57 |
is_success, buffer = cv2.imencode(".jpg", eye_crop)
|
58 |
if not is_success: return None
|
59 |
+
resp = CLIENT_IRIS.infer(data=buffer, model_id="iris_120_set/7")
|
60 |
preds = resp.get("predictions", [])
|
61 |
+
return max(preds, key=lambda p: p["width"] * p["height"]) if preds else None
|
|
|
|
|
|
|
|
|
62 |
|
63 |
def run_leukocoria_prediction(iris_crop):
|
64 |
+
if leuko_model is None: return {"error": "Leukocoria model not loaded"}
|
65 |
+
|
66 |
img_pil = Image.fromarray(cv2.cvtColor(iris_crop, cv2.COLOR_BGR2RGB))
|
67 |
+
enh = enhance_image_unsharp_mask(np.array(img_pil))
|
68 |
+
enh_rs = cv2.resize(enh, (224, 224))
|
69 |
+
img_array = np.array(enh_rs) / 255.0
|
70 |
img_array = np.expand_dims(img_array, axis=0)
|
71 |
+
|
72 |
+
prediction = leuko_model.predict(img_array)
|
73 |
+
confidence = float(prediction[0][0])
|
74 |
+
has_leuko = confidence > 0.5
|
75 |
+
|
76 |
+
return has_leuko, confidence
|
77 |
+
|
78 |
+
# --- 3. FastAPI Application ---
|
79 |
app = FastAPI()
|
80 |
|
81 |
+
@app.post("/detect/")
|
82 |
async def full_detection_pipeline(image: UploadFile = File(...)):
|
|
|
83 |
with tempfile.NamedTemporaryFile(delete=False, suffix=".jpg") as tmp:
|
84 |
+
contents = await image.read()
|
85 |
+
tmp.write(contents)
|
86 |
temp_image_path = tmp.name
|
87 |
+
|
88 |
try:
|
89 |
if not detect_faces_roboflow(temp_image_path):
|
90 |
return JSONResponse(status_code=400, content={"error": "No face detected."})
|
91 |
|
92 |
+
raw_image = cv2.imread(temp_image_path)
|
93 |
+
eye_crops = detect_eyes_roboflow(temp_image_path, raw_image)
|
94 |
+
|
95 |
+
if len(eye_crops) != 2:
|
96 |
return JSONResponse(status_code=400, content={"error": "Exactly two eyes not detected."})
|
97 |
|
98 |
+
eye_crops.sort(key=lambda c: cv2.boundingRect(cv2.cvtColor(c, cv2.COLOR_BGR2GRAY))[0])
|
99 |
+
|
100 |
+
flags = {}
|
101 |
+
for i, eye_crop in enumerate(eye_crops):
|
102 |
+
side = "left" if i == 0 else "right"
|
103 |
+
pred = get_largest_iris_prediction(eye_crop)
|
104 |
+
if pred:
|
105 |
+
x1, y1 = int(pred['x'] - pred['width'] / 2), int(pred['y'] - pred['height'] / 2)
|
106 |
+
x2, y2 = int(pred['x'] + pred['width'] / 2), int(pred['y'] + pred['height'] / 2)
|
107 |
+
iris_crop = eye_crop[y1:y2, x1:x2]
|
108 |
+
has_leuko, confidence = run_leukocoria_prediction(iris_crop)
|
109 |
+
flags[side] = has_leuko
|
110 |
+
else:
|
111 |
+
flags[side] = None
|
112 |
|
113 |
+
return JSONResponse(content={"leukocoria": flags, "warnings": []})
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
114 |
|
115 |
finally:
|
116 |
os.remove(temp_image_path)
|
|
|
119 |
def gradio_wrapper(image_array):
|
120 |
"""A wrapper function to call our own FastAPI endpoint from the Gradio UI."""
|
121 |
try:
|
122 |
+
pil_image = Image.fromarray(image_array)
|
123 |
+
with io.BytesIO() as buffer:
|
124 |
+
pil_image.save(buffer, format="JPEG")
|
125 |
+
files = {'image': ('image.jpg', buffer.getvalue(), 'image/jpeg')}
|
126 |
+
response = requests.post("http://127.0.0.1:7860/detect/", files=files)
|
|
|
|
|
|
|
|
|
|
|
|
|
127 |
|
128 |
if response.status_code == 200:
|
129 |
return response.json()
|