"""Gradio dashboard for inspecting under-construction elements in media feeds.

The module loads a video or a still image, runs the earthwork / culvert /
bridge-pier detectors from ``services.under_construction`` on each frame,
draws the results, forwards them to Salesforce and streams the annotated
frame, metrics, logs, a trend chart and a map to the Gradio UI.
"""
import json
import logging
import os
import random
import shutil
import sys
import time
from collections import Counter
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple

import cv2
import gradio as gr
import matplotlib.pyplot as plt
import numpy as np

# Suppress Ultralytics warning by setting a writable config directory
os.environ["YOLO_CONFIG_DIR"] = "/tmp/Ultralytics"

# Configure logging before importing the services so that an import failure
# is written to app.log as well.
logging.basicConfig(
    filename="app.log",
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s"
)

# Import service modules
try:
    from services.video_service import get_next_video_frame, reset_video_index, preload_video, release_video
    from services.metrics_service import update_metrics
    from services.salesforce_dispatcher import send_to_salesforce
    from services.shadow_detection import detect_shadow_coverage
    from services.thermal_service import process_thermal
    from services.map_service import generate_map
    from services.under_construction.earthwork_detection import process_earthwork
    from services.under_construction.culvert_check import process_culverts
    from services.under_construction.bridge_pier_check import process_bridge_piers
except ImportError as e:
    print(f"Failed to import service modules: {str(e)}")
    logging.error(f"Import error: {str(e)}")
    sys.exit(1)

# NOTE: a stray ``if not cap.isOpened(): ...`` snippet used to live here. It
# referenced ``cap`` and ``video_source``, neither of which is defined in this
# module, so it raised NameError on import. Video capture is handled through
# services.video_service instead.

# Global variables
paused: bool = False
frame_rate: float = 0.3
frame_count: int = 0
log_entries: List[str] = []
detected_counts: List[int] = []
last_frame: Optional[np.ndarray] = None
last_metrics: Dict[str, Any] = {}
last_timestamp: str = ""
detected_issues: List[str] = []
gps_coordinates: List[List[float]] = []
media_loaded: bool = False
active_service: Optional[str] = None
is_video: bool = True
static_image: Optional[np.ndarray] = None
enabled_services: List[str] = []

# Constants
DEFAULT_VIDEO_PATH = "sample.mp4"
TEMP_IMAGE_PATH = os.path.abspath("temp.jpg")
CAPTURED_FRAMES_DIR = os.path.abspath("captured_frames")
OUTPUT_DIR = os.path.abspath("outputs")
TEMP_MEDIA_DIR = os.path.abspath("temp_media")

VIDEO_EXTENSIONS = (".mp4", ".avi")
IMAGE_EXTENSIONS = (".jpg", ".jpeg", ".png")

# Upper bounds for the in-memory histories kept by this module.
MAX_LOG_ENTRIES = 100
MAX_DETECTED_ISSUES = 100
MAX_DETECTED_COUNTS = 500
MAX_GPS_HISTORY = 500

# Box colour per detection type, passed to OpenCV unchanged.
# NOTE(review): the colour names come from the original comments; the frame is
# channel-reversed before display, so confirm the rendered colours.
BOX_COLORS: Dict[str, Tuple[int, int, int]] = {
    "earthwork": (255, 105, 180),  # Pink
    "culvert": (0, 128, 128),  # Teal
    "bridge_pier": (255, 127, 127),  # Coral
}

FeedResult = Tuple[
    Optional[np.ndarray], str, str, List[str], Optional[str], Optional[str]
]

# Ensure directories exist with write permissions
for directory in [CAPTURED_FRAMES_DIR, OUTPUT_DIR, TEMP_MEDIA_DIR]:
    os.makedirs(directory, exist_ok=True)
    try:
        os.chmod(directory, 0o777)
    except OSError as e:
        # Best effort: a directory owned by another user must not stop start-up.
        logging.warning(f"Could not change permissions of {directory}: {e}")


def _record(message: str, level: int = logging.INFO,
            file_message: Optional[str] = None) -> None:
    """Append *message* to the UI log and mirror it to the log file.

    Args:
        message: Text shown in the "Live Logs" box.
        level: ``logging`` level used for the log-file record.
        file_message: Text for the log file when it differs from *message*.
    """
    log_entries.append(message)
    logging.log(level, message if file_message is None else file_message)


def _recent_logs() -> str:
    """Return the last ten UI log entries joined by newlines."""
    return "\n".join(log_entries[-10:])


