import copy
from typing import Optional
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from scenedino.common import util
class ImplicitNet(nn.Module):
"""
    Represents an MLP;
Original code from IGR
"""
def __init__(
self,
d_in,
dims,
skip_in=(),
d_out=4,
geometric_init=True,
radius_init=0.3,
beta=0.0,
output_init_gain=2.0,
num_position_inputs=3,
sdf_scale=1.0,
dim_excludes_skip=False,
combine_layer=1000,
combine_type="average",
):
"""
:param d_in input size
:param dims dimensions of hidden layers. Num hidden layers == len(dims)
:param skip_in layers with skip connections from input (residual)
:param d_out output size
:param geometric_init if true, uses geometric initialization
(to SDF of sphere)
:param radius_init if geometric_init, then SDF sphere will have
this radius
:param beta softplus beta, 100 is reasonable; if <=0 uses ReLU activations instead
        :param output_init_gain std of the normal init for the extra output channels
        (beyond the SDF channel); only used when d_out > 1
:param dim_excludes_skip if true, dimension sizes do not include skip
connections
"""
super().__init__()
dims = [d_in] + dims + [d_out]
if dim_excludes_skip:
for i in range(1, len(dims) - 1):
if i in skip_in:
dims[i] += d_in
self.num_layers = len(dims)
self.skip_in = skip_in
self.dims = dims
self.combine_layer = combine_layer
self.combine_type = combine_type
for layer in range(0, self.num_layers - 1):
if layer + 1 in skip_in:
out_dim = dims[layer + 1] - d_in
else:
out_dim = dims[layer + 1]
lin = nn.Linear(dims[layer], out_dim)
            # if true, perform geometric initialization
if geometric_init:
if layer == self.num_layers - 2:
# Note our geometric init is negated (compared to IDR)
# since we are using the opposite SDF convention:
# inside is +
nn.init.normal_(
lin.weight[0],
mean=-np.sqrt(np.pi) / np.sqrt(dims[layer]) * sdf_scale,
std=0.00001,
)
nn.init.constant_(lin.bias[0], radius_init)
if d_out > 1:
# More than SDF output
nn.init.normal_(lin.weight[1:], mean=0.0, std=output_init_gain)
nn.init.constant_(lin.bias[1:], 0.0)
else:
nn.init.constant_(lin.bias, 0.0)
nn.init.normal_(lin.weight, 0.0, np.sqrt(2) / np.sqrt(out_dim))
if d_in > num_position_inputs and (layer == 0 or layer in skip_in):
# Special handling for input to allow positional encoding
nn.init.constant_(lin.weight[:, -d_in + num_position_inputs :], 0.0)
else:
nn.init.constant_(lin.bias, 0.0)
nn.init.kaiming_normal_(lin.weight, a=0, mode="fan_in")
setattr(self, "lin" + str(layer), lin)
if beta > 0:
self.activation = nn.Softplus(beta=beta)
else:
# Vanilla ReLU
self.activation = nn.ReLU()
def forward(self, x, combine_inner_dims=(1,)):
"""
:param x (..., d_in)
:param combine_inner_dims Combining dimensions for use with multiview inputs.
Tensor will be reshaped to (-1, combine_inner_dims, ...) and reduced using combine_type
on dim 1, at combine_layer
"""
x_init = x
for layer in range(0, self.num_layers - 1):
lin = getattr(self, "lin" + str(layer))
if layer == self.combine_layer:
x = util.combine_interleaved(x, combine_inner_dims, self.combine_type)
x_init = util.combine_interleaved(
x_init, combine_inner_dims, self.combine_type
)
if layer < self.combine_layer and layer in self.skip_in:
x = torch.cat([x, x_init], -1) / np.sqrt(2)
x = lin(x)
if layer < self.num_layers - 2:
x = self.activation(x)
return x
@classmethod
def from_conf(cls, conf, d_in, d_out):
return cls(d_in=d_in, d_out=d_out, **conf)
# @classmethod
# def from_conf(cls, conf, d_in, **kwargs):
# # PyHocon construction
# return cls(
# d_in,
# conf.get_list("dims"),
# skip_in=conf.get_list("skip_in"),
# beta=conf.get_float("beta", 0.0),
# dim_excludes_skip=conf.get_bool("dim_excludes_skip", False),
# combine_layer=conf.get_int("combine_layer", 1000),
# combine_type=conf.get_string("combine_type", "average"), # average | max
# **kwargs,
# )
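# Hedged usage sketch (not part of the original training code): `_example_implicit_net`
# and all sizes below are illustrative assumptions. It only exercises the skip-connection
# shape bookkeeping; with the default combine_layer=1000 the multi-view reduction in
# forward() is never reached.
def _example_implicit_net():
    net = ImplicitNet(d_in=3, dims=[64, 64, 64, 64], skip_in=(2,), d_out=4)
    x = torch.randn(2, 1024, 3)  # (..., d_in)
    out = net(x)  # (..., d_out) == (2, 1024, 4); channel 0 is the SDF-like output
    return out.shape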
"""
GeoNeRF
https://github.com/idiap/GeoNeRF/blob/e6249fdae5672853c6bbbd4ba380c4c166d02c95/model/self_attn_renderer.py#L60
"""
# Custom TransposeLayer to perform transpose operation
class TransposeLayer(nn.Module):
def __init__(self):
super(TransposeLayer, self).__init__()
def forward(self, x):
        # print("x_shape before transpose: ", x.shape)  # debug
return x.transpose(1, 2)
#
# class CNN2AE(nn.Module):
# def __init__(self, num_channels, num_features, desired_spatial_output): ## reduced mapping: num_points |-> num_features
# super(CNN2AE, self).__init__()
# self.conv1 = nn.Conv1d(num_channels, num_channels*2, kernel_size=3, stride=1, padding=1)
# self.conv2 = nn.Conv1d(num_channels*2, num_channels*4, kernel_size=3, stride=1, padding=1)
# self.conv3 = nn.Conv1d(num_channels*4, num_channels*8, kernel_size=3, stride=1, padding=1)
# self.pool = nn.AvgPool1d(kernel_size=2, stride=2)
# self.desired_spatial_output = desired_spatial_output
# # self.fc = nn.Linear(num_channels*4 * num_features, num_features) # Fully connected layer to further reduce dimension
# # self.fc = nn.Linear(num_channels*4 * (num_features // 4), num_channels) # Fully connected layer to reduce dimension
#
# def forward(self, x): ## input_tensor's shape: (batch_size=1, C=num_channels, M=num_points)
# _, num_channels, num_features = x.shape
# x = self.pool(nn.functional.relu(self.conv1(x)))
# x = self.pool(nn.functional.relu(self.conv2(x)))
# x = self.pool(nn.functional.relu(self.conv3(x)))
# x = x.view(x.size(0), num_channels, self.desired_spatial_output) # Reshape to (batch_size, num_channels, reduced_features)
# return x
device = torch.device(
"cuda" if torch.cuda.is_available() else "cpu"
) # Use GPU if available, else CPU
class CNN2AE(
nn.Module
):  ## convolve density-sampled features along each ray, from one end of the camera frustum to the other ( n_coarse==16 x att_feat==32 x (8x8) )
def __init__(self, num_channels: int = 32, num_features: int = 64):
super(CNN2AE, self).__init__()
self.n_coarse = num_features
self.conv1 = nn.Conv1d(
num_channels, num_channels, kernel_size=3, stride=1, padding=1
)
# self.conv2 = nn.Conv1d(num_channels*2, num_channels*4, kernel_size=3, stride=1, padding=1)
self.pool = nn.AvgPool1d(kernel_size=2, stride=2)
# self.fc = nn.Linear(num_channels * num_features, num_features) # Fully connected layer to further reduce dimension
# self.fc = None # We will initialize this later
def forward(self, x): ## , desired_spatial_output):
        assert (
            x.size(0) % self.n_coarse
        ) == 0, f"__given points should be divisible by n_coarse: {self.n_coarse}, but points given: {x.size(0)}"
# x = x.to(device) # Move the input data to the device
# B_, C_, M_ = x.shape # Get the new number of channels and points
x = self.pool(F.relu(self.conv1(x))) # Apply first conv layer and pool
        x = self.pool(F.relu(self.conv1(x)))  # Apply the shared conv layer a second time and pool again
# if self.fc is None:
# # Initialize the fully connected layer now that we know the input size
# self.fc = nn.Linear(C_ * M_, C_ * desired_spatial_output).to(device)
# x = x.view(B_, C_ * M_) # Reshape to (batch_size, C * M)
# x = self.fc(x) # Apply fully connected layer
# x = x.view(B_, C_, desired_spatial_output) # Reshape to (batch_size, num_channels, desired_spatial_output)
return x
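# Hedged shape check (illustrative sketch, not from the original pipeline): the batch
# dimension must be divisible by n_coarse, and the single shared Conv1d + AvgPool1d pair
# is applied twice, quartering the sample dimension.
def _example_cnn2ae():
    ae = CNN2AE(num_channels=32, num_features=64)
    x = torch.randn(128, 32, 64)  # (B, C, M) with B % n_coarse == 0
    y = ae(x)  # (128, 32, 16): M is halved by each of the two pooling steps
    return y.shape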
## Auto-encoder network
class ConvAutoEncoder(nn.Module):  ## purpose: to enforce geometric generalization
def __init__(
self, num_ch: int = 32, S_: int = 64
    ):  ## S_ := sequence length of the input tensor, i.e. nb_samples_per_ray
super(ConvAutoEncoder, self).__init__()
# Encoder
self.conv1 = nn.Sequential(
nn.Conv1d(num_ch, num_ch * 2, 3, stride=1, padding=1),
# TransposeLayer(), # Use the custom TransposeLayer to transpose the output
nn.LayerNorm(
S_, elementwise_affine=False
            ),  ## LayerNorm over the last (sample) dim; normalizing over channels instead raised: RuntimeError: Given normalized_shape=[64], expected input with shape [*, 64], but got input of size [1, 64, 100000]
nn.ELU(alpha=1.0, inplace=True),
# TransposeLayer(), # Use the custom TransposeLayer to transpose the output
nn.MaxPool1d(2),
)
self.conv2 = nn.Sequential(
nn.Conv1d(num_ch * 2, num_ch * 4, 3, stride=1, padding=1),
# TransposeLayer(), # Use the custom TransposeLayer to transpose the output
nn.LayerNorm(S_ // 2, elementwise_affine=False),
nn.ELU(alpha=1.0, inplace=True),
# TransposeLayer(), # Use the custom TransposeLayer to transpose the output
nn.MaxPool1d(2),
)
self.conv3 = nn.Sequential(
nn.Conv1d(num_ch * 4, num_ch * 4, 3, stride=1, padding=1),
# TransposeLayer(), # Use the custom TransposeLayer to transpose the output
nn.LayerNorm(S_ // 4, elementwise_affine=False),
nn.ELU(alpha=1.0, inplace=True),
# TransposeLayer(), # Use the custom TransposeLayer to transpose the output
nn.MaxPool1d(2),
)
# Decoder
self.t_conv1 = nn.Sequential(
nn.ConvTranspose1d(num_ch * 4, num_ch * 4, 4, stride=2, padding=1),
nn.LayerNorm(S_ // 4, elementwise_affine=False),
nn.ELU(alpha=1.0, inplace=True),
)
self.t_conv2 = nn.Sequential(
nn.ConvTranspose1d(num_ch * 8, num_ch * 2, 4, stride=2, padding=1),
nn.LayerNorm(S_ // 2, elementwise_affine=False),
nn.ELU(alpha=1.0, inplace=True),
)
self.t_conv3 = nn.Sequential(
nn.ConvTranspose1d(num_ch * 4, num_ch, 4, stride=2, padding=1),
nn.LayerNorm(S_, elementwise_affine=False),
nn.ELU(alpha=1.0, inplace=True),
)
# Output
self.conv_out = nn.Sequential(
nn.Conv1d(num_ch * 2, num_ch, 3, stride=1, padding=1),
nn.LayerNorm(S_, elementwise_affine=False),
nn.ELU(alpha=1.0, inplace=True),
)
def forward(self, x):
input = x
x = self.conv1(x)
conv1_out = x
x = self.conv2(x)
conv2_out = x
x = self.conv3(x)
x = self.t_conv1(x)
x = self.t_conv2(torch.cat([x, conv2_out], dim=1))
x = self.t_conv3(torch.cat([x, conv1_out], dim=1))
x = self.conv_out(torch.cat([x, input], dim=1))
return x
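# Hedged shape check (illustrative sketch): S_ must be divisible by 8 so that the three
# MaxPool1d stages and the three ConvTranspose1d stages line up; the U-Net-style skip
# concatenations then restore the input shape.
def _example_conv_autoencoder():
    ae = ConvAutoEncoder(num_ch=32, S_=64)
    x = torch.randn(4, 32, 64)  # (B, num_ch, S_)
    y = ae(x)  # (4, 32, 64): same shape as the input
    return y.shape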
"""
Transformer encoder part from IBRNet network
https://github.com/googleinterns/IBRNet/blob/master/ibrnet/mlp_network.py
"""
class ScaledDotProductAttention(nn.Module):
"""Scaled Dot-Product Attention"""
def __init__(self, temperature, attn_dropout=0.1):
super().__init__()
self.temperature = temperature
# self.dropout = nn.Dropout(attn_dropout)
def forward(self, q, k, v, mask=None):
        attn = torch.matmul(
            q / self.temperature, k.transpose(2, 3)
        )  ## attn: (B, n_head, len_q, len_k), e.g. [32768, 4, 7, 7]
        if mask is not None:  ## mask: (B, 1, len_k), e.g. [32768, 1, 7]
            mask = mask.unsqueeze(-1)
            mask = mask.expand(
                -1, attn.shape[1], -1, attn.shape[-1]
            )  ## TODO: validate this broadcast over heads and key positions
            mask = 1.0 - (
                (1.0 - mask) * (1.0 - mask.transpose(-2, -1))
            )  ## symmetrize: a pair is masked if either view is invalid, so masked views neither attend nor get attended to (the expanded (len x len) mask is memory-heavy, e.g. forcing eval_batch_size down to 25000)
            attn = attn.masked_fill(
                mask == 1, -1e9
            )  ## set masked logits to -1e9 so their softmax weight is effectively zero (numerically stable)
# attn = attn * mask
"""
def masked_fill(self, mask, value):
result = self.clone() # Start with a copy of the original data
result[mask] = value # Replace values where the mask is true
return result
"""
attn = F.softmax(attn, dim=-1)
# attn = self.dropout(F.softmax(attn, dim=-1))
output = torch.matmul(attn, v)
return output, attn
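# Hedged usage sketch (illustrative sizes): attention over 7 source views per sample,
# with the last two views flagged invalid (mask entries of 1).
def _example_scaled_dot_product_attention():
    attn_fn = ScaledDotProductAttention(temperature=8 ** 0.5)
    q = k = v = torch.randn(16, 4, 7, 8)  # (B, n_head, len, d_k)
    mask = torch.zeros(16, 1, 7)
    mask[:, :, 5:] = 1.0  # 1 == invalid view
    out, attn = attn_fn(q, k, v, mask=mask)  # out: (16, 4, 7, 8), attn: (16, 4, 7, 7)
    return out.shape, attn.shape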
class PositionwiseFeedForward(nn.Module):
"""A two-feed-forward-layer module"""
def __init__(self, d_in, d_hid, dropout=0.1):
super().__init__()
self.w_1 = nn.Linear(d_in, d_hid) # position-wise
self.w_2 = nn.Linear(d_hid, d_in) # position-wise
self.layer_norm = nn.LayerNorm(d_in, eps=1e-6)
# self.dropout = nn.Dropout(dropout)
def forward(self, x):
residual = x
x = self.w_2(F.relu(self.w_1(x)))
# x = self.dropout(x)
x += residual
x = self.layer_norm(x)
return x
class PoswiseFF_emb4enc(nn.Module):
"""A two-feed-forward-layer module (tailored to encoder for DFT model's input) inspired code from Transformer's encoder"""
def __init__(self, d_in, d_hid, d_out, dropout=0.1):
super().__init__()
self.w_1 = nn.Linear(d_in, d_hid) # position-wise
self.w_2 = nn.Linear(d_hid, d_out) # position-wise
self.w_match = nn.Linear(d_in, d_out) # position-wise
# self.post_layer_norm = nn.LayerNorm(d_out, eps=1e-6)
self.pre_layer_norm = nn.LayerNorm(d_in, eps=1e-6)
# self.dropout = nn.Dropout(dropout)
def forward(self, x):
# embedding for residual input
emb_residual = self.w_match(x)
# Pre-layer normalization
x = self.pre_layer_norm(x)
# Transform the (normalized) input
        x = self.w_2(
            F.elu(self.w_1(x))
        )  ## default: ReLU | ELU/LeakyReLU avoid dying gradients, especially when dense outputs are expected, so the Transformer input does not lose expressiveness
# x = self.dropout(x)
        # Post-layer normalization
# x = self.post_layer_norm(x)
# Residual connection
x += emb_residual
return x
class PreLNPositionwiseFeedForward(nn.Module):
"""A two-feed-forward-layer module"""
def __init__(self, d_in, d_hid, dropout=0.1):
super().__init__()
self.w_1 = nn.Linear(d_in, d_hid) # position-wise
self.w_2 = nn.Linear(d_hid, d_in) # position-wise
self.layer_norm = nn.LayerNorm(d_in, eps=1e-6)
# self.dropout = nn.Dropout(dropout)
def forward(self, x):
residual = x
x = self.layer_norm(x)
x = self.w_2(F.leaky_relu(self.w_1(x))) ## default: F.relu
# x = self.dropout(x)
x += residual
return x
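# Hedged sketch (illustrative sizes): both feed-forward variants are shape-preserving
# residual blocks; the post-LN one normalizes after the residual add, the pre-LN one
# normalizes before the two linear layers.
def _example_ffn_variants():
    x = torch.randn(16, 7, 32)  # (B, len, d_in)
    post_ln = PositionwiseFeedForward(d_in=32, d_hid=64)
    pre_ln = PreLNPositionwiseFeedForward(d_in=32, d_hid=64)
    return post_ln(x).shape, pre_ln(x).shape  # both (16, 7, 32)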
def make_embedding_encoder(
config, input_channels: int, output_channels: int
) -> Optional[nn.Module]:
emb_enc_type = config.get("type", "none")
non_linearity = nn.ELU() # make configurable
if emb_enc_type == "none":
return None
elif emb_enc_type == "pwf":
return PoswiseFF_emb4enc(input_channels, 2 * output_channels, output_channels)
elif emb_enc_type == "ff":
return nn.Sequential(
nn.Linear(input_channels, 2 * output_channels, bias=True),
non_linearity,
nn.Linear(2 * output_channels, output_channels, bias=True),
) ## default: ReLU | nn.LeakyReLU()
elif emb_enc_type == "ffh":
return nn.Sequential(
nn.Linear(input_channels, output_channels, bias=True)
) ## default: ReLU | nn.LeakyReLU()
elif emb_enc_type == "hpwf":
return nn.Sequential( ## == mlp.PositionwiseFeedForward
nn.Linear(input_channels, 2 * output_channels, bias=True),
non_linearity,
nn.LayerNorm(2 * output_channels, eps=1e-6),
nn.Linear(2 * output_channels, output_channels, bias=True),
)
    else:
        raise NotImplementedError(
            f"__unrecognized embedding encoder type: {emb_enc_type}"
        )
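# Hedged usage sketch: the config dict below is an illustrative assumption; only its
# "type" key is read here. "pwf" builds the residual PoswiseFF_emb4enc projection,
# "ff"/"ffh"/"hpwf" build plain Linear stacks, and "none" returns None.
def _example_embedding_encoder():
    enc = make_embedding_encoder({"type": "pwf"}, input_channels=103, output_channels=64)
    tokens = torch.randn(16, 7, 103)  # (B, n_views, input_channels)
    emb = enc(tokens)  # (16, 7, 64), with a learned linear residual path
    return emb.shape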
class MultiHeadAttention(nn.Module):
"""Multi-Head Attention module"""
def __init__(self, n_head, d_model, d_k, d_v, dropout=0.1):
super().__init__()
self.n_head = n_head
self.d_k = d_k
self.d_v = d_v
self.w_qs = nn.Linear(d_model, n_head * d_k, bias=False)
self.w_ks = nn.Linear(d_model, n_head * d_k, bias=False)
self.w_vs = nn.Linear(d_model, n_head * d_v, bias=False)
self.fc = nn.Linear(n_head * d_v, d_model, bias=False)
self.attention = ScaledDotProductAttention(temperature=d_k**0.5)
# self.dropout = nn.Dropout(dropout)
self.layer_norm = nn.LayerNorm(d_model, eps=1e-6)
def forward(self, q, k, v, mask=None):
d_k, d_v, n_head = self.d_k, self.d_v, self.n_head
sz_b, len_q, len_k, len_v = q.size(0), q.size(1), k.size(1), v.size(1)
residual = q
# Pass through the pre-attention projection: b x lq x (n*dv)
# Separate different heads: b x lq x n x dv
q = self.w_qs(q).view(sz_b, len_q, n_head, d_k)
k = self.w_ks(k).view(sz_b, len_k, n_head, d_k)
v = self.w_vs(v).view(sz_b, len_v, n_head, d_v)
# Transpose for attention dot product: b x n x lq x dv
q, k, v = q.transpose(1, 2), k.transpose(1, 2), v.transpose(1, 2)
if mask is not None:
mask = mask.unsqueeze(1) # For head axis broadcasting.
q, attn = self.attention(q, k, v, mask=mask)
# Transpose to move the head dimension back: b x lq x n x dv
# Combine the last two dimensions to concatenate all the heads together: b x lq x (n*dv)
q = q.transpose(1, 2).contiguous().view(sz_b, len_q, -1)
# q = self.dropout(self.fc(q))
q = self.fc(q)
q += residual
q = self.layer_norm(q)
return q, attn
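# Hedged usage sketch (illustrative sizes): post-LN multi-head self-attention over 7
# source-view tokens; mask entries of 1 mark invalid views and are broadcast over heads.
def _example_multi_head_attention():
    mha = MultiHeadAttention(n_head=4, d_model=32, d_k=8, d_v=8)
    tokens = torch.randn(16, 7, 32)  # (B, len, d_model)
    mask = torch.zeros(16, 7)
    mask[:, 5:] = 1.0
    out, attn = mha(tokens, tokens, tokens, mask=mask)  # (16, 7, 32), (16, 4, 7, 7)
    return out.shape, attn.shape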
class PreLNMultiHeadAttention(nn.Module):
"""Multi-Head Attention module"""
def __init__(self, n_head, d_model, d_k, d_v, dropout=0.1):
super().__init__()
self.n_head = n_head
self.d_k = d_k
self.d_v = d_v
self.w_qs = nn.Linear(d_model, n_head * d_k, bias=False)
self.w_ks = nn.Linear(d_model, n_head * d_k, bias=False)
self.w_vs = nn.Linear(d_model, n_head * d_v, bias=False)
self.fc = nn.Linear(n_head * d_v, d_model, bias=False)
self.attention = ScaledDotProductAttention(temperature=d_k**0.5)
# self.dropout = nn.Dropout(dropout)
self.layer_norm = nn.LayerNorm(d_model, eps=1e-6)
def forward(self, q, k, v, mask=None):
d_k, d_v, n_head = self.d_k, self.d_v, self.n_head
sz_b, len_q, len_k, len_v = q.size(0), q.size(1), k.size(1), v.size(1)
residual = q
q = self.layer_norm(q)
# Pass through the pre-attention projection: b x lq x (n*dv)
# Separate different heads: b x lq x n x dv
q = self.w_qs(q).view(sz_b, len_q, n_head, d_k)
k = self.w_ks(k).view(sz_b, len_k, n_head, d_k)
v = self.w_vs(v).view(sz_b, len_v, n_head, d_v)
# Transpose for attention dot product: b x n x lq x dv
q, k, v = q.transpose(1, 2), k.transpose(1, 2), v.transpose(1, 2)
if mask is not None:
mask = mask.unsqueeze(1) # For head axis broadcasting.
q, attn = self.attention(q, k, v, mask=mask)
# Transpose to move the head dimension back: b x lq x n x dv
# Combine the last two dimensions to concatenate all the heads together: b x lq x (n*dv)
q = q.transpose(1, 2).contiguous().view(sz_b, len_q, -1)
# q = self.dropout(self.fc(q))
q = self.fc(q)
q += residual
return q, attn
class EncoderLayer(nn.Module):
"""Compose with two layers"""
def __init__(
self, d_model, d_inner, n_head, d_k, d_v, dropout=0, pre_ln: bool = False
):
super(EncoderLayer, self).__init__()
if pre_ln:
self.slf_attn = PreLNMultiHeadAttention(
n_head, d_model, d_k, d_v, dropout=dropout
)
self.pos_ffn = PreLNPositionwiseFeedForward(
d_model, d_inner, dropout=dropout
)
else:
self.slf_attn = MultiHeadAttention(
n_head, d_model, d_k, d_v, dropout=dropout
)
self.pos_ffn = PositionwiseFeedForward(d_model, d_inner, dropout=dropout)
def forward(self, enc_input, slf_attn_mask=None):
enc_output, enc_slf_attn = self.slf_attn(
enc_input, enc_input, enc_input, mask=slf_attn_mask
)
enc_output = self.pos_ffn(enc_output)
return enc_output, enc_slf_attn
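# Hedged usage sketch (illustrative sizes): one encoder block; pre_ln=True swaps in the
# pre-LayerNorm attention and feed-forward variants, otherwise the post-LN IBRNet layout
# is used.
def _example_encoder_layer():
    layer = EncoderLayer(d_model=32, d_inner=64, n_head=4, d_k=8, d_v=8, pre_ln=True)
    tokens = torch.randn(16, 7, 32)  # (B, len, d_model)
    out, attn = layer(tokens)  # (16, 7, 32), (16, 4, 7, 7)
    return out.shape, attn.shape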
"""(modified) Transformer arch from Pytorch library
to be compatible with nn.TransformerEncoder() as input arg"""
class TrEnLayer(nn.Module):
r"""
Args:
        encoder_layer: an instance of the EncoderLayer class defined above (required).
num_layers: the number of sub-encoder-layers in the encoder (required).
norm: the layer normalization component (optional).
enable_nested_tensor: if True, input will automatically convert to nested tensor
(and convert back on output). This will improve the overall performance of
TransformerEncoder when padding rate is high. Default: ``True`` (enabled).
"""
def __init__(
self,
encoder_layer,
num_layers,
norm=None,
enable_nested_tensor=True,
mask_check=True,
):
super(TrEnLayer, self).__init__()
        self.layers = nn.ModuleList(
            [copy.deepcopy(encoder_layer) for _ in range(num_layers)]
        )  ## deep copies, cf. torch.nn.modules.transformer._get_clones
self.num_layers = num_layers
self.norm = norm
self.enable_nested_tensor = enable_nested_tensor
self.mask_check = mask_check
def forward(
self,
src: torch.Tensor,
mask: Optional[torch.Tensor] = None,
src_key_padding_mask: Optional[torch.Tensor] = None,
) -> torch.Tensor:
r"""Pass the input through the encoder layers in turn.
Args:
src: the sequence to the encoder (required).
mask: the mask for the src sequence (optional).
src_key_padding_mask: the mask for the src keys per batch (optional).
Shape:
see the docs in Transformer class.
"""
if src_key_padding_mask is not None:
_skpm_dtype = src_key_padding_mask.dtype
if _skpm_dtype != torch.bool and not torch.is_floating_point(
src_key_padding_mask
):
raise AssertionError(
"only bool and floating types of key_padding_mask are supported"
)
output = src
convert_to_nested = False
first_layer = self.layers[0]
src_key_padding_mask_for_layers = src_key_padding_mask
why_not_sparsity_fast_path = ""
str_first_layer = "self.layers[0]"
# if not isinstance(first_layer, EncoderLayer):
# why_not_sparsity_fast_path = f"{str_first_layer} was not IBR EncoderLayer"
# elif first_layer.norm_first :
# why_not_sparsity_fast_path = f"{str_first_layer}.norm_first was True"
# elif first_layer.training:
# why_not_sparsity_fast_path = f"{str_first_layer} was in training mode"
# elif not first_layer.self_attn.batch_first:
# why_not_sparsity_fast_path = f" {str_first_layer}.self_attn.batch_first was not True"
# elif not first_layer.self_attn._qkv_same_embed_dim:
# why_not_sparsity_fast_path = f"{str_first_layer}.self_attn._qkv_same_embed_dim was not True"
# elif not first_layer.activation_relu_or_gelu:
# why_not_sparsity_fast_path = f" {str_first_layer}.activation_relu_or_gelu was not True"
# elif not (first_layer.norm1.eps == first_layer.norm2.eps) :
# why_not_sparsity_fast_path = f"{str_first_layer}.norm1.eps was not equal to {str_first_layer}.norm2.eps"
# elif not src.dim() == 3:
# why_not_sparsity_fast_path = f"input not batched; expected src.dim() of 3 but got {src.dim()}"
# elif not self.enable_nested_tensor:
# why_not_sparsity_fast_path = "enable_nested_tensor was not True"
# elif src_key_padding_mask is None:
# why_not_sparsity_fast_path = "src_key_padding_mask was None"
# elif (((not hasattr(self, "mask_check")) or self.mask_check)
# and not torch._nested_tensor_from_mask_left_aligned(src, src_key_padding_mask.logical_not())):
# why_not_sparsity_fast_path = "mask_check enabled, and src and src_key_padding_mask was not left aligned"
# elif output.is_nested:
# why_not_sparsity_fast_path = "NestedTensor input is not supported"
# elif mask is not None:
# why_not_sparsity_fast_path = "src_key_padding_mask and mask were both supplied"
# elif first_layer.self_attn.num_heads % 2 == 1:
# why_not_sparsity_fast_path = "num_head is odd"
# elif torch.is_autocast_enabled():
# why_not_sparsity_fast_path = "autocast is enabled"
#
# if not why_not_sparsity_fast_path:
# tensor_args = (
# src,
# first_layer.self_attn.in_proj_weight,
# first_layer.self_attn.in_proj_bias,
# first_layer.self_attn.out_proj.weight,
# first_layer.self_attn.out_proj.bias,
# first_layer.norm1.weight,
# first_layer.norm1.bias,
# first_layer.norm2.weight,
# first_layer.norm2.bias,
# first_layer.linear1.weight,
# first_layer.linear1.bias,
# first_layer.linear2.weight,
# first_layer.linear2.bias,
# )
#
# if torch.overrides.has_torch_function(tensor_args):
# why_not_sparsity_fast_path = "some Tensor argument has_torch_function"
# elif not (src.is_cuda or 'cpu' in str(src.device)):
# why_not_sparsity_fast_path = "src is neither CUDA nor CPU"
# elif torch.is_grad_enabled() and any(x.requires_grad for x in tensor_args):
# why_not_sparsity_fast_path = ("grad is enabled and at least one of query or the "
# "input/output projection weights or biases requires_grad")
#
# if (not why_not_sparsity_fast_path) and (src_key_padding_mask is not None):
# convert_to_nested = True
# output = torch._nested_tensor_from_mask(output, src_key_padding_mask.logical_not(), mask_check=False)
# src_key_padding_mask_for_layers = None
for mod in self.layers:
# output = mod(output, src_mask=mask, src_key_padding_mask=src_key_padding_mask_for_layers)
output = mod(output, slf_attn_mask=src_key_padding_mask_for_layers)[0]
if convert_to_nested:
output = output.to_padded_tensor(0.0)
if self.norm is not None:
output = self.norm(output)
return output
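# Hedged usage sketch (illustrative sizes): stack three deep-copied EncoderLayers; the
# (B, len) float src_key_padding_mask (1 == invalid) is handed to every layer's
# self-attention, mirroring nn.TransformerEncoder's interface.
def _example_tr_en_layer():
    block = EncoderLayer(d_model=32, d_inner=64, n_head=4, d_k=8, d_v=8)
    encoder = TrEnLayer(block, num_layers=3)
    tokens = torch.randn(16, 7, 32)
    mask = torch.zeros(16, 7)
    mask[:, 6:] = 1.0
    out = encoder(tokens, src_key_padding_mask=mask)  # (16, 7, 32)
    return out.shape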
# class TrEnLayer(torch.nn.Module):
# def __init__(self, d_model, nhead, dim_feedforward=2048, dropout=0.1,
# activation="relu", batch_first=True, norm_first=False,
# activation_relu_or_gelu=True):
# super(TransformerEncoderLayer, self).__init__()
# self.self_attn = torch.nn.MultiheadAttention(d_model, nhead, dropout=dropout)
# # Implementation of Feedforward model
# self.linear1 = torch.nn.Linear(d_model, dim_feedforward)
# self.dropout = torch.nn.Dropout(dropout)
# self.linear2 = torch.nn.Linear(dim_feedforward, d_model)
#
# self.norm1 = torch.nn.LayerNorm(d_model)
# self.norm2 = torch.nn.LayerNorm(d_model)
# self.dropout1 = torch.nn.Dropout(dropout)
# self.dropout2 = torch.nn.Dropout(dropout)
#
# # Legacy string support for activation function.
# if isinstance(activation, str):
# self.activation = _get_activation_fn(activation)
# else:
# self.activation = activation
#
# self.pos_ffn = PositionwiseFeedForward(d_model, dim_feedforward, dropout)
#
# self.self_attn.batch_first = batch_first
# self.self_attn._qkv_same_embed_dim = True # assuming d_model is the same for query, key, value
# self.norm_first = norm_first
# self.activation_relu_or_gelu = activation_relu_or_gelu
#
# def forward(self, src, src_mask=None, src_key_padding_mask=None):
# src2 = self.self_attn(src, src, src, attn_mask=src_mask,
# key_padding_mask=src_key_padding_mask)[0]
# if self.norm_first:
# src = src + self.dropout1(src2)
# src = self.norm1(src)
# src2 = self.linear2(self.dropout(self.activation(self.linear1(src))))
# src = src + self.dropout2(src2)
# src = self.norm2(src)
# else:
# src = self.norm1(src)
# src = src + self.dropout1(src2)
# src2 = self.linear2(self.dropout(self.activation(self.linear1(src))))
# src = self.norm2(src)
# src = src + self.dropout2(src2)
# return src
# '''
# c.f. nn.transformer.py
# '''
# def _get_activation_fn(activation: str) -> Callable[[Tensor], Tensor]:
# if activation == "relu":
# return F.relu
# elif activation == "gelu":
# return F.gelu
#
# raise RuntimeError("activation should be relu/gelu, not {}".format(activation))
#
# def _get_clones(module, N):
# return ModuleList([copy.deepcopy(module) for i in range(N)])