"""Gradio dashboard that streams drone video frames through detection services."""

import gradio as gr
import cv2
import time
import os
import json
import random
from datetime import datetime
from collections import Counter

from services.video_service import get_next_video_frame, reset_video_index, preload_video, release_video
from services.detection_service import process_frame as process_generic
from services.metrics_service import update_metrics
from services.overlay_service import overlay_boxes
from services.salesforce_dispatcher import dispatch_to_salesforce
from services.shadow_detection import detect_shadows
from services.thermal_service import process_thermal

# Under Construction services
from services.under_construction.earthwork_detection import process_earthwork
from services.under_construction.culvert_check import process_culverts
from services.under_construction.bridge_pier_check import process_bridge_piers

# Operations Maintenance services
from services.operations_maintenance.crack_detection import detect_cracks_and_objects
from services.operations_maintenance.pothole_detection import process_potholes
from services.operations_maintenance.signage_check import process_signages

# Road Safety services
from services.road_safety.barrier_check import process_barriers
from services.road_safety.lighting_check import process_lighting
from services.road_safety.accident_spot_check import process_accident_spots

# Plantation services
from services.plantation.plant_count import process_plants
from services.plantation.plant_health import process_plant_health
from services.plantation.missing_patch_check import process_missing_patches

# Globals
paused = False
frame_rate = 0.5  # Process every 0.5 seconds for real-time feel
frame_count = 0
log_entries = []
crack_counts = []
crack_severity_all = []
last_frame = None
last_detections = {}
last_timestamp = ""
last_detected_images = []  # Store up to 100+ crack images
gps_coordinates = []
video_loaded = False
service_toggles = {
    "under_construction": False,
    "operations_maintenance": True,  # Default to crack detection focus
    "road_safety": False,
    "plantation": False
}

# Constants
DEFAULT_VIDEO_PATH = "sample.mp4"
TEMP_IMAGE_PATH = "temp.jpg"
CAPTURED_FRAMES_DIR = "captured_frames"
OUTPUT_DIR = "outputs"

# Caps for the in-memory history buffers.
MAX_LOG_ENTRIES = 100
MAX_CRACK_HISTORY = 500
MAX_CAPTURED_IMAGES = 100
LOG_TAIL_LINES = 10

TIMESTAMP_FORMAT = "%Y-%m-%d %H:%M:%S"

os.makedirs(CAPTURED_FRAMES_DIR, exist_ok=True)
os.makedirs(OUTPUT_DIR, exist_ok=True)


def _trim_to_last(items, limit):
    """Drop the oldest entries of *items* in place so at most *limit* remain."""
    excess = len(items) - limit
    if excess > 0:
        del items[:excess]


def _video_not_loaded_outputs():
    """Build the six-element output tuple shown while no video is loaded."""
    return (
        None,
        json.dumps({"error": "Video not loaded. Please upload a video file."}, indent=2),
        "\n".join(log_entries[-LOG_TAIL_LINES:]),
        None,
        None,
        last_detected_images,
    )


def initialize_video(video_file=None):
    """
    Initialize the video with the provided file or default path.

    Args:
        video_file: Uploaded file from the Gradio File component, or None to
            use DEFAULT_VIDEO_PATH. Either an object exposing ``.name`` or a
            plain path string is accepted.

    Returns:
        The status value returned by ``preload_video``.
    """
    global video_loaded
    release_video()
    video_path = DEFAULT_VIDEO_PATH
    if video_file is not None:
        # The upload may arrive as a temp-file wrapper (with .name) or as a
        # bare path string; fall back to the value itself when .name is absent.
        video_path = getattr(video_file, "name", video_file)
        log_entries.append(f"Using uploaded video: {video_path}")
    status = preload_video(video_path)
    video_loaded = "Error" not in status
    log_entries.append(status)
    _trim_to_last(log_entries, MAX_LOG_ENTRIES)
    return status


def toggle_service(service_name, value):
    """
    Toggle a service category.

    Args:
        service_name: Key in ``service_toggles`` to update.
        value: New enabled/disabled state.

    Returns:
        A human-readable status string for the matching status textbox.
    """
    service_toggles[service_name] = value
    title = service_name.replace('_', ' ').title()
    state = 'Enabled' if value else 'Disabled'
    log_entries.append(f"{title} Services {state}")
    _trim_to_last(log_entries, MAX_LOG_ENTRIES)
    return f"{title} Services: {state}"


