# Hugging Face Spaces status when this file was captured: Runtime error
import os
import random
import time
from collections import Counter
from datetime import datetime

import cv2
import gradio as gr
import matplotlib.pyplot as plt
import numpy as np

from services.crack_detection_service import detect_cracks_and_holes
from services.map_service import generate_map
from services.metrics_service import update_metrics
from services.overlay_service import overlay_boxes
from services.video_service import get_next_video_frame, reset_video_index, preload_video
# Preload video to reduce startup time
preload_video()

# Globals: mutable module state shared by monitor_feed() and the UI callbacks
paused = False
frame_rate = 0.3  # Seconds slept between streamed frames (smaller = faster feed)
frame_count = 0
log_entries = []  # Human-readable log lines, oldest first
crack_counts = []  # Cracks + holes detected per processed frame
crack_severity_all = []  # Severity value of every detected crack/hole
last_frame = None
last_metrics = {}
last_timestamp = ""
last_detected_cracks = []  # Paths of saved frames containing a crack (capped at 100)
last_detected_holes = []  # Paths of saved frames containing a hole (capped at 100)
gps_coordinates = []  # One [lat, lon] pair per processed frame

# Constants
TEMP_IMAGE_PATH = "temp.jpg"
CAPTURED_FRAMES_DIR = "captured_frames"
os.makedirs(CAPTURED_FRAMES_DIR, exist_ok=True)
# Core monitor function
def monitor_feed():
    """Process one step of the feed and return everything the dashboard shows.

    When the feed is paused and a frame has already been processed, the last
    frame and metrics are shown again and no statistics are recorded.
    Otherwise the next video frame is fetched, passed through detection and
    overlay, saved to disk, and the module-level logs/statistics are advanced.

    Returns:
        An 8-tuple ``(frame, metrics, logs, line_chart, pie_chart, cracks,
        holes, map_path)``:

        * ``frame`` - the 640x480 annotated frame with its channel axis
          reversed (presumably BGR -> RGB for display; confirm against the
          video service), or ``None`` if fetching a frame failed.
        * ``metrics`` - the most recent metrics dict.
        * ``logs`` - the last 10 log lines joined with newlines.
        * ``line_chart`` / ``pie_chart`` - image paths or ``None``.
        * ``cracks`` / ``holes`` - lists of saved frame paths.
        * ``map_path`` - whatever ``generate_map`` returns, or ``None`` on
          a fetch failure.
    """
    global frame_count, last_frame, last_metrics, last_timestamp

    replay_last = paused and last_frame is not None
    if not replay_last:
        try:
            frame = get_next_video_frame()
        except RuntimeError as e:
            log_entries.append(f"Error: {str(e)}")
            # Keep the log bounded even if the source keeps failing.
            del log_entries[:-100]
            return (
                None,
                last_metrics,
                "\n".join(log_entries[-10:]),
                None,
                None,
                last_detected_cracks,
                last_detected_holes,
                None,
            )

        detected_items = detect_cracks_and_holes(frame)  # Includes cracks and holes
        frame = overlay_boxes(frame, detected_items)
        cv2.imwrite(TEMP_IMAGE_PATH, frame, [int(cv2.IMWRITE_JPEG_QUALITY), 95])
        metrics = update_metrics(detected_items)

        frame_count += 1
        last_timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        # Simulated position: jittered latitude, longitude drifting per frame.
        gps_coord = [17.385044 + random.uniform(-0.001, 0.001), 78.486671 + frame_count * 0.0001]
        gps_coordinates.append(gps_coord)

        # Save detected cracks and holes separately
        if detected_items:
            captured_frame_path = os.path.join(CAPTURED_FRAMES_DIR, f"detected_{frame_count}.jpg")
            cv2.imwrite(captured_frame_path, frame)
            for item in detected_items:
                if item['type'] == 'crack':
                    last_detected_cracks.append(captured_frame_path)
                    if len(last_detected_cracks) > 100:
                        last_detected_cracks.pop(0)
                elif item['type'] == 'hole':
                    last_detected_holes.append(captured_frame_path)
                    if len(last_detected_holes) > 100:
                        last_detected_holes.pop(0)

        last_frame = frame.copy()
        last_metrics = metrics.copy()

        # Update logs and stats. This only happens for freshly processed
        # frames: `gps_coord` does not exist on the paused path, and repeating
        # the same frame must not be counted again.
        items = last_metrics.get('items', [])
        crack_detected = sum(1 for item in items if item['type'] == 'crack')
        hole_detected = sum(1 for item in items if item['type'] == 'hole')
        crack_severity_all.extend(
            item['severity']
            for item in items
            if isinstance(item, dict) and item['type'] in ['crack', 'hole'] and 'severity' in item
        )
        log_entries.append(f"{last_timestamp} - Frame {frame_count} - Cracks: {crack_detected} - Holes: {hole_detected} - GPS: {gps_coord}")
        crack_counts.append(crack_detected + hole_detected)

        # Enforce the history caps. Slice deletion is used because several
        # severities can be added per frame, so dropping one element per call
        # would let the list grow without bound.
        del log_entries[:-100]
        del crack_counts[:-500]
        del crack_severity_all[:-500]

    # Build the display frame from the stored frame so the paused path works too.
    frame = cv2.resize(last_frame, (640, 480))
    cv2.putText(frame, f"Frame: {frame_count}", (10, 25), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)
    cv2.putText(frame, f"{last_timestamp}", (10, 50), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)

    map_items = [item for item in last_metrics.get('items', []) if item['type'] in ['crack', 'hole']]
    map_path = generate_map(gps_coordinates[-5:], map_items)

    return (
        frame[:, :, ::-1],
        last_metrics,
        "\n".join(log_entries[-10:]),
        generate_line_chart(),
        generate_pie_chart(),
        last_detected_cracks,
        last_detected_holes,
        map_path,
    )
