Spaces:
Sleeping
Sleeping
# from https://github.com/graeme-a-stewart/antikt-python/tree/main/src/pyantikt | |
from src.jetfinder.basicjetfinder_types import NPHistory | |
from src.jetfinder.basicjetfinder_types import PseudoJet | |
from src.jetfinder.basicjetfinder_types import NPPseudoJets | |
import logging | |
import numpy as np | |
import numpy.typing as npt | |
logger = logging.getLogger("jetfinder") | |
from numba import njit | |
from copy import deepcopy | |
Invalid = -3 | |
NonexistentParent = -2 | |
BeamJet = -1 | |
def find_closest_jets(akt_dist: npt.ArrayLike, nn: npt.ArrayLike): | |
'''Look over active jets and find the closest''' | |
closest = akt_dist.argmin() | |
return akt_dist[closest], closest | |
def scan_for_all_nearest_neighbours(phi: npt.ArrayLike, rap: npt.ArrayLike, inv_pt2: npt.ArrayLike, | |
dist: npt.ArrayLike, akt_dist: npt.ArrayLike, | |
nn: npt.ArrayLike, mask: npt.ArrayLike, R2: float): | |
'''Do a full scan for nearest (geometrical) neighbours''' | |
for ijet in range(phi.size): | |
if mask[ijet]: | |
continue | |
_dphi = np.pi - np.abs(np.pi - np.abs(phi - phi[ijet])) | |
_drap = rap - rap[ijet] | |
_dist = _dphi * _dphi + _drap * _drap | |
_dist[ijet] = R2 # Avoid measuring the distance 0 to myself! | |
_dist[mask] = 1e20 # Don't consider any masked jets | |
iclosejet = _dist.argmin() | |
dist[ijet] = _dist[iclosejet] | |
if iclosejet == ijet: | |
nn[ijet] = -1 | |
akt_dist[ijet] = dist[ijet] * inv_pt2[ijet] | |
else: | |
nn[ijet] = iclosejet | |
akt_dist[ijet] = dist[ijet] * (inv_pt2[ijet] if inv_pt2[ijet] < inv_pt2[iclosejet] else inv_pt2[iclosejet]) | |
def scan_for_my_nearest_neighbours(ijet: int, phi: npt.ArrayLike, | |
rap: npt.ArrayLike, inv_pt2: npt.ArrayLike, | |
dist: npt.ArrayLike, akt_dist: npt.ArrayLike, nn: npt.ArrayLike, | |
mask: npt.ArrayLike, R2: float): | |
'''Retest all other jets against the target jet''' | |
nn[ijet] = -1 | |
dist[ijet] = R2 | |
_dphi = np.pi - np.abs(np.pi - np.abs(phi - phi[ijet])) | |
_drap = rap - rap[ijet] | |
_dist = _dphi * _dphi + _drap * _drap | |
_dist[ijet] = R2 # Avoid measuring the distance 0 to myself! | |
_dist[mask] = 1e20 # Don't consider any masked jets | |
iclosejet = _dist.argmin() | |
dist[ijet] = _dist[iclosejet] | |
if iclosejet == ijet: | |
nn[ijet] = -1 | |
akt_dist[ijet] = dist[ijet] * inv_pt2[ijet] | |
else: | |
nn[ijet] = iclosejet | |
akt_dist[ijet] = dist[ijet] * (inv_pt2[ijet] if inv_pt2[ijet] < inv_pt2[iclosejet] else inv_pt2[iclosejet]) | |
# As this function is called on new PseudoJets it's possible | |
# that we are now the NN of our NN | |
if dist[iclosejet] > dist[ijet]: | |
dist[iclosejet] = dist[ijet] | |
nn[iclosejet] = ijet | |
akt_dist[iclosejet] = dist[iclosejet] * ( | |
inv_pt2[ijet] if inv_pt2[ijet] < inv_pt2[iclosejet] else inv_pt2[iclosejet]) | |
def compare_status(working: NPPseudoJets, test: NPPseudoJets): | |
'''Test two different copies of numpy pseudojet containers that should be equal''' | |
dist_diff = working.akt_dist != test.akt_dist | |
idist_diff = np.where(dist_diff) | |
if len(idist_diff[0]) > 0: | |
print(f"Differences found after full scan of NNs: {idist_diff[0]}") | |
for ijet in idist_diff[0]: | |
print(f"{ijet}\nW: {working.print_jet(ijet)}\nT: {test.print_jet(ijet)}") | |
raise RuntimeError("Jet sets are not the same and they should be!") | |
def add_step_to_history(history: NPHistory, jets: list[PseudoJet], | |
parent1: int, parent2: int, jetp_index: int, distance: float): | |
'''Add a merging step to the history of clustering | |
history - list of HistoryElement entities | |
jets - list of pseudojets | |
parent1 - the *history* element which is the parent of this merger | |
parent2 - the *history* element which is the parent of this merger (can be Invalid) | |
jetp_index - the new pseudojet that results from this merger (if both parents exist) | |
distance - the distance metric for this merge step | |
''' | |
max_dij_so_far = max(distance, history.max_dij_so_far[history.size - 1]) | |
history.append(parent1=parent1, parent2=parent2, jetp_index=jetp_index, dij=distance, | |
max_dij_so_far=max_dij_so_far) | |
local_step = history.next - 1 | |
logger.debug(f"Added history step {local_step}: {history.parent1[local_step]}") | |
if parent1 >= 0: | |
if history.child[parent1] != -1: | |
raise ( | |
RuntimeError( | |
f"Internal error. Trying to recombine a parent1 object that has previsously been recombined: {parent1}" | |
) | |
) | |
history.child[parent1] = local_step | |
if parent2 >= 0: | |
if history.child[parent2] != -1: | |
raise ( | |
RuntimeError( | |
f"Internal error. Trying to recombine a parent1 object that has previsously been recombined: {parent2}" | |
) | |
) | |
history.child[parent2] = local_step | |
# get cross-referencing right from PseudoJets | |
if jetp_index >= 0: | |
jets[jetp_index].cluster_history_index = local_step | |
def inclusive_jets(jets: list[PseudoJet], history: NPHistory, ptmin: float = 0.0): | |
'''return all inclusive jets of a ClusterSequence with pt > ptmin''' | |
dcut = ptmin * ptmin | |
jets_local = list() | |
# For inclusive jets with a plugin algorithm, we make no | |
# assumptions about anything (relation of dij to momenta, | |
# ordering of the dij, etc.) | |
for elt in range(history.size - 1, -1, -1): | |
if history.parent2[elt] != BeamJet: | |
continue | |
iparent_jet = history.jetp_index[history.parent1[elt]] | |
jet = jets[iparent_jet] | |
if jet.pt2 >= dcut: | |
jets_local.append(jet) | |
return jets_local | |
def basicjetfinder(initial_particles: list[PseudoJet], Rparam: float = 0.8, ptmin: float = 0.0, return_raw=False): | |
"""Basic AntiKt Jet finding code""" | |
R2 = Rparam * Rparam | |
invR2 = 1.0 / R2 | |
# Create a container of PseudoJet objects | |
history = NPHistory(2 * len(initial_particles)) | |
Qtot = history.fill_initial_history(initial_particles) | |
# Was doing a deepcopy here, but that turns out to be | |
# 1. unnecessary | |
# 2. extremely expensive | |
jets = initial_particles | |
# Create the numpy arrays corresponding to the pseudojets that will be used | |
# for fast calculations | |
npjets = NPPseudoJets(len(jets)) | |
npjets.set_jets(jets) | |
# Setup the nearest neighbours, which is an expensive | |
# initial operation (N^2 scaling here) | |
scan_for_all_nearest_neighbours(npjets.phi, npjets.rap, npjets.inv_pt2, | |
npjets.dist, npjets.akt_dist, npjets.nn, | |
npjets.mask, R2) | |
# Each iteration we either merge two jets to one, or we | |
# finalise a jet. Thus it takes a number of iterations | |
# equal to the number of jets to finish | |
for iteration in range(len(initial_particles)): | |
distance, ijetA = find_closest_jets(npjets.akt_dist, npjets.nn) | |
ijetB = npjets.nn[ijetA] | |
# Add normalisation for real distance | |
distance *= invR2 | |
if (ijetB >= 0): | |
if ijetB < ijetA: | |
ijetA, ijetB = ijetB, ijetA | |
logger.debug(f"Iteration {iteration + 1}: {distance} for jet {ijetA} and jet {ijetB}") | |
# Merge jets | |
npjets.mask_slot(ijetA) | |
npjets.mask_slot(ijetB) | |
jet_indexA = npjets.jets_index[ijetA] | |
jet_indexB = npjets.jets_index[ijetB] | |
merged_jet = jets[jet_indexA] + jets[jet_indexB] | |
imerged_jet = len(jets) | |
jets.append(merged_jet) | |
# We recycle the slot of jetA (which is the lowest slot) | |
npjets.insert_jet(merged_jet, slot=ijetA, jet_index=imerged_jet) | |
add_step_to_history(history=history, jets=jets, | |
parent1=jets[jet_indexA].cluster_history_index, | |
parent2=jets[jet_indexB].cluster_history_index, | |
jetp_index=imerged_jet, distance=distance) | |
# Get the NNs for the merged pseudojet | |
scan_for_my_nearest_neighbours(ijetA, npjets.phi, npjets.rap, npjets.inv_pt2, | |
npjets.dist, npjets.akt_dist, npjets.nn, npjets.mask, R2) | |
else: | |
logger.debug(f"Iteration {iteration + 1}: {distance} for jet {ijetA} and jet {ijetB}") | |
# Beamjet | |
npjets.mask_slot(ijetA) | |
jet_indexA = npjets.jets_index[ijetA] | |
add_step_to_history(history=history, jets=jets, parent1=jets[jet_indexA].cluster_history_index, | |
parent2=BeamJet, | |
jetp_index=Invalid, distance=distance) | |
# Now need to update nearest distances, when pseudojets are unmasked and | |
# had either jetA or jetB as their nearest neighbour | |
# Note, it doesn't matter that we reused the ijetA slot here! | |
if ijetB != -1: | |
jets_to_update = np.logical_and(~npjets.mask, np.logical_or(npjets.nn == ijetA, npjets.nn == ijetB)) | |
else: | |
jets_to_update = np.logical_and(~npjets.mask, npjets.nn == ijetA) | |
ijets_to_update = np.where(jets_to_update) | |
# Doable without actually needing a loop? | |
for ijet_to_update in ijets_to_update[0]: | |
scan_for_my_nearest_neighbours(ijet_to_update, npjets.phi, npjets.rap, npjets.inv_pt2, | |
npjets.dist, npjets.akt_dist, npjets.nn, npjets.mask, R2) | |
# Useful to check that we have done all updates correctly (only for debug!) | |
if logger.level == logging.DEBUG: | |
npjets_copy = deepcopy(npjets) | |
scan_for_all_nearest_neighbours(npjets_copy.phi, npjets_copy.rap, npjets_copy.inv_pt2, | |
npjets_copy.dist, npjets.akt_dist, npjets_copy.nn, npjets_copy.mask, R2) | |
compare_status(npjets, npjets_copy) | |
if return_raw: | |
return jets, history | |
return inclusive_jets(jets, history, ptmin=ptmin) | |