# test / lane_detection.py
# Ashkchamp's picture
# Upload 2 files
# 0b17514 verified
import cv2
import numpy as np
import time
from ultralytics import YOLO
import torch
from collections import defaultdict
import pandas as pd
from typing import Dict, List, Tuple
# Disabled alternative label set (was commented out, kept for reference):
#   0: "auto", 1: "bus", 2: "car", 3: "motorcycle",
#   4: "mini-bus", 5: "scooter", 6: "truck"

# Class index -> label name; the index is the position in the tuple below.
LABEL_MAP = dict(
    enumerate(
        (
            "auto",
            "bus",
            "car",
            "electric-rickshaw",
            "large-sized-truck",
            "medium-sized-truck",
            "motorbike",
            "small-sized-truck",
        )
    )
)
def draw_text_with_background(
    image,
    text,
    position,
    font=cv2.FONT_HERSHEY_SIMPLEX,
    font_scale=1,
    font_thickness=2,
    text_color=(255, 255, 255),
    bg_color=(0, 0, 0),
    padding=5,
):
    """Render `text` onto `image` on top of a solid `bg_color` box.

    `position` is the (x, y) anchor: x is the left edge of the box and y is
    the reference line from which the box and the text baseline are derived.
    The image is modified in place; nothing is returned.
    """
    text_size, baseline = cv2.getTextSize(text, font, font_scale, font_thickness)
    text_w, text_h = text_size
    left, anchor_y = position

    # Both the box and the text are shifted up by half the font baseline.
    shift = baseline // 2
    box_top_left = (left, anchor_y - text_h - padding - shift)
    box_bottom_right = (left + text_w + 2 * padding, anchor_y + padding - shift)
    text_origin = (left + padding, anchor_y - shift)

    # Thickness -1 asks OpenCV for a filled rectangle.
    cv2.rectangle(image, box_top_left, box_bottom_right, bg_color, -1)
    cv2.putText(
        image,
        text,
        text_origin,
        font,
        font_scale,
        text_color,
        font_thickness,
        cv2.LINE_AA,
    )
def get_color_for_class(cls_id: int):
    """Return a deterministic bright colour for a class index.

    The result is a 3-tuple of plain ints, each in the range 100..255, and
    is always the same for a given `cls_id`.

    A private `RandomState` seeded with `cls_id + 37` is used instead of
    `np.random.seed(...)`, so the process-wide NumPy random generator is no
    longer reseeded as a side effect of picking a colour. The generated
    values are the same as with the global-seed approach.
    """
    rng = np.random.RandomState(cls_id + 37)
    return tuple(rng.randint(100, 256, size=3).tolist())
def _inside(pt: Tuple[int, int], poly: np.ndarray) -> bool:
    """Tell whether `pt` lies inside `poly` or on its boundary.

    Delegates to `cv2.pointPolygonTest` with distance measurement turned
    off and treats any non-negative result as a hit.
    """
    side = cv2.pointPolygonTest(poly, pt, False)
    return side >= 0
