from pytorch_lightning.callbacks import Callback from pytorch_lightning.loggers import WandbLogger import numpy as np from pytorch_lightning.utilities import rank_zero_only from typing import Union import pytorch_lightning as pl import os from matplotlib import pyplot as plt from sgm.util import exists, isheatmap import torchvision from PIL import Image import torch import wandb from einops import rearrange class ImageLogger(Callback): def __init__( self, batch_frequency, max_images, clamp=True, increase_log_steps=True, rescale=True, disabled=False, log_on_batch_idx=False, log_first_step=False, log_images_kwargs=None, log_before_first_step=False, enable_autocast=True, ): super().__init__() self.enable_autocast = enable_autocast self.rescale = rescale self.batch_freq = batch_frequency self.max_images = max_images self.log_steps = [2**n for n in range(int(np.log2(self.batch_freq)) + 1)] if not increase_log_steps: self.log_steps = [self.batch_freq] self.clamp = clamp self.disabled = disabled self.log_on_batch_idx = log_on_batch_idx self.log_images_kwargs = log_images_kwargs if log_images_kwargs else {} self.log_first_step = log_first_step self.log_before_first_step = log_before_first_step @rank_zero_only def log_local( self, save_dir, split, images, global_step, current_epoch, batch_idx, pl_module: Union[None, pl.LightningModule] = None, ): root = os.path.join(save_dir, "images", split) for k in images: if isheatmap(images[k]): fig, ax = plt.subplots() ax = ax.matshow(images[k].cpu().numpy(), cmap="hot", interpolation="lanczos") plt.colorbar(ax) plt.axis("off") filename = "{}_gs-{:06}_e-{:06}_b-{:06}.png".format(k, global_step, current_epoch, batch_idx) os.makedirs(root, exist_ok=True) path = os.path.join(root, filename) plt.savefig(path) plt.close() # TODO: support wandb else: grid = torchvision.utils.make_grid(images[k].squeeze(2), nrow=4) if self.rescale: grid = (grid + 1.0) / 2.0 # -1,1 -> 0,1; c,h,w # print(grid.shape, grid.dtype, grid.min(), grid.max(), k) grid = rearrange(grid.squeeze(1), "c h w -> h w c") # grid = grid.transpose(0, 1).transpose(1, 2).squeeze(-1) grid = grid.numpy() grid = (grid * 255).astype(np.uint8) filename = "{}_gs-{:06}_e-{:06}_b-{:06}.png".format(k, global_step, current_epoch, batch_idx) path = os.path.join(root, filename) os.makedirs(os.path.split(path)[0], exist_ok=True) img = Image.fromarray(grid) img.save(path) if exists(pl_module): assert isinstance( pl_module.logger, WandbLogger ), "logger_log_image only supports WandbLogger currently" pl_module.logger.log_image( key=f"{split}/{k}", images=[ img, ], step=pl_module.global_step, ) @rank_zero_only def log_img(self, pl_module, batch, batch_idx, split="train"): check_idx = batch_idx if self.log_on_batch_idx else pl_module.global_step if ( self.check_frequency(check_idx) and hasattr(pl_module, "log_images") # batch_idx % self.batch_freq == 0 and callable(pl_module.log_images) and # batch_idx > 5 and self.max_images > 0 ): logger = type(pl_module.logger) is_train = pl_module.training if is_train: pl_module.eval() gpu_autocast_kwargs = { "enabled": self.enable_autocast, # torch.is_autocast_enabled(), "dtype": torch.get_autocast_gpu_dtype(), "cache_enabled": torch.is_autocast_cache_enabled(), } with torch.no_grad(), torch.cuda.amp.autocast(**gpu_autocast_kwargs): images = pl_module.log_images(batch, split=split, **self.log_images_kwargs) for k in images: N = min(images[k].shape[0], self.max_images) if not isheatmap(images[k]): images[k] = images[k][:N] if isinstance(images[k], torch.Tensor): images[k] = images[k].detach().float().cpu() if self.clamp and not isheatmap(images[k]): images[k] = torch.clamp(images[k], -1.0, 1.0) self.log_local( pl_module.logger.save_dir, split, images, pl_module.global_step, pl_module.current_epoch, batch_idx, pl_module=pl_module if isinstance(pl_module.logger, WandbLogger) else None, ) if is_train: pl_module.train() def check_frequency(self, check_idx): if check_idx: check_idx -= 1 if ((check_idx % self.batch_freq) == 0 or (check_idx in self.log_steps)) and ( check_idx > 0 or self.log_first_step ): try: self.log_steps.pop(0) except IndexError as e: print(e) pass return True return False @rank_zero_only def on_train_batch_end(self, trainer, pl_module, outputs, batch, batch_idx): if not self.disabled and (pl_module.global_step > 0 or self.log_first_step): self.log_img(pl_module, batch, batch_idx, split="train") @rank_zero_only def on_train_batch_start(self, trainer, pl_module, batch, batch_idx): if self.log_before_first_step and pl_module.global_step == 0: print(f"{self.__class__.__name__}: logging before training") self.log_img(pl_module, batch, batch_idx, split="train") @rank_zero_only def on_validation_batch_end(self, trainer, pl_module, outputs, batch, batch_idx, *args, **kwargs): if not self.disabled and pl_module.global_step > 0: self.log_img(pl_module, batch, batch_idx, split="val") if hasattr(pl_module, "calibrate_grad_norm"): if (pl_module.calibrate_grad_norm and batch_idx % 25 == 0) and batch_idx > 0: self.log_gradients(trainer, pl_module, batch_idx=batch_idx) @rank_zero_only def init_wandb(save_dir, opt, config, group_name, name_str): print(f"setting WANDB_DIR to {save_dir}") os.makedirs(save_dir, exist_ok=True) os.environ["WANDB_DIR"] = save_dir if opt.debug: wandb.init(project=opt.projectname, mode="offline", group=group_name) else: wandb.init( project=opt.projectname, config=config, settings=wandb.Settings(code_dir="./sgm"), group=group_name, name=name_str, )