import cv2 import torch import gradio as gr import numpy as np import os import json import logging import matplotlib.pyplot as plt import zipfile from datetime import datetime from collections import Counter from typing import List, Dict, Any, Optional from ultralytics import YOLO import ultralytics import time # Set YOLO config directory os.environ["YOLO_CONFIG_DIR"] = "/tmp/Ultralytics" # Set up logging logging.basicConfig( filename="app.log", level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s" ) # Directories CAPTURED_FRAMES_DIR = "captured_frames" ORIGINAL_FRAMES_DIR = "original_frames" OUTPUT_DIR = "outputs" os.makedirs(CAPTURED_FRAMES_DIR, exist_ok=True) os.makedirs(ORIGINAL_FRAMES_DIR, exist_ok=True) os.makedirs(OUTPUT_DIR, exist_ok=True) os.chmod(CAPTURED_FRAMES_DIR, 0o777) os.chmod(ORIGINAL_FRAMES_DIR, 0o777) os.chmod(OUTPUT_DIR, 0o777) # Global variables log_entries: List[str] = [] detected_counts: List[int] = [] detected_issues: List[str] = [] gps_coordinates: List[List[float]] = [] last_metrics: Dict[str, Any] = {} frame_count: int = 0 SAVE_IMAGE_INTERVAL = 1 # Load model device = "cuda" if torch.cuda.is_available() else "cpu" model = YOLO('./data/best.pt').to(device) if device == "cuda": model.half() print(f"Using {device}, model classes: {model.names}") # Helper functions def generate_map(gps_coords: List[List[float]], items: List[Dict[str, Any]]) -> str: map_path = "map_temp.png" plt.figure(figsize=(4, 4)) plt.scatter([x[1] for x in gps_coords], [x[0] for x in gps_coords], c='blue', label='GPS Points') plt.title("Issue Locations Map") plt.xlabel("Longitude") plt.ylabel("Latitude") plt.legend() plt.savefig(map_path) plt.close() return map_path def send_to_salesforce(data: Dict[str, Any]) -> None: pass # Placeholder def update_metrics(detections: List[Dict[str, Any]]) -> Dict[str, Any]: counts = Counter([det["label"] for det in detections]) return { "items": [{"type": k, "count": v} for k, v in counts.items()], "total_detections": len(detections), "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S") } def generate_line_chart() -> Optional[str]: if not detected_counts: return None chart_path = "chart_temp.png" plt.figure(figsize=(4, 2)) plt.plot(detected_counts[-50:], marker='o', color='#FF8C00') plt.title("Detections Over Time") plt.xlabel("Frame") plt.ylabel("Count") plt.grid(True) plt.tight_layout() plt.savefig(chart_path) plt.close() return chart_path def create_zip_from_directory(dir_path: str, zip_filename: str) -> str: zip_path = os.path.join(OUTPUT_DIR, zip_filename) with zipfile.ZipFile(zip_path, 'w') as zipf: for root, _, files in os.walk(dir_path): for file in files: full_path = os.path.join(root, file) zipf.write(full_path, arcname=file) return zip_path # Main function def process_video(video, resize_width=320, resize_height=240, frame_skip=5): global frame_count, last_metrics, detected_counts, detected_issues, gps_coordinates, log_entries frame_count = 0 detected_counts.clear() detected_issues.clear() gps_coordinates.clear() log_entries.clear() last_metrics = {} for dir_ in [CAPTURED_FRAMES_DIR, ORIGINAL_FRAMES_DIR]: for file in os.listdir(dir_): os.remove(os.path.join(dir_, file)) if video is None: log_entries.append("Error: No video uploaded") return None, json.dumps({"error": "No video uploaded"}, indent=2), "\n".join(log_entries), [], None, None, None, None cap = cv2.VideoCapture(video) if not cap.isOpened(): log_entries.append("Error: Could not open video file") return None, json.dumps({"error": "Could not open video file"}, indent=2), "\n".join(log_entries), [], None, None, None, None frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) fps = cap.get(cv2.CAP_PROP_FPS) total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) output_path = "processed_output.mp4" out = cv2.VideoWriter(output_path, cv2.VideoWriter_fourcc(*'mp4v'), fps, (resize_width, resize_height)) all_detections = [] frame_times = [] detection_frame_count = 0 start_time = time.time() while True: ret, frame = cap.read() if not ret: break frame_count += 1 if frame_count % frame_skip != 0: continue frame = cv2.resize(frame, (resize_width, resize_height)) results = model(frame, verbose=False, conf=0.5, iou=0.7) annotated_frame = results[0].plot() # Save original frame original_path = os.path.join(ORIGINAL_FRAMES_DIR, f"frame_{frame_count}.jpg") cv2.imwrite(original_path, frame) frame_detections = [] for detection in results[0].boxes: cls = int(detection.cls) conf = float(detection.conf) box = detection.xyxy[0].cpu().numpy().astype(int).tolist() label = model.names[cls] frame_detections.append({"label": label, "box": box, "conf": conf}) if frame_detections: detection_frame_count += 1 if detection_frame_count % SAVE_IMAGE_INTERVAL == 0: captured_path = os.path.join(CAPTURED_FRAMES_DIR, f"frame_{frame_count}.jpg") cv2.imwrite(captured_path, annotated_frame) detected_issues.append(captured_path) if len(detected_issues) > 100: detected_issues.pop(0) out.write(annotated_frame) gps_coord = [17.385044 + (frame_count * 0.0001), 78.486671 + (frame_count * 0.0001)] gps_coordinates.append(gps_coord) for det in frame_detections: det["gps"] = gps_coord all_detections.extend(frame_detections) detected_counts.append(len(frame_detections)) frame_time = (time.time() - start_time) * 1000 frame_times.append(frame_time) last_metrics = update_metrics(all_detections) send_to_salesforce({ "detections": all_detections, "metrics": last_metrics, "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "frame_count": frame_count, "gps_coordinates": gps_coordinates[-1] if gps_coordinates else [0, 0] }) cap.release() out.release() chart_path = generate_line_chart() map_path = generate_map(gps_coordinates[-5:], all_detections) originals_zip = create_zip_from_directory(ORIGINAL_FRAMES_DIR, "original_images.zip") annotated_zip = create_zip_from_directory(CAPTURED_FRAMES_DIR, "annotated_images.zip") return ( output_path, json.dumps(last_metrics, indent=2), "\n".join(log_entries[-10:]), detected_issues, chart_path, map_path, originals_zip, annotated_zip ) # Gradio UI with gr.Blocks(theme=gr.themes.Soft(primary_hue="orange")) as iface: gr.Markdown("# Crack and Pothole Detection Dashboard") with gr.Row(): with gr.Column(scale=3): video_input = gr.Video(label="Upload Video") width_slider = gr.Slider(320, 640, value=320, label="Output Width", step=1) height_slider = gr.Slider(240, 480, value=240, label="Output Height", step=1) skip_slider = gr.Slider(1, 10, value=5, label="Frame Skip", step=1) process_btn = gr.Button("Process Video", variant="primary") with gr.Column(scale=1): metrics_output = gr.Textbox(label="Detection Metrics", lines=5, interactive=False) with gr.Row(): video_output = gr.Video(label="Processed Video") issue_gallery = gr.Gallery(label="Detected Issues", columns=4, height="auto", object_fit="contain") with gr.Row(): chart_output = gr.Image(label="Detection Trend") map_output = gr.Image(label="Issue Locations Map") with gr.Row(): logs_output = gr.Textbox(label="Logs", lines=5, interactive=False) with gr.Row(): originals_zip_out = gr.File(label="Download Original Images (ZIP)") annotated_zip_out = gr.File(label="Download Annotated Images (ZIP)") process_btn.click( process_video, inputs=[video_input, width_slider, height_slider, skip_slider], outputs=[ video_output, metrics_output, logs_output, issue_gallery, chart_output, map_output, originals_zip_out, annotated_zip_out ] ) if __name__ == "__main__": iface.launch()