"""Gradio dashboard for the NHAI drone feed in "Under Construction" mode.

Each video frame is passed through the earthwork, culvert and bridge-pier
processors; the combined detections are dispatched to Salesforce, the
annotated frame is written to ``OUTPUT_DIR`` and the result is streamed to
the browser together with a rolling log.
"""

import json
import os
import time
from datetime import datetime

import cv2
import gradio as gr

from services.video_service import get_next_video_frame, reset_video_index, preload_video
# Under Construction services
from services.under_construction.earthwork_detection import process_earthwork
from services.under_construction.culvert_check import process_culverts
from services.under_construction.bridge_pier_check import process_bridge_piers
# Comment out other services
# from services.operations_maintenance.pothole_detection import process_potholes
# from services.operations_maintenance.crack_detection import process_cracks
# from services.operations_maintenance.signage_check import process_signages
# from services.road_safety.barrier_check import process_barriers
# from services.road_safety.lighting_check import process_lighting
# from services.road_safety.accident_spot_check import process_accident_spots
# from services.plantation.plant_count import process_plants
# from services.plantation.plant_health import process_plant_health
# from services.plantation.missing_patch_check import process_missing_patches
# Original services (not used in this mode but imported for potential future use)
from services.detection_service import process_frame as process_generic
from services.metrics_service import compute_metrics
from services.overlay_service import add_overlay
from services.salesforce_dispatcher import dispatch_to_salesforce
from services.shadow_detection import detect_shadows
from services.thermal_service import process_thermal

# Preload video. A failure here is reported but not fatal: the error will
# surface again (and be logged) when the first frame is requested.
try:
    preload_video()
except Exception as e:
    print(f"Error preloading video: {e}")

# Globals (mutable state shared between the UI callbacks and the stream)
paused = False
frame_rate = 0.5  # Process every 0.5 seconds for real-time feel
frame_count = 0
log_entries = []
last_frame = None
last_detections = {}
last_timestamp = ""

# Constants
TEMP_IMAGE_PATH = "temp.jpg"
OUTPUT_DIR = "outputs"
MAX_LOG_ENTRIES = 100  # upper bound on the in-memory log
LOG_TAIL_SIZE = 10  # number of most recent log lines shown in the UI
DISPLAY_SIZE = (640, 480)  # (width, height) of the frame sent to the UI
TIMESTAMP_FORMAT = "%Y-%m-%d %H:%M:%S"
os.makedirs(OUTPUT_DIR, exist_ok=True)


def _append_log(message):
    """Append *message* to ``log_entries``, keeping at most MAX_LOG_ENTRIES.

    The list is trimmed in place so that every code path (including the
    error path) stays bounded and the module-level list object is preserved.
    """
    log_entries.append(message)
    if len(log_entries) > MAX_LOG_ENTRIES:
        del log_entries[:-MAX_LOG_ENTRIES]


def _log_tail():
    """Return the most recent LOG_TAIL_SIZE log lines joined by newlines."""
    return "\n".join(log_entries[-LOG_TAIL_SIZE:])


