"""Gradio dashboard for drone-based road inspection.

Frames are pulled from a video source, run through the detection services of
the currently active service category, annotated, written to disk, dispatched
to Salesforce and streamed to the browser together with metrics, logs and
(for Operations Maintenance) crack charts.
"""

import json
import logging
import os
import random
import time
from collections import Counter
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple

import cv2
import gradio as gr
import matplotlib.pyplot as plt  # Added for chart generation
import numpy as np  # Moved to top to fix NameError

# Suppress Ultralytics warning by setting a writable config directory.
# This must happen before the service modules are imported below.
os.environ["YOLO_CONFIG_DIR"] = "/tmp/Ultralytics"

# Import service modules
try:
    from services.video_service import get_next_video_frame, reset_video_index, preload_video, release_video
    from services.detection_service import process_frame as process_generic
    from services.metrics_service import update_metrics
    from services.overlay_service import overlay_boxes
    from services.salesforce_dispatcher import dispatch_to_salesforce
    from services.shadow_detection import detect_shadows
    from services.thermal_service import process_thermal
    # Under Construction services
    from services.under_construction.earthwork_detection import process_earthwork
    from services.under_construction.culvert_check import process_culverts
    from services.under_construction.bridge_pier_check import process_bridge_piers
    # Operations Maintenance services
    from services.operations_maintenance.crack_detection import detect_cracks_and_objects
    from services.operations_maintenance.pothole_detection import process_potholes
    from services.operations_maintenance.signage_check import process_signages
    # Road Safety services
    from services.road_safety.barrier_check import process_barriers
    from services.road_safety.lighting_check import process_lighting
    from services.road_safety.accident_spot_check import process_accident_spots
    from services.road_safety.pothole_crack_detection import detect_potholes_and_cracks
    # Plantation services
    from services.plantation.plant_count import process_plants
    from services.plantation.plant_health import process_plant_health
    from services.plantation.missing_patch_check import process_missing_patches
except ImportError as e:
    print(f"Failed to import service modules: {str(e)}")
    # ``exit`` is injected by the ``site`` module and is not guaranteed to
    # exist; raising SystemExit terminates with the same exit status.
    raise SystemExit(1) from e

# Configure logging
logging.basicConfig(
    filename="app.log",
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s"
)

# Global variables
paused: bool = False
frame_rate: float = 0.1
frame_count: int = 0
log_entries: List[str] = []
crack_counts: List[int] = []  # Track number of cracks per frame
crack_severity_all: List[str] = []  # Track crack severities
last_frame: Optional[np.ndarray] = None
last_detections: Dict[str, Any] = {}
last_timestamp: str = ""
last_detected_images: List[str] = []
gps_coordinates: List[List[float]] = []
video_loaded: bool = False
active_service: Optional[str] = None

# Constants
DEFAULT_VIDEO_PATH = "sample.mp4"
TEMP_IMAGE_PATH = "temp.jpg"
CAPTURED_FRAMES_DIR = "captured_frames"
OUTPUT_DIR = "outputs"
MAX_LOG_ENTRIES = 100  # upper bound for ``log_entries``
MAX_CRACK_HISTORY = 500  # upper bound for ``crack_counts`` / ``crack_severity_all``
MAX_CAPTURED_IMAGES = 100  # upper bound for ``last_detected_images``
VIDEO_NOT_LOADED_MESSAGE = "Video not loaded. Please upload a video file."
os.makedirs(CAPTURED_FRAMES_DIR, exist_ok=True)
os.makedirs(OUTPUT_DIR, exist_ok=True)


