|
import cv2 |
|
import numpy as np |
|
import time |
|
from ultralytics import YOLO |
|
import torch |
|
from collections import defaultdict |
|
import pandas as pd |
|
from typing import Dict, List, Tuple |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Class index -> human-readable label; the labels are also used as the
# prefix of the per-region DataFrame column names (`<label>_<region>`).
LABEL_MAP = {
    0: "auto",
    1: "bus",
    2: "car",
    3: "electric-rickshaw",
    4: "large-sized-truck",
    5: "medium-sized-truck",
    6: "motorbike",
    7: "small-sized-truck",
}
|
|
|
def draw_text_with_background(
    image,
    text,
    position,
    font=cv2.FONT_HERSHEY_SIMPLEX,
    font_scale=1,
    font_thickness=2,
    text_color=(255, 255, 255),
    bg_color=(0, 0, 0),
    padding=5,
):
    """Render `text` onto `image` (in place) on top of a filled rectangle.

    Args:
        image: Image array handed to the OpenCV drawing calls; modified in place.
        text: String to render.
        position: `(x, y)` anchor. `x` is the left edge of the background
            box; `y` is the reference line that the text origin and the box
            are offset from (both are shifted up by half the font baseline).
        font: OpenCV font face.
        font_scale: Font scale factor passed to OpenCV.
        font_thickness: Stroke thickness of the glyphs.
        text_color: Colour tuple used for the text.
        bg_color: Colour tuple used for the filled background box.
        padding: Extra pixels added around the text inside the box.

    Returns:
        None.
    """
    text_size, baseline = cv2.getTextSize(text, font, font_scale, font_thickness)
    text_w, text_h = text_size
    left, anchor_y = position

    # Both the box and the text origin are lifted by half the baseline.
    half_baseline = baseline // 2
    box_top_left = (left, anchor_y - text_h - padding - half_baseline)
    box_bottom_right = (
        left + text_w + 2 * padding,
        anchor_y + padding - half_baseline,
    )
    text_origin = (left + padding, anchor_y - half_baseline)

    # Thickness -1 asks OpenCV for a filled rectangle.
    cv2.rectangle(image, box_top_left, box_bottom_right, bg_color, -1)
    cv2.putText(
        image,
        text,
        text_origin,
        font,
        font_scale,
        text_color,
        font_thickness,
        cv2.LINE_AA,
    )
|
|
|
|
|
def get_color_for_class(cls_id: int) -> Tuple[int, int, int]:
    """Return a deterministic bright colour for a class index.

    The colour is drawn from a private ``RandomState`` seeded with
    ``cls_id + 37``, so the same class index always maps to the same colour.
    A private generator is used instead of ``np.random.seed`` so that calling
    this function does not reset NumPy's global random state for the rest of
    the process; the produced values are the same as with the global
    seed-then-draw approach.

    Args:
        cls_id: Class index; ``cls_id + 37`` must be a valid NumPy seed
            (a non-negative integer).

    Returns:
        A 3-tuple of Python ints, each in the range [100, 255].
    """
    rng = np.random.RandomState(cls_id + 37)
    first, second, third = rng.randint(100, 256, size=3).tolist()
    return (first, second, third)
|
|
|
|
|
def _inside(pt: Tuple[int, int], poly: np.ndarray) -> bool:
    """Return True when `pt` is inside `poly` or on its boundary.

    Delegates to `cv2.pointPolygonTest` with distance measurement disabled
    and treats any non-negative result as a hit, so only points reported
    with a negative value are rejected.
    """
    location = cv2.pointPolygonTest(poly, pt, False)
    return location >= 0
