"""Gradio demo comparing raw wireless channels with LWM embeddings.

Two tabs are exposed:

* Beam prediction: shows averaged confusion matrices built from
  pre-computed prediction CSV files stored under ``images/``.
* LoS/NLoS classification: shows confusion matrices either from
  pre-computed CSV files under ``images_LoS/`` or from an uploaded HDF5
  dataset that is run through the LWM model and a nearest-centroid
  classifier.
"""

import contextlib
import importlib.util
import io
import os
import pickle
import subprocess
import sys

import gradio as gr
import h5py
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
import torch
from PIL import Image
from sklearn.metrics import confusion_matrix, f1_score


#################### BEAM PREDICTION #########################

def _render_beam_confusion_matrix(folder, image_name, title):
    """Average the CSV results in *folder* and plot them.

    Returns the rendered PIL image, or ``None`` when the folder yields no
    confusion matrix.
    """
    cm = compute_average_confusion_matrix(folder)
    if cm is None:
        return None
    return plot_confusion_matrix_beamPred(
        cm,
        classes=np.arange(cm.shape[0]),
        title=title,
        save_path=os.path.join(folder, image_name),
    )


def beam_prediction_task(data_percentage, task_complexity):
    """Build the raw and embedding confusion-matrix images for one setting.

    Args:
        data_percentage: Training-data percentage (divided by 100 and
            formatted with one decimal to build the folder name).
        task_complexity: Number of beams; also part of the folder name.

    Returns:
        ``(raw_img, embeddings_img)``; either entry is ``None`` when no
        result CSVs are available for it.
    """
    # Folder naming convention: images/<input_type>_<fraction>_<complexity>
    raw_folder = f"images/raw_{data_percentage/100:.1f}_{task_complexity}"
    embeddings_folder = f"images/embedding_{data_percentage/100:.1f}_{task_complexity}"

    raw_img = _render_beam_confusion_matrix(
        raw_folder,
        "confusion_matrix_raw.png",
        f"Raw Confusion Matrix\n({data_percentage}% data, {task_complexity} beams)",
    )
    embeddings_img = _render_beam_confusion_matrix(
        embeddings_folder,
        "confusion_matrix_embeddings.png",
        f"Embeddings Confusion Matrix\n({data_percentage}% data, {task_complexity} beams)",
    )
    return raw_img, embeddings_img


def compute_f1_score(cm):
    """Return the mean per-class F1 score derived from a confusion matrix.

    Rows are treated as true labels and columns as predictions. Classes
    whose precision, recall or F1 is undefined (0/0) contribute 0.
    """
    tp = np.diag(cm)
    fp = np.sum(cm, axis=0) - tp
    fn = np.sum(cm, axis=1) - tp

    # 0/0 is expected for classes that never occur; silence the warnings
    # and map the resulting NaNs to 0.
    with np.errstate(divide="ignore", invalid="ignore"):
        precision = np.nan_to_num(tp / (tp + fp))
        recall = np.nan_to_num(tp / (tp + fn))
        f1 = np.nan_to_num(2 * (precision * recall) / (precision + recall))

    return np.mean(f1)


def plot_confusion_matrix_beamPred(cm, classes, title, save_path):
    """Plot *cm* as a dark-mode heatmap, save it and return it as an image.

    Args:
        cm: Square confusion matrix (array-like).
        classes: Tick labels, one per class.
        title: Plot title; the mean F1 score is appended to it.
        save_path: Destination of the PNG file.

    Returns:
        The saved plot opened with ``PIL.Image.open``.
    """
    avg_f1 = compute_f1_score(cm)

    plt.style.use('dark_background')
    plt.figure(figsize=(10, 10))

    sns.heatmap(cm, cmap="cividis", cbar=True, linecolor='white',
                vmin=0, vmax=cm.max(), alpha=0.85)

    plt.title(f"{title}\n(F1 Score: {avg_f1:.3f})", color="white", fontsize=14)

    # Heatmap cell i spans [i, i + 1), so labels belong at the cell centres
    # (same convention as the two-class plots below).
    tick_marks = np.arange(len(classes)) + 0.5
    plt.xticks(tick_marks, classes, color="white", fontsize=14)
    plt.yticks(tick_marks, classes, color="white", fontsize=14)

    plt.ylabel('True label', color="white", fontsize=14)
    plt.xlabel('Predicted label', color="white", fontsize=14)
    plt.tight_layout()

    # Transparent background blends with the dark-mode website.
    plt.savefig(save_path, transparent=True)
    plt.close()

    return Image.open(save_path)