def initialize_video(video_file: Optional[Any] = None) -> str:
    """
    Initialize the video file for processing.

    Releases any previously opened video, then preloads either the uploaded
    file or ``DEFAULT_VIDEO_PATH``.

    Args:
        video_file (Optional[Any]): Uploaded video from Gradio. Either an
            object exposing the path as ``.name`` or a plain path string.

    Returns:
        str: Status message returned by ``preload_video``.
    """
    global video_loaded
    release_video()
    video_path = DEFAULT_VIDEO_PATH
    if video_file is not None:
        # Depending on the Gradio version / component ``type`` the upload
        # arrives as a temp-file wrapper (``.name``) or as a path string.
        video_path = getattr(video_file, "name", video_file)
        log_entries.append(f"Using uploaded video: {video_path}")
        logging.info(f"Using uploaded video: {video_path}")
    status = preload_video(video_path)
    # The video is considered loaded unless the status text mentions "Error".
    video_loaded = "Error" not in status
    log_entries.append(status)
    logging.info(status)
    return status


def set_active_service(
    service_name: str,
    uc_val: bool,
    om_val: bool,
    rs_val: bool,
    pl_val: bool
) -> Tuple[Optional[str], str]:
    """
    Set the active service category based on user selection.

    Args:
        service_name (str): Name of the service being toggled (currently unused).
        uc_val (bool): Under Construction toggle value.
        om_val (bool): Operations Maintenance toggle value.
        rs_val (bool): Road Safety toggle value.
        pl_val (bool): Plantation toggle value.

    Returns:
        Tuple[Optional[str], str]: Active service name and status message.
    """
    global active_service
    toggles = {
        "under_construction": uc_val,
        "operations_maintenance": om_val,
        "road_safety": rs_val,
        "plantation": pl_val
    }
    enabled = [name for name, is_on in toggles.items() if is_on]

    if len(enabled) > 1:
        # NOTE: ``active_service`` is left untouched here, so a previously
        # selected category stays in effect while the error is shown.
        log_entries.append("Error: Only one service category can be active at a time.")
        logging.error("Multiple service categories enabled simultaneously.")
        return None, "Error: Please enable only one service category at a time."

    if enabled:
        service = enabled[0]
        label = service.replace('_', ' ').title()
        active_service = service
        log_entries.append(f"{label} Services Enabled")
        logging.info(f"{service} services enabled")
        return active_service, f"{label} Services: Enabled"

    active_service = None
    log_entries.append("No service category enabled.")
    logging.info("No service category enabled.")
    return None, "No Service Category Enabled"


def generate_crack_trend_chart() -> Optional[plt.Figure]:
    """
    Generate a line chart for crack trend over time using Matplotlib.

    Plots the last 50 entries of ``crack_counts``.

    Returns:
        Optional[plt.Figure]: Matplotlib figure object or None if no data.
    """
    if not crack_counts:
        return None
    data = crack_counts[-50:]
    labels = list(range(len(data)))
    fig, ax = plt.subplots(figsize=(6, 4))
    ax.plot(labels, data, color="#FF6347", label="Cracks Over Time")
    ax.set_title("Crack Trend (Operations Maintenance)")
    ax.set_xlabel("Frame")
    ax.set_ylabel("Count")
    ax.set_ylim(bottom=0)
    ax.grid(True)
    ax.legend()
    fig.tight_layout()
    # Unregister the figure from pyplot so figures created on every frame do
    # not accumulate; the Figure object itself can still be rendered/saved.
    plt.close(fig)
    return fig


def generate_crack_severity_chart() -> Optional[plt.Figure]:
    """
    Generate a pie chart for crack severity distribution using Matplotlib.

    Uses the last 200 entries of ``crack_severity_all``.

    Returns:
        Optional[plt.Figure]: Matplotlib figure object or None if no data.
    """
    if not crack_severity_all:
        return None
    count = Counter(crack_severity_all[-200:])
    labels = list(count.keys())
    sizes = list(count.values())
    fig, ax = plt.subplots(figsize=(6, 4))
    ax.pie(
        sizes,
        labels=labels,
        colors=["#FF6347", "#4682B4", "#FFD700"],
        autopct="%1.1f%%",
        startangle=90
    )
    ax.set_title("Crack Severity (Operations Maintenance)")
    fig.tight_layout()
    # See generate_crack_trend_chart: avoid leaking figures in pyplot.
    plt.close(fig)
    return fig


