import cv2 import numpy as np from ultralytics import YOLO import os import logging from typing import Tuple, List, Dict, Any # Configure logging logging.basicConfig( filename="app.log", level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s" ) # Define base directory and model path BASE_DIR = os.path.dirname(os.path.abspath(__file__)) MODEL_PATH = os.path.abspath(os.path.join(BASE_DIR, "../../models/yolov8n.pt")) # Initialize YOLO model try: model = YOLO(MODEL_PATH) logging.info("Loaded YOLOv8n model for earthwork detection.") logging.info(f"Model class names: {model.names}") except Exception as e: logging.error(f"Failed to load YOLOv8n model: {str(e)}") model = None def process_earthwork(frame: np.ndarray) -> Tuple[List[Dict[str, Any]], np.ndarray]: # Validate input frame if not isinstance(frame, np.ndarray) or frame.size == 0: logging.error("Invalid input frame provided to earthwork_detection.") return [], frame # Check if model is loaded if model is None: logging.error("YOLO model not loaded. Skipping earthwork detection.") return [], frame try: # Perform YOLO inference results = model(frame, verbose=False) logging.debug("Completed YOLO inference for earthwork detection.") except Exception as e: logging.error(f"Error during YOLO inference: {str(e)}") return [], frame detections: List[Dict[str, Any]] = [] line_counter = 1 for r in results: for box in r.boxes: conf = float(box.conf[0]) if conf < 0.3: continue cls = int(box.cls[0]) label = model.names[cls] if label != "earthwork": continue xyxy = box.xyxy[0].cpu().numpy().astype(int) x_min, y_min, x_max, y_max = xyxy detection_label = f"Line {line_counter} - {label.capitalize()} (Conf: {conf:.2f})" detections.append({ "type": label, "label": detection_label, "confidence": conf, "coordinates": [x_min, y_min, x_max, y_max] }) color = (255, 165, 0) # Orange for earthwork cv2.rectangle(frame, (x_min, y_min), (x_max, y_max), color, 2) cv2.putText( frame, detection_label, (x_min, y_min - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2 ) line_counter += 1 logging.info(f"Detected {len(detections)} earthwork items in under_construction.") return detections, frame