Spaces:
Sleeping
Sleeping
| import math | |
| import awkward as ak | |
| import tqdm | |
| import traceback | |
| from src.data.tools import _concat | |
| from src.logger.logger import _logger | |
def _read_hdf5(filepath, branches, load_range=None):
    """Load the requested datasets of an HDF5 file into an awkward array.

    Every name in ``branches`` is fetched as an attribute of the file's root
    node and read in full (``[:]``); afterwards all datasets are cut down to
    the same ``[start, stop)`` window.

    Args:
        filepath: Path of the HDF5 file, opened with PyTables.
        branches: Sequence of dataset names. The length of the first one is
            the entry count that ``load_range`` is scaled by.
        load_range: Optional ``(low, high)`` pair of fractions of the entry
            count. ``None`` is treated as ``(0, 1)``.

    Returns:
        An ``ak.Array`` built from the sliced datasets, keyed by name.
    """
    import tables

    tables.set_blosc_max_threads(4)
    with tables.open_file(filepath) as h5file:
        root = h5file.root
        columns = {name: getattr(root, name)[:] for name in branches}

    fractions = (0, 1) if load_range is None else load_range
    n_entries = len(columns[branches[0]])
    first = math.trunc(fractions[0] * n_entries)
    # The window is never allowed to be empty: it spans at least one index.
    last = max(first + 1, math.trunc(fractions[1] * n_entries))
    window = slice(first, last)
    return ak.Array({name: data[window] for name, data in columns.items()})
def _read_root(filepath, branches, load_range=None, treename=None):
    """Read the requested branches of a TTree from a ROOT file.

    Args:
        filepath: Path of the ROOT file, opened with ``uproot.open``.
        branches: Branch names, passed to ``tree.arrays`` as ``filter_name``.
        load_range: Optional ``(low, high)`` pair of fractions of
            ``tree.num_entries`` selecting the entries to read. ``None``
            reads the whole tree.
        treename: Name of the tree to read. When ``None``, the first object
            whose ``classname`` is ``'TTree'``, in the order yielded by
            ``f.items()``, is used.

    Returns:
        The array returned by ``tree.arrays`` for the selected entry range.

    Raises:
        RuntimeError: If ``treename`` is ``None`` and the file contains no
            TTree.
    """
    import uproot

    with uproot.open(filepath) as f:
        if treename is None:
            # Keys look like ``name;cycle``; strip the cycle and de-duplicate
            # with ``dict.fromkeys`` so the iteration order is preserved.
            # A ``set`` would make ``pop()`` return an arbitrary tree, which
            # is not reproducible when the file holds more than one.
            treenames = list(dict.fromkeys(
                k.split(';')[0] for k, v in f.items()
                if getattr(v, 'classname', '') == 'TTree'))
            if not treenames:
                raise RuntimeError('No TTree found in file %s' % filepath)
            # Deliberately lenient: with several trees, take the first one
            # instead of failing.
            treename = treenames[0]
        tree = f[treename]
        if load_range is not None:
            start = math.trunc(load_range[0] * tree.num_entries)
            # Always request at least one entry.
            stop = max(start + 1, math.trunc(load_range[1] * tree.num_entries))
        else:
            start, stop = None, None
        outputs = tree.arrays(filter_name=branches, entry_start=start, entry_stop=stop)
    return outputs
def _read_awkd(filepath, branches, load_range=None):
    """Load the requested branches of an ``.awkd`` file into an awkward array.

    The file is opened with ``awkward0.load``; each branch is sliced to the
    requested window and converted with ``ak.from_awkward0``.

    Args:
        filepath: Path of the ``.awkd`` file.
        branches: Sequence of keys to read. The length of the first one is
            the entry count that ``load_range`` is scaled by.
        load_range: Optional ``(low, high)`` pair of fractions of the entry
            count. ``None`` is treated as ``(0, 1)``.

    Returns:
        An ``ak.Array`` of the converted, sliced branches keyed by name.
    """
    import awkward0

    with awkward0.load(filepath) as store:
        legacy = {name: store[name] for name in branches}

    fractions = (0, 1) if load_range is None else load_range
    n_entries = len(legacy[branches[0]])
    first = math.trunc(fractions[0] * n_entries)
    # The window is never allowed to be empty: it spans at least one index.
    last = max(first + 1, math.trunc(fractions[1] * n_entries))
    return ak.Array({
        name: ak.from_awkward0(arr[first:last])
        for name, arr in legacy.items()
    })