def compute_average_confusion_matrix(folder):
    """Average the confusion matrices of every ``.csv`` file in *folder*.

    Each CSV must provide ``Target`` and ``Top-1 Prediction`` columns.
    All matrices are computed over the label range ``0 .. max(Target)`` so
    they share one shape.

    Returns:
        The element-wise mean matrix, or ``None`` if the folder does not
        exist or holds no usable CSV data.
    """
    if not os.path.isdir(folder):
        return None

    frames = [
        pd.read_csv(os.path.join(folder, name))
        for name in os.listdir(folder)
        if name.endswith(".csv")
    ]

    # Largest label seen in any file decides the common matrix size.
    label_counts = [int(frame["Target"].max()) + 1 for frame in frames if not frame.empty]
    if not label_counts:
        return None
    label_range = np.arange(max(label_counts))

    matrices = [
        confusion_matrix(frame["Target"], frame["Top-1 Prediction"], labels=label_range)
        for frame in frames
    ]
    return np.mean(matrices, axis=0)


########################## LOS/NLOS CLASSIFICATION #############################

# Path to the predefined LoS/NLoS results folder
LOS_PATH = "images_LoS"

# 20 training-percentage values offered by the slider
percentage_values_los = np.linspace(0.001, 1, 20) * 100


def _plot_binary_confusion_matrix(y_true, y_pred, title, save_path, f1_average):
    """Render a 2x2 (class 0 / class 1) confusion matrix and save it.

    ``labels=[0, 1]`` keeps the matrix 2x2 even if one class is absent,
    which the fixed 'Class 0' / 'Class 1' tick labels rely on.
    """
    y_true = np.asarray(y_true)
    y_pred = np.asarray(y_pred)

    cm = confusion_matrix(y_true, y_pred, labels=[0, 1])
    f1 = f1_score(y_true, y_pred, average=f1_average)

    plt.style.use('dark_background')
    plt.figure(figsize=(5, 5))

    sns.heatmap(cm, annot=True, fmt="d", cmap="magma", cbar=False,
                annot_kws={"size": 12}, linewidths=0.5, linecolor='white')

    plt.title(f"{title}\n(F1 Score: {f1:.3f})", color="white", fontsize=14)

    plt.xticks([0.5, 1.5], labels=['Class 0', 'Class 1'], color="white", fontsize=10)
    plt.yticks([0.5, 1.5], labels=['Class 0', 'Class 1'], color="white", fontsize=10)

    plt.ylabel('True label', color="white", fontsize=12)
    plt.xlabel('Predicted label', color="white", fontsize=12)
    plt.tight_layout()

    # Transparent background blends with the dark-mode website.
    plt.savefig(save_path, transparent=True)
    plt.close()

    return Image.open(save_path)


def plot_confusion_matrix_from_csv(csv_file_path, title, save_path):
    """Plot the confusion matrix of a prediction CSV (macro-averaged F1).

    The CSV must provide ``Target`` and ``Top-1 Prediction`` columns.
    Returns the saved plot as a PIL image.
    """
    data = pd.read_csv(csv_file_path)
    return _plot_binary_confusion_matrix(
        data['Target'], data['Top-1 Prediction'], title, save_path, f1_average='macro'
    )


def display_confusion_matrices_los(percentage):
    """Return the (raw, embeddings) confusion-matrix images for *percentage*.

    *percentage* is in percent; it is divided by 100 and formatted with
    three decimals to locate the result folders and CSV files.
    """
    fraction = f"{percentage/100:.3f}"

    raw_folder = os.path.join(LOS_PATH, f"raw_{fraction}_los_noTraining")
    embeddings_folder = os.path.join(LOS_PATH, f"embedding_{fraction}_los_noTraining")

    raw_img = plot_confusion_matrix_from_csv(
        os.path.join(raw_folder, f"test_predictions_raw_{fraction}_los.csv"),
        f"Raw Confusion Matrix ({percentage:.1f}% data)",
        os.path.join(raw_folder, "confusion_matrix_raw.png"),
    )
    embeddings_img = plot_confusion_matrix_from_csv(
        os.path.join(embeddings_folder, f"test_predictions_embedding_{fraction}_los.csv"),
        f"Embeddings Confusion Matrix ({percentage:.1f}% data)",
        os.path.join(embeddings_folder, "confusion_matrix_embeddings.png"),
    )
    return raw_img, embeddings_img