def _empty_result(metrics_json: str) -> FeedResult:
    """Build a feed result that carries no frame, chart or map."""
    return (None, metrics_json, _recent_logs(), detected_issues, None, None)


def _media_not_loaded_result() -> FeedResult:
    """Build the feed result reported while no media is loaded."""
    return _empty_result(
        json.dumps({"error": "Media not loaded. Please upload a video or image file."}, indent=2)
    )


def initialize_media(media_file: Optional[Any] = None) -> str:
    """Load the uploaded media, or the default video when nothing is uploaded.

    Args:
        media_file: Upload from the Gradio file component. Either an object
            exposing the path as ``.name`` or the path itself; ``None`` selects
            ``DEFAULT_VIDEO_PATH``.

    Returns:
        A status message describing success or the reason for failure. The
        same message is appended to the UI log and written to the log file.
    """
    global media_loaded, is_video, static_image, frame_count
    release_video()
    static_image = None
    frame_count = 0
    media_loaded = False  # only flipped back to True once loading succeeded

    if media_file is None:
        media_path = DEFAULT_VIDEO_PATH
        _record(f"No media uploaded, attempting to load default: {media_path}")
    else:
        # The upload may arrive as a plain path or as an object with ``.name``.
        if isinstance(media_file, (str, os.PathLike)):
            original_path = os.fspath(media_file)
        else:
            original_path = getattr(media_file, "name", None)
        if not original_path:
            status = "Error: Invalid media file uploaded."
            _record(status, logging.ERROR)
            return status
        upload_extension = os.path.splitext(original_path)[1].lower()
        media_path = os.path.join(TEMP_MEDIA_DIR, f"uploaded_media{upload_extension}")
        try:
            shutil.copy(original_path, media_path)
        except Exception as e:
            status = f"Error copying uploaded file: {e}"
            _record(status, logging.ERROR)
            return status
        _record(f"Copied uploaded file to: {media_path}")

    if not os.path.exists(media_path):
        status = f"Error: Media file '{media_path}' not found."
        _record(status, logging.ERROR)
        return status

    # Derive the extension from the resolved path so the default video (which
    # never went through the upload branch) is recognised as well.
    file_extension = os.path.splitext(media_path)[1].lower()
    try:
        if file_extension in VIDEO_EXTENSIONS:
            is_video = True
            preload_video(media_path)
            status = f"Successfully loaded video: {media_path}"
        elif file_extension in IMAGE_EXTENSIONS:
            is_video = False
            image = cv2.imread(media_path)
            if image is None:
                raise RuntimeError(f"Failed to load image: {media_path}")
            static_image = cv2.resize(image, (320, 240))
            status = f"Successfully loaded image: {media_path}"
        else:
            status = "Error: Unsupported file format. Use .mp4, .avi, .jpg, .jpeg, or .png."
            _record(status, logging.ERROR)
            return status
    except Exception as e:
        status = f"Error loading media: {e}"
        _record(status, logging.ERROR)
        return status

    media_loaded = True
    _record(status)
    return status


def set_active_service(uc_val: bool) -> Tuple[Optional[str], str]:
    """Enable or disable the under-construction service.

    Args:
        uc_val: State of the "Enable Under Construction Services" checkbox.

    Returns:
        ``(active service name or None, status text for the UI)``.
    """
    global active_service, enabled_services
    enabled_services = ["under_construction"] if uc_val else []
    if not enabled_services:
        active_service = None
        _record("Under Construction service disabled.")
        return None, "No Service Enabled"
    active_service = "under_construction"
    _record("Enabled service: Under Construction")
    return active_service, "Enabled: Under Construction"


def generate_line_chart() -> Optional[str]:
    """Plot the last 50 detection counts and save the chart as a PNG.

    Returns:
        The path of the saved chart, or ``None`` when there is no data yet or
        the chart could not be produced.
    """
    if not detected_counts:
        return None
    chart_path = "chart_temp.png"
    fig, ax = plt.subplots(figsize=(4, 2))
    try:
        ax.plot(detected_counts[-50:], marker='o', color='#FF8C00')
        ax.set_title("Detections Over Time")
        ax.set_xlabel("Frame")
        ax.set_ylabel("Count")
        ax.grid(True)
        fig.tight_layout()
        fig.savefig(chart_path)
        return chart_path
    except Exception as e:
        _record(f"Error generating chart: {e}", logging.ERROR)
        return None
    finally:
        # Always release the figure; pyplot keeps it alive otherwise.
        plt.close(fig)