|
|
|
|
|
class YOLOVideoDetector:
    """
    Detect objects on a video and count them **per region**.

    * `regions`: Dict[int, List[Tuple[int,int]]], mapping region id (0, 1, ...)
      to 4+ vertices in *pixel* coordinates (clockwise or anticlockwise).
      The vertices are multiplied by `scale_factor` before use, i.e. they
      are expressed in the coordinates of the frame *before* up-scaling
      (and after the 90-degree rotation applied to portrait videos).
    * Only a sample of the frames is analysed: the first frame, plus every
      frame whose 1-based number is a multiple of `int(fps // 2)`.
    * For each analysed frame, counts are stored in a DataFrame column named
      `<label>_<region>` (e.g. `car_0`, `bus_1`).
    """

    def __init__(
        self,
        model_path: str,
        video_path: str,
        output_path: str,
        regions: Dict[int, List[Tuple[int, int]]],
        classes=None,
        conf: float = 0.35,
        scale_factor: float = 1.5,
    ):
        """Load the model and validate the counting regions.

        Args:
            model_path: Path handed to `YOLO(...)`.
            video_path: Input video to read.
            output_path: Destination of the annotated video.
            regions: Region id -> list of `(x, y)` polygon vertices. Entries
                with an empty vertex list are ignored.
            classes: Forwarded unchanged to `model.predict(classes=...)`.
            conf: Confidence threshold forwarded to `model.predict`.
            scale_factor: Factor by which frames (and region polygons) are
                enlarged before inference and annotation.

        Raises:
            ValueError: If `scale_factor` is not positive or no non-empty
                region is supplied.
        """
        if scale_factor <= 0:
            raise ValueError("`scale_factor` must be a positive number.")

        # Validate the cheap inputs before paying for the model load.
        parsed_regions = {
            rid: np.array(pts, np.int32) for rid, pts in regions.items() if pts
        }
        if not parsed_regions:
            raise ValueError("`regions` cannot be empty - provide at least one polygon.")

        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        print(f"Using device: {self.device}")

        self.model = YOLO(model_path)
        self.video_path = video_path
        self.output_path = output_path
        self.conf = conf
        self.classes = classes
        self.scale = scale_factor
        self.regions = parsed_regions

        # One count column per (region, known label) pair, region-major.
        self.df_columns = [
            "Frame Number",
            *[
                f"{LABEL_MAP[c]}_{rid}"
                for rid in self.regions
                for c in LABEL_MAP
            ],
        ]

    def process_video(self) -> pd.DataFrame:
        """Run detection over the sampled frames and write the annotated video.

        Portrait input (width < height) is rotated 90 degrees clockwise
        before processing. The capture and the writer are always released,
        even when processing fails part-way.

        Returns:
            DataFrame with one row per analysed frame: the 1-based
            "Frame Number" plus the per-region class counts, with missing
            counts filled with 0.

        Raises:
            ValueError: If the video cannot be opened or its first frame
                cannot be read.
        """
        cap = cv2.VideoCapture(self.video_path)
        if not cap.isOpened():
            cap.release()
            raise ValueError(f"Cannot open video: {self.video_path}")

        out = None
        try:
            ok, first_frame = cap.read()
            if not ok:
                raise ValueError(f"Cannot read first frame from: {self.video_path}")

            h_orig, w_orig = first_frame.shape[:2]
            rotate = w_orig < h_orig
            if rotate:
                print(
                    f"Original frame (h,w): ({h_orig}, {w_orig}). Portrait -> rotating 90° CW."
                )
            else:
                print(f"Original frame (h,w): ({h_orig}, {w_orig}). Processing as landscape.")

            # A 90-degree rotation swaps width and height.
            base_w, base_h = (h_orig, w_orig) if rotate else (w_orig, h_orig)
            out_size = (int(base_w * self.scale), int(base_h * self.scale))

            fps = cap.get(cv2.CAP_PROP_FPS)
            if not np.isfinite(fps) or fps <= 0:
                # Some containers report no usable frame rate.
                fps = 30.0
            # Analyse roughly two frames per second of video.
            step = int(fps // 2) or 1

            fourcc = cv2.VideoWriter_fourcc(*"mp4v")
            out = cv2.VideoWriter(self.output_path, fourcc, fps, out_size)
            if not out.isOpened():
                # Counting still works without the annotated video, so warn
                # instead of aborting.
                print(f"Warning: cannot open output video for writing: {self.output_path}")

            prediction_counter_df = pd.DataFrame(columns=self.df_columns)

            # `frame_count` is the 1-based number of the frame most recently read.
            frame_count = 1
            prev_t = time.time()
            prev_t = self._process_and_write_frame(
                self._prepare_frame(first_frame, rotate, out_size),
                out,
                prev_t,
                prediction_counter_df,
                frame_count,
            )

            while True:
                ok, frame = cap.read()
                if not ok:
                    break

                # Count the frame as soon as it is read so that every frame
                # has a unique number (the first frame is number 1).
                frame_count += 1
                if frame_count % step != 0:
                    continue

                prev_t = self._process_and_write_frame(
                    self._prepare_frame(frame, rotate, out_size),
                    out,
                    prev_t,
                    prediction_counter_df,
                    frame_count,
                )
        finally:
            cap.release()
            if out is not None:
                out.release()

        # No GUI window is ever opened here, so there is nothing to destroy;
        # `cv2.destroyAllWindows()` would also fail on headless OpenCV builds.
        print(
            f"Read {frame_count} frames, processed {len(prediction_counter_df)}. "
            f"Finished -> {self.output_path}"
        )
        return prediction_counter_df.fillna(0)

    def _prepare_frame(self, frame: np.ndarray, rotate: bool, out_size: Tuple[int, int]) -> np.ndarray:
        """Rotate `frame` clockwise when requested and resize it to `out_size` (width, height)."""
        if rotate:
            frame = cv2.rotate(frame, cv2.ROTATE_90_CLOCKWISE)
        return cv2.resize(frame, out_size, interpolation=cv2.INTER_LINEAR)

    def _process_and_write_frame(
        self,
        frame_up: np.ndarray,
        out_writer: cv2.VideoWriter,
        prev_t: float,
        prediction_counter_df: pd.DataFrame,
        frame_count: int,
    ) -> float:
        """Run YOLO on one frame, count per region, annotate, write, return timestamp.

        Args:
            frame_up: Already rotated and up-scaled frame; annotated in place.
            out_writer: Writer that receives the annotated frame.
            prev_t: Timestamp returned by the previous call, used for the
                FPS overlay.
            prediction_counter_df: DataFrame that gets one new row appended
                in place for this frame.
            frame_count: Value stored in the row's "Frame Number" column.

        Returns:
            The `time.time()` value taken after this frame was analysed.
        """
        # Region polygons in up-scaled coordinates, computed once per frame.
        scaled_regions = {
            rid: (poly * [self.scale, self.scale]).astype(np.int32)
            for rid, poly in self.regions.items()
        }

        # Run inference on the clean frame, before any overlay is drawn, so
        # the annotations cannot influence the detections.
        results = self.model.predict(
            frame_up,
            conf=self.conf,
            classes=self.classes,
            verbose=False,
            device=self.device,
        )

        for rid, poly_up in scaled_regions.items():
            cv2.polylines(frame_up, [poly_up], True, (255, 255, 0), 2)
            # Plain Python ints keep the OpenCV drawing calls happy.
            label_anchor = tuple(int(v) for v in poly_up[0])
            draw_text_with_background(frame_up, f"R{rid}", label_anchor, font_scale=0.8)

        # counts[region id][class id] -> number of detections.
        counts: Dict[int, Dict[int, int]] = {
            rid: defaultdict(int) for rid in scaled_regions
        }

        if results and len(results[0].boxes):
            boxes = results[0].boxes
            xyxy = boxes.xyxy.cpu().numpy()
            scores = boxes.conf.cpu().numpy()
            cls_ids = boxes.cls.int().cpu().tolist()

            for (x1, y1, x2, y2), score, cls_id in zip(xyxy, scores, cls_ids):
                color = get_color_for_class(cls_id)
                cv2.rectangle(
                    frame_up, (int(x1), int(y1)), (int(x2), int(y2)), color, 2
                )
                label = LABEL_MAP.get(cls_id, f"Class {cls_id}")
                draw_text_with_background(
                    frame_up,
                    f"{label}: {score:.2f}",
                    (int(x1), int(y1) - 10),
                    font_scale=0.6,
                    font_thickness=1,
                    bg_color=color,
                    padding=3,
                )

                # A detection belongs to the first region containing the
                # centre of its box, so it is never counted twice.
                cx, cy = int((x1 + x2) / 2), int((y1 + y2) / 2)
                for rid, poly_up in scaled_regions.items():
                    if _inside((cx, cy), poly_up):
                        counts[rid][cls_id] += 1
                        break

        # Append one row for this frame; the DataFrame is mutated in place
        # because the caller keeps the reference.
        df_idx = len(prediction_counter_df)
        prediction_counter_df.at[df_idx, "Frame Number"] = frame_count

        y_off = 30
        for rid, cls_dict in counts.items():
            for cls_id, cnt in cls_dict.items():
                label = LABEL_MAP.get(cls_id, f"Class {cls_id}")
                col_name = f"{label}_{rid}"
                prediction_counter_df.at[df_idx, col_name] = cnt
                draw_text_with_background(
                    frame_up,
                    f"{label}_{rid}: {cnt}",
                    (10, y_off),
                    font_scale=0.7,
                    font_thickness=2,
                    padding=6,
                )
                y_off += 25

        # Rate at which analysed frames are produced (wall-clock based).
        now = time.time()
        elapsed = now - prev_t
        fps_live = 1.0 / elapsed if elapsed > 0 else 0.0
        draw_text_with_background(
            frame_up,
            f"FPS: {fps_live:.1f}",
            (10, frame_up.shape[0] - 20),
            bg_color=(0, 0, 0),
            text_color=(0, 255, 0),
            font_scale=0.8,
            padding=4,
        )

        out_writer.write(frame_up)
        return now
|
|