class YOLOVideoDetector:
    """
    Detect objects on a video and count them **per region**.

    * `regions`: Dict[int, List[Tuple[int,int]]], mapping region id (0,1, …) to
      4+ vertices in *pixel* coordinates (clockwise or anticlockwise). The
      vertices are given in the coordinates of the un-scaled frame; they are
      multiplied by `scale_factor` before being drawn or tested.
    * For each processed frame, counts are stored in a DataFrame column named
      `<label>_<region>` (e.g. `car_0`, `bus_1`), next to a `Frame Number`
      column holding the 1-based index of the frame in the source video.
    """

    def __init__(
        self,
        model_path: str,
        video_path: str,
        output_path: str,
        regions: Dict[int, List[Tuple[int, int]]],
        classes=None,
        conf: float = 0.35,
        scale_factor: float = 1.5,
    ):
        """Load the model and prepare the region polygons.

        Args:
            model_path: Path handed to `YOLO(...)`.
            video_path: Input video to read.
            output_path: Where the annotated video is written.
            regions: Region id -> list of (x, y) vertices; empty vertex
                lists are dropped.
            classes: Passed through to `model.predict(classes=...)`.
            conf: Passed through to `model.predict(conf=...)`.
            scale_factor: Resize factor applied to every processed frame
                (and to the region polygons).

        Raises:
            ValueError: If no non-empty region is supplied.
        """
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        print(f"Using device: {self.device}")
        self.model = YOLO(model_path)
        self.video_path = video_path
        self.output_path = output_path
        self.conf = conf
        self.classes = classes
        self.scale = scale_factor

        # Keep only regions that actually have vertices.
        self.regions = {
            rid: np.array(pts, np.int32) for rid, pts in regions.items() if pts
        }
        if not self.regions:
            raise ValueError("`regions` cannot be empty — provide at least one polygon.")

        # Prepare DataFrame columns once: one column per (label, region) pair.
        self.df_columns = [
            "Frame Number",
            *[
                f"{label}_{rid}"
                for rid in self.regions
                for label in LABEL_MAP.values()
            ],
        ]

    # ────────────────────────────────────────────────────────────────
    def process_video(self) -> pd.DataFrame:
        """Run detection over the video and return per-frame region counts.

        Portrait videos (width < height) are rotated 90° clockwise. Roughly
        two frames per second of source video are processed (every
        `fps // 2`-th frame, starting with the first one); each processed
        frame is resized by `scale_factor`, annotated and written to
        `output_path`.

        Returns:
            DataFrame with one row per processed frame; missing counts are
            filled with 0.

        Raises:
            ValueError: If the video cannot be opened or its first frame
                cannot be read.
        """
        cap = cv2.VideoCapture(self.video_path)
        if not cap.isOpened():
            raise ValueError(f"Cannot open video: {self.video_path}")

        out = None
        prediction_counter_df = pd.DataFrame(columns=self.df_columns)
        # try/finally so the capture and the writer are released even when
        # reading, inference or drawing raises part-way through the video.
        try:
            ok, frame = cap.read()
            if not ok:
                raise ValueError(f"Cannot read first frame from: {self.video_path}")

            h_orig, w_orig = frame.shape[:2]
            frame_was_rotated = w_orig < h_orig
            if frame_was_rotated:
                print(
                    f"Original frame (h,w): ({h_orig}, {w_orig}). Portrait → rotating 90° CW."
                )
                # A 90° rotation swaps width and height.
                base_h, base_w = w_orig, h_orig
            else:
                print(f"Original frame (h,w): ({h_orig}, {w_orig}). Processing as landscape.")
                base_h, base_w = h_orig, w_orig

            fps = cap.get(cv2.CAP_PROP_FPS)
            if not np.isfinite(fps) or fps <= 0:
                fps = 30.0  # container did not report a usable frame rate
            frame_step = int(fps // 2) or 1  # frame skipping @ ≈2 fps

            out_w, out_h = int(base_w * self.scale), int(base_h * self.scale)
            fourcc = cv2.VideoWriter_fourcc(*"mp4v")
            # NOTE(review): sampled frames are written at the source fps, so
            # the output presumably plays faster than real time — confirm
            # this is intended.
            out = cv2.VideoWriter(self.output_path, fourcc, fps, (out_w, out_h))
            if not out.isOpened():
                print(f"Warning: could not open video writer for: {self.output_path}")

            prev_t = time.time()
            # `frame_count` is the 1-based index of `frame` in the source
            # video, so the number stored in the DataFrame is the real frame
            # number (no off-by-one, no duplicated frame 1).
            frame_count = 1
            while True:
                if (frame_count - 1) % frame_step == 0:
                    if frame_was_rotated:
                        frame = cv2.rotate(frame, cv2.ROTATE_90_CLOCKWISE)
                    frame_up = cv2.resize(
                        frame, (out_w, out_h), interpolation=cv2.INTER_LINEAR
                    )
                    prev_t = self._process_and_write_frame(
                        frame_up, out, prev_t, prediction_counter_df, frame_count
                    )
                ok, frame = cap.read()
                if not ok:
                    break
                frame_count += 1
        finally:
            cap.release()
            if out is not None:
                out.release()

        # No cv2.destroyAllWindows(): this class never opens a window.
        print(f"Processed {frame_count} frames. Finished → {self.output_path}")
        return prediction_counter_df.fillna(0)

    # ────────────────────────────────────────────────────────────────
    def _process_and_write_frame(
        self,
        frame_up: np.ndarray,
        out_writer: cv2.VideoWriter,
        prev_t: float,
        prediction_counter_df: pd.DataFrame,
        frame_count: int,
    ) -> float:
        """Run YOLO on one frame, count per region, annotate, write, return timestamp.

        Args:
            frame_up: Already-resized frame; annotated in place.
            out_writer: Writer the annotated frame is appended to.
            prev_t: Timestamp returned by the previous call (for the FPS overlay).
            prediction_counter_df: Table that receives one new row (mutated in place).
            frame_count: Frame number stored in the `Frame Number` column.

        Returns:
            The `time.time()` value taken just before the frame is written.
        """
        # Region polygons in the coordinates of the resized frame, computed
        # once per frame instead of once per detection.
        scaled_regions = {
            rid: (poly * self.scale).astype(np.int32)
            for rid, poly in self.regions.items()
        }

        # Inference runs on the clean frame, before any overlay is drawn, so
        # region outlines and labels cannot interfere with detection.
        results = self.model.predict(
            frame_up,
            conf=self.conf,
            classes=self.classes,
            verbose=False,
            device=self.device,
        )

        for rid, poly_up in scaled_regions.items():
            cv2.polylines(frame_up, [poly_up], True, (255, 255, 0), 2)
            # Plain Python ints rather than NumPy scalars for the anchor.
            anchor = tuple(poly_up[0].tolist())
            draw_text_with_background(frame_up, f"R{rid}", anchor, font_scale=0.8)

        # counts[region][cls_id] -> int
        counts: Dict[int, Dict[int, int]] = {
            rid: defaultdict(int) for rid in self.regions
        }
        if results and len(results[0].boxes):
            boxes = results[0].boxes
            xyxy = boxes.xyxy.cpu().numpy()
            scores = boxes.conf.cpu().numpy()
            cls_ids = boxes.cls.int().cpu().tolist()
            for (x1, y1, x2, y2), score, cls_id in zip(xyxy, scores, cls_ids):
                color = get_color_for_class(cls_id)
                cv2.rectangle(
                    frame_up, (int(x1), int(y1)), (int(x2), int(y2)), color, 2
                )
                label = LABEL_MAP.get(cls_id, f"Class {cls_id}")
                draw_text_with_background(
                    frame_up,
                    f"{label}: {score:.2f}",
                    (int(x1), int(y1) - 10),
                    font_scale=0.6,
                    font_thickness=1,
                    bg_color=color,
                    padding=3,
                )
                # Region assignment based on *centre* of the box
                cx, cy = int((x1 + x2) / 2), int((y1 + y2) / 2)
                for rid, poly_up in scaled_regions.items():
                    if _inside((cx, cy), poly_up):
                        counts[rid][cls_id] += 1
                        break  # one region per detection

        # ─── Overlay per-region counts + update DataFrame ───
        df_idx = len(prediction_counter_df)
        prediction_counter_df.at[df_idx, "Frame Number"] = frame_count
        y_off = 30
        for rid, cls_dict in counts.items():
            for cls_id, cnt in cls_dict.items():
                label = LABEL_MAP.get(cls_id, f"Class {cls_id}")
                col_name = f"{label}_{rid}"
                prediction_counter_df.at[df_idx, col_name] = cnt
                draw_text_with_background(
                    frame_up,
                    f"{label}_{rid}: {cnt}",
                    (10, y_off),
                    font_scale=0.7,
                    font_thickness=2,
                    padding=6,
                )
                y_off += 25

        # FPS overlay (processing rate between consecutive processed frames)
        now = time.time()
        elapsed = now - prev_t
        fps_live = 1.0 / elapsed if elapsed > 0 else 0.0
        draw_text_with_background(
            frame_up,
            f"FPS: {fps_live:.1f}",
            (10, frame_up.shape[0] - 20),
            bg_color=(0, 0, 0),
            text_color=(0, 255, 0),
            font_scale=0.8,
            padding=4,
        )
        out_writer.write(frame_up)
        return now