import matplotlib

# matplotlib.rc("font", size=25)
import numpy as np
from scipy import stats
from scipy.optimize import curve_fit
def calculate_eff(sd, log_scale=False, pandora=False):
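    """Reconstruction efficiency per energy bin.

    A shower counts as reconstructed when its predicted energy (or, with
    pandora=True, its Pandora-calibrated energy) is not NaN. Returns the
    per-bin efficiencies and the corresponding bin centres.
    """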
    if log_scale:
        bins = np.exp(np.arange(np.log(0.1), np.log(80), 0.3))
    else:
        bins = np.arange(0, 51, 5)
    eff = []
    energy_eff = []
    for i in range(len(bins) - 1):
        bin_i = bins[i]
        bin_i1 = bins[i + 1]
        mask_above = sd.reco_showers_E.values <= bin_i1
        mask_below = sd.reco_showers_E.values > bin_i
        mask = mask_below & mask_above
        number_of_non_reconstructed_showers = np.sum(
            np.isnan(sd.pred_showers_E.values)[mask]
        )
        total_showers = len(sd.true_showers_E.values[mask])
        if pandora:
            number_of_non_reconstructed_showers = np.sum(
                np.isnan(sd.pandora_calibrated_E.values)[mask]
            )
            total_showers = len(sd.pandora_calibrated_E.values[mask])
        if total_showers > 0:
            eff.append(
                (total_showers - number_of_non_reconstructed_showers) / total_showers
            )
            energy_eff.append((bin_i1 + bin_i) / 2)
    return eff, energy_eff
def calculate_fakes(sd, matched, log_scale=False, pandora=False):
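    """Fake rate per predicted-energy bin.

    A predicted shower is a fake when it has no matched truth particle
    (NaN pid). Returns the per-bin fake rate (normalised to the total
    number of true showers), the bin centres, and the energy carried by
    fakes as a fraction of the matched true energy in each bin.
    (`matched` is currently unused.)
    """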
    if log_scale:
        bins_fakes = np.exp(np.arange(np.log(0.1), np.log(80), 0.3))
    else:
        # linear binning, consistent with calculate_eff
        bins_fakes = np.arange(0, 51, 5)
    fake_rate = []
    energy_fakes = []
    fake_percent_energy = []
    # total number of true showers: those whose true energy is not NaN
    total_true_showers = np.sum(~np.isnan(sd.true_showers_E.values))
    for i in range(len(bins_fakes) - 1):
        bin_i = bins_fakes[i]
        bin_i1 = bins_fakes[i + 1]
        mask_above = sd.pred_showers_E.values <= bin_i1
        mask_below = sd.pred_showers_E.values > bin_i
        mask = mask_below & mask_above
        # a predicted shower with no matched truth particle (NaN pid) is a fake
        fakes_mask = np.isnan(sd.pid)[mask]
        non_fakes_mask = ~np.isnan(sd.pid)[mask]
        fakes = np.sum(fakes_mask)
        total_showers = len(sd.pred_showers_E.values[mask])
        if pandora:
            energy_in_fakes = np.sum(sd.pandora_calibrated_pfo[mask].values[fakes_mask])
        else:
            energy_in_fakes = np.sum(sd.pred_showers_E[mask].values[fakes_mask])
        total_energy_true = np.sum(sd.true_showers_E.values[mask][non_fakes_mask])
        if total_showers > 0:
            fake_rate.append(fakes / total_true_showers)
            energy_fakes.append((bin_i1 + bin_i) / 2)
            fake_percent_energy.append(energy_in_fakes / total_energy_true)
    return fake_rate, energy_fakes, fake_percent_energy
def calculate_response(matched, pandora, log_scale=False):
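    """Energy response and resolution per energy bin.

    The first loop bins showers by the reconstructable hit-sum energy
    (reco_showers_E) and histograms pred/reco; the second bins by the true
    energy and histograms pred/true. Each distribution is summarised by its
    most probable value and relative 68% width via obtain_MPV_and_68; the
    raw distributions of selected bins (bins_plot_histogram) are collected
    in dic_histograms.
    """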
    if log_scale:
        bins = np.exp(np.arange(np.log(0.1), np.log(80), 0.3))
    else:
        bins = np.arange(0, 51, 2)
    bins_plot_histogram = [5, 6, 10, 20]
    # fine histogram binning for the per-bin response distributions
    bins_per_binned_E = np.arange(0, 3, 0.001)
    mean = []
    variance_om = []
    mean_true_rec = []
    variance_om_true_rec = []
    energy_resolutions = []
    energy_resolutions_reco = []
    dic_histograms = {}
    for i in range(len(bins) - 1):
        bin_i = bins[i]
        bin_i1 = bins[i + 1]
        # bin by the reconstructable hit-sum energy (reco_showers_E)
        mask_above = matched["reco_showers_E"] <= bin_i1
        mask_below = matched["reco_showers_E"] > bin_i
        mask_check = matched["pred_showers_E"] > 0
        mask = mask_below & mask_above & mask_check
        pred_e = matched.calibrated_E[mask]
        true_rec = matched.reco_showers_E[mask]
        true_e = matched.true_showers_E[mask]
        if pandora:
            pred_e_corrected = matched.pandora_calibrated_E[mask]
        else:
            pred_e_corrected = matched.calibrated_E[mask]
        if np.sum(mask) > 0:  # if the bin is not empty
            e_over_rec = pred_e / true_rec
            if i in bins_plot_histogram:
                dic_histograms[str(i) + "reco"] = e_over_rec
                dic_histograms[str(i) + "reco_baseline"] = true_rec
                dic_histograms[str(i) + "pred_corr_e"] = pred_e_corrected
                dic_histograms[str(i) + "true_baseline"] = true_e
                dic_histograms[str(i) + "pred_e"] = pred_e
            mean_predtored, variance_om_true_rec_ = obtain_MPV_and_68(
                e_over_rec, bins_per_binned_E
            )
            # mean_predtored = np.mean(e_over_rec)
            # variance_om_true_rec_ = np.var(e_over_rec) / mean_predtored
            mean_true_rec.append(mean_predtored)
            variance_om_true_rec.append(variance_om_true_rec_)
            energy_resolutions_reco.append((bin_i1 + bin_i) / 2)
    # TODO: change pred_showers_E to pandora_calibrated_E for Pandora and to
    # calibrated_E for the model
    # coarser histogram binning for the response versus true energy
    bins_per_binned_E = np.arange(0, 3, 0.005)
    for i in range(len(bins) - 1):
        bin_i = bins[i]
        bin_i1 = bins[i + 1]
        mask_above = matched["true_showers_E"] <= bin_i1
        mask_below = matched["true_showers_E"] > bin_i
        mask_check = matched["pred_showers_E"] > 0
        mask = mask_below & mask_above & mask_check
        true_e = matched.true_showers_E[mask]
        true_rec = matched.reco_showers_E[mask]
        if pandora:
            pred_e = matched.pandora_calibrated_E[mask]
        else:
            pred_e = matched.calibrated_E[mask]
        if np.sum(mask) > 0:  # if the bin is not empty
            e_over_true = pred_e / true_e
            e_rec_over_true = true_rec / true_e
            if i in bins_plot_histogram:
                dic_histograms[str(i) + "true"] = e_over_true
                dic_histograms[str(i) + "reco_showers"] = e_rec_over_true
            mean_predtotrue, var_predtotrue = obtain_MPV_and_68(
                e_over_true, bins_per_binned_E
            )
            # mean_predtotrue, var_predtotrue = get_sigma_gaussian(e_over_true, bins_per_binned_E)
            # mean_predtotrue = np.mean(e_over_true)
            # var_predtotrue = np.var(e_over_true) / mean_predtotrue
            print(
                "bin i ",
                bins[i],
                mean_predtotrue,
                var_predtotrue,
                np.mean(e_over_true),
                np.var(e_over_true) / np.mean(e_over_true),
            )
            mean.append(mean_predtotrue)
            variance_om.append(var_predtotrue)
            energy_resolutions.append((bin_i1 + bin_i) / 2)
    return (
        mean,
        variance_om,
        mean_true_rec,
        variance_om_true_rec,
        energy_resolutions,
        energy_resolutions_reco,
        dic_histograms,
    )
def get_sigma_gaussian(e_over_reco, bins_per_binned_E):
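    """Fit a Gaussian to the histogram of e_over_reco.

    Returns (mean, sigma / mean, mean error, sigma error / mean); if the
    fit does not converge, falls back to the weighted histogram mean and
    width with dummy errors.
    """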
    hist, bin_edges = np.histogram(e_over_reco, bins=bins_per_binned_E, density=True)

    # Gaussian PDF (up to the normalisation C) evaluated at X
    def gaus(X, C, X_mean, sigma):
        return C * np.exp(-((X - X_mean) ** 2) / (2 * sigma**2))

    n = len(hist)
    x_hist = np.zeros((n), dtype=float)
    for ii in range(n):
        x_hist[ii] = (bin_edges[ii + 1] + bin_edges[ii]) / 2
    y_hist = hist
    if np.all(hist == 0):
        # empty histogram: return dummy values with the same arity as the normal path
        return 0.0, 0.0, 0.0, 0.0
    mean = sum(x_hist * y_hist) / sum(y_hist)
    # weighted standard deviation (sqrt of the weighted variance) as the initial width guess
    sigma = np.sqrt(sum(y_hist * (x_hist - mean) ** 2) / sum(y_hist))
    # cut 1% of highest vals
    # e_over_reco_filtered = np.sort(e_over_reco)
    # e_over_reco_filtered = e_over_reco_filtered[: int(len(e_over_reco_filtered) * 0.99)]
    # mean = np.mean(e_over_reco_filtered)
    # sigma = np.std(e_over_reco_filtered)
    try:
        param_optimised, param_covariance_matrix = curve_fit(
            gaus, x_hist, y_hist, p0=[max(y_hist), mean, sigma], maxfev=10000
        )
    except RuntimeError:
        print("Gaussian fit failed; falling back to the histogram mean and width")
        return mean, sigma / mean, 0.001, 0.001  # dummy errors temporarily
    if param_optimised[2] < 0:
        param_optimised[2] = sigma
    if param_optimised[1] < 0:
        param_optimised[1] = mean  # guard against unphysical fit results
    errors = np.sqrt(np.diag(param_covariance_matrix))
    return (
        param_optimised[1],
        param_optimised[2] / param_optimised[1],
        errors[1],
        errors[2] / param_optimised[1],
    )
def obtain_MPV_and_68(data_for_hist, bins_per_binned_E, epsilon=0.0001):
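    """Most probable value and relative 68% width of a distribution.

    Histograms data_for_hist, takes the centre of the highest bin as the
    MPV, and returns (MPV, std68 / MPV) with std68 from get_std68.
    """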
    hist, bin_edges = np.histogram(data_for_hist, bins=bins_per_binned_E, density=True)
    ind_max_hist = np.argmax(hist)
    MPV = (bin_edges[ind_max_hist] + bin_edges[ind_max_hist + 1]) / 2
    std68, low, high = get_std68(hist, bin_edges, epsilon=epsilon)
    return MPV, std68 / MPV
def get_std68(theHist, bin_edges, percentage=0.683, epsilon=0.01):
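    """Half-width of the narrowest interval containing ~68.3% of the histogram.

    Scans all pairs of bin centres whose enclosed integral is within
    epsilon of `percentage` and keeps the narrowest such interval.
    Returns (half-width, low edge, high edge).
    """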
    wmin = 0.2
    wmax = 1.0
    weight = 0.0
    points = []
    sums = []
    # fill a list of bin centres and the integral up to each of them
    for i in range(len(bin_edges) - 1):
        weight += theHist[i] * (bin_edges[i + 1] - bin_edges[i])
        points.append([(bin_edges[i + 1] + bin_edges[i]) / 2, weight])
        sums.append(weight)
    low = wmin
    high = wmax
    width = 100
    # scan all bin pairs whose enclosed integral matches the target coverage
    # and keep the narrowest interval
    for i in range(len(points)):
        for j in range(i, len(points)):
            wy = points[j][1] - points[i][1]
            if abs(wy - percentage) < epsilon:
                wx = points[j][0] - points[i][0]
                if wx < width:
                    low = points[i][0]
                    high = points[j][0]
                    width = wx
    return 0.5 * (high - low), low, high
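
# Illustrative sketch (not part of the original pipeline): on a synthetic
# Gaussian response distribution, the MPV returned by obtain_MPV_and_68 should
# sit near the true mean and std68 near the true width. The numbers below are
# made up for the example.
#
#   rng = np.random.default_rng(0)
#   response = rng.normal(loc=1.0, scale=0.05, size=20_000)
#   mpv, std68_over_mpv = obtain_MPV_and_68(response, np.arange(0, 3, 0.001))
#   # expect mpv near 1.0; std68_over_mpv * mpv approximates 0.05 whenever the
#   # epsilon tolerance in get_std68 finds a matching interval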
def calculate_purity_containment(matched, log_scale=False):
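    """Containment and purity per energy bin.

    Containment (fce) is the matched energy (e_pred_and_truth) over the
    reconstructable hit-sum energy; purity is the matched energy over the
    predicted shower energy. Returns per-bin means and variances plus the
    bin centres.
    """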
    if log_scale:
        bins = np.exp(np.arange(np.log(0.1), np.log(80), 0.3))
    else:
        bins = np.arange(0, 51, 2)
    fce_energy = []
    fce_var_energy = []
    energy_ms = []
    purity_energy = []
    purity_var_energy = []
    # containment: matched energy over the reconstructable hit-sum energy
    fce = matched["e_pred_and_truth"] / matched["reco_showers_E"]
    # purity: matched energy over the predicted shower energy
    purity = matched["e_pred_and_truth"] / matched["pred_showers_E"]
    for i in range(len(bins) - 1):
        bin_i = bins[i]
        bin_i1 = bins[i + 1]
        mask_above = matched["reco_showers_E"] <= bin_i1
        mask_below = matched["reco_showers_E"] > bin_i
        mask_check = matched["pred_showers_E"] > 0
        mask = mask_below & mask_above & mask_check
        if np.sum(mask) > 0:
            fce_energy.append(np.mean(fce[mask]))
            fce_var_energy.append(np.var(fce[mask]))
            energy_ms.append((bin_i1 + bin_i) / 2)
            purity_energy.append(np.mean(purity[mask]))
            purity_var_energy.append(np.var(purity[mask]))
    return (
        fce_energy,
        fce_var_energy,
        energy_ms,
        purity_energy,
        purity_var_energy,
    )
def obtain_metrics(sd, matched, pandora=False, log_scale=False):
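    """Run all metric calculations and collect the results in a single dict."""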
    eff, energy_eff = calculate_eff(sd, log_scale, pandora=pandora)
    fake_rate, energy_fakes, fake_percent_energy = calculate_fakes(
        sd, matched, log_scale, pandora=pandora
    )
    (
        mean,
        variance_om,
        mean_true_rec,
        variance_om_true_rec,
        energy_resolutions,
        energy_resolutions_reco,
        dic_histograms,
    ) = calculate_response(matched, pandora, log_scale)
    (
        fce_energy,
        fce_var_energy,
        energy_ms,
        purity_energy,
        purity_var_energy,
    ) = calculate_purity_containment(matched, log_scale)
    metrics = {
        "energy_eff": energy_eff,
        "eff": eff,
        "energy_fakes": energy_fakes,
        "fake_rate": fake_rate,
        "fake_percent_energy": fake_percent_energy,
        "mean": mean,
        "variance_om": variance_om,
        "mean_true_rec": mean_true_rec,
        "variance_om_true_rec": variance_om_true_rec,
        "fce_energy": fce_energy,
        "fce_var_energy": fce_var_energy,
        "energy_ms": energy_ms,
        "purity_energy": purity_energy,
        "purity_var_energy": purity_var_energy,
        "energy_resolutions": energy_resolutions,
        "energy_resolutions_reco": energy_resolutions_reco,
        "dic_histograms": dic_histograms,
    }
    return metrics
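

if __name__ == "__main__":
    # Minimal smoke-test sketch, assuming `sd` and `matched` are pandas
    # DataFrames with the column names used above. The toy values below are
    # invented for the example and are not real reconstruction output; for
    # simplicity the same frame stands in for both `sd` and `matched`.
    import pandas as pd

    rng = np.random.default_rng(0)
    n = 1000
    true_e = rng.uniform(0.5, 50.0, n)
    toy = pd.DataFrame(
        {
            "true_showers_E": true_e,
            "reco_showers_E": true_e * rng.normal(0.95, 0.02, n),
            "pred_showers_E": true_e * rng.normal(1.0, 0.1, n),
            "calibrated_E": true_e * rng.normal(1.0, 0.05, n),
            "e_pred_and_truth": true_e * rng.normal(0.9, 0.05, n),
            # ~5% unmatched (fake) showers, marked by NaN pid
            "pid": np.where(rng.random(n) < 0.05, np.nan, 211.0),
        }
    )
    metrics = obtain_metrics(toy, toy, pandora=False)
    print(sorted(metrics.keys()))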