skibi11 committed · verified
Commit e0cf57f · 1 Parent(s): 2d599b6

Adapted to follow the logic from the provided Django api/views.py

Files changed (1):
  1. app.py +117 -113
app.py CHANGED
@@ -1,11 +1,10 @@
-# Final, Complete, and Working app.py for Hugging Face Space
+# app.py
+# Adapted to follow the logic from the provided Django api/views.py
 import os
 import cv2
 import tempfile
 import numpy as np
 import uvicorn
-import requests
-import io
 import base64
 from PIL import Image
 from inference_sdk import InferenceHTTPClient
@@ -16,8 +15,12 @@ from huggingface_hub import hf_hub_download
 import gradio as gr
 
 # --- 1. Configuration and Model Loading ---
-ROBOFLOW_API_KEY = os.environ.get("ROBOFLOW_API_KEY")
+# Constants from the new Django logic
+MAX_INFER_DIM = 1024
+ENHANCED_SIZE = (224, 224)
 
+# Roboflow and TF Model setup
+ROBOFLOW_API_KEY = os.environ.get("ROBOFLOW_API_KEY")
 CLIENT_FACE = InferenceHTTPClient(api_url="https://detect.roboflow.com", api_key=ROBOFLOW_API_KEY)
 CLIENT_EYES = InferenceHTTPClient(api_url="https://detect.roboflow.com", api_key=ROBOFLOW_API_KEY)
 CLIENT_IRIS = InferenceHTTPClient(api_url="https://detect.roboflow.com", api_key=ROBOFLOW_API_KEY)
@@ -31,177 +34,178 @@ except Exception as e:
     print(f"--- FATAL ERROR: COULD NOT LOAD LEUKOCORIA MODEL: {e} ---")
     raise RuntimeError(f"Could not load leukocoria model: {e}")
 
-# --- 2. All Helper Functions ---
+# --- 2. Helper Functions (Adapted from Django views.py) ---
 
-# NOTE: The 'enhance_image_unsharp_mask' function has been removed.
+def enhance_image_unsharp_mask(image, strength=0.5, radius=5):
+    """Enhances image using unsharp masking."""
+    blur = cv2.GaussianBlur(image, (radius, radius), 0)
+    return cv2.addWeighted(image, 1.0 + strength, blur, -strength, 0)
 
 def detect_faces_roboflow(image_path):
+    """Detects faces using Roboflow."""
     return CLIENT_FACE.infer(image_path, model_id="face-detector-v4liw/2").get("predictions", [])
 
-def detect_eyes_roboflow(image_path, raw_image):
-    """Calls Roboflow to find eyes and returns cropped images of them."""
+def detect_eyes_roboflow(image_path):
+    """
+    Detects eyes, resizing the image if necessary for inference,
+    then scales coordinates back to the original image size.
+    """
+    raw_image = cv2.imread(image_path)
+    if raw_image is None:
+        return None, []
+
+    h, w = raw_image.shape[:2]
+    scale = min(1.0, MAX_INFER_DIM / max(h, w))
+
+    # Use a temporary file for inference if resizing is needed
+    if scale < 1.0:
+        small_image = cv2.resize(raw_image, (int(w*scale), int(h*scale)))
+        with tempfile.NamedTemporaryFile(delete=False, suffix=".jpg") as tmp:
+            cv2.imwrite(tmp.name, small_image)
+            infer_path = tmp.name
+    else:
+        infer_path = image_path
+
     try:
-        resp = CLIENT_EYES.infer(image_path, model_id="eye-detection-kso3d/3")
-        crops = []
-        for p in resp.get("predictions", []):
-            x1 = int(p['x'] - p['width'] / 2)
-            y1 = int(p['y'] - p['height'] / 2)
-            x2 = int(p['x'] + p['width'] / 2)
-            y2 = int(p['y'] + p['height'] / 2)
-            crop = raw_image[y1:y2, x1:x2]
-            if crop.size > 0:
-                crops.append(crop)
-        return crops, None
-    except Exception as e:
-        print(f"Error in Roboflow eye detection: {e}")
-        return [], str(e)
+        resp = CLIENT_EYES.infer(infer_path, model_id="eye-detection-kso3d/3")
+    finally:
+        # Clean up temp file if one was created
+        if scale < 1.0 and os.path.exists(infer_path):
+            os.remove(infer_path)
+
+    crops = []
+    for p in resp.get("predictions", []):
+        # Scale coordinates back to original image dimensions
+        cx, cy = p["x"] / scale, p["y"] / scale
+        bw, bh = p["width"] / scale, p["height"] / scale
+
+        # Crop from the original raw image
+        x1 = int(cx - bw / 2)
+        y1 = int(cy - bh / 2)
+        x2 = int(cx + bw / 2)
+        y2 = int(cy + bh / 2)
+
+        crop = raw_image[y1:y2, x1:x2]
+        if crop.size > 0:
+            crops.append({"coords": (x1, y1, x2, y2), "image": crop})
+
+    return raw_image, crops
 
 def get_largest_iris_prediction(eye_crop):
-    "Calls Roboflow to find the largest iris using a temporary file for reliability."
+    """Finds the largest iris in an eye crop."""
     with tempfile.NamedTemporaryFile(delete=False, suffix=".jpg") as tmp:
-        # Save the original eye crop, not an enhanced version
         cv2.imwrite(tmp.name, eye_crop)
-        temp_iris_path = tmp.name
+        temp_path = tmp.name
    try:
-        resp = CLIENT_IRIS.infer(temp_iris_path, model_id="iris_120_set/7")
+        resp = CLIENT_IRIS.infer(temp_path, model_id="iris_120_set/7")
         preds = resp.get("predictions", [])
         return max(preds, key=lambda p: p["width"] * p["height"]) if preds else None
     finally:
-        os.remove(temp_iris_path)
+        os.remove(temp_path)
 
 def run_leukocoria_prediction(iris_crop):
-    if leuko_model is None: return {"error": "Leukocoria model not loaded"}, 0.0
-    # Convert crop to PIL Image
-    img_pil = Image.fromarray(cv2.cvtColor(iris_crop, cv2.COLOR_BGR2RGB))
-    # Resize the original image array
-    img_resized = cv2.resize(np.array(img_pil), (224, 224))
-    # Normalize and expand dimensions for the model
-    img_array = np.array(img_resized) / 255.0
+    """Runs the loaded TensorFlow model on an iris crop."""
+    # The logic from views.py is now directly in the TF model call
+    enh = enhance_image_unsharp_mask(iris_crop)
+    enh_rs = cv2.resize(enh, ENHANCED_SIZE)
+
+    img_array = np.array(enh_rs) / 255.0
     img_array = np.expand_dims(img_array, axis=0)
+
     prediction = leuko_model.predict(img_array)
     confidence = float(prediction[0][0])
     has_leuko = confidence > 0.5
     return has_leuko, confidence
 
+def to_base64(image):
+    """Converts a CV2 image to a base64 string."""
+    _, buffer = cv2.imencode(".jpg", image)
+    return "data:image/jpeg;base64," + base64.b64encode(buffer).decode()
+
 # --- 3. FastAPI Application ---
 app = FastAPI()
 
 @app.post("/detect/")
 async def full_detection_pipeline(image: UploadFile = File(...)):
     with tempfile.NamedTemporaryFile(delete=False, suffix=".jpg") as tmp:
-        contents = await image.read()
-        tmp.write(contents)
+        tmp.write(await image.read())
         temp_image_path = tmp.name
+
     try:
-        raw_image = cv2.imread(temp_image_path)
+        # Step 1: Face Check
+        if not detect_faces_roboflow(temp_image_path):
+            return JSONResponse(status_code=200, content={"warnings": ["No face detected."]})
+
+        # Step 2: Eye Detection
+        raw_image, eye_crops = detect_eyes_roboflow(temp_image_path)
         if raw_image is None:
             return JSONResponse(status_code=400, content={"error": "Could not read uploaded image."})
-        if not detect_faces_roboflow(temp_image_path):
-            return JSONResponse(status_code=400, content={"error": "No face detected."})
-        image_to_process = raw_image
-        was_mirrored = False
-
-        print("--- 1. Attempting detection on original image... ---")
-        eye_crops, error_msg = detect_eyes_roboflow(temp_image_path, image_to_process)
-        print(f"--- 2. Found {len(eye_crops)} eyes in original image. ---")
+
         if len(eye_crops) != 2:
-            print("--- 3. Original failed. Attempting detection on mirrored image... ---")
-            mirrored_image = cv2.flip(raw_image, 1)
-            image_to_process = mirrored_image
-            was_mirrored = True
-
-            with tempfile.NamedTemporaryFile(delete=False, suffix=".jpg") as tmp_mirrored:
-                cv2.imwrite(tmp_mirrored.name, mirrored_image)
-                temp_mirrored_image_path = tmp_mirrored.name
-            try:
-                eye_crops, error_msg = detect_eyes_roboflow(temp_mirrored_image_path, image_to_process)
-                print(f"--- 4. Found {len(eye_crops)} eyes in mirrored image. ---")
-            finally:
-                os.remove(temp_mirrored_image_path)
-
-            if error_msg or len(eye_crops) != 2:
-                return JSONResponse(
-                    status_code=400,
-                    content={"error": "Could not detect exactly two eyes. Please try another photo."}
-                )
-
-        initial_boxes = [cv2.boundingRect(cv2.cvtColor(c, cv2.COLOR_BGR2GRAY)) for c in eye_crops]
-        print(f"--- 5. Initial eye coordinates (x,y,w,h): {initial_boxes} ---")
-
-        eye_crops.sort(key=lambda c: cv2.boundingRect(cv2.cvtColor(c, cv2.COLOR_BGR2GRAY))[0])
-
-        sorted_boxes = [cv2.boundingRect(cv2.cvtColor(c, cv2.COLOR_BGR2GRAY)) for c in eye_crops]
-        print(f"--- 6. Sorted eye coordinates (x,y,w,h): {sorted_boxes} ---")
-
-        if was_mirrored:
-            print("--- 7. Image was mirrored, reversing eye order for correct labeling. ---")
-            eye_crops.reverse()
-            reversed_boxes = [cv2.boundingRect(cv2.cvtColor(c, cv2.COLOR_BGR2GRAY)) for c in eye_crops]
-            print(f"--- 8. Reversed eye coordinates (x,y,w,h): {reversed_boxes} ---")
-
+            return JSONResponse(status_code=200, content={
+                "analyzed_image": to_base64(raw_image),
+                "warnings": ["Exactly two eyes not detected."]
+            })
+
+        # Step 3: Process Eyes with NEW Labeling Logic
+        sorted_eyes = sorted(eye_crops, key=lambda e: e["coords"][0])
+        images_b64 = {}
         flags = {}
-        eye_images_b64 = {}
-        for i, eye_crop in enumerate(eye_crops):
-            side = "right" if i == 0 else "left"
-            print(f"--- 9. Processing loop index {i}, assigning to: {side} eye. ---")
-
-            is_success, buffer = cv2.imencode(".jpg", eye_crop)
-            if is_success:
-                eye_images_b64[side] = "data:image/jpeg;base64," + base64.b64encode(buffer).decode("utf-8")
+
+        # This new loop labels the left-most eye as "left" and right-most as "right"
+        for side, eye_info in zip(("left", "right"), sorted_eyes):
+            eye_img = eye_info["image"]
 
-            pred = get_largest_iris_prediction(eye_crop)
+            # Iris detection and Leukocoria prediction
+            pred = get_largest_iris_prediction(eye_img)
             if pred:
-                x1, y1 = int(pred['x'] - pred['width'] / 2), int(pred['y'] - pred['height'] / 2)
-                x2, y2 = int(pred['x'] + pred['width'] / 2), int(pred['y'] + pred['height'] / 2)
-                iris_crop = eye_crop[y1:y2, x1:x2]
+                cx, cy, w, h = pred["x"], pred["y"], pred["width"], pred["height"]
+                x1, y1 = int(cx - w / 2), int(cy - h / 2)
+                x2, y2 = int(cx + w / 2), int(cy + h / 2)
+
+                iris_crop = eye_img[y1:y2, x1:x2]
+
                 has_leuko, confidence = run_leukocoria_prediction(iris_crop)
                 flags[side] = has_leuko
             else:
                 flags[side] = None
-
-        print("--- 10. Final generated flags:", flags, "---")
-
-        is_success_main, buffer_main = cv2.imencode(".jpg", image_to_process)
-        analyzed_image_b64 = ""
-        if is_success_main:
-            analyzed_image_b64 = "data:image/jpeg;base64," + base64.b64encode(buffer_main).decode("utf-8")
-
-        return JSONResponse(content={
+
+            images_b64[side] = to_base64(eye_img)
+
+        # Step 4: Prepare and return the final response
+        return JSONResponse(status_code=200, content={
+            "analyzed_image": to_base64(raw_image),
+            "two_eyes": images_b64,
             "leukocoria": flags,
-            "warnings": [],
-            "two_eyes": eye_images_b64,
-            "analyzed_image": analyzed_image_b64
+            "warnings": []
         })
+
     finally:
         os.remove(temp_image_path)
 
-# --- 4. Create and Mount the Gradio UI ---
+# --- 4. Gradio UI (for simple testing) ---
 def gradio_wrapper(image_array):
-    """A wrapper function to call our own FastAPI endpoint from the Gradio UI."""
     try:
         pil_image = Image.fromarray(image_array)
         with io.BytesIO() as buffer:
             pil_image.save(buffer, format="JPEG")
             files = {'image': ('image.jpg', buffer.getvalue(), 'image/jpeg')}
-            # The URL points to the local FastAPI server running within the Hugging Face Space
            response = requests.post("http://127.0.0.1:7860/detect/", files=files)
 
-            if response.status_code == 200:
-                return response.json()
-            else:
-                return {"error": f"API Error {response.status_code}", "details": response.text}
+            return response.json()
     except Exception as e:
         return {"error": str(e)}
 
 gradio_ui = gr.Interface(
     fn=gradio_wrapper,
-    inputs=gr.Image(type="numpy", label="Upload an eye image to test the full pipeline"),
+    inputs=gr.Image(type="numpy", label="Upload an eye image"),
     outputs=gr.JSON(label="Analysis Results"),
     title="LeukoLook Eye Detector",
-    description="A demonstration of the LeukoLook detection model pipeline.")
+    description="Demonstration of the full detection pipeline."
+)
 
 app = gr.mount_gradio_app(app, gradio_ui, path="/")
 
-# --- 5. Run the server ---
+# --- 5. Run Server ---
 if __name__ == "__main__":
     uvicorn.run(app, host="0.0.0.0", port=7860)
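
For quick testing from outside the Space, the mounted FastAPI endpoint can be called the same way gradio_wrapper calls it internally. A minimal client sketch, assuming a locally running server as in the code above; SPACE_URL and test.jpg are placeholders, not part of the committed app.py:

# Hypothetical client for the /detect/ endpoint; SPACE_URL and test.jpg are placeholders.
import requests

SPACE_URL = "http://127.0.0.1:7860"  # assumption: local server, as used by gradio_wrapper

with open("test.jpg", "rb") as f:
    files = {"image": ("test.jpg", f.read(), "image/jpeg")}

resp = requests.post(f"{SPACE_URL}/detect/", files=files)
data = resp.json()

# Keys returned by full_detection_pipeline: "leukocoria", "two_eyes",
# "analyzed_image", and "warnings" (warning-only responses omit some of these).
print(data.get("warnings"))
print(data.get("leukocoria"))  # e.g. {"left": False, "right": True}, with None when no iris was found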