Spaces:
Running
Running
| import torch | |
| import torch.nn.functional as F | |
| from torch import nn | |
| from torch.nn import Parameter | |
| import numpy as np | |
| __all__ = ['Softmax', 'AMCosFace', 'AMArcFace', ] | |
| MIN_NUM_PATCHES = 16 | |
| """ All losses can run in 'torch.distributed.DistributedDataParallel'. | |
| """ | |
| class Softmax(nn.Module): | |
| r"""Implementation of Softmax (normal classification head): | |
| Args: | |
| in_features: dimension (d_in) of input feature (B, d_in) | |
| out_features: dimension (d_out) of output feature (B, d_out) | |
| device_id: the ID of GPU where the model will be trained by data parallel (or DP). (not used) | |
| if device_id=None, it will be trained on model parallel (or DDP). (recommend!) | |
| """ | |
| def __init__(self, | |
| in_features: int, | |
| out_features: int, | |
| device_id, | |
| ): | |
| super(Softmax, self).__init__() | |
| self.in_features = in_features | |
| self.out_features = out_features | |
| self.device_id = device_id | |
| self.weight = Parameter(torch.FloatTensor(out_features, in_features)) | |
| self.bias = Parameter(torch.FloatTensor(out_features)) | |
| nn.init.xavier_uniform_(self.weight) | |
| nn.init.zeros_(self.bias) | |
| def forward(self, embedding, label): | |
| """ | |
| :param embedding: learned face representation | |
| :param label: | |
| - label >= 0: ground truth identity | |
| - label = -1: invalid identity for this GPU (refer to 'PartialFC') | |
| + Example: label = torch.tensor([-1, 4, -1, 5, 3, -1]) | |
| :return: | |
| """ | |
| if self.device_id is None: | |
| """ Regular linear layer. | |
| """ | |
| out = F.linear(embedding, self.weight, self.bias) | |
| else: | |
| raise ValueError('DataParallel is not implemented yet.') | |
| x = input | |
| sub_weights = torch.chunk(self.weight, len(self.device_id), dim=0) | |
| sub_biases = torch.chunk(self.bias, len(self.device_id), dim=0) | |
| temp_x = x.cuda(self.device_id[0]) | |
| weight = sub_weights[0].cuda(self.device_id[0]) | |
| bias = sub_biases[0].cuda(self.device_id[0]) | |
| out = F.linear(temp_x, weight, bias) | |
| for i in range(1, len(self.device_id)): | |
| temp_x = x.cuda(self.device_id[i]) | |
| weight = sub_weights[i].cuda(self.device_id[i]) | |
| bias = sub_biases[i].cuda(self.device_id[i]) | |
| out = torch.cat((out, F.linear(temp_x, weight, bias).cuda(self.device_id[0])), dim=1) | |
| return out | |
| """ Not Used """ | |
| class ArcFace(nn.Module): | |
| r"""Implement of ArcFace (https://arxiv.org/pdf/1801.07698v1.pdf): | |
| Args: | |
| in_features: size of each input sample | |
| out_features: size of each output sample | |
| device_id: the ID of GPU where the model will be trained by model parallel. | |
| if device_id=None, it will be trained on CPU without model parallel. | |
| s: norm of input feature | |
| m: margin | |
| cos(theta+m) | |
| """ | |
| def __init__(self, in_features, out_features, device_id, s=64.0, m=0.50, easy_margin=False): | |
| super(ArcFace, self).__init__() | |
| self.in_features = in_features | |
| self.out_features = out_features | |
| self.device_id = device_id | |
| self.s = s | |
| self.m = m | |
| print('ArcFace, s=%.1f, m=%.2f' % (s, m)) | |
| self.weight = Parameter(torch.FloatTensor(out_features, in_features)) | |
| nn.init.xavier_uniform_(self.weight) | |
| self.easy_margin = easy_margin | |
| self.cos_m = np.cos(m) | |
| self.sin_m = np.sin(m) | |
| self.th = np.cos(np.pi - m) | |
| self.mm = np.sin(np.pi - m) * m | |
| def forward(self, input, label): | |
| # --------------------------- cos(theta) & phi(theta) --------------------------- | |
| if self.device_id == None: | |
| cosine = F.linear(F.normalize(input), F.normalize(self.weight)) | |
| else: | |
| x = input | |
| sub_weights = torch.chunk(self.weight, len(self.device_id), dim=0) | |
| temp_x = x.cuda(self.device_id[0]) | |
| weight = sub_weights[0].cuda(self.device_id[0]) | |
| cosine = F.linear(F.normalize(temp_x), F.normalize(weight)) | |
| for i in range(1, len(self.device_id)): | |
| temp_x = x.cuda(self.device_id[i]) | |
| weight = sub_weights[i].cuda(self.device_id[i]) | |
| cosine = torch.cat((cosine, F.linear(F.normalize(temp_x), F.normalize(weight)).cuda(self.device_id[0])), | |
| dim=1) | |
| sine = torch.sqrt(1.0 - torch.pow(cosine, 2)) | |
| phi = cosine * self.cos_m - sine * self.sin_m | |
| if self.easy_margin: | |
| phi = torch.where(cosine > 0, phi, cosine) | |
| else: | |
| phi = torch.where(cosine > self.th, phi, cosine - self.mm) | |
| # --------------------------- convert label to one-hot --------------------------- | |
| one_hot = torch.zeros(cosine.size()) | |
| if self.device_id != None: | |
| one_hot = one_hot.cuda(self.device_id[0]) | |
| else: | |
| one_hot = one_hot.cuda() | |
| one_hot.scatter_(1, label.view(-1, 1).long(), 1) | |
| # -------------torch.where(out_i = {x_i if condition_i else y_i) ------------- | |
| output = (one_hot * phi) + ( | |
| (1.0 - one_hot) * cosine) # you can use torch.where if your torch.__version__ is 0.4 | |
| output *= self.s | |
| return output | |
| """ Not Used """ | |
| class CosFace(nn.Module): | |
| r"""Implement of CosFace (https://arxiv.org/pdf/1801.09414.pdf): | |
| Args: | |
| in_features: size of each input sample | |
| out_features: size of each output sample | |
| device_id: the ID of GPU where the model will be trained by model parallel. | |
| if device_id=None, it will be trained on CPU without model parallel. | |
| s: norm of input feature | |
| m: margin | |
| cos(theta)-m | |
| """ | |
| def __init__(self, in_features, out_features, device_id, s=64.0, m=0.4): | |
| super(CosFace, self).__init__() | |
| print('CosFace, s=%.1f, m=%.2f' % (s, m)) | |
| self.in_features = in_features | |
| self.out_features = out_features | |
| self.device_id = device_id | |
| self.s = s | |
| self.m = m | |
| self.weight = Parameter(torch.FloatTensor(out_features, in_features)) | |
| nn.init.xavier_uniform_(self.weight) | |
| def forward(self, input, label): | |
| # --------------------------- cos(theta) & phi(theta) --------------------------- | |
| if self.device_id == None: | |
| cosine = F.linear(F.normalize(input), F.normalize(self.weight)) | |
| else: | |
| x = input | |
| sub_weights = torch.chunk(self.weight, len(self.device_id), dim=0) | |
| temp_x = x.cuda(self.device_id[0]) | |
| weight = sub_weights[0].cuda(self.device_id[0]) | |
| cosine = F.linear(F.normalize(temp_x), F.normalize(weight)) | |
| for i in range(1, len(self.device_id)): | |
| temp_x = x.cuda(self.device_id[i]) | |
| weight = sub_weights[i].cuda(self.device_id[i]) | |
| cosine = torch.cat((cosine, F.linear(F.normalize(temp_x), F.normalize(weight)).cuda(self.device_id[0])), | |
| dim=1) | |
| phi = cosine - self.m | |
| # --------------------------- convert label to one-hot --------------------------- | |
| one_hot = torch.zeros(cosine.size()).cuda() | |
| if self.device_id != None: | |
| one_hot = one_hot.cuda(self.device_id[0]) | |
| # one_hot = one_hot.cuda() if cosine.is_cuda else one_hot | |
| one_hot.scatter_(1, label.cuda(self.device_id[0]).view(-1, 1).long(), 1) | |
| else: | |
| one_hot.scatter_(1, label.view(-1, 1).long(), 1) | |
| # -------------torch.where(out_i = {x_i if condition_i else y_i) ------------- | |
| output = (one_hot * phi) + ( | |
| (1.0 - one_hot) * cosine) # you can use torch.where if your torch.__version__ is 0.4 | |
| output *= self.s | |
| return output | |
| def __repr__(self): | |
| return self.__class__.__name__ + '(' \ | |
| + 'in_features = ' + str(self.in_features) \ | |
| + ', out_features = ' + str(self.out_features) \ | |
| + ', s = ' + str(self.s) \ | |
| + ', m = ' + str(self.m) + ')' | |
| class AMCosFace(nn.Module): | |
| r"""Implementation of Adaptive Margin CosFace: | |
| cos(theta)-m+k(theta-a) | |
| When k is 0, AMCosFace degenerates into CosFace. | |
| Args: | |
| in_features: dimension (d_in) of input feature (B, d_in) | |
| out_features: dimension (d_out) of output feature (B, d_out) | |
| device_id: the ID of GPU where the model will be trained by data parallel (or DP). (not used) | |
| if device_id=None, it will be trained on model parallel (or DDP). (recommend!) | |
| s: norm of input feature | |
| m: margin | |
| a: AM Loss | |
| k: AM Loss | |
| """ | |
| def __init__(self, | |
| in_features: int, | |
| out_features: int, | |
| device_id, | |
| s: float = 64.0, | |
| m: float = 0.4, | |
| a: float = 1.2, | |
| k: float = 0.1, | |
| ): | |
| super(AMCosFace, self).__init__() | |
| print('AMCosFace, s=%.1f, m=%.2f, a=%.2f, k=%.2f' % (s, m, a, k)) | |
| self.in_features = in_features | |
| self.out_features = out_features | |
| self.device_id = device_id | |
| self.s = s | |
| self.m = m | |
| self.a = a | |
| self.k = k | |
| """ Weight Matrix W (d_out, d_in) """ | |
| self.weight = Parameter(torch.FloatTensor(out_features, in_features)) | |
| nn.init.xavier_uniform_(self.weight) | |
| def forward(self, embedding, label): | |
| """ | |
| :param embedding: learned face representation | |
| :param label: | |
| - label >= 0: ground truth identity | |
| - label = -1: invalid identity for this GPU (refer to 'PartialFC') | |
| + Example: label = torch.tensor([-1, 4, -1, 5, 3, -1]) | |
| :return: | |
| """ | |
| if self.device_id is None: | |
| """ - embedding: shape is (B, d_in) | |
| - weight: shape is (d_out, d_in) | |
| - cosine: shape is (B, d_out) | |
| + F.normalize is very important here. | |
| """ | |
| cosine = F.linear(F.normalize(embedding), F.normalize(self.weight)) # y = xA^T + b | |
| else: | |
| raise ValueError('DataParallel is not implemented yet.') | |
| x = input | |
| sub_weights = torch.chunk(self.weight, len(self.device_id), dim=0) | |
| temp_x = x.cuda(self.device_id[0]) | |
| weight = sub_weights[0].cuda(self.device_id[0]) | |
| cosine = F.linear(F.normalize(temp_x), F.normalize(weight)) | |
| for i in range(1, len(self.device_id)): | |
| temp_x = x.cuda(self.device_id[i]) | |
| weight = sub_weights[i].cuda(self.device_id[i]) | |
| cosine = torch.cat((cosine, F.linear(F.normalize(temp_x), | |
| F.normalize(weight)).cuda(self.device_id[0])), | |
| dim=1) | |
| """ - index: the index of valid identity in label, shape is (d_valid, ) | |
| + torch.where() returns a tuple indicating the index of each dimension | |
| + Example: index = torch.tensor([1, 3, 4]) | |
| """ | |
| index = torch.where(label != -1)[0] | |
| """ - m_hot: one-hot tensor of margin m_2, shape is (d_valid, d_out) | |
| + torch.tensor.scatter_(dim, index, source) is usually used to generate ont-hot tensor | |
| + Example: label = torch.tensor([-1, 4, -1, 5, 3, -1]) | |
| index = torch.tensor([1, 3, 4]) # d_valid = index.shape[0] = 3 | |
| m_hot = torch.tensor([[0, 0, 0, 0, m, 0], | |
| [0, 0, 0, 0, 0, m], | |
| [0, 0, 0, m, 0, 0], | |
| ]) | |
| """ | |
| m_hot = torch.zeros(index.size()[0], cosine.size()[1], device=cosine.device) | |
| m_hot.scatter_(1, label[index, None], self.m) | |
| """ logit(theta) = cos(theta) - m_2 + k * (theta - a) | |
| - theta = cosine.acos_() | |
| + Example: m_hot = torch.tensor([[0, 0, 0, 0, m-k(theta[0,4]-a), 0], | |
| [0, 0, 0, 0, 0, m-k(theta[1,5]-a)], | |
| [0, 0, 0, m-k(theta[2,3]-a), 0, 0], | |
| ]) | |
| """ | |
| a = self.a | |
| k = self.k | |
| m_hot[range(0, index.size()[0]), label[index]] -= k * (cosine[index, label[index]].acos_() - a) | |
| cosine[index] -= m_hot | |
| """ Because we have used F.normalize, we should rescale the logit term by s. | |
| """ | |
| output = cosine * self.s | |
| return output | |
| def __repr__(self): | |
| return self.__class__.__name__ + '(' \ | |
| + 'in_features = ' + str(self.in_features) \ | |
| + ', out_features = ' + str(self.out_features) \ | |
| + ', s = ' + str(self.s) \ | |
| + ', m = ' + str(self.m) \ | |
| + ', a = ' + str(self.a) \ | |
| + ', k = ' + str(self.k) \ | |
| + ')' | |
| class AMArcFace(nn.Module): | |
| r"""Implementation of Adaptive Margin ArcFace: | |
| cos(theta+m-k(theta-a)) | |
| When k is 0, AMArcFace degenerates into ArcFace. | |
| Args: | |
| in_features: dimension (d_in) of input feature (B, d_in) | |
| out_features: dimension (d_out) of output feature (B, d_out) | |
| device_id: the ID of GPU where the model will be trained by data parallel (or DP). (not used) | |
| if device_id=None, it will be trained on model parallel (or DDP). (recommend!) | |
| s: norm of input feature | |
| m: margin | |
| a: AM Loss | |
| k: AM Loss | |
| """ | |
| def __init__(self, | |
| in_features: int, | |
| out_features: int, | |
| device_id, | |
| s: float = 64.0, | |
| m: float = 0.5, | |
| a: float = 1.2, | |
| k: float = 0.1, | |
| ): | |
| super(AMArcFace, self).__init__() | |
| print('AMArcFace, s=%.1f, m=%.2f, a=%.2f, k=%.2f' % (s, m, a, k)) | |
| self.in_features = in_features | |
| self.out_features = out_features | |
| self.device_id = device_id | |
| self.s = s | |
| self.m = m | |
| self.a = a | |
| self.k = k | |
| """ Weight Matrix W (d_out, d_in) """ | |
| self.weight = Parameter(torch.FloatTensor(out_features, in_features)) | |
| nn.init.xavier_uniform_(self.weight) | |
| def forward(self, embedding, label): | |
| """ | |
| :param embedding: learned face representation | |
| :param label: | |
| - label >= 0: ground truth identity | |
| - label = -1: invalid identity for this GPU (refer to 'PartialFC') | |
| + Example: label = torch.tensor([-1, 4, -1, 5, 3, -1]) | |
| :return: | |
| """ | |
| if self.device_id is None: | |
| """ - embedding: shape is (B, d_in) | |
| - weight: shape is (d_out, d_in) | |
| - cosine: shape is (B, d_out) | |
| + F.normalize is very important here. | |
| """ | |
| cosine = F.linear(F.normalize(embedding), F.normalize(self.weight)) # y = xA^T + b | |
| else: | |
| raise ValueError('DataParallel is not implemented yet.') | |
| x = input | |
| sub_weights = torch.chunk(self.weight, len(self.device_id), dim=0) | |
| temp_x = x.cuda(self.device_id[0]) | |
| weight = sub_weights[0].cuda(self.device_id[0]) | |
| cosine = F.linear(F.normalize(temp_x), F.normalize(weight)) | |
| for i in range(1, len(self.device_id)): | |
| temp_x = x.cuda(self.device_id[i]) | |
| weight = sub_weights[i].cuda(self.device_id[i]) | |
| cosine = torch.cat((cosine, F.linear(F.normalize(temp_x), | |
| F.normalize(weight)).cuda(self.device_id[0])), | |
| dim=1) | |
| """ - index: the index of valid identity in label, shape is (d_valid, ) | |
| + torch.where() returns a tuple indicating the index of each dimension | |
| + Example: index = torch.tensor([1, 3, 4]) | |
| """ | |
| index = torch.where(label != -1)[0] | |
| """ - m_hot: one-hot tensor of margin m_2, shape is (d_valid, d_out) | |
| + torch.tensor.scatter_(dim, index, source) is usually used to generate ont-hot tensor | |
| + Example: label = torch.tensor([-1, 4, -1, 5, 3, -1]) | |
| index = torch.tensor([1, 3, 4]) # d_valid = index.shape[0] = 3 | |
| m_hot = torch.tensor([[0, 0, 0, 0, m, 0], | |
| [0, 0, 0, 0, 0, m], | |
| [0, 0, 0, m, 0, 0], | |
| ]) | |
| """ | |
| m_hot = torch.zeros(index.size()[0], cosine.size()[1], device=cosine.device) | |
| m_hot.scatter_(1, label[index, None], self.m) | |
| """ logit(theta) = cos(theta) - m_2 + k * (theta - a) | |
| - theta = cosine.acos_() | |
| + Example: m_hot = torch.tensor([[0, 0, 0, 0, m-k(theta[0,4]-a), 0], | |
| [0, 0, 0, 0, 0, m-k(theta[1,5]-a)], | |
| [0, 0, 0, m-k(theta[2,3]-a), 0, 0], | |
| ]) | |
| """ | |
| a = self.a | |
| k = self.k | |
| m_hot[range(0, index.size()[0]), label[index]] -= k * (cosine[index, label[index]].acos_() - a) | |
| cosine.acos_() | |
| cosine[index] += m_hot | |
| cosine.cos_().mul_(self.s) | |
| return cosine | |
| def __repr__(self): | |
| return self.__class__.__name__ + '(' \ | |
| + 'in_features = ' + str(self.in_features) \ | |
| + ', out_features = ' + str(self.out_features) \ | |
| + ', s = ' + str(self.s) \ | |
| + ', m = ' + str(self.m) \ | |
| + ', a = ' + str(self.a) \ | |
| + ', k = ' + str(self.k) \ | |
| + ')' | |
| if __name__ == '__main__': | |
| cosine = torch.randn(6, 8) / 100 | |
| cosine[0][2] = 0.3 | |
| cosine[1][4] = 0.4 | |
| cosine[2][6] = 0.5 | |
| cosine[3][5] = 0.6 | |
| cosine[4][3] = 0.7 | |
| cosine[5][0] = 0.8 | |
| label = torch.tensor([-1, 4, -1, 5, 3, -1]) | |
| # layer = AMCosFace(in_features=8, | |
| # out_features=8, | |
| # device_id=None, | |
| # m=0.35, s=1.0, | |
| # a=1.2, k=0.1) | |
| # layer = Softmax(in_features=8, | |
| # out_features=8, | |
| # device_id=None) | |
| layer = AMArcFace(in_features=8, | |
| out_features=8, | |
| device_id=None, | |
| m=0.5, s=1.0, | |
| a=1.2, k=0.1) | |
| logit = layer(cosine, label) | |
| logit = F.softmax(logit, dim=-1) | |
| from utils.vis_tensor import plot_tensor | |
| plot_tensor((cosine, logit), | |
| ('embedding', 'logit'), | |
| 'AMArc.jpg') |