def monitor_feed():
    """Process one tick of the video feed and return what the UI should show.

    While paused (and at least one frame has already been processed) the
    last processed frame is re-rendered without fetching or analysing a new
    one. Otherwise the next frame is fetched, run through the three
    under-construction processors, dispatched to Salesforce and saved to
    ``OUTPUT_DIR``.

    Returns:
        A 3-tuple ``(frame, detections_json, log_text)``. ``frame`` is the
        resized, annotated image with its channel axis reversed, or ``None``
        when ``get_next_video_frame`` raised ``RuntimeError``.
        ``detections_json`` is the JSON dump of the latest detections and
        ``log_text`` the last LOG_TAIL_SIZE log lines.
    """
    global frame_count, last_frame, last_detections, last_timestamp

    # If paused before any frame exists there is nothing to hold, so one
    # frame is still processed (same as the original behaviour).
    holding_last_frame = paused and last_frame is not None

    if not holding_last_frame:
        try:
            frame = get_next_video_frame()
        except RuntimeError as e:
            _append_log(f"Error: {e}")
            return None, json.dumps(last_detections, indent=2), _log_tail()

        # Run Under Construction detections; each processor returns its
        # detections together with the (annotated) frame.
        earthwork_dets, frame = process_earthwork(frame)
        culvert_dets, frame = process_culverts(frame)
        bridge_pier_dets, frame = process_bridge_piers(frame)

        # One timestamp per frame so the dispatched payload, the log line
        # and the on-screen overlay always agree.
        timestamp = datetime.now().strftime(TIMESTAMP_FORMAT)

        # Combine detections
        all_detections = {
            "earthwork": earthwork_dets,
            "culverts": culvert_dets,
            "bridge_piers": bridge_pier_dets,
            "timestamp": timestamp,
            "frame_count": frame_count,
        }

        # Compute metrics
        all_dets_list = earthwork_dets + culvert_dets + bridge_pier_dets
        metrics = compute_metrics(all_dets_list)
        all_detections["metrics"] = metrics

        # Dispatch to Salesforce
        dispatch_to_salesforce(all_detections, timestamp)

        # Save annotated frame. cv2.imwrite reports failure through its
        # return value rather than an exception, so check it explicitly.
        frame_path = os.path.join(OUTPUT_DIR, f"frame_{frame_count:04d}.jpg")
        if not cv2.imwrite(frame_path, frame):
            _append_log(f"Error: could not save frame to {frame_path}")

        # NOTE: the counter is bumped after saving, so file names and the
        # payload's "frame_count" are 0-based while the log line and the
        # overlay below are 1-based.
        frame_count += 1
        last_timestamp = timestamp
        _append_log(
            f"{last_timestamp} - Frame {frame_count} - "
            f"Detections: {len(all_dets_list)} - "
            f"Avg Conf: {metrics['avg_confidence']:.2f}"
        )
        last_frame = frame.copy()
        last_detections = all_detections

    # Add frame count and timestamp to display. cv2.resize returns a new
    # array, so the cached last_frame is never drawn on.
    display = cv2.resize(last_frame, DISPLAY_SIZE)
    cv2.putText(display, f"Frame: {frame_count}", (10, 25),
                cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)
    cv2.putText(display, f"{last_timestamp}", (10, 50),
                cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)

    # Reverse the channel axis (presumably OpenCV BGR -> RGB for Gradio).
    return display[:, :, ::-1], json.dumps(last_detections, indent=2), _log_tail()


def toggle_pause():
    """Pause frame processing and return the status text for the UI."""
    global paused
    paused = True
    return "**Status:** ⏸️ Paused"


def toggle_resume():
    """Resume frame processing and return the status text for the UI."""
    global paused
    paused = False
    return "**Status:** 🟢 Running"


def set_frame_rate(val):
    """Set the delay, in seconds, between two ticks of the streaming loop."""
    global frame_rate
    frame_rate = val


def streaming_loop():
    """Yield ``monitor_feed()`` results forever, sleeping between ticks.

    ``frame_rate`` is re-read on every iteration so slider changes take
    effect on the next tick.
    """
    while True:
        yield monitor_feed()
        time.sleep(frame_rate)


# Gradio UI
with gr.Blocks(theme=gr.themes.Soft()) as app:
    gr.Markdown("# 🛡️ NHAI Drone Analytics Monitoring System - Under Construction")
    status_text = gr.Markdown("**Status:** 🟢 Running")

    with gr.Row():
        with gr.Column(scale=3):
            video_output = gr.Image(label="Live Drone Feed", width=640, height=480)
        with gr.Column(scale=1):
            detections_output = gr.Textbox(label="Detections", lines=10)

    with gr.Row():
        logs_output = gr.Textbox(label="Live Logs", lines=8)

    with gr.Row():
        pause_btn = gr.Button("⏸️ Pause")
        resume_btn = gr.Button("▶️ Resume")
        frame_slider = gr.Slider(0.1, 5, value=0.5, label="Frame Interval (seconds)")

    # Event wiring must happen inside the Blocks context.
    pause_btn.click(toggle_pause, outputs=status_text)
    resume_btn.click(toggle_resume, outputs=status_text)
    frame_slider.change(set_frame_rate, inputs=[frame_slider])

    app.load(streaming_loop, outputs=[video_output, detections_output, logs_output])

if __name__ == "__main__":
    app.launch(share=True)