def handle_user_choice(choice, percentage=None, uploaded_file=None):
    """Dispatch the LoS/NLoS tab to the default dataset or an upload.

    Returns:
        ``(raw_img, embeddings_img, console_output)``. The image slots are
        ``None`` when nothing can be shown; the reason is then reported in
        ``console_output`` (the image components cannot display text).
    """
    if choice == "Use Default Dataset":
        raw_img, embeddings_img = display_confusion_matrices_los(percentage)
        return raw_img, embeddings_img, ""

    if choice == "Upload Dataset":
        if uploaded_file is None:
            return None, None, "Please upload a dataset"
        return process_hdf5_file(uploaded_file, percentage)

    return None, None, "Invalid choice"


class PrintCapture(io.StringIO):
    """In-memory text stream that also records every written chunk."""

    def __init__(self):
        super().__init__()
        self.output = []

    def write(self, txt):
        self.output.append(txt)
        super().write(txt)

    def get_output(self):
        """Return everything written so far as one string."""
        return ''.join(self.output)


def _open_image_or_random(base_dir, file_name):
    """Open ``base_dir/file_name`` or fall back to a random image."""
    if base_dir is not None:
        image_path = os.path.join(base_dir, file_name)
        if os.path.exists(image_path):
            return Image.open(image_path)
    return create_random_image()


def display_predefined_images(percentage):
    """Load the predefined raw/embedding images for *percentage*.

    NOTE(review): ``RAW_PATH`` and ``EMBEDDINGS_PATH`` are not defined in
    this file. They are looked up at call time; if they are still missing
    the random fallback image is used instead of raising ``NameError``.
    """
    file_name = f"percentage_{percentage}_complexity_16.png"
    raw_image = _open_image_or_random(globals().get("RAW_PATH"), file_name)
    embeddings_image = _open_image_or_random(globals().get("EMBEDDINGS_PATH"), file_name)
    return raw_image, embeddings_image


def los_nlos_classification(file, percentage):
    """Classify an uploaded file, or show predefined images when none is given.

    Returns ``(raw_image, embeddings_image, console_output)``.
    """
    if file is not None:
        return process_hdf5_file(file, percentage)

    raw_image, embeddings_image = display_predefined_images(percentage)
    return raw_image, embeddings_image, ""


def create_random_image(size=(300, 300)):
    """Return an RGB noise image of the given ``(height, width)``."""
    random_image = np.random.rand(*size, 3) * 255
    return Image.fromarray(random_image.astype('uint8'))


def load_module_from_path(module_name, file_path):
    """Import and return the Python module stored at *file_path*.

    Raises:
        ImportError: If no import spec/loader can be created for the path.
    """
    spec = importlib.util.spec_from_file_location(module_name, file_path)
    if spec is None or spec.loader is None:
        raise ImportError(f"Cannot load module {module_name!r} from {file_path!r}")
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    return module


def split_dataset(channels, labels, percentage):
    """Randomly split the samples into train and test sets.

    Args:
        channels: Array indexed by sample along axis 0.
        labels: Array of per-sample labels.
        percentage: Share of samples (in percent) used for training.

    Returns:
        ``(train_data, test_data, train_labels, test_labels)``.
    """
    num_samples = channels.shape[0]
    train_size = int(num_samples * percentage / 100)
    print(f'Number of Training Samples: {train_size}')

    indices = np.arange(num_samples)
    np.random.shuffle(indices)
    train_idx, test_idx = indices[:train_size], indices[train_size:]

    return channels[train_idx], channels[test_idx], labels[train_idx], labels[test_idx]


def euclidean_distance(x, centroid):
    """Return the Euclidean distance between a point and a centroid."""
    return np.linalg.norm(x - centroid)


def _distances_to_centroid(points, centroid):
    """Euclidean distance of every row of *points* to *centroid*."""
    diffs = points - centroid
    if diffs.dim() > 1:
        return torch.linalg.norm(diffs.flatten(start_dim=1), dim=1)
    return diffs.abs()


