lokesh341 commited on
Commit
0e0c258
·
verified ·
1 Parent(s): e529cf1

Update services/plantation/plant_health.py

Browse files
Files changed (1) hide show
  1. services/plantation/plant_health.py +58 -66
services/plantation/plant_health.py CHANGED
@@ -1,85 +1,77 @@
1
  import cv2
2
  import numpy as np
3
- from ultralytics import YOLO
4
- import os
5
  import logging
6
- from typing import Tuple, List, Dict, Any
7
 
8
- # Configure logging
9
  logging.basicConfig(
10
  filename="app.log",
11
  level=logging.INFO,
12
  format="%(asctime)s - %(levelname)s - %(message)s"
13
  )
14
 
15
- # Define base directory and model path
16
- BASE_DIR = os.path.dirname(os.path.abspath(__file__))
17
- MODEL_PATH = os.path.abspath(os.path.join(BASE_DIR, "../../models/yolov8n.pt"))
18
-
19
- # Initialize YOLO model
20
- try:
21
- model = YOLO(MODEL_PATH)
22
- logging.info("Loaded YOLOv8n model for plant health detection.")
23
- logging.info(f"Model class names: {model.names}")
24
- except Exception as e:
25
- logging.error(f"Failed to load YOLOv8n model: {str(e)}")
26
- model = None
27
-
28
- def process_plant_health(frame: np.ndarray) -> Tuple[List[Dict[str, Any]], np.ndarray]:
29
- # Validate input frame
30
- if not isinstance(frame, np.ndarray) or frame.size == 0:
31
- logging.error("Invalid input frame provided to plant_health.")
32
- return [], frame
33
-
34
- # Check if model is loaded
35
- if model is None:
36
- logging.error("YOLO model not loaded. Skipping plant health detection.")
37
- return [], frame
38
-
39
  try:
40
- # Perform YOLO inference
41
- results = model(frame, verbose=False)
42
- logging.debug("Completed YOLO inference for plant health detection.")
43
- except Exception as e:
44
- logging.error(f"Error during YOLO inference: {str(e)}")
45
- return [], frame
46
 
47
- detections: List[Dict[str, Any]] = []
48
- line_counter = 1
 
 
49
 
50
- for r in results:
51
- for box in r.boxes:
52
- conf = float(box.conf[0])
53
- if conf < 0.3:
54
- continue
55
- cls = int(box.cls[0])
56
- label = model.names[cls]
57
- if label != "unhealthy_plant":
 
 
 
58
  continue
59
- xyxy = box.xyxy[0].cpu().numpy().astype(int)
60
- x_min, y_min, x_max, y_max = xyxy
 
 
 
 
 
 
 
 
 
61
 
62
- detection_label = f"Line {line_counter} - Unhealthy Plant (Conf: {conf:.2f})"
 
 
 
 
 
 
 
63
  detections.append({
64
- "type": "unhealthy_plant",
65
- "label": detection_label,
66
- "confidence": conf,
67
- "coordinates": [x_min, y_min, x_max, y_max]
68
  })
 
 
 
69
 
70
- color = (0, 255, 0) # Green for unhealthy plants
71
- cv2.rectangle(frame, (x_min, y_min), (x_max, y_max), color, 2)
72
- cv2.putText(
73
- frame,
74
- detection_label,
75
- (x_min, y_min - 10),
76
- cv2.FONT_HERSHEY_SIMPLEX,
77
- 0.6,
78
- color,
79
- 2
80
- )
81
-
82
- line_counter += 1
83
 
84
- logging.info(f"Detected {len(detections)} unhealthy plants in plantation.")
85
- return detections, frame
 
 
1
  import cv2
2
  import numpy as np
 
 
3
  import logging
4
+ from typing import List, Dict, Tuple
5
 
6
+ # Setup logging
7
  logging.basicConfig(
8
  filename="app.log",
9
  level=logging.INFO,
10
  format="%(asctime)s - %(levelname)s - %(message)s"
11
  )
12
 
13
+ def process_plant_health(frame: np.ndarray) -> Tuple[List[Dict], np.ndarray]:
14
+ """
15
+ Process a frame to assess plant health based on color.
16
+ Args:
17
+ frame: Input frame (BGR numpy array)
18
+ Returns:
19
+ Tuple: (List of detection dictionaries with health status, annotated frame)
20
+ """
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
21
  try:
22
+ # Convert frame to HSV
23
+ hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
 
 
 
 
24
 
25
+ # Define range for green (healthy) and yellow/brown (unhealthy)
26
+ lower_green = np.array([35, 40, 40])
27
+ upper_green = np.array([85, 255, 255])
28
+ green_mask = cv2.inRange(hsv, lower_green, upper_green)
29
 
30
+ # Define range for yellow/brown (unhealthy)
31
+ lower_unhealthy = np.array([20, 40, 40])
32
+ upper_unhealthy = np.array([35, 255, 255])
33
+ unhealthy_mask = cv2.inRange(hsv, lower_unhealthy, upper_unhealthy)
34
+
35
+ detections = []
36
+ # Process green regions (healthy plants)
37
+ contours, _ = cv2.findContours(green_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
38
+ for idx, contour in enumerate(contours):
39
+ area = cv2.contourArea(contour)
40
+ if area < 300:
41
  continue
42
+ x, y, w, h = cv2.boundingRect(contour)
43
+ x_min, y_min, x_max, y_max = x, y, x + w, y + h
44
+ detections.append({
45
+ "type": "plant",
46
+ "label": f"Plant {idx + 1} - Healthy",
47
+ "box": [x_min, y_min, x_max, y_max],
48
+ "health": "Healthy"
49
+ })
50
+ cv2.rectangle(frame, (x_min, y_min), (x_max, y_max), (0, 255, 0), 2)
51
+ cv2.putText(frame, "Healthy", (x_min, y_min - 10),
52
+ cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)
53
 
54
+ # Process unhealthy regions
55
+ contours, _ = cv2.findContours(unhealthy_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
56
+ for idx, contour in enumerate(contours):
57
+ area = cv2.contourArea(contour)
58
+ if area < 300:
59
+ continue
60
+ x, y, w, h = cv2.boundingRect(contour)
61
+ x_min, y_min, x_max, y_max = x, y, x + w, y + h
62
  detections.append({
63
+ "type": "plant",
64
+ "label": f"Plant {idx + 1} - Unhealthy",
65
+ "box": [x_min, y_min, x_max, y_max],
66
+ "health": "Unhealthy"
67
  })
68
+ cv2.rectangle(frame, (x_min, y_min), (x_max, y_max), (255, 0, 0), 2)
69
+ cv2.putText(frame, "Unhealthy", (x_min, y_min - 10),
70
+ cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 0, 0), 2)
71
 
72
+ logging.info(f"Assessed health of {len(detections)} plants in frame.")
73
+ return detections, frame
 
 
 
 
 
 
 
 
 
 
 
74
 
75
+ except Exception as e:
76
+ logging.error(f"Error in plant health assessment: {str(e)}")
77
+ return [], frame