def generate_severity_distribution_chart() -> Optional[plt.Figure]:
    """
    Generate a bar chart for crack severity distribution using Matplotlib.

    Uses the last 200 entries of ``crack_severity_all``.

    Returns:
        Optional[plt.Figure]: Matplotlib figure object or None if no data.
    """
    if not crack_severity_all:
        return None
    count = Counter(crack_severity_all[-200:])
    labels = list(count.keys())
    sizes = list(count.values())
    fig, ax = plt.subplots(figsize=(6, 4))
    ax.bar(labels, sizes, color=["#FF6347", "#4682B4", "#FFD700"])
    ax.set_title("Severity Distribution (Operations Maintenance)")
    ax.set_xlabel("Severity")
    ax.set_ylabel("Count")
    ax.set_ylim(bottom=0)
    fig.tight_layout()
    # See generate_crack_trend_chart: avoid leaking figures in pyplot.
    plt.close(fig)
    return fig


def _trim_history() -> None:
    """Cap the module-level history lists at their configured maximum sizes."""
    # Several entries can be appended per frame, so drop everything beyond
    # the limit instead of removing a single element.
    del log_entries[:-MAX_LOG_ENTRIES]
    del crack_counts[:-MAX_CRACK_HISTORY]
    del crack_severity_all[:-MAX_CRACK_HISTORY]
    del last_detected_images[:-MAX_CAPTURED_IMAGES]


def _run_detections(frame: np.ndarray) -> Tuple[List[Dict[str, Any]], np.ndarray]:
    """
    Run the detectors of the active service plus shadow/thermal processing.

    Any exception raised by a detector is logged and results in an empty
    detection list; the frame is returned in whatever state it had reached.
    """
    items: List[Dict[str, Any]] = []
    try:
        # Process frame based on active service
        if active_service == "under_construction":
            earthwork_dets, frame = process_earthwork(frame)
            culvert_dets, frame = process_culverts(frame)
            bridge_pier_dets, frame = process_bridge_piers(frame)
            items.extend(earthwork_dets + culvert_dets + bridge_pier_dets)
        elif active_service == "operations_maintenance":
            crack_items = detect_cracks_and_objects(frame)
            frame = overlay_boxes(frame, crack_items)
            pothole_dets, frame = process_potholes(frame)
            signage_dets, frame = process_signages(frame)
            items.extend(crack_items + pothole_dets + signage_dets)
        elif active_service == "road_safety":
            barrier_dets, frame = process_barriers(frame)
            lighting_dets, frame = process_lighting(frame)
            accident_dets, frame = process_accident_spots(frame)
            pothole_crack_dets, frame = detect_potholes_and_cracks(frame)
            items.extend(barrier_dets + lighting_dets + accident_dets + pothole_crack_dets)
        elif active_service == "plantation":
            plant_dets, frame = process_plants(frame)
            health_dets, frame = process_plant_health(frame)
            missing_dets, frame = process_missing_patches(frame)
            items.extend(plant_dets + health_dets + missing_dets)
        else:
            generic_dets, frame = process_generic(frame)
            items.extend(generic_dets)

        # Apply shadow detection
        shadow_results = detect_shadows(frame)
        frame = shadow_results["frame"]
        items.extend(shadow_results["detections"])

        # Apply thermal processing if frame is grayscale
        if len(frame.shape) == 2:
            thermal_results = process_thermal(frame)
            frame = thermal_results["frame"]
            items.extend(thermal_results["detections"])
    except Exception as e:
        # Best effort: a failing detector must not stop the stream.
        log_entries.append(f"Processing Error: {str(e)}")
        logging.error(f"Processing error in {active_service}: {str(e)}")
        items = []
    return items, frame


