# from https://github.com/graeme-a-stewart/antikt-python/tree/main/src/pyantikt
#
# Basic anti-kt jet finder working on flat numpy arrays of pseudojet
# properties (phi, rapidity, 1/pt^2, nearest neighbour, ...).

import logging
from copy import deepcopy

import numpy as np
import numpy.typing as npt
from numba import njit

from src.jetfinder.basicjetfinder_types import NPHistory
from src.jetfinder.basicjetfinder_types import PseudoJet
from src.jetfinder.basicjetfinder_types import NPPseudoJets

logger = logging.getLogger("jetfinder")

# Sentinel values stored in the clustering history in place of real indices.
Invalid = -3  # used below as ``jetp_index`` for a beam (finalisation) step
NonexistentParent = -2  # not referenced in this file; presumably used by the history container
BeamJet = -1  # used below as ``parent2`` when a jet is finalised


def find_closest_jets(akt_dist: npt.ArrayLike, nn: npt.ArrayLike):
    """Look over active jets and find the closest.

    Parameters:
        akt_dist - array of anti-kt distances, one entry per slot
        nn       - nearest-neighbour array; unused here, kept so that
                   existing callers do not have to change

    Returns a tuple ``(smallest anti-kt distance, slot holding it)``.
    """
    closest = akt_dist.argmin()
    return akt_dist[closest], closest


def scan_for_all_nearest_neighbours(phi: npt.ArrayLike, rap: npt.ArrayLike,
                                    inv_pt2: npt.ArrayLike, dist: npt.ArrayLike,
                                    akt_dist: npt.ArrayLike, nn: npt.ArrayLike,
                                    mask: npt.ArrayLike, R2: float):
    """Do a full scan for nearest (geometrical) neighbours.

    For every unmasked slot the arrays ``dist``, ``nn`` and ``akt_dist`` are
    overwritten in place; masked slots are left untouched. A jet with no
    neighbour closer than ``R2`` gets ``nn == -1`` and ``dist == R2``.

    Parameters:
        phi, rap - azimuth and rapidity of each slot
        inv_pt2  - weight multiplied onto the geometric distance
        dist     - output: geometric distance to the nearest neighbour
        akt_dist - output: ``dist`` times the smaller ``inv_pt2`` of the pair
        nn       - output: slot of the nearest neighbour, or -1
        mask     - boolean array, True for slots that must be ignored
        R2       - distance assigned to "no neighbour"
    """
    for ijet in range(phi.size):
        if mask[ijet]:
            continue
        # Azimuthal difference folded into [0, pi]
        _dphi = np.pi - np.abs(np.pi - np.abs(phi - phi[ijet]))
        _drap = rap - rap[ijet]
        _dist = _dphi * _dphi + _drap * _drap
        _dist[ijet] = R2  # Avoid measuring the distance 0 to myself!
        _dist[mask] = 1e20  # Don't consider any masked jets
        iclosejet = _dist.argmin()
        dist[ijet] = _dist[iclosejet]
        if iclosejet == ijet:
            # Nothing is closer than R2: this jet has no neighbour
            nn[ijet] = -1
            akt_dist[ijet] = dist[ijet] * inv_pt2[ijet]
        else:
            nn[ijet] = iclosejet
            akt_dist[ijet] = dist[ijet] * (
                inv_pt2[ijet] if inv_pt2[ijet] < inv_pt2[iclosejet] else inv_pt2[iclosejet])


def scan_for_my_nearest_neighbours(ijet: int, phi: npt.ArrayLike, rap: npt.ArrayLike,
                                   inv_pt2: npt.ArrayLike, dist: npt.ArrayLike,
                                   akt_dist: npt.ArrayLike, nn: npt.ArrayLike,
                                   mask: npt.ArrayLike, R2: float):
    """Retest all other jets against the target jet.

    Recomputes ``dist[ijet]``, ``nn[ijet]`` and ``akt_dist[ijet]`` in place.
    If the target jet turns out to be closer to its new nearest neighbour
    than that neighbour's recorded distance, the neighbour's entries are
    updated as well.

    Parameters are the same arrays as for ``scan_for_all_nearest_neighbours``
    plus ``ijet``, the slot to rescan.
    """
    nn[ijet] = -1
    dist[ijet] = R2
    # Azimuthal difference folded into [0, pi]
    _dphi = np.pi - np.abs(np.pi - np.abs(phi - phi[ijet]))
    _drap = rap - rap[ijet]
    _dist = _dphi * _dphi + _drap * _drap
    _dist[ijet] = R2  # Avoid measuring the distance 0 to myself!
    _dist[mask] = 1e20  # Don't consider any masked jets
    iclosejet = _dist.argmin()
    dist[ijet] = _dist[iclosejet]
    if iclosejet == ijet:
        # Nothing is closer than R2: this jet has no neighbour
        nn[ijet] = -1
        akt_dist[ijet] = dist[ijet] * inv_pt2[ijet]
    else:
        nn[ijet] = iclosejet
        akt_dist[ijet] = dist[ijet] * (
            inv_pt2[ijet] if inv_pt2[ijet] < inv_pt2[iclosejet] else inv_pt2[iclosejet])
        # As this function is called on new PseudoJets it's possible
        # that we are now the NN of our NN
        # NOTE(review): only the single closest jet is checked for this
        # reciprocal update; other active jets that are now closer to `ijet`
        # than to their recorded neighbour keep their old entry - confirm
        # that this is intended.
        if dist[iclosejet] > dist[ijet]:
            dist[iclosejet] = dist[ijet]
            nn[iclosejet] = ijet
            akt_dist[iclosejet] = dist[iclosejet] * (
                inv_pt2[ijet] if inv_pt2[ijet] < inv_pt2[iclosejet] else inv_pt2[iclosejet])


