import copy
import datetime
import itertools
import os
from collections import defaultdict

try:
    import tqdm
except ImportError:  # Only file_tqdm() needs tqdm; keep the other helpers importable.
    tqdm = None


def print_message(*s, condition=True, pad=False, sep=None):
    """Build a timestamped message from ``s``, optionally print it, and return it.

    Args:
        *s: Values to render; each is converted with ``str`` and joined by a
            single space.
        condition: When false, nothing is printed (the message is still
            returned, without padding).
        pad: When true (and ``condition`` is true), wrap the message in a
            leading and trailing newline before printing.
        sep: Forwarded to ``print``. Only one positional argument is printed,
            so it has no visible effect; kept for interface compatibility.

    Returns:
        The formatted message (including the padding, if it was applied).
    """
    s = " ".join([str(x) for x in s])
    msg = "[{}] {}".format(datetime.datetime.now().strftime("%b %d, %H:%M:%S"), s)

    if condition:
        msg = msg if not pad else f"\n{msg}\n"
        print(msg, flush=True, sep=sep)

    return msg


def timestamp(daydir=False):
    """Return the current local time as a string.

    The format is ``YYYY-MM-DD_HH.MM.SS``; with ``daydir=True`` the separators
    after the month and the day become ``/`` (``YYYY-MM/DD/HH.MM.SS``).
    """
    format_str = f"%Y-%m{'/' if daydir else '-'}%d{'/' if daydir else '_'}%H.%M.%S"
    result = datetime.datetime.now().strftime(format_str)

    return result


def file_tqdm(file):
    """Yield the lines of an open file while showing a MiB-based progress bar.

    Args:
        file: An open file object with a ``name`` attribute that points to a
            path on disk (used to obtain the total size).

    Yields:
        Each line of ``file``, unchanged.

    Raises:
        ImportError: If the ``tqdm`` package is not installed.
    """
    if tqdm is None:
        raise ImportError("file_tqdm() requires the 'tqdm' package")

    print(f"#> Reading {file.name}")

    mib = 1024.0 * 1024.0
    # The context manager closes the bar on exit, so no explicit close() is needed.
    with tqdm.tqdm(total=os.path.getsize(file.name) / 1024.0 / 1024.0, unit="MiB") as pbar:
        for line in file:
            yield line
            # NOTE(review): len(line) counts characters for text-mode files, so
            # progress is approximate for non-ASCII content.
            pbar.update(len(line) / mib)


def create_directory(path):
    """Create ``path`` (including parents) unless it already exists.

    A note is printed in either case.
    """
    if os.path.exists(path):
        print("\n")
        print_message("#> Note: Output directory", path, "already exists\n\n")
    else:
        print("\n")
        print_message("#> Creating directory", path, "\n\n")
        # exist_ok avoids a crash if another process creates the directory
        # between the existence check above and this call.
        os.makedirs(path, exist_ok=True)


def deduplicate(seq: list[str]) -> list[str]:
    """Return the items of ``seq`` without duplicates, keeping first-seen order.

    Source: https://stackoverflow.com/a/480227/1493011
    """
    seen = set()
    # set.add() returns None, so the `or` clause records x and stays falsy.
    return [x for x in seq if not (x in seen or seen.add(x))]


def batch(group, bsize, provide_offset=False):
    """Yield consecutive slices of ``group`` holding at most ``bsize`` items.

    Args:
        group: A sliceable object that supports ``len``.
        bsize: Maximum number of items per slice.
        provide_offset: When true, yield ``(offset, slice)`` pairs, where
            ``offset`` is the index of the slice's first item in ``group``.

    Raises:
        ValueError: If ``bsize`` produces an empty slice (e.g. ``bsize <= 0``),
            which would otherwise loop forever.
    """
    offset = 0
    while offset < len(group):
        chunk = group[offset : offset + bsize]
        if len(chunk) == 0:
            raise ValueError(f"batch size must be positive, got {bsize!r}")
        yield ((offset, chunk) if provide_offset else chunk)
        offset += len(chunk)
    return


class dotdict(dict):
    """A ``dict`` whose items can also be read, set and deleted as attributes.

    Names of the form ``__x__`` are never mapped to items, so protocol lookups
    (copying, pickling, ...) behave like they do for a plain object.

    Based on derek73 @ https://stackoverflow.com/questions/2352181
    """

    def __getattr__(self, key):
        # Only called when normal attribute lookup fails.
        if key.startswith("__") and key.endswith("__"):
            # dict defines no __getattr__ to delegate to; report the miss directly.
            raise AttributeError(f"'{type(self).__name__}' object has no attribute '{key}'")
        try:
            return self[key]
        except KeyError:
            raise AttributeError(f"'{type(self).__name__}' object has no attribute '{key}'")

    def __setattr__(self, key, value):
        if key.startswith("__") and key.endswith("__"):
            super().__setattr__(key, value)
        else:
            self[key] = value

    def __delattr__(self, key):
        if key.startswith("__") and key.endswith("__"):
            super().__delattr__(key)
        else:
            del self[key]

    def __deepcopy__(self, memo):
        # Use the default dict copying method to avoid infinite recursion.
        return dotdict(copy.deepcopy(dict(self), memo))