def _build_outputs() -> Tuple[
    Optional[np.ndarray],
    str,
    str,
    Optional[plt.Figure],
    Optional[plt.Figure],
    Optional[plt.Figure],
    List[str]
]:
    """Render the UI outputs from the cached state of the last processed frame."""
    # Resize frame and add metadata for display
    display = cv2.resize(last_frame, (640, 480))
    cv2.putText(display, f"Frame: {frame_count}", (10, 25), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)
    cv2.putText(display, f"{last_timestamp}", (10, 50), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)

    if display.ndim == 2:
        # A single-channel frame has no channel axis to reverse.
        display = cv2.cvtColor(display, cv2.COLOR_GRAY2RGB)
    else:
        display = display[:, :, ::-1]  # Convert BGR to RGB for Gradio

    # Generate charts for Operations Maintenance only
    line_chart = None
    pie_chart = None
    bar_chart = None
    if active_service == "operations_maintenance":
        line_chart = generate_crack_trend_chart()
        pie_chart = generate_crack_severity_chart()
        bar_chart = generate_severity_distribution_chart()

    return (
        display,
        json.dumps(last_detections, indent=2),
        "\n".join(log_entries[-10:]),
        line_chart,
        pie_chart,
        bar_chart,
        last_detected_images
    )


def monitor_feed() -> Tuple[
    Optional[np.ndarray],
    str,
    str,
    Optional[plt.Figure],
    Optional[plt.Figure],
    Optional[plt.Figure],
    List[str]
]:
    """
    Process video frames and perform detections based on the active service.

    While paused (and a frame has already been processed) the cached frame,
    metrics and charts are re-rendered without fetching, re-processing,
    saving or dispatching anything.

    Returns:
        Tuple containing:
            - Processed frame (numpy array or None).
            - Detection metrics (JSON string with crack trend and severity data).
            - Recent logs (string).
            - Crack trend chart (Matplotlib figure or None).
            - Crack severity chart (Matplotlib figure or None).
            - Severity distribution chart (Matplotlib figure or None).
            - List of captured image paths.
    """
    global frame_count, last_frame, last_detections, last_timestamp

    if not video_loaded:
        log_entries.append("Cannot start streaming: Video not loaded successfully.")
        logging.error("Video not loaded successfully.")
        _trim_history()
        return (
            None,
            json.dumps({"error": VIDEO_NOT_LOADED_MESSAGE}, indent=2),
            "\n".join(log_entries[-10:]),
            None,
            None,
            None,
            last_detected_images
        )

    if paused and last_frame is not None:
        # Re-running the pipeline here would annotate an already annotated
        # frame, bump the frame counter and dispatch duplicates.
        return _build_outputs()

    try:
        frame = get_next_video_frame()
        if frame is None:
            raise RuntimeError("Failed to retrieve frame from video.")
    except RuntimeError as e:
        log_entries.append(f"Error: {str(e)}")
        logging.error(f"Frame retrieval error: {str(e)}")
        _trim_history()
        return (
            None,
            json.dumps(last_detections, indent=2),
            "\n".join(log_entries[-10:]),
            None,
            None,
            None,
            last_detected_images
        )

    all_detected_items, frame = _run_detections(frame)

    # Save temporary image for display
    try:
        cv2.imwrite(TEMP_IMAGE_PATH, frame, [int(cv2.IMWRITE_JPEG_QUALITY), 95])
    except Exception as e:
        log_entries.append(f"Error saving temp image: {str(e)}")
        logging.error(f"Error saving temp image: {str(e)}")

    # Update detection metrics
    metrics = update_metrics(all_detected_items)

    # Generate GPS coordinates with slight variation
    gps_coord = [17.385044 + random.uniform(-0.001, 0.001), 78.486671 + frame_count * 0.0001]
    gps_coordinates.append(gps_coord)

    # Save frame if detections are present
    if any("type" in item for item in all_detected_items):
        try:
            captured_frame_path = os.path.join(CAPTURED_FRAMES_DIR, f"detected_{frame_count}.jpg")
            cv2.imwrite(captured_frame_path, frame)
            last_detected_images.append(captured_frame_path)
        except Exception as e:
            log_entries.append(f"Error saving captured frame: {str(e)}")
            logging.error(f"Error saving captured frame: {str(e)}")

    # Prepare data for Salesforce dispatch
    all_detections = {
        "items": all_detected_items,
        "metrics": metrics,
        "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        "frame_count": frame_count,
        "gps_coordinates": gps_coord
    }

    # Dispatch to Salesforce
    try:
        dispatch_to_salesforce(all_detections, all_detections["timestamp"])
    except Exception as e:
        log_entries.append(f"Salesforce Dispatch Error: {str(e)}")
        logging.error(f"Salesforce dispatch error: {str(e)}")

    # Save processed frame to output directory
    try:
        frame_path = os.path.join(OUTPUT_DIR, f"frame_{frame_count:04d}.jpg")
        cv2.imwrite(frame_path, frame)
    except Exception as e:
        log_entries.append(f"Error saving output frame: {str(e)}")
        logging.error(f"Error saving output frame: {str(e)}")

    # Update global variables
    frame_count += 1
    last_timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    last_frame = frame.copy()
    last_detections = metrics  # same object: the additions below land in ``metrics`` too

    # Track cracks for metrics (for Operations Maintenance only)
    is_om = active_service == "operations_maintenance"
    cracks = [item for item in all_detected_items if item.get("type") == "crack"] if is_om else []
    crack_detected = len(cracks)
    # Record the current frame before building the trend so the JSON metrics
    # and the trend chart cover the same frames.
    crack_counts.append(crack_detected)

    # Add crack trend and severity to metrics
    if is_om:
        crack_severity_all.extend(item["severity"] for item in cracks if "severity" in item)
        last_detections["crack_count_last_50_frames"] = crack_counts[-50:]
        last_detections["crack_severity_distribution"] = dict(Counter(crack_severity_all[-200:]))

    # Log frame processing details
    log_message = (
        f"{last_timestamp} - Frame {frame_count} - Detections: {len(all_detected_items)} - "
        f"GPS: {gps_coord} - Avg Conf: {metrics.get('avg_confidence', 0):.2f}"
    )
    if crack_detected:
        log_message += f" - Cracks: {crack_detected}"
    log_entries.append(log_message)
    logging.info(log_message)

    # Limit the size of logs and crack data
    _trim_history()

    return _build_outputs()


