File size: 2,723 Bytes
277a9d1
 
 
 
6dc5651
ebb854e
6dc5651
ebb854e
6dc5651
 
 
 
 
277a9d1
ebb854e
277a9d1
ebb854e
 
 
6dc5651
 
ebb854e
 
6dc5651
ebb854e
6dc5651
277a9d1
ebb854e
 
 
 
 
 
 
6dc5651
 
 
 
 
ebb854e
 
 
6dc5651
 
 
277a9d1
ebb854e
6dc5651
277a9d1
 
 
 
ebb854e
277a9d1
 
ea1ea54
6dc5651
277a9d1
6dc5651
 
277a9d1
dc7a0e6
277a9d1
ea1ea54
dc7a0e6
277a9d1
 
 
 
ebb854e
dc7a0e6
ebb854e
 
 
 
 
 
 
 
 
dc7a0e6
 
 
ebb854e
ea1ea54
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
import cv2
import numpy as np
from ultralytics import YOLO
import os
import logging
from typing import Tuple, List, Dict, Any

# Configure the root logger to append INFO-and-above records to app.log,
# each prefixed with a timestamp and severity. basicConfig does nothing if
# the root logger already has handlers installed.
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
    filename="app.log",
)

# The weights file is resolved relative to this module's own directory
# (two levels up, then models/), so it does not depend on the working directory.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
MODEL_PATH = os.path.abspath(
    os.path.join(BASE_DIR, "../../models/yolov8n.pt")
)

# Load the detector once at import time. Any failure inside this block is
# logged and leaves `model` set to None, so importing this module never raises.
try:
    model = YOLO(MODEL_PATH)
    logging.info("Loaded YOLOv8n model for earthwork detection.")
    logging.info(f"Model class names: {model.names}")
except Exception as exc:
    logging.error(f"Failed to load YOLOv8n model: {str(exc)}")
    model = None

def process_earthwork(frame: np.ndarray) -> Tuple[List[Dict[str, Any]], np.ndarray]:
    """Detect earthwork in a frame and draw the detections onto it.

    Runs the module-level YOLO ``model`` on ``frame``, keeps every box whose
    class name is ``"earthwork"`` and whose confidence is at least 0.3, and
    draws a labelled rectangle for each kept box directly on ``frame`` (the
    input array is modified in place).

    Args:
        frame: Image to analyse; must be a non-empty NumPy array.

    Returns:
        A ``(detections, frame)`` tuple. ``detections`` holds one dict per
        kept box with the keys ``"type"`` (class name), ``"label"`` (the text
        drawn on the frame), ``"confidence"`` (float) and ``"coordinates"``
        (``[x_min, y_min, x_max, y_max]`` as plain Python ints). ``frame`` is
        the same object that was passed in. If the frame is invalid, the
        model is not loaded, or inference raises, the result is
        ``([], frame)`` and an error is logged; no exception propagates from
        those paths.
    """
    # Validate input frame. The isinstance check short-circuits, so a
    # non-array input (e.g. None) is rejected without touching `.size`.
    if not isinstance(frame, np.ndarray) or frame.size == 0:
        logging.error("Invalid input frame provided to earthwork_detection.")
        return [], frame

    # Check if model is loaded
    if model is None:
        logging.error("YOLO model not loaded. Skipping earthwork detection.")
        return [], frame

    try:
        # Perform YOLO inference
        results = model(frame, verbose=False)
        logging.debug("Completed YOLO inference for earthwork detection.")
    except Exception as e:
        logging.error(f"Error during YOLO inference: {str(e)}")
        return [], frame

    min_confidence = 0.3
    # NOTE(review): the stock COCO yolov8n weights have no "earthwork" class;
    # presumably custom-trained weights are deployed at the model path - confirm.
    target_label = "earthwork"
    # Orange when the frame is RGB. NOTE(review): OpenCV frames are usually
    # BGR, where this tuple renders as light blue - confirm the channel order.
    color = (255, 165, 0)
    # Lowest baseline (in pixels from the top) at which the label text is
    # still fully inside the image for the font scale used below.
    min_text_y = 20

    detections: List[Dict[str, Any]] = []

    for r in results:
        for box in r.boxes:
            conf = float(box.conf[0])
            if conf < min_confidence:
                continue
            label = model.names[int(box.cls[0])]
            if label != target_label:
                continue

            # tolist() yields plain Python ints rather than NumPy scalars, so
            # the detection dicts can be serialised (e.g. with json.dumps).
            x_min, y_min, x_max, y_max = (
                box.xyxy[0].cpu().numpy().astype(int).tolist()
            )

            # Detections are numbered from 1 in the order they are kept.
            line_number = len(detections) + 1
            detection_label = f"Line {line_number} - {label.capitalize()} (Conf: {conf:.2f})"
            detections.append({
                "type": label,
                "label": detection_label,
                "confidence": conf,
                "coordinates": [x_min, y_min, x_max, y_max],
            })

            cv2.rectangle(frame, (x_min, y_min), (x_max, y_max), color, 2)
            # putText's origin is the bottom-left corner of the text. Clamp it
            # so a box touching the top edge still gets a visible label
            # (drawn just inside the box instead of above it).
            text_origin = (x_min, max(y_min - 10, min_text_y))
            cv2.putText(
                frame,
                detection_label,
                text_origin,
                cv2.FONT_HERSHEY_SIMPLEX,
                0.6,
                color,
                2,
            )

    logging.info(f"Detected {len(detections)} earthwork items in under_construction.")
    return detections, frame