def _grab_frame(max_retries: int = 3) -> Optional[np.ndarray]:
    """Fetch the next frame, retrying and rewinding the video on failure.

    Returns:
        The frame, or ``None`` when every attempt failed.
    """
    for attempt in range(1, max_retries + 1):
        try:
            if not is_video:
                return static_image.copy()
            frame = get_next_video_frame()
            if frame is not None:
                return frame
            _record(f"Frame retrieval failed on attempt {attempt}, resetting video.",
                    logging.WARNING)
            reset_video_index()
        except Exception as e:
            _record(f"Frame retrieval error on attempt {attempt}: {e}", logging.ERROR)
            if attempt == max_retries:
                return None
    _record("Failed to retrieve frame after maximum retries.", logging.ERROR)
    return None


def _draw_detections(frame: np.ndarray, items: List[Dict[str, Any]]) -> None:
    """Draw a box and a label for every known detection type onto *frame*."""
    for item in items:
        box = item.get("box", [])
        color = BOX_COLORS.get(item.get("type", ""))
        if not box or color is None:
            continue
        x_min, y_min, x_max, y_max = box
        label = item.get("label", "")
        cv2.rectangle(frame, (x_min, y_min), (x_max, y_max), color, 3)
        (text_w, text_h), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.6, 2)
        # Clamp to the frame: a negative slice index would wrap around to the
        # opposite edge instead of stopping at the border.
        top, bottom = max(y_min - text_h - 15, 0), max(y_min - 5, 0)
        left, right = max(x_min, 0), max(x_min + text_w + 10, 0)
        label_background = frame[top:bottom, left:right]
        if label_background.size > 0:
            overlay = label_background.copy()
            cv2.rectangle(overlay, (0, 0), (text_w + 10, text_h + 10), (0, 0, 0), -1)
            alpha = 0.5
            frame[top:bottom, left:right] = cv2.addWeighted(
                overlay, alpha, label_background, 1 - alpha, 0
            )
        cv2.putText(frame, label, (x_min + 5, y_min - 10),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)


def _run_detection(frame: np.ndarray) -> Tuple[List[Dict[str, Any]], bool, bool]:
    """Run the enabled detectors on *frame* and annotate it in place.

    Returns:
        ``(detected items, shadow flag, thermal flag)``. The item list is
        empty when processing raised.
    """
    detection_frame = cv2.resize(frame, (512, 320))
    items: List[Dict[str, Any]] = []
    shadow_issue = False
    thermal_flag = False
    try:
        if "under_construction" in enabled_services:
            earthwork_dets, detection_frame = process_earthwork(detection_frame)
            culvert_dets, detection_frame = process_culverts(detection_frame)
            bridge_pier_dets, detection_frame = process_bridge_piers(detection_frame)
            items.extend(earthwork_dets + culvert_dets + bridge_pier_dets)

        try:
            cv2.imwrite(TEMP_IMAGE_PATH, detection_frame)
            shadow_issue = detect_shadow_coverage(TEMP_IMAGE_PATH)
        except Exception as e:
            _record(f"Error saving temp image for shadow detection: {e}", logging.ERROR,
                    f"Error saving temp image: {e}")
            shadow_issue = False

        # A two-dimensional frame is handed to the thermal service.
        if len(detection_frame.shape) == 2:
            thermal_results = process_thermal(detection_frame)
            thermal_dets = thermal_results["detections"]
            detection_frame = thermal_results["frame"]
            items.extend(thermal_dets)
            thermal_flag = bool(thermal_dets)

        # Map boxes from the detection frame back onto the full-size frame.
        orig_h, orig_w = frame.shape[:2]
        det_h, det_w = detection_frame.shape[:2]
        scale_x, scale_y = orig_w / det_w, orig_h / det_h
        for item in items:
            if "box" in item:
                box = item["box"]
                item["box"] = [
                    int(box[0] * scale_x),
                    int(box[1] * scale_y),
                    int(box[2] * scale_x),
                    int(box[3] * scale_y)
                ]

        _draw_detections(frame, items)

        try:
            cv2.imwrite(TEMP_IMAGE_PATH, frame, [int(cv2.IMWRITE_JPEG_QUALITY), 95])
        except Exception as e:
            _record(f"Error saving temp image: {e}", logging.ERROR)
    except Exception as e:
        _record(f"Processing Error: {e}", logging.ERROR, f"Processing error: {e}")
        items = []
    return items, shadow_issue, thermal_flag