# Gradio UI setup
with gr.Blocks(theme=gr.themes.Soft(primary_hue="blue", secondary_hue="green")) as app:
    gr.Markdown(
        """
        # 🛡️ NHAI Drone Road Inspection Dashboard
        Monitor highway conditions in real-time using drone footage. Select a service category to analyze specific aspects of the road.
        """
    )

    with gr.Row():
        with gr.Column(scale=3):
            video_input = gr.File(label="Upload Video File (e.g., sample.mp4)", file_types=["video"])
            load_button = gr.Button("Load Video", variant="primary")
        with gr.Column(scale=1):
            video_status = gr.Textbox(
                label="Video Load Status",
                value="Please upload a video file or ensure 'sample.mp4' exists in the root directory.",
                interactive=False
            )

    with gr.Row():
        with gr.Column():
            uc_toggle = gr.Checkbox(label="Enable Under Construction Services", value=False)
            uc_status = gr.Textbox(label="Under Construction Status", value="Disabled", interactive=False)
        with gr.Column():
            om_toggle = gr.Checkbox(label="Enable Operations Maintenance Services", value=False)
            om_status = gr.Textbox(label="Operations Maintenance Status", value="Disabled", interactive=False)
        with gr.Column():
            rs_toggle = gr.Checkbox(label="Enable Road Safety Services", value=False)
            rs_status = gr.Textbox(label="Road Safety Status", value="Disabled", interactive=False)
        with gr.Column():
            pl_toggle = gr.Checkbox(label="Enable Plantation Services", value=False)
            pl_status = gr.Textbox(label="Plantation Status", value="Disabled", interactive=False)

    status_text = gr.Markdown("**Status:** 🟢 Ready (Upload a video to start)")

    with gr.Row():
        with gr.Column(scale=3):
            video_output = gr.Image(label="Live Drone Feed", width=640, height=480, elem_id="live-feed")
        with gr.Column(scale=1):
            detections_output = gr.Textbox(
                label="Detection Metrics",
                lines=10,
                interactive=False,
                placeholder="Detection metrics, crack trends, and severity distribution will appear here."
            )

    with gr.Row():
        with gr.Column(scale=2):
            logs_output = gr.Textbox(label="Live Logs", lines=8, interactive=False)
        with gr.Column(scale=1):
            chart_output = gr.Plot(label="Crack Trend (Operations Maintenance Only)")
            pie_output = gr.Plot(label="Crack Severity (Operations Maintenance Only)")
            bar_output = gr.Plot(label="Severity Distribution (Operations Maintenance Only)")

    with gr.Row():
        captured_images = gr.Gallery(label="Detected Frames (Last 100+)", columns=4, rows=25, height="auto")

    with gr.Row():
        pause_btn = gr.Button("⏸️ Pause", variant="secondary")
        resume_btn = gr.Button("▶️ Resume", variant="primary")
        frame_slider = gr.Slider(0.05, 1.0, value=0.1, label="Frame Interval (seconds)", step=0.05)

    gr.HTML(""" """)

    def toggle_pause() -> str:
        """Pause the video stream."""
        global paused
        paused = True
        return "**Status:** ⏸️ Paused"

    def toggle_resume() -> str:
        """Resume the video stream."""
        global paused
        paused = False
        return "**Status:** 🟢 Streaming"

    def set_frame_rate(val: float) -> None:
        """Set the frame rate for streaming."""
        global frame_rate
        frame_rate = val

    # Try to load the default video once while the UI is being built.
    video_status.value = initialize_video()

    load_button.click(
        initialize_video,
        inputs=[video_input],
        outputs=[video_status]
    )

    def update_toggles(uc_val: bool, om_val: bool, rs_val: bool, pl_val: bool) -> Tuple[str, str, str, str, str]:
        """
        Update toggle states and set the active service.

        Args:
            uc_val (bool): Under Construction toggle value.
            om_val (bool): Operations Maintenance toggle value.
            rs_val (bool): Road Safety toggle value.
            pl_val (bool): Plantation toggle value.

        Returns:
            Tuple[str, str, str, str, str]: Status updates for each toggle and overall status message.
        """
        active, status_message = set_active_service("toggle", uc_val, om_val, rs_val, pl_val)
        uc_status_val, om_status_val, rs_status_val, pl_status_val = (
            "Enabled" if active == category else "Disabled"
            for category in ("under_construction", "operations_maintenance", "road_safety", "plantation")
        )
        return (
            uc_status_val,
            om_status_val,
            rs_status_val,
            pl_status_val,
            status_message
        )

    toggle_inputs = [uc_toggle, om_toggle, rs_toggle, pl_toggle]
    toggle_outputs = [uc_status, om_status, rs_status, pl_status, status_text]
    for toggle in toggle_inputs:
        toggle.change(update_toggles, inputs=toggle_inputs, outputs=toggle_outputs)

    pause_btn.click(toggle_pause, outputs=status_text)
    resume_btn.click(toggle_resume, outputs=status_text)
    frame_slider.change(set_frame_rate, inputs=[frame_slider])

    def streaming_loop():
        """Continuous loop to stream video and process frames."""
        while True:
            if not video_loaded:
                yield (
                    None,
                    json.dumps({"error": VIDEO_NOT_LOADED_MESSAGE}, indent=2),
                    "\n".join(log_entries[-10:]),
                    None,
                    None,
                    None,
                    last_detected_images
                )
            else:
                # monitor_feed already returns None charts whenever it has
                # no frame, so its tuple can be forwarded unchanged.
                yield monitor_feed()
            time.sleep(frame_rate)

    app.load(
        streaming_loop,
        outputs=[
            video_output,
            detections_output,
            logs_output,
            chart_output,
            pie_output,
            bar_output,
            captured_images
        ]
    )

if __name__ == "__main__":
    app.launch(share=False)