|  |  | 
					
						
						|  |  | 
					
						
						|  | import argparse | 
					
						
						|  | from collections import defaultdict | 
					
						
						|  | from pathlib import Path | 
					
						
						|  |  | 
					
						
						|  | import cv2 | 
					
						
						|  | import numpy as np | 
					
						
						|  | from shapely.geometry import Polygon | 
					
						
						|  | from shapely.geometry.point import Point | 
					
						
						|  |  | 
					
						
						|  | from ultralytics import YOLO | 
					
						
						|  | from ultralytics.utils.files import increment_path | 
					
						
						|  | from ultralytics.utils.plotting import Annotator, colors | 
					
						
						|  |  | 
					
						
						|  | track_history = defaultdict(list) | 
					
						
						|  |  | 
					
						
						|  | current_region = None | 
					
						
						|  | counting_regions = [ | 
					
						
						|  | { | 
					
						
						|  | "name": "YOLOv8 Polygon Region", | 
					
						
						|  | "polygon": Polygon([(50, 80), (250, 20), (450, 80), (400, 350), (100, 350)]), | 
					
						
						|  | "counts": 0, | 
					
						
						|  | "dragging": False, | 
					
						
						|  | "region_color": (255, 42, 4), | 
					
						
						|  | "text_color": (255, 255, 255), | 
					
						
						|  | }, | 
					
						
						|  | { | 
					
						
						|  | "name": "YOLOv8 Rectangle Region", | 
					
						
						|  | "polygon": Polygon([(200, 250), (440, 250), (440, 550), (200, 550)]), | 
					
						
						|  | "counts": 0, | 
					
						
						|  | "dragging": False, | 
					
						
						|  | "region_color": (37, 255, 225), | 
					
						
						|  | "text_color": (0, 0, 0), | 
					
						
						|  | }, | 
					
						
						|  | ] | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | def mouse_callback(event, x, y, flags, param): | 
					
						
						|  | """ | 
					
						
						|  | Handles mouse events for region manipulation. | 
					
						
						|  |  | 
					
						
						|  | Parameters: | 
					
						
						|  | event (int): The mouse event type (e.g., cv2.EVENT_LBUTTONDOWN). | 
					
						
						|  | x (int): The x-coordinate of the mouse pointer. | 
					
						
						|  | y (int): The y-coordinate of the mouse pointer. | 
					
						
						|  | flags (int): Additional flags passed by OpenCV. | 
					
						
						|  | param: Additional parameters passed to the callback (not used in this function). | 
					
						
						|  |  | 
					
						
						|  | Global Variables: | 
					
						
						|  | current_region (dict): A dictionary representing the current selected region. | 
					
						
						|  |  | 
					
						
						|  | Mouse Events: | 
					
						
						|  | - LBUTTONDOWN: Initiates dragging for the region containing the clicked point. | 
					
						
						|  | - MOUSEMOVE: Moves the selected region if dragging is active. | 
					
						
						|  | - LBUTTONUP: Ends dragging for the selected region. | 
					
						
						|  |  | 
					
						
						|  | Notes: | 
					
						
						|  | - This function is intended to be used as a callback for OpenCV mouse events. | 
					
						
						|  | - Requires the existence of the 'counting_regions' list and the 'Polygon' class. | 
					
						
						|  |  | 
					
						
						|  | Example: | 
					
						
						|  | >>> cv2.setMouseCallback(window_name, mouse_callback) | 
					
						
						|  | """ | 
					
						
						|  | global current_region | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | if event == cv2.EVENT_LBUTTONDOWN: | 
					
						
						|  | for region in counting_regions: | 
					
						
						|  | if region["polygon"].contains(Point((x, y))): | 
					
						
						|  | current_region = region | 
					
						
						|  | current_region["dragging"] = True | 
					
						
						|  | current_region["offset_x"] = x | 
					
						
						|  | current_region["offset_y"] = y | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | elif event == cv2.EVENT_MOUSEMOVE: | 
					
						
						|  | if current_region is not None and current_region["dragging"]: | 
					
						
						|  | dx = x - current_region["offset_x"] | 
					
						
						|  | dy = y - current_region["offset_y"] | 
					
						
						|  | current_region["polygon"] = Polygon( | 
					
						
						|  | [(p[0] + dx, p[1] + dy) for p in current_region["polygon"].exterior.coords] | 
					
						
						|  | ) | 
					
						
						|  | current_region["offset_x"] = x | 
					
						
						|  | current_region["offset_y"] = y | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | elif event == cv2.EVENT_LBUTTONUP: | 
					
						
						|  | if current_region is not None and current_region["dragging"]: | 
					
						
						|  | current_region["dragging"] = False | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | def run( | 
					
						
						|  | weights="yolov8n.pt", | 
					
						
						|  | source=None, | 
					
						
						|  | device="cpu", | 
					
						
						|  | view_img=False, | 
					
						
						|  | save_img=False, | 
					
						
						|  | exist_ok=False, | 
					
						
						|  | classes=None, | 
					
						
						|  | line_thickness=2, | 
					
						
						|  | track_thickness=2, | 
					
						
						|  | region_thickness=2, | 
					
						
						|  | ): | 
					
						
						|  | """ | 
					
						
						|  | Run Region counting on a video using YOLOv8 and ByteTrack. | 
					
						
						|  |  | 
					
						
						|  | Supports movable region for real time counting inside specific area. | 
					
						
						|  | Supports multiple regions counting. | 
					
						
						|  | Regions can be Polygons or rectangle in shape | 
					
						
						|  |  | 
					
						
						|  | Args: | 
					
						
						|  | weights (str): Model weights path. | 
					
						
						|  | source (str): Video file path. | 
					
						
						|  | device (str): processing device cpu, 0, 1 | 
					
						
						|  | view_img (bool): Show results. | 
					
						
						|  | save_img (bool): Save results. | 
					
						
						|  | exist_ok (bool): Overwrite existing files. | 
					
						
						|  | classes (list): classes to detect and track | 
					
						
						|  | line_thickness (int): Bounding box thickness. | 
					
						
						|  | track_thickness (int): Tracking line thickness | 
					
						
						|  | region_thickness (int): Region thickness. | 
					
						
						|  | """ | 
					
						
						|  | vid_frame_count = 0 | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | if not Path(source).exists(): | 
					
						
						|  | raise FileNotFoundError(f"Source path '{source}' does not exist.") | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | model = YOLO(f"{weights}") | 
					
						
						|  | model.to("cuda") if device == "0" else model.to("cpu") | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | names = model.model.names | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | videocapture = cv2.VideoCapture(source) | 
					
						
						|  | frame_width, frame_height = int(videocapture.get(3)), int(videocapture.get(4)) | 
					
						
						|  | fps, fourcc = int(videocapture.get(5)), cv2.VideoWriter_fourcc(*"mp4v") | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | save_dir = increment_path(Path("ultralytics_rc_output") / "exp", exist_ok) | 
					
						
						|  | save_dir.mkdir(parents=True, exist_ok=True) | 
					
						
						|  | video_writer = cv2.VideoWriter(str(save_dir / f"{Path(source).stem}.mp4"), fourcc, fps, (frame_width, frame_height)) | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | while videocapture.isOpened(): | 
					
						
						|  | success, frame = videocapture.read() | 
					
						
						|  | if not success: | 
					
						
						|  | break | 
					
						
						|  | vid_frame_count += 1 | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | results = model.track(frame, persist=True, classes=classes) | 
					
						
						|  |  | 
					
						
						|  | if results[0].boxes.id is not None: | 
					
						
						|  | boxes = results[0].boxes.xyxy.cpu() | 
					
						
						|  | track_ids = results[0].boxes.id.int().cpu().tolist() | 
					
						
						|  | clss = results[0].boxes.cls.cpu().tolist() | 
					
						
						|  |  | 
					
						
						|  | annotator = Annotator(frame, line_width=line_thickness, example=str(names)) | 
					
						
						|  |  | 
					
						
						|  | for box, track_id, cls in zip(boxes, track_ids, clss): | 
					
						
						|  | annotator.box_label(box, str(names[cls]), color=colors(cls, True)) | 
					
						
						|  | bbox_center = (box[0] + box[2]) / 2, (box[1] + box[3]) / 2 | 
					
						
						|  |  | 
					
						
						|  | track = track_history[track_id] | 
					
						
						|  | track.append((float(bbox_center[0]), float(bbox_center[1]))) | 
					
						
						|  | if len(track) > 30: | 
					
						
						|  | track.pop(0) | 
					
						
						|  | points = np.hstack(track).astype(np.int32).reshape((-1, 1, 2)) | 
					
						
						|  | cv2.polylines(frame, [points], isClosed=False, color=colors(cls, True), thickness=track_thickness) | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | for region in counting_regions: | 
					
						
						|  | if region["polygon"].contains(Point((bbox_center[0], bbox_center[1]))): | 
					
						
						|  | region["counts"] += 1 | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | for region in counting_regions: | 
					
						
						|  | region_label = str(region["counts"]) | 
					
						
						|  | region_color = region["region_color"] | 
					
						
						|  | region_text_color = region["text_color"] | 
					
						
						|  |  | 
					
						
						|  | polygon_coords = np.array(region["polygon"].exterior.coords, dtype=np.int32) | 
					
						
						|  | centroid_x, centroid_y = int(region["polygon"].centroid.x), int(region["polygon"].centroid.y) | 
					
						
						|  |  | 
					
						
						|  | text_size, _ = cv2.getTextSize( | 
					
						
						|  | region_label, cv2.FONT_HERSHEY_SIMPLEX, fontScale=0.7, thickness=line_thickness | 
					
						
						|  | ) | 
					
						
						|  | text_x = centroid_x - text_size[0] // 2 | 
					
						
						|  | text_y = centroid_y + text_size[1] // 2 | 
					
						
						|  | cv2.rectangle( | 
					
						
						|  | frame, | 
					
						
						|  | (text_x - 5, text_y - text_size[1] - 5), | 
					
						
						|  | (text_x + text_size[0] + 5, text_y + 5), | 
					
						
						|  | region_color, | 
					
						
						|  | -1, | 
					
						
						|  | ) | 
					
						
						|  | cv2.putText( | 
					
						
						|  | frame, region_label, (text_x, text_y), cv2.FONT_HERSHEY_SIMPLEX, 0.7, region_text_color, line_thickness | 
					
						
						|  | ) | 
					
						
						|  | cv2.polylines(frame, [polygon_coords], isClosed=True, color=region_color, thickness=region_thickness) | 
					
						
						|  |  | 
					
						
						|  | if view_img: | 
					
						
						|  | if vid_frame_count == 1: | 
					
						
						|  | cv2.namedWindow("Ultralytics YOLOv8 Region Counter Movable") | 
					
						
						|  | cv2.setMouseCallback("Ultralytics YOLOv8 Region Counter Movable", mouse_callback) | 
					
						
						|  | cv2.imshow("Ultralytics YOLOv8 Region Counter Movable", frame) | 
					
						
						|  |  | 
					
						
						|  | if save_img: | 
					
						
						|  | video_writer.write(frame) | 
					
						
						|  |  | 
					
						
						|  | for region in counting_regions: | 
					
						
						|  | region["counts"] = 0 | 
					
						
						|  |  | 
					
						
						|  | if cv2.waitKey(1) & 0xFF == ord("q"): | 
					
						
						|  | break | 
					
						
						|  |  | 
					
						
						|  | del vid_frame_count | 
					
						
						|  | video_writer.release() | 
					
						
						|  | videocapture.release() | 
					
						
						|  | cv2.destroyAllWindows() | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | def parse_opt(): | 
					
						
						|  | """Parse command line arguments.""" | 
					
						
						|  | parser = argparse.ArgumentParser() | 
					
						
						|  | parser.add_argument("--weights", type=str, default="yolov8n.pt", help="initial weights path") | 
					
						
						|  | parser.add_argument("--device", default="", help="cuda device, i.e. 0 or 0,1,2,3 or cpu") | 
					
						
						|  | parser.add_argument("--source", type=str, required=True, help="video file path") | 
					
						
						|  | parser.add_argument("--view-img", action="store_true", help="show results") | 
					
						
						|  | parser.add_argument("--save-img", action="store_true", help="save results") | 
					
						
						|  | parser.add_argument("--exist-ok", action="store_true", help="existing project/name ok, do not increment") | 
					
						
						|  | parser.add_argument("--classes", nargs="+", type=int, help="filter by class: --classes 0, or --classes 0 2 3") | 
					
						
						|  | parser.add_argument("--line-thickness", type=int, default=2, help="bounding box thickness") | 
					
						
						|  | parser.add_argument("--track-thickness", type=int, default=2, help="Tracking line thickness") | 
					
						
						|  | parser.add_argument("--region-thickness", type=int, default=4, help="Region thickness") | 
					
						
						|  |  | 
					
						
						|  | return parser.parse_args() | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | def main(opt): | 
					
						
						|  | """Main function.""" | 
					
						
						|  | run(**vars(opt)) | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | if __name__ == "__main__": | 
					
						
						|  | opt = parse_opt() | 
					
						
						|  | main(opt) | 
					
						
						|  |  |