def classify_based_on_distance(train_data, train_labels, test_data):
    """Nearest-centroid classifier for the labels 0 and 1.

    The centroid of each class is the mean of its training samples; a test
    sample gets label 0 only if it is strictly closer to centroid 0,
    otherwise 1.

    Returns:
        CPU ``torch`` tensor of integer predictions, one per test sample.
    """
    train_data = torch.as_tensor(train_data)
    test_data = torch.as_tensor(test_data, device=train_data.device)
    # Labels may arrive as a NumPy array; move them next to the data so the
    # boolean masks work on any device.
    labels = torch.as_tensor(train_labels, device=train_data.device).reshape(-1)

    centroid_0 = train_data[labels == 0].mean(dim=0)
    centroid_1 = train_data[labels == 1].mean(dim=0)

    dist_0 = _distances_to_centroid(test_data, centroid_0)
    dist_1 = _distances_to_centroid(test_data, centroid_1)

    # "not closer to 0" (including ties and NaN distances) maps to class 1.
    return (~(dist_0 < dist_1)).long().cpu()


def plot_confusion_matrix(y_true, y_pred, title):
    """Plot a two-class confusion matrix (weighted F1) into ``<title>.png``.

    The file is written relative to the current working directory.
    Returns the saved plot as a PIL image.
    """
    return _plot_binary_confusion_matrix(
        y_true, y_pred, title, f"{title}.png", f1_average='weighted'
    )


def identical_train_test_split(output_emb, output_raw, labels, train_percentage,
                               test_fraction=0.20):
    """Split embeddings, raw data and labels with one shared permutation.

    A ``test_fraction`` share of the samples is held out for testing; of
    the remainder, ``train_percentage`` percent is used for training.

    Returns:
        ``(train_emb, test_emb, train_raw, test_raw, train_labels,
        test_labels)``.
    """
    num_samples = output_emb.shape[0]
    indices = torch.randperm(num_samples)

    test_split_index = int(num_samples * test_fraction)
    test_indices = indices[:test_split_index]
    remaining_indices = indices[test_split_index:]

    train_split_index = int(len(remaining_indices) * train_percentage / 100)
    print(f'Training Size: {train_split_index} out of remaining {len(remaining_indices)}')
    train_indices = remaining_indices[:train_split_index]

    return (
        output_emb[train_indices], output_emb[test_indices],
        output_raw[train_indices], output_raw[test_indices],
        labels[train_indices], labels[test_indices],
    )


# Store the original working directory when the app starts
original_dir = os.getcwd()


def _run_lwm_pipeline(uploaded_file, percentage):
    """Run LWM inference and centroid classification on an HDF5 upload.

    Changes the working directory to the model repository; the caller is
    responsible for restoring it.

    Returns:
        ``(raw_cm_image, emb_cm_image)``, or ``None`` if the model
        repository directory is not available.
    """
    model_repo_url = "https://huggingface.co/wi-lab/lwm"
    model_repo_dir = "./LWM"

    # Step 1: Clone the repository if not already done
    # NOTE(review): code from this remote repository is executed below.
    if not os.path.exists(model_repo_dir):
        print(f"Cloning model repository from {model_repo_url}...")
        subprocess.run(["git", "clone", model_repo_url, model_repo_dir], check=True)

    # Step 2: Verify the repository was cloned and change the working directory
    repo_work_dir = os.path.join(original_dir, model_repo_dir)
    if not os.path.exists(repo_work_dir):
        print(f"Directory {repo_work_dir} does not exist.")
        return None
    os.chdir(repo_work_dir)
    print(f"Changed working directory to {os.getcwd()}")

    # Step 3: Dynamically load lwm_model.py, input_preprocess.py, and inference.py
    lwm_model = load_module_from_path("lwm_model", os.path.join(os.getcwd(), 'lwm_model.py'))
    input_preprocess = load_module_from_path(
        "input_preprocess", os.path.join(os.getcwd(), 'input_preprocess.py'))
    inference = load_module_from_path("inference", os.path.join(os.getcwd(), 'inference.py'))

    # Step 4: Load the model from lwm_model module
    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    print(f"Loading the LWM model on {device}...")
    model = lwm_model.lwm.from_pretrained(device=device).float()

    # Step 5: Load the HDF5 file and extract the channels and labels
    # The upload may be a file-like object exposing .name or a plain path.
    hdf5_path = getattr(uploaded_file, "name", uploaded_file)
    with h5py.File(hdf5_path, 'r') as f:
        channels = np.array(f['channels']).astype(np.complex64)
        labels = np.array(f['labels']).astype(np.int32)
    print(f"Loaded dataset with {channels.shape[0]} samples.")

    # Step 6: Tokenize the data using the tokenizer from input_preprocess
    preprocessed_chs = input_preprocess.tokenizer(manual_data=channels)

    # Step 7: Perform inference using the functions from inference.py
    output_emb = inference.lwm_inference(preprocessed_chs, 'cls_emb', model, device)
    output_raw = inference.create_raw_dataset(preprocessed_chs, device)

    print(f"Output Embeddings Shape: {output_emb.shape}")
    print(f"Output Raw Shape: {output_raw.shape}")
    print(f'percentage_value: {percentage}')

    (train_data_emb, test_data_emb,
     train_data_raw, test_data_raw,
     train_labels, test_labels) = identical_train_test_split(
        output_emb.view(len(output_emb), -1),
        output_raw.view(len(output_raw), -1),
        labels,
        percentage,
    )

    # Step 8: Perform classification using the Euclidean distance for both raw and embeddings
    print(f'train_data_emb: {train_data_emb.shape}')
    print(f'train_labels: {train_labels.shape}')
    print(f'test_data_emb: {test_data_emb.shape}')

    pred_raw = classify_based_on_distance(train_data_raw, train_labels, test_data_raw)
    pred_emb = classify_based_on_distance(train_data_emb, train_labels, test_data_emb)

    # Step 9: Generate confusion matrices for both raw and embeddings
    raw_cm_image = plot_confusion_matrix(test_labels, pred_raw,
                                         title="Confusion Matrix (Raw Channels)")
    emb_cm_image = plot_confusion_matrix(test_labels, pred_emb,
                                         title="Confusion Matrix (Embeddings)")
    return raw_cm_image, emb_cm_image


