File size: 7,435 Bytes
3c87883
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
# This file contains the ObjectDetection class, which detects objects in images.
from ultralytics import YOLO
import math
import logging
# Logging setup: records at DEBUG level and above are written to "logs.log",
# each formatted as "<timestamp> - <level> - <message>".
logging.basicConfig(
    filename="logs.log",
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.DEBUG,
)

# Named logger shared by the code in this module.
logger = logging.getLogger("pipline")


class ObjectDetection:
    """Train a YOLO model and detect objects in images.

    ``inference`` pads every detection with a margin and merges padded
    regions that overlap, reporting per-region counts of persons, vehicles
    and animals.
    """

    def __init__(self):
        self.input_file_path = None      # passed as ``data=`` to YOLO training
        self.trained_model_path = None   # path given to set_trained_model_path
        self.trained_model = None        # YOLO model used by inference()
        self.base_model = None           # YOLO model created by train()

    def set_input_file_path(self, input_file_path):
        """
        Method to set path of input files to train a model

        Args:
            input_file_path (str): Relative or global path to the input files
        """
        self.input_file_path = input_file_path
        logger.info("input file path is set...")

    def set_trained_model_path(self, trained_model_path):
        """
        Method to set path of trained model and load it for inference.

        The model is loaded immediately, so any error raised while loading
        surfaces here rather than in ``inference``.

        Args:
            trained_model_path (str): Relative or global path to the trained model
        """
        self.trained_model_path = trained_model_path
        self.trained_model = YOLO(trained_model_path)
        logger.info("trained model path is set...")

    def train(self):
        """
        Method to train a model for object detection in image.

        Starts from the "yolov8m.pt" weights and trains for 100 epochs on
        ``input_file_path``. Errors raised by the training call itself are
        logged (with traceback) and not re-raised.

        Raises:
            ValueError: when ``input_file_path`` has not been set. It is raised
                before any model is loaded. (ValueError derives from
                BaseException, so handlers written for the previous
                BaseException still catch it.)
        """
        # Validate first so a missing path does not cost a model load.
        if self.input_file_path is None:
            raise ValueError("Please set path of input_files first with set_input_file_path method.")

        self.base_model = YOLO("yolov8m.pt")
        try:
            self.base_model.train(data=self.input_file_path, epochs=100)
        except Exception:
            logger.exception("Something went wrong in activity detection model training")

    def inference(self, image):
        """Method to detect object from image.

        Each detection is padded by half of its own width/height on every side
        (clamped to the image), and padded regions that overlap are merged.

        Args:
            image (numpy array): Image indexed as ``image[row][column]``;
                ``len(image)`` and ``len(image[0])`` are used as height and width.

        Raises:
            ValueError: when no trained model has been set. (ValueError derives
                from BaseException, so handlers written for the previous
                BaseException still catch it.)

        Returns:
            list | None: one entry per merged region, ordered by the region's
            top-left corner, in the format:
                        [{
                            'actual_boundries': [{
                                'top_left': tuple(x, y),
                                'bottom_right': tuple(x, y),
                                'class': str
                            }],
                            'merged_boundries': {
                                'top_left': tuple(x, y),
                                'bottom_right': tuple(x, y),
                                'person_count': int,
                                'vehical_count': int,
                                'animal_count': int
                            }
                        }]
            ``None`` is returned when detection fails; the error is logged.
        """
        if self.trained_model is None:
            raise ValueError("Please set path of trained model first with set_trained_model_path method.")

        try:
            # detect object in image
            detections = self.trained_model(image)[0]
            boxes = detections.boxes.xyxy.tolist()
            classes = detections.boxes.cls.tolist()
            class_names = detections.names

            padded = []
            if boxes:
                # Largest valid pixel indices; loop-invariant, so computed once.
                max_x, max_y = len(image[0]) - 1, len(image) - 1
                padded = [
                    self._pad_detection(box, class_names[int(cls)], max_x, max_y)
                    for box, cls in zip(boxes, classes)
                ]

            merged_boundary_boxes = self._merge_overlapping(padded)
            logger.info("inference is done successfully...")
            return merged_boundary_boxes
        except Exception:
            logger.exception("Something went wrong in activity detection model inference")
            return None

    @staticmethod
    def _pad_detection(box, name, max_x, max_y):
        """Build the result entry for one detection, padded by half its size per side."""
        x1, y1, x2, y2 = box
        # Margins come from the ORIGINAL extent. Computing them after x1/y1 were
        # already moved made the right/bottom margin larger than the left/top one.
        margin_x = (x2 - x1) / 2
        margin_y = (y2 - y1) / 2
        # NOTE(review): counts only match the exact labels 'person', 'vehical'
        # and 'animal' - presumably the trained model's label set; confirm.
        return {
            "actual_boundries": [{
                "top_left": (int(x1), int(y1)),
                "bottom_right": (int(x2), int(y2)),
                "class": name,
            }],
            "merged_boundries": {
                "top_left": (math.floor(max(0, x1 - margin_x)),
                             math.floor(max(0, y1 - margin_y))),
                "bottom_right": (math.ceil(min(max_x, x2 + margin_x)),
                                 math.ceil(min(max_y, y2 + margin_y))),
                "person_count": 1 if name == 'person' else 0,
                "vehical_count": 1 if name == 'vehical' else 0,
                "animal_count": 1 if name == 'animal' else 0,
            },
        }

    @staticmethod
    def _regions_overlap(first, second):
        """Return True when two regions intersect on both axes (touching edges count)."""
        (ax1, ay1), (ax2, ay2) = first['top_left'], first['bottom_right']
        (bx1, by1), (bx2, by2) = second['top_left'], second['bottom_right']
        return ax1 <= bx2 and bx1 <= ax2 and ay1 <= by2 and by1 <= ay2

    @staticmethod
    def _merge_overlapping(entries):
        """Merge padded entries whose regions overlap; the input entries are not modified."""
        ordered = sorted(
            entries,
            key=lambda entry: (entry['merged_boundries']['top_left'],
                               entry['merged_boundries']['bottom_right']),
        )

        merged = []
        for entry in ordered:
            region = entry['merged_boundries']
            # NOTE(review): each entry is compared only with the most recent
            # merged region, so an overlap with an earlier region can be missed.
            if merged and ObjectDetection._regions_overlap(merged[-1]['merged_boundries'], region):
                last = merged[-1]
                previous = last['merged_boundries']
                last['actual_boundries'].extend(entry['actual_boundries'])
                last['merged_boundries'] = {
                    "top_left": (min(previous['top_left'][0], region['top_left'][0]),
                                 min(previous['top_left'][1], region['top_left'][1])),
                    "bottom_right": (max(previous['bottom_right'][0], region['bottom_right'][0]),
                                     max(previous['bottom_right'][1], region['bottom_right'][1])),
                    "person_count": previous['person_count'] + region['person_count'],
                    "vehical_count": previous['vehical_count'] + region['vehical_count'],
                    "animal_count": previous['animal_count'] + region['animal_count'],
                }
            else:
                # Copy so later merges never mutate the caller's entry.
                merged.append({
                    "actual_boundries": list(entry['actual_boundries']),
                    "merged_boundries": dict(region),
                })
        return merged