# Line chart
def generate_line_chart():
    """Render the recent detection counts as a line chart image.

    Plots the last 50 entries of the module-level ``crack_counts`` list and
    saves the figure to ``chart_temp.png``, overwriting any previous chart.

    Returns:
        The path of the saved PNG, or ``None`` when no counts have been
        recorded yet.
    """
    if not crack_counts:
        return None

    chart_path = "chart_temp.png"
    fig, ax = plt.subplots(figsize=(4, 2))
    try:
        ax.plot(crack_counts[-50:], marker='o')
        ax.set_title("Cracks/Holes Over Time")
        ax.set_xlabel("Frame")
        ax.set_ylabel("Count")
        fig.tight_layout()
        fig.savefig(chart_path)
    finally:
        # Release the figure even if plotting or saving fails; this function
        # runs once per streamed frame, so leaked figures would pile up.
        plt.close(fig)
    return chart_path
# Pie chart for severity
def generate_pie_chart():
    """Render the distribution of recent severities as a pie chart image.

    Counts the last 200 entries of the module-level ``crack_severity_all``
    list and saves the figure to ``pie_temp.png``, overwriting any previous
    chart.

    Returns:
        The path of the saved PNG, or ``None`` when no severities have been
        recorded yet.
    """
    if not crack_severity_all:
        return None

    pie_path = "pie_temp.png"
    severity_counts = Counter(crack_severity_all[-200:])
    labels, sizes = zip(*severity_counts.items())

    fig, ax = plt.subplots(figsize=(4, 2))
    try:
        ax.pie(sizes, labels=labels, autopct='%1.1f%%', startangle=140)
        ax.axis('equal')  # Equal aspect ratio keeps the pie circular
        fig.tight_layout()
        fig.savefig(pie_path)
    finally:
        # Release the figure even if plotting or saving fails; this function
        # runs once per streamed frame, so leaked figures would pile up.
        plt.close(fig)
    return pie_path
# Gradio UI
# NOTE(review): the nesting of rows/columns below was reconstructed from the
# statement order; confirm it matches the intended layout.
with gr.Blocks(theme=gr.themes.Soft()) as app:
    gr.Markdown("# 🛡️ Drone Road Inspection Dashboard")
    status_text = gr.Markdown("**Status:** 🟢 Running")

    with gr.Row():
        with gr.Column(scale=3):
            video_output = gr.Image(label="Live Drone Feed", width=640, height=480)
        with gr.Column(scale=1):
            metrics_output = gr.Textbox(label="Crack/Hole Metrics", lines=4)

    with gr.Row():
        logs_output = gr.Textbox(label="Live Logs", lines=8)
        chart_output = gr.Image(label="Crack/Hole Trend")
        pie_output = gr.Image(label="Severity Distribution")

    with gr.Row():
        map_output = gr.Image(label="Crack/Hole Locations Map")
        with gr.Column():
            crack_images = gr.Gallery(label="Detected Cracks (Last 100+)", columns=4, rows=13)
            hole_images = gr.Gallery(label="Detected Holes (Last 100+)", columns=4, rows=13)

    with gr.Row():
        pause_btn = gr.Button("⏸️ Pause")
        resume_btn = gr.Button("▶️ Resume")
        frame_slider = gr.Slider(0.0005, 5, value=0.3, label="Frame Interval (seconds)")

    def toggle_pause():
        """Set the module-level pause flag and return the status banner text."""
        global paused
        paused = True
        return "**Status:** ⏸️ Paused"

    def toggle_resume():
        """Clear the module-level pause flag and return the status banner text."""
        global paused
        paused = False
        return "**Status:** 🟢 Running"

    def set_frame_rate(val):
        """Store the slider value as the sleep interval between streamed frames."""
        global frame_rate
        frame_rate = val

    pause_btn.click(toggle_pause, outputs=status_text)
    resume_btn.click(toggle_resume, outputs=status_text)
    frame_slider.change(set_frame_rate, inputs=[frame_slider])

    def streaming_loop():
        """Yield dashboard updates forever, sleeping ``frame_rate`` seconds between them.

        Each yielded tuple matches the ``outputs`` list passed to ``app.load``;
        the metrics dict is stringified because it is shown in a Textbox.
        """
        while True:
            frame, metrics, logs, chart, pie, cracks, holes, map_path = monitor_feed()
            # `frame` is None when fetching failed; it is yielded as-is either way.
            yield frame, str(metrics), logs, chart, pie, cracks, holes, map_path
            time.sleep(frame_rate)

    app.load(streaming_loop, outputs=[video_output, metrics_output, logs_output, chart_output, pie_output, crack_images, hole_images, map_output])
# Launch the dashboard only when this file is run as a script.
if __name__ == "__main__":
    app.launch(share=True)