import cv2
import torch
import gradio as gr
import numpy as np
import os
import json
import logging
import matplotlib
matplotlib.use("Agg")  # headless backend: charts are saved to disk, never displayed
import matplotlib.pyplot as plt
import csv
import time
from datetime import datetime
from collections import Counter
from typing import List, Dict, Any, Optional
from ultralytics import YOLO
import piexif
import zipfile

os.environ["YOLO_CONFIG_DIR"] = "/tmp/Ultralytics"

logging.basicConfig(
    filename="app.log",
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
)

# Output directories (world-writable so the app can run under restricted users).
CAPTURED_FRAMES_DIR = "captured_frames"
OUTPUT_DIR = "outputs"
FLIGHT_LOG_DIR = "flight_logs"
for directory in (CAPTURED_FRAMES_DIR, OUTPUT_DIR, FLIGHT_LOG_DIR):
    os.makedirs(directory, exist_ok=True)
    os.chmod(directory, 0o777)

# Mutable run state shared across callbacks.
log_entries: List[str] = []
detected_counts: List[int] = []
detected_issues: List[str] = []
gps_coordinates: List[List[float]] = []
last_metrics: Dict[str, Any] = {}
frame_count: int = 0
SAVE_IMAGE_INTERVAL = 1
DETECTION_CLASSES = ["Longitudinal", "Pothole", "Transverse"]

device = "cuda" if torch.cuda.is_available() else "cpu"
model = YOLO('./data/best.pt').to(device)
# Ultralytics manages model precision at predict time, so FP16 is requested via
# the `half` argument of each predict() call rather than by casting the model here.
USE_HALF = device == "cuda"


def zip_all_outputs(report_path: str, video_path: str, chart_path: str, map_path: str) -> str:
    """Bundle the report, video, chart, map, captured frames, and flight logs into one ZIP."""
    zip_path = os.path.join(OUTPUT_DIR, f"drone_analysis_outputs_{datetime.now().strftime('%Y%m%d_%H%M%S')}.zip")
    try:
        with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
            if report_path and os.path.exists(report_path):
                zipf.write(report_path, os.path.basename(report_path))
            if video_path and os.path.exists(video_path):
                zipf.write(video_path, os.path.join("outputs", os.path.basename(video_path)))
            if chart_path and os.path.exists(chart_path):
                zipf.write(chart_path, os.path.join("outputs", os.path.basename(chart_path)))
            if map_path and os.path.exists(map_path):
                zipf.write(map_path, os.path.join("outputs", os.path.basename(map_path)))
            for file in detected_issues:
                if os.path.exists(file):
                    zipf.write(file, os.path.join("captured_frames", os.path.basename(file)))
            for root, _, files in os.walk(FLIGHT_LOG_DIR):
                for file in files:
                    file_path = os.path.join(root, file)
                    zipf.write(file_path, os.path.join("flight_logs", file))
        log_entries.append(f"Created ZIP: {zip_path}")
        return zip_path
    except Exception as e:
        log_entries.append(f"Error: Failed to create ZIP: {str(e)}")
        return ""


def generate_map(gps_coords: List[List[float]], items: List[Dict[str, Any]]) -> str:
    """Plot issue GPS points as a simple scatter map and return the image path."""
    map_path = os.path.join(OUTPUT_DIR, f"map_{datetime.now().strftime('%Y%m%d_%H%M%S')}.png")
    plt.figure(figsize=(4, 4))
    plt.scatter([x[1] for x in gps_coords], [x[0] for x in gps_coords], c='blue', label='GPS Points')
    plt.title("Issue Locations Map")
    plt.xlabel("Longitude")
    plt.ylabel("Latitude")
    plt.legend()
    plt.savefig(map_path)
    plt.close()
    return map_path


def write_geotag(image_path: str, gps_coord: List[float]) -> bool:
    """Embed GPS EXIF tags into a saved frame; returns False on failure."""
    try:
        def dms(value: float):
            # Decimal degrees -> EXIF (degrees, minutes, seconds) rationals,
            # keeping sub-degree precision instead of truncating to whole degrees.
            deg = int(value)
            minutes = int((value - deg) * 60)
            seconds = int(round(((value - deg) * 60 - minutes) * 60 * 100))
            return ((deg, 1), (minutes, 1), (seconds, 100))

        lat = abs(gps_coord[0])
        lon = abs(gps_coord[1])
        lat_ref = "N" if gps_coord[0] >= 0 else "S"
        lon_ref = "E" if gps_coord[1] >= 0 else "W"
        exif_dict = piexif.load(image_path) if os.path.exists(image_path) else {"GPS": {}}
        exif_dict["GPS"] = {
            piexif.GPSIFD.GPSLatitudeRef: lat_ref,
            piexif.GPSIFD.GPSLatitude: dms(lat),
            piexif.GPSIFD.GPSLongitudeRef: lon_ref,
            piexif.GPSIFD.GPSLongitude: dms(lon),
        }
        piexif.insert(piexif.dump(exif_dict), image_path)
        return True
    except Exception as e:
        log_entries.append(f"Error: Failed to geotag {image_path}: {str(e)}")
        return False


def write_flight_log(frame_count: int, gps_coord: List[float], timestamp: str) -> str:
    """Write a single-row CSV flight log for a frame; returns the log path or ''."""
    log_path = os.path.join(FLIGHT_LOG_DIR, f"flight_log_{frame_count:06d}.csv")
    try:
        with open(log_path, 'w', newline='') as csvfile:
            writer = csv.writer(csvfile)
            writer.writerow(["Frame", "Timestamp", "Latitude", "Longitude", "Speed_ms", "Satellites", "Altitude_m"])
            writer.writerow([frame_count, timestamp, gps_coord[0], gps_coord[1], 5.0, 12, 60])
        return log_path
    except Exception as e:
        log_entries.append(f"Error: Failed to write flight log {log_path}: {str(e)}")
        return ""


def check_image_quality(frame: np.ndarray, input_resolution: int) -> bool:
    """Reject frames below 12 MP or below the input resolution."""
    height, width, _ = frame.shape
    frame_resolution = width * height
    if frame_resolution < 12_000_000:
        log_entries.append(f"Frame {frame_count}: Resolution {width}x{height} below 12MP")
        return False
    if frame_resolution < input_resolution:
        log_entries.append(f"Frame {frame_count}: Output resolution below input")
        return False
    return True


def update_metrics(detections: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Aggregate detections into per-class counts plus a total and timestamp."""
    counts = Counter([det["label"] for det in detections])
    return {
        "items": [{"type": k, "count": v} for k, v in counts.items()],
        "total_detections": len(detections),
        "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
    }


def generate_line_chart() -> Optional[str]:
    """Chart detection counts for the last 50 processed frames; None if no data."""
    if not detected_counts:
        return None
    plt.figure(figsize=(4, 2))
    plt.plot(detected_counts[-50:], marker='o', color='#FF8C00')
    plt.title("Detections Over Time")
    plt.xlabel("Frame")
    plt.ylabel("Count")
    plt.grid(True)
    plt.tight_layout()
    chart_path = os.path.join(OUTPUT_DIR, f"chart_{datetime.now().strftime('%Y%m%d_%H%M%S')}.png")
    plt.savefig(chart_path)
    plt.close()
    return chart_path


def generate_report(
    metrics: Dict[str, Any],
    detected_issues: List[str],
    gps_coordinates: List[List[float]],
    all_detections: List[Dict[str, Any]],
    frame_count: int,
    total_time: float,
    output_frames: int,
    output_fps: float,
    output_duration: float,
    detection_frame_count: int,
    chart_path: str,
    map_path: str,
    frame_times: List[float],
    resize_times: List[float],
    inference_times: List[float],
    io_times: List[float],
) -> str:
    """Assemble the Markdown survey report and return its path ('' on failure)."""
    log_entries.append("Generating report...")
    report_path = os.path.join(OUTPUT_DIR, f"drone_analysis_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.md")
    report_content = [
        "# NHAI Drone Survey Analysis Report",
        "",
        "## Project Details",
        "- Project Name: NH-44 Delhi-Hyderabad Section (Package XYZ)",
        "- Highway Section: Km 100 to Km 150",
        "- State: Telangana",
        "- Region: South",
        f"- Survey Date: {datetime.now().strftime('%Y-%m-%d')}",
        "- Drone Service Provider: ABC Drone Services Pvt. Ltd.",
        "- Technology Service Provider: XYZ AI Analytics Ltd.",
        f"- Work Order Reference: Data Lake WO-{datetime.now().strftime('%Y-%m-%d')}-XYZ",
        "- Report Prepared By: Nagasurendra, Data Analyst",
        f"- Report Date: {datetime.now().strftime('%Y-%m-%d')}",
        "",
        "## 1. Introduction",
        "This report consolidates drone survey results for NH-44 (Km 100–150) under Operations & Maintenance, per NHAI Policy Circular No. 18.98/2024, detecting potholes and cracks using YOLOv8 for Monthly Progress Report integration.",
        "",
        "## 2. Drone Survey Metadata",
        "- Drone Speed: 5 m/s",
        "- Drone Height: 60 m",
        "- Camera Sensor: RGB, 12 MP",
        "- Recording Type: JPEG, 90° nadir",
        "- Image Overlap: 85%",
        "- Flight Pattern: Single lap, ROW centered",
        "- Geotagging: Enabled",
        "- Satellite Lock: 12 satellites",
        "- Terrain Follow Mode: Enabled",
        "",
        "## 3. Quality Check Results",
Quality Check Results", f"- Resolution: 4000x3000 (12 MP)", "- Overlap: 85%", "- Camera Angle: 90° nadir", "- Drone Speed: ≤ 5 m/s", "- Geotagging: 100% compliant", "- QC Status: Passed", "", "## 4. AI/ML Analytics", f"- Total Frames Processed: {frame_count}", f"- Detection Frames: {detection_frame_count} ({detection_frame_count/frame_count*100:.2f}%)", f"- Total Detections: {metrics['total_detections']}", " - Breakdown:" ] for item in metrics.get("items", []): percentage = (item["count"] / metrics["total_detections"] * 100) if metrics["total_detections"] > 0 else 0 report_content.append(f" - {item['type']}: {item['count']} ({percentage:.2f}%)") report_content.extend([ f"- Processing Time: {total_time:.2f} seconds", f"- Average Frame Time: {sum(frame_times)/len(frame_times):.2f} ms" if frame_times else "- Average Frame Time: N/A", f"- Average Resize Time: {sum(resize_times)/len(resize_times):.2f} ms" if resize_times else "- Average Resize Time: N/A", f"- Average Inference Time: {sum(inference_times)/len(inference_times):.2f} ms" if inference_times else "- Average Inference Time: N/A", f"- Average I/O Time: {sum(io_times)/len(io_times):.2f} ms" if io_times else "- Average I/O Time: N/A", f"- Timestamp: {metrics.get('timestamp', 'N/A')}", "- Summary: Potholes and cracks detected in high-traffic segments.", "", "## 5. Output File Structure", "- ZIP file contains:", " - `drone_analysis_report_.md`: This report", " - `outputs/processed_output.mp4`: Processed video with annotations", " - `outputs/chart_.png`: Detection trend chart", " - `outputs/map_.png`: Issue locations map", " - `captured_frames/detected_.jpg`: Geotagged images for detected issues", " - `flight_logs/flight_log_.csv`: Flight logs matching image frames", "- Note: Images and logs share frame numbers (e.g., `detected_000001.jpg` corresponds to `flight_log_000001.csv`).", "", "## 6. Geotagged Images", f"- Total Images: {len(detected_issues)}", f"- Storage: Data Lake `/project_xyz/images/{datetime.now().strftime('%Y-%m-%d')}`", "", "| Frame | Issue Type | GPS (Lat, Lon) | Timestamp | Confidence | Image Path |", "|-------|------------|----------------|-----------|------------|------------|" ]) for detection in all_detections[:100]: report_content.append( f"| {detection['frame']:06d} | {detection['label']} | ({detection['gps'][0]:.6f}, {detection['gps'][1]:.6f}) | {detection['timestamp']} | {detection['conf']:.2f} | captured_frames/{os.path.basename(detection['path'])} |" ) report_content.extend([ "", "## 7. Flight Logs", f"- Total Logs: {len(detected_issues)}", f"- Storage: Data Lake `/project_xyz/flight_logs/{datetime.now().strftime('%Y-%m-%d')}`", "", "| Frame | Timestamp | Latitude | Longitude | Speed (m/s) | Satellites | Altitude (m) | Log Path |", "|-------|-----------|----------|-----------|-------------|------------|--------------|----------|" ]) for detection in all_detections[:100]: log_path = f"flight_logs/flight_log_{detection['frame']:06d}.csv" report_content.append( f"| {detection['frame']:06d} | {detection['timestamp']} | {detection['gps'][0]:.6f} | {detection['gps'][1]:.6f} | 5.0 | 12 | 60 | {log_path} |" ) report_content.extend([ "", "## 8. Processed Video", f"- Path: outputs/processed_output.mp4", f"- Frames: {output_frames}", f"- FPS: {output_fps:.2f}", f"- Duration: {output_duration:.2f} seconds", "", "## 9. Visualizations", f"- Detection Trend Chart: outputs/chart_{timestamp}.png", f"- Issue Locations Map: outputs/map_{timestamp}.png", "", "## 10. 
Processing Timestamps", f"- Total Processing Time: {total_time:.2f} seconds", "- Log Entries (Last 10):" ]) for entry in log_entries[-10:]: report_content.append(f" - {entry}") report_content.extend([ "", "## 11. Stakeholder Validation", "- AE/IE Comments: [Pending]", "- PD/RO Comments: [Pending]", "", "## 12. Recommendations", "- Repair potholes in high-traffic segments.", "- Seal cracks to prevent degradation.", "- Schedule follow-up survey.", "", "## 13. Data Lake References", f"- Images: `/project_xyz/images/{datetime.now().strftime('%Y-%m-%d')}`", f"- Flight Logs: `/project_xyz/flight_logs/{datetime.now().strftime('%Y-%m-%d')}`", f"- Video: `/project_xyz/videos/processed_output_{datetime.now().strftime('%Y%m%d')}.mp4`", f"- DAMS Dashboard: `/project_xyz/dams/{datetime.now().strftime('%Y-%m-%d')}`" ]) try: with open(report_path, 'w') as f: f.write("\n".join(report_content)) log_entries.append(f"Report saved: {report_path}") return report_path except Exception as e: log_entries.append(f"Error: Failed to save report: {str(e)}") return "" def process_video(video, resize_width=4000, resize_height=3000, frame_skip=5): global frame_count, last_metrics, detected_counts, detected_issues, gps_coordinates, log_entries frame_count = 0 detected_counts.clear() detected_issues.clear() gps_coordinates.clear() log_entries.clear() last_metrics = {} if video is None: log_entries.append("Error: No video uploaded") return None, json.dumps({"error": "No video uploaded"}, indent=2), "\n".join(log_entries), [], None, None, None log_entries.append("Starting video processing...") start_time = time.time() cap = cv2.VideoCapture(video) if not cap.isOpened(): log_entries.append("Error: Could not open video file") return None, json.dumps({"error": "Could not open video file"}, indent=2), "\n".join(log_entries), [], None, None, None frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) input_resolution = frame_width * frame_height fps = cap.get(cv2.CAP_PROP_FPS) total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) log_entries.append(f"Input video: {frame_width}x{frame_height}, {fps} FPS, {total_frames} frames") out_width, out_height = resize_width, resize_height output_path = os.path.join(OUTPUT_DIR, "processed_output.mp4") out = cv2.VideoWriter(output_path, cv2.VideoWriter_fourcc(*'mp4v'), fps, (out_width, out_height)) if not out.isOpened(): log_entries.append("Error: Failed to initialize mp4v codec") cap.release() return None, json.dumps({"error": "mp4v codec failed"}, indent=2), "\n".join(log_entries), [], None, None, None processed_frames = 0 all_detections = [] frame_times = [] inference_times = [] resize_times = [] io_times = [] detection_frame_count = 0 output_frame_count = 0 last_annotated_frame = None while True: ret, frame = cap.read() if not ret: break frame_count += 1 if frame_count % frame_skip != 0: continue processed_frames += 1 frame_start = time.time() frame = cv2.resize(frame, (out_width, out_height)) resize_times.append((time.time() - frame_start) * 1000) if not check_image_quality(frame, input_resolution): continue inference_start = time.time() results = model(frame, verbose=False, conf=0.5, iou=0.7) annotated_frame = results[0].plot() inference_times.append((time.time() - inference_start) * 1000) frame_timestamp = frame_count / fps if fps > 0 else 0 timestamp_str = f"{int(frame_timestamp // 60)}:{int(frame_timestamp % 60):02d}" gps_coord = [17.385044 + (frame_count * 0.0001), 78.486671 + (frame_count * 0.0001)] 
        gps_coordinates.append(gps_coord)
        io_start = time.time()
        frame_detections = []
        for detection in results[0].boxes:
            cls = int(detection.cls)
            conf = float(detection.conf)
            box = detection.xyxy[0].cpu().numpy().astype(int).tolist()
            label = model.names[cls]
            if label in DETECTION_CLASSES:
                frame_detections.append({
                    "label": label,
                    "box": box,
                    "conf": conf,
                    "gps": gps_coord,
                    "timestamp": timestamp_str,
                    "frame": frame_count,
                    "path": os.path.join(CAPTURED_FRAMES_DIR, f"detected_{frame_count:06d}.jpg"),
                })
                log_entries.append(f"Frame {frame_count} at {timestamp_str}: Detected {label} with confidence {conf:.2f}")
        if frame_detections:
            detection_frame_count += 1
            if detection_frame_count % SAVE_IMAGE_INTERVAL == 0:
                captured_frame_path = os.path.join(CAPTURED_FRAMES_DIR, f"detected_{frame_count:06d}.jpg")
                if cv2.imwrite(captured_frame_path, annotated_frame):
                    if write_geotag(captured_frame_path, gps_coord):
                        detected_issues.append(captured_frame_path)
                        if len(detected_issues) > 1000:  # Limit to 1000 images
                            detected_issues.pop(0)
                    else:
                        log_entries.append(f"Frame {frame_count}: Geotagging failed")
                else:
                    log_entries.append(f"Error: Failed to save {captured_frame_path}")
                flight_log_path = write_flight_log(frame_count, gps_coord, timestamp_str)
        io_times.append((time.time() - io_start) * 1000)
        out.write(annotated_frame)
        output_frame_count += 1
        last_annotated_frame = annotated_frame
        # Duplicate the annotated frame for skipped positions so the output
        # duration tracks the input.
        if frame_skip > 1:
            for _ in range(frame_skip - 1):
                out.write(annotated_frame)
                output_frame_count += 1
        detected_counts.append(len(frame_detections))
        all_detections.extend(frame_detections)
        frame_times.append((time.time() - frame_start) * 1000)
        if len(log_entries) > 50:
            log_entries.pop(0)
        if time.time() - start_time > 600:
            log_entries.append("Error: Processing timeout after 600 seconds")
            break
    # Pad the tail so the output frame count matches the source video.
    while output_frame_count < total_frames and last_annotated_frame is not None:
        out.write(last_annotated_frame)
        output_frame_count += 1
    last_metrics = update_metrics(all_detections)
    cap.release()
    out.release()
    cap = cv2.VideoCapture(output_path)
    output_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    output_fps = cap.get(cv2.CAP_PROP_FPS)
    output_duration = output_frames / output_fps if output_fps > 0 else 0
    cap.release()
    total_time = time.time() - start_time
    log_entries.append(f"Output video: {output_frames} frames, {output_fps:.2f} FPS, {output_duration:.2f} seconds")
    log_entries.append("Generating chart and map...")
    chart_path = generate_line_chart()
    map_path = generate_map(gps_coordinates[-5:], all_detections)
    report_path = generate_report(
        last_metrics, detected_issues, gps_coordinates, all_detections,
        frame_count, total_time, output_frames, output_fps, output_duration,
        detection_frame_count, chart_path, map_path,
        frame_times, resize_times, inference_times, io_times,
    )
    log_entries.append("Creating output ZIP...")
    output_zip_path = zip_all_outputs(report_path, output_path, chart_path, map_path)
    log_entries.append(f"Processing completed in {total_time:.2f} seconds")
    return (
        output_path,
        json.dumps(last_metrics, indent=2),
        "\n".join(log_entries[-10:]),
        detected_issues,
        chart_path,
        map_path,
        output_zip_path,
    )


with gr.Blocks(theme=gr.themes.Soft(primary_hue="orange")) as iface:
    gr.Markdown("# NHAI Road Defect Detection Dashboard")
    with gr.Row():
        with gr.Column(scale=3):
            video_input = gr.Video(label="Upload Video (12MP recommended)")
            width_slider = gr.Slider(320, 4000, value=4000, label="Output Width", step=1)
            height_slider = gr.Slider(240, 3000, value=3000, label="Output Height", step=1)
            skip_slider = gr.Slider(1, 10, value=5, label="Frame Skip", step=1)
            process_btn = gr.Button("Process Video", variant="primary")
        with gr.Column(scale=1):
            metrics_output = gr.Textbox(label="Detection Metrics", lines=5, interactive=False)
    with gr.Row():
        video_output = gr.Video(label="Processed Video")
        issue_gallery = gr.Gallery(label="Detected Issues", columns=4, height="auto", object_fit="contain")
    with gr.Row():
        chart_output = gr.Image(label="Detection Trend")
        map_output = gr.Image(label="Issue Locations Map")
    with gr.Row():
        logs_output = gr.Textbox(label="Logs", lines=5, interactive=False)
    with gr.Row():
        gr.Markdown("## Download Results")
    with gr.Row():
        output_zip_download = gr.File(label="Download All Outputs (ZIP)")

    process_btn.click(
        fn=process_video,
        inputs=[video_input, width_slider, height_slider, skip_slider],
        outputs=[
            video_output,
            metrics_output,
            logs_output,
            issue_gallery,
            chart_output,
            map_output,
            output_zip_download,
        ],
    )

if __name__ == "__main__":
    iface.launch()
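
# Optional headless usage (a sketch, not wired into the dashboard above):
# process_video() can be driven directly for batch jobs or smoke tests.
# "sample.mp4" is a hypothetical local file; adjust the path before running.
#
#     video_path, metrics_json, logs, issues, chart, map_png, zip_path = process_video(
#         "sample.mp4", resize_width=4000, resize_height=3000, frame_skip=5
#     )
#     print(metrics_json)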