Spaces:
Sleeping
Sleeping
| import torch | |
| import torchvision.transforms.functional as TF | |
| import torch.nn.functional as F | |
| from PIL import Image | |
| import os | |
| from skimage import img_as_ubyte | |
| from tqdm import tqdm | |
| from natsort import natsorted | |
| import glob | |
| import argparse | |
| from model_arch.SRMNet_SWFF import SRMNet_SWFF | |
| from model_arch.SRMNet import SRMNet | |
| tasks = ['Deblurring_motionblur', | |
| 'Dehaze_realworld', | |
| 'Denoise_gaussian', | |
| 'Denoise_realworld', | |
| 'Deraining_raindrop', | |
| 'Deraining_rainstreak', | |
| 'LLEnhancement', | |
| 'Retouching'] | |
| def main(): | |
| parser = argparse.ArgumentParser(description='Quick demo Image Restoration') | |
| parser.add_argument('--input_dir', default='./test/', type=str, help='Input images root') | |
| parser.add_argument('--result_dir', default='./result/', type=str, help='Results images root') | |
| parser.add_argument('--weights_root', default='experiments/pretrained_models', type=str, help='Weights root') | |
| parser.add_argument('--task', default='Retouching', type=str, help='Restoration task (Above task list)') | |
| args = parser.parse_args() | |
| # Prepare testing data | |
| files = natsorted(glob.glob(os.path.join(args.input_dir, '*'))) | |
| if len(files) == 0: | |
| raise Exception(f"No files found at {args.input_dir}") | |
| os.makedirs(args.result_dir, exist_ok=True) | |
| device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') | |
| # Build model | |
| model = define_model(args) | |
| model.eval() | |
| model = model.to(device) | |
| print('restoring images......') | |
| mul = 16 | |
| for i, file_ in enumerate(tqdm(files)): | |
| img = Image.open(file_).convert('RGB') | |
| input_ = TF.to_tensor(img).unsqueeze(0).to(device) | |
| # Pad the input if not_multiple_of 8 | |
| h, w = input_.shape[2], input_.shape[3] | |
| H, W = ((h + mul) // mul) * mul, ((w + mul) // mul) * mul | |
| padh = H - h if h % mul != 0 else 0 | |
| padw = W - w if w % mul != 0 else 0 | |
| input_ = F.pad(input_, (0, padw, 0, padh), 'reflect') | |
| with torch.no_grad(): | |
| restored = model(input_) | |
| restored = torch.clamp(restored, 0, 1) | |
| restored = restored[:, :, :h, :w] | |
| restored = restored.permute(0, 2, 3, 1).cpu().detach().numpy() | |
| restored = img_as_ubyte(restored[0]) | |
| f = os.path.splitext(os.path.split(file_)[-1])[0] | |
| save_img((os.path.join(args.result_dir, f + '.png')), restored) | |
| print('{}'.format(os.path.join(args.result_dir, f + '.png'))) | |
| print('finish !') | |
| def define_model(args): | |
| # Enhance models | |
| if args.task in ['LLEnhancement', 'Retouching']: | |
| model = SRMNet(in_chn=3, wf=96, depth=4) | |
| weight_path = os.path.join(args.weights_root, args.task + '.pth') | |
| load_checkpoint(model, weight_path) | |
| # Restored models | |
| else: | |
| model = SRMNet_SWFF(in_chn=3, wf=96, depth=4) | |
| weight_path = os.path.join(args.weights_root, args.task + '.pth') | |
| load_checkpoint(model, weight_path) | |
| return model | |
| def save_img(filepath, img): | |
| cv2.imwrite(filepath, cv2.cvtColor(img, cv2.COLOR_RGB2BGR)) | |
| def load_checkpoint(model, weights): | |
| checkpoint = torch.load(weights, map_location=torch.device('cpu')) | |
| try: | |
| model.load_state_dict(checkpoint["state_dict"]) | |
| except: | |
| state_dict = checkpoint["state_dict"] | |
| new_state_dict = OrderedDict() | |
| for k, v in state_dict.items(): | |
| name = k[7:] # remove `module.` | |
| new_state_dict[name] = v | |
| model.load_state_dict(new_state_dict) | |
| if __name__ == '__main__': | |
| main() | |