lokesh341 commited on
Commit
c824f53
·
verified ·
1 Parent(s): 04f62ee

Update services/under_construction/earthwork_detection.py

Browse files
services/under_construction/earthwork_detection.py CHANGED
@@ -1,85 +1,37 @@
1
  import cv2
2
  import numpy as np
3
- from ultralytics import YOLO
4
- import os
5
- import logging
6
- from typing import Tuple, List, Dict, Any
7
-
8
- # Configure logging
9
- logging.basicConfig(
10
- filename="app.log",
11
- level=logging.INFO,
12
- format="%(asctime)s - %(levelname)s - %(message)s"
13
- )
14
-
15
- # Define base directory and model path
16
- BASE_DIR = os.path.dirname(os.path.abspath(__file__))
17
- MODEL_PATH = os.path.abspath(os.path.join(BASE_DIR, "../../models/yolov8n.pt"))
18
-
19
- # Initialize YOLO model
20
- try:
21
- model = YOLO(MODEL_PATH)
22
- logging.info("Loaded YOLOv8n model for earthwork detection.")
23
- logging.info(f"Model class names: {model.names}")
24
- except Exception as e:
25
- logging.error(f"Failed to load YOLOv8n model: {str(e)}")
26
- model = None
27
 
28
  def process_earthwork(frame: np.ndarray) -> Tuple[List[Dict[str, Any]], np.ndarray]:
29
- # Validate input frame
30
- if not isinstance(frame, np.ndarray) or frame.size == 0:
31
- logging.error("Invalid input frame provided to earthwork_detection.")
32
- return [], frame
33
-
34
- # Check if model is loaded
35
- if model is None:
36
- logging.error("YOLO model not loaded. Skipping earthwork detection.")
37
- return [], frame
38
-
39
- try:
40
- # Perform YOLO inference
41
- results = model(frame, verbose=False)
42
- logging.debug("Completed YOLO inference for earthwork detection.")
43
- except Exception as e:
44
- logging.error(f"Error during YOLO inference: {str(e)}")
45
- return [], frame
46
-
47
- detections: List[Dict[str, Any]] = []
48
- line_counter = 1
49
-
50
- for r in results:
51
- for box in r.boxes:
52
- conf = float(box.conf[0])
53
- if conf < 0.3:
54
- continue
55
- cls = int(box.cls[0])
56
- label = model.names[cls]
57
- if label != "earthwork":
58
- continue
59
- xyxy = box.xyxy[0].cpu().numpy().astype(int)
60
- x_min, y_min, x_max, y_max = xyxy
61
-
62
- detection_label = f"Line {line_counter} - {label.capitalize()} (Conf: {conf:.2f})"
63
- detections.append({
64
- "type": label,
65
- "label": detection_label,
66
- "confidence": conf,
67
- "coordinates": [x_min, y_min, x_max, y_max]
68
- })
69
-
70
- color = (255, 165, 0) # Orange for earthwork
71
- cv2.rectangle(frame, (x_min, y_min), (x_max, y_max), color, 2)
72
- cv2.putText(
73
- frame,
74
- detection_label,
75
- (x_min, y_min - 10),
76
- cv2.FONT_HERSHEY_SIMPLEX,
77
- 0.6,
78
- color,
79
- 2
80
- )
81
-
82
- line_counter += 1
83
-
84
- logging.info(f"Detected {len(detections)} earthwork items in under_construction.")
85
  return detections, frame
 
1
  import cv2
2
  import numpy as np
3
+ from typing import List, Tuple, Dict, Any
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
4
 
5
  def process_earthwork(frame: np.ndarray) -> Tuple[List[Dict[str, Any]], np.ndarray]:
6
+ """
7
+ Detect earthwork activity in the frame.
8
+ Args:
9
+ frame: Input frame as a numpy array.
10
+ Returns:
11
+ Tuple of (list of detections, annotated frame).
12
+ """
13
+ # Convert to grayscale
14
+ gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
15
+
16
+ # Apply edge detection
17
+ edges = cv2.Canny(gray, 100, 200)
18
+
19
+ # Find contours
20
+ contours, _ = cv2.findContours(edges, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
21
+
22
+ detections = []
23
+ for i, contour in enumerate(contours):
24
+ area = cv2.contourArea(contour)
25
+ if area < 500: # Ignore small contours
26
+ continue
27
+
28
+ x, y, w, h = cv2.boundingRect(contour)
29
+ x_min, y_min, x_max, y_max = x, y, x + w, y + h
30
+
31
+ detections.append({
32
+ "box": [x_min, y_min, x_max, y_max],
33
+ "label": f"Earthwork {i+1}",
34
+ "type": "earthwork"
35
+ })
36
+
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
37
  return detections, frame