def _save_frame(path: str, frame: np.ndarray, description: str) -> None:
    """Write *frame* to *path* as a JPEG, raising RuntimeError on failure."""
    if not cv2.imwrite(path, frame, [int(cv2.IMWRITE_JPEG_QUALITY), 100]):
        raise RuntimeError(f"Failed to save {description}: {path}")


def _process_next_frame() -> bool:
    """Fetch, analyse and publish one frame, updating the module state.

    Returns:
        ``False`` when no frame could be retrieved, ``True`` otherwise.
    """
    global frame_count, last_frame, last_metrics, last_timestamp

    start_time = time.time()
    frame = _grab_frame()
    if frame is None:
        return False

    all_detected_items, shadow_issue, thermal_flag = _run_detection(frame)
    metrics = update_metrics(all_detected_items)

    gps_coord = [17.385044 + random.uniform(-0.001, 0.001), 78.486671 + frame_count * 0.0001]
    gps_coordinates.append(gps_coord)
    del gps_coordinates[:-MAX_GPS_HISTORY]
    for item in all_detected_items:
        item["gps"] = gps_coord

    if any("type" in item for item in all_detected_items):
        try:
            captured_frame_path = os.path.join(CAPTURED_FRAMES_DIR, f"detected_{frame_count}.jpg")
            _save_frame(captured_frame_path, frame, "captured frame")
            # One gallery entry per frame, not one per detection in the frame.
            detected_issues.append(captured_frame_path)
            del detected_issues[:-MAX_DETECTED_ISSUES]
        except Exception as e:
            _record(f"Error saving captured frame: {e}", logging.ERROR)

    all_detections = {
        "detections": all_detected_items,
        "metrics": metrics,
        "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        "frame_count": frame_count,
        "gps_coordinates": gps_coord,
        "shadow_issue": shadow_issue,
        "thermal": thermal_flag
    }
    try:
        send_to_salesforce(all_detections)
    except Exception as e:
        _record(f"Salesforce Dispatch Error: {e}", logging.ERROR,
                f"Salesforce dispatch error: {e}")

    try:
        frame_path = os.path.join(OUTPUT_DIR, f"frame_{frame_count:04d}.jpg")
        _save_frame(frame_path, frame, "output frame")
    except Exception as e:
        _record(f"Error saving output frame: {e}", logging.ERROR)

    frame_count += 1
    last_timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    last_frame = frame.copy()
    last_metrics = metrics

    type_counts = Counter(item.get("type") for item in all_detected_items)
    earthwork_detected = type_counts["earthwork"]
    culvert_detected = type_counts["culvert"]
    bridge_pier_detected = type_counts["bridge_pier"]
    detected_counts.append(earthwork_detected + culvert_detected + bridge_pier_detected)
    del detected_counts[:-MAX_DETECTED_COUNTS]

    processing_time = time.time() - start_time
    detection_summary = {
        "timestamp": last_timestamp,
        "frame": frame_count,
        "earthworks": earthwork_detected,
        "culverts": culvert_detected,
        "bridge_piers": bridge_pier_detected,
        "gps": gps_coord,
        "processing_time_ms": processing_time * 1000
    }
    _record(json.dumps(detection_summary, indent=2))
    del log_entries[:-MAX_LOG_ENTRIES]
    return True


def monitor_feed() -> Tuple[
    Optional[np.ndarray],
    str,
    str,
    List[str],
    Optional[str],
    Optional[str]
]:
    """Produce the next set of dashboard outputs.

    While paused (and a frame has already been processed) the previous frame
    is shown again; otherwise one new frame is fetched and analysed.

    Returns:
        ``(display frame, metrics JSON, recent log text, captured frame
        paths, chart path, map path)``. The frame, chart and map are ``None``
        when no media is loaded or no frame could be retrieved.
    """
    if not media_loaded:
        _record("Cannot start processing: Media not loaded successfully.", logging.ERROR,
                "Media not loaded successfully.")
        return _media_not_loaded_result()

    if not (paused and last_frame is not None):
        if not _process_next_frame():
            return _empty_result(json.dumps(last_metrics, indent=2))

    frame = cv2.resize(last_frame, (640, 480))
    cv2.putText(frame, f"Frame: {frame_count}", (10, 25), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)
    cv2.putText(frame, f"{last_timestamp}", (10, 50), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)

    map_items = [item for item in last_metrics.get("items", []) if item.get("type") in BOX_COLORS]
    map_path = generate_map(gps_coordinates[-5:], map_items)

    # Reverse the channel order for display; a single-channel frame has no
    # channel axis to reverse.
    display_frame = frame[:, :, ::-1] if frame.ndim == 3 else frame
    return (
        display_frame,
        json.dumps(last_metrics, indent=2),
        _recent_logs(),
        detected_issues,
        generate_line_chart(),
        map_path
    )


