# Hugging Face Space page status at capture time: Runtime error
| import gradio as gr | |
| import cv2 | |
| import time | |
| import os | |
| import json | |
| import random | |
| import logging | |
| from datetime import datetime | |
| from collections import Counter | |
| from services.video_service import get_next_video_frame, reset_video_index, preload_video, release_video | |
| from services.detection_service import process_frame as process_generic | |
| from services.metrics_service import update_metrics | |
| from services.overlay_service import overlay_boxes | |
| from services.salesforce_dispatcher import dispatch_to_salesforce | |
| from services.shadow_detection import detect_shadows | |
| from services.thermal_service import process_thermal | |
| # Under Construction services | |
| from services.under_construction.earthwork_detection import process_earthwork | |
| from services.under_construction.culvert_check import process_culverts | |
| from services.under_construction.bridge_pier_check import process_bridge_piers | |
| # Operations Maintenance services | |
| from services.operations_maintenance.crack_detection import detect_cracks_and_objects | |
| from services.operations_maintenance.pothole_detection import process_potholes | |
| from services.operations_maintenance.signage_check import process_signages | |
| # Road Safety services | |
| from services.road_safety.barrier_check import process_barriers | |
| from services.road_safety.lighting_check import process_lighting | |
| from services.road_safety.accident_spot_check import process_accident_spots | |
| # Plantation services | |
| from services.plantation.plant_count import process_plants | |
| from services.plantation.plant_health import process_plant_health | |
| from services.plantation.missing_patch_check import process_missing_patches | |
# Logging: INFO and above is written to app.log.
logging.basicConfig(
    filename="app.log",
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
)

# Constants: input video, scratch image, and output locations.
DEFAULT_VIDEO_PATH = "sample.mp4"
TEMP_IMAGE_PATH = "temp.jpg"
CAPTURED_FRAMES_DIR = "captured_frames"
OUTPUT_DIR = "outputs"

# Both output directories must exist before any frame is written to them.
for _directory in (CAPTURED_FRAMES_DIR, OUTPUT_DIR):
    os.makedirs(_directory, exist_ok=True)

# Globals: mutable state shared by the UI callbacks and the frame loop.
paused: bool = False
frame_rate: float = 0.1  # Seconds slept between streamed frames
frame_count: int = 0
video_loaded: bool = False
active_service = None  # Name of the enabled service category, or None

# Rolling histories (trimmed by the frame loop).
log_entries: list = []
crack_counts: list = []
crack_severity_all: list = []
gps_coordinates: list = []
last_detected_images: list = []  # Paths of saved frames that had detections

# Snapshot of the most recently processed frame.
last_frame = None
last_detections: dict = {}
last_timestamp: str = ""
def initialize_video(video_file=None):
    """
    Initialize the video with the provided file or default path.

    Any previously opened video is released first, then the chosen path is
    handed to preload_video. The module-level ``video_loaded`` flag is set
    to True unless the returned status contains the word "Error".

    Args:
        video_file: Upload from the file input, or None to fall back to
            DEFAULT_VIDEO_PATH. May be an object exposing its path as
            ``.name`` or a plain path string.

    Returns:
        The status message returned by preload_video.
    """
    global video_loaded
    release_video()
    video_path = DEFAULT_VIDEO_PATH
    if video_file is not None:
        # Uploads may arrive as a file wrapper (path in .name) or directly
        # as a path string; a plain str has no .name, so use it unchanged.
        video_path = getattr(video_file, "name", video_file)
        upload_message = f"Using uploaded video: {video_path}"
        log_entries.append(upload_message)
        logging.info(upload_message)
    status = preload_video(video_path)
    # NOTE(review): assumes preload_video always returns a str - confirm.
    video_loaded = "Error" not in status
    log_entries.append(status)
    logging.info(status)
    return status
