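"""Multi-person tracking with re-identification.

Pipeline: YOLOv8 + ByteTrack detect and track people frame to frame,
InsightFace (buffalo_l) extracts face embeddings, and TorchReID (OSNet)
extracts body embeddings. The embeddings are fused and matched by cosine
similarity so each person keeps a stable global ID across track losses.
"""
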
import cv2
import numpy as np
from ultralytics import YOLO
from insightface.app import FaceAnalysis
import torchreid

# Configuration
DETECTION_THRESHOLD = 0.75  # Confidence threshold for person detection
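SIMILARITY_THRESHOLD = 0.6  # Cosine similarity threshold for matching a known person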

# =============================================================================
# MODEL INITIALIZATION
# =============================================================================

# Load YOLOv8 model with ByteTrack tracker for person detection and tracking
# YOLOv8 handles object detection while ByteTrack provides consistent tracking IDs
model = YOLO(r'detection.pt')  # Replace with your trained model path

# Initialize InsightFace for facial feature extraction
# Uses buffalo_l model which provides high-quality face embeddings
face_app = FaceAnalysis(name='buffalo_l', providers=['CUDAExecutionProvider'])
face_app.prepare(ctx_id=0)  # Prepare for GPU inference
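# Note: without a CUDA GPU, InsightFace can run on the CPU instead:
#   face_app = FaceAnalysis(name='buffalo_l', providers=['CPUExecutionProvider'])
#   face_app.prepare(ctx_id=-1)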

# Initialize TorchReID for full-body person re-identification
# OSNet is a lightweight but effective model for person ReID
reid_extractor = torchreid.utils.FeatureExtractor(
    model_name='osnet_x0_25',
    model_path='osnet_x0_25_market1501.pth',  # Pre-trained on Market1501 dataset
    device='cuda'
)
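
# Note: 'osnet_x0_25_market1501.pth' is assumed to be downloaded beforehand
# (e.g. from the torchreid model zoo); FeatureExtractor applies its own
# resize/normalization transforms internally before inference.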

# =============================================================================
# GLOBAL VARIABLES FOR PERSON RE-IDENTIFICATION
# =============================================================================

# Storage for known person embeddings and their assigned global IDs
known_embeddings = []  # List of combined face+body embeddings
known_ids = []         # Corresponding global IDs for each embedding
next_global_id = 1     # Counter for assigning new global IDs

# Mapping from ByteTrack tracker IDs to global person IDs
# This helps maintain consistency when tracker IDs change
track_to_global = {}
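# e.g. {3: 1, 7: 1} means ByteTrack track IDs 3 and 7 were both matched to
# global person 1 (the same person re-identified after a track loss)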

# =============================================================================
# VIDEO INPUT/OUTPUT SETUP
# =============================================================================

# Initialize video capture and output writer
cap = cv2.VideoCapture("demo.mp4")  # Input video file
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = cap.get(cv2.CAP_PROP_FPS)
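
# Fail fast on a missing input, and guard against containers that report
# no FPS (the 30 FPS fallback is an assumption, not from the original script)
if not cap.isOpened():
    raise RuntimeError("Could not open input video 'demo.mp4'")
if fps <= 0:
    fps = 30.0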

# Create output video writer with same properties as input
out = cv2.VideoWriter("output.mp4", cv2.VideoWriter_fourcc(*"mp4v"), fps, (width, height))

# =============================================================================
# MAIN PROCESSING LOOP
# =============================================================================

while True:
    ret, frame = cap.read()
    if not ret:
        break  # End of video

    # Run YOLOv8 detection with ByteTrack tracking
    # persist=True maintains tracking across frames
    results = model.track(frame, tracker="bytetrack.yaml", persist=True, 
                         verbose=False, conf=DETECTION_THRESHOLD)
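
    # Note: with a multi-class detector, tracking can be restricted to people
    # via model.track(..., classes=[0]) (assuming a COCO-style mapping where
    # class 0 is 'person'; adjust for this custom model).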

    # Process each detection result
    for result in results:
        # Extract bounding boxes in (x1, y1, x2, y2) format
        boxes = result.boxes.xyxy.cpu().numpy()
        
        # Extract tracking IDs if available
        if result.boxes.id is not None:
            track_ids = result.boxes.id.int().cpu().tolist()
        else:
            # No tracking IDs available, assign None for each detection
            track_ids = [None] * len(boxes)

        # Process each detected person
        for box, track_id in zip(boxes, track_ids):
            x1, y1, x2, y2 = map(int, box)
            # Clamp the box to the frame and skip degenerate crops
            x1, y1 = max(x1, 0), max(y1, 0)
            x2, y2 = min(x2, width), min(y2, height)
            if x2 <= x1 or y2 <= y1:
                continue

            # Crop the person from the frame
            person_crop = frame[y1:y2, x1:x2]

            # Initialize embedding variables
            face_embedding = None
            body_embedding = None

            # =============================================================
            # FACE EMBEDDING EXTRACTION
            # =============================================================
            
            # Extract face embedding using InsightFace
            faces = face_app.get(person_crop)
            if faces:
                # Use the face with the highest detection score
                face_embedding = max(faces, key=lambda f: f.det_score).embedding

            # =============================================================
            # BODY EMBEDDING EXTRACTION
            # =============================================================
            
            # Extract body embedding using TorchReID
            try:
                # Resize to 128x256 (width x height) and convert OpenCV's
                # BGR to RGB, matching the input OSNet was trained on
                body_input = cv2.resize(person_crop, (128, 256))
                body_input = cv2.cvtColor(body_input, cv2.COLOR_BGR2RGB)

                # Extract features and convert to numpy
                body_embedding = reid_extractor(body_input)[0].cpu().numpy()
            except Exception:
                # Crop too small or otherwise invalid; body_embedding stays None
                pass

            # =============================================================
            # EMBEDDING COMBINATION AND PERSON MATCHING
            # =============================================================
            
            # Combine face and body embeddings for robust person representation
            embedding = None
            if face_embedding is not None and body_embedding is not None:
                # Concatenate both embeddings for maximum distinctiveness
                embedding = np.concatenate((face_embedding, body_embedding)).astype(np.float32)
            elif face_embedding is not None:
                # Use only face embedding if body embedding failed
                embedding = face_embedding.astype(np.float32)
            elif body_embedding is not None:
                # Use only body embedding if face detection failed
                embedding = body_embedding.astype(np.float32)
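
            # Note (possible refinement, not in the original logic): L2-
            # normalizing face and body embeddings before concatenating them
            # above would keep either modality from dominating the cosine
            # similarity used below, e.g.:
            #   face_embedding /= (np.linalg.norm(face_embedding) + 1e-6)
            #   body_embedding /= (np.linalg.norm(body_embedding) + 1e-6)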

            # Assign global ID based on embedding similarity
            if embedding is not None:
                match_found = False

                # Search for similar embeddings among known people
                if known_embeddings:
                    # Only compare embeddings of the same dimension
                    matching_embeddings = [
                        (emb, gid) for emb, gid in zip(known_embeddings, known_ids)
                        if emb.shape[0] == embedding.shape[0]
                    ]

                    if matching_embeddings:
                        embs, gids = zip(*matching_embeddings)
                        embs = np.array(embs)

                        # Calculate cosine similarity with all known embeddings
                        sims = np.dot(embs, embedding) / (
                            np.linalg.norm(embs, axis=1) * np.linalg.norm(embedding) + 1e-6
                        )
                        
                        # Find the best match
                        best_match = np.argmax(sims)
                        if sims[best_match] > SIMILARITY_THRESHOLD:
                            global_id = gids[best_match]
                            match_found = True

                # If no match found, assign new global ID
                if not match_found:
                    global_id = next_global_id
                    next_global_id += 1
                    known_embeddings.append(embedding)
                    known_ids.append(global_id)
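
                    # Note: the gallery of known embeddings grows without
                    # bound on long videos; capping its size or averaging
                    # embeddings per ID is a common refinement (not done here).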

                # Update tracker ID to global ID mapping
                if track_id is not None:
                    track_to_global[track_id] = global_id

                display_id = global_id

            else:
                # No usable embedding; fall back to the tracker's ID, or a
                # placeholder when the tracker did not assign one
                if track_id is not None:
                    display_id = track_to_global.get(track_id, f"T{track_id}")
                else:
                    display_id = "?"

            # =============================================================
            # VISUALIZATION
            # =============================================================
            
            # Draw bounding box around detected person
            cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
            
            # Display the global ID above the bounding box
            cv2.putText(frame, f"ID {display_id}", (x1, y1 - 10),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 0), 2)

    # =============================================================================
    # OUTPUT AND DISPLAY
    # =============================================================================
    
    # Write the annotated frame to the output video
    out.write(frame)

    # Show the frame with tracking results
    cv2.imshow("Tracking + ReID", frame)

    # Break the loop if 'q' is pressed (the frame has already been written)
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

# =============================================================================
# CLEANUP
# =============================================================================

# Release video capture and writer resources
cap.release()
out.release()
cv2.destroyAllWindows()