def compare_status(working: NPPseudoJets, test: NPPseudoJets):
    """Test two different copies of numpy pseudojet containers that should be equal.

    Only the ``akt_dist`` arrays are compared. Any differing slot is printed
    for both containers and a ``RuntimeError`` is raised.
    """
    dist_diff = working.akt_dist != test.akt_dist
    idist_diff = np.where(dist_diff)
    if len(idist_diff[0]) > 0:
        print(f"Differences found after full scan of NNs: {idist_diff[0]}")
        for ijet in idist_diff[0]:
            print(f"{ijet}\nW: {working.print_jet(ijet)}\nT: {test.print_jet(ijet)}")
        raise RuntimeError("Jet sets are not the same and they should be!")


def add_step_to_history(history: NPHistory, jets: list[PseudoJet], parent1: int,
                        parent2: int, jetp_index: int, distance: float):
    """Add a merging step to the history of clustering.

    history - the NPHistory container the step is appended to
    jets - list of pseudojets
    parent1 - the *history* element which is the parent of this merger
    parent2 - the *history* element which is the parent of this merger
        (negative sentinel such as BeamJet when there is no second parent)
    jetp_index - the new pseudojet that results from this merger (if both
        parents exist); a negative value means no pseudojet is cross-referenced
    distance - the distance metric for this merge step

    Raises RuntimeError if either parent already has a child recorded,
    i.e. it has been recombined before.
    """
    max_dij_so_far = max(distance, history.max_dij_so_far[history.size - 1])
    history.append(parent1=parent1, parent2=parent2, jetp_index=jetp_index,
                   dij=distance, max_dij_so_far=max_dij_so_far)

    # The step just appended sits one before the container's `next` position
    local_step = history.next - 1
    logger.debug("Added history step %s: %s", local_step, history.parent1[local_step])

    if parent1 >= 0:
        if history.child[parent1] != -1:
            raise RuntimeError(
                f"Internal error. Trying to recombine a parent1 object that has previsously been recombined: {parent1}"
            )
        history.child[parent1] = local_step

    if parent2 >= 0:
        if history.child[parent2] != -1:
            # The original message named parent1 here, which pointed at the
            # wrong parent when diagnosing a double recombination.
            raise RuntimeError(
                f"Internal error. Trying to recombine a parent2 object that has previsously been recombined: {parent2}"
            )
        history.child[parent2] = local_step

    # get cross-referencing right from PseudoJets
    if jetp_index >= 0:
        jets[jetp_index].cluster_history_index = local_step


def inclusive_jets(jets: list[PseudoJet], history: NPHistory, ptmin: float = 0.0):
    """Return all inclusive jets of a ClusterSequence with pt > ptmin.

    Walks the history from the newest step to the oldest and, for every step
    whose second parent is ``BeamJet``, collects the jet referenced by the
    first parent if its ``pt2`` is at least ``ptmin**2``.
    """
    dcut = ptmin * ptmin
    jets_local = list()
    # For inclusive jets with a plugin algorithm, we make no
    # assumptions about anything (relation of dij to momenta,
    # ordering of the dij, etc.)
    for elt in range(history.size - 1, -1, -1):
        if history.parent2[elt] != BeamJet:
            continue
        iparent_jet = history.jetp_index[history.parent1[elt]]
        jet = jets[iparent_jet]
        if jet.pt2 >= dcut:
            jets_local.append(jet)

    return jets_local


