import numpy as np import pandas as pd import seaborn as sns import matplotlib matplotlib.rc("font", size=35) import matplotlib.pyplot as plt import torch from src.utils.inference.inference_metrics import get_sigma_gaussian from torch_scatter import scatter_sum, scatter_mean def plot_per_event_metrics(sd, sd_pandora, PATH_store=None): ( calibrated_list, calibrated_list_pandora, reco_list, reco_list_pandora, ) = calculate_energy_per_event(sd, sd_pandora) plot_per_event_energy_distribution( calibrated_list, calibrated_list_pandora, reco_list, reco_list_pandora, PATH_store, ) def calculate_energy_per_event( sd, sd_pandora, ): sd = sd.reset_index(drop=True) sd_pandora = sd_pandora.reset_index(drop=True) corrected_list = [] reco_list = [] reco_list_pandora = [] corrected_list_pandora = [] for i in range(0, int(np.max(sd.number_batch))): mask = sd.number_batch == i event_E_total_reco = np.nansum(sd.reco_showers_E[mask]) event_E_total_true = np.nansum(sd.true_showers_E[mask]) event_E_total_reco_corrected = np.nansum(sd.calibrated_E[mask]) event_ML_total_reco = np.nansum(sd.pred_showers_E[mask]) mask_p = sd_pandora.number_batch == i event_E_total_reco_p = np.nansum(sd_pandora.reco_showers_E[mask_p]) event_E_total_true_p = np.nansum(sd_pandora.true_showers_E[mask_p]) event_ML_total_reco_p = np.nansum(sd_pandora.pred_showers_E[mask_p]) event_ML_total_reco_p_corrected = np.nansum( sd_pandora.pandora_calibrated_pfo[mask_p] ) reco_list.append(event_ML_total_reco / event_E_total_reco) corrected_list.append(event_E_total_reco_corrected / event_E_total_true) reco_list_pandora.append(event_ML_total_reco_p / event_E_total_reco_p) corrected_list_pandora.append( event_ML_total_reco_p_corrected / event_E_total_true_p ) return corrected_list, corrected_list_pandora, reco_list, reco_list_pandora def plot_per_event_energy_distribution( calibrated_list, calibrated_list_pandora, reco_list, reco_list_pandora, PATH_store ): fig = plt.figure(figsize=(8, 8)) sns.histplot( data=np.array(calibrated_list), # + 1 - np.mean(calibrated_list) stat="percent", binwidth=0.01, label="MLPF", # element="step", # fill=False, color="red", # linewidth=2, ) sns.histplot( data=calibrated_list_pandora, stat="percent", color="blue", binwidth=0.01, label="Pandora", # element="step", # fill=False, # linewidth=2, ) plt.ylabel("Percent of events") plt.xlabel("$E_{corrected}/E_{total}$") # plt.yscale("log") plt.legend() plt.xlim([0, 2]) fig.savefig( PATH_store + "per_event_E.png", bbox_inches="tight", ) fig = plt.figure(figsize=(8, 8)) sns.histplot(data=reco_list, stat="percent", binwidth=0.01, label="MLPF") sns.histplot( data=reco_list_pandora, stat="percent", color="orange", binwidth=0.01, label="Pandora", ) plt.ylabel("Percent of events") plt.xlabel("$E_{recoML}/E_{reco}$") plt.legend() plt.xlim([0.5, 1.5]) # plt.yscale("log") fig.savefig( PATH_store + "per_event_E_reco.png", bbox_inches="tight", ) particle_masses = {0: 0, 22: 0, 11: 0.00511, 211: 0.13957, 130: 0.493677, 2212: 0.938272, 2112: 0.939565} particle_masses_4_class = {0: 0.00511, 1: 0.13957, 2: 0.939565, 3: 0.0} # electron, CH, NH, photon def safeint(x, default_val=0): if np.isnan(x): return default_val return int(x) def calculate_event_mass_resolution(df, pandora, perfect_pid=False, mass_zero=False, ML_pid=False): true_e = torch.Tensor(df.true_showers_E.values) mask_nan_true = np.isnan(df.true_showers_E.values) true_e[mask_nan_true] = 0 batch_idx = df.number_batch if pandora: pred_E = df.pandora_calibrated_pfo.values nan_mask = np.isnan(df.pandora_calibrated_pfo.values) pred_E[nan_mask] = 0 pred_e1 = torch.tensor(pred_E).unsqueeze(1).repeat(1, 3) pred_vect = torch.tensor(np.array(df.pandora_calibrated_pos.values.tolist())) nan_mask_p = torch.isnan(pred_vect).any(dim=1) pred_vect[nan_mask_p] = 0 true_vect = torch.tensor(np.array(df.true_pos.values.tolist())) mask_nan_p = torch.isnan(true_vect).any(dim=1) true_vect[mask_nan_true] = 0 else: pred_E = df.calibrated_E.values nan_mask = np.isnan(df.calibrated_E.values) print(np.sum(nan_mask)) pred_E[nan_mask] = 0 pred_e1 = torch.tensor(pred_E).unsqueeze(1).repeat(1, 3) pred_vect = torch.tensor( np.array(df.pred_pos_matched.values.tolist()) ) pred_vect[nan_mask] = 0 true_vect = torch.tensor( np.array(df.true_pos.values.tolist()) ) true_vect[mask_nan_true] = 0 if perfect_pid or mass_zero or ML_pid: pred_vect /= np.linalg.norm(pred_vect, axis=1).reshape(-1, 1) pred_vect[np.isnan(pred_vect)] = 0 if ML_pid: #assert pandora is False if pandora: print("Perfect PID for Pandora") m = np.array([particle_masses.get(abs(safeint(i)), 0) for i in df.pid]) else: m = np.array([particle_masses_4_class.get(safeint(i), 0) for i in df.pred_pid_matched.values]) else: m = np.array([particle_masses.get(abs(safeint(i)), 0) for i in df.pid]) if mass_zero: m = np.array([0 for _ in m]) p_squared = (pred_E ** 2 - m ** 2) p_squared[p_squared < 0] = 0 # they are always like of order -1e-8 pred_vect = np.sqrt(p_squared).reshape(-1, 1) * np.array(pred_vect) batch_idx = torch.tensor(batch_idx.values).long() pred_E = torch.tensor(pred_E) true_jet_vect = scatter_sum(true_vect, batch_idx, dim=0) pred_jet_vect = scatter_sum(torch.tensor(pred_vect), batch_idx, dim=0) true_E_jet = scatter_sum(torch.tensor(true_e), batch_idx) pred_E_jet = scatter_sum(torch.tensor(pred_E), batch_idx) true_jet_p = torch.norm(true_jet_vect, dim=1) # This is actually momentum resolution pred_jet_p = torch.norm(pred_jet_vect, dim=1) mass_true = torch.sqrt(torch.abs(true_E_jet ** 2) - true_jet_p ** 2) mass_pred_p = torch.sqrt( torch.abs(pred_E_jet ** 2) - pred_jet_p ** 2) ## TODO: fix the nan values in pred_jet_p!!!!! # replace nans in these with 0 mass_over_true_p = mass_pred_p / mass_true E_over_true = pred_E_jet / true_E_jet p_over_true = pred_jet_p / true_jet_p p_jet_pandora = pred_jet_p ( mean_mass, var_mass, _, _, ) = get_sigma_gaussian(mass_over_true_p, np.linspace(0, 4, 300)) return mean_mass, var_mass, mass_over_true_p, mass_true, p_over_true, true_jet_p, E_over_true def calculate_event_energy_resolution(df, pandora=False, full_vector=False): if full_vector and pandora: assert "pandora_calibrated_pos" in df.columns bins = [0, 700] binsx = [] mean = [] variance = [] distributions = [] distr_baseline = [] mean_baseline = [] variance_baseline = [] mass_list = [] binning = 1e-2 bins_per_binned_E = np.arange(0, 2, binning) for i in range(len(bins) - 1): bin_i = bins[i] bin_i1 = bins[i + 1] binsx.append(0.5 * (bin_i + bin_i1)) true_e = df.true_showers_E.values batch_idx = df.number_batch if pandora: pred_e = df.pandora_calibrated_pfo.values pred_e1 = torch.tensor(pred_e).unsqueeze(1).repeat(1, 3) if full_vector: pred_vect = ( np.array(df.pandora_calibrated_pos.values.tolist()) # * pred_e1.numpy() ) true_vect = ( np.array(df.true_pos.values.tolist()) # * torch.tensor(true_e).unsqueeze(1).repeat(1, 3).numpy() ) pred_vect = torch.tensor(pred_vect) true_vect = torch.tensor(true_vect) else: pred_e = df.calibrated_E.values pred_e1 = torch.tensor(pred_e).unsqueeze(1).repeat(1, 3) if full_vector: pred_vect = ( np.array(df.pred_pos_matched.values.tolist()) * pred_e1.numpy() ) true_vect = ( np.array(df.true_pos.values.tolist()) # * torch.tensor(true_e).unsqueeze(1).repeat(1, 3).numpy() ) pred_vect = torch.tensor(pred_vect) true_vect = torch.tensor(true_vect) true_rec = df.reco_showers_E # pred_e_nocor = df.pred_showers_E[mask] true_e = torch.tensor(true_e) batch_idx = torch.tensor(batch_idx.values).long() pred_e = torch.tensor(pred_e) true_rec = torch.tensor(true_rec.values) if full_vector: true_p_vect = scatter_sum(true_vect, batch_idx, dim=0) pred_p_vect = scatter_sum(pred_vect, batch_idx, dim=0) true_e1 = scatter_sum(torch.tensor(true_e), batch_idx) pred_e1 = scatter_sum(torch.tensor(pred_e), batch_idx) true_e = torch.norm( true_p_vect, dim=1 ) # This is actually momentum resolution pred_e = torch.norm(pred_p_vect, dim=1) else: true_e = scatter_sum(true_e, batch_idx) pred_e = scatter_sum(pred_e, batch_idx) true_rec = scatter_sum(true_rec, batch_idx) mask_above = true_e <= bin_i1 mask_below = true_e > bin_i mask_check = true_e > 0 mask = mask_below * mask_above * mask_check true_e = true_e[mask] true_rec = true_rec[mask] pred_e = pred_e[mask] if torch.sum(mask) > 0: # if the bin is not empty e_over_true = pred_e / true_e e_over_reco = true_rec / true_e distributions.append(e_over_true) distr_baseline.append(e_over_reco) ( mean_predtotrue, var_predtotrue, err_mean_predtotrue, err_var_predtotrue, ) = get_sigma_gaussian(e_over_true, bins_per_binned_E) if full_vector: mass_true = torch.sqrt(true_e1[mask] ** 2 - true_e**2) mass_pred = torch.sqrt(pred_e1[mask] ** 2 - pred_e**2) print(pandora, len(mass_true), len(mass_pred)) mass_over_true = mass_pred / mass_true ( mean_mass, var_mass, _, _, ) = get_sigma_gaussian(mass_over_true, bins_per_binned_E) mass_list.append(mass_over_true) ( mean_reco_true, var_reco_true, err_mean_reco_true, err_var_reco_true, ) = get_sigma_gaussian(e_over_reco, bins_per_binned_E) mean.append(mean_predtotrue) variance.append(np.abs(var_predtotrue)) mean_baseline.append(mean_reco_true) variance_baseline.append(np.abs(var_reco_true)) if full_vector: mass_list = torch.cat(mass_list) ret = [ mean, variance, distributions, binsx, mean_baseline, variance_baseline, distr_baseline, ] if full_vector: ret += [mass_list] else: ret += [None] return ret def get_response_for_event_energy(matched_pandora, matched_, perfect_pid=False, mass_zero=False, ML_pid=False): ( mean_p, variance_om_p, distr_p, x_p, _, _, _, mass_over_true_pandora, ) = calculate_event_energy_resolution(matched_pandora, True, False) ( mean, variance_om, distr, x, mean_baseline, variance_om_baseline, _, mass_over_true_model, ) = calculate_event_energy_resolution(matched_, False, False) mean_mass_p, var_mass_p, distr_mass_p, mass_true_p, _, _, E_over_true_pandora = calculate_event_mass_resolution(matched_pandora, True, perfect_pid=perfect_pid, mass_zero=mass_zero, ML_pid=ML_pid) mean_mass, var_mass, distr_mass, mass_true, _, _, E_over_true = calculate_event_mass_resolution(matched_, False, perfect_pid=perfect_pid, mass_zero=mass_zero, ML_pid=ML_pid) ( mean_energy_over_true, var_energy_over_true, _, _, ) = get_sigma_gaussian(E_over_true, np.linspace(0, 4, 300)) ( mean_energy_over_true_pandora, var_energy_over_true_pandora, _, _, ) = get_sigma_gaussian(E_over_true_pandora, np.linspace(0, 4, 300)) dic = {} dic["mean_p"] = mean_p dic["variance_om_p"] = variance_om_p dic["variance_om"] = variance_om dic["mean"] = mean dic["energy_resolutions"] = x dic["energy_resolutions_p"] = x_p dic["mean_baseline"] = mean_baseline dic["variance_om_baseline"] = variance_om_baseline dic["distributions_pandora"] = distr_p dic["distributions_model"] = distr dic["mass_over_true_model"] = distr_mass dic["mass_over_true_pandora"] = distr_mass_p dic["mass_model"] = distr_mass * mass_true dic["mass_pandora"] = distr_mass_p * mass_true_p dic["mean_mass_model"] = mean_mass dic["mean_mass_pandora"] = mean_mass_p dic["var_mass_model"] = var_mass dic["var_mass_pandora"] = var_mass_p dic["energy_over_true"] = E_over_true dic["energy_over_true_pandora"] = E_over_true_pandora dic["mean_energy_over_true"] = mean_energy_over_true dic["mean_energy_over_true_pandora"] = mean_energy_over_true_pandora dic["var_energy_over_true"] = var_energy_over_true dic["var_energy_over_true_pandora"] = var_energy_over_true_pandora return dic def plot_mass_resolution(event_res_dic, PATH_store): fig, ax = plt.subplots(figsize=(7, 7)) ax.set_xlabel(r"$m_{pred}/m_{true}$") bins = np.linspace(0, 3, 100) ax.hist( event_res_dic["mass_over_true_model"], bins=bins, histtype="step", label="ML $\mu$={} $\sigma/\mu$={}".format( round((event_res_dic["mean_mass_model"]), 2), round((event_res_dic["var_mass_model"]), 2), ), color="red", density=True, ) ax.hist( event_res_dic["mass_over_true_pandora"], bins=bins, histtype="step", label="Pandora $\mu$={} $\sigma/\mu$={}".format( round((event_res_dic["mean_mass_pandora"]), 2), round((event_res_dic["var_mass_pandora"]), 2), ), color="blue", density=True, ) ax.grid() ax.legend() #ax.set_xlim([0, 10]) fig.tight_layout() print("Saving mass resolution") import os fig.savefig(os.path.join(PATH_store, "mass_resolution.pdf"), bbox_inches="tight") fig, ax = plt.subplots(figsize=(7, 7)) ax.set_xlabel(r"$M_{reco}$") bins = np.linspace(0, 3, 100) ax.hist( event_res_dic["mass_model"], bins=bins, histtype="step", label="ML", color="red", density=True, ) ax.hist( event_res_dic["mass_pandora"], bins=bins, histtype="step", label="Pandora", color="blue", density=True, ) ax.grid() ax.legend() #ax.set_xlim([0, 10]) fig.tight_layout() fig.savefig(os.path.join(PATH_store, "mass_reco_absolute.pdf"), bbox_inches="tight")