import os

import cv2 as cv
import joblib  # or import pickle
import numpy as np
import torch
import torch.nn.functional as F
from numpy.linalg import norm, inv
from scipy.stats import multivariate_normal as mv_norm
from torch.distributions import MultivariateNormal

# Variance placed on the diagonal of every freshly (re)initialised covariance.
INIT_VARIANCE = 225.0

init_weight = [0.7, 0.11, 0.1, 0.09]
init_u = np.zeros(3)
# initial Covariance matrix
init_sigma = INIT_VARIANCE * np.eye(3)
init_alpha = 0.05


def _as_numpy(value):
    """Return *value* as a CPU NumPy array (``None`` is passed through)."""
    if value is None:
        return None
    if isinstance(value, torch.Tensor):
        return value.detach().cpu().numpy()
    return np.asarray(value)


class GMM:
    """Per-pixel Gaussian-mixture background model.

    Every pixel owns ``K`` Gaussian components.  After training the
    attributes hold:

    * ``weight`` -- (H, W, K) mixing weights
    * ``mu``     -- (H, W, K, C) component means
    * ``sigma``  -- (H, W, K, C, C) component covariances
    * ``B``      -- (H, W) number of leading components treated as background
    """

    def __init__(self, data_dir, train_num, alpha=init_alpha):
        """Store the training configuration; no files are read here.

        Args:
            data_dir: Directory containing the training frames.
            train_num: Number of training frames to read.
            alpha: Learning rate used by ``train``.
        """
        self.data_dir = data_dir
        self.train_num = train_num
        self.alpha = alpha
        self.img_shape = None

        self.weight = None
        self.mu = None
        self.sigma = None
        self.K = None
        self.B = None

    # ------------------------------------------------------------------
    # Helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _mahalanobis(delta, sigma, eps=1e-5):
        """Return the Mahalanobis distance for batched deviations.

        Args:
            delta: Tensor of shape (..., C) holding ``x - mu``.
            sigma: Tensor of shape (..., C, C) holding the covariances.
            eps: Small constant added under the square root.

        Returns:
            Tensor of shape (...) with ``sqrt(delta^T sigma^-1 delta + eps)``.
        """
        sigma_inv = torch.linalg.inv(sigma)
        d_squared = torch.einsum('...i,...ij,...j->...', delta, sigma_inv, delta)
        # A covariance that is not positive definite can give a slightly
        # negative quadratic form; clamp so that sqrt does not produce NaN.
        return torch.sqrt(torch.clamp(d_squared, min=0.0) + eps)

    @staticmethod
    def _read_image(path):
        """Read an image with OpenCV, raising if the file cannot be decoded."""
        img = cv.imread(path)
        if img is None:
            # cv.imread signals failure by returning None instead of raising.
            raise FileNotFoundError(f"Could not read image: {path}")
        return img

    # ------------------------------------------------------------------
    # Matching
    # ------------------------------------------------------------------
    def check(self, pixel, mu, sigma, threshold=2.5):
        '''
        Check whether a pixel matches a Gaussian distribution.

        Matching means the Mahalanobis distance is less than ``threshold``
        (2.5 by default, the same value ``train`` uses).

        Args:
            pixel: Pixel value, NumPy array or tensor of shape (C,).
            mu: Component mean, NumPy array or tensor of shape (C,).
            sigma: Component covariance, NumPy array or tensor of shape (C, C).
            threshold: Maximum Mahalanobis distance that counts as a match.

        Returns:
            bool: True when the pixel matches the distribution.
        '''
        mu = torch.as_tensor(mu, dtype=torch.float32)
        # Keep every operand on the device of ``mu``.
        pixel = torch.as_tensor(pixel, dtype=torch.float32, device=mu.device)
        sigma = torch.as_tensor(sigma, dtype=torch.float32, device=mu.device)

        distance = self._mahalanobis(pixel - mu, sigma)
        return distance.item() < threshold

    # ------------------------------------------------------------------
    # Training
    # ------------------------------------------------------------------
    def train(self, K=4, verbose=True):
        '''
        Train the model on frames ``b00000.bmp`` ... in ``data_dir``.

        CUDA is used when available.  For every frame each pixel is matched
        against its components (Mahalanobis distance below 2.5); the first
        matching component is updated, pixels without a match replace their
        lowest-weight component, and the components are then reordered.

        Args:
            K: Number of Gaussian components per pixel.
            verbose: Print progress and the state of one debug pixel.

        Raises:
            ValueError: If ``train_num`` is less than 1 or a frame's shape
                differs from the first frame.
            FileNotFoundError: If a training frame cannot be read.
        '''
        match_threshold = 2.5
        self.K = K
        device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        if verbose:
            print(f"Using device: {device}")

        file_list = [
            os.path.join(self.data_dir, 'b%05d' % i + '.bmp')
            for i in range(self.train_num)
        ]
        if not file_list:
            raise ValueError("train_num must be at least 1")

        # Initialise every component from the first frame.
        img_init = self._read_image(file_list[0])
        self.img_shape = img_init.shape
        height, width, channels = self.img_shape

        init_cov = INIT_VARIANCE * torch.eye(
            channels, dtype=torch.float32, device=device)
        first_frame = torch.from_numpy(img_init).float().to(device)

        self.weight = torch.full(
            (height, width, K), 1.0 / K, dtype=torch.float32, device=device)
        self.mu = first_frame.unsqueeze(2).repeat(1, 1, K, 1)
        self.sigma = init_cov.expand(
            height, width, K, channels, channels).clone()
        self.B = torch.ones((height, width), dtype=torch.int32, device=device)

        # The debug pixel is clamped so that small images do not raise.
        dbg_y = min(100, height - 1)
        dbg_x = min(100, width - 1)

        for file in file_list:
            if verbose:
                print('training:{}'.format(file))
            img = self._read_image(file)
            if img.shape != self.img_shape:
                raise ValueError(
                    f"{file} has shape {img.shape}, expected {self.img_shape}")
            img_tensor = torch.from_numpy(img).float().to(device)  # (H,W,C)

            # Distances to all K components, computed once per frame.
            delta_all = img_tensor.unsqueeze(2) - self.mu  # (H,W,K,C)
            within = self._mahalanobis(delta_all, self.sigma) < match_threshold

            # Index of the first matching component per pixel, -1 if none.
            matches = torch.full(
                (height, width), -1, dtype=torch.long, device=device)
            for k in range(K):
                newly_matched = within[:, :, k] & (matches == -1)
                matches[newly_matched] = k

            # w_k <- (1 - alpha) * w_k + alpha * M_k : every weight decays and
            # only the matched component receives the +alpha term below.
            self.weight *= (1 - self.alpha)

            for k in range(K):
                ys, xs = torch.nonzero(matches == k, as_tuple=True)
                if ys.numel() == 0:
                    continue
                self.weight[ys, xs, k] += self.alpha

                pixels = img_tensor[ys, xs]    # (N,C)
                mu_k = self.mu[ys, xs, k]      # (N,C)
                sigma_k = self.sigma[ys, xs, k]  # (N,C,C)
                try:
                    mvn = MultivariateNormal(mu_k, covariance_matrix=sigma_k)
                    rho = self.alpha * torch.exp(mvn.log_prob(pixels))
                except (RuntimeError, ValueError) as e:
                    # Argument validation raises ValueError; a failed
                    # factorisation raises a RuntimeError subclass.
                    print(f"Error updating distribution {k}: {e}")
                    continue

                diff = pixels - mu_k
                self.mu[ys, xs, k] = mu_k + rho.unsqueeze(1) * diff
                outer = torch.einsum('bi,bj->bij', diff, diff)
                self.sigma[ys, xs, k] = (
                    sigma_k + rho.view(-1, 1, 1) * (outer - sigma_k))

            # Pixels without a match overwrite their lowest-weight component.
            ys, xs = torch.nonzero(matches == -1, as_tuple=True)
            if ys.numel() > 0:
                weakest = torch.argmin(self.weight[ys, xs], dim=1)  # (N,)
                self.mu[ys, xs, weakest] = img_tensor[ys, xs]
                self.sigma[ys, xs, weakest] = init_cov

            # Keep the weights a proper distribution so the cumulative
            # threshold in reorder() is meaningful.
            self.weight /= self.weight.sum(dim=-1, keepdim=True)

            # reorder() works on NumPy arrays.
            self.weight = self.weight.cpu().numpy()
            self.mu = self.mu.cpu().numpy()
            self.sigma = self.sigma.cpu().numpy()
            self.B = self.B.cpu().numpy()

            if verbose:
                print('img:{}'.format(img[dbg_y][dbg_x]))
                print('weight:{}'.format(self.weight[dbg_y][dbg_x]))

            self.reorder()

            if verbose:
                for i in range(self.K):
                    print('u:{}'.format(self.mu[dbg_y][dbg_x][i]))

            # Move back to the training device for the next frame.
            self.weight = torch.from_numpy(self.weight).to(device)
            self.mu = torch.from_numpy(self.mu).to(device)
            self.sigma = torch.from_numpy(self.sigma).to(device)
            self.B = torch.from_numpy(self.B).to(device)

    # ------------------------------------------------------------------
    # Persistence
    # ------------------------------------------------------------------
    def save_model(self, file_path):
        """
        Save the trained model to a file.

        Model arrays are written as CPU NumPy arrays so that the file can be
        loaded on a machine without CUDA.

        Args:
            file_path: Destination path; missing parent directories are
                created.
        """
        # Only make directories if there is a directory in the path
        dir_name = os.path.dirname(file_path)
        if dir_name:
            os.makedirs(dir_name, exist_ok=True)

        joblib.dump({
            'weight': _as_numpy(self.weight),
            'mu': _as_numpy(self.mu),
            'sigma': _as_numpy(self.sigma),
            'K': self.K,
            'B': _as_numpy(self.B),
            'img_shape': self.img_shape,
            'alpha': self.alpha,
            'data_dir': self.data_dir,
            'train_num': self.train_num,
        }, file_path)
        print(f"Model saved to {file_path}")

    @classmethod
    def load_model(cls, file_path):
        """
        Load a trained model from file.

        Model arrays are returned as torch tensors, whether the file stores
        NumPy arrays or tensors.

        Args:
            file_path: Path previously written by ``save_model``.

        Returns:
            GMM: A new instance with the stored parameters restored.
        """
        # NOTE: joblib files are pickle-based; only load files you trust.
        data = joblib.load(file_path)

        gmm = cls(data['data_dir'], data['train_num'], data['alpha'])

        gmm.weight = torch.as_tensor(data['weight'], dtype=torch.float32)
        gmm.mu = torch.as_tensor(data['mu'], dtype=torch.float32)
        gmm.sigma = torch.as_tensor(data['sigma'], dtype=torch.float32)
        gmm.K = data['K']
        gmm.B = torch.as_tensor(data['B'])
        gmm.img_shape = data['img_shape']
        # Kept for callers that read the older attribute name.
        gmm.image_shape = data['img_shape']

        print(f"Model loaded from {file_path}")
        return gmm

    # ------------------------------------------------------------------
    # Component ordering
    # ------------------------------------------------------------------
    def reorder(self, T=0.90):
        '''
        Reorder the estimated components based on the ratio
        pi / the norm of standard deviation.

        The first B components are chosen as background components, where B
        is the smallest count whose cumulative weight exceeds ``T``.  When
        the cumulative weight never exceeds ``T`` all K components are used.

        ``weight``, ``mu``, ``sigma`` and ``B`` are expected to be NumPy
        arrays (``train`` converts them before calling) and are updated in
        place.

        Args:
            T: Cumulative-weight threshold. The default threshold is 0.90.
        '''
        epsilon = 1e-6  # to prevent divide-by-zero

        weight = np.asarray(self.weight)
        mu = np.asarray(self.mu)
        sigma = np.asarray(self.sigma)
        background_count = np.asarray(self.B)

        # Matrices containing NaN/inf are swapped for the identity so the
        # batched eigen-decomposition cannot fail; they are marked invalid.
        finite = np.isfinite(sigma).all(axis=(-2, -1))  # (H,W,K)
        identity = np.eye(sigma.shape[-1], dtype=sigma.dtype)
        safe_sigma = np.where(finite[..., None, None], sigma, identity)
        eigenvalues = np.linalg.eigvalsh(safe_sigma)  # (H,W,K,C)
        valid = finite & (eigenvalues >= 0).all(axis=-1)

        # Frobenius norm of the element-wise sqrt equals sqrt(sum |cov|);
        # abs() keeps negative off-diagonal entries from producing NaN.
        spread = np.sqrt(np.abs(safe_sigma).sum(axis=(-2, -1)))
        k_norm = np.where(valid, spread, epsilon)

        ratio = weight / (k_norm + epsilon)
        order = np.argsort(-ratio, axis=-1)  # (H,W,K), descending ratio

        weight[...] = np.take_along_axis(weight, order, axis=-1)
        mu[...] = np.take_along_axis(mu, order[..., None], axis=2)
        sigma[...] = np.take_along_axis(sigma, order[..., None, None], axis=2)

        exceeds = np.cumsum(weight, axis=-1) > T
        first_exceeding = np.argmax(exceeds, axis=-1) + 1
        background_count[...] = np.where(
            exceeds.any(axis=-1), first_exceeding, weight.shape[-1])

    # ------------------------------------------------------------------
    # Inference
    # ------------------------------------------------------------------
    def infer(self, img, heatmap=None, decay_factor=0.95, alpha=0.1,
              match_threshold=9.5):
        '''
        Detect foreground and overlay a persistence heatmap on ``img``.

        A pixel is background when it lies within ``match_threshold``
        (Mahalanobis distance) of one of its first ``B`` components.
        Foreground pixels raise the heatmap by ``alpha`` (clamped to [0, 1]);
        all other pixels are multiplied by ``decay_factor``.  Only areas whose
        heat exceeds 0.1 are tinted; the rest of the image is unchanged.

        Args:
            img: Image as a NumPy array of shape (H, W, C).
            heatmap: Heatmap returned by the previous call, or None for the
                first frame.  It is not modified in place.
            decay_factor: Multiplier applied to non-foreground heat.
            alpha: Heat added to foreground pixels.
            match_threshold: Maximum Mahalanobis distance for a background
                match.

        Returns:
            tuple: ``(result, heatmap_np)`` -- the tinted copy of ``img`` and
            the updated heatmap as a float32 NumPy array of shape (H, W).

        Raises:
            ValueError: If ``img`` is None.
            RuntimeError: If the model has not been trained or loaded.
        '''
        if img is None:
            raise ValueError("img must be an image array, got None")
        if self.mu is None or self.sigma is None or self.B is None:
            raise RuntimeError("GMM model has not been trained or loaded")

        device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        img_tensor = torch.from_numpy(img).float().to(device)  # (H, W, C)
        H, W = img.shape[:2]

        # Accept model parameters stored as tensors or NumPy arrays.
        mu = torch.as_tensor(self.mu, dtype=torch.float32, device=device)
        sigma = torch.as_tensor(self.sigma, dtype=torch.float32, device=device)
        background_count = torch.as_tensor(self.B, device=device)

        if heatmap is None:
            heatmap = torch.zeros((H, W), dtype=torch.float32, device=device)
        else:
            # clone() so the caller's array is never modified through a
            # shared-memory tensor.
            heatmap = torch.as_tensor(
                heatmap, dtype=torch.float32, device=device).clone()

        # Distance of every pixel to each of its K components.
        delta = img_tensor.unsqueeze(2) - mu  # (H,W,K,C)
        dist = self._mahalanobis(delta, sigma)  # (H,W,K)

        # Component k is a background component when k < B for that pixel.
        component = torch.arange(mu.shape[2], device=device)
        is_background_component = component < background_count.unsqueeze(-1)
        is_background = (
            (dist < match_threshold) & is_background_component).any(dim=-1)

        # Foreground: no background match and the pixel is not all zeros.
        foreground_mask = ~is_background & (img_tensor.abs().sum(dim=-1) > 0)

        heatmap = torch.where(
            foreground_mask,
            torch.clamp(heatmap + alpha, 0, 1),
            heatmap * decay_factor,
        )

        # Convert heatmap to numpy for visualization
        heatmap_np = heatmap.cpu().numpy()
        heatmap_viz = cv.applyColorMap(
            (heatmap_np * 255).astype(np.uint8), cv.COLORMAP_JET)

        # Only areas with noticeable heat are tinted.
        significant_heat = heatmap_np > 0.1

        result = img.copy()
        if np.any(significant_heat):
            result[significant_heat] = cv.addWeighted(
                img[significant_heat], 0.7,
                heatmap_viz[significant_heat], 0.3,
                0,
            )

        return result, heatmap_np