# mlbench123's picture
# Update GMM.py
# 3edb911 verified
import numpy as np
import cv2 as cv
import os
from numpy.linalg import norm, inv
from scipy.stats import multivariate_normal as mv_norm
import joblib # or import pickle
import os
import torch
from torch.distributions import MultivariateNormal
import torch.nn.functional as F
# Module-level defaults for the mixture model.
# Learning rate used as the default ``alpha`` of GMM.__init__.
init_alpha = 0.05
# Initial covariance matrix: isotropic, variance 225 on each of 3 channels.
init_sigma = np.eye(3) * 225
# Initial mean vector (all zeros, one entry per channel).
init_u = np.zeros(3)
# Initial component weights for a four-component mixture.
init_weight = [0.7, 0.11, 0.1, 0.09]
class GMM():
    def __init__(self, data_dir, train_num, alpha=init_alpha):
        """
        Create an untrained model.

        Args:
            data_dir: Directory that holds the training frames.
            train_num: Number of training frames to read.
            alpha: Learning rate of the online update.
        """
        # Learned state; every field stays None until it is filled in.
        self.K = self.B = None
        self.weight = self.mu = self.sigma = None
        self.img_shape = None
        # Configuration supplied by the caller.
        self.alpha = alpha
        self.train_num = train_num
        self.data_dir = data_dir
def check(self, pixel, mu, sigma, threshold=0.1):
    '''
    Check whether a pixel matches a Gaussian distribution.

    A pixel matches when its Mahalanobis distance to the distribution is
    strictly below ``threshold``.

    Args:
        pixel: Pixel value, as a NumPy array or a torch tensor.
        mu: Mean of the distribution, as a NumPy array or a torch tensor.
        sigma: Covariance matrix of the distribution, as a NumPy array or
            a torch tensor. It must be invertible.
        threshold: Distance cut-off. Defaults to 0.1, the value this method
            used to hard-code (the old docstring said 2.5, which the code
            never applied).

    Returns:
        bool: True when the distance is below ``threshold``.
    '''
    # NumPy inputs become float32 tensors; tensor inputs pass through as-is.
    if isinstance(mu, np.ndarray):
        mu = torch.from_numpy(mu).float()
    if isinstance(sigma, np.ndarray):
        sigma = torch.from_numpy(sigma).float()
    if isinstance(pixel, np.ndarray):
        pixel = torch.from_numpy(pixel).float()
    # Ensure all operands live on the device of the mean
    device = mu.device
    pixel = pixel.to(device)
    sigma = sigma.to(device)
    # Mahalanobis distance: sqrt((x - mu)^T Sigma^-1 (x - mu))
    delta = pixel - mu
    d_squared = delta @ torch.linalg.inv(sigma) @ delta
    # The small constant is added under the root, as the original code did.
    d = torch.sqrt(d_squared + 1e-5)
    return d.item() < threshold