def set_active_service(service_name, uc_val, om_val, rs_val, pl_val):
    """
    Set the active service category based on toggles.
    Only one service category can be active at a time.

    Args:
        service_name: Unused; kept so existing callers keep working.
        uc_val: Truthy when "under_construction" is ticked.
        om_val: Truthy when "operations_maintenance" is ticked.
        rs_val: Truthy when "road_safety" is ticked.
        pl_val: Truthy when "plantation" is ticked.

    Returns:
        A ``(active_service, status_message)`` tuple. ``active_service`` is
        the enabled category name, or None when nothing is enabled or when
        more than one toggle is ticked.
    """
    global active_service
    # bool() so an unset (None) toggle counts as "off" instead of breaking
    # the count below.
    toggles = {
        "under_construction": bool(uc_val),
        "operations_maintenance": bool(om_val),
        "road_safety": bool(rs_val),
        "plantation": bool(pl_val),
    }
    enabled = [name for name, is_on in toggles.items() if is_on]

    # Ensure only one toggle is active
    if len(enabled) > 1:
        # Clear the previous selection: the caller shows every category as
        # disabled when None is returned, so leaving the old category
        # running would contradict what the user sees.
        active_service = None
        log_entries.append("Error: Only one service category can be active at a time.")
        logging.error("Multiple service categories enabled simultaneously.")
        return None, "Error: Please enable only one service category at a time."

    if enabled:
        service = enabled[0]
        active_service = service
        display_name = service.replace('_', ' ').title()
        log_entries.append(f"{display_name} Services Enabled")
        logging.info(f"{service} services enabled")
        return active_service, f"{display_name} Services: Enabled"

    active_service = None
    log_entries.append("No service category enabled.")
    logging.info("No service category enabled.")
    return None, "No Service Category Enabled"
def _render_feed_outputs(source_frame):
    """
    Build the six UI outputs from an annotated frame and current module state.

    Returns a tuple of (display frame, metrics JSON, last 10 log lines,
    line chart, pie chart, captured image paths).
    """
    display = cv2.resize(source_frame, (640, 480))
    # Add frame count and timestamp to display
    cv2.putText(display, f"Frame: {frame_count}", (10, 25), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)
    cv2.putText(display, f"{last_timestamp}", (10, 50), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)

    # Generate charts (only for operations_maintenance)
    line_chart = None
    pie_chart = None
    if active_service == "operations_maintenance":
        line_chart = generate_line_chart()
        pie_chart = generate_pie_chart()

    # Reverse the channel axis for display (presumably BGR -> RGB). A
    # single-channel frame has no channel axis, so indexing it with three
    # subscripts would raise.
    if display.ndim == 3:
        display = display[:, :, ::-1]

    return (
        display,
        json.dumps(last_detections, indent=2),
        "\n".join(log_entries[-10:]),
        line_chart,
        pie_chart,
        last_detected_images,
    )


