import os

import cv2
import gradio as gr
import numpy as np
import spaces
import torch

import utils
from image_segmenter import ImageSegmenter
from monocular_depth_estimator import MonocularDepthEstimator
from point_cloud_generator import display_pcd

# Default to CPU; initialize_gpu() below switches this to CUDA once ZeroGPU
# grants a device.
device = torch.device("cpu")


def initialize_gpu():
    """Ensure ZeroGPU assigns a GPU before initializing CUDA."""
    global device
    try:
        with spaces.GPU():
            torch.cuda.empty_cache()
            if torch.cuda.is_available():
                device = torch.device("cuda")
                print(f"✅ GPU initialized: {torch.cuda.get_device_name(0)}")
            else:
                print("❌ No GPU detected after ZeroGPU allocation.")
                device = torch.device("cpu")
    except Exception as e:
        print(f"🚨 GPU initialization failed: {e}")
        device = torch.device("cpu")


initialize_gpu()


# Cooperative cancel flag: set by the Cancel button, polled by process_video().
CANCEL_PROCESSING = False

img_seg = ImageSegmenter(model_type="yolov8s-seg")
depth_estimator = MonocularDepthEstimator(model_type="midas_v21_small_256")


@spaces.GPU
def process_image(image):
    """Segment one image, estimate its depth, and build per-object point clouds."""
    image = utils.resize(image)
    image_segmentation, objects_data = img_seg.predict(image)
    depthmap, depth_colormap = depth_estimator.make_prediction(image)
    dist_image = utils.draw_depth_info(image, depthmap, objects_data)
    objs_pcd = utils.generate_obj_pcd(depthmap, objects_data)
    plot_fig = display_pcd(objs_pcd)
    return image_segmentation, depth_colormap, dist_image, plot_fig
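
# Illustrative standalone call (a sketch, not wired into the app): Gradio
# passes process_image() an RGB numpy array, so a script would convert
# OpenCV's BGR frame first, e.g. with one of the bundled sample assets:
#
#   img = cv2.cvtColor(cv2.imread("assets/images/soccer.jpg"), cv2.COLOR_BGR2RGB)
#   seg, depth_cmap, dist_img, pcd_fig = process_image(img)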


@spaces.GPU
def test_process_img(image):
    """Debug variant of process_image() that returns the raw intermediates."""
    image = utils.resize(image)
    image_segmentation, objects_data = img_seg.predict(image)
    depthmap, depth_colormap = depth_estimator.make_prediction(image)
    return image_segmentation, objects_data, depthmap, depth_colormap


@spaces.GPU
def process_video(vid_path=None):
    """Stream segmentation, depth, and distance frames from a video, one at a time."""
    global CANCEL_PROCESSING
    CANCEL_PROCESSING = False
    vid_cap = cv2.VideoCapture(vid_path)
    while vid_cap.isOpened() and not CANCEL_PROCESSING:
        ret, frame = vid_cap.read()
        if not ret:  # end of stream
            break
        print("making predictions ....")
        frame = utils.resize(frame)
        image_segmentation, objects_data = img_seg.predict(frame)
        depthmap, depth_colormap = depth_estimator.make_prediction(frame)
        dist_image = utils.draw_depth_info(frame, depthmap, objects_data)
        # OpenCV decodes frames as BGR; Gradio displays RGB.
        yield (cv2.cvtColor(image_segmentation, cv2.COLOR_BGR2RGB),
               depth_colormap,
               cv2.cvtColor(dist_image, cv2.COLOR_BGR2RGB))
    vid_cap.release()


def update_segmentation_options(options):
    img_seg.is_show_bounding_boxes = 'Show Boundary Box' in options
    img_seg.is_show_segmentation = 'Show Segmentation Region' in options
    img_seg.is_show_segmentation_boundary = 'Show Segmentation Boundary' in options


def update_confidence_threshold(thres_val):
    # The slider reports 1-100; the model expects a 0-1 fraction.
    img_seg.confidence_threshold = thres_val / 100


@spaces.GPU
def model_selector(model_type):
    global img_seg, depth_estimator

    if model_type == "Small - Better performance and less accuracy":
        midas_model, yolo_model = "midas_v21_small_256", "yolov8s-seg"
    elif model_type == "Medium - Balanced performance and accuracy":
        midas_model, yolo_model = "dpt_hybrid_384", "yolov8m-seg"
    elif model_type == "Large - Slow performance and high accuracy":
        midas_model, yolo_model = "dpt_large_384", "yolov8l-seg"
    else:
        midas_model, yolo_model = "midas_v21_small_256", "yolov8s-seg"

    img_seg = ImageSegmenter(model_type=yolo_model)
    depth_estimator = MonocularDepthEstimator(model_type=midas_model)


def get_box_vertices(bbox):
    """Convert an (x1, y1, x2, y2) bbox to its four corner vertices."""
    x1, y1, x2, y2 = bbox
    return [
        [x1, y1],
        [x2, y1],
        [x2, y2],
        [x1, y2],
    ]


def depth_at_center(depth_map, bbox):
    """Median depth in a 5x5 window around the center of a bounding box."""
    x1, y1, x2, y2 = bbox
    center_x = int((x1 + x2) / 2)
    center_y = int((y1 + y2) / 2)

    # Take the median over a small window (clamped to the map bounds) so a
    # single noisy pixel cannot dominate the estimate.
    region = depth_map[
        max(0, center_y - 2):min(depth_map.shape[0], center_y + 3),
        max(0, center_x - 2):min(depth_map.shape[1], center_x + 3)
    ]
    return np.median(region)


def get_camera_matrix(depth_estimator):
    """Get the pinhole camera intrinsics (focal lengths and principal point)."""
    return {
        "fx": depth_estimator.fx_depth,
        "fy": depth_estimator.fy_depth,
        "cx": depth_estimator.cx_depth,
        "cy": depth_estimator.cy_depth,
    }
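

# These intrinsics are what pinhole back-projection needs. The helper below is
# an illustrative sketch (the name is ours; nothing in this app calls it): a
# pixel (u, v) at metric depth Z maps to X = (u - cx) * Z / fx and
# Y = (v - cy) * Z / fy in the camera frame.
def backproject_pixel(u, v, depth, cam):
    """Back-project pixel (u, v) at `depth` to a camera-frame [X, Y, Z] point."""
    x = (u - cam["cx"]) * depth / cam["fx"]
    y = (v - cam["cy"]) * depth / cam["fy"]
    return [x, y, depth]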


@spaces.GPU
def get_detection_data(image):
    """Get structured detection data with depth information."""
    try:
        image = utils.resize(image)
        height, width = image.shape[:2]

        image_segmentation, objects_data = img_seg.predict(image)
        depthmap, depth_colormap = depth_estimator.make_prediction(image)

        detections = []
        for data in objects_data:
            cls_id, cls_name, cls_center, cls_mask, cls_clr = data

            masked_depth, mean_depth = utils.get_masked_depth(depthmap, cls_mask)

            # Derive a bounding box from the mask; skip empty masks.
            y_indices, x_indices = np.where(cls_mask > 0)
            if len(x_indices) == 0 or len(y_indices) == 0:
                continue
            x1, x2 = np.min(x_indices), np.max(x_indices)
            y1, y2 = np.min(y_indices), np.max(y_indices)

            bbox_normalized = [
                float(x1 / width),
                float(y1 / height),
                float(x2 / width),
                float(y2 / height),
            ]

            detections.append({
                "id": int(cls_id),
                "category": cls_name,
                "center": [
                    float(cls_center[0] / width),
                    float(cls_center[1] / height),
                ],
                "bbox": bbox_normalized,
                "depth": float(mean_depth * 10),
                "color": [float(c / 255) for c in cls_clr],
                "mask": cls_mask.tolist(),
                "confidence": 1.0,  # placeholder: per-object scores are not exposed here
            })

        camera_params = get_camera_matrix(depth_estimator)

        point_clouds = utils.generate_obj_pcd(depthmap, objects_data)
        pcd_data = [
            {
                "points": np.asarray(pcd.points).tolist(),
                "color": [float(c / 255) for c in color],
            }
            for pcd, color in point_clouds
        ]

        return {
            "detections": detections,
            "depth_map": depthmap.tolist(),
            "camera_params": camera_params,
            "image_size": {"width": width, "height": height},
            "point_clouds": pcd_data,
        }

    except Exception as e:
        print(f"Error in get_detection_data: {e}")
        raise
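

# Illustrative consumer of the payload above (a sketch; the helper name is
# ours and nothing in this app calls it). It shows the lightweight keys a
# client can rely on without touching the bulky "mask"/"depth_map" fields.
def summarize_detections(result):
    """Print one line per detection: category, normalized bbox, and depth."""
    for det in result["detections"]:
        x1, y1, x2, y2 = det["bbox"]
        print(f"{det['category']}: bbox=({x1:.2f}, {y1:.2f}, {x2:.2f}, {y2:.2f}), "
              f"depth={det['depth']:.2f}")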


def cancel():
    """Request that process_video() stop; the flag is polled between frames."""
    global CANCEL_PROCESSING
    CANCEL_PROCESSING = True


if __name__ == "__main__":
    with gr.Blocks() as my_app:
        gr.Markdown("<h1><center>Simultaneous Segmentation and Depth Estimation</center></h1>")
        gr.Markdown("<h3><center>Created by Vaishanth</center></h3>")
        gr.Markdown("<h3><center>This model estimates the depth of segmented objects.</center></h3>")

        with gr.Tab("Image"):
            with gr.Row():
                with gr.Column(scale=1):
                    img_input = gr.Image()
                    model_type_img = gr.Dropdown(
                        ["Small - Better performance and less accuracy",
                         "Medium - Balanced performance and accuracy",
                         "Large - Slow performance and high accuracy"],
                        label="Model Type", value="Small - Better performance and less accuracy",
                        info="Select the inference model before running predictions!")
                    options_checkbox_img = gr.CheckboxGroup(
                        ["Show Boundary Box", "Show Segmentation Region", "Show Segmentation Boundary"],
                        label="Options")
                    conf_thres_img = gr.Slider(
                        1, 100, value=60, label="Confidence Threshold",
                        info="Choose the threshold above which objects should be detected")
                    submit_btn_img = gr.Button(value="Predict")

                with gr.Column(scale=2):
                    with gr.Row():
                        segmentation_img_output = gr.Image(height=300, label="Segmentation")
                        depth_img_output = gr.Image(height=300, label="Depth Estimation")

                    with gr.Row():
                        dist_img_output = gr.Image(height=300, label="Distance")
                        pcd_img_output = gr.Plot(label="Point Cloud")

            gr.Markdown("## Sample Images")
            gr.Examples(
                examples=[os.path.join(os.path.dirname(__file__), "assets/images/baggage_claim.jpg"),
                          os.path.join(os.path.dirname(__file__), "assets/images/kitchen_2.png"),
                          os.path.join(os.path.dirname(__file__), "assets/images/soccer.jpg"),
                          os.path.join(os.path.dirname(__file__), "assets/images/room_2.png"),
                          os.path.join(os.path.dirname(__file__), "assets/images/living_room.jpg")],
                inputs=img_input,
                outputs=[segmentation_img_output, depth_img_output, dist_img_output, pcd_img_output],
                fn=process_image,
                cache_examples=True,
            )

        with gr.Tab("Video"):
            with gr.Row():
                with gr.Column(scale=1):
                    vid_input = gr.Video()
                    model_type_vid = gr.Dropdown(
                        ["Small - Better performance and less accuracy",
                         "Medium - Balanced performance and accuracy",
                         "Large - Slow performance and high accuracy"],
                        label="Model Type", value="Small - Better performance and less accuracy",
                        info="Select the inference model before running predictions!")
                    options_checkbox_vid = gr.CheckboxGroup(
                        ["Show Boundary Box", "Show Segmentation Region", "Show Segmentation Boundary"],
                        label="Options")
                    conf_thres_vid = gr.Slider(
                        1, 100, value=60, label="Confidence Threshold",
                        info="Choose the threshold above which objects should be detected")
                    with gr.Row():
                        cancel_btn = gr.Button(value="Cancel")
                        submit_btn_vid = gr.Button(value="Predict")

                with gr.Column(scale=2):
                    with gr.Row():
                        segmentation_vid_output = gr.Image(height=300, label="Segmentation")
                        depth_vid_output = gr.Image(height=300, label="Depth Estimation")

                    with gr.Row():
                        dist_vid_output = gr.Image(height=300, label="Distance")

            gr.Markdown("## Sample Videos")
            gr.Examples(
                examples=[os.path.join(os.path.dirname(__file__), "assets/videos/input_video.mp4"),
                          os.path.join(os.path.dirname(__file__), "assets/videos/driving.mp4"),
                          os.path.join(os.path.dirname(__file__), "assets/videos/overpass.mp4"),
                          os.path.join(os.path.dirname(__file__), "assets/videos/walking.mp4")],
                inputs=vid_input,
            )

        with gr.Tab("API", visible=False):
            input_image = gr.Image()
            output_json = gr.JSON()
            gr.Interface(
                fn=get_detection_data,
                inputs=input_image,
                outputs=output_json,
                title="Get Detection Data",
                api_name="get_detection_data"
            )

        # Image tab events
        submit_btn_img.click(process_image, inputs=img_input,
                             outputs=[segmentation_img_output, depth_img_output, dist_img_output, pcd_img_output])
        options_checkbox_img.change(update_segmentation_options, options_checkbox_img, [])
        conf_thres_img.change(update_confidence_threshold, conf_thres_img, [])
        model_type_img.change(model_selector, model_type_img, [])

        # Video tab events
        submit_btn_vid.click(process_video, inputs=vid_input,
                             outputs=[segmentation_vid_output, depth_vid_output, dist_vid_output])
        model_type_vid.change(model_selector, model_type_vid, [])
        cancel_btn.click(cancel, inputs=[], outputs=[])
        options_checkbox_vid.change(update_segmentation_options, options_checkbox_vid, [])
        conf_thres_vid.change(update_confidence_threshold, conf_thres_vid, [])

    my_app.queue(max_size=20).launch(share=True)