def _run_detection_services(frame):
    """Run every enabled detection service on *frame*.

    Returns a ``(detected_items, frame)`` pair, where the frame is whatever the
    last service that ran handed back.
    """
    detected = []

    # Run services based on toggles
    if service_toggles["under_construction"]:
        earthwork_dets, frame = process_earthwork(frame)
        culvert_dets, frame = process_culverts(frame)
        bridge_pier_dets, frame = process_bridge_piers(frame)
        detected.extend(earthwork_dets + culvert_dets + bridge_pier_dets)

    if service_toggles["operations_maintenance"]:
        crack_items = detect_cracks_and_objects(frame)
        frame = overlay_boxes(frame, crack_items)
        pothole_dets, frame = process_potholes(frame)
        signage_dets, frame = process_signages(frame)
        detected.extend(crack_items + pothole_dets + signage_dets)

    if service_toggles["road_safety"]:
        barrier_dets, frame = process_barriers(frame)
        lighting_dets, frame = process_lighting(frame)
        accident_dets, frame = process_accident_spots(frame)
        detected.extend(barrier_dets + lighting_dets + accident_dets)

    if service_toggles["plantation"]:
        plant_dets, frame = process_plants(frame)
        health_dets, frame = process_plant_health(frame)
        missing_dets, frame = process_missing_patches(frame)
        detected.extend(plant_dets + health_dets + missing_dets)

    # Fallback: Run generic detection if no items detected
    if not detected:
        generic_dets, frame = process_generic(frame)
        detected.extend(generic_dets)

    # Optional: Run shadow detection
    shadow_results = detect_shadows(frame)
    frame = shadow_results["frame"]
    detected.extend(shadow_results["detections"])

    # Optional: Run thermal processing if frame is grayscale (simulated check)
    if len(frame.shape) == 2:  # Grayscale frame (simulating thermal input)
        thermal_results = process_thermal(frame)
        frame = thermal_results["frame"]
        detected.extend(thermal_results["detections"])

    return detected, frame


def monitor_feed():
    """
    Main function to process video frames in real-time.

    Returns:
        A six-element tuple: display frame (or None), metrics as a JSON
        string, the last log lines joined by newlines, the line-chart spec,
        the pie-chart spec, and the list of captured crack image paths.
    """
    global frame_count, last_frame, last_detections, last_timestamp

    if not video_loaded:
        log_entries.append("Cannot start streaming: Video not loaded successfully.")
        _trim_to_last(log_entries, MAX_LOG_ENTRIES)
        return _video_not_loaded_outputs()

    # NOTE(review): the original indentation was lost; the processing below is
    # assumed to belong to the "not paused" branch, so a paused feed simply
    # re-displays last_frame without re-running detection.
    if not (paused and last_frame is not None):
        try:
            frame = get_next_video_frame()
        except RuntimeError as e:
            log_entries.append(f"Error: {str(e)}")
            _trim_to_last(log_entries, MAX_LOG_ENTRIES)
            return (
                None,
                json.dumps(last_detections, indent=2),
                "\n".join(log_entries[-LOG_TAIL_LINES:]),
                None,
                None,
                last_detected_images,
            )

        all_detected_items, frame = _run_detection_services(frame)

        # Save frame with overlays
        cv2.imwrite(TEMP_IMAGE_PATH, frame, [int(cv2.IMWRITE_JPEG_QUALITY), 95])
        metrics = update_metrics(all_detected_items)

        # Simulate GPS coordinates
        gps_coord = [17.385044 + random.uniform(-0.001, 0.001), 78.486671 + frame_count * 0.0001]
        gps_coordinates.append(gps_coord)

        ops_enabled = service_toggles["operations_maintenance"]
        cracks = [
            item for item in all_detected_items
            if 'type' in item and item['type'] == 'crack'
        ]

        # Save frame if cracks are detected (only if operations_maintenance is enabled)
        if ops_enabled and cracks:
            captured_frame_path = os.path.join(CAPTURED_FRAMES_DIR, f"crack_{frame_count}.jpg")
            cv2.imwrite(captured_frame_path, frame)
            last_detected_images.append(captured_frame_path)
            _trim_to_last(last_detected_images, MAX_CAPTURED_IMAGES)

        # One timestamp per frame so the dispatched payload and the log agree.
        timestamp = datetime.now().strftime(TIMESTAMP_FORMAT)

        # Combine detections for Salesforce
        all_detections = {
            "items": all_detected_items,
            "metrics": metrics,
            "timestamp": timestamp,
            "frame_count": frame_count,
            "gps_coordinates": gps_coord
        }

        # Dispatch to Salesforce
        dispatch_to_salesforce(all_detections, all_detections["timestamp"])

        # Save annotated frame
        frame_path = os.path.join(OUTPUT_DIR, f"frame_{frame_count:04d}.jpg")
        cv2.imwrite(frame_path, frame)

        frame_count += 1
        last_timestamp = timestamp
        last_frame = frame.copy()
        last_detections = metrics

        # Update logs and stats (focus on cracks if operations_maintenance is enabled)
        crack_detected = 0
        if ops_enabled:
            crack_detected = len(cracks)
            crack_severity_all.extend(
                item['severity'] for item in cracks if 'severity' in item
            )

        log_entries.append(
            f"{last_timestamp} - Frame {frame_count} - Cracks: {crack_detected} - "
            f"Total Detections: {len(all_detected_items)} - GPS: {gps_coord} - "
            f"Avg Conf: {metrics['avg_confidence']:.2f}"
        )
        crack_counts.append(crack_detected)

        # Several entries can be added per frame, so trim by slice rather than
        # popping a single element.
        _trim_to_last(log_entries, MAX_LOG_ENTRIES)
        _trim_to_last(crack_counts, MAX_CRACK_HISTORY)
        _trim_to_last(crack_severity_all, MAX_CRACK_HISTORY)

    # Add frame count and timestamp to display
    display = cv2.resize(last_frame, (640, 480))
    cv2.putText(display, f"Frame: {frame_count}", (10, 25), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)
    cv2.putText(display, f"{last_timestamp}", (10, 50), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)

    # Reverse the channel axis for display; a 2-D (grayscale) frame has no
    # channel axis to reverse, so it is passed through unchanged.
    if display.ndim == 3:
        display = display[:, :, ::-1]

    # Generate charts (only if operations_maintenance is enabled)
    line_chart = None
    pie_chart = None
    if service_toggles["operations_maintenance"]:
        line_chart = generate_line_chart()
        pie_chart = generate_pie_chart()

    return (
        display,
        json.dumps(last_detections, indent=2),
        "\n".join(log_entries[-LOG_TAIL_LINES:]),
        line_chart,
        pie_chart,
        last_detected_images,
    )


