|
|
|
from ultralytics import YOLO |
|
import math |
|
import logging |
|
|
|
# Configure the root logger: records at DEBUG and above are written to
# "logs.log" as "<timestamp> - <level> - <message>".
# NOTE(review): basicConfig does nothing if the root logger already has
# handlers installed by the time this module is imported.
logging.basicConfig(
    filename="logs.log",
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.DEBUG,
)

# Module-wide logger used by the classes below.
logger = logging.getLogger("pipline")
|
|
|
|
|
class ObjectDetection:
    """Train a YOLO detector and run it on images.

    Typical use: call ``set_input_file_path`` then ``train`` to fit a model,
    or ``set_trained_model_path`` then ``inference`` to detect objects.
    """

    def __init__(self):
        # Dataset description handed to YOLO's ``train(data=...)``.
        self.input_file_path = None
        # Path the current ``trained_model`` was loaded from.
        self.trained_model_path = None
        # Loaded YOLO model used by ``inference``; None until a path is set.
        self.trained_model = None
        # Base YOLO model created by ``train``.
        self.base_model = None

    def set_input_file_path(self, input_file_path: str) -> None:
        """Set the path of the input files used to train a model.

        Args:
            input_file_path (str): Relative or absolute path to the input files.
        """
        self.input_file_path = input_file_path
        logger.info("input file path is set...")

    def set_trained_model_path(self, trained_model_path: str) -> None:
        """Load a trained model so that ``inference`` can be called.

        Args:
            trained_model_path (str): Relative or absolute path to the trained model.
        """
        # Load first: if loading fails, the stored path and model stay in sync.
        model = YOLO(trained_model_path)
        self.trained_model_path = trained_model_path
        self.trained_model = model
        logger.info("trained model path is set...")

    def train(self, *, epochs: int = 100) -> None:
        """Train a model for object detection in images.

        Errors raised while training are logged (with traceback) and swallowed.

        Args:
            epochs (int): Number of training epochs. Defaults to 100.

        Raises:
            RuntimeError: If ``set_input_file_path`` has not been called. It is a
                ``BaseException`` subclass, so handlers written for the previous
                bare ``BaseException`` still catch it.
        """
        # Validate before loading the base weights so a missing path fails fast.
        if self.input_file_path is None:
            raise RuntimeError("Please set path of input_files first with set_input_file_path method.")

        self.base_model = YOLO("yolov8m.pt")
        try:
            self.base_model.train(data=self.input_file_path, epochs=epochs)
        except Exception:
            logger.exception("Something went wrong in activity detection model training")

    def inference(self, image):
        """Detect objects in an image and group overlapping detections.

        Every detected box is padded by half its width on the left and right and
        half its height on the top and bottom (clamped to the image). Padded
        boxes that overlap the previous group, in top-left order, are merged.

        Args:
            image (numpy array): Image as a row-major 2-D/3-D array;
                ``len(image)`` is used as its height and ``len(image[0])`` as
                its width.

        Raises:
            RuntimeError: If ``set_trained_model_path`` has not been called. It is
                a ``BaseException`` subclass, so handlers written for the
                previous bare ``BaseException`` still catch it.

        Returns:
            list | None: One entry per merged group, or ``None`` when detection
            failed (the error is logged). Each entry has the format:
                {
                    'actual_boundries': [{
                        'top_left': tuple(x, y),
                        'bottom_right': tuple(x, y),
                        'class': str
                    }],
                    'merged_boundries': {
                        'top_left': tuple(x, y),
                        'bottom_right': tuple(x, y),
                        'person_count': int,
                        'vehical_count': int,
                        'animal_count': int
                    }
                }
        """
        # Raised outside the try block so it is not swallowed by the handler.
        if self.trained_model is None:
            raise RuntimeError("Please set path of trained model first with set_trained_model_path method.")

        try:
            detections = self.trained_model(image)[0]
            class_names = detections.names
            records = [
                self._box_with_margin(box, class_names[int(cls)], image)
                for box, cls in zip(detections.boxes.xyxy.tolist(), detections.boxes.cls.tolist())
            ]
            merged_boundary_boxes = self._merge_overlapping(records)
        except Exception:
            logger.exception("Something went wrong in activity detection model inference")
            return None

        logger.info("inference is done successfully...")
        return merged_boundary_boxes

    @staticmethod
    def _box_with_margin(box, name, image):
        """Build the result record for one detection, including its padded box."""
        x1, y1, x2, y2 = box
        # Compute the margins from the ORIGINAL corners; deriving the right and
        # bottom margins after moving x1/y1 would make the padding lopsided.
        half_width = (x2 - x1) / 2
        half_height = (y2 - y1) / 2
        left = math.floor(max(0, x1 - half_width))
        top = math.floor(max(0, y1 - half_height))
        right = math.ceil(min(len(image[0]) - 1, x2 + half_width))
        bottom = math.ceil(min(len(image) - 1, y2 + half_height))
        return {
            "actual_boundries": [{
                "top_left": (int(x1), int(y1)),
                "bottom_right": (int(x2), int(y2)),
                "class": name,
            }],
            "merged_boundries": {
                "top_left": (left, top),
                "bottom_right": (right, bottom),
                "person_count": 1 if name == 'person' else 0,
                "vehical_count": 1 if name == 'vehical' else 0,
                "animal_count": 1 if name == 'animal' else 0,
            },
        }

    @staticmethod
    def _merge_overlapping(records):
        """Merge each record into the previous group when their padded boxes overlap."""
        ordered = sorted(
            records,
            key=lambda r: (r["merged_boundries"]["top_left"], r["merged_boundries"]["bottom_right"]),
        )
        groups = []
        for record in ordered:
            if groups:
                last = groups[-1]["merged_boundries"]
                current = record["merged_boundries"]
                (last_x1, last_y1), (last_x2, last_y2) = last["top_left"], last["bottom_right"]
                (cur_x1, cur_y1), (cur_x2, cur_y2) = current["top_left"], current["bottom_right"]
                # Sorting guarantees cur_x1 >= last_x1, so one x test is enough.
                # The y axis is unordered and needs both tests; without the
                # second one a box entirely above the group would be merged.
                if last_x2 >= cur_x1 and last_y2 >= cur_y1 and cur_y2 >= last_y1:
                    groups[-1]["actual_boundries"] += record["actual_boundries"]
                    groups[-1]["merged_boundries"] = {
                        "top_left": (min(last_x1, cur_x1), min(last_y1, cur_y1)),
                        "bottom_right": (max(last_x2, cur_x2), max(last_y2, cur_y2)),
                        "person_count": last["person_count"] + current["person_count"],
                        "vehical_count": last["vehical_count"] + current["vehical_count"],
                        "animal_count": last["animal_count"] + current["animal_count"],
                    }
                    continue
            # A record is only ever compared with the most recent group.
            groups.append(record)
        return groups
|
|