jetclustering / src /jetfinder /basicjetfinder.py
gregorkrzmanc's picture
.
e75a247
raw
history blame
10.3 kB
# from https://github.com/graeme-a-stewart/antikt-python/tree/main/src/pyantikt
from src.jetfinder.basicjetfinder_types import NPHistory
from src.jetfinder.basicjetfinder_types import PseudoJet
from src.jetfinder.basicjetfinder_types import NPPseudoJets
import logging
import numpy as np
import numpy.typing as npt
logger = logging.getLogger("jetfinder")
from numba import njit
from copy import deepcopy
Invalid = -3
NonexistentParent = -2
BeamJet = -1
def find_closest_jets(akt_dist: npt.ArrayLike, nn: npt.ArrayLike):
'''Look over active jets and find the closest'''
closest = akt_dist.argmin()
return akt_dist[closest], closest
def scan_for_all_nearest_neighbours(phi: npt.ArrayLike, rap: npt.ArrayLike, inv_pt2: npt.ArrayLike,
dist: npt.ArrayLike, akt_dist: npt.ArrayLike,
nn: npt.ArrayLike, mask: npt.ArrayLike, R2: float):
'''Do a full scan for nearest (geometrical) neighbours'''
for ijet in range(phi.size):
if mask[ijet]:
continue
_dphi = np.pi - np.abs(np.pi - np.abs(phi - phi[ijet]))
_drap = rap - rap[ijet]
_dist = _dphi * _dphi + _drap * _drap
_dist[ijet] = R2 # Avoid measuring the distance 0 to myself!
_dist[mask] = 1e20 # Don't consider any masked jets
iclosejet = _dist.argmin()
dist[ijet] = _dist[iclosejet]
if iclosejet == ijet:
nn[ijet] = -1
akt_dist[ijet] = dist[ijet] * inv_pt2[ijet]
else:
nn[ijet] = iclosejet
akt_dist[ijet] = dist[ijet] * (inv_pt2[ijet] if inv_pt2[ijet] < inv_pt2[iclosejet] else inv_pt2[iclosejet])
def scan_for_my_nearest_neighbours(ijet: int, phi: npt.ArrayLike,
rap: npt.ArrayLike, inv_pt2: npt.ArrayLike,
dist: npt.ArrayLike, akt_dist: npt.ArrayLike, nn: npt.ArrayLike,
mask: npt.ArrayLike, R2: float):
'''Retest all other jets against the target jet'''
nn[ijet] = -1
dist[ijet] = R2
_dphi = np.pi - np.abs(np.pi - np.abs(phi - phi[ijet]))
_drap = rap - rap[ijet]
_dist = _dphi * _dphi + _drap * _drap
_dist[ijet] = R2 # Avoid measuring the distance 0 to myself!
_dist[mask] = 1e20 # Don't consider any masked jets
iclosejet = _dist.argmin()
dist[ijet] = _dist[iclosejet]
if iclosejet == ijet:
nn[ijet] = -1
akt_dist[ijet] = dist[ijet] * inv_pt2[ijet]
else:
nn[ijet] = iclosejet
akt_dist[ijet] = dist[ijet] * (inv_pt2[ijet] if inv_pt2[ijet] < inv_pt2[iclosejet] else inv_pt2[iclosejet])
# As this function is called on new PseudoJets it's possible
# that we are now the NN of our NN
if dist[iclosejet] > dist[ijet]:
dist[iclosejet] = dist[ijet]
nn[iclosejet] = ijet
akt_dist[iclosejet] = dist[iclosejet] * (
inv_pt2[ijet] if inv_pt2[ijet] < inv_pt2[iclosejet] else inv_pt2[iclosejet])
def compare_status(working: NPPseudoJets, test: NPPseudoJets):
'''Test two different copies of numpy pseudojet containers that should be equal'''
dist_diff = working.akt_dist != test.akt_dist
idist_diff = np.where(dist_diff)
if len(idist_diff[0]) > 0:
print(f"Differences found after full scan of NNs: {idist_diff[0]}")
for ijet in idist_diff[0]:
print(f"{ijet}\nW: {working.print_jet(ijet)}\nT: {test.print_jet(ijet)}")
raise RuntimeError("Jet sets are not the same and they should be!")
def add_step_to_history(history: NPHistory, jets: list[PseudoJet],
parent1: int, parent2: int, jetp_index: int, distance: float):
'''Add a merging step to the history of clustering
history - list of HistoryElement entities
jets - list of pseudojets
parent1 - the *history* element which is the parent of this merger
parent2 - the *history* element which is the parent of this merger (can be Invalid)
jetp_index - the new pseudojet that results from this merger (if both parents exist)
distance - the distance metric for this merge step
'''
max_dij_so_far = max(distance, history.max_dij_so_far[history.size - 1])
history.append(parent1=parent1, parent2=parent2, jetp_index=jetp_index, dij=distance,
max_dij_so_far=max_dij_so_far)
local_step = history.next - 1
logger.debug(f"Added history step {local_step}: {history.parent1[local_step]}")
if parent1 >= 0:
if history.child[parent1] != -1:
raise (
RuntimeError(
f"Internal error. Trying to recombine a parent1 object that has previsously been recombined: {parent1}"
)
)
history.child[parent1] = local_step
if parent2 >= 0:
if history.child[parent2] != -1:
raise (
RuntimeError(
f"Internal error. Trying to recombine a parent1 object that has previsously been recombined: {parent2}"
)
)
history.child[parent2] = local_step
# get cross-referencing right from PseudoJets
if jetp_index >= 0:
jets[jetp_index].cluster_history_index = local_step
def inclusive_jets(jets: list[PseudoJet], history: NPHistory, ptmin: float = 0.0):
'''return all inclusive jets of a ClusterSequence with pt > ptmin'''
dcut = ptmin * ptmin
jets_local = list()
# For inclusive jets with a plugin algorithm, we make no
# assumptions about anything (relation of dij to momenta,
# ordering of the dij, etc.)
for elt in range(history.size - 1, -1, -1):
if history.parent2[elt] != BeamJet:
continue
iparent_jet = history.jetp_index[history.parent1[elt]]
jet = jets[iparent_jet]
if jet.pt2 >= dcut:
jets_local.append(jet)
return jets_local
def basicjetfinder(initial_particles: list[PseudoJet], Rparam: float = 0.8, ptmin: float = 0.0, return_raw=False):
"""Basic AntiKt Jet finding code"""
R2 = Rparam * Rparam
invR2 = 1.0 / R2
# Create a container of PseudoJet objects
history = NPHistory(2 * len(initial_particles))
Qtot = history.fill_initial_history(initial_particles)
# Was doing a deepcopy here, but that turns out to be
# 1. unnecessary
# 2. extremely expensive
jets = initial_particles
# Create the numpy arrays corresponding to the pseudojets that will be used
# for fast calculations
npjets = NPPseudoJets(len(jets))
npjets.set_jets(jets)
# Setup the nearest neighbours, which is an expensive
# initial operation (N^2 scaling here)
scan_for_all_nearest_neighbours(npjets.phi, npjets.rap, npjets.inv_pt2,
npjets.dist, npjets.akt_dist, npjets.nn,
npjets.mask, R2)
# Each iteration we either merge two jets to one, or we
# finalise a jet. Thus it takes a number of iterations
# equal to the number of jets to finish
for iteration in range(len(initial_particles)):
distance, ijetA = find_closest_jets(npjets.akt_dist, npjets.nn)
ijetB = npjets.nn[ijetA]
# Add normalisation for real distance
distance *= invR2
if (ijetB >= 0):
if ijetB < ijetA:
ijetA, ijetB = ijetB, ijetA
logger.debug(f"Iteration {iteration + 1}: {distance} for jet {ijetA} and jet {ijetB}")
# Merge jets
npjets.mask_slot(ijetA)
npjets.mask_slot(ijetB)
jet_indexA = npjets.jets_index[ijetA]
jet_indexB = npjets.jets_index[ijetB]
merged_jet = jets[jet_indexA] + jets[jet_indexB]
imerged_jet = len(jets)
jets.append(merged_jet)
# We recycle the slot of jetA (which is the lowest slot)
npjets.insert_jet(merged_jet, slot=ijetA, jet_index=imerged_jet)
add_step_to_history(history=history, jets=jets,
parent1=jets[jet_indexA].cluster_history_index,
parent2=jets[jet_indexB].cluster_history_index,
jetp_index=imerged_jet, distance=distance)
# Get the NNs for the merged pseudojet
scan_for_my_nearest_neighbours(ijetA, npjets.phi, npjets.rap, npjets.inv_pt2,
npjets.dist, npjets.akt_dist, npjets.nn, npjets.mask, R2)
else:
logger.debug(f"Iteration {iteration + 1}: {distance} for jet {ijetA} and jet {ijetB}")
# Beamjet
npjets.mask_slot(ijetA)
jet_indexA = npjets.jets_index[ijetA]
add_step_to_history(history=history, jets=jets, parent1=jets[jet_indexA].cluster_history_index,
parent2=BeamJet,
jetp_index=Invalid, distance=distance)
# Now need to update nearest distances, when pseudojets are unmasked and
# had either jetA or jetB as their nearest neighbour
# Note, it doesn't matter that we reused the ijetA slot here!
if ijetB != -1:
jets_to_update = np.logical_and(~npjets.mask, np.logical_or(npjets.nn == ijetA, npjets.nn == ijetB))
else:
jets_to_update = np.logical_and(~npjets.mask, npjets.nn == ijetA)
ijets_to_update = np.where(jets_to_update)
# Doable without actually needing a loop?
for ijet_to_update in ijets_to_update[0]:
scan_for_my_nearest_neighbours(ijet_to_update, npjets.phi, npjets.rap, npjets.inv_pt2,
npjets.dist, npjets.akt_dist, npjets.nn, npjets.mask, R2)
# Useful to check that we have done all updates correctly (only for debug!)
if logger.level == logging.DEBUG:
npjets_copy = deepcopy(npjets)
scan_for_all_nearest_neighbours(npjets_copy.phi, npjets_copy.rap, npjets_copy.inv_pt2,
npjets_copy.dist, npjets.akt_dist, npjets_copy.nn, npjets_copy.mask, R2)
compare_status(npjets, npjets_copy)
if return_raw:
return jets, history
return inclusive_jets(jets, history, ptmin=ptmin)