def generate_line_chart():
    """
    Generate a line chart for crack counts over time using Chart.js.

    Returns:
        A Chart.js-style config dict covering the last 50 recorded counts, or
        None when no counts have been recorded.
    """
    if not crack_counts:
        return None
    data = crack_counts[-50:]  # Last 50 frames
    labels = list(range(len(data)))
    return {
        "type": "line",
        "data": {
            "labels": labels,
            "datasets": [{
                "label": "Cracks Over Time",
                "data": data,
                "borderColor": "#FF6347",  # Tomato
                "backgroundColor": "rgba(255, 99, 71, 0.2)",
                "fill": True,
                "tension": 0.4
            }]
        },
        "options": {
            "responsive": True,
            "plugins": {
                "title": {
                    "display": True,
                    "text": "Cracks Over Time"
                }
            },
            "scales": {
                "x": {
                    "title": {
                        "display": True,
                        "text": "Frame"
                    }
                },
                "y": {
                    "title": {
                        "display": True,
                        "text": "Count"
                    },
                    "beginAtZero": True
                }
            }
        }
    }


def generate_pie_chart():
    """
    Generate a pie chart for crack severity distribution using Chart.js.

    Returns:
        A Chart.js-style config dict built from the last 200 recorded
        severities, or None when none have been recorded.
    """
    if not crack_severity_all:
        return None
    count = Counter(crack_severity_all[-200:])  # Last 200 cracks
    labels = list(count.keys())
    sizes = list(count.values())
    return {
        "type": "pie",
        "data": {
            "labels": labels,
            "datasets": [{
                "data": sizes,
                "backgroundColor": [
                    "#FF6347",  # Tomato
                    "#4682B4",  # SteelBlue
                    "#FFD700"   # Gold
                ]
            }]
        },
        "options": {
            "responsive": True,
            "plugins": {
                "title": {
                    "display": True,
                    "text": "Crack Severity Distribution"
                },
                "legend": {
                    "position": "top"
                }
            }
        }
    }