def train(self, K=4):
    '''
    Train the per-pixel mixture model with GPU acceleration.

    Reads ``train_num`` frames named ``b00000.bmp``, ``b00001.bmp``, ...
    from ``data_dir``. For every frame each pixel is matched against its K
    components (Mahalanobis distance < 2.5, first match wins):

    * matched pixels: all K weights decay by ``(1 - alpha)`` and the matched
      component gains ``alpha`` (so the weights keep their sum); its mean
      and covariance move towards the pixel by ``rho = alpha * pdf``;
    * unmatched pixels: the lowest-weight component is re-seeded with the
      pixel value and a ``225 * I`` covariance.

    After each frame the components are re-sorted via ``reorder()``. When
    training ends, ``weight``, ``mu``, ``sigma`` and ``B`` are torch tensors
    on the training device.

    Args:
        K: Number of Gaussian components per pixel.

    Raises:
        FileNotFoundError: If a training frame cannot be read.
    '''
    self.K = K
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    print(f"Using device: {device}")
    file_list = [
        os.path.join(self.data_dir, 'b%05d' % i + '.bmp')
        for i in range(self.train_num)
    ]

    # Initialize with first image
    img_init = cv.imread(file_list[0])
    if img_init is None:
        # cv.imread reports an unreadable file by returning None.
        raise FileNotFoundError(
            f"Cannot read training image: {file_list[0]}")
    self.img_shape = img_init.shape
    height, width, _ = self.img_shape

    # Pixel used for the debug prints, clamped so small frames do not crash.
    dbg_y = min(100, height - 1)
    dbg_x = min(100, width - 1)

    # Initial covariance shared by every new component: identity * 225
    init_cov = torch.eye(3, device=device) * 225

    # Initialize model parameters on the device
    self.weight = torch.full((height, width, K), 1.0 / K,
                             dtype=torch.float32, device=device)
    first_frame = torch.from_numpy(img_init).float().to(device)
    # Every component's mean starts at the first frame's pixel value.
    self.mu = first_frame.unsqueeze(2).repeat(1, 1, K, 1)
    self.sigma = init_cov.repeat(height, width, K, 1, 1)
    self.B = torch.ones((height, width),
                        dtype=torch.int32, device=device)

    # Training loop
    for file in file_list:
        print('training:{}'.format(file))
        img = cv.imread(file)
        if img is None:
            raise FileNotFoundError(f"Cannot read training image: {file}")
        img_tensor = torch.from_numpy(img).float().to(device)  # (H,W,3)

        # Mahalanobis distance of every pixel to all K components, computed
        # once per frame: mu and sigma do not change during matching.
        delta = img_tensor.unsqueeze(2) - self.mu  # (H,W,K,3)
        sigma_inv = torch.linalg.inv(self.sigma)  # (H,W,K,3,3)
        temp = torch.einsum('hwki,hwkij->hwkj', delta, sigma_inv)
        mahalanobis = torch.sqrt(
            torch.einsum('hwki,hwki->hwk', temp, delta))

        # First component (in current order) closer than 2.5 wins; -1 = none.
        # A NaN distance compares False and therefore never matches.
        matches = torch.full((height, width), -1,
                             dtype=torch.long, device=device)
        for k in range(K):
            match_mask = (mahalanobis[:, :, k] < 2.5) & (matches == -1)
            matches[match_mask] = k

        # Process matched pixels
        for k in range(K):
            mask = matches == k
            if not mask.any():
                continue
            matched_pixels = img_tensor[mask]  # (N,3)
            matched_mu = self.mu[:, :, k, :][mask]  # (N,3)
            matched_sigma = self.sigma[:, :, k, :, :][mask]  # (N,3,3)
            try:
                mvn = MultivariateNormal(matched_mu,
                                         covariance_matrix=matched_sigma)
                rho = self.alpha * torch.exp(mvn.log_prob(matched_pixels))
                # Weights: decay every component of the matched pixels,
                # then reward the matched one, i.e.
                # w_k <- (1 - alpha) * w_k + alpha * [k matched].
                self.weight[mask] *= (1 - self.alpha)
                self.weight[:, :, k][mask] += self.alpha
                # Mean
                diff = matched_pixels - matched_mu
                self.mu[:, :, k, :][mask] += rho.unsqueeze(1) * diff
                # Covariance
                diff_outer = torch.einsum('bi,bj->bij', diff, diff)
                self.sigma[:, :, k, :, :][mask] += (
                    rho.unsqueeze(1).unsqueeze(2)
                    * (diff_outer - matched_sigma))
            except (RuntimeError, ValueError) as e:
                # Best effort: an invalid covariance (failed factorization
                # or argument validation) skips this component's update.
                print(f"Error updating distribution {k}: {e}")

        # Process non-matched pixels
        non_matched = matches == -1
        if non_matched.any():
            # Least probable component of each non-matched pixel, shape (N,)
            min_weight_idx = torch.argmin(self.weight[non_matched], dim=1)
            # (row, col) of each non-matched pixel, same order, shape (N,2)
            non_matched_indices = non_matched.nonzero(as_tuple=False)
            for k in range(K):
                k_mask = min_weight_idx == k
                if k_mask.any():
                    y_idx, x_idx = non_matched_indices[k_mask].unbind(dim=1)
                    # Re-seed that component at the observed pixel value.
                    self.mu[y_idx, x_idx, k, :] = img_tensor[y_idx, x_idx]
                    self.sigma[y_idx, x_idx, k, :, :] = init_cov

        # reorder() works on NumPy arrays, so round-trip through the host.
        weight_np = self.weight.cpu().numpy()
        mu_np = self.mu.cpu().numpy()
        sigma_np = self.sigma.cpu().numpy()
        B_np = self.B.cpu().numpy()
        print('img:{}'.format(img[dbg_y][dbg_x]))
        print('weight:{}'.format(weight_np[dbg_y][dbg_x]))
        self.weight = weight_np
        self.mu = mu_np
        self.sigma = sigma_np
        self.B = B_np
        self.reorder()
        for i in range(self.K):
            print('u:{}'.format(self.mu[dbg_y][dbg_x][i]))
        # Move back to the device for the next iteration
        self.weight = torch.from_numpy(self.weight).to(device)
        self.mu = torch.from_numpy(self.mu).to(device)
        self.sigma = torch.from_numpy(self.sigma).to(device)
        self.B = torch.from_numpy(self.B).to(device)
