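"""Training callbacks: periodic verification-set evaluation, console/TensorBoard
logging, and model checkpointing, plus a small AverageMeter helper.
"""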
import logging
import os
import time
from typing import List

import torch

from third_party.arcface import verification
class AverageMeter(object):
    """Computes and stores the average and current value."""

    def __init__(self):
        self.val = None
        self.avg = None
        self.sum = None
        self.count = None
        self.reset()

    def reset(self):
        self.val = 0
        self.avg = 0
        self.sum = 0
        self.count = 0

    def update(self, val, n=1):
        self.val = val
        self.sum += val * n
        self.count += n
        self.avg = self.sum / self.count
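
# A minimal usage sketch for AverageMeter (illustrative only; the loss values
# below are made up). `update` accumulates a running sum weighted by the batch
# size `n`, so `avg` is the per-sample mean of everything seen since the last
# `reset`:
#
#     meter = AverageMeter()
#     meter.update(0.9, n=128)   # mean loss 0.9 over a batch of 128 samples
#     meter.update(0.7, n=128)
#     assert abs(meter.avg - 0.8) < 1e-9
#     meter.reset()              # start a fresh window, e.g. per log interval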
class CallBackVerification(object):
    def __init__(self, frequent, rank, val_targets, rec_prefix, image_size=(112, 112),
                 is_gray=False):
        self.frequent: int = frequent
        self.rank: int = rank
        self.highest_acc: float = 0.0
        self.highest_acc_list: List[float] = [0.0] * len(val_targets)
        self.ver_list: List[object] = []
        self.ver_name_list: List[str] = []
        if self.rank == 0:  # only rank 0 loads and runs the verification sets
            self.init_dataset(val_targets=val_targets, data_dir=rec_prefix, image_size=image_size)
        self.is_gray = is_gray
    def ver_test(self, backbone: torch.nn.Module, global_step: int):
        results = []
        for i in range(len(self.ver_list)):
            # acc2/std2 correspond to the flip-augmented accuracy reported below.
            acc1, std1, acc2, std2, xnorm, embeddings_list = verification.test(
                self.ver_list[i], backbone, 10, 10,
                is_gray=self.is_gray)
            print('[%s][%d]XNorm: %f' % (self.ver_name_list[i], global_step, xnorm))
            print('[%s][%d]Accuracy-Flip: %1.5f+-%1.5f'
                  % (self.ver_name_list[i], global_step, acc2, std2))
            if acc2 > self.highest_acc_list[i]:
                self.highest_acc_list[i] = acc2
            print('[%s][%d]Accuracy-Highest: %1.5f'
                  % (self.ver_name_list[i], global_step, self.highest_acc_list[i]))
            results.append(acc2)
        return results
    def init_dataset(self, val_targets, data_dir, image_size):
        for name in val_targets:
            path = os.path.join(data_dir, name + ".bin")
            if os.path.exists(path):
                data_set = verification.load_bin(path, image_size)
                self.ver_list.append(data_set)
                self.ver_name_list.append(name)
    def __call__(self, num_update, backbone: torch.nn.Module):
        if self.rank == 0 and num_update > 0 and num_update % self.frequent == 0:
            backbone.eval()
            self.ver_test(backbone, num_update)
            backbone.train()
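
# A minimal wiring sketch (the `lfw.bin`/`cfp_fp.bin` files under `data_dir`,
# the `loader`, and the `backbone` module are assumptions for illustration).
# The callback is invoked every step; it only runs the suite on rank 0, every
# `frequent` steps, and restores train mode afterwards:
#
#     callback_ver = CallBackVerification(
#         frequent=2000, rank=0, val_targets=["lfw", "cfp_fp"],
#         rec_prefix="/path/to/data_dir")
#     for step, (img, label) in enumerate(loader):
#         ...  # forward/backward/optimizer step
#         callback_ver(step, backbone)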
class CallBackLogging(object):
    def __init__(self, frequent, rank, total_step, batch_size, world_size, writer=None):
        self.frequent: int = frequent
        self.rank: int = rank
        self.time_start = time.time()
        self.total_step: int = total_step
        self.batch_size: int = batch_size
        self.world_size: int = world_size
        self.writer = writer
        self.init = False
        self.tic = 0
    def __call__(self, global_step, loss: AverageMeter, epoch: int, fp16: bool,
                 grad_scaler: torch.cuda.amp.GradScaler):
        if self.rank == 0 and global_step > 0 and global_step % self.frequent == 0:
            if self.init:
                try:
                    # Samples processed on this rank since the last log line.
                    speed: float = self.frequent * self.batch_size / (time.time() - self.tic)
                    speed_total = speed * self.world_size
                except ZeroDivisionError:
                    speed_total = float('inf')

                # Hours elapsed so far, extrapolated linearly to total_step.
                time_now = (time.time() - self.time_start) / 3600
                time_total = time_now / ((global_step + 1) / self.total_step)
                time_for_end = time_total - time_now
                if self.writer is not None:
                    self.writer.add_scalar('time_for_end', time_for_end, global_step)
                    self.writer.add_scalar('loss', loss.avg, global_step)

                if fp16:
                    msg = "Speed %.2f samples/sec Loss %.4f Epoch: %d Global Step: %d " \
                          "Fp16 Grad Scale: %.2f Required: %.1f hours" % (
                              speed_total, loss.avg, epoch, global_step,
                              grad_scaler.get_scale(), time_for_end)
                else:
                    msg = "Speed %.2f samples/sec Loss %.4f Epoch: %d Global Step: %d " \
                          "Required: %.1f hours" % (
                              speed_total, loss.avg, epoch, global_step, time_for_end)
                logging.info(msg)
                loss.reset()
                self.tic = time.time()
            else:
                # First hit only primes the timer; speed needs a full interval.
                self.init = True
                self.tic = time.time()
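
# A minimal single-process sketch (the `loader` and per-batch `loss_v` are
# assumptions for illustration). `loss.reset()` runs on every log line, so the
# reported loss is the mean over the last `frequent` steps, and throughput is
# estimated over the same window; the first qualifying step only primes the
# timer. With fp16=False, grad_scaler is never touched, so None is fine:
#
#     loss_am = AverageMeter()
#     callback_log = CallBackLogging(
#         frequent=50, rank=0, total_step=100000, batch_size=128, world_size=1)
#     for step, (img, label) in enumerate(loader):
#         loss_v = ...  # scalar loss for this batch
#         loss_am.update(loss_v, 1)
#         callback_log(step, loss_am, epoch=0, fp16=False, grad_scaler=None)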
class CallBackModelCheckpoint(object):
    def __init__(self, rank, output="./"):
        self.rank: int = rank
        self.output: str = output

    def __call__(self,
                 global_step,
                 backbone: torch.nn.Module,
                 partial_fc=None,
                 awloss=None):
        print('CallBackModelCheckpoint...')
        # `backbone.module` assumes the backbone is wrapped in
        # DistributedDataParallel; only rank 0 writes the backbone weights.
        if global_step > 100 and self.rank == 0:
            torch.save(backbone.module.state_dict(), os.path.join(self.output, "backbone.pth"))
        if global_step > 100 and partial_fc is not None:
            partial_fc.save_params()
        if global_step > 100 and awloss is not None:
            torch.save(awloss.state_dict(), os.path.join(self.output, "awloss.pth"))
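
# A minimal end-of-epoch sketch (the DDP-wrapped `backbone`, the tracked
# `global_step`, and the output directory are assumptions for illustration).
# Saves are skipped for the first 100 steps, and only the backbone save is
# rank-guarded; `partial_fc.save_params()` runs on every rank, on the
# expectation that it shards its own parameters per rank:
#
#     callback_ckpt = CallBackModelCheckpoint(rank=0, output="./work_dir")
#     for epoch in range(num_epochs):
#         for batch in loader:
#             ...  # training step; increment global_step
#         callback_ckpt(global_step, backbone, partial_fc=None, awloss=None)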