Spaces:
Sleeping
Sleeping
import numpy as np | |
import pandas as pd | |
import seaborn as sns | |
import matplotlib | |
matplotlib.rc("font", size=35) | |
import matplotlib.pyplot as plt | |
import torch | |
from src.utils.inference.inference_metrics import get_sigma_gaussian | |
from torch_scatter import scatter_sum, scatter_mean | |
def plot_per_event_metrics(sd, sd_pandora, PATH_store=None): | |
( | |
calibrated_list, | |
calibrated_list_pandora, | |
reco_list, | |
reco_list_pandora, | |
) = calculate_energy_per_event(sd, sd_pandora) | |
plot_per_event_energy_distribution( | |
calibrated_list, | |
calibrated_list_pandora, | |
reco_list, | |
reco_list_pandora, | |
PATH_store, | |
) | |
def calculate_energy_per_event( | |
sd, | |
sd_pandora, | |
): | |
sd = sd.reset_index(drop=True) | |
sd_pandora = sd_pandora.reset_index(drop=True) | |
corrected_list = [] | |
reco_list = [] | |
reco_list_pandora = [] | |
corrected_list_pandora = [] | |
for i in range(0, int(np.max(sd.number_batch))): | |
mask = sd.number_batch == i | |
event_E_total_reco = np.nansum(sd.reco_showers_E[mask]) | |
event_E_total_true = np.nansum(sd.true_showers_E[mask]) | |
event_E_total_reco_corrected = np.nansum(sd.calibrated_E[mask]) | |
event_ML_total_reco = np.nansum(sd.pred_showers_E[mask]) | |
mask_p = sd_pandora.number_batch == i | |
event_E_total_reco_p = np.nansum(sd_pandora.reco_showers_E[mask_p]) | |
event_E_total_true_p = np.nansum(sd_pandora.true_showers_E[mask_p]) | |
event_ML_total_reco_p = np.nansum(sd_pandora.pred_showers_E[mask_p]) | |
event_ML_total_reco_p_corrected = np.nansum( | |
sd_pandora.pandora_calibrated_pfo[mask_p] | |
) | |
reco_list.append(event_ML_total_reco / event_E_total_reco) | |
corrected_list.append(event_E_total_reco_corrected / event_E_total_true) | |
reco_list_pandora.append(event_ML_total_reco_p / event_E_total_reco_p) | |
corrected_list_pandora.append( | |
event_ML_total_reco_p_corrected / event_E_total_true_p | |
) | |
return corrected_list, corrected_list_pandora, reco_list, reco_list_pandora | |
def plot_per_event_energy_distribution( | |
calibrated_list, calibrated_list_pandora, reco_list, reco_list_pandora, PATH_store | |
): | |
fig = plt.figure(figsize=(8, 8)) | |
sns.histplot( | |
data=np.array(calibrated_list), # + 1 - np.mean(calibrated_list) | |
stat="percent", | |
binwidth=0.01, | |
label="MLPF", | |
# element="step", | |
# fill=False, | |
color="red", | |
# linewidth=2, | |
) | |
sns.histplot( | |
data=calibrated_list_pandora, | |
stat="percent", | |
color="blue", | |
binwidth=0.01, | |
label="Pandora", | |
# element="step", | |
# fill=False, | |
# linewidth=2, | |
) | |
plt.ylabel("Percent of events") | |
plt.xlabel("$E_{corrected}/E_{total}$") | |
# plt.yscale("log") | |
plt.legend() | |
plt.xlim([0, 2]) | |
fig.savefig( | |
PATH_store + "per_event_E.png", | |
bbox_inches="tight", | |
) | |
fig = plt.figure(figsize=(8, 8)) | |
sns.histplot(data=reco_list, stat="percent", binwidth=0.01, label="MLPF") | |
sns.histplot( | |
data=reco_list_pandora, | |
stat="percent", | |
color="orange", | |
binwidth=0.01, | |
label="Pandora", | |
) | |
plt.ylabel("Percent of events") | |
plt.xlabel("$E_{recoML}/E_{reco}$") | |
plt.legend() | |
plt.xlim([0.5, 1.5]) | |
# plt.yscale("log") | |
fig.savefig( | |
PATH_store + "per_event_E_reco.png", | |
bbox_inches="tight", | |
) | |
particle_masses = {0: 0, 22: 0, 11: 0.00511, 211: 0.13957, 130: 0.493677, 2212: 0.938272, 2112: 0.939565} | |
particle_masses_4_class = {0: 0.00511, 1: 0.13957, 2: 0.939565, 3: 0.0} # electron, CH, NH, photon | |
def safeint(x, default_val=0): | |
if np.isnan(x): | |
return default_val | |
return int(x) | |
def calculate_event_mass_resolution(df, pandora, perfect_pid=False, mass_zero=False, ML_pid=False): | |
true_e = torch.Tensor(df.true_showers_E.values) | |
mask_nan_true = np.isnan(df.true_showers_E.values) | |
true_e[mask_nan_true] = 0 | |
batch_idx = df.number_batch | |
if pandora: | |
pred_E = df.pandora_calibrated_pfo.values | |
nan_mask = np.isnan(df.pandora_calibrated_pfo.values) | |
pred_E[nan_mask] = 0 | |
pred_e1 = torch.tensor(pred_E).unsqueeze(1).repeat(1, 3) | |
pred_vect = torch.tensor(np.array(df.pandora_calibrated_pos.values.tolist())) | |
nan_mask_p = torch.isnan(pred_vect).any(dim=1) | |
pred_vect[nan_mask_p] = 0 | |
true_vect = torch.tensor(np.array(df.true_pos.values.tolist())) | |
mask_nan_p = torch.isnan(true_vect).any(dim=1) | |
true_vect[mask_nan_true] = 0 | |
else: | |
pred_E = df.calibrated_E.values | |
nan_mask = np.isnan(df.calibrated_E.values) | |
print(np.sum(nan_mask)) | |
pred_E[nan_mask] = 0 | |
pred_e1 = torch.tensor(pred_E).unsqueeze(1).repeat(1, 3) | |
pred_vect = torch.tensor( | |
np.array(df.pred_pos_matched.values.tolist()) | |
) | |
pred_vect[nan_mask] = 0 | |
true_vect = torch.tensor( | |
np.array(df.true_pos.values.tolist()) | |
) | |
true_vect[mask_nan_true] = 0 | |
if perfect_pid or mass_zero or ML_pid: | |
pred_vect /= np.linalg.norm(pred_vect, axis=1).reshape(-1, 1) | |
pred_vect[np.isnan(pred_vect)] = 0 | |
if ML_pid: | |
#assert pandora is False | |
if pandora: | |
print("Perfect PID for Pandora") | |
m = np.array([particle_masses.get(abs(safeint(i)), 0) for i in df.pid]) | |
else: | |
m = np.array([particle_masses_4_class.get(safeint(i), 0) for i in df.pred_pid_matched.values]) | |
else: | |
m = np.array([particle_masses.get(abs(safeint(i)), 0) for i in df.pid]) | |
if mass_zero: | |
m = np.array([0 for _ in m]) | |
p_squared = (pred_E ** 2 - m ** 2) | |
p_squared[p_squared < 0] = 0 # they are always like of order -1e-8 | |
pred_vect = np.sqrt(p_squared).reshape(-1, 1) * np.array(pred_vect) | |
batch_idx = torch.tensor(batch_idx.values).long() | |
pred_E = torch.tensor(pred_E) | |
true_jet_vect = scatter_sum(true_vect, batch_idx, dim=0) | |
pred_jet_vect = scatter_sum(torch.tensor(pred_vect), batch_idx, dim=0) | |
true_E_jet = scatter_sum(torch.tensor(true_e), batch_idx) | |
pred_E_jet = scatter_sum(torch.tensor(pred_E), batch_idx) | |
true_jet_p = torch.norm(true_jet_vect, dim=1) # This is actually momentum resolution | |
pred_jet_p = torch.norm(pred_jet_vect, dim=1) | |
mass_true = torch.sqrt(torch.abs(true_E_jet ** 2) - true_jet_p ** 2) | |
mass_pred_p = torch.sqrt( | |
torch.abs(pred_E_jet ** 2) - pred_jet_p ** 2) ## TODO: fix the nan values in pred_jet_p!!!!! | |
# replace nans in these with 0 | |
mass_over_true_p = mass_pred_p / mass_true | |
E_over_true = pred_E_jet / true_E_jet | |
p_over_true = pred_jet_p / true_jet_p | |
p_jet_pandora = pred_jet_p | |
( | |
mean_mass, | |
var_mass, | |
_, | |
_, | |
) = get_sigma_gaussian(mass_over_true_p, np.linspace(0, 4, 300)) | |
return mean_mass, var_mass, mass_over_true_p, mass_true, p_over_true, true_jet_p, E_over_true | |
def calculate_event_energy_resolution(df, pandora=False, full_vector=False): | |
if full_vector and pandora: | |
assert "pandora_calibrated_pos" in df.columns | |
bins = [0, 700] | |
binsx = [] | |
mean = [] | |
variance = [] | |
distributions = [] | |
distr_baseline = [] | |
mean_baseline = [] | |
variance_baseline = [] | |
mass_list = [] | |
binning = 1e-2 | |
bins_per_binned_E = np.arange(0, 2, binning) | |
for i in range(len(bins) - 1): | |
bin_i = bins[i] | |
bin_i1 = bins[i + 1] | |
binsx.append(0.5 * (bin_i + bin_i1)) | |
true_e = df.true_showers_E.values | |
batch_idx = df.number_batch | |
if pandora: | |
pred_e = df.pandora_calibrated_pfo.values | |
pred_e1 = torch.tensor(pred_e).unsqueeze(1).repeat(1, 3) | |
if full_vector: | |
pred_vect = ( | |
np.array(df.pandora_calibrated_pos.values.tolist()) | |
# * pred_e1.numpy() | |
) | |
true_vect = ( | |
np.array(df.true_pos.values.tolist()) | |
# * torch.tensor(true_e).unsqueeze(1).repeat(1, 3).numpy() | |
) | |
pred_vect = torch.tensor(pred_vect) | |
true_vect = torch.tensor(true_vect) | |
else: | |
pred_e = df.calibrated_E.values | |
pred_e1 = torch.tensor(pred_e).unsqueeze(1).repeat(1, 3) | |
if full_vector: | |
pred_vect = ( | |
np.array(df.pred_pos_matched.values.tolist()) * pred_e1.numpy() | |
) | |
true_vect = ( | |
np.array(df.true_pos.values.tolist()) | |
# * torch.tensor(true_e).unsqueeze(1).repeat(1, 3).numpy() | |
) | |
pred_vect = torch.tensor(pred_vect) | |
true_vect = torch.tensor(true_vect) | |
true_rec = df.reco_showers_E | |
# pred_e_nocor = df.pred_showers_E[mask] | |
true_e = torch.tensor(true_e) | |
batch_idx = torch.tensor(batch_idx.values).long() | |
pred_e = torch.tensor(pred_e) | |
true_rec = torch.tensor(true_rec.values) | |
if full_vector: | |
true_p_vect = scatter_sum(true_vect, batch_idx, dim=0) | |
pred_p_vect = scatter_sum(pred_vect, batch_idx, dim=0) | |
true_e1 = scatter_sum(torch.tensor(true_e), batch_idx) | |
pred_e1 = scatter_sum(torch.tensor(pred_e), batch_idx) | |
true_e = torch.norm( | |
true_p_vect, dim=1 | |
) # This is actually momentum resolution | |
pred_e = torch.norm(pred_p_vect, dim=1) | |
else: | |
true_e = scatter_sum(true_e, batch_idx) | |
pred_e = scatter_sum(pred_e, batch_idx) | |
true_rec = scatter_sum(true_rec, batch_idx) | |
mask_above = true_e <= bin_i1 | |
mask_below = true_e > bin_i | |
mask_check = true_e > 0 | |
mask = mask_below * mask_above * mask_check | |
true_e = true_e[mask] | |
true_rec = true_rec[mask] | |
pred_e = pred_e[mask] | |
if torch.sum(mask) > 0: # if the bin is not empty | |
e_over_true = pred_e / true_e | |
e_over_reco = true_rec / true_e | |
distributions.append(e_over_true) | |
distr_baseline.append(e_over_reco) | |
( | |
mean_predtotrue, | |
var_predtotrue, | |
err_mean_predtotrue, | |
err_var_predtotrue, | |
) = get_sigma_gaussian(e_over_true, bins_per_binned_E) | |
if full_vector: | |
mass_true = torch.sqrt(true_e1[mask] ** 2 - true_e**2) | |
mass_pred = torch.sqrt(pred_e1[mask] ** 2 - pred_e**2) | |
print(pandora, len(mass_true), len(mass_pred)) | |
mass_over_true = mass_pred / mass_true | |
( | |
mean_mass, | |
var_mass, | |
_, | |
_, | |
) = get_sigma_gaussian(mass_over_true, bins_per_binned_E) | |
mass_list.append(mass_over_true) | |
( | |
mean_reco_true, | |
var_reco_true, | |
err_mean_reco_true, | |
err_var_reco_true, | |
) = get_sigma_gaussian(e_over_reco, bins_per_binned_E) | |
mean.append(mean_predtotrue) | |
variance.append(np.abs(var_predtotrue)) | |
mean_baseline.append(mean_reco_true) | |
variance_baseline.append(np.abs(var_reco_true)) | |
if full_vector: | |
mass_list = torch.cat(mass_list) | |
ret = [ | |
mean, | |
variance, | |
distributions, | |
binsx, | |
mean_baseline, | |
variance_baseline, | |
distr_baseline, | |
] | |
if full_vector: | |
ret += [mass_list] | |
else: | |
ret += [None] | |
return ret | |
def get_response_for_event_energy(matched_pandora, matched_, perfect_pid=False, mass_zero=False, ML_pid=False): | |
( | |
mean_p, | |
variance_om_p, | |
distr_p, | |
x_p, | |
_, | |
_, | |
_, | |
mass_over_true_pandora, | |
) = calculate_event_energy_resolution(matched_pandora, True, False) | |
( | |
mean, | |
variance_om, | |
distr, | |
x, | |
mean_baseline, | |
variance_om_baseline, | |
_, | |
mass_over_true_model, | |
) = calculate_event_energy_resolution(matched_, False, False) | |
mean_mass_p, var_mass_p, distr_mass_p, mass_true_p, _, _, E_over_true_pandora = calculate_event_mass_resolution(matched_pandora, True, perfect_pid=perfect_pid, mass_zero=mass_zero, ML_pid=ML_pid) | |
mean_mass, var_mass, distr_mass, mass_true, _, _, E_over_true = calculate_event_mass_resolution(matched_, False, perfect_pid=perfect_pid, mass_zero=mass_zero, ML_pid=ML_pid) | |
( | |
mean_energy_over_true, | |
var_energy_over_true, | |
_, | |
_, | |
) = get_sigma_gaussian(E_over_true, np.linspace(0, 4, 300)) | |
( | |
mean_energy_over_true_pandora, | |
var_energy_over_true_pandora, | |
_, | |
_, | |
) = get_sigma_gaussian(E_over_true_pandora, np.linspace(0, 4, 300)) | |
dic = {} | |
dic["mean_p"] = mean_p | |
dic["variance_om_p"] = variance_om_p | |
dic["variance_om"] = variance_om | |
dic["mean"] = mean | |
dic["energy_resolutions"] = x | |
dic["energy_resolutions_p"] = x_p | |
dic["mean_baseline"] = mean_baseline | |
dic["variance_om_baseline"] = variance_om_baseline | |
dic["distributions_pandora"] = distr_p | |
dic["distributions_model"] = distr | |
dic["mass_over_true_model"] = distr_mass | |
dic["mass_over_true_pandora"] = distr_mass_p | |
dic["mass_model"] = distr_mass * mass_true | |
dic["mass_pandora"] = distr_mass_p * mass_true_p | |
dic["mean_mass_model"] = mean_mass | |
dic["mean_mass_pandora"] = mean_mass_p | |
dic["var_mass_model"] = var_mass | |
dic["var_mass_pandora"] = var_mass_p | |
dic["energy_over_true"] = E_over_true | |
dic["energy_over_true_pandora"] = E_over_true_pandora | |
dic["mean_energy_over_true"] = mean_energy_over_true | |
dic["mean_energy_over_true_pandora"] = mean_energy_over_true_pandora | |
dic["var_energy_over_true"] = var_energy_over_true | |
dic["var_energy_over_true_pandora"] = var_energy_over_true_pandora | |
return dic | |
def plot_mass_resolution(event_res_dic, PATH_store): | |
fig, ax = plt.subplots(figsize=(7, 7)) | |
ax.set_xlabel(r"$m_{pred}/m_{true}$") | |
bins = np.linspace(0, 3, 100) | |
ax.hist( | |
event_res_dic["mass_over_true_model"], | |
bins=bins, | |
histtype="step", | |
label="ML $\mu$={} $\sigma/\mu$={}".format( | |
round((event_res_dic["mean_mass_model"]), 2), | |
round((event_res_dic["var_mass_model"]), 2), | |
), | |
color="red", | |
density=True, | |
) | |
ax.hist( | |
event_res_dic["mass_over_true_pandora"], | |
bins=bins, | |
histtype="step", | |
label="Pandora $\mu$={} $\sigma/\mu$={}".format( | |
round((event_res_dic["mean_mass_pandora"]), 2), | |
round((event_res_dic["var_mass_pandora"]), 2), | |
), | |
color="blue", | |
density=True, | |
) | |
ax.grid() | |
ax.legend() | |
#ax.set_xlim([0, 10]) | |
fig.tight_layout() | |
print("Saving mass resolution") | |
import os | |
fig.savefig(os.path.join(PATH_store, "mass_resolution.pdf"), bbox_inches="tight") | |
fig, ax = plt.subplots(figsize=(7, 7)) | |
ax.set_xlabel(r"$M_{reco}$") | |
bins = np.linspace(0, 3, 100) | |
ax.hist( | |
event_res_dic["mass_model"], | |
bins=bins, | |
histtype="step", | |
label="ML", | |
color="red", | |
density=True, | |
) | |
ax.hist( | |
event_res_dic["mass_pandora"], | |
bins=bins, | |
histtype="step", | |
label="Pandora", | |
color="blue", | |
density=True, | |
) | |
ax.grid() | |
ax.legend() | |
#ax.set_xlim([0, 10]) | |
fig.tight_layout() | |
fig.savefig(os.path.join(PATH_store, "mass_reco_absolute.pdf"), bbox_inches="tight") | |