import math import awkward as ak import tqdm import traceback from src.data.tools import _concat from src.logger.logger import _logger def _read_hdf5(filepath, branches, load_range=None): import tables tables.set_blosc_max_threads(4) with tables.open_file(filepath) as f: outputs = {k: getattr(f.root, k)[:] for k in branches} if load_range is None: load_range = (0, 1) start = math.trunc(load_range[0] * len(outputs[branches[0]])) stop = max(start + 1, math.trunc(load_range[1] * len(outputs[branches[0]]))) for k, v in outputs.items(): outputs[k] = v[start:stop] return ak.Array(outputs) def _read_root(filepath, branches, load_range=None, treename=None): import uproot with uproot.open(filepath) as f: if treename is None: treenames = set([k.split(';')[0] for k, v in f.items() if getattr(v, 'classname', '') == 'TTree']) #if len(treenames) == 1: # treename = treenames.pop() #else: # raise RuntimeError( # 'Need to specify `treename` as more than one trees are found in file %s: %s' % # (filepath, str(branches))) # set treename to the first of the treenames treename = treenames.pop() tree = f[treename] if load_range is not None: start = math.trunc(load_range[0] * tree.num_entries) stop = max(start + 1, math.trunc(load_range[1] * tree.num_entries)) else: start, stop = None, None outputs = tree.arrays(filter_name=branches, entry_start=start, entry_stop=stop) return outputs def _read_awkd(filepath, branches, load_range=None): import awkward0 with awkward0.load(filepath) as f: outputs = {k: f[k] for k in branches} if load_range is None: load_range = (0, 1) start = math.trunc(load_range[0] * len(outputs[branches[0]])) stop = max(start + 1, math.trunc(load_range[1] * len(outputs[branches[0]]))) for k, v in outputs.items(): outputs[k] = ak.from_awkward0(v[start:stop]) return ak.Array(outputs) def _read_parquet(filepath, branches, load_range=None): outputs = ak.from_parquet(filepath, columns=branches) if load_range is not None: start = math.trunc(load_range[0] * len(outputs)) stop = max(start + 1, math.trunc(load_range[1] * len(outputs))) outputs = outputs[start:stop] return outputs def _read_files(filelist, branches, load_range=None, show_progressbar=False, **kwargs): import os branches = list(branches) table = [] if show_progressbar: filelist = tqdm.tqdm(filelist) for filepath in filelist: ext = os.path.splitext(filepath)[1] if ext not in ('.h5', '.root', '.awkd', '.parquet'): raise RuntimeError('File %s of type `%s` is not supported!' % (filepath, ext)) try: if ext == '.h5': a = _read_hdf5(filepath, branches, load_range=load_range) elif ext == '.root': a = _read_root(filepath, branches, load_range=load_range, treename=kwargs.get('treename', None)) elif ext == '.awkd': a = _read_awkd(filepath, branches, load_range=load_range) elif ext == '.parquet': a = _read_parquet(filepath, branches, load_range=load_range) except Exception as e: a = None _logger.error('When reading file %s:', filepath) _logger.error(traceback.format_exc()) if a is not None: table.append(a) table = _concat(table) # ak.Array if len(table) == 0: raise RuntimeError(f'Zero entries loaded when reading files {filelist} with `load_range`={load_range}.') return table def _write_root(file, table, treename='Events', compression=-1, step=1048576): import uproot if compression == -1: compression = uproot.LZ4(4) with uproot.recreate(file, compression=compression) as fout: tree = fout.mktree(treename, {k: v.dtype for k, v in table.items()}) start = 0 while start < len(list(table.values())[0]) - 1: tree.extend({k: v[start:start + step] for k, v in table.items()}) start += step