def process_hdf5_file(uploaded_file, percentage):
    """Process an uploaded HDF5 dataset and capture the printed log.

    Args:
        uploaded_file: Uploaded file (object with ``.name`` or a path) that
            contains ``channels`` and ``labels`` datasets.
        percentage: Training percentage forwarded to the split.

    Returns:
        ``(raw_cm_image, emb_cm_image, console_output)``. On failure both
        images are ``None`` and the error text is appended to
        ``console_output``.
    """
    capture = PrintCapture()
    try:
        # redirect_stdout restores whatever stream was active before.
        # NOTE: sys.stdout is process-wide, so concurrent requests share it.
        with contextlib.redirect_stdout(capture):
            images = _run_lwm_pipeline(uploaded_file, percentage)
    except Exception as e:  # top-level UI boundary: report instead of crashing
        return None, None, f"{capture.get_output()}Error: {e}"
    finally:
        # Always return to the original working directory after processing
        os.chdir(original_dir)

    if images is None:
        return None, None, capture.get_output()
    raw_cm_image, emb_cm_image = images
    return raw_cm_image, emb_cm_image, capture.get_output()


######################## Define the Gradio interface ###############################

APP_CSS = """
    .slider-container {
        display: inline-block;
        margin-right: 50px;
        text-align: center;
    }

    .explanation-box {
        font-size: 16px;
        font-style: italic;
        color: #4a4a4a;
        padding: 15px;
        background-color: #f0f0f0;
        border-radius: 10px;
        margin-bottom: 20px;
    }

    .bold-highlight {
        font-weight: bold;
        color: #2c3e50;
        font-size: 18px;
        text-align: center;
        margin-bottom: 20px;
    }
"""