# Gradio UI
# NOTE(review): row/column nesting below is reconstructed from statement order
# because the original indentation was lost — confirm against the intended layout.
with gr.Blocks(theme=gr.themes.Soft()) as app:
    gr.Markdown("# 🛡️ NHAI Drone Road Inspection Dashboard")

    # Video upload section
    with gr.Row():
        video_input = gr.File(label="Upload Video File (e.g., sample.mp4)", file_types=["video"])
        load_button = gr.Button("Load Video")
    video_status = gr.Textbox(
        label="Video Load Status",
        value="Please upload a video file or ensure 'sample.mp4' exists in the root directory."
    )

    # Toggles for service categories
    with gr.Row():
        with gr.Column():
            uc_toggle = gr.Checkbox(label="Enable Under Construction Services", value=service_toggles["under_construction"])
            uc_status = gr.Textbox(label="Under Construction Status", value="Under Construction Services: Disabled")
        with gr.Column():
            om_toggle = gr.Checkbox(label="Enable Operations Maintenance Services", value=service_toggles["operations_maintenance"])
            om_status = gr.Textbox(label="Operations Maintenance Status", value="Operations Maintenance Services: Enabled")
        with gr.Column():
            rs_toggle = gr.Checkbox(label="Enable Road Safety Services", value=service_toggles["road_safety"])
            rs_status = gr.Textbox(label="Road Safety Status", value="Road Safety Services: Disabled")
        with gr.Column():
            pl_toggle = gr.Checkbox(label="Enable Plantation Services", value=service_toggles["plantation"])
            pl_status = gr.Textbox(label="Plantation Status", value="Plantation Services: Disabled")

    status_text = gr.Markdown("**Status:** 🟢 Ready (Upload a video to start)")

    with gr.Row():
        with gr.Column(scale=3):
            video_output = gr.Image(label="Live Drone Feed", width=640, height=480)
        with gr.Column(scale=1):
            detections_output = gr.Textbox(label="Detection Metrics", lines=4)

    with gr.Row():
        logs_output = gr.Textbox(label="Live Logs", lines=8)
        with gr.Column(scale=1):
            # NOTE(review): these Plot components receive the Chart.js-style
            # dicts from generate_line_chart/generate_pie_chart — confirm that
            # gr.Plot can render that value type.
            chart_output = gr.Plot(label="Crack Trend")
            pie_output = gr.Plot(label="Crack Severity")

    with gr.Row():
        captured_images = gr.Gallery(label="Detected Cracks (Last 100+)", columns=4, rows=25)

    with gr.Row():
        pause_btn = gr.Button("⏸️ Pause")
        resume_btn = gr.Button("▶️ Resume")
        frame_slider = gr.Slider(0.0005, 5, value=0.5, label="Frame Interval (seconds)")

    def toggle_pause():
        """Pause the feed and return the status banner text."""
        global paused
        paused = True
        return "**Status:** ⏸️ Paused"

    def toggle_resume():
        """Resume the feed and return the status banner text."""
        global paused
        paused = False
        return "**Status:** 🟢 Streaming"

    def set_frame_rate(val):
        """Store the slider value as the delay between streamed frames."""
        global frame_rate
        frame_rate = val

    # Initialize video on app load
    video_status.value = initialize_video()

    load_button.click(
        initialize_video,
        inputs=[video_input],
        outputs=[video_status]
    )

    uc_toggle.change(
        toggle_service,
        inputs=[gr.State("under_construction"), uc_toggle],
        outputs=[uc_status]
    )
    om_toggle.change(
        toggle_service,
        inputs=[gr.State("operations_maintenance"), om_toggle],
        outputs=[om_status]
    )
    rs_toggle.change(
        toggle_service,
        inputs=[gr.State("road_safety"), rs_toggle],
        outputs=[rs_status]
    )
    pl_toggle.change(
        toggle_service,
        inputs=[gr.State("plantation"), pl_toggle],
        outputs=[pl_status]
    )

    pause_btn.click(toggle_pause, outputs=status_text)
    resume_btn.click(toggle_resume, outputs=status_text)
    frame_slider.change(set_frame_rate, inputs=[frame_slider])

    def streaming_loop():
        """Yield UI updates forever, sleeping ``frame_rate`` seconds between them."""
        while True:
            if not video_loaded:
                yield _video_not_loaded_outputs()
            else:
                # monitor_feed already returns the full six-element tuple,
                # including the frame-is-None error case.
                yield monitor_feed()
            time.sleep(frame_rate)

    app.load(
        streaming_loop,
        outputs=[video_output, detections_output, logs_output, chart_output, pie_output, captured_images]
    )

if __name__ == "__main__":
    app.launch(share=True)