def save_model(self, file_path):
    """
    Save the trained model to a file.

    Tensor attributes are copied to CPU memory before serialization so the
    file can be loaded on a host without CUDA; the in-memory model is left
    untouched. Non-tensor attributes are stored as they are.

    Args:
        file_path: Destination path. Missing parent directories are created.
    """
    def to_cpu(value):
        # Tensors are detached and moved to the host; anything else
        # (NumPy arrays, None, scalars) passes through unchanged.
        return value.detach().cpu() if torch.is_tensor(value) else value

    # Only make directories if there is a directory in the path
    dir_name = os.path.dirname(file_path)
    if dir_name:
        os.makedirs(dir_name, exist_ok=True)
    joblib.dump({
        'weight': to_cpu(self.weight),
        'mu': to_cpu(self.mu),
        'sigma': to_cpu(self.sigma),
        'K': self.K,
        'B': to_cpu(self.B),
        'img_shape': self.img_shape,
        'alpha': self.alpha,
        'data_dir': self.data_dir,
        'train_num': self.train_num
    }, file_path)
    print(f"Model saved to {file_path}")
@classmethod
def load_model(cls, file_path):
    """
    Load a trained model from file.

    Args:
        file_path: Path of a file written by ``save_model``.

    Returns:
        A new instance with ``weight``, ``mu``, ``sigma``, ``K``, ``B`` and
        ``img_shape`` restored from the file.
    """
    # NOTE(security): joblib.load unpickles the file, which can execute
    # arbitrary code. Only load model files from trusted sources.
    # NOTE(review): a file whose tensors were pickled on a CUDA device may
    # fail to load on a CPU-only host; confirm before deploying.
    data = joblib.load(file_path)
    # Create new instance
    gmm = cls(data['data_dir'], data['train_num'], data['alpha'])
    # Restore all attributes
    gmm.weight = data['weight']
    gmm.mu = data['mu']
    gmm.sigma = data['sigma']
    gmm.K = data['K']
    gmm.B = data['B']
    gmm.img_shape = data['img_shape']
    # Second attribute with the same value; kept because the original set it.
    gmm.image_shape = data['img_shape']
    print(f"Model loaded from {file_path}")
    return gmm