with gr.Blocks(css=APP_CSS) as demo:

    # Contact Section
    gr.Markdown("""
Wireless Model Email
""")

    gr.Markdown("""
🚀 Explore the pre-trained **LWM Model** here: https://huggingface.co/wi-lab/lwm
""")

    # Tab for Beam Prediction Task
    with gr.Tab("Beam Prediction Task"):
        gr.Markdown("### Beam Prediction Task")

        # Explanation section with creative spacing and minimal design
        gr.Markdown("""

📡 Beam Prediction Task

""")
        # Disabled explanation text:
        #gr.Markdown("""
        #
        # In this task, you'll predict the strongest mmWave beam from a predefined codebook based on Sub-6 GHz channels. Adjust the data percentage and task complexity to observe how LWM performs on different settings. These are just inferences on first the LWM model and then the trained downstream model for this task (A residual 1D-CNN model with 500K parameters). The dataset used for this task is a combination of six scenarios from the DeepMIMO dataset that were not included in the LWM pre-training, showing the generalization of our model.
        #
        #""")

        with gr.Row():
            with gr.Column():
                data_percentage_slider = gr.Slider(
                    label="Data Percentage for Training",
                    minimum=10, maximum=100, step=10, value=10)
                task_complexity_dropdown = gr.Dropdown(
                    label="Task Complexity (Number of Beams)",
                    choices=[16, 32, 64, 128, 256], value=16)

        with gr.Row():
            raw_img_bp = gr.Image(label="Raw Channels", type="pil", width=300, height=500)
            embeddings_img_bp = gr.Image(label="Embeddings", type="pil", width=300, height=500)

        # Update the confusion matrices whenever either control changes
        beam_inputs = [data_percentage_slider, task_complexity_dropdown]
        beam_outputs = [raw_img_bp, embeddings_img_bp]
        data_percentage_slider.change(fn=beam_prediction_task,
                                      inputs=beam_inputs, outputs=beam_outputs)
        task_complexity_dropdown.change(fn=beam_prediction_task,
                                        inputs=beam_inputs, outputs=beam_outputs)

    # Separate Tab for LoS/NLoS Classification Task
    with gr.Tab("LoS/NLoS Classification Task"):
        gr.Markdown("### LoS/NLoS Classification Task")

        # Explanation section with creative spacing
        gr.Markdown("""

🔍 LoS/NLoS Classification Task

""")
        # Disabled explanation text:
        #gr.Markdown("""
        #
        # Use this task to classify whether a channel is LoS (Line-of-Sight) or NLoS (Non-Line-of-Sight). You can either upload your own dataset or use the default dataset to explore how LWM embeddings compare to raw channels. The default dataset is a combination of six scenarios from the DeepMIMO dataset. Your custom dataset in h5py format should contain 'channels' array of size (N,32,32), and 'labels' representing LoS/NLoS channels with 1/0. You can find additional information on how to save your dataset in the h5py format in the above-mentioned model repository. The interesting thing about this task is that we do not train any downstream model for LoS/NLoS classification, but just use a simple approach that predicts the label for a test sample based on the distance to the centroid of training samples corresponding to each label.
        #
        #""")

        # Radio button for user choice: predefined data or upload dataset
        choice_radio = gr.Radio(
            choices=["Use Default Dataset", "Upload Dataset"],
            label="Choose how to proceed",
            value="Use Default Dataset")

        percentage_slider_los = gr.Slider(
            minimum=float(percentage_values_los[0]),
            maximum=float(percentage_values_los[-1]),
            step=float(percentage_values_los[1] - percentage_values_los[0]),
            value=float(percentage_values_los[0]),
            label="Percentage of Data for Training")

        # File uploader for dataset (only visible if user chooses to upload a dataset)
        file_input = gr.File(label="Upload HDF5 Dataset", file_types=[".h5"], visible=False)

        # Confusion matrices display
        with gr.Row():
            raw_img_los = gr.Image(label="Raw Channels", type="pil", width=300, height=300)
            embeddings_img_los = gr.Image(label="Embeddings", type="pil", width=300, height=300)
            output_textbox = gr.Textbox(label="Console Output", lines=10)

        # Update the file uploader visibility based on user choice
        def toggle_file_input(choice):
            return gr.update(visible=(choice == "Upload Dataset"))

        choice_radio.change(fn=toggle_file_input, inputs=[choice_radio], outputs=file_input)

        los_inputs = [choice_radio, percentage_slider_los, file_input]
        los_outputs = [raw_img_los, embeddings_img_los, output_textbox]

        # When user makes a choice, update the display
        choice_radio.change(fn=handle_user_choice, inputs=los_inputs, outputs=los_outputs)

        # When percentage slider changes
        percentage_slider_los.change(fn=handle_user_choice,
                                     inputs=los_inputs, outputs=los_outputs)

        # When a dataset is uploaded (or cleared), refresh the results too;
        # otherwise an upload has no effect until another control changes.
        file_input.change(fn=handle_user_choice, inputs=los_inputs, outputs=los_outputs)

# Launch the app
if __name__ == "__main__":
    demo.launch()