def monitor_feed():
    """
    Main function to process video frames in real-time.
    Only the active service category processes the frame.

    While paused (and a frame has already been processed) the last frame is
    re-served without being processed, saved or dispatched again.

    Returns:
        A 6-tuple of (frame, metrics JSON, recent log text, line chart,
        pie chart, captured image paths). The frame and both charts are
        None when no video is loaded or no frame could be read.
    """
    global frame_count, last_frame, last_detections, last_timestamp

    if not video_loaded:
        log_entries.append("Cannot start streaming: Video not loaded successfully.")
        logging.error("Video not loaded successfully.")
        return None, json.dumps({"error": "Video not loaded. Please upload a video file."}, indent=2), "\n".join(log_entries[-10:]), None, None, last_detected_images

    if paused and last_frame is not None:
        # Paused: show the cached result. Re-running the pipeline here would
        # draw overlays on top of overlays, resend the same detections and
        # keep advancing the frame counter.
        return _render_feed_outputs(last_frame)

    try:
        frame = get_next_video_frame()
        if frame is None:
            raise RuntimeError("Failed to retrieve frame from video.")
    except RuntimeError as e:
        log_entries.append(f"Error: {str(e)}")
        logging.error(f"Frame retrieval error: {str(e)}")
        return None, json.dumps(last_detections, indent=2), "\n".join(log_entries[-10:]), None, None, last_detected_images

    # Initialize detected items list
    all_detected_items = []

    # Process only the active service category
    try:
        if active_service == "under_construction":
            earthwork_dets, frame = process_earthwork(frame)
            culvert_dets, frame = process_culverts(frame)
            bridge_pier_dets, frame = process_bridge_piers(frame)
            all_detected_items.extend(earthwork_dets + culvert_dets + bridge_pier_dets)
        elif active_service == "operations_maintenance":
            crack_items = detect_cracks_and_objects(frame)
            frame = overlay_boxes(frame, crack_items)
            pothole_dets, frame = process_potholes(frame)
            signage_dets, frame = process_signages(frame)
            all_detected_items.extend(crack_items + pothole_dets + signage_dets)
        elif active_service == "road_safety":
            barrier_dets, frame = process_barriers(frame)
            lighting_dets, frame = process_lighting(frame)
            accident_dets, frame = process_accident_spots(frame)
            all_detected_items.extend(barrier_dets + lighting_dets + accident_dets)
        elif active_service == "plantation":
            plant_dets, frame = process_plants(frame)
            health_dets, frame = process_plant_health(frame)
            missing_dets, frame = process_missing_patches(frame)
            all_detected_items.extend(plant_dets + health_dets + missing_dets)
        else:
            # Fallback: Run generic detection if no service is active
            generic_dets, frame = process_generic(frame)
            all_detected_items.extend(generic_dets)

        # Optional: Run shadow detection (affects all modes for better accuracy)
        shadow_results = detect_shadows(frame)
        frame = shadow_results["frame"]
        all_detected_items.extend(shadow_results["detections"])

        # Optional: Run thermal processing if frame is grayscale (simulated check)
        if len(frame.shape) == 2:  # Grayscale frame (simulating thermal input)
            thermal_results = process_thermal(frame)
            frame = thermal_results["frame"]
            all_detected_items.extend(thermal_results["detections"])
    except Exception as e:
        # Best effort: a failing detector must not stop the stream, so the
        # frame is shown with whatever was drawn and no detections.
        log_entries.append(f"Processing Error: {str(e)}")
        logging.error(f"Processing error in {active_service}: {str(e)}")
        all_detected_items = []

    # Save frame with overlays
    try:
        cv2.imwrite(TEMP_IMAGE_PATH, frame, [int(cv2.IMWRITE_JPEG_QUALITY), 95])
    except Exception as e:
        log_entries.append(f"Error saving temp image: {str(e)}")
        logging.error(f"Error saving temp image: {str(e)}")

    metrics = update_metrics(all_detected_items)

    # Simulate GPS coordinates
    gps_coord = [17.385044 + random.uniform(-0.001, 0.001), 78.486671 + frame_count * 0.0001]
    gps_coordinates.append(gps_coord)

    # Save frame if there are detections (e.g., cracks, plants, etc.)
    if any("type" in item for item in all_detected_items):
        try:
            captured_frame_path = os.path.join(CAPTURED_FRAMES_DIR, f"detected_{frame_count}.jpg")
            cv2.imwrite(captured_frame_path, frame)
            last_detected_images.append(captured_frame_path)
            # Keep only the 100 most recent captures (in place: the list is
            # shared module state).
            del last_detected_images[:-100]
        except Exception as e:
            log_entries.append(f"Error saving captured frame: {str(e)}")
            logging.error(f"Error saving captured frame: {str(e)}")

    # Combine detections for Salesforce
    all_detections = {
        "items": all_detected_items,
        "metrics": metrics,
        "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        "frame_count": frame_count,
        "gps_coordinates": gps_coord,
    }

    # Dispatch to Salesforce
    try:
        dispatch_to_salesforce(all_detections, all_detections["timestamp"])
    except Exception as e:
        log_entries.append(f"Salesforce Dispatch Error: {str(e)}")
        logging.error(f"Salesforce dispatch error: {str(e)}")

    # Save annotated frame
    try:
        frame_path = os.path.join(OUTPUT_DIR, f"frame_{frame_count:04d}.jpg")
        cv2.imwrite(frame_path, frame)
    except Exception as e:
        log_entries.append(f"Error saving output frame: {str(e)}")
        logging.error(f"Error saving output frame: {str(e)}")

    frame_count += 1
    last_timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    last_frame = frame.copy()
    last_detections = metrics

    # Update logs and stats (crack statistics only exist for operations_maintenance)
    crack_detected = 0
    if active_service == "operations_maintenance":
        cracks = [item for item in all_detected_items if item.get("type") == "crack"]
        crack_detected = len(cracks)
        crack_severity_all.extend(item["severity"] for item in cracks if "severity" in item)

    log_message = f"{last_timestamp} - Frame {frame_count} - Detections: {len(all_detected_items)} - GPS: {gps_coord} - Avg Conf: {metrics.get('avg_confidence', 0):.2f}"
    if crack_detected:
        log_message += f" - Cracks: {crack_detected}"
    log_entries.append(log_message)
    logging.info(log_message)
    crack_counts.append(crack_detected)

    # Trim histories by slicing: several entries can be added per frame, so
    # dropping a single element per call would let them outgrow their caps.
    del log_entries[:-100]
    del crack_counts[:-500]
    del crack_severity_all[:-500]

    return _render_feed_outputs(last_frame)