def reorder(self, T=0.90):
    '''
    Reorder the estimated components of every pixel, in place, by the ratio
    weight / norm of the standard deviation (largest ratio first).

    The first B components, i.e. the shortest prefix whose cumulative weight
    exceeds ``T``, are chosen as background components and the count is
    written to ``self.B``. If the cumulative weight never exceeds ``T`` the
    pixel's previous B value is kept.

    ``self.weight``, ``self.mu``, ``self.sigma`` and ``self.B`` must be
    NumPy arrays when this is called.

    Args:
        T: Cumulative-weight threshold. The default is 0.90.
    '''
    epsilon = 1e-6  # to prevent divide-by-zero
    for i in range(self.img_shape[0]):
        for j in range(self.img_shape[1]):
            k_norm = []
            for k in range(self.K):
                cov = self.sigma[i][j][k]
                try:
                    usable = np.all(np.linalg.eigvals(cov) >= 0)
                except (np.linalg.LinAlgError, TypeError):
                    # Eigenvalues unavailable (e.g. NaN/inf entries).
                    usable = False
                if usable:
                    # Element-wise root of the magnitudes: a valid covariance
                    # can have negative off-diagonal entries, and a plain
                    # sqrt would turn those into NaN and poison the ratio.
                    k_norm.append(norm(np.sqrt(np.abs(cov))))
                else:
                    k_norm.append(epsilon)
            k_norm = np.array(k_norm)
            ratio = self.weight[i][j] / (k_norm + epsilon)
            descending_order = np.argsort(-ratio)
            self.weight[i][j] = self.weight[i][j][descending_order]
            self.mu[i][j] = self.mu[i][j][descending_order]
            self.sigma[i][j] = self.sigma[i][j][descending_order]
            # Count how many leading components are needed to exceed T.
            cum_weight = 0
            for index in range(len(descending_order)):
                cum_weight += self.weight[i][j][index]
                if cum_weight > T:
                    self.B[i][j] = index + 1
                    break
