import gradio as gr
import torch
import torch.nn as nn
import torch.fft as fft
import cv2
import numpy as np
import pandas as pd
import xgboost as xgb
from scipy.signal import find_peaks, savgol_filter
from collections import Counter
# Define the TCN model
class TCN(nn.Module):
def __init__(self, input_size, hidden_size, output_size, num_layers=3, dropout=0.1):
super(TCN, self).__init__()
# List to hold convolutional layers
self.convs = nn.ModuleList()
dropout = dropout if num_layers > 1 else 0 # No dropout if only one layer
self.dropout = nn.Dropout(dropout)
# Create the convolutional layers
for i in range(num_layers):
in_channels = input_size if i == 0 else hidden_size # First layer uses input_size, others use hidden_size
out_channels = hidden_size # All layers have the same hidden size
self.convs.append(nn.Conv1d(in_channels, out_channels, kernel_size=2, padding=1))
# Fully connected output layer
self.fc = nn.Linear(hidden_size, output_size)
def forward(self, x):
x = x.permute(0, 2, 1) # Change to (batch_size, features, timesteps)
# Apply each convolutional layer followed by dropout
for conv in self.convs:
x = torch.relu(conv(x))
x = self.dropout(x) # Apply dropout after each convolution
x = torch.mean(x, dim=2) # Global average pooling
x = self.fc(x) # Output layer
return x
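# Illustrative shape check (a hedged sketch, not part of the pipeline; sizes are
# placeholders matching the (batch, timesteps, features) windows built later):
#   tcn = TCN(input_size=10, hidden_size=64, output_size=1)
#   tcn(torch.randn(8, 5, 10)).shape  # -> torch.Size([8, 1])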
# Define the Temporal Fusion Transformer (TFT) model
class TemporalFusionTransformer(nn.Module):
    def __init__(self, input_size, hidden_size, output_size, num_layers=3, dropout=0.1):
        super(TemporalFusionTransformer, self).__init__()
        # Encoder and decoder LSTMs with multiple layers
        # (LSTM dropout only applies between layers, so it is disabled when num_layers == 1)
        self.encoder = nn.LSTM(input_size, hidden_size, num_layers=num_layers, batch_first=True, dropout=dropout if num_layers > 1 else 0.0)
        self.decoder = nn.LSTM(hidden_size, hidden_size, num_layers=num_layers, batch_first=True, dropout=dropout if num_layers > 1 else 0.0)
self.attention = nn.MultiheadAttention(hidden_size, num_heads=4, batch_first=True) # Attention mechanism
self.fc = nn.Linear(hidden_size, output_size) # Fully connected output layer
self.dropout = nn.Dropout(dropout) # Dropout layer
def forward(self, x):
encoder_output, _ = self.encoder(x) # Encoder output
decoder_output, _ = self.decoder(encoder_output) # Decoder output
attention_output, _ = self.attention(decoder_output, encoder_output, encoder_output) # Attention output
attention_output = self.dropout(attention_output) # Apply dropout
output = self.fc(attention_output[:, -1, :]) # Take the last time step from the attention output
return output
# Build the ETSformer Class: Encoder, Trend, Seasonality, Exponential Smoothing, and Output Layer
class ETSformer(nn.Module):
def __init__(self, input_size, hidden_size, output_size, num_layers=3, dropout=0.1):
super(ETSformer, self).__init__()
# Encoder: LSTM with multiple layers and dropout
self.encoder = nn.LSTM(
input_size,
hidden_size,
num_layers=num_layers,
batch_first=True,
dropout=dropout if num_layers > 1 else 0.0 # Dropout only applies if num_layers > 1
)
# Trend, Seasonality, Exponential Modules
self.trend_module = nn.Sequential(
nn.Linear(hidden_size, hidden_size),
nn.Dropout(dropout) # Dropout in the trend module
)
self.seasonality_module = nn.Sequential(
nn.Linear(hidden_size, hidden_size),
nn.Dropout(dropout) # Dropout in the seasonality module
)
self.exponential_module = nn.Sequential(
nn.Linear(hidden_size, hidden_size),
nn.Dropout(dropout) # Dropout in the exponential module
)
self.fc = nn.Linear(hidden_size, output_size) # Fully connected layer for output
def forward(self, x):
encoder_output, _ = self.encoder(x) # Encode the input sequence
        trend = self.trend_module(encoder_output)  # Trend component
# Seasonality Component
freq = fft.fft(encoder_output, dim=1) # Frequency domain transformation
seasonality = fft.ifft(self.seasonality_module(torch.abs(freq)), dim=1).real
exponential = torch.sigmoid(self.exponential_module(encoder_output)) # Exponential Smoothing Component
combined = trend + seasonality + exponential # Combine the components
# Output layer: Use the last time step for predictions
output = self.fc(combined[:, -1, :])
return output
# Updated BiLSTM to handle variable layers
class BiLSTM(nn.Module):
def __init__(self, input_size, hidden_size, output_size, num_layers=2, dropout=0.1):
super(BiLSTM, self).__init__()
self.bilstm = nn.LSTM(
input_size,
hidden_size,
num_layers=num_layers,
batch_first=True,
bidirectional=True,
dropout=dropout if num_layers > 1 else 0 # Dropout only applies for num_layers > 1
)
self.fc = nn.Linear(hidden_size * 2, output_size) # Multiply hidden_size by 2 for bidirectional
def forward(self, x):
bilstm_output, _ = self.bilstm(x)
output = self.fc(bilstm_output[:, -1, :]) # Use the last time step
return output
class RespFusion(nn.Module):
    def __init__(self, tft_model, tcn_model, ets_model, bilstm_model, meta_learner_path=None, weights=None, strategy='stacking'):
super(RespFusion, self).__init__()
self.tft = tft_model
self.tcn = tcn_model
self.ets = ets_model
self.bilstm = bilstm_model
self.strategy = strategy # 'stacking' or other strategies
# Initialize XGBoost meta-learner
self.meta_learner = xgb.XGBRegressor() # Or XGBClassifier for classification
# Load the meta-learner if a path is provided
if meta_learner_path is not None:
self.meta_learner.load_model(meta_learner_path)
print(f"Meta-learner loaded from {meta_learner_path}")
# Storage for stacking training data
self.stacking_features = []
self.stacking_targets = []
# Set model weights for ensembling, default to equal weights for weighted_average strategy
if strategy == 'weighted_average':
if weights is None:
self.weights = [1.0, 1.0, 1.0, 1.0]
else:
assert len(weights) == 4, "Weights must match the number of models."
self.weights = weights
    def forward(self, x):
        # Get predictions from each base model. Keep them as tensors so the
        # tensor-based strategies below work; convert to numpy only where the
        # XGBoost meta-learner needs it.
        tft_output = self.tft(x)
        tcn_output = self.tcn(x)
        ets_output = self.ets(x)
        bilstm_output = self.bilstm(x)
        if self.strategy == 'stacking':
            # Combine base-model outputs into features for the meta-learner
            features = np.column_stack([
                out.detach().cpu().numpy()
                for out in (tft_output, tcn_output, ets_output, bilstm_output)
            ])
            # During inference, use the meta-learner to make predictions
            ensemble_output = self.meta_learner.predict(features)
            return torch.tensor(ensemble_output, device=x.device).float()
        elif self.strategy == 'voting':
            # Soft voting: average the stacked base-model predictions
            return torch.mean(torch.stack([tft_output, tcn_output, ets_output, bilstm_output], dim=0), dim=0)
        elif self.strategy == 'weighted_average':
            # Weighted average of outputs
            return (
                self.weights[0] * tft_output +
                self.weights[1] * tcn_output +
                self.weights[2] * ets_output +
                self.weights[3] * bilstm_output
            ) / sum(self.weights)
        elif self.strategy == 'simple_average':
            # Simple average of outputs
            return (tft_output + tcn_output + ets_output + bilstm_output) / 4
        else:
            raise ValueError(f"Invalid strategy: {self.strategy}. Supported strategies: 'stacking', 'voting', 'weighted_average', 'simple_average'.")
def collect_stacking_data(self, x, y):
"""Collect base model outputs and corresponding targets for meta-learner training."""
tft_output = self.tft(x).detach().cpu().numpy()
tcn_output = self.tcn(x).detach().cpu().numpy()
ets_output = self.ets(x).detach().cpu().numpy()
bilstm_output = self.bilstm(x).detach().cpu().numpy()
# Stack features and store
features = np.column_stack((tft_output, tcn_output, ets_output, bilstm_output))
self.stacking_features.append(features)
self.stacking_targets.append(y.detach().cpu().numpy())
def train_meta_learner(self, save_path=None):
"""Train the XGBoost meta-learner on collected data and save the model."""
# Concatenate all collected features and targets
X = np.vstack(self.stacking_features)
y = np.concatenate(self.stacking_targets)
# Train the XGBoost model
self.meta_learner.fit(X, y)
print("Meta-learner trained successfully!")
# Save the trained meta-learner
if save_path:
self.meta_learner.save_model(save_path)
print(f"Meta-learner saved to {save_path}")
def process_video(video_path):
    # Parameters
    roi_coordinates = None  # Manual ROI selection (see note below)
    fps = 30  # Assumed frames per second of the input video
    feature_window = 10  # Window for feature aggregation (seconds)
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        return "Error opening video file"
    # Select the ROI from the first frame
    ret, first_frame = cap.read()
    if not ret:
        return "Failed to read first frame"
    if roi_coordinates is None:
        # Note: cv2.selectROI opens an interactive window, so this only works
        # where a display is available (it will fail on a headless server)
        roi_coordinates = cv2.selectROI("Select ROI", first_frame, fromCenter=False, showCrosshair=True)
        cv2.destroyWindow("Select ROI")
    x, y, w, h = map(int, roi_coordinates)
    motion_magnitude, direction_angles = [], []
    frame_skip = 2
    scale_factor = 0.5
    # Downscale the first-frame ROI once to initialize the optical-flow reference
    roi = cv2.resize(first_frame[y:y+h, x:x+w], None, fx=scale_factor, fy=scale_factor, interpolation=cv2.INTER_AREA)
    prev_gray = cv2.cvtColor(roi, cv2.COLOR_BGR2GRAY)
while cap.isOpened():
ret, frame = cap.read()
if not ret:
break
frame_index = int(cap.get(cv2.CAP_PROP_POS_FRAMES))
if frame_index % frame_skip != 0:
continue
roi = cv2.resize(frame[y:y+h, x:x+w], None, fx=scale_factor, fy=scale_factor, interpolation=cv2.INTER_AREA)
gray = cv2.cvtColor(roi, cv2.COLOR_BGR2GRAY)
flow = cv2.calcOpticalFlowFarneback(prev_gray, gray, None, 0.5, 3, 15, 2, 5, 1.2, 0)
magnitude, angle = cv2.cartToPolar(flow[..., 0], flow[..., 1])
motion_magnitude.append(np.mean(magnitude))
direction_angles.extend(angle.flatten())
prev_gray = gray
cap.release()
    if len(motion_magnitude) < 5:
        return "Video too short to extract a motion signal"
    # Savitzky-Golay smoothing needs an odd window_length greater than polyorder (3)
    window_length = min(len(motion_magnitude) - 1, 31)
    if window_length % 2 == 0:
        window_length += 1
    window_length = max(window_length, 5)
    smoothed_magnitude = savgol_filter(motion_magnitude, window_length=window_length, polyorder=3)
features = []
window_size = feature_window * (fps // frame_skip)
num_windows = len(smoothed_magnitude) // window_size
for i in range(num_windows):
start, end = i * window_size, (i + 1) * window_size
window_magnitude = smoothed_magnitude[start:end]
peaks, _ = find_peaks(window_magnitude)
troughs, _ = find_peaks(-window_magnitude)
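        # Caveat: direction_angles holds per-pixel angles (ROI width x height values per
        # frame), so slicing it with per-frame window bounds only samples the earliest
        # frames; left unchanged on the assumption that training features were computed the same way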
features.append([
np.mean(window_magnitude), np.std(window_magnitude), np.max(window_magnitude), np.min(window_magnitude),
len(peaks), np.mean(window_magnitude[peaks]) if len(peaks) > 0 else 0,
np.mean(np.diff(peaks)) / (fps // frame_skip) if len(peaks) > 1 else 0,
(np.mean(window_magnitude[peaks]) - np.mean(window_magnitude[troughs])) if peaks.size > 0 and troughs.size > 0 else 0,
Counter((np.array(direction_angles[start:end]) / (np.pi / 4)).astype(int) % 8).most_common(1)[0][0] if len(direction_angles) > 0 else -1,
np.argmax(np.abs(np.fft.fft(window_magnitude))[1:len(window_magnitude)//2]) / window_size
])
features_df = pd.DataFrame(features, columns=[
"mean_magnitude", "std_magnitude", "max_magnitude", "min_magnitude", "peak_count", "avg_peak_height", "peak_to_peak_interval", "amplitude", "dominant_direction", "dominant_frequency"
])
print("Features Extracted. Matching Model Keys.")
    if len(features_df) <= 5:
        return "Video too short: at least six 10-second feature windows are needed to build an inference sequence"
    # Build sliding sequences of 5 consecutive feature windows for the models
    X_inference = np.array([features_df.iloc[i-5:i, :].values for i in range(5, len(features_df))])
    X_inference_tensor = torch.tensor(X_inference, dtype=torch.float32)
input_size, hidden_size, output_size = X_inference_tensor.shape[2], 64, 1
tft_model = TemporalFusionTransformer(input_size, hidden_size, output_size)
tcn_model = TCN(input_size, hidden_size, output_size)
ets_model = ETSformer(input_size, hidden_size, output_size)
bilstm_model = BiLSTM(input_size, hidden_size, output_size)
weights_path = './models/weights/'
# Load state dicts for each model
tft_model.load_state_dict(torch.load(weights_path + "tft_loss_optimized.pth", map_location=torch.device("cpu")))
print("TFT Model Loaded. All keys matched successfully.")
tcn_model.load_state_dict(torch.load(weights_path + "tcn_loss_optimized.pth", map_location=torch.device("cpu")))
print("TCN Model Loaded. All keys matched successfully.")
ets_model.load_state_dict(torch.load(weights_path + "etsformer_loss_optimized.pth", map_location=torch.device("cpu")))
print("ETSformer Model Loaded. All keys matched successfully.")
bilstm_model.load_state_dict(torch.load(weights_path + "bilstm_loss_optimized.pth", map_location=torch.device("cpu")))
print("BiLSTM Model Loaded. All keys matched successfully.")
tft_model.eval()
tcn_model.eval()
ets_model.eval()
bilstm_model.eval()
model = RespFusion(
tft_model, tcn_model, ets_model, bilstm_model,
strategy='stacking',
meta_learner_path=weights_path + "respfusion_2_xgboost_meta_learner.json"
)
print("Respfusion Model Loaded. Detecting Apnea.")
    with torch.no_grad():
        predictions = model(X_inference_tensor).squeeze().tolist()
    if not isinstance(predictions, list):  # a single window squeezes to a scalar
        predictions = [predictions]
def categorize_breathing_rate(pred):
if pred < 0.5:
return "Apnea"
elif pred > 20:
return "Tachypnea"
elif pred < 10:
return "Bradypnea"
else:
return "Normal"
    categories = [categorize_breathing_rate(pred) for pred in predictions]
    avg_rate = sum(predictions) / len(predictions)
    overall_category = categorize_breathing_rate(avg_rate)
    return {
        "Breathing State per 10s": categories,
        "Average Breathing Rate": avg_rate,
        "Overall Breathing State": overall_category,
    }
app = gr.Interface(
fn=process_video,
inputs=gr.Video(label="Upload Video for Analysis"),
outputs=gr.JSON(),
title="Apnea Detection System",
description="Upload a video to analyze breathing rate and detect conditions such as Apnea, Tachypnea, and Bradypnea."
)
app.launch()