File size: 5,197 Bytes
8166792
 
 
 
4f0cfe1
7021f6a
 
8166792
 
908272b
 
 
09543a7
 
 
 
 
 
 
 
6490caa
7021f6a
 
 
 
 
 
 
 
 
 
 
8166792
 
 
 
 
 
 
 
 
 
09543a7
661e202
7021f6a
 
 
 
 
 
 
 
 
4f0cfe1
7021f6a
 
 
 
 
8166792
7021f6a
 
8166792
7021f6a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
8166792
7021f6a
 
8166792
7021f6a
 
 
661e202
7021f6a
661e202
7021f6a
661e202
7021f6a
661e202
7021f6a
661e202
 
7021f6a
 
 
661e202
 
7021f6a
 
 
8166792
7021f6a
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
import cv2
import numpy as np
from ultralytics import YOLO
import random
import spaces
import os
import torch

class ImageSegmenter:
    """Run a YOLO segmentation model on an image and draw the detections.

    Display behaviour is controlled through public attributes that callers
    may toggle after construction:

    * ``is_show_bounding_boxes`` -- draw a box and a "<name> <conf>" label.
    * ``is_show_segmentation_boundary`` -- draw each mask outline.
    * ``is_show_segmentation`` -- blend a filled, coloured mask into the image.
    * ``confidence_threshold`` -- detections at or below this score are skipped.
    """

    def __init__(self, model_type="yolov8s-seg", device="cpu"):
        """Store the configuration and make sure model weights are available.

        Args:
            model_type: Model name without the ``.pt`` suffix. Local weights
                are looked up at ``models/<model_type>.pt``.
            device: Stored on the instance as ``self.device``. Inference in
                ``predict`` is pinned to the CPU regardless of this value.
        """
        # predict() needs the model name for lazy loading, so it must be kept.
        self.model_type = model_type
        self.device = device
        self.is_show_bounding_boxes = True
        self.is_show_segmentation_boundary = False
        self.is_show_segmentation = False
        self.confidence_threshold = 0.5
        self.cls_clr = {}  # class id -> colour tuple, filled on demand
        self.bb_thickness = 2
        self.bb_clr = (255, 0, 0)
        self.masks = {}  # detection index -> mask from the latest predict()
        self.model = None  # loaded lazily by predict() when still None

        # Ensure model directory exists
        os.makedirs('models', exist_ok=True)

        # Without local weights, instantiate by name so the YOLO constructor
        # can resolve the weights itself. The instance is kept so predict()
        # does not have to load the model a second time.
        if not os.path.exists(self._local_weights_path()):
            print(f"Downloading {model_type} model...")
            self.model = YOLO(model_type)
            print("Model downloaded successfully")

    def _local_weights_path(self):
        """Return the path where local weights for ``model_type`` are expected."""
        return os.path.join('models', f'{self.model_type}.pt')

    def _load_model(self):
        """Create the YOLO model on the CPU, preferring the local weights file."""
        weights_path = self._local_weights_path()
        # Fall back to the bare model name if the local file is not there.
        source = weights_path if os.path.exists(weights_path) else self.model_type
        model = YOLO(source)
        model.to('cpu')  # Explicitly keep the model on the CPU
        return model

    @staticmethod
    def _blend_mask(image, mask, color, alpha=0.8):
        """Return ``image`` with ``color`` blended in wherever ``mask`` is non-zero.

        Assumes ``image`` has three channels and ``mask`` is a 2-D array;
        the mask is resized to the image size when the two differ.
        """
        if image.shape[:2] != mask.shape[:2]:
            mask = cv2.resize(mask, (image.shape[1], image.shape[0]))
        overlay = image.copy()
        overlay[mask.astype(bool)] = color
        return cv2.addWeighted(image, 1 - alpha, overlay, alpha, 0)

    def get_cls_clr(self, cls_id):
        """Return the colour assigned to ``cls_id``, creating one on first use.

        Colours are random 3-tuples with every channel in ``[50, 200]`` and
        are cached in ``self.cls_clr`` so a class keeps the same colour.
        """
        if cls_id in self.cls_clr:
            return self.cls_clr[cls_id]
        color = tuple(random.randint(50, 200) for _ in range(3))
        self.cls_clr[cls_id] = color
        return color

    @spaces.GPU
    def predict(self, image):
        """Detect objects in ``image`` and return an annotated copy plus metadata.

        Args:
            image: Input image as a numpy array. It is copied, never modified.

        Returns:
            A tuple ``(annotated_image, objects_data)``. Each entry of
            ``objects_data`` is a list
            ``[class_id, class_name, (center_x, center_y), mask_or_None, colour]``
            for one detection whose confidence exceeds
            ``self.confidence_threshold``.

        Raises:
            ValueError: If ``image`` is not a numpy array.
            Exception: Any other failure is printed with its traceback and
                re-raised unchanged.
        """
        try:
            # Initialize model if needed
            if self.model is None:
                print("Loading YOLO model...")
                self.model = self._load_model()
                print("Model loaded successfully")

            # Ensure image is in correct format
            if not isinstance(image, np.ndarray):
                raise ValueError("Input image must be a numpy array")
            image = image.copy()

            # Start from a clean slate so masks from a previous image can
            # never be attributed to detections of this one.
            self.masks = {}

            # Make prediction using CPU
            predictions = self.model.predict(image, device='cpu')

            objects_data = []
            if len(predictions) == 0:
                return image, objects_data

            result = predictions[0]
            boxes = result.boxes
            if boxes is None or len(boxes) == 0:
                return image, objects_data

            # .cpu() is a no-op for CPU tensors and keeps .numpy() safe otherwise.
            cls_ids = boxes.cls.cpu().numpy()
            bounding_boxes = boxes.xyxy.int().cpu().numpy()
            cls_conf = boxes.conf.cpu().numpy()

            if result.masks is not None:
                seg_mask_boundary = result.masks.xy
                seg_mask = result.masks.data.cpu().numpy()
            else:
                seg_mask_boundary, seg_mask = [], np.array([])

            for idx, raw_cls in enumerate(cls_ids):
                if cls_conf[idx] <= self.confidence_threshold:
                    continue

                cls_id = int(raw_cls)
                cls_name = self.model.names[cls_id]
                cls_clr = self.get_cls_clr(cls_id)
                x1, y1, x2, y2 = bounding_boxes[idx]

                if seg_mask.size > 0:
                    self.masks[idx] = seg_mask[idx]
                    if self.is_show_segmentation:
                        image = self._blend_mask(image, seg_mask[idx], cls_clr)

                if self.is_show_bounding_boxes:
                    disp_str = f"{cls_name} {cls_conf[idx]:.2f}"
                    cv2.rectangle(image, (x1, y1), (x2, y2), cls_clr, self.bb_thickness)
                    # Filled label background, sized from the text length.
                    cv2.rectangle(image, (x1, y1), (x1 + len(disp_str) * 9, y1 + 15), cls_clr, -1)
                    cv2.putText(image, disp_str, (x1 + 5, y1 + 10),
                                cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1)

                if len(seg_mask_boundary) > 0 and self.is_show_segmentation_boundary:
                    outline = np.array(seg_mask_boundary[idx], dtype=np.int32)
                    cv2.polylines(image, [outline], isClosed=True, color=cls_clr, thickness=2)

                center = (x1 + (x2 - x1) // 2, y1 + (y2 - y1) // 2)
                objects_data.append(
                    [cls_id, cls_name, center, self.masks.get(idx), cls_clr]
                )

            return image, objects_data

        except Exception as e:
            print(f"Error in predict: {str(e)}")
            import traceback
            print(traceback.format_exc())
            raise