def basicjetfinder(initial_particles: list[PseudoJet], Rparam: float = 0.8,
                   ptmin: float = 0.0, return_raw=False):
    """Basic AntiKt Jet finding code.

    Parameters:
        initial_particles - input pseudojets. NOTE: this list is used directly
            as the working jet list, so merged jets are appended to it in place.
        Rparam - jet radius parameter
        ptmin - minimum pt passed on to ``inclusive_jets``
        return_raw - if true, return the full jet list and clustering history
            instead of the inclusive jets

    Returns ``(jets, history)`` when ``return_raw`` is true, otherwise the
    list produced by ``inclusive_jets(jets, history, ptmin=ptmin)``.
    """
    R2 = Rparam * Rparam
    invR2 = 1.0 / R2

    # Create a container of PseudoJet objects
    history = NPHistory(2 * len(initial_particles))
    # The return value of fill_initial_history is not needed here
    history.fill_initial_history(initial_particles)

    # Was doing a deepcopy here, but that turns out to be
    # 1. unnecessary
    # 2. extremely expensive
    jets = initial_particles

    # Create the numpy arrays corresponding to the pseudojets that will be used
    # for fast calculations
    npjets = NPPseudoJets(len(jets))
    npjets.set_jets(jets)

    # Setup the nearest neighbours, which is an expensive
    # initial operation (N^2 scaling here)
    scan_for_all_nearest_neighbours(npjets.phi, npjets.rap, npjets.inv_pt2,
                                    npjets.dist, npjets.akt_dist, npjets.nn,
                                    npjets.mask, R2)

    # Each iteration we either merge two jets to one, or we
    # finalise a jet. Thus it takes a number of iterations
    # equal to the number of jets to finish
    for iteration in range(len(initial_particles)):
        distance, ijetA = find_closest_jets(npjets.akt_dist, npjets.nn)
        ijetB = npjets.nn[ijetA]
        # Add normalisation for real distance
        distance *= invR2

        if ijetB >= 0:
            # Keep ijetA as the lower slot, which is the one recycled below
            if ijetB < ijetA:
                ijetA, ijetB = ijetB, ijetA
            logger.debug("Iteration %s: %s for jet %s and jet %s",
                         iteration + 1, distance, ijetA, ijetB)

            # Merge jets
            npjets.mask_slot(ijetA)
            npjets.mask_slot(ijetB)

            jet_indexA = npjets.jets_index[ijetA]
            jet_indexB = npjets.jets_index[ijetB]

            merged_jet = jets[jet_indexA] + jets[jet_indexB]
            imerged_jet = len(jets)
            jets.append(merged_jet)

            # We recycle the slot of jetA (which is the lowest slot)
            npjets.insert_jet(merged_jet, slot=ijetA, jet_index=imerged_jet)
            add_step_to_history(history=history, jets=jets,
                                parent1=jets[jet_indexA].cluster_history_index,
                                parent2=jets[jet_indexB].cluster_history_index,
                                jetp_index=imerged_jet, distance=distance)

            # Get the NNs for the merged pseudojet
            scan_for_my_nearest_neighbours(ijetA, npjets.phi, npjets.rap,
                                           npjets.inv_pt2, npjets.dist,
                                           npjets.akt_dist, npjets.nn,
                                           npjets.mask, R2)
        else:
            logger.debug("Iteration %s: %s for jet %s and jet %s",
                         iteration + 1, distance, ijetA, ijetB)
            # Beamjet
            npjets.mask_slot(ijetA)
            jet_indexA = npjets.jets_index[ijetA]
            add_step_to_history(history=history, jets=jets,
                                parent1=jets[jet_indexA].cluster_history_index,
                                parent2=BeamJet,
                                jetp_index=Invalid, distance=distance)

        # Now need to update nearest distances, when pseudojets are unmasked and
        # had either jetA or jetB as their nearest neighbour
        # Note, it doesn't matter that we reused the ijetA slot here!
        if ijetB != -1:
            jets_to_update = np.logical_and(
                ~npjets.mask,
                np.logical_or(npjets.nn == ijetA, npjets.nn == ijetB))
        else:
            jets_to_update = np.logical_and(~npjets.mask, npjets.nn == ijetA)
        ijets_to_update = np.where(jets_to_update)

        # Doable without actually needing a loop?
        for ijet_to_update in ijets_to_update[0]:
            scan_for_my_nearest_neighbours(ijet_to_update, npjets.phi, npjets.rap,
                                           npjets.inv_pt2, npjets.dist,
                                           npjets.akt_dist, npjets.nn,
                                           npjets.mask, R2)

        # Useful to check that we have done all updates correctly (only for debug!)
        if logger.level == logging.DEBUG:
            npjets_copy = deepcopy(npjets)
            # The full rescan must write into the copy only; writing into
            # npjets.akt_dist would overwrite the working state being checked.
            scan_for_all_nearest_neighbours(npjets_copy.phi, npjets_copy.rap,
                                            npjets_copy.inv_pt2, npjets_copy.dist,
                                            npjets_copy.akt_dist, npjets_copy.nn,
                                            npjets_copy.mask, R2)
            compare_status(npjets, npjets_copy)

    if return_raw:
        return jets, history
    return inclusive_jets(jets, history, ptmin=ptmin)