import cv2
import numpy as np
from ultralytics import YOLO
from insightface.app import FaceAnalysis
import torchreid
import torch
# Configuration
DETECTION_THRESHOLD = 0.75 # Confidence threshold for person detection
# =============================================================================
# MODEL INITIALIZATION
# =============================================================================
# Load YOLOv8 model with ByteTrack tracker for person detection and tracking
# YOLOv8 handles object detection while ByteTrack provides consistent tracking IDs
model = YOLO(r'detection.pt') # Replace with your trained model path
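# Note: 'detection.pt' is assumed to be a person-detection checkpoint trained separately;
# a stock model such as 'yolov8n.pt' with a class filter (e.g., classes=[0] passed to
# model.track) is a common alternative if no custom weights are available.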
# Initialize InsightFace for facial feature extraction
# Uses buffalo_l model which provides high-quality face embeddings
face_app = FaceAnalysis(name='buffalo_l', providers=['CUDAExecutionProvider'])
face_app.prepare(ctx_id=0) # Prepare for GPU inference
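# ctx_id=0 selects the first GPU; in InsightFace a negative ctx_id typically falls back to CPU.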
# Initialize TorchReID for full-body person re-identification
# OSNet is a lightweight but effective model for person ReID
reid_extractor = torchreid.utils.FeatureExtractor(
    model_name='osnet_x0_25',
    model_path='osnet_x0_25_market1501.pth',  # Pre-trained on Market1501 dataset
    device='cuda'
)
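# Note: the OSNet checkpoint path above is assumed to be a weight file downloaded
# beforehand (e.g., from the torchreid model zoo); it is not bundled with the package.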
# =============================================================================
# GLOBAL VARIABLES FOR PERSON RE-IDENTIFICATION
# =============================================================================
# Storage for known person embeddings and their assigned global IDs
known_embeddings = [] # List of combined face+body embeddings
known_ids = [] # Corresponding global IDs for each embedding
next_global_id = 1 # Counter for assigning new global IDs
# Mapping from ByteTrack tracker IDs to global person IDs
# This helps maintain consistency when tracker IDs change
track_to_global = {}
# =============================================================================
# VIDEO INPUT/OUTPUT SETUP
# =============================================================================
# Initialize video capture and output writer
cap = cv2.VideoCapture("demo.mp4") # Input video file
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = cap.get(cv2.CAP_PROP_FPS)
# Create output video writer with same properties as input
out = cv2.VideoWriter("output.mp4", cv2.VideoWriter_fourcc(*"mp4v"), fps, (width, height))
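# Note: some sources report an FPS of 0 via CAP_PROP_FPS; if that happens, substitute a
# sensible default (e.g., 30) before creating the writer, or the output file may be unusable.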
# =============================================================================
# MAIN PROCESSING LOOP
# =============================================================================
while True:
    ret, frame = cap.read()
    if not ret:
        break  # End of video

    # Run YOLOv8 detection with ByteTrack tracking
    # persist=True maintains tracking across frames
    results = model.track(frame, tracker="bytetrack.yaml", persist=True,
                          verbose=False, conf=DETECTION_THRESHOLD)
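    # For a single frame, 'results' is a list containing one Results object;
    # iterating over it keeps the code general.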
    # Process each detection result
    for result in results:
        # Extract bounding boxes in (x1, y1, x2, y2) format
        boxes = result.boxes.xyxy.cpu().numpy()

        # Extract tracking IDs if available
        if result.boxes.id is not None:
            track_ids = result.boxes.id.int().cpu().tolist()
        else:
            # No tracking IDs available, assign None for each detection
            track_ids = [None] * len(boxes)

        # Process each detected person
        for box, track_id in zip(boxes, track_ids):
            x1, y1, x2, y2 = map(int, box)

            # Crop the person from the frame
            person_crop = frame[y1:y2, x1:x2]

            # Initialize embedding variables
            face_embedding = None
            body_embedding = None
            # =============================================================
            # FACE EMBEDDING EXTRACTION
            # =============================================================
            # Extract face embedding using InsightFace
            faces = face_app.get(person_crop)
            if faces:
                # Use the first detected face
                face_embedding = faces[0].embedding

            # =============================================================
            # BODY EMBEDDING EXTRACTION
            # =============================================================
            # Extract body embedding using TorchReID
            try:
                # Resize to the 128x256 (w x h) input expected by OSNet
                body_input = cv2.resize(person_crop, (128, 256))
                body_input = cv2.cvtColor(body_input, cv2.COLOR_BGR2RGB)
                # Extract features and convert to numpy
                body_embedding = reid_extractor(body_input)[0].cpu().numpy()
            except Exception:
                # Handle cases where the crop is too small or invalid
                pass
            # =============================================================
            # EMBEDDING COMBINATION AND PERSON MATCHING
            # =============================================================
            # Combine face and body embeddings for a robust person representation
            embedding = None
            if face_embedding is not None and body_embedding is not None:
                # Concatenate both embeddings for maximum distinctiveness
                embedding = np.concatenate((face_embedding, body_embedding)).astype(np.float32)
            elif face_embedding is not None:
                # Use only face embedding if body embedding failed
                embedding = face_embedding.astype(np.float32)
            elif body_embedding is not None:
                # Use only body embedding if face detection failed
                embedding = body_embedding.astype(np.float32)

            # Assign global ID based on embedding similarity
            if embedding is not None:
                match_found = False

                # Search for similar embeddings among known people
                if known_embeddings:
                    # Only compare embeddings of the same dimension
                    matching_embeddings = [
                        (emb, gid) for emb, gid in zip(known_embeddings, known_ids)
                        if emb.shape[0] == embedding.shape[0]
                    ]
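                    # Caveat (assumption): buffalo_l face embeddings and OSNet body
                    # features are both typically 512-d, so a face-only and a
                    # body-only embedding can share the same dimension and end up
                    # being compared against each other here.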
                    if matching_embeddings:
                        embs, gids = zip(*matching_embeddings)
                        embs = np.array(embs)

                        # Calculate cosine similarity with all known embeddings
                        sims = np.dot(embs, embedding) / (
                            np.linalg.norm(embs, axis=1) * np.linalg.norm(embedding) + 1e-6
                        )
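                        # Note: the 0.6 cosine threshold below is a heuristic and may
                        # need tuning; L2-normalizing the face and body parts before
                        # concatenation (not done here) is one way to keep either
                        # modality from dominating the combined similarity.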
                        # Find the best match
                        best_match = np.argmax(sims)
                        if sims[best_match] > 0.6:  # Similarity threshold
                            global_id = gids[best_match]
                            match_found = True

                # If no match was found, assign a new global ID
                if not match_found:
                    global_id = next_global_id
                    next_global_id += 1
                    known_embeddings.append(embedding)
                    known_ids.append(global_id)

                # Update the tracker ID to global ID mapping
                if track_id is not None:
                    track_to_global[track_id] = global_id
                display_id = global_id
            else:
                # No usable embedding available, fall back to the tracker ID
                if track_id is not None:
                    display_id = track_to_global.get(track_id, f"T{track_id}")
                else:
                    display_id = "unknown"
            # =============================================================
            # VISUALIZATION
            # =============================================================
            # Draw a bounding box around the detected person
            cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)

            # Display the global ID above the bounding box
            cv2.putText(frame, f"ID {display_id}", (x1, y1 - 10),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 0), 2)
    # =============================================================================
    # OUTPUT AND DISPLAY
    # =============================================================================
    # Show the frame with tracking results
    cv2.imshow("Tracking + ReID", frame)

    # Break the loop if the 'q' key is pressed
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

    # Write the frame to the output video
    out.write(frame)
# =============================================================================
# CLEANUP
# =============================================================================
# Release video capture and writer resources
cap.release()
out.release()
cv2.destroyAllWindows()