import os
import copy
import tqdm
import datetime
import itertools

from collections import defaultdict


def print_message(*s, condition=True, pad=False, sep=None):
    s = " ".join([str(x) for x in s])
    msg = "[{}] {}".format(datetime.datetime.now().strftime("%b %d, %H:%M:%S"), s)

    if condition:
        msg = msg if not pad else f"\n{msg}\n"
        print(msg, flush=True, sep=sep)

    return msg


def timestamp(daydir=False):
    format_str = f"%Y-%m{'/' if daydir else '-'}%d{'/' if daydir else '_'}%H.%M.%S"
    result = datetime.datetime.now().strftime(format_str)
    return result


def file_tqdm(file):
    print(f"#> Reading {file.name}")

    with tqdm.tqdm(
        total=os.path.getsize(file.name) / 1024.0 / 1024.0, unit="MiB"
    ) as pbar:
        for line in file:
            yield line
            pbar.update(len(line) / 1024.0 / 1024.0)

        pbar.close()
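# Illustrative sketch of how file_tqdm might be used (the path and process()
# call below are placeholders, not part of this module):
#
#   with open("collection.tsv") as f:
#       for line in file_tqdm(f):
#           process(line)   # the bar tracks MiB read so far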
def create_directory(path):
    if os.path.exists(path):
        print("\n")
        print_message("#> Note: Output directory", path, "already exists\n\n")
    else:
        print("\n")
        print_message("#> Creating directory", path, "\n\n")
        os.makedirs(path)


def deduplicate(seq: list[str]) -> list[str]:
    """
    Source: https://stackoverflow.com/a/480227/1493011
    """

    seen = set()
    return [x for x in seq if not (x in seen or seen.add(x))]
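# Example (order of first occurrence is preserved):
#
#   deduplicate(["a", "b", "a", "c", "b"])   # -> ["a", "b", "c"]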
def batch(group, bsize, provide_offset=False):
    offset = 0
    while offset < len(group):
        L = group[offset : offset + bsize]
        yield ((offset, L) if provide_offset else L)
        offset += len(L)
    return
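# Example: iterate over a sequence in fixed-size slices (the last slice may be
# shorter):
#
#   list(batch(list(range(7)), 3))
#   # -> [[0, 1, 2], [3, 4, 5], [6]]
#   list(batch(list(range(7)), 3, provide_offset=True))
#   # -> [(0, [0, 1, 2]), (3, [3, 4, 5]), (6, [6])]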
class dotdict(dict):
    """
    A dict whose keys are also accessible as attributes (d.key == d['key']).
    """

    def __getattr__(self, key):
        if key.startswith('__') and key.endswith('__'):
            # Dunder lookups (e.g. by copy/pickle) should not fall through to the
            # dict; let the normal attribute machinery raise AttributeError.
            return super().__getattribute__(key)
        try:
            return self[key]
        except KeyError:
            raise AttributeError(f"'{type(self).__name__}' object has no attribute '{key}'")

    def __setattr__(self, key, value):
        if key.startswith('__') and key.endswith('__'):
            super().__setattr__(key, value)
        else:
            self[key] = value

    def __delattr__(self, key):
        if key.startswith('__') and key.endswith('__'):
            super().__delattr__(key)
        else:
            del self[key]

    def __deepcopy__(self, memo):
        return dotdict(copy.deepcopy(dict(self), memo))
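# Example of attribute-style access (the keys below are arbitrary, for
# illustration only):
#
#   cfg = dotdict(bsize=32, name="index")
#   cfg.bsize            # -> 32, same as cfg["bsize"]
#   cfg.lr = 3e-6        # same as cfg["lr"] = 3e-6
#   copy.deepcopy(cfg)   # -> an independent dotdict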
class dotdict_lax(dict):
    # Like dotdict, but missing attributes resolve to None (via dict.get)
    # instead of raising AttributeError.
    __getattr__ = dict.get
    __setattr__ = dict.__setitem__
    __delattr__ = dict.__delitem__


def flatten(L):
    # Flatten a list of lists by one level.
    result = []
    for _list in L:
        result += _list

    return result


def zipstar(L, lazy=False):
    """
    A much faster A, B, C = zip(*[(a, b, c), (a, b, c), ...])
    May return lists or tuples.
    """

    if len(L) == 0:
        return L

    width = len(L[0])

    if width < 100:
        return [[elem[idx] for elem in L] for idx in range(width)]

    L = zip(*L)

    return L if lazy else list(L)
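# Example: column-wise unpacking of a list of equal-width rows:
#
#   zipstar([(1, "a"), (2, "b"), (3, "c")])
#   # -> [[1, 2, 3], ["a", "b", "c"]]   (lists here, since width < 100)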
def zip_first(L1, L2):
    length = len(L1) if type(L1) in [tuple, list] else None

    L3 = list(zip(L1, L2))

    assert length in [None, len(L3)], "zip_first() failure: length differs!"

    return L3


def int_or_float(val):
    if "." in val:
        return float(val)

    return int(val)


def groupby_first_item(lst):
    groups = defaultdict(list)

    for first, *rest in lst:
        rest = rest[0] if len(rest) == 1 else rest
        groups[first].append(rest)

    return groups
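# Example: group (key, value) pairs by key:
#
#   groupby_first_item([("q1", 3), ("q1", 5), ("q2", 7)])
#   # -> defaultdict(list, {"q1": [3, 5], "q2": [7]})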
def process_grouped_by_first_item(lst):
    """
    Requires items in list to already be grouped by first item.
    """

    groups = defaultdict(list)

    started = False
    last_group = None

    for first, *rest in lst:
        rest = rest[0] if len(rest) == 1 else rest

        if started and first != last_group:
            yield (last_group, groups[last_group])
            assert (
                first not in groups
            ), f"{first} seen earlier --- violates precondition."

        groups[first].append(rest)

        last_group = first
        started = True

    # Emit the final group as well; the loop above only yields a group once the
    # next one begins, so without this the last group would be silently dropped.
    if started:
        yield (last_group, groups[last_group])
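# Example: stream (group, items) pairs from input already grouped by its first
# item (the final group is emitted once the loop finishes):
#
#   list(process_grouped_by_first_item([("q1", 3), ("q1", 5), ("q2", 7)]))
#   # -> [("q1", [3, 5]), ("q2", [7])]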
def grouper(iterable, n, fillvalue=None):
    """
    Collect data into fixed-length chunks or blocks
    Example: grouper('ABCDEFG', 3, 'x') --> ABC DEF Gxx
    Source: https://docs.python.org/3/library/itertools.html#itertools-recipes
    """

    args = [iter(iterable)] * n
    return itertools.zip_longest(*args, fillvalue=fillvalue)


def lengths2offsets(lengths):
    offset = 0

    for length in lengths:
        yield (offset, offset + length)
        offset += length

    return
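# Example: turn segment lengths into (start, end) offsets:
#
#   list(lengths2offsets([3, 2, 4]))   # -> [(0, 3), (3, 5), (5, 9)]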
class NullContextManager(object):
    def __init__(self, dummy_resource=None):
        self.dummy_resource = dummy_resource

    def __enter__(self):
        return self.dummy_resource

    def __exit__(self, *args):
        pass
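# Example: drop-in when a caller expects some context manager but no resource is
# needed (similar in spirit to contextlib.nullcontext in Python 3.7+); the open()
# branch below is just an illustration:
#
#   ctx = open(path) if path else NullContextManager()
#   with ctx as f:
#       ...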
def load_batch_backgrounds(args, qids):
    if args.qid2backgrounds is None:
        return None

    qbackgrounds = []

    for qid in qids:
        back = args.qid2backgrounds[qid]

        # Integer entries are positions into args.collection; anything else is
        # treated as a key into args.collectionX.
        if len(back) and type(back[0]) == int:
            x = [args.collection[pid] for pid in back]
        else:
            x = [args.collectionX.get(pid, "") for pid in back]

        x = " [SEP] ".join(x)
        qbackgrounds.append(x)

    return qbackgrounds