class dotdict_lax(dict):
    """Like ``dotdict``, but reading a missing attribute returns ``None``."""

    __getattr__ = dict.get
    __setattr__ = dict.__setitem__
    __delattr__ = dict.__delitem__


def flatten(L):
    """Concatenate the iterables in ``L`` into a single new list."""
    # return [x for y in L for x in y]

    result = []
    for _list in L:
        result += _list

    return result


def zipstar(L, lazy=False):
    """
    A much faster A, B, C = zip(*[(a, b, c), (a, b, c), ...])
    May return lists or tuples.

    An empty ``L`` is returned as is. Rows narrower than 100 items are
    transposed into a list of lists; wider rows go through ``zip``, which is
    returned unevaluated when ``lazy`` is true and as a list of tuples otherwise.
    """
    if len(L) == 0:
        return L

    width = len(L[0])

    if width < 100:
        return [[elem[idx] for elem in L] for idx in range(width)]

    zipped = zip(*L)

    return zipped if lazy else list(zipped)


def zip_first(L1, L2):
    """Zip ``L1`` with ``L2`` and check that no item of ``L1`` was dropped.

    The length check only applies when ``L1`` is exactly a ``tuple`` or
    ``list``; it fails with ``AssertionError`` if ``L2`` is shorter.
    """
    length = len(L1) if type(L1) in [tuple, list] else None

    L3 = list(zip(L1, L2))

    assert length in [None, len(L3)], "zip_first() failure: length differs!"

    return L3


def int_or_float(val):
    """Parse the string ``val`` as ``float`` if it contains a ``.``, else as ``int``."""
    if "." in val:
        return float(val)

    return int(val)


def groupby_first_item(lst):
    """Group the rows of ``lst`` by their first item.

    Returns:
        A ``defaultdict(list)`` mapping each first item to the list of
        remainders of its rows. A remainder of exactly one item is stored
        unwrapped; otherwise it is stored as a list.
    """
    groups = defaultdict(list)

    for first, *rest in lst:
        rest = rest[0] if len(rest) == 1 else rest
        groups[first].append(rest)

    return groups


def process_grouped_by_first_item(lst):
    """
    Requires items in list to already be grouped by first item.

    Yields one ``(first, remainders)`` pair per run of rows sharing the same
    first item, including the final run. Remainders follow the same unwrapping
    rule as ``groupby_first_item``. An ``AssertionError`` is raised if a first
    item reappears after its run has ended.
    """
    groups = defaultdict(list)

    started = False
    last_group = None

    for first, *rest in lst:
        rest = rest[0] if len(rest) == 1 else rest

        if started and first != last_group:
            yield (last_group, groups[last_group])
            assert (
                first not in groups
            ), f"{first} seen earlier --- violates precondition."

        groups[first].append(rest)

        last_group = first
        started = True

    # The loop only emits a group when the next one begins, so the final
    # group has to be flushed here.
    if started:
        yield (last_group, groups[last_group])

    # Kept for callers that read the generator's return value.
    return groups


def grouper(iterable, n, fillvalue=None):
    """
    Collect data into fixed-length chunks or blocks
        Example: grouper('ABCDEFG', 3, 'x') --> ABC DEF Gxx
        Source: https://docs.python.org/3/library/itertools.html#itertools-recipes
    """
    # n references to the SAME iterator, so each output tuple consumes n items.
    args = [iter(iterable)] * n
    return itertools.zip_longest(*args, fillvalue=fillvalue)


def lengths2offsets(lengths):
    """Yield ``(start, end)`` offsets for consecutive segments of the given lengths."""
    offset = 0

    for length in lengths:
        yield (offset, offset + length)
        offset += length

    return


# see https://stackoverflow.com/a/45187287
class NullContextManager(object):
    """A do-nothing context manager whose ``with`` target is ``dummy_resource``."""

    def __init__(self, dummy_resource=None):
        self.dummy_resource = dummy_resource

    def __enter__(self):
        return self.dummy_resource

    def __exit__(self, *args):
        pass


def load_batch_backgrounds(args, qids):
    """Build one ``" [SEP] "``-joined background string per query id.

    Args:
        args: Object exposing ``qid2backgrounds`` plus ``collection`` (indexed
            by integer ids) and/or ``collectionX`` (a mapping with ``get``).
        qids: Query ids to look up in ``args.qid2backgrounds``.

    Returns:
        ``None`` if ``args.qid2backgrounds`` is ``None``; otherwise a list with
        one string per entry of ``qids``.
    """
    if args.qid2backgrounds is None:
        return None

    qbackgrounds = []

    for qid in qids:
        back = args.qid2backgrounds[qid]

        # The type of the first id decides which collection is consulted.
        if len(back) and isinstance(back[0], int):
            x = [args.collection[pid] for pid in back]
        else:
            # Unknown ids contribute an empty passage rather than failing.
            x = [args.collectionX.get(pid, "") for pid in back]

        x = " [SEP] ".join(x)
        qbackgrounds.append(x)

    return qbackgrounds