def generate_line_chart():
    """
    Build a Chart.js-style line chart config of crack counts per frame.

    Covers the 50 most recent entries of ``crack_counts``; returns None when
    there is no history yet.
    """
    # NOTE(review): the result is passed to a gr.Plot output, which may not
    # accept a plain config dict - confirm against the Gradio Plot docs.
    recent_counts = crack_counts[-50:]  # Last 50 frames
    if not recent_counts:
        return None

    dataset = {
        "label": "Cracks Over Time",
        "data": recent_counts,
        "borderColor": "#FF6347",  # Tomato
        "backgroundColor": "rgba(255, 99, 71, 0.2)",
        "fill": True,
        "tension": 0.4,
    }
    x_axis = {"title": {"display": True, "text": "Frame"}}
    y_axis = {"title": {"display": True, "text": "Count"}, "beginAtZero": True}
    options = {
        "responsive": True,
        "plugins": {"title": {"display": True, "text": "Cracks Over Time"}},
        "scales": {"x": x_axis, "y": y_axis},
    }

    return {
        "type": "line",
        "data": {
            "labels": [index for index, _ in enumerate(recent_counts)],
            "datasets": [dataset],
        },
        "options": options,
    }
def generate_pie_chart():
    """
    Build a Chart.js-style pie chart config of crack severity frequencies.

    Tallies the 200 most recent entries of ``crack_severity_all``; returns
    None when no severities have been recorded.
    """
    # NOTE(review): the result is passed to a gr.Plot output, which may not
    # accept a plain config dict - confirm against the Gradio Plot docs.
    recent_severities = crack_severity_all[-200:]  # Last 200 cracks
    if not recent_severities:
        return None

    tally = Counter(recent_severities)
    slice_colors = [
        "#FF6347",  # Tomato
        "#4682B4",  # SteelBlue
        "#FFD700",  # Gold
    ]
    plugins = {
        "title": {"display": True, "text": "Crack Severity Distribution"},
        "legend": {"position": "top"},
    }

    return {
        "type": "pie",
        "data": {
            "labels": list(tally),
            "datasets": [{
                "data": list(tally.values()),
                "backgroundColor": slice_colors,
            }],
        },
        "options": {"responsive": True, "plugins": plugins},
    }
