import os import gradio as gr import cv2 import time import json import random import logging import matplotlib.pyplot as plt from datetime import datetime from collections import Counter from typing import Any, Dict, List, Optional, Tuple import numpy as np # Suppress Ultralytics warning by setting a writable config directory os.environ["YOLO_CONFIG_DIR"] = "/tmp/Ultralytics" # Import service modules try: from services.video_service import get_next_video_frame, reset_video_index, preload_video, release_video from services.detection_service import process_frame as process_generic from services.metrics_service import update_metrics from services.overlay_service import overlay_boxes from services.salesforce_dispatcher import send_to_salesforce from services.shadow_detection import detect_shadow_coverage from services.thermal_service import process_thermal from services.map_service import generate_map # Under Construction services from services.under_construction.earthwork_detection import process_earthwork from services.under_construction.culvert_check import process_culverts from services.under_construction.bridge_pier_check import process_bridge_piers # Operations Maintenance services from services.operations_maintenance.crack_detection import detect_cracks_and_holes from services.operations_maintenance.pothole_detection import process_potholes from services.operations_maintenance.signage_check import process_signages # Road Safety services from services.road_safety.barrier_check import process_barriers from services.road_safety.lighting_check import process_lighting from services.road_safety.accident_spot_check import process_accident_spots from services.road_safety.pothole_crack_detection import detect_potholes_and_cracks # Plantation services from services.plantation.plant_count import process_plants from services.plantation.plant_health import process_plant_health from services.plantation.missing_patch_check import process_missing_patches except ImportError as e: print(f"Failed to import service modules: {str(e)}") logging.error(f"Import error: {str(e)}") exit(1) # Configure logging logging.basicConfig( filename="app.log", level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s" ) # Global variables paused: bool = False frame_rate: float = 0.3 frame_count: int = 0 log_entries: List[str] = [] crack_counts: List[int] = [] crack_severity_all: List[str] = [] last_frame: Optional[np.ndarray] = None last_metrics: Dict[str, Any] = {} last_timestamp: str = "" last_detected_cracks: List[str] = [] last_detected_holes: List[str] = [] gps_coordinates: List[List[float]] = [] video_loaded: bool = False active_service: Optional[str] = None # Constants DEFAULT_VIDEO_PATH = "sample.mp4" TEMP_IMAGE_PATH = "temp.jpg" CAPTURED_FRAMES_DIR = "captured_frames" OUTPUT_DIR = "outputs" os.makedirs(CAPTURED_FRAMES_DIR, exist_ok=True) os.makedirs(OUTPUT_DIR, exist_ok=True) def initialize_video(video_file: Optional[Any] = None) -> str: global video_loaded, log_entries release_video() video_path = DEFAULT_VIDEO_PATH if video_file is None else video_file.name if not os.path.exists(video_path): status = f"Error: Video file '{video_path}' not found." log_entries.append(status) logging.error(status) video_loaded = False return status try: preload_video(video_path) video_loaded = True status = f"Successfully loaded video: {video_path}" log_entries.append(status) logging.info(status) return status except Exception as e: video_loaded = False status = f"Error loading video: {str(e)}" log_entries.append(status) logging.error(status) return status def set_active_service( service_name: str, uc_val: bool, om_val: bool, rs_val: bool, pl_val: bool ) -> Tuple[Optional[str], str]: global active_service toggles = { "under_construction": uc_val, "operations_maintenance": om_val, "road_safety": rs_val, "plantation": pl_val } active_count = sum(toggles.values()) if active_count > 1: log_entries.append("Error: Only one service category can be active at a time.") logging.error("Multiple service categories enabled simultaneously.") return None, "Error: Please enable only one service category at a time." for service, enabled in toggles.items(): if enabled: active_service = service log_entries.append(f"{service.replace('_', ' ').title()} Services Enabled") logging.info(f"{service} services enabled") return active_service, f"{service.replace('_', ' ').title()} Services: Enabled" active_service = None log_entries.append("No service category enabled.") logging.info("No service category enabled.") return None, "No Service Category Enabled" def generate_line_chart() -> Optional[str]: if not crack_counts: return None fig, ax = plt.subplots(figsize=(4, 2)) ax.plot(crack_counts[-50:], marker='o', color='#4682B4') ax.set_title("Cracks/Holes Over Time") ax.set_xlabel("Frame") ax.set_ylabel("Count") ax.grid(True) fig.tight_layout() chart_path = "chart_temp.png" fig.savefig(chart_path) plt.close(fig) return chart_path def monitor_feed() -> Tuple[ Optional[np.ndarray], str, str, List[str], List[str], Optional[str], Optional[str] ]: global paused, frame_count, last_frame, last_metrics, last_timestamp global gps_coordinates, last_detected_cracks, last_detected_holes, video_loaded if not video_loaded: log_entries.append("Cannot start streaming: Video not loaded successfully.") logging.error("Video not loaded successfully.") return ( None, json.dumps({"error": "Video not loaded. Please upload a video file."}, indent=2), "\n".join(log_entries[-10:]), last_detected_cracks, last_detected_holes, None, None ) if paused and last_frame is not None: frame = last_frame.copy() metrics = last_metrics.copy() else: try: frame = get_next_video_frame() if frame is None: raise RuntimeError("Failed to retrieve frame from video.") except RuntimeError as e: log_entries.append(f"Error: {str(e)}") logging.error(f"Frame retrieval error: {str(e)}") return ( None, json.dumps(last_metrics, indent=2), "\n".join(log_entries[-10:]), last_detected_cracks, last_detected_holes, None, None ) all_detected_items: List[Dict[str, Any]] = [] shadow_issue = False thermal_flag = False try: # Process frame based on active service if active_service == "under_construction": earthwork_dets, frame = process_earthwork(frame) culvert_dets, frame = process_culverts(frame) bridge_pier_dets, frame = process_bridge_piers(frame) all_detected_items.extend(earthwork_dets + culvert_dets + bridge_pier_dets) elif active_service == "operations_maintenance": crack_hole_dets, frame = detect_cracks_and_holes(frame) pothole_dets, frame = process_potholes(frame) signage_dets, frame = process_signages(frame) all_detected_items.extend(crack_hole_dets + pothole_dets + signage_dets) elif active_service == "road_safety": barrier_dets, frame = process_barriers(frame) lighting_dets, frame = process_lighting(frame) accident_dets, frame = process_accident_spots(frame) pothole_crack_dets, frame = detect_potholes_and_cracks(frame) all_detected_items.extend(barrier_dets + lighting_dets + accident_dets + pothole_crack_dets) elif active_service == "plantation": plant_dets, frame = process_plants(frame) health_dets, frame = process_plant_health(frame) missing_dets, frame = process_missing_patches(frame) all_detected_items.extend(plant_dets + health_dets + missing_dets) else: generic_dets, frame = process_generic(frame) all_detected_items.extend(generic_dets) # Apply shadow detection cv2.imwrite(TEMP_IMAGE_PATH, frame) shadow_issue = detect_shadow_coverage(TEMP_IMAGE_PATH) # Apply thermal processing if frame is grayscale if len(frame.shape) == 2: thermal_results = process_thermal(frame) thermal_dets = thermal_results["detections"] frame = thermal_results["frame"] all_detected_items.extend(thermal_dets) thermal_flag = bool(thermal_dets) # Overlay detections frame = overlay_boxes(frame, all_detected_items) # Save temporary image cv2.imwrite(TEMP_IMAGE_PATH, frame, [int(cv2.IMWRITE_JPEG_QUALITY), 95]) except Exception as e: log_entries.append(f"Processing Error: {str(e)}") logging.error(f"Processing error in {active_service}: {str(e)}") all_detected_items = [] # Update detection metrics metrics = update_metrics(all_detected_items) # Generate GPS coordinates gps_coord = [17.385044 + random.uniform(-0.001, 0.001), 78.486671 + frame_count * 0.0001] gps_coordinates.append(gps_coord) # Save frame if detections are present detection_types = {item.get("type") for item in all_detected_items if "type" in item} if detection_types: try: captured_frame_path = os.path.join(CAPTURED_FRAMES_DIR, f"detected_{frame_count}.jpg") cv2.imwrite(captured_frame_path, frame) for item in all_detected_items: if item.get("type") == "crack": last_detected_cracks.append(captured_frame_path) if len(last_detected_cracks) > 100: last_detected_cracks.pop(0) elif item.get("type") == "hole": last_detected_holes.append(captured_frame_path) if len(last_detected_holes) > 100: last_detected_holes.pop(0) except Exception as e: log_entries.append(f"Error saving captured frame: {str(e)}") logging.error(f"Error saving captured frame: {str(e)}") # Prepare data for Salesforce dispatch all_detections = { "detections": all_detected_items, "metrics": metrics, "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "frame_count": frame_count, "gps_coordinates": gps_coord, "shadow_issue": shadow_issue, "thermal": thermal_flag } # Dispatch to Salesforce try: send_to_salesforce(all_detections) except Exception as e: log_entries.append(f"Salesforce Dispatch Error: {str(e)}") logging.error(f"Salesforce dispatch error: {str(e)}") # Save processed frame try: frame_path = os.path.join(OUTPUT_DIR, f"frame_{frame_count:04d}.jpg") cv2.imwrite(frame_path, frame) except Exception as e: log_entries.append(f"Error saving output frame: {str(e)}") logging.error(f"Error saving output frame: {str(e)}") # Update global variables frame_count += 1 last_timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") last_frame = frame.copy() last_metrics = metrics # Track cracks/holes for metrics crack_detected = len([item for item in all_detected_items if item.get("type") == "crack"]) hole_detected = len([item for item in all_detected_items if item.get("type") == "hole"]) if active_service in ["operations_maintenance", "road_safety"]: crack_severity_all.extend([ item["severity"] for item in all_detected_items if item.get("type") in ["crack", "hole"] and "severity" in item ]) crack_counts.append(crack_detected + hole_detected) # Log frame processing details log_message = f"{last_timestamp} - Frame {frame_count} - Cracks: {crack_detected} - Holes: {hole_detected} - GPS: {gps_coord}" log_entries.append(log_message) logging.info(log_message) # Limit the size of logs and crack data if len(log_entries) > 100: log_entries.pop(0) if len(crack_counts) > 500: crack_counts.pop(0) if len(crack_severity_all) > 500: crack_severity_all.pop(0) # Resize frame and add metadata for display frame = cv2.resize(last_frame, (640, 480)) cv2.putText(frame, f"Frame: {frame_count}", (10, 25), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2) cv2.putText(frame, f"{last_timestamp}", (10, 50), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2) # Generate map map_path = generate_map(gps_coordinates[-5:], [item for item in last_metrics.get("items", []) if item.get("type") in ["crack", "hole"]]) return ( frame[:, :, ::-1], # Convert BGR to RGB for Gradio json.dumps(last_metrics, indent=2), "\n".join(log_entries[-10:]), last_detected_cracks, last_detected_holes, generate_line_chart(), map_path ) # Gradio UI setup with gr.Blocks(theme=gr.themes.Soft(primary_hue="blue", secondary_hue="green")) as app: gr.Markdown( """ # 🛡️ NHAI Drone Road Inspection Dashboard Monitor highway conditions in real-time using drone footage. Select a service category to analyze specific aspects of the road. """ ) with gr.Row(): with gr.Column(scale=3): video_input = gr.File(label="Upload Video File (e.g., sample.mp4)", file_types=[".mp4", ".avi"]) load_button = gr.Button("Load Video", variant="primary") with gr.Column(scale=1): video_status = gr.Textbox( label="Video Load Status", value="Please upload a video file or ensure 'sample.mp4' exists in the root directory.", interactive=False ) with gr.Row(): with gr.Column(): uc_toggle = gr.Checkbox(label="Enable Under Construction Services", value=False) uc_status = gr.Textbox(label="Under Construction Status", value="Disabled", interactive=False) with gr.Column(): om_toggle = gr.Checkbox(label="Enable Operations Maintenance Services", value=False) om_status = gr.Textbox(label="Operations Maintenance Status", value="Disabled", interactive=False) with gr.Column(): rs_toggle = gr.Checkbox(label="Enable Road Safety Services", value=False) rs_status = gr.Textbox(label="Road Safety Status", value="Disabled", interactive=False) with gr.Column(): pl_toggle = gr.Checkbox(label="Enable Plantation Services", value=False) pl_status = gr.Textbox(label="Plantation Status", value="Disabled", interactive=False) status_text = gr.Markdown("**Status:** 🟢 Ready (Upload a video to start)") with gr.Row(): with gr.Column(scale=3): video_output = gr.Image(label="Live Drone Feed", width=640, height=480, elem_id="live-feed") with gr.Column(scale=1): metrics_output = gr.Textbox( label="Detection Metrics", lines=10, interactive=False, placeholder="Detection metrics, crack/hole counts will appear here." ) with gr.Row(): with gr.Column(scale=2): logs_output = gr.Textbox(label="Live Logs", lines=8, interactive=False) with gr.Column(scale=1): crack_images = gr.Gallery(label="Detected Cracks (Last 100+)", columns=4, rows=13, height="auto") hole_images = gr.Gallery(label="Detected Holes (Last 100+)", columns=4, rows=13, height="auto") with gr.Row(): chart_output = gr.Image(label="Crack/Hole Trend") map_output = gr.Image(label="Crack/Hole Locations Map") with gr.Row(): pause_btn = gr.Button("⏸️ Pause", variant="secondary") resume_btn = gr.Button("▶️ Resume", variant="primary") frame_slider = gr.Slider(0.05, 1.0, value=0.3, label="Frame Interval (seconds)", step=0.05) gr.HTML(""" """) def toggle_pause() -> str: global paused paused = True return "**Status:** ⏸️ Paused" def toggle_resume() -> str: global paused paused = False return "**Status:** 🟢 Streaming" def set_frame_rate(val: float) -> None: global frame_rate frame_rate = val video_status.value = initialize_video() load_button.click( initialize_video, inputs=[video_input], outputs=[video_status] ) def update_toggles(uc_val: bool, om_val: bool, rs_val: bool, pl_val: bool) -> Tuple[str, str, str, str, str]: active, status_message = set_active_service("toggle", uc_val, om_val, rs_val, pl_val) uc_status_val = "Enabled" if active == "under_construction" else "Disabled" om_status_val = "Enabled" if active == "operations_maintenance" else "Disabled" rs_status_val = "Enabled" if active == "road_safety" else "Disabled" pl_status_val = "Enabled" if active == "plantation" else "Disabled" return ( uc_status_val, om_status_val, rs_status_val, pl_status_val, status_message ) toggle_inputs = [uc_toggle, om_toggle, rs_toggle, pl_toggle] toggle_outputs = [uc_status, om_status, rs_status, pl_status, status_text] uc_toggle.change(update_toggles, inputs=toggle_inputs, outputs=toggle_outputs) om_toggle.change(update_toggles, inputs=toggle_inputs, outputs=toggle_outputs) rs_toggle.change(update_toggles, inputs=toggle_inputs, outputs=toggle_outputs) pl_toggle.change(update_toggles, inputs=toggle_inputs, outputs=toggle_outputs) pause_btn.click(toggle_pause, outputs=status_text) resume_btn.click(toggle_resume, outputs=status_text) frame_slider.change(set_frame_rate, inputs=[frame_slider]) def streaming_loop(): while True: if not video_loaded: yield None, json.dumps({"error": "Video not loaded. Please upload a video file."}, indent=2), "\n".join(log_entries[-10:]), last_detected_cracks, last_detected_holes, None, None else: frame, metrics, logs, cracks, holes, chart, map_path = monitor_feed() if frame is None: yield None, metrics, logs, cracks, holes, chart, map_path else: yield frame, metrics, logs, cracks, holes, chart, map_path time.sleep(frame_rate) app.load(streaming_loop, outputs=[video_output, metrics_output, logs_output, crack_images, hole_images, chart_output, map_output]) if __name__ == "__main__": app.launch(share=False)