with gr.Blocks(theme=gr.themes.Soft(primary_hue="orange", secondary_hue="amber")) as app:
    gr.Markdown(
        """
        # 🛠️ Under Construction Inspection Dashboard
        Monitor under construction elements in real-time using drone footage or static images.
        """
    )

    with gr.Row():
        with gr.Column(scale=3):
            media_input = gr.File(label="Upload Media File (e.g., sample.mp4, image.jpg)", file_types=[".mp4", ".avi", ".jpg", ".jpeg", ".png"])
            load_button = gr.Button("Load Media", variant="primary")
        with gr.Column(scale=1):
            media_status = gr.Textbox(
                label="Media Load Status",
                value="Please upload a video/image file or ensure 'sample.mp4' exists in the root directory.",
                interactive=False
            )

    with gr.Row():
        with gr.Column():
            uc_toggle = gr.Checkbox(label="Enable Under Construction Services", value=False)
            uc_status = gr.Textbox(label="Under Construction Status", value="Disabled", interactive=False)

    status_text = gr.Markdown("**Status:** 🟢 Ready (Upload a media file and enable the service to start)")

    with gr.Row():
        with gr.Column(scale=3):
            media_output = gr.Image(label="Live Feed", width=640, height=480, elem_id="live-feed")
        with gr.Column(scale=1):
            metrics_output = gr.Textbox(
                label="Detection Metrics",
                lines=10,
                interactive=False,
                placeholder="Detection metrics, counts will appear here."
            )

    with gr.Row():
        with gr.Column(scale=2):
            logs_output = gr.Textbox(label="Live Logs", lines=8, interactive=False)
        with gr.Column(scale=1):
            issue_images = gr.Gallery(label="Detected Issues (Last 100+)", columns=4, rows=13, height="auto")

    with gr.Row():
        chart_output = gr.Image(label="Detection Trend")
        map_output = gr.Image(label="Issue Locations Map")

    with gr.Row():
        pause_btn = gr.Button("⏸️ Pause", variant="secondary")
        resume_btn = gr.Button("▶️ Resume", variant="primary")
        frame_slider = gr.Slider(0.05, 1.0, value=0.3, label="Frame Interval (seconds)", step=0.05)

    # The HTML block is empty in this file.
    gr.HTML(""" """)

    def toggle_pause() -> str:
        """Pause streaming and return the status text for the UI."""
        global paused
        paused = True
        return "**Status:** ⏸️ Paused"

    def toggle_resume() -> str:
        """Resume streaming and return the status text for the UI."""
        global paused
        paused = False
        return "**Status:** 🟢 Streaming"

    def set_frame_rate(val: float) -> None:
        """Store the slider value as the delay between streamed frames."""
        global frame_rate
        frame_rate = val

    # Try the default video at start-up so the status box shows the outcome.
    media_status.value = initialize_media()

    load_button.click(
        initialize_media,
        inputs=[media_input],
        outputs=[media_status]
    )

    def update_toggles(uc_val: bool) -> Tuple[str, str]:
        """Apply the checkbox state and return the two status strings."""
        active, status_message = set_active_service(uc_val)
        uc_status_val = "Enabled" if uc_val else "Disabled"
        return uc_status_val, status_message

    uc_toggle.change(update_toggles, inputs=[uc_toggle], outputs=[uc_status, status_text])
    pause_btn.click(toggle_pause, outputs=status_text)
    resume_btn.click(toggle_resume, outputs=status_text)
    frame_slider.change(set_frame_rate, inputs=[frame_slider])

    def streaming_loop():
        """Yield dashboard outputs forever; stop after one still image."""
        while True:
            if not media_loaded:
                yield _media_not_loaded_result()
            else:
                yield monitor_feed()
                if not is_video:
                    # A still image only needs to be processed once.
                    break
            time.sleep(frame_rate)

    app.load(streaming_loop, outputs=[media_output, metrics_output, logs_output, issue_images, chart_output, map_output])

if __name__ == "__main__":
    app.launch(share=True)