import json, os, sys
import os.path as osp
from typing import List, Union, Tuple, Dict
from pathlib import Path
import cv2
import numpy as np
from imageio import imread, imwrite
import pickle
import pycocotools.mask as maskUtils
from einops import rearrange
from tqdm import tqdm
from PIL import Image
import io
import requests
import traceback
import base64
import time

NP_BOOL_TYPES = (np.bool_, np.bool8)
NP_FLOAT_TYPES = (np.float_, np.float16, np.float32, np.float64)
NP_INT_TYPES = (np.int_, np.int8, np.int16, np.int32, np.int64, np.uint, np.uint8, np.uint16, np.uint32, np.uint64)

class NumpyEncoder(json.JSONEncoder):
    """JSON encoder that converts numpy arrays and scalar types to native Python types."""
    def default(self, obj):
        if isinstance(obj, np.ndarray):
            return obj.tolist()
        elif isinstance(obj, np.ScalarType):
            if isinstance(obj, NP_BOOL_TYPES):
                return bool(obj)
            elif isinstance(obj, NP_FLOAT_TYPES):
                return float(obj)
            elif isinstance(obj, NP_INT_TYPES):
                return int(obj)
        return json.JSONEncoder.default(self, obj)

def json2dict(json_path: str):
    with open(json_path, 'r', encoding='utf8') as f:
        metadata = json.loads(f.read())
    return metadata

def dict2json(adict: dict, json_path: str):
    with open(json_path, "w", encoding="utf-8") as f:
        f.write(json.dumps(adict, ensure_ascii=False, cls=NumpyEncoder))

def dict2pickle(dumped_path: str, tgt_dict: dict):
    with open(dumped_path, "wb") as f:
        pickle.dump(tgt_dict, f, protocol=pickle.HIGHEST_PROTOCOL)

def pickle2dict(pkl_path: str) -> Dict:
    with open(pkl_path, "rb") as f:
        dumped_data = pickle.load(f)
    return dumped_data

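# Usage sketch (illustrative only, not part of the original module): NumpyEncoder lets
# dict2json serialize numpy values directly, while pickle round-trips the numpy objects.
#   meta = {'score': np.float32(0.87), 'box': np.array([10, 20, 30, 40])}
#   dict2json(meta, 'meta.json')      # numpy values become plain floats/lists
#   dict2pickle('meta.pkl', meta)     # keeps the original numpy objects
#   assert json2dict('meta.json')['box'] == [10, 20, 30, 40]
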
def get_all_dirs(root_p: str) -> List[str]:
    alldir = os.listdir(root_p)
    dirlist = []
    for dirp in alldir:
        dirp = osp.join(root_p, dirp)
        if osp.isdir(dirp):
            dirlist.append(dirp)
    return dirlist

def read_filelist(filelistp: str):
    with open(filelistp, 'r', encoding='utf8') as f:
        lines = f.readlines()
    if len(lines) > 0 and lines[-1].strip() == '':
        lines = lines[:-1]
    return lines

# Path.suffix includes the leading dot, so every extension here must include it.
VIDEO_EXTS = {'.flv', '.mp4', '.mkv', '.ts', '.mov', '.mpeg'}

def get_all_videos(video_dir: str, video_exts=VIDEO_EXTS, abs_path=False) -> List[str]:
    filelist = os.listdir(video_dir)
    vlist = []
    for f in filelist:
        if Path(f).suffix in video_exts:
            if abs_path:
                vlist.append(osp.join(video_dir, f))
            else:
                vlist.append(f)
    return vlist

IMG_EXT = {'.bmp', '.jpg', '.png', '.jpeg'}

def find_all_imgs(img_dir, abs_path=False):
    imglist = []
    dir_list = os.listdir(img_dir)
    for filename in dir_list:
        file_suffix = Path(filename).suffix
        if file_suffix.lower() not in IMG_EXT:
            continue
        if abs_path:
            imglist.append(osp.join(img_dir, filename))
        else:
            imglist.append(filename)
    return imglist

def find_all_files_recursive(tgt_dir: Union[List, str], ext, exclude_dirs={}):
    if isinstance(tgt_dir, str):
        tgt_dir = [tgt_dir]
    filelst = []
    for d in tgt_dir:
        for root, _, files in os.walk(d):
            if osp.basename(root) in exclude_dirs:
                continue
            for f in files:
                if Path(f).suffix.lower() in ext:
                    filelst.append(osp.join(root, f))
    return filelst

def danbooruid2relpath(id_str: str, file_ext='.jpg'):
    if not isinstance(id_str, str):
        id_str = str(id_str)
    return id_str[-3:].zfill(4) + '/' + id_str + file_ext

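# Illustrative examples (not part of the original module): the directory name is the last
# three digits of the id, zero-padded to four characters.
#   danbooruid2relpath(123456)        -> '0456/123456.jpg'
#   danbooruid2relpath('87', '.png')  -> '0087/87.png'
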
def get_template_histvq(template: np.ndarray) -> Tuple[List[np.ndarray], List[np.ndarray]]:
    """Return per-channel unique pixel values and their cumulative quantiles for histogram matching."""
    len_shape = len(template.shape)
    num_c = 3
    mask = None
    if len_shape == 2:
        num_c = 1
    elif len_shape == 3 and template.shape[-1] == 4:
        # use the alpha channel as a mask so fully transparent pixels are ignored
        mask = np.where(template[..., -1])
        template = template[..., :num_c][mask]
    values, quantiles = [], []
    for ii in range(num_c):
        v, c = np.unique(template[..., ii].ravel(), return_counts=True)
        q = np.cumsum(c).astype(np.float64)
        if len(q) < 1:
            return None, None
        q /= q[-1]
        values.append(v)
        quantiles.append(q)
    return values, quantiles

def inplace_hist_matching(img: np.ndarray, tv: List[np.ndarray], tq: List[np.ndarray]) -> None:
    """Match img's per-channel histogram to the template values/quantiles (tv, tq) in place."""
    len_shape = len(img.shape)
    num_c = 3
    mask = None
    tgtimg = img
    if len_shape == 2:
        num_c = 1
    elif len_shape == 3 and img.shape[-1] == 4:
        mask = np.where(img[..., -1])
        tgtimg = img[..., :num_c][mask]
    im_h, im_w = img.shape[:2]
    oldtype = img.dtype
    for ii in range(num_c):
        _, bin_idx, s_counts = np.unique(tgtimg[..., ii].ravel(), return_inverse=True,
                                         return_counts=True)
        s_quantiles = np.cumsum(s_counts).astype(np.float64)
        if len(s_quantiles) == 0:
            return
        s_quantiles /= s_quantiles[-1]
        interp_t_values = np.interp(s_quantiles, tq[ii], tv[ii]).astype(oldtype)
        if mask is not None:
            img[..., ii][mask] = interp_t_values[bin_idx]
        else:
            img[..., ii] = interp_t_values[bin_idx].reshape((im_h, im_w))

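# Usage sketch (illustrative only): build the value/quantile template from one image,
# then match another image to it in place.
#   tv, tq = get_template_histvq(template_img)       # template_img: RGB/RGBA uint8 array
#   if tv is not None:
#       inplace_hist_matching(target_img, tv, tq)    # target_img is modified in place
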
def fgbg_hist_matching(fg_list: List, bg: np.ndarray, min_tq_num=128):
    """Histogram-match foregrounds and background to whichever image has the richest histogram."""
    btv, btq = get_template_histvq(bg)
    ftv, ftq = get_template_histvq(fg_list[0]['image'])
    num_fg = len(fg_list)
    idx_matched = -1
    if num_fg > 1:
        # compare the last foreground against the first and keep the richer histogram as the template
        _ftv, _ftq = get_template_histvq(fg_list[-1]['image'])
        if _ftq is not None and ftq is not None:
            if len(_ftq[0]) > len(ftq[0]):
                idx_matched = num_fg - 1
                ftv, ftq = _ftv, _ftq
            else:
                idx_matched = 0
    if btq is not None and ftq is not None:
        if len(btq[0]) > len(ftq[0]):
            # background histogram is richer: use it as the template for all foregrounds
            tv, tq = btv, btq
            idx_matched = -1
        else:
            # a foreground is the template: match the background to it
            tv, tq = ftv, ftq
            if len(tq[0]) > min_tq_num:
                inplace_hist_matching(bg, tv, tq)
        if len(tq[0]) > min_tq_num:
            for ii, fg_dict in enumerate(fg_list):
                fg = fg_dict['image']
                if ii != idx_matched and len(tq[0]) > min_tq_num:
                    inplace_hist_matching(fg, tv, tq)

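# Usage sketch (illustrative only): each foreground entry is a dict with an 'image' key;
# histograms are matched in place between the foregrounds and the background.
#   fg_list = [{'image': fg_rgba_0}, {'image': fg_rgba_1}]
#   fgbg_hist_matching(fg_list, bg_rgb)    # bg_rgb and the fg images are modified in place
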
def imread_nogrey_rgb(imp: str) -> np.ndarray:
    """Read an image as RGB, converting grayscale and RGBA inputs to 3-channel RGB."""
    img: np.ndarray = imread(imp)
    c = 1
    if len(img.shape) == 3:
        c = img.shape[-1]
    if c == 1:
        img = cv2.cvtColor(img, cv2.COLOR_GRAY2RGB)
    if c == 4:
        img = cv2.cvtColor(img, cv2.COLOR_RGBA2RGB)
    return img

def square_pad_resize(img: np.ndarray, tgt_size: int, pad_value: Tuple = (114, 114, 114)):
    """Pad img on the bottom/right to a square, then downscale it to tgt_size x tgt_size."""
    h, w = img.shape[:2]
    pad_h, pad_w = 0, 0

    # make square image
    if w < h:
        pad_w = h - w
        w += pad_w
    elif h < w:
        pad_h = w - h
        h += pad_h

    pad_size = tgt_size - h
    if pad_size > 0:
        pad_h += pad_size
        pad_w += pad_size

    if pad_h > 0 or pad_w > 0:
        img = cv2.copyMakeBorder(img, 0, pad_h, 0, pad_w, cv2.BORDER_CONSTANT, value=pad_value)

    down_scale_ratio = tgt_size / img.shape[0]
    assert down_scale_ratio <= 1
    if down_scale_ratio < 1:
        img = cv2.resize(img, (tgt_size, tgt_size), interpolation=cv2.INTER_AREA)
    return img, down_scale_ratio, pad_h, pad_w

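# Usage sketch (illustrative only): coordinates predicted on the padded/resized image can be
# mapped back to the original image by dividing by down_scale_ratio; padding is only added on
# the bottom/right, so no offset is needed.
#   sq_img, ratio, pad_h, pad_w = square_pad_resize(img, 640)
#   x_orig, y_orig = x_pred / ratio, y_pred / ratio
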
def scaledown_maxsize(img: np.ndarray, max_size: int, divisior: int = None):
    """Downscale img so its longer side is at most max_size, optionally rounding each side up to a multiple of divisior."""
    im_h, im_w = img.shape[:2]
    ori_h, ori_w = img.shape[:2]
    resize_ratio = max_size / max(im_h, im_w)
    if resize_ratio < 1:
        if im_h > im_w:
            im_h = max_size
            im_w = max(1, int(round(im_w * resize_ratio)))
        else:
            im_w = max_size
            im_h = max(1, int(round(im_h * resize_ratio)))
    if divisior is not None:
        im_w = int(np.ceil(im_w / divisior) * divisior)
        im_h = int(np.ceil(im_h / divisior) * divisior)
    if im_w != ori_w or im_h != ori_h:
        img = cv2.resize(img, (im_w, im_h), interpolation=cv2.INTER_LINEAR)
    return img

def resize_pad(img: np.ndarray, tgt_size: int, pad_value: Tuple = (0, 0, 0)):
    # downscale to tgt_size and pad to square
    img = scaledown_maxsize(img, tgt_size)
    padl, padr, padt, padb = 0, 0, 0, 0
    h, w = img.shape[:2]
    # padt = (tgt_size - h) // 2
    # padb = tgt_size - h - padt
    # padl = (tgt_size - w) // 2
    # padr = tgt_size - w - padl
    padb = tgt_size - h
    padr = tgt_size - w
    if padt + padb + padl + padr > 0:
        img = cv2.copyMakeBorder(img, padt, padb, padl, padr, cv2.BORDER_CONSTANT, value=pad_value)
    return img, (padt, padb, padl, padr)

def resize_pad2divisior(img: np.ndarray, tgt_size: int, divisior: int = 64, pad_value: Tuple = (0, 0, 0)):
    img = scaledown_maxsize(img, tgt_size)
    img, (pad_h, pad_w) = pad2divisior(img, divisior, pad_value)
    return img, (pad_h, pad_w)

def img2grey(img: Union[np.ndarray, str], is_rgb: bool = False) -> np.ndarray:
    if isinstance(img, np.ndarray):
        if len(img.shape) == 3:
            if img.shape[-1] != 1:
                if is_rgb:
                    img = cv2.cvtColor(img, cv2.COLOR_RGB2GRAY)
                else:
                    img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
            else:
                img = img[..., 0]
        return img
    elif isinstance(img, str):
        return cv2.imread(img, cv2.IMREAD_GRAYSCALE)
    else:
        raise NotImplementedError

def pad2divisior(img: np.ndarray, divisior: int, value=(0, 0, 0)) -> Tuple[np.ndarray, Tuple[int, int]]:
    im_h, im_w = img.shape[:2]
    pad_h = int(np.ceil(im_h / divisior)) * divisior - im_h
    pad_w = int(np.ceil(im_w / divisior)) * divisior - im_w
    if pad_h != 0 or pad_w != 0:
        img = cv2.copyMakeBorder(img, 0, pad_h, 0, pad_w, value=value, borderType=cv2.BORDER_CONSTANT)
    return img, (pad_h, pad_w)

def mask2rle(mask: np.ndarray, decode_for_json: bool = True) -> Dict:
    """Encode a binary mask as COCO RLE; optionally decode the counts bytes to str so the dict is JSON-serializable."""
    mask_rle = maskUtils.encode(np.array(
        mask[..., np.newaxis] > 0, order='F',
        dtype='uint8'))[0]
    if decode_for_json:
        mask_rle['counts'] = mask_rle['counts'].decode()
    return mask_rle

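# Usage sketch (illustrative only): decode back to a binary mask with pycocotools.
#   rle = mask2rle(mask)                       # mask: (H, W) array, nonzero = foreground
#   rle['counts'] = rle['counts'].encode()     # pycocotools expects bytes, undo the JSON-friendly decode
#   restored = maskUtils.decode(rle)           # (H, W) uint8 binary mask
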
# Boxes below are [x, y, w, h] unless the function name says xyxy.
def bbox2xyxy(box) -> Tuple[int]:
    x1, y1 = box[0], box[1]
    return x1, y1, x1 + box[2], y1 + box[3]

def bbox_overlap_area(abox, boxb) -> int:
    ax1, ay1, ax2, ay2 = bbox2xyxy(abox)
    bx1, by1, bx2, by2 = bbox2xyxy(boxb)
    ix = min(ax2, bx2) - max(ax1, bx1)
    iy = min(ay2, by2) - max(ay1, by1)
    if ix > 0 and iy > 0:
        return ix * iy
    else:
        return 0

def bbox_overlap_xy(abox, boxb) -> Tuple[int]:
    ax1, ay1, ax2, ay2 = bbox2xyxy(abox)
    bx1, by1, bx2, by2 = bbox2xyxy(boxb)
    ix = min(ax2, bx2) - max(ax1, bx1)
    iy = min(ay2, by2) - max(ay1, by1)
    return ix, iy

def xyxy_overlap_area(axyxy, bxyxy) -> int:
    ax1, ay1, ax2, ay2 = axyxy
    bx1, by1, bx2, by2 = bxyxy
    ix = min(ax2, bx2) - max(ax1, bx1)
    iy = min(ay2, by2) - max(ay1, by1)
    if ix > 0 and iy > 0:
        return ix * iy
    else:
        return 0

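# Usage sketch (illustrative only): IoU from the overlap helpers, boxes given as [x, y, w, h].
#   a, b = [0, 0, 10, 10], [5, 5, 10, 10]
#   inter = bbox_overlap_area(a, b)                       # 25
#   iou = inter / (a[2] * a[3] + b[2] * b[3] - inter)     # 25 / 175 ≈ 0.143
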
DIRNAME2TAG = {'rezero': 're:zero'}

def dirname2charactername(dirname, start=6):
    cname = dirname[start:]
    for k, v in DIRNAME2TAG.items():
        cname = cname.replace(k, v)
    return cname

def imglist2grid(imglist: List[np.ndarray], grid_size: int = 384, col=None) -> np.ndarray:
    """Tile a list of images into one grid image; each image is square-padded and resized to grid_size."""
    sqimlist = []
    for img in imglist:
        sqimlist.append(square_pad_resize(img, grid_size)[0])
    nimg = len(imglist)
    if nimg == 0:
        return None
    padn = 0
    if col is None:
        if nimg > 5:
            row = int(np.round(np.sqrt(nimg)))
            col = int(np.ceil(nimg / row))
        else:
            col = nimg
    # pad with blank tiles so the image count fills the grid exactly
    padn = int(np.ceil(nimg / col) * col) - nimg
    if padn != 0:
        padimg = np.zeros_like(sqimlist[0])
        for _ in range(padn):
            sqimlist.append(padimg)
    return rearrange(sqimlist, '(row col) h w c -> (row h) (col w) c', col=col)

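# Usage sketch (illustrative only): tile several RGB images into one preview grid.
#   grid = imglist2grid([img_a, img_b, img_c], grid_size=256)
#   if grid is not None:
#       imwrite('preview.png', grid)    # 'preview.png' is a hypothetical output path
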
def write_jsonlines(filep: str, dict_lst: List[Dict], progress_bar: bool = True):
    with open(filep, 'w') as out:
        if progress_bar:
            lst = tqdm(dict_lst)
        else:
            lst = dict_lst
        for ddict in lst:
            jout = json.dumps(ddict) + '\n'
            out.write(jout)

def read_jsonlines(filep: str):
    with open(filep, 'r', encoding='utf8') as f:
        result = [json.loads(jline) for jline in f.read().splitlines()]
    return result

def _b64encode(x: bytes) -> str:
    return base64.b64encode(x).decode("utf-8")

def img2b64(img) -> str:
    """
    Convert a PIL image or numpy array to a base64-encoded PNG string.
    """
    if isinstance(img, np.ndarray):
        img = Image.fromarray(img)
    buffered = io.BytesIO()
    img.save(buffered, format='PNG')
    return _b64encode(buffered.getvalue())

def save_encoded_image(b64_image: str, output_path: str):
    with open(output_path, "wb") as image_file:
        image_file.write(base64.b64decode(b64_image))

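# Usage sketch (illustrative only): round-trip an image through base64, e.g. for an HTTP API.
#   b64 = img2b64(np.zeros((64, 64, 3), dtype=np.uint8))
#   save_encoded_image(b64, 'decoded.png')    # 'decoded.png' is a hypothetical output path
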
def submit_request(url, data, exist_on_exception=True, auth=None, wait_time=30):
    """POST data to url, retrying every wait_time seconds on failure; wait_time <= 0 re-raises immediately."""
    response = None
    try:
        while True:
            try:
                response = requests.post(url, data=data, auth=auth)
                response.raise_for_status()
                break
            except Exception as e:
                if wait_time > 0:
                    print(traceback.format_exc(), file=sys.stderr)
                    print(f'sleep {wait_time} sec...')
                    time.sleep(wait_time)
                    continue
                else:
                    raise e
    except Exception as e:
        print(traceback.format_exc(), file=sys.stderr)
        if response is not None:
            print('response content: ' + response.text)
        if exist_on_exception:
            exit()
    return response

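# Usage sketch (illustrative only): post a JSON payload to a hypothetical endpoint.
#   payload = json.dumps({'image': img2b64(img)})
#   resp = submit_request('http://localhost:8080/predict', payload)   # URL is an assumption
#   result = resp.json() if resp is not None else None
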
# def resize_image(input_image, resolution):
#     H, W = input_image.shape[:2]
#     k = float(min(resolution)) / min(H, W)
#     img = cv2.resize(input_image, resolution, interpolation=cv2.INTER_LANCZOS4 if k > 1 else cv2.INTER_AREA)
#     return img