def _read_parquet(filepath, branches, load_range=None):
    """Load a Parquet file into an awkward array, optionally sliced.

    Args:
        filepath: Path handed to ``ak.from_parquet``.
        branches: Column names, passed to ``ak.from_parquet`` as ``columns``.
        load_range: Optional ``(low, high)`` pair of fractions of the number
            of loaded entries. ``None`` returns everything that was read.

    Returns:
        The array from ``ak.from_parquet``, sliced to the requested window
        when ``load_range`` is given.
    """
    events = ak.from_parquet(filepath, columns=branches)
    if load_range is None:
        return events

    n_entries = len(events)
    first = math.trunc(load_range[0] * n_entries)
    # The window is never allowed to be empty: it spans at least one index.
    last = max(first + 1, math.trunc(load_range[1] * n_entries))
    return events[first:last]
def _read_files(filelist, branches, load_range=None, show_progressbar=False, **kwargs):
    """Read the same branches from several files and concatenate the results.

    The reader is chosen from the file extension (``.h5``, ``.root``,
    ``.awkd`` or ``.parquet``). A file whose reader raises is logged and
    skipped, so one bad file does not abort the whole load.

    Args:
        filelist: Iterable of file paths.
        branches: Iterable of branch names; converted to a list before use.
        load_range: Optional ``(low, high)`` pair of fractions, forwarded to
            each reader.
        show_progressbar: When true, iterate the files through ``tqdm``.
        **kwargs: Only ``treename`` is used; it is forwarded to the ROOT
            reader.

    Returns:
        The result of ``_concat`` applied to the per-file arrays.

    Raises:
        RuntimeError: If a file has an unsupported extension, or if the
            concatenated result has zero entries.
    """
    import os

    branches = list(branches)
    table = []
    # Iterate through a separate name so ``filelist`` still refers to the
    # caller's value in the error message below. Rebinding it to the tqdm
    # wrapper would print the progress bar's repr there instead.
    file_iter = tqdm.tqdm(filelist) if show_progressbar else filelist
    for filepath in file_iter:
        ext = os.path.splitext(filepath)[1]
        if ext not in ('.h5', '.root', '.awkd', '.parquet'):
            raise RuntimeError('File %s of type `%s` is not supported!' % (filepath, ext))
        try:
            if ext == '.h5':
                a = _read_hdf5(filepath, branches, load_range=load_range)
            elif ext == '.root':
                a = _read_root(filepath, branches, load_range=load_range, treename=kwargs.get('treename', None))
            elif ext == '.awkd':
                a = _read_awkd(filepath, branches, load_range=load_range)
            elif ext == '.parquet':
                a = _read_parquet(filepath, branches, load_range=load_range)
        except Exception:
            # Best effort: report the failure and carry on with the next file.
            a = None
            _logger.error('When reading file %s:', filepath)
            _logger.error(traceback.format_exc())
        if a is not None:
            table.append(a)
    table = _concat(table)  # ak.Array
    # Explicit length check: truth-testing an array is not a reliable
    # emptiness test.
    if len(table) == 0:
        raise RuntimeError(f'Zero entries loaded when reading files {filelist} with `load_range`={load_range}.')
    return table
def _write_root(file, table, treename='Events', compression=-1, step=1048576):
    """Write a mapping of arrays to a new ROOT file as a single tree.

    Args:
        file: Destination handed to ``uproot.recreate``.
        table: Mapping of branch name to array. Each array must expose
            ``dtype`` and support slicing; the length of the first one is
            taken as the number of entries to write.
        treename: Name of the tree to create.
        compression: Compression setting for ``uproot.recreate``. The
            sentinel ``-1`` selects ``uproot.LZ4(4)``.
        step: Number of entries written per ``tree.extend`` call.

    Raises:
        ValueError: If ``step`` is zero (raised by ``range``).
    """
    import uproot

    if compression == -1:
        compression = uproot.LZ4(4)
    with uproot.recreate(file, compression=compression) as fout:
        tree = fout.mktree(treename, {k: v.dtype for k, v in table.items()})
        n_entries = len(next(iter(table.values()), ()))
        # Walk the full [0, n_entries) range in chunks. The earlier bound of
        # ``n_entries - 1`` wrote nothing for a single-entry table and lost
        # the last entry whenever ``n_entries`` was ``k * step + 1``.
        for start in range(0, n_entries, step):
            tree.extend({k: v[start:start + step] for k, v in table.items()})