# Gradio UI
# NOTE(review): the Row/Column nesting below is reconstructed from statement
# order because the original indentation was lost - confirm the layout.
with gr.Blocks(theme=gr.themes.Soft(primary_hue="blue", secondary_hue="green")) as app:
    gr.Markdown(
        """
        # 🛡️ NHAI Drone Road Inspection Dashboard
        Monitor highway conditions in real-time using drone footage. Select a service category to analyze specific aspects of the road.
        """
    )

    # --- Video upload -----------------------------------------------------
    with gr.Row():
        with gr.Column(scale=3):
            video_input = gr.File(label="Upload Video File (e.g., sample.mp4)", file_types=["video"])
            load_button = gr.Button("Load Video", variant="primary")
        with gr.Column(scale=1):
            video_status = gr.Textbox(
                label="Video Load Status",
                value="Please upload a video file or ensure 'sample.mp4' exists in the root directory.",
                interactive=False,
            )

    # --- Service category toggles, each with a read-only status box --------
    with gr.Row():
        with gr.Column():
            uc_toggle = gr.Checkbox(label="Enable Under Construction Services", value=False)
            uc_status = gr.Textbox(label="Under Construction Status", value="Disabled", interactive=False)
        with gr.Column():
            om_toggle = gr.Checkbox(label="Enable Operations Maintenance Services", value=False)
            om_status = gr.Textbox(label="Operations Maintenance Status", value="Disabled", interactive=False)
        with gr.Column():
            rs_toggle = gr.Checkbox(label="Enable Road Safety Services", value=False)
            rs_status = gr.Textbox(label="Road Safety Status", value="Disabled", interactive=False)
        with gr.Column():
            pl_toggle = gr.Checkbox(label="Enable Plantation Services", value=False)
            pl_status = gr.Textbox(label="Plantation Status", value="Disabled", interactive=False)

    status_text = gr.Markdown("**Status:** 🟢 Ready (Upload a video to start)")

    # --- Live feed and metrics ---------------------------------------------
    with gr.Row():
        with gr.Column(scale=3):
            video_output = gr.Image(label="Live Drone Feed", width=640, height=480, elem_id="live-feed")
        with gr.Column(scale=1):
            detections_output = gr.Textbox(label="Detection Metrics", lines=4, interactive=False)

    # --- Logs and crack charts ---------------------------------------------
    with gr.Row():
        with gr.Column(scale=2):
            logs_output = gr.Textbox(label="Live Logs", lines=8, interactive=False)
        with gr.Column(scale=1):
            chart_output = gr.Plot(label="Crack Trend (Operations Maintenance Only)")
            pie_output = gr.Plot(label="Crack Severity (Operations Maintenance Only)")

    with gr.Row():
        captured_images = gr.Gallery(label="Detected Frames (Last 100+)", columns=4, rows=25, height="auto")

    # --- Playback controls --------------------------------------------------
    with gr.Row():
        pause_btn = gr.Button("⏸️ Pause", variant="secondary")
        resume_btn = gr.Button("▶️ Resume", variant="primary")
        frame_slider = gr.Slider(0.05, 1.0, value=0.1, label="Frame Interval (seconds)", step=0.05)

    # Add some custom CSS for better UX
    gr.HTML("""
    <style>
    #live-feed {
    border: 2px solid #4682B4;
    border-radius: 10px;
    }
    .gr-button-primary {
    background-color: #4682B4 !important;
    }
    .gr-button-secondary {
    background-color: #FF6347 !important;
    }
    </style>
    """)

    def toggle_pause():
        """Pause the stream and report the paused status."""
        global paused
        paused = True
        return "**Status:** ⏸️ Paused"

    def toggle_resume():
        """Resume the stream and report the streaming status."""
        global paused
        paused = False
        return "**Status:** 🟢 Streaming"

    def set_frame_rate(val):
        """Store the slider value as the delay between streamed frames."""
        global frame_rate
        frame_rate = val

    def update_toggles(uc_val, om_val, rs_val, pl_val):
        """Apply the toggles and return the four status labels plus the status message."""
        active, status_message = set_active_service("toggle", uc_val, om_val, rs_val, pl_val)
        categories = ("under_construction", "operations_maintenance", "road_safety", "plantation")
        labels = tuple("Enabled" if active == category else "Disabled" for category in categories)
        return (*labels, status_message)

    def streaming_loop():
        """Yield one set of UI outputs per tick, forever, sleeping ``frame_rate`` seconds between ticks."""
        while True:
            if video_loaded:
                yield monitor_feed()
            else:
                yield None, json.dumps({"error": "Video not loaded. Please upload a video file."}, indent=2), "\n".join(log_entries[-10:]), None, None, last_detected_images
            time.sleep(frame_rate)

    # Initialize video on app load
    video_status.value = initialize_video()

    load_button.click(initialize_video, inputs=[video_input], outputs=[video_status])

    # Toggle change events: every checkbox re-evaluates all four toggles.
    toggle_inputs = [uc_toggle, om_toggle, rs_toggle, pl_toggle]
    toggle_outputs = [uc_status, om_status, rs_status, pl_status, status_text]
    for _toggle in toggle_inputs:
        _toggle.change(update_toggles, inputs=toggle_inputs, outputs=toggle_outputs)

    pause_btn.click(toggle_pause, outputs=status_text)
    resume_btn.click(toggle_resume, outputs=status_text)
    frame_slider.change(set_frame_rate, inputs=[frame_slider])

    app.load(streaming_loop, outputs=[video_output, detections_output, logs_output, chart_output, pie_output, captured_images])

if __name__ == "__main__":
    app.launch(share=True)