# def infer(self, img, heatmap=None, alpha=0.1):
# '''
# Perform inference with a persistent heatmap that intensifies with movement.
# '''
# device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# img_tensor = torch.from_numpy(img).float().to(device) # (H, W, 3)
# H, W, _ = img.shape
# # Initialize heatmap on the first frame
# if heatmap is None:
# heatmap = torch.zeros((H, W), dtype=torch.float32, device=device)
# # No need for an 'else' that converts from numpy,
# # as we will pass the tensor back in subsequent calls.
# # --- Your existing foreground detection logic remains the same ---
# detection_mask = torch.ones((H, W), dtype=torch.bool, device=device)
# for k in range(self.K):
# B_mask = (self.B >= (k + 1)).to(device)
# mu_k = self.mu[:, :, k, :].to(device)
# sigma_k = self.sigma[:, :, k, :, :].to(device)
# delta = (img_tensor - mu_k).unsqueeze(-1)
# sigma_inv = torch.linalg.inv(sigma_k)
# temp = torch.matmul(sigma_inv, delta)
# dist_sq = torch.matmul(delta.transpose(-2, -1), temp).squeeze(-1).squeeze(-1)
# dist = torch.sqrt(dist_sq + 1e-5)
# match_mask = (dist < 9.5) & B_mask
# detection_mask[match_mask] = False
# img_tensor[match_mask] = mu_k[match_mask] # Optional: for visualization
# foreground_mask = detection_mask & (img_tensor.abs().sum(dim=-1) > 0)
# heatmap[foreground_mask] = torch.clamp(heatmap[foreground_mask] + alpha, 0, 1)
# # Convert heatmap tensor to a numpy array for visualization
# heatmap_np = heatmap.cpu().numpy()
# # Apply the colormap (0 -> Blue, 1 -> Red)
# heatmap_viz = cv.applyColorMap((heatmap_np * 255).astype(np.uint8), cv.COLORMAP_JET)
# # Blend the heatmap with the original image
# result = cv.addWeighted(img, 0.7, heatmap_viz, 0.5, 0)
# # Return the blended image and the heatmap tensor for the next frame
# return result, heatmap
#--------------------------------------------------------------------------------------------
def infer(self, img, heatmap=None, decay_factor=0.95, alpha=0.1):
    '''
    Perform inference with a heatmap reflecting persistence of foreground
    objects. Default areas remain unchanged (no bluish tone); only areas
    with significant heat are colored.

    A pixel is background when it lies within Mahalanobis distance 9.5 of
    one of its first B components. Foreground pixels gain ``alpha`` heat
    (clamped to [0, 1]); all other pixels are multiplied by
    ``decay_factor``.

    Args:
        img: Frame as an (H, W, 3) NumPy array.
        heatmap: Heatmap returned by the previous call, as a NumPy array or
            torch tensor, or None for the first frame. It is copied, never
            modified in place.
        decay_factor: Multiplier applied to the heat of non-foreground
            pixels.
        alpha: Heat added to foreground pixels.

    Returns:
        tuple: ``(result, heatmap_np)`` where ``result`` is a copy of
        ``img`` with the JET-colored heatmap blended (0.7 / 0.3) into
        pixels whose heat exceeds 0.1, and ``heatmap_np`` is the updated
        heatmap as a NumPy array.
    '''
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    img_tensor = torch.from_numpy(img).float().to(device)  # (H, W, 3)
    H, W, _ = img.shape
    # Initialize the heatmap, or take a private copy of the caller's one.
    # Without the clone a float32 array on CPU would share memory with the
    # tensor and be mutated by the in-place updates below.
    if heatmap is None:
        heatmap = torch.zeros((H, W), dtype=torch.float32, device=device)
    else:
        heatmap = torch.as_tensor(
            heatmap, dtype=torch.float32, device=device).clone()
    # Detection mask initialized to True (foreground); False means background
    detection_mask = torch.ones((H, W), dtype=torch.bool, device=device)
    for k in range(self.K):
        # Component k counts as background only where B >= k + 1.
        B_mask = (self.B >= (k + 1)).to(device)
        mu_k = self.mu[:, :, k, :].to(device)
        sigma_k = self.sigma[:, :, k, :, :].to(device)
        delta = (img_tensor - mu_k).unsqueeze(-1)  # (H, W, 3, 1)
        sigma_inv = torch.linalg.inv(sigma_k)
        temp = torch.matmul(sigma_inv, delta)
        dist_sq = torch.matmul(
            delta.transpose(-2, -1), temp).squeeze(-1).squeeze(-1)
        dist = torch.sqrt(dist_sq + 1e-5)
        match_mask = (dist < 9.5) & B_mask
        # Mark matched pixels as background and snap them to the mean, so
        # later components are compared against the snapped value.
        detection_mask[match_mask] = False
        img_tensor[match_mask] = mu_k[match_mask]
    # Foreground: never matched and not pure black
    foreground_mask = detection_mask & (img_tensor.abs().sum(dim=-1) > 0)
    # Update heatmap: heat up foreground, cool everything else
    heatmap[foreground_mask] = torch.clamp(
        heatmap[foreground_mask] + alpha, 0, 1)
    heatmap[~foreground_mask] *= decay_factor
    # Convert heatmap to numpy for visualization
    heatmap_np = heatmap.cpu().numpy()
    heatmap_viz = cv.applyColorMap(
        (heatmap_np * 255).astype(np.uint8), cv.COLORMAP_JET)
    # Mask of significant heatmap areas (adjust threshold as needed)
    significant_heat = heatmap_np > 0.1
    # Start from the original image and recolor only the significant pixels
    result = img.copy()
    if np.any(significant_heat):
        result[significant_heat] = cv.addWeighted(
            img[significant_heat], 0.7,
            heatmap_viz[significant_heat], 0.3,
            0
        )
    return result, heatmap_np
