# Apnea-Detector / app.py
import gradio as gr
import torch
import torch.nn as nn
import torch.fft as fft
import cv2
import numpy as np
import pandas as pd
import xgboost as xgb
from scipy.signal import find_peaks, savgol_filter
from collections import Counter
# Define the TCN model
class TCN(nn.Module):
def __init__(self, input_size, hidden_size, output_size, num_layers=3, dropout=0.1):
super(TCN, self).__init__()
# List to hold convolutional layers
self.convs = nn.ModuleList()
dropout = dropout if num_layers > 1 else 0 # No dropout if only one layer
self.dropout = nn.Dropout(dropout)
# Create the convolutional layers
for i in range(num_layers):
in_channels = input_size if i == 0 else hidden_size # First layer uses input_size, others use hidden_size
out_channels = hidden_size # All layers have the same hidden size
            # kernel_size=2 with padding=1 lengthens the output by one step per layer;
            # the global average pooling in forward() absorbs the changing length
            self.convs.append(nn.Conv1d(in_channels, out_channels, kernel_size=2, padding=1))
# Fully connected output layer
self.fc = nn.Linear(hidden_size, output_size)
def forward(self, x):
x = x.permute(0, 2, 1) # Change to (batch_size, features, timesteps)
# Apply each convolutional layer followed by dropout
for conv in self.convs:
x = torch.relu(conv(x))
x = self.dropout(x) # Apply dropout after each convolution
x = torch.mean(x, dim=2) # Global average pooling
x = self.fc(x) # Output layer
return x
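# A minimal shape sanity-check for TCN (a sketch; the batch, timestep, and feature sizes are arbitrary):
#   tcn = TCN(input_size=10, hidden_size=64, output_size=1)
#   out = tcn(torch.randn(8, 5, 10))  # (batch, timesteps, features)
#   assert out.shape == (8, 1)        # global average pooling collapses the time axis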
# Define the Temporal Fusion Transformer (TFT) model
class TemporalFusionTransformer(nn.Module):
def __init__(self, input_size, hidden_size, output_size, num_layers=3, dropout=0.1):
super(TemporalFusionTransformer, self).__init__()
# Encoder and Decoder LSTMs with multiple layers
self.encoder = nn.LSTM(input_size, hidden_size, num_layers=num_layers, batch_first=True, dropout=dropout)
self.decoder = nn.LSTM(hidden_size, hidden_size, num_layers=num_layers, batch_first=True, dropout=dropout)
self.attention = nn.MultiheadAttention(hidden_size, num_heads=4, batch_first=True) # Attention mechanism
self.fc = nn.Linear(hidden_size, output_size) # Fully connected output layer
self.dropout = nn.Dropout(dropout) # Dropout layer
def forward(self, x):
encoder_output, _ = self.encoder(x) # Encoder output
decoder_output, _ = self.decoder(encoder_output) # Decoder output
attention_output, _ = self.attention(decoder_output, encoder_output, encoder_output) # Attention output
attention_output = self.dropout(attention_output) # Apply dropout
output = self.fc(attention_output[:, -1, :]) # Take the last time step from the attention output
return output
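# Example usage of the TFT head (a sketch with arbitrary sizes; hidden_size must be
# divisible by num_heads=4 for the MultiheadAttention layer):
#   tft = TemporalFusionTransformer(input_size=10, hidden_size=64, output_size=1)
#   out = tft(torch.randn(8, 5, 10))  # attention runs over all timesteps, fc reads the last one
#   assert out.shape == (8, 1)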
# Build the ETSformer Class: Encoder, Trend, Seasonality, Exponential Smoothing, and Output Layer
class ETSformer(nn.Module):
def __init__(self, input_size, hidden_size, output_size, num_layers=3, dropout=0.1):
super(ETSformer, self).__init__()
# Encoder: LSTM with multiple layers and dropout
self.encoder = nn.LSTM(
input_size,
hidden_size,
num_layers=num_layers,
batch_first=True,
dropout=dropout if num_layers > 1 else 0.0 # Dropout only applies if num_layers > 1
)
# Trend, Seasonality, Exponential Modules
self.trend_module = nn.Sequential(
nn.Linear(hidden_size, hidden_size),
nn.Dropout(dropout) # Dropout in the trend module
)
self.seasonality_module = nn.Sequential(
nn.Linear(hidden_size, hidden_size),
nn.Dropout(dropout) # Dropout in the seasonality module
)
self.exponential_module = nn.Sequential(
nn.Linear(hidden_size, hidden_size),
nn.Dropout(dropout) # Dropout in the exponential module
)
self.fc = nn.Linear(hidden_size, output_size) # Fully connected layer for output
def forward(self, x):
encoder_output, _ = self.encoder(x) # Encode the input sequence
        trend = self.trend_module(encoder_output)  # Trend component
# Seasonality Component
freq = fft.fft(encoder_output, dim=1) # Frequency domain transformation
seasonality = fft.ifft(self.seasonality_module(torch.abs(freq)), dim=1).real
exponential = torch.sigmoid(self.exponential_module(encoder_output)) # Exponential Smoothing Component
combined = trend + seasonality + exponential # Combine the components
# Output layer: Use the last time step for predictions
output = self.fc(combined[:, -1, :])
return output
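# ETSformer decomposition sketch: the same encoder output feeds three branches whose sum forms
# the prediction. The seasonality branch works in the frequency domain; stripped of the
# Linear+Dropout module it reduces to a round-trip over the time axis (stand-in tensor `h`):
#   h = torch.randn(8, 5, 64)                                    # stand-in encoder output
#   season = fft.ifft(torch.abs(fft.fft(h, dim=1)), dim=1).real  # FFT forward and back along time
#   assert season.shape == h.shape                               # decomposition keeps the sequence shape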
# Updated BiLSTM to handle variable layers
class BiLSTM(nn.Module):
def __init__(self, input_size, hidden_size, output_size, num_layers=2, dropout=0.1):
super(BiLSTM, self).__init__()
self.bilstm = nn.LSTM(
input_size,
hidden_size,
num_layers=num_layers,
batch_first=True,
bidirectional=True,
dropout=dropout if num_layers > 1 else 0 # Dropout only applies for num_layers > 1
)
self.fc = nn.Linear(hidden_size * 2, output_size) # Multiply hidden_size by 2 for bidirectional
def forward(self, x):
bilstm_output, _ = self.bilstm(x)
output = self.fc(bilstm_output[:, -1, :]) # Use the last time step
return output
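# BiLSTM sanity check (a sketch; note the 2x hidden width from the bidirectional pass):
#   bilstm = BiLSTM(input_size=10, hidden_size=64, output_size=1)
#   out = bilstm(torch.randn(8, 5, 10))  # last timestep concatenates forward and backward states
#   assert out.shape == (8, 1)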
class RespFusion(nn.Module):
    def __init__(self, tft_model, tcn_model, ets_model, bilstm_model, meta_learner_path=None, weights=None, strategy='stacking'):
super(RespFusion, self).__init__()
self.tft = tft_model
self.tcn = tcn_model
self.ets = ets_model
self.bilstm = bilstm_model
self.strategy = strategy # 'stacking' or other strategies
# Initialize XGBoost meta-learner
self.meta_learner = xgb.XGBRegressor() # Or XGBClassifier for classification
# Load the meta-learner if a path is provided
if meta_learner_path is not None:
self.meta_learner.load_model(meta_learner_path)
print(f"Meta-learner loaded from {meta_learner_path}")
# Storage for stacking training data
self.stacking_features = []
self.stacking_targets = []
# Set model weights for ensembling, default to equal weights for weighted_average strategy
if strategy == 'weighted_average':
if weights is None:
self.weights = [1.0, 1.0, 1.0, 1.0]
else:
assert len(weights) == 4, "Weights must match the number of models."
self.weights = weights
    def forward(self, x):
        # Get predictions from each base model without tracking gradients
        tft_output = self.tft(x).detach()
        tcn_output = self.tcn(x).detach()
        ets_output = self.ets(x).detach()
        bilstm_output = self.bilstm(x).detach()
        if self.strategy == 'stacking':
            # Combine base-model outputs into a feature matrix for the meta-learner
            features = np.column_stack((
                tft_output.cpu().numpy(), tcn_output.cpu().numpy(),
                ets_output.cpu().numpy(), bilstm_output.cpu().numpy()
            ))
            # At inference time the meta-learner produces the final prediction
            ensemble_output = self.meta_learner.predict(features)
            return torch.tensor(ensemble_output, dtype=torch.float32, device=x.device)
        elif self.strategy == 'voting':
            # Soft voting: average the base-model outputs (kept as tensors)
            return torch.mean(torch.stack([tft_output, tcn_output, ets_output, bilstm_output], dim=0), dim=0)
        elif self.strategy == 'weighted_average':
            # Weighted average of the base-model outputs
            return (
                self.weights[0] * tft_output +
                self.weights[1] * tcn_output +
                self.weights[2] * ets_output +
                self.weights[3] * bilstm_output
            ) / sum(self.weights)
        elif self.strategy == 'simple_average':
            # Simple (unweighted) average of the base-model outputs
            return (tft_output + tcn_output + ets_output + bilstm_output) / 4
        else:
            raise ValueError(
                f"Invalid strategy: {self.strategy}. Supported strategies are "
                "'stacking', 'voting', 'weighted_average', and 'simple_average'."
            )
def collect_stacking_data(self, x, y):
"""Collect base model outputs and corresponding targets for meta-learner training."""
tft_output = self.tft(x).detach().cpu().numpy()
tcn_output = self.tcn(x).detach().cpu().numpy()
ets_output = self.ets(x).detach().cpu().numpy()
bilstm_output = self.bilstm(x).detach().cpu().numpy()
# Stack features and store
features = np.column_stack((tft_output, tcn_output, ets_output, bilstm_output))
self.stacking_features.append(features)
self.stacking_targets.append(y.detach().cpu().numpy())
def train_meta_learner(self, save_path=None):
"""Train the XGBoost meta-learner on collected data and save the model."""
# Concatenate all collected features and targets
X = np.vstack(self.stacking_features)
y = np.concatenate(self.stacking_targets)
# Train the XGBoost model
self.meta_learner.fit(X, y)
print("Meta-learner trained successfully!")
# Save the trained meta-learner
if save_path:
self.meta_learner.save_model(save_path)
print(f"Meta-learner saved to {save_path}")
def process_video(video_path):
    # Parameters
    roi_coordinates = None  # Manual ROI selection; set an (x, y, w, h) tuple to skip the picker
    fps = 30  # Assumed frames per second (feature windows below are sized for 30 fps input)
    feature_window = 10  # Window for feature aggregation (seconds)
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        return "Error opening video file"
# Select ROI
ret, first_frame = cap.read()
if not ret:
return "Failed to read first frame"
    if roi_coordinates is None:
        # Note: cv2.selectROI opens a GUI window, so this path only works where a display is available
        roi_coordinates = cv2.selectROI("Select ROI", first_frame, fromCenter=False, showCrosshair=True)
        cv2.destroyWindow("Select ROI")
x, y, w, h = map(int, roi_coordinates)
    motion_magnitude, direction_angles = [], []
    frame_skip = 2  # Process every other frame to save time
    scale_factor = 0.5  # Downscale the ROI before computing optical flow
    roi = cv2.resize(first_frame[y:y+h, x:x+w], None, fx=scale_factor, fy=scale_factor, interpolation=cv2.INTER_AREA)
    prev_gray = cv2.cvtColor(roi, cv2.COLOR_BGR2GRAY)
while cap.isOpened():
ret, frame = cap.read()
if not ret:
break
frame_index = int(cap.get(cv2.CAP_PROP_POS_FRAMES))
if frame_index % frame_skip != 0:
continue
roi = cv2.resize(frame[y:y+h, x:x+w], None, fx=scale_factor, fy=scale_factor, interpolation=cv2.INTER_AREA)
gray = cv2.cvtColor(roi, cv2.COLOR_BGR2GRAY)
        # Dense optical flow (Farneback): pyr_scale=0.5, levels=3, winsize=15, iterations=2, poly_n=5, poly_sigma=1.2
        flow = cv2.calcOpticalFlowFarneback(prev_gray, gray, None, 0.5, 3, 15, 2, 5, 1.2, 0)
        magnitude, angle = cv2.cartToPolar(flow[..., 0], flow[..., 1])
motion_magnitude.append(np.mean(magnitude))
direction_angles.extend(angle.flatten())
prev_gray = gray
cap.release()
    # Savitzky-Golay smoothing needs an odd window longer than the polynomial order (3)
    window_length = min(len(motion_magnitude) - 1, 31)
    if window_length % 2 == 0:
        window_length += 1
    if window_length <= 3:
        return "Video too short to extract a usable motion signal"
    smoothed_magnitude = savgol_filter(motion_magnitude, window_length=window_length, polyorder=3)
features = []
window_size = feature_window * (fps // frame_skip)
num_windows = len(smoothed_magnitude) // window_size
for i in range(num_windows):
start, end = i * window_size, (i + 1) * window_size
window_magnitude = smoothed_magnitude[start:end]
peaks, _ = find_peaks(window_magnitude)
troughs, _ = find_peaks(-window_magnitude)
features.append([
np.mean(window_magnitude), np.std(window_magnitude), np.max(window_magnitude), np.min(window_magnitude),
len(peaks), np.mean(window_magnitude[peaks]) if len(peaks) > 0 else 0,
np.mean(np.diff(peaks)) / (fps // frame_skip) if len(peaks) > 1 else 0,
(np.mean(window_magnitude[peaks]) - np.mean(window_magnitude[troughs])) if peaks.size > 0 and troughs.size > 0 else 0,
Counter((np.array(direction_angles[start:end]) / (np.pi / 4)).astype(int) % 8).most_common(1)[0][0] if len(direction_angles) > 0 else -1,
np.argmax(np.abs(np.fft.fft(window_magnitude))[1:len(window_magnitude)//2]) / window_size
])
features_df = pd.DataFrame(features, columns=[
"mean_magnitude", "std_magnitude", "max_magnitude", "min_magnitude", "peak_count", "avg_peak_height", "peak_to_peak_interval", "amplitude", "dominant_direction", "dominant_frequency"
])
print("Features Extracted. Matching Model Keys.")
X_inference = np.array([features_df.iloc[i-5:i, :].values for i in range(5, len(features_df))])
X_inference_tensor = torch.tensor(X_inference, dtype=torch.float32)
input_size, hidden_size, output_size = X_inference_tensor.shape[2], 64, 1
tft_model = TemporalFusionTransformer(input_size, hidden_size, output_size)
tcn_model = TCN(input_size, hidden_size, output_size)
ets_model = ETSformer(input_size, hidden_size, output_size)
bilstm_model = BiLSTM(input_size, hidden_size, output_size)
weights_path = './models/weights/'
# Load state dicts for each model
tft_model.load_state_dict(torch.load(weights_path + "tft_loss_optimized.pth", map_location=torch.device("cpu")))
print("TFT Model Loaded. All keys matched successfully.")
tcn_model.load_state_dict(torch.load(weights_path + "tcn_loss_optimized.pth", map_location=torch.device("cpu")))
print("TCN Model Loaded. All keys matched successfully.")
ets_model.load_state_dict(torch.load(weights_path + "etsformer_loss_optimized.pth", map_location=torch.device("cpu")))
print("ETSformer Model Loaded. All keys matched successfully.")
bilstm_model.load_state_dict(torch.load(weights_path + "bilstm_loss_optimized.pth", map_location=torch.device("cpu")))
print("BiLSTM Model Loaded. All keys matched successfully.")
tft_model.eval()
tcn_model.eval()
ets_model.eval()
bilstm_model.eval()
model = RespFusion(
tft_model, tcn_model, ets_model, bilstm_model,
strategy='stacking',
meta_learner_path=weights_path + "respfusion_2_xgboost_meta_learner.json"
)
print("Respfusion Model Loaded. Detecting Apnea.")
    with torch.no_grad():
        preds = model(X_inference_tensor).squeeze()
        # A single window squeezes to a 0-d tensor, so guard before converting to a list
        predictions = preds.tolist() if preds.dim() > 0 else [preds.item()]
def categorize_breathing_rate(pred):
if pred < 0.5:
return "Apnea"
elif pred > 20:
return "Tachypnea"
elif pred < 10:
return "Bradypnea"
else:
return "Normal"
categories = [categorize_breathing_rate(pred) for pred in predictions]
overall_category = categorize_breathing_rate(sum(predictions) / len(predictions))
return {"Breathing State per 10s": categories, "Average Breathing Rate": sum(predictions)/len(predictions), "Overall Breathing State": overall_category,}
app = gr.Interface(
fn=process_video,
inputs=gr.Video(label="Upload Video for Analysis"),
outputs=gr.JSON(),
title="Apnea Detection System",
description="Upload a video to analyze breathing rate and detect conditions such as Apnea, Tachypnea, and Bradypnea."
)
app.launch()