import os import torch import shutil import pickle import zipfile import matplotlib import numpy as np import gradio as gr import pandas as pd import torch.nn as nn import torch.optim as optim import torch.nn.functional as F from matplotlib import pyplot as plt from sklearn.impute import SimpleImputer from pandas.plotting import andrews_curves from sklearn.metrics import accuracy_score, confusion_matrix, ConfusionMatrixDisplay, precision_score, recall_score, f1_score from sklearn.preprocessing import StandardScaler from torch.utils.data import DataLoader, TensorDataset, Dataset ###################################################### Preprocessing ##################################################################### def preprocess_dataframe(df, target_column=None, fill_method='mean', drop_na=True, sequence_length=32, test_size=0.2, batch_size = 128): """ Loads a DataFrame from a file, preprocesses it, prepares it for LSTM data. If a target_column is provided, that column is used as the target (y). Otherwise, it prepares data for an autoencoder (no separate y). 1. Loads file and checks for the target columns 2. Drops any NaN rows and non numeric columns. 3. Fills the NaN values with given method. 4. After preprocessing, data is transformed to fit in lstm. Args: file_path (str): Path to the data file (e.g., CSV, Excel). target_column (str, optional): Name of the target column. If provided, use this as target. Otherwise, treats as autoencoder. Defaults to None. fill_method (str, optional): Method for filling NaNs: 'mean', 'median', 'most_frequent', or 'constant'. Defaults to 'mean'. If 'constant', `fill_value` must be set. drop_na (bool, optional): Whether to drop rows with any NaN values. Defaults to True. sequence_length (int): The length of the sequence to create (e.g., number of features to treat as a sequence). test_size (float): The proportion of data to use for testing. Returns: tuple: (train_loader, test_loader, input_size) if no target_column. (train_loader, test_loader, input_size, target_column_name) if target_column provided A tuple containing: - train_loader (DataLoader): DataLoader for training data. - test_loader (DataLoader): DataLoader for test data. - input_size (int): Number of features. - target_column_name (str): The name of the target column only when there is target column. """ # 1. Target Column Check target_col = None if target_column: if target_column in df.columns: target_col = target_column print(f"Target column '{target_column}' found.") else: target_column = None # Reset target_column so we treat as autoencoder else: print("No target column specified. Treating as autoencoder.") #2. Drop Rows with NaNs before Fill if drop_na: print("Dropping rows with any NaN values...") df = df.dropna() # 3. Drop Non-Numeric Columns (Except Target) columns_to_drop = [] for col in df.columns: if col != target_col and not pd.api.types.is_numeric_dtype(df[col]): columns_to_drop.append(col) #exclude the target column if target column is not numeric if columns_to_drop: print(f"Dropping non-numeric columns: {columns_to_drop}") df = df.drop(columns=columns_to_drop) else: print("No non-numeric columns found.") # 4. Handle Missing Values (Only in Numeric Columns After Dropping) numeric_cols = df.select_dtypes(include=np.number).columns #select numeric columns after non-numeric columsn removed if df[numeric_cols].isnull().any().any(): # Check if any NaN values exist (in numeric columns) print("Handling missing values...") if fill_method in ['mean', 'median', 'most_frequent', 'constant']: imputer = SimpleImputer(strategy=fill_method) if fill_method == 'constant': imputer = SimpleImputer(strategy=fill_method, fill_value=0) #only with constant filling value must be provided df[numeric_cols] = imputer.fit_transform(df[numeric_cols]) # Apply only to numeric columns else: raise ValueError("Invalid fill_method. Choose 'mean', 'median', 'most_frequent', or 'constant'.") # Droping NaN and inf df.replace([np.inf, -np.inf], np.nan, inplace=True) df.dropna(inplace=True) if target_col: inputdf = df.drop(columns=[target_col]) outputdf = df[target_col].apply(lambda x: 0 if x.lower() == 'benign' else 1) malinputdf = inputdf[outputdf == 1] beninputdf = inputdf[outputdf == 0] sample_size = min(len(beninputdf), len(malinputdf), 500) bensample = beninputdf.sample(n=sample_size, random_state=42) bensample['Label'] = 'Benign' malsample = malinputdf.sample(n=sample_size, random_state=42) malsample['Label'] = 'Malicious' sample = pd.concat([bensample, malsample]) data = beninputdf.values else: inputdf = df sample_size = min(len(inputdf), 500) sample = df.sample(n=sample_size, random_state=42) data = inputdf.values scaler = StandardScaler() data = scaler.fit_transform(data) if target_col: X_train = data data = malinputdf.values data = scaler.transform(data) X_test = data else: X_train = data class TabularDatasetTest(Dataset): def __init__(self, data): self.data = data.clone().detach() def __len__(self): return len(self.data) def __getitem__(self, idx): return self.data[idx], self.data[idx] class TabularDatasetTrain(Dataset): def __init__(self, data, sequence_length): self.data = data.clone().detach() self.sequence_length = sequence_length def __len__(self): return len(self.data) - self.sequence_length + 1 def __getitem__(self, idx): return self.data[idx:idx + self.sequence_length], self.data[idx:idx + self.sequence_length] if target_column: X_train = torch.tensor(X_train, dtype=torch.float32) X_test = torch.tensor(X_test, dtype=torch.float32) train_dataset = TabularDatasetTrain(X_train, sequence_length = sequence_length) test_dataset = TabularDatasetTest(X_test) train_DataLoader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True) test_Dataloader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False) return { 'train_loader': train_DataLoader, 'test_loader': test_Dataloader, 'input_df': inputdf, 'target_df': outputdf, 'malinput_df': malinputdf, 'beninput_df': beninputdf, 'target_col': target_col, 'scaler': scaler, 'sample': sample } else: X_train = torch.tensor(X_train, dtype=torch.float32) train_dataset = TabularDatasetTrain(X_train, sequence_length = sequence_length) train_DataLoader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True) sample["Label"] = "dummy_class" return { 'train_loader': train_DataLoader, 'test_loader': None, 'input_df': inputdf, 'malinput_df': None, 'beninput_df': None, 'target_df': None, 'target_col': None, 'scaler': scaler, 'sample': sample } ################################################## Model ############################################################################# class EncoderRNN(nn.Module): def __init__(self, input_size, hidden_size, num_layers, isCuda): super(EncoderRNN, self).__init__() self.input_size = input_size self.hidden_size = hidden_size self.num_layers = num_layers self.bottleneck_size = int(input_size/2) self.isCuda = isCuda self.lstm1 = nn.LSTM(input_size, int(hidden_size/2), num_layers, batch_first=True, bidirectional = True) self.relu = nn.ReLU() self.dropout = nn.Dropout(0.2) self.lstm2 = nn.LSTM(hidden_size, self.bottleneck_size, num_layers, batch_first=True) def forward(self, inputs): intermediate_state, hidden = self.lstm1(inputs)#, (h0_1, c0_1)) intermediate_state = self.relu(self.dropout(intermediate_state)) encoded_input, hidden = self.lstm2(intermediate_state)#, (h0_2, c0_2)) return encoded_input, intermediate_state class DecoderRNN(nn.Module): def __init__(self, hidden_size, output_size, num_layers, isCuda): super(DecoderRNN, self).__init__() self.hidden_size = hidden_size self.output_size = output_size self.num_layers = num_layers self.bottleneck_size = int(output_size/2) self.isCuda = isCuda self.lstm2 = nn.LSTM(self.bottleneck_size, hidden_size, num_layers, batch_first=True) self.relu = nn.ReLU() self.dropout = nn.Dropout(0.2) self.lstm1 = nn.LSTM(2*hidden_size, output_size, num_layers, batch_first=True) def forward(self, encoded_input, intermediate_state): encoded_input, hidden = self.lstm2(encoded_input)#, (h0_2, c0_2)) inputs = torch.cat((self.dropout(encoded_input), intermediate_state), dim=2) inputs = self.relu(inputs) decoded_output, hidden = self.lstm1(inputs)#, (h0_1, c0_1)) # print(f"output: {decoded_output}") return decoded_output class LSTMAE(nn.Module): def __init__(self, input_size, hidden_size, num_layers=1, isCuda="cuda" if torch.cuda.is_available() else "cpu"): super(LSTMAE, self).__init__() hidden_size = hidden_size if hidden_size%2==0 else hidden_size+1 self.encoder = EncoderRNN(input_size, hidden_size, num_layers, isCuda) self.decoder = DecoderRNN(hidden_size, input_size, num_layers, isCuda) self.initialize_weights() def initialize_weights(self): """ Initializes the weights of the linear, LSTM, and convolutional layers using appropriate initialization schemes. """ for m in self.modules(): # Iterate through all modules in the network if isinstance(m, nn.LSTM): for name, param in m.named_parameters(): if 'weight' in name: if 'ih' or 'hh' in name: nn.init.xavier_uniform_(param.data) # Input-to-hidden elif 'bias' in name: nn.init.zeros_(param.data) def forward(self, input): encoded_input, intermediate_state = self.encoder(input) decoded_output = self.decoder(encoded_input, intermediate_state) return decoded_output ############################################## Andrews Curves ########################################################################### def make_better_andrews_curves(df, class_column, colors=None, plot_title="Andrews Curves", line_width=0.8, transparency=0.5, sample_size=None, legend_loc='best', custom_labels=None, x_axis_ticks=None, x_axis_labels=None, figsize=(10, 6), dpi=300, name = "andrews_curves"): """ Generates an Andrews Curves plot with enhanced styling. Args: df: pandas DataFrame containing the data. class_column: Name of the column containing class labels. colors: List of colors to use for each class (e.g., ['blue', 'red']). Defaults to matplotlib's defaults if None. plot_title: Title of the plot. line_width: Width of the lines. transparency: Alpha value (transparency) of the lines. sample_size: If an integer is provided, a random sample of the data will be used. Useful for large datasets. legend_loc: Location of the legend (e.g., 'best', 'upper right', 'lower left'). custom_labels: A dictionary mapping original class labels to more descriptive labels for the legend. x_axis_ticks: A list of tick positions for the x-axis. If None, default ticks are used. x_axis_labels: A list of labels for the x-axis ticks. Must be the same length as x_axis_ticks. figsize: Tuple specifying the figure size (width, height) in inches. """ if sample_size and sample_size < len(df): df = df.sample(n=sample_size, random_state=42) # Sample for faster plotting plt.figure(figsize=figsize) # Set the figure size before plotting ax = andrews_curves(df, class_column, color=colors) # Store the Axes object plt.title(plot_title, fontsize=16) plt.xlabel("t", fontsize=12) # Added x-axis label plt.ylabel("f(t)", fontsize=12) # Added y-axis label for line in ax.get_lines(): line.set_linewidth(line_width) line.set_alpha(transparency) # Customize Legend if custom_labels: handles, labels = ax.get_legend_handles_labels() new_labels = [custom_labels.get(label, label) for label in labels] # Use .get() to handle missing labels ax.legend(handles, new_labels, loc=legend_loc, fontsize=10) else: plt.legend(loc=legend_loc, fontsize=10) # Customize X-axis ticks and labels if x_axis_ticks: plt.xticks(x_axis_ticks, x_axis_labels) plt.grid(False) # Add a grid plt.tight_layout() # Adjust layout to prevent labels from overlapping plt.savefig(f"{name}.png", dpi=dpi) ################################################# Model Training ###################################################################### def train_model(model, train_loader, test_loader = None, learning_rate=0.001, epochs=10): criterion = nn.MSELoss() info = "" optimizer = optim.Adam(model.parameters(), lr=learning_rate) train_loss_data = {} for epoch in range(epochs): model.train() train_loss = 0.0 epoch_train_losses = [] mse_losses = [] for i,(inputs, targets) in enumerate(train_loader): inputs = inputs.to(device) targets = targets.to(device) outputs = model(inputs) # l1_lambda = 0.001 # l2_lambda = 0.0001 # l1_norm = sum(p.abs().sum() for p in model.parameters()) # L1 norm # l2_norm = sum(p.pow(2.0).sum() for p in model.parameters()) # L2 norm loss = criterion(outputs, targets)# + l2_lambda * l2_norm + l1_lambda * l1_norm optimizer.zero_grad() loss.backward() optimizer.step() if epoch == epochs-1: mse_loss = F.mse_loss(targets, outputs, reduction='none') mse_loss_per_data_point = mse_loss.mean(dim=-1) mse_losses.extend(mse_loss_per_data_point.tolist()) epoch_train_losses.append(loss.item()) train_loss += loss.item() train_loss /= len(train_loader) # Validation if test_loader and epoch%1==0: model.eval() test_loss = 0.0 with torch.no_grad(): for i,(inputs, targets) in enumerate(test_loader): inputs = inputs.to(device) targets = targets.to(device) outputs = model(inputs.unsqueeze(1)) loss = criterion(outputs.squeeze(1), targets) test_loss += loss.item() test_loss /= len(test_loader) else: test_loss = 0.0 info += f"Epoch {epoch+1}/{epochs}, Train Loss: {train_loss:.4f}, Test Loss: {test_loss:.4f}\n" print(f"Epoch {epoch+1}/{epochs}, Train Loss: {train_loss:.4f}, Test Loss: {test_loss:.4f}") train_loss_data[f'Epoch {epoch + 1}'] = epoch_train_losses train_loss_df = pd.DataFrame(dict([(k,pd.Series(v)) for k,v in train_loss_data.items()])) return model, train_loss_df, mse_losses, info ######################################################################################################################################### def detect_anomalies(csv_file, sample_choice="Custom Data", data_slicing_percentage=80, epochs=3, threshold_factor=1.0): images = [] anomaly_summary = "" device = "cuda" if torch.cuda.is_available() else "cpu" if os.path.exists("Results"): shutil.rmtree("Results") os.mkdir("Results") if sample_choice == "Custom Data": anomaly_summary += f"[INFO] Loading Custom Dataset {data_slicing_percentage}%...\n" dataframe = pd.read_csv(csv_file.name).sample(frac=data_slicing_percentage/100, random_state=42).reset_index(drop=True) anomaly_summary += f"[INFO] Preprocessing Dataset...\n" if dataframe.get('Label') is not None: processed_data = preprocess_dataframe(dataframe, target_column="Label") else: processed_data = preprocess_dataframe(dataframe) anomaly_summary += f"[WARNING] No Label Column Found, Using Unsupervised Learning...\n" anomaly_summary += f"[INFO] Generating Andrews Curves...\n" make_better_andrews_curves(processed_data['sample'], 'Label', colors=['Blue', 'Red'], plot_title="Dataset Andrews Curves", line_width=1.2, transparency=0.7, legend_loc='upper right', figsize=(12, 7), name = "Results/Dataset_andrews_curves") images.append("Results/Dataset_andrews_curves.png") model = LSTMAE(len(processed_data["input_df"].columns),128).to(device) model.to(device) anomaly_summary += f"[INFO] Training Model...\n" _, train_loss_df, mse_losses, info = train_model(model, processed_data['train_loader'], processed_data['test_loader'],epochs=epochs) anomaly_summary += info anomaly_summary += f"[INFO] Saving model, scaler, Dataset Used...\n" dataframe.to_csv('Results/Original_dataset.csv', columns=dataframe.columns, index=False) pickle.dump(processed_data['scaler'], open('Results/scaler.pkl', 'wb')) torch.save(model, 'Results/model.pth') anomaly_summary += f"[INFO] Generating Loss Curves...\n" plt.figure(figsize=(12, 6)) # Adjust figure size as needed for column in train_loss_df.columns: plt.plot(train_loss_df[column], label=column) plt.xlabel("Batch") plt.ylabel("Loss") plt.title("Training Loss per Epoch") plt.legend() # Show the legend to identify each epoch plt.grid(True) # Add a grid for easier reading plt.tight_layout() # Adjust layout to prevent labels from overlapping plt.savefig("Results/loss_curves.png", dpi=300) images.append("Results/loss_curves.png") Q1, Q3 = np.percentile(mse_losses, [25, 75]) Dict = {"Q1": Q1, "Q3": Q3} pickle.dump(Dict, open('Results/INFO.pkl', 'wb')) else: Q1, Q3 = 0.19226229563355446, 0.7454282641410828 IQR = Q3 - Q1 lower_bound = Q1 - threshold_factor * IQR upper_bound = Q3 + threshold_factor * IQR # print(lower_bound, upper_bound) data_path = os.path.join(os.path.abspath('Data'),sample_choice) dataframe = pd.read_csv(data_path).sample(frac=data_slicing_percentage/100, random_state=42).reset_index(drop=True) anomaly_summary += f"[INFO] Saving model, scaler, Dataset Used...\n" dataframe.to_csv('Results/Scaled_dataset.csv', columns=dataframe.columns, index=False) scaler = pickle.load(open('scaler.pkl', 'rb')) original_df = scaler.inverse_transform(dataframe.iloc[:,:-1]) original_df = pd.DataFrame(original_df, columns=dataframe.columns[:-1]) original_df['Label'] = dataframe['Label'] original_df.to_csv('Results/Original_dataset.csv', columns=dataframe.columns, index=False) shutil.copy('scaler.pkl', 'Results/scaler.pkl') shutil.copy('model.pth', 'Results/model.pth') # andrew curve of dataset anomaly_summary += f"[INFO] Generating Andrews Curves...\n" make_better_andrews_curves(dataframe, 'Label', colors=['Blue', 'Red'], plot_title="Dataset Andrews Curves", line_width=1.2, transparency=0.7, legend_loc='upper right', figsize=(12, 7), name = "Results/Dataset_andrews_curves") images.append("Results/Dataset_andrews_curves.png") inputdf = torch.tensor(dataframe.iloc[:,:-1].to_numpy(), dtype=torch.float32, device=device) outputdf = dataframe['Label'] model = torch.load("model.pth",weights_only = False, map_location=device) model.eval() outputs = model(inputdf.unsqueeze(1)).squeeze(1) mse_loss = F.mse_loss(outputs, inputdf, reduction='none') mse_loss_per_data_point = mse_loss.mean(dim=-1) anomaly_scores = pd.DataFrame({'Loss': mse_loss_per_data_point.detach().cpu().numpy(), 'Label': outputdf}) anomaly_scores['Anomaly'] = anomaly_scores['Loss'].apply(lambda x: 1 if x > upper_bound else 0) anomaly_scores['Label'] = anomaly_scores['Label'].apply(lambda x: 1 if x == "Malicious" else 0) out_confusion_matrix = confusion_matrix(anomaly_scores['Label'], anomaly_scores['Anomaly']) disp = ConfusionMatrixDisplay(confusion_matrix=out_confusion_matrix, display_labels=["Benign","Malignant"]) disp.plot(cmap=plt.cm.Blues) plt.title('Confusion Matrix') plt.savefig(f"Results/confusion_matrix.png", dpi=300) images.append("Results/confusion_matrix.png") accuracy = accuracy_score(anomaly_scores['Label'], anomaly_scores['Anomaly']) precision = precision_score(anomaly_scores['Label'], anomaly_scores['Anomaly']) recall = recall_score(anomaly_scores['Label'], anomaly_scores['Anomaly']) f1 = f1_score(anomaly_scores['Label'], anomaly_scores['Anomaly']) # print(f"Accuracy: {accuracy:.4f}, Precision: {precision:.4f}, Recall: {recall:.4f}, F1: {f1:.4f}") anomaly_summary += f"[RESULT] Accuracy: {accuracy:.4f}, Precision: {precision:.4f}, Recall: {recall:.4f}, F1: {f1:.4f}" anomaly_summary = anomaly_summary + f"Confusion Matrix:\n{out_confusion_matrix}\n" folder_path = "Results" with zipfile.ZipFile("Results.zip", 'w', zipfile.ZIP_DEFLATED) as zipf: for root, _, files in os.walk(folder_path): for file in files: file_path = os.path.join(root, file) relative_path = os.path.relpath(file_path, folder_path) zipf.write(file_path, relative_path) return anomaly_summary, images, "Results.zip" iface = gr.Interface( fn=detect_anomalies, inputs=[ gr.File(file_types=[".csv"], label="Upload CSV File"), gr.Radio(["Benign500.csv", "Malignant500.csv", "Balance1000.csv", "Custom Data"], value="Custom Data", label="Choose Samples or CustomData"), gr.Slider(minimum=10, maximum=100, step=10, value=80, label="Data Usage Percentage (Training or Detection)"), gr.Slider(minimum=1, maximum=20, step=1, value=3, label="Training Epochs (Default value is 3)"), gr.Slider(minimum=0, maximum=5, step=0.5, value=1.5, label="Loss Threshold (x, higher x means high threshold) = Q3 + x*IQR"), ], outputs=[ gr.Textbox(label="Anomaly Summary"), gr.Gallery(label="Anomaly Plots"), "file", ], title="Your own Anomaly Detector", description=""" ### Fully Unsupervised Anomaly Detection Tool (uses Bidirectional based Autoencoder with skip conn. and Dropout Layers) ##### Download *"Result.zip"* (contains model.pkl, dataset images, output images) to download the results from Right Bottom. Upload a *CSV file* (Custom Anomalies Detection: Use Output Column: "Label" or ), or Use *our trained model*. """ ) if __name__ == "__main__": iface.launch(debug=False)