#_____________________________________________________________________________________Decay factor and working good
# def infer(self, img, heatmap=None, decay_factor=0.95, alpha=0.1):
# '''
# Perform inference with binary red mask (no intensity variation) and dilation.
# Returns:
# - result: Image with solid red overlay on detections (same dtype as input)
# - heatmap_np: Heatmap array
# '''
# device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# # Ensure input is numpy array and get original dtype
# original_dtype = img.dtype
# img = np.asarray(img).astype(np.float32)
# H, W, C = img.shape
# # Initialize tensors
# img_tensor = torch.from_numpy(img).float().to(device)
# # Initialize heatmap
# if heatmap is None:
# heatmap = torch.zeros((H, W), dtype=torch.float32, device=device)
# else:
# heatmap = torch.from_numpy(heatmap).float().to(device)
# # Detection processing (your original code)
# detection_mask = torch.ones((H, W), dtype=torch.bool, device=device)
# for k in range(self.K):
# B_mask = (self.B >= (k + 1)).to(device)
# mu_k = self.mu[:, :, k, :].to(device)
# sigma_k = self.sigma[:, :, k, :, :].to(device)
# delta = img_tensor - mu_k
# delta = delta.unsqueeze(-1)
# sigma_inv = torch.linalg.inv(sigma_k)
# temp = torch.matmul(sigma_inv, delta)
# dist_sq = torch.matmul(delta.transpose(-2, -1), temp).squeeze(-1).squeeze(-1)
# dist = torch.sqrt(dist_sq + 1e-5)
# match_mask = (dist < 9.5) & B_mask
# detection_mask[match_mask] = False
# img_tensor[match_mask] = mu_k[match_mask]
# # Update heatmap
# foreground_mask = detection_mask & (img_tensor.abs().sum(dim=-1) > 0)
# heatmap[foreground_mask] = torch.clamp(heatmap[foreground_mask] + alpha, 0, 1)
# heatmap[~foreground_mask] *= decay_factor
# heatmap_np = heatmap.cpu().numpy()
# # Create binary mask and dilate
# binary_mask = (heatmap_np > 0.1).astype(np.uint8)
# kernel = np.ones((5,5), np.uint8)
# dilated_mask = cv.dilate(binary_mask, kernel, iterations=1)
# # Create solid red overlay (BGR)
# red_overlay = np.zeros_like(img)
# red_overlay[..., 2] = 200 # Red channel
# # Apply overlay using where instead of boolean indexing
# result = np.where(
# dilated_mask[..., np.newaxis].astype(bool),
# cv.addWeighted(img, 0.7, red_overlay, 0.3, 0),
# img
# )
# # Convert back to original dtype
# if original_dtype != np.float32:
# result = np.clip(result, 0, 255).astype(original_dtype)
# return result, heatmap_np
#________________________________________________________________________________________________
# def infer(self, img, heatmap=None, alpha=0.1):
# '''
# Perform inference with binary red mask (no intensity variation) and dilation.
# Heatmap is fully recalculated every frame — no temporal decay or retention.
# Returns:
# - result: Image with solid red overlay on detections
# - heatmap_np: Binary heatmap array
# '''
# device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# # Ensure input is numpy array and get original dtype
# original_dtype = img.dtype
# img = np.asarray(img).astype(np.float32)
# H, W, C = img.shape
# # Initialize tensors
# img_tensor = torch.from_numpy(img).float().to(device)
# # Detection processing
# detection_mask = torch.ones((H, W), dtype=torch.bool, device=device)
# for k in range(self.K):
# B_mask = (self.B >= (k + 1)).to(device)
# mu_k = self.mu[:, :, k, :].to(device)
# sigma_k = self.sigma[:, :, k, :, :].to(device)
# delta = img_tensor - mu_k
# delta = delta.unsqueeze(-1)
# sigma_inv = torch.linalg.inv(sigma_k)
# temp = torch.matmul(sigma_inv, delta)
# dist_sq = torch.matmul(delta.transpose(-2, -1), temp).squeeze(-1).squeeze(-1)
# dist = torch.sqrt(dist_sq + 1e-5)
# match_mask = (dist < 9.5) & B_mask
# detection_mask[match_mask] = False
# img_tensor[match_mask] = mu_k[match_mask]
# # Generate a binary heatmap (no decay, no accumulation)
# foreground_mask = detection_mask & (img_tensor.abs().sum(dim=-1) > 0)
# heatmap = torch.zeros((H, W), dtype=torch.float32, device=device)
# heatmap[foreground_mask] = alpha
# heatmap_np = heatmap.cpu().numpy()
# # Create binary mask and dilate
# binary_mask = (heatmap_np > 0.05).astype(np.uint8)
# kernel = np.ones((5, 5), np.uint8)
# dilated_mask = cv.dilate(binary_mask, kernel, iterations=1)
# # Create solid red overlay (BGR)
# red_overlay = np.zeros_like(img)
# red_overlay[..., 2] = 200 # Red channel
# # Apply overlay
# result = np.where(
# dilated_mask[..., np.newaxis].astype(bool),
# cv.addWeighted(img, 0.7, red_overlay, 0.3, 0),
# img
# )
# # Convert back to original dtype
# if original_dtype != np.float32:
# result = np.clip(result, 0, 255).astype(original_dtype)
# return result, heatmap_np
# def infer(self, img, heatmap=None, alpha=0.1):
# '''
# Perform inference with binary red mask and GPU-based dilation.
# Heatmap is recalculated each frame (no temporal retention).
# Returns:
# - result: Image with red overlay where foreground is detected.
# - heatmap_np: Numpy array of binary heatmap.
# '''
# device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# # Convert image to float32 and move to GPU
# original_dtype = img.dtype
# img = np.asarray(img).astype(np.float32)
# H, W, C = img.shape
# img_tensor = torch.from_numpy(img).float().to(device)
# # Initialize detection mask as all True (foreground by default)
# detection_mask = torch.ones((H, W), dtype=torch.bool, device=device)
# for k in range(self.K):
# B_mask = (self.B >= (k + 1)).to(device)
# mu_k = self.mu[:, :, k, :].to(device)
# sigma_k = self.sigma[:, :, k, :, :].to(device)
# delta = img_tensor - mu_k
# delta = delta.unsqueeze(-1) # shape: (H, W, 3, 1)
# sigma_inv = torch.linalg.inv(sigma_k)
# temp = torch.matmul(sigma_inv, delta)
# dist_sq = torch.matmul(delta.transpose(-2, -1), temp).squeeze(-1).squeeze(-1)
# dist = torch.sqrt(dist_sq + 1e-5)
# match_mask = (dist < 9.5) & B_mask
# detection_mask[match_mask] = False
# # img_tensor[match_mask] = mu_k[match_mask]
# # Generate heatmap
# foreground_mask = detection_mask & (img_tensor.abs().sum(dim=-1) > 0)
# heatmap_tensor = torch.zeros((H, W), dtype=torch.float32, device=device)
# heatmap_tensor[foreground_mask] = alpha
# # Convert heatmap to binary mask and apply dilation (GPU-based)
# binary_mask = (heatmap_tensor > 0.05).float().unsqueeze(0).unsqueeze(0) # shape: (1, 1, H, W)
# kernel = torch.ones((1, 1, 5, 5), dtype=torch.float32, device=device)
# dilated = F.conv2d(binary_mask, kernel, padding=2)
# dilated_mask = (dilated > 0).squeeze().to(torch.bool)
# # Create red overlay (on GPU)
# red_overlay = torch.zeros_like(img_tensor)
# red_overlay[..., 2] = 200 # Red channel
# # Blend red overlay on detected regions
# result_tensor = torch.where(
# dilated_mask.unsqueeze(-1),
# 0.7 * img_tensor + 0.3 * red_overlay,
# img_tensor
# )
# # Convert back to NumPy and original dtype
# result = result_tensor.clamp(0, 255).cpu().numpy()
# if original_dtype != np.float32:
# result = result.astype(original_dtype)
# heatmap_np = (heatmap_tensor > 0.05).float().cpu().numpy()
# return result, heatmap_np