Spaces:
Runtime error
Runtime error
import gradio as gr | |
import cv2 | |
import time | |
import os | |
import json | |
import random | |
from datetime import datetime | |
from collections import Counter | |
from services.video_service import get_next_video_frame, reset_video_index, preload_video, release_video | |
from services.detection_service import process_frame as process_generic | |
from services.metrics_service import update_metrics | |
from services.overlay_service import overlay_boxes | |
from services.salesforce_dispatcher import dispatch_to_salesforce | |
from services.shadow_detection import detect_shadows | |
from services.thermal_service import process_thermal | |
# Under Construction services | |
from services.under_construction.earthwork_detection import process_earthwork | |
from services.under_construction.culvert_check import process_culverts | |
from services.under_construction.bridge_pier_check import process_bridge_piers | |
# Operations Maintenance services | |
from services.operations_maintenance.crack_detection import detect_cracks_and_objects | |
from services.operations_maintenance.pothole_detection import process_potholes | |
from services.operations_maintenance.signage_check import process_signages | |
# Road Safety services | |
from services.road_safety.barrier_check import process_barriers | |
from services.road_safety.lighting_check import process_lighting | |
from services.road_safety.accident_spot_check import process_accident_spots | |
# Plantation services | |
from services.plantation.plant_count import process_plants | |
from services.plantation.plant_health import process_plant_health | |
from services.plantation.missing_patch_check import process_missing_patches | |
# Globals | |
paused = False | |
frame_rate = 0.5 # Process every 0.5 seconds for real-time feel | |
frame_count = 0 | |
log_entries = [] | |
crack_counts = [] | |
crack_severity_all = [] | |
last_frame = None | |
last_detections = {} | |
last_timestamp = "" | |
last_detected_images = [] # Store up to 100+ crack images | |
gps_coordinates = [] | |
video_loaded = False | |
service_toggles = { | |
"under_construction": False, | |
"operations_maintenance": True, # Default to crack detection focus | |
"road_safety": False, | |
"plantation": False | |
} | |
# Constants | |
DEFAULT_VIDEO_PATH = "sample.mp4" | |
TEMP_IMAGE_PATH = "temp.jpg" | |
CAPTURED_FRAMES_DIR = "captured_frames" | |
OUTPUT_DIR = "outputs" | |
os.makedirs(CAPTURED_FRAMES_DIR, exist_ok=True) | |
os.makedirs(OUTPUT_DIR, exist_ok=True) | |
def initialize_video(video_file=None): | |
""" | |
Initialize the video with the provided file or default path. | |
""" | |
global video_loaded, log_entries | |
release_video() | |
video_path = DEFAULT_VIDEO_PATH | |
if video_file is not None: | |
video_path = video_file.name | |
log_entries.append(f"Using uploaded video: {video_path}") | |
status = preload_video(video_path) | |
video_loaded = "Error" not in status | |
log_entries.append(status) | |
return status | |
def toggle_service(service_name, value): | |
""" | |
Toggle a service category. | |
""" | |
global service_toggles | |
service_toggles[service_name] = value | |
log_entries.append(f"{service_name.replace('_', ' ').title()} Services {'Enabled' if value else 'Disabled'}") | |
return f"{service_name.replace('_', ' ').title()} Services: {'Enabled' if value else 'Disabled'}" | |
def monitor_feed(): | |
""" | |
Main function to process video frames in real-time. | |
""" | |
global paused, frame_count, last_frame, last_detections, last_timestamp, gps_coordinates, last_detected_images, video_loaded | |
if not video_loaded: | |
log_entries.append("Cannot start streaming: Video not loaded successfully.") | |
return None, json.dumps({"error": "Video not loaded. Please upload a video file."}, indent=2), "\n".join(log_entries[-10:]), None, None, last_detected_images | |
if paused and last_frame is not None: | |
frame = last_frame.copy() | |
detections = last_detections.copy() | |
else: | |
try: | |
frame = get_next_video_frame() | |
except RuntimeError as e: | |
log_entries.append(f"Error: {str(e)}") | |
return None, json.dumps(last_detections, indent=2), "\n".join(log_entries[-10:]), None, None, last_detected_images | |
# Initialize detected items list | |
all_detected_items = [] | |
# Run services based on toggles | |
if service_toggles["under_construction"]: | |
earthwork_dets, frame = process_earthwork(frame) | |
culvert_dets, frame = process_culverts(frame) | |
bridge_pier_dets, frame = process_bridge_piers(frame) | |
all_detected_items.extend(earthwork_dets + culvert_dets + bridge_pier_dets) | |
if service_toggles["operations_maintenance"]: | |
crack_items = detect_cracks_and_objects(frame) | |
frame = overlay_boxes(frame, crack_items) | |
pothole_dets, frame = process_potholes(frame) | |
signage_dets, frame = process_signages(frame) | |
all_detected_items.extend(crack_items + pothole_dets + signage_dets) | |
if service_toggles["road_safety"]: | |
barrier_dets, frame = process_barriers(frame) | |
lighting_dets, frame = process_lighting(frame) | |
accident_dets, frame = process_accident_spots(frame) | |
all_detected_items.extend(barrier_dets + lighting_dets + accident_dets) | |
if service_toggles["plantation"]: | |
plant_dets, frame = process_plants(frame) | |
health_dets, frame = process_plant_health(frame) | |
missing_dets, frame = process_missing_patches(frame) | |
all_detected_items.extend(plant_dets + health_dets + missing_dets) | |
# Fallback: Run generic detection if no items detected | |
if not all_detected_items: | |
generic_dets, frame = process_generic(frame) | |
all_detected_items.extend(generic_dets) | |
# Optional: Run shadow detection | |
shadow_results = detect_shadows(frame) | |
shadow_dets = shadow_results["detections"] | |
frame = shadow_results["frame"] | |
all_detected_items.extend(shadow_dets) | |
# Optional: Run thermal processing if frame is grayscale (simulated check) | |
if len(frame.shape) == 2: # Grayscale frame (simulating thermal input) | |
thermal_results = process_thermal(frame) | |
thermal_dets = thermal_results["detections"] | |
frame = thermal_results["frame"] | |
all_detected_items.extend(thermal_dets) | |
# Save frame with overlays | |
cv2.imwrite(TEMP_IMAGE_PATH, frame, [int(cv2.IMWRITE_JPEG_QUALITY), 95]) | |
metrics = update_metrics(all_detected_items) | |
# Simulate GPS coordinates | |
gps_coord = [17.385044 + random.uniform(-0.001, 0.001), 78.486671 + frame_count * 0.0001] | |
gps_coordinates.append(gps_coord) | |
# Save frame if cracks are detected (only if operations_maintenance is enabled) | |
if service_toggles["operations_maintenance"] and any(item['type'] == 'crack' for item in all_detected_items if 'type' in item): | |
captured_frame_path = os.path.join(CAPTURED_FRAMES_DIR, f"crack_{frame_count}.jpg") | |
cv2.imwrite(captured_frame_path, frame) | |
last_detected_images.append(captured_frame_path) | |
if len(last_detected_images) > 100: | |
last_detected_images.pop(0) | |
# Combine detections for Salesforce | |
all_detections = { | |
"items": all_detected_items, | |
"metrics": metrics, | |
"timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), | |
"frame_count": frame_count, | |
"gps_coordinates": gps_coord | |
} | |
# Dispatch to Salesforce | |
dispatch_to_salesforce(all_detections, all_detections["timestamp"]) | |
# Save annotated frame | |
frame_path = os.path.join(OUTPUT_DIR, f"frame_{frame_count:04d}.jpg") | |
cv2.imwrite(frame_path, frame) | |
frame_count += 1 | |
last_timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") | |
last_frame = frame.copy() | |
last_detections = metrics | |
# Update logs and stats (focus on cracks if operations_maintenance is enabled) | |
crack_detected = 0 | |
if service_toggles["operations_maintenance"]: | |
crack_detected = len([item for item in all_detected_items if 'type' in item and item['type'] == 'crack']) | |
crack_severity_all.extend([ | |
item['severity'] | |
for item in all_detected_items | |
if 'type' in item and item['type'] == 'crack' and 'severity' in item | |
]) | |
log_entries.append(f"{last_timestamp} - Frame {frame_count} - Cracks: {crack_detected} - Total Detections: {len(all_detected_items)} - GPS: {gps_coord} - Avg Conf: {metrics['avg_confidence']:.2f}") | |
crack_counts.append(crack_detected) | |
if len(log_entries) > 100: | |
log_entries.pop(0) | |
if len(crack_counts) > 500: | |
crack_counts.pop(0) | |
if len(crack_severity_all) > 500: | |
crack_severity_all.pop(0) | |
# Add frame count and timestamp to display | |
frame = cv2.resize(last_frame, (640, 480)) | |
cv2.putText(frame, f"Frame: {frame_count}", (10, 25), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2) | |
cv2.putText(frame, f"{last_timestamp}", (10, 50), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2) | |
# Generate charts (only if operations_maintenance is enabled) | |
line_chart = None | |
pie_chart = None | |
if service_toggles["operations_maintenance"]: | |
line_chart = generate_line_chart() | |
pie_chart = generate_pie_chart() | |
return frame[:, :, ::-1], json.dumps(last_detections, indent=2), "\n".join(log_entries[-10:]), line_chart, pie_chart, last_detected_images | |
def generate_line_chart(): | |
""" | |
Generate a line chart for crack counts over time using Chart.js. | |
""" | |
if not crack_counts: | |
return None | |
data = crack_counts[-50:] # Last 50 frames | |
labels = list(range(len(data))) | |
return { | |
"type": "line", | |
"data": { | |
"labels": labels, | |
"datasets": [{ | |
"label": "Cracks Over Time", | |
"data": data, | |
"borderColor": "#FF6347", # Tomato | |
"backgroundColor": "rgba(255, 99, 71, 0.2)", | |
"fill": True, | |
"tension": 0.4 | |
}] | |
}, | |
"options": { | |
"responsive": True, | |
"plugins": { | |
"title": { | |
"display": True, | |
"text": "Cracks Over Time" | |
} | |
}, | |
"scales": { | |
"x": { | |
"title": { | |
"display": True, | |
"text": "Frame" | |
} | |
}, | |
"y": { | |
"title": { | |
"display": True, | |
"text": "Count" | |
}, | |
"beginAtZero": True | |
} | |
} | |
} | |
} | |
def generate_pie_chart(): | |
""" | |
Generate a pie chart for crack severity distribution using Chart.js. | |
""" | |
if not crack_severity_all: | |
return None | |
count = Counter(crack_severity_all[-200:]) # Last 200 cracks | |
labels = list(count.keys()) | |
sizes = list(count.values()) | |
return { | |
"type": "pie", | |
"data": { | |
"labels": labels, | |
"datasets": [{ | |
"data": sizes, | |
"backgroundColor": [ | |
"#FF6347", # Tomato | |
"#4682B4", # SteelBlue | |
"#FFD700" # Gold | |
] | |
}] | |
}, | |
"options": { | |
"responsive": True, | |
"plugins": { | |
"title": { | |
"display": True, | |
"text": "Crack Severity Distribution" | |
}, | |
"legend": { | |
"position": "top" | |
} | |
} | |
} | |
} | |
# Gradio UI | |
with gr.Blocks(theme=gr.themes.Soft()) as app: | |
gr.Markdown("# 🛡️ NHAI Drone Road Inspection Dashboard") | |
# Video upload section | |
with gr.Row(): | |
video_input = gr.File(label="Upload Video File (e.g., sample.mp4)", file_types=["video"]) | |
load_button = gr.Button("Load Video") | |
video_status = gr.Textbox(label="Video Load Status", value="Please upload a video file or ensure 'sample.mp4' exists in the root directory.") | |
# Toggles for service categories | |
with gr.Row(): | |
with gr.Column(): | |
uc_toggle = gr.Checkbox(label="Enable Under Construction Services", value=service_toggles["under_construction"]) | |
uc_status = gr.Textbox(label="Under Construction Status", value="Under Construction Services: Disabled") | |
with gr.Column(): | |
om_toggle = gr.Checkbox(label="Enable Operations Maintenance Services", value=service_toggles["operations_maintenance"]) | |
om_status = gr.Textbox(label="Operations Maintenance Status", value="Operations Maintenance Services: Enabled") | |
with gr.Column(): | |
rs_toggle = gr.Checkbox(label="Enable Road Safety Services", value=service_toggles["road_safety"]) | |
rs_status = gr.Textbox(label="Road Safety Status", value="Road Safety Services: Disabled") | |
with gr.Column(): | |
pl_toggle = gr.Checkbox(label="Enable Plantation Services", value=service_toggles["plantation"]) | |
pl_status = gr.Textbox(label="Plantation Status", value="Plantation Services: Disabled") | |
status_text = gr.Markdown("**Status:** 🟢 Ready (Upload a video to start)") | |
with gr.Row(): | |
with gr.Column(scale=3): | |
video_output = gr.Image(label="Live Drone Feed", width=640, height=480) | |
with gr.Column(scale=1): | |
detections_output = gr.Textbox(label="Detection Metrics", lines=4) | |
with gr.Row(): | |
logs_output = gr.Textbox(label="Live Logs", lines=8) | |
with gr.Column(scale=1): | |
chart_output = gr.Plot(label="Crack Trend") | |
pie_output = gr.Plot(label="Crack Severity") | |
with gr.Row(): | |
captured_images = gr.Gallery(label="Detected Cracks (Last 100+)", columns=4, rows=25) | |
with gr.Row(): | |
pause_btn = gr.Button("⏸️ Pause") | |
resume_btn = gr.Button("▶️ Resume") | |
frame_slider = gr.Slider(0.0005, 5, value=0.5, label="Frame Interval (seconds)") | |
def toggle_pause(): | |
global paused | |
paused = True | |
return "**Status:** ⏸️ Paused" | |
def toggle_resume(): | |
global paused | |
paused = False | |
return "**Status:** 🟢 Streaming" | |
def set_frame_rate(val): | |
global frame_rate | |
frame_rate = val | |
# Initialize video on app load | |
video_status.value = initialize_video() | |
load_button.click( | |
initialize_video, | |
inputs=[video_input], | |
outputs=[video_status] | |
) | |
uc_toggle.change( | |
toggle_service, | |
inputs=[gr.State("under_construction"), uc_toggle], | |
outputs=[uc_status] | |
) | |
om_toggle.change( | |
toggle_service, | |
inputs=[gr.State("operations_maintenance"), om_toggle], | |
outputs=[om_status] | |
) | |
rs_toggle.change( | |
toggle_service, | |
inputs=[gr.State("road_safety"), rs_toggle], | |
outputs=[rs_status] | |
) | |
pl_toggle.change( | |
toggle_service, | |
inputs=[gr.State("plantation"), pl_toggle], | |
outputs=[pl_status] | |
) | |
pause_btn.click(toggle_pause, outputs=status_text) | |
resume_btn.click(toggle_resume, outputs=status_text) | |
frame_slider.change(set_frame_rate, inputs=[frame_slider]) | |
def streaming_loop(): | |
while True: | |
if not video_loaded: | |
yield None, json.dumps({"error": "Video not loaded. Please upload a video file."}, indent=2), "\n".join(log_entries[-10:]), None, None, last_detected_images | |
else: | |
frame, detections, logs, line_chart, pie_chart, captured = monitor_feed() | |
if frame is None: | |
yield None, detections, logs, line_chart, pie_chart, captured | |
else: | |
yield frame, detections, logs, line_chart, pie_chart, captured | |
time.sleep(frame_rate) | |
app.load(streaming_loop, outputs=[video_output, detections_output, logs_output, chart_output, pie_output, captured_images]) | |
if __name__ == "__main__": | |
app.launch(share=True) |