|
import gradio as gr |
|
from matplotlib import cm |
|
import matplotlib.pyplot as plt |
|
from mpl_toolkits.axes_grid1 import make_axes_locatable |
|
import numpy as np |
|
|
|
from PIL import Image |
|
from scipy import special |
|
import sys |
|
|
|
from types import SimpleNamespace |
|
|
|
from transformers import AutoModelForImageClassification |
|
import torch |
|
from torch import Tensor, nn |
|
from torch import Tensor |
|
from torchvision.models._utils import _make_divisible |
|
from torchvision.ops import StochasticDepth |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
from transformers import PretrainedConfig, PreTrainedModel |
|
|
|
from typing import List |
|
import copy |
|
import math |
|
import warnings |
|
from dataclasses import dataclass |
|
from functools import partial |
|
import sys |
|
from typing import Any, Callable, List, Optional, Sequence, Tuple, Union |
|
|
|
|
|
|
|
|
|
|
|
|
|
# Short alias for torch's functional interpolation routine.
interpolate = torch.nn.functional.interpolate

# Prefix joined with "<model>_<channels>_planet_detection" to form the name
# handed to save_pretrained / from_pretrained.
model_path = 'chlab/'

# Plot styling constants. `ticks` is applied as the tick label size and
# `titles` as the title font size; the rest are presumably font sizes, a line
# width and a point size for other plots -- TODO confirm, they are not used in
# the visible code.
labels, ticks, legends, text, titles = 20, 14, 14, 14, 22
lw, ps = 3, 200
cmap = 'magma'

# EfficientNet hyperparameters, keyed by the number of input channels.
effnet_hparams = {
    61: dict(
        num_classes=2,
        gamma=0.032606396652426956,
        lr=0.008692971067922545,
        weight_decay=0.00008348389688708425,
        batch_size=23,
        num_channels=61,
        stochastic_depth_prob=0.003581930052432713,
        dropout=0.027804120950575217,
        width_mult=1.060782511229692,
        depth_mult=0.7752918857163054,
        size="v2_s",
    ),
}

# Per-model pair of indices, keyed by lower-cased model name; presumably the
# positions in `model.features` of the early and late stages to visualise.
activation_indices = {'efficientnet': [0, 3]}
|
|
|
|
|
|
|
@dataclass
class _MBConvConfig:
    """Shared configuration record for one stage of (Fused)MBConv blocks."""

    # Ratio passed to `adjust_channels` to derive the expanded channel count.
    expand_ratio: float
    # Kernel size of the block's main convolution.
    kernel: int
    # Stride of the block's main convolution.
    stride: int
    # Channel count entering the block.
    input_channels: int
    # Channel count leaving the block.
    out_channels: int
    # Number of blocks built for this stage.
    num_layers: int
    # Callable that builds the block module from (config, sd_prob, norm_layer).
    block: Callable[..., nn.Module]

    @staticmethod
    def adjust_channels(
        channels: int, width_mult: float, min_value: Optional[int] = None
    ) -> int:
        """Scale `channels` by `width_mult` and round via `_make_divisible` with divisor 8."""
        scaled = channels * width_mult
        return _make_divisible(scaled, 8, min_value)
|
|
|
|
|
class MBConvConfig(_MBConvConfig):
    """Stage configuration for `MBConv` blocks with width/depth scaling applied."""

    def __init__(
        self,
        expand_ratio: float,
        kernel: int,
        stride: int,
        input_channels: int,
        out_channels: int,
        num_layers: int,
        width_mult: float = 1.0,
        depth_mult: float = 1.0,
        block: Optional[Callable[..., nn.Module]] = None,
    ) -> None:
        """Store the stage settings after scaling channels and depth.

        Args:
            expand_ratio: Expansion ratio of the block.
            kernel: Kernel size.
            stride: Stride of the first block of the stage.
            input_channels: Unscaled input channel count.
            out_channels: Unscaled output channel count.
            num_layers: Unscaled number of blocks in the stage.
            width_mult: Multiplier applied to both channel counts.
            depth_mult: Multiplier applied to `num_layers`.
            block: Block factory; `MBConv` is used when None.
        """
        scaled_in = self.adjust_channels(input_channels, width_mult)
        scaled_out = self.adjust_channels(out_channels, width_mult)
        scaled_depth = self.adjust_depth(num_layers, depth_mult)
        # MBConv is looked up at call time, so it may be defined later in the file.
        factory = MBConv if block is None else block
        super().__init__(
            expand_ratio, kernel, stride, scaled_in, scaled_out, scaled_depth, factory
        )

    @staticmethod
    def adjust_depth(num_layers: int, depth_mult: float):
        """Return `num_layers * depth_mult` rounded up to an int."""
        scaled = num_layers * depth_mult
        return int(math.ceil(scaled))
|
|
|
|
|
class FusedMBConvConfig(_MBConvConfig):
    """Stage configuration for `FusedMBConv` blocks (values are stored unscaled)."""

    def __init__(
        self,
        expand_ratio: float,
        kernel: int,
        stride: int,
        input_channels: int,
        out_channels: int,
        num_layers: int,
        block: Optional[Callable[..., nn.Module]] = None,
    ) -> None:
        """Store the stage settings as given.

        Args:
            expand_ratio: Expansion ratio of the block.
            kernel: Kernel size.
            stride: Stride of the first block of the stage.
            input_channels: Input channel count.
            out_channels: Output channel count.
            num_layers: Number of blocks in the stage.
            block: Block factory; `FusedMBConv` is used when None.
        """
        # FusedMBConv is looked up at call time, so it may be defined later in the file.
        factory = FusedMBConv if block is None else block
        super().__init__(
            expand_ratio,
            kernel,
            stride,
            input_channels,
            out_channels,
            num_layers,
            factory,
        )
|
|
|
|
|
class MBConv(nn.Module):
    """Inverted-residual block: optional 1x1 expansion, depthwise conv,
    squeeze-excitation and a linear 1x1 projection.

    Args:
        cnf: Stage configuration (channels, kernel, stride, expand ratio).
        stochastic_depth_prob: Probability passed to ``StochasticDepth``
            (mode ``"row"``), applied to the block output on the residual path.
        norm_layer: Factory for the normalisation layer of every conv sub-block.
        se_layer: Factory for the squeeze-excitation layer. ``None`` (default)
            selects ``SqueezeExcitation``. The class is resolved when the block
            is constructed, not when this class is defined, because
            ``SqueezeExcitation`` is declared further down in this module and
            using it directly as a default value raises ``NameError`` on import.

    Raises:
        ValueError: If ``cnf.stride`` is neither 1 nor 2.
    """

    def __init__(
        self,
        cnf: MBConvConfig,
        stochastic_depth_prob: float,
        norm_layer: Callable[..., nn.Module],
        se_layer: Optional[Callable[..., nn.Module]] = None,
    ) -> None:
        super().__init__()

        if se_layer is None:
            se_layer = SqueezeExcitation

        if not (1 <= cnf.stride <= 2):
            raise ValueError("illegal stride value")

        # A skip connection is only valid when the block keeps shape.
        self.use_res_connect = (
            cnf.stride == 1 and cnf.input_channels == cnf.out_channels
        )

        layers: List[nn.Module] = []
        activation_layer = nn.SiLU

        # expand (skipped when the expansion would not change the width)
        expanded_channels = cnf.adjust_channels(cnf.input_channels, cnf.expand_ratio)
        if expanded_channels != cnf.input_channels:
            layers.append(
                Conv2dNormActivation(
                    cnf.input_channels,
                    expanded_channels,
                    kernel_size=1,
                    norm_layer=norm_layer,
                    activation_layer=activation_layer,
                )
            )

        # depthwise (groups == channels)
        layers.append(
            Conv2dNormActivation(
                expanded_channels,
                expanded_channels,
                kernel_size=cnf.kernel,
                stride=cnf.stride,
                groups=expanded_channels,
                norm_layer=norm_layer,
                activation_layer=activation_layer,
            )
        )

        # squeeze and excitation; squeeze width is derived from the block input
        squeeze_channels = max(1, cnf.input_channels // 4)
        layers.append(
            se_layer(
                expanded_channels,
                squeeze_channels,
                activation=partial(nn.SiLU, inplace=True),
            )
        )

        # project back to out_channels without an activation
        layers.append(
            Conv2dNormActivation(
                expanded_channels,
                cnf.out_channels,
                kernel_size=1,
                norm_layer=norm_layer,
                activation_layer=None,
            )
        )

        self.block = nn.Sequential(*layers)
        self.stochastic_depth = StochasticDepth(stochastic_depth_prob, "row")
        self.out_channels = cnf.out_channels

    def forward(self, input: Tensor) -> Tensor:
        """Run the block; add the input back when a residual connection is used."""
        result = self.block(input)
        if self.use_res_connect:
            result = self.stochastic_depth(result)
            result += input
        return result
|
|
|
|
|
class FusedMBConv(nn.Module):
    """Fused inverted-residual block: a single full convolution replaces the
    expansion + depthwise pair, followed by a 1x1 projection when expanding.

    Args:
        cnf: Stage configuration (channels, kernel, stride, expand ratio).
        stochastic_depth_prob: Probability passed to ``StochasticDepth``
            (mode ``"row"``), applied on the residual path.
        norm_layer: Factory for the normalisation layer of each conv sub-block.

    Raises:
        ValueError: If ``cnf.stride`` is neither 1 nor 2.
    """

    def __init__(
        self,
        cnf: FusedMBConvConfig,
        stochastic_depth_prob: float,
        norm_layer: Callable[..., nn.Module],
    ) -> None:
        super().__init__()

        if cnf.stride < 1 or cnf.stride > 2:
            raise ValueError("illegal stride value")

        # A skip connection is only valid when the block keeps shape.
        same_width = cnf.input_channels == cnf.out_channels
        self.use_res_connect = cnf.stride == 1 and same_width

        act = nn.SiLU
        hidden = cnf.adjust_channels(cnf.input_channels, cnf.expand_ratio)

        stack: List[nn.Module]
        if hidden == cnf.input_channels:
            # No expansion: one activated convolution does all the work.
            stack = [
                Conv2dNormActivation(
                    cnf.input_channels,
                    cnf.out_channels,
                    kernel_size=cnf.kernel,
                    stride=cnf.stride,
                    norm_layer=norm_layer,
                    activation_layer=act,
                )
            ]
        else:
            # Fused expansion conv first, then a linear 1x1 projection.
            fused_expand = Conv2dNormActivation(
                cnf.input_channels,
                hidden,
                kernel_size=cnf.kernel,
                stride=cnf.stride,
                norm_layer=norm_layer,
                activation_layer=act,
            )
            project = Conv2dNormActivation(
                hidden,
                cnf.out_channels,
                kernel_size=1,
                norm_layer=norm_layer,
                activation_layer=None,
            )
            stack = [fused_expand, project]

        self.block = nn.Sequential(*stack)
        self.stochastic_depth = StochasticDepth(stochastic_depth_prob, "row")
        self.out_channels = cnf.out_channels

    def forward(self, input: Tensor) -> Tensor:
        """Run the block; add the input back when a residual connection is used."""
        out = self.block(input)
        if not self.use_res_connect:
            return out
        out = self.stochastic_depth(out)
        out += input
        return out
|
|
|
|
|
class EfficientNetConfig(PretrainedConfig):
    """Hugging Face configuration holding the constructor settings of `EfficientNet`."""

    model_type = "efficientnet"

    def __init__(
        self,
        dropout: float = 0.25,
        num_channels: int = 61,
        stochastic_depth_prob: float = 0.2,
        num_classes: int = 2,
        norm_layer: Optional[Callable[..., nn.Module]] = None,
        size: str = 'v2_s',
        width_mult: float = 1.0,
        depth_mult: float = 1.0,
        **kwargs: Any,
    ) -> None:
        """
        Record the EfficientNet hyperparameters on the config object.

        Args:
            dropout (float): The dropout probability
            num_channels (int): Number of input channels
            stochastic_depth_prob (float): The stochastic depth probability
            num_classes (int): Number of classes
            norm_layer (Optional[Callable[..., nn.Module]]): Accepted for signature
                parity with ``EfficientNet``; it is not stored on the config
            size (str): Architecture suffix, e.g. ``'v2_s'``
            width_mult (float): Width multiplier
            depth_mult (float): Depth multiplier
            **kwargs: Forwarded to ``PretrainedConfig.__init__``
        """
        # Architecture selection.
        self.size = size
        self.width_mult = width_mult
        self.depth_mult = depth_mult

        # Input / output dimensions.
        self.num_channels = num_channels
        self.num_classes = num_classes

        # Regularisation.
        self.dropout = dropout
        self.stochastic_depth_prob = stochastic_depth_prob

        super().__init__(**kwargs)
|
|
|
|
|
class EfficientNetPreTrained(PreTrainedModel):
    """Hugging Face wrapper that builds an `EfficientNet` from an `EfficientNetConfig`."""

    config_class = EfficientNetConfig

    def __init__(self, config):
        """Build the wrapped network from the values stored on `config`.

        Args:
            config: An `EfficientNetConfig`; its dropout, num_channels,
                num_classes, size, stochastic_depth_prob, width_mult and
                depth_mult attributes are passed to `EfficientNet`.
        """
        super().__init__(config)
        self.model = EfficientNet(
            dropout=config.dropout,
            num_channels=config.num_channels,
            num_classes=config.num_classes,
            size=config.size,
            stochastic_depth_prob=config.stochastic_depth_prob,
            width_mult=config.width_mult,
            depth_mult=config.depth_mult,
        )

    def forward(self, tensor):
        """Return the wrapped network's output for `tensor`."""
        # Call the module itself rather than `.forward` so that any hooks
        # registered on `self.model` are executed.
        return self.model(tensor)
|
|
|
|
|
class EfficientNet(nn.Module):
    """EfficientNet classifier whose stage layout is chosen by name via `_efficientnet_conf`."""

    def __init__(
        self,
        dropout: float = 0.25,
        num_channels: int = 61,
        stochastic_depth_prob: float = 0.2,
        num_classes: int = 2,
        norm_layer: Optional[Callable[..., nn.Module]] = None,
        size: str = 'v2_s',
        width_mult: float = 1.0,
        depth_mult: float = 1.0,
        **kwargs: Any,
    ) -> None:
        """
        Build the stem, the MBConv/FusedMBConv stages, the head and the classifier.

        Args:
            dropout (float): The dropout probability of the classifier
            num_channels (int): Number of input channels fed to the stem convolution
            stochastic_depth_prob (float): The stochastic depth probability; it is
                scaled linearly with the block index
            num_classes (int): Number of classes
            norm_layer (Optional[Callable[..., nn.Module]]): Module specifying the
                normalization layer to use; ``nn.BatchNorm2d`` when None
            size (str): Architecture suffix appended to ``"efficientnet_"``
            width_mult (float): Width multiplier forwarded to ``_efficientnet_conf``
            depth_mult (float): Depth multiplier forwarded to ``_efficientnet_conf``
            **kwargs: Only the deprecated ``block`` key is inspected

        Raises:
            ValueError: If the resolved stage list is empty
            TypeError: If the resolved stage list is not a sequence of configs
        """
        super().__init__()

        inverted_residual_setting, last_channel = _efficientnet_conf(
            "efficientnet_%s" % (size), width_mult=width_mult, depth_mult=depth_mult
        )

        if not inverted_residual_setting:
            raise ValueError("The inverted_residual_setting should not be empty")
        if not isinstance(inverted_residual_setting, Sequence) or not all(
            isinstance(entry, _MBConvConfig) for entry in inverted_residual_setting
        ):
            raise TypeError(
                "The inverted_residual_setting should be List[MBConvConfig]"
            )

        if "block" in kwargs:
            warnings.warn(
                "The parameter 'block' is deprecated since 0.13 and will be removed 0.15. "
                "Please pass this information on 'MBConvConfig.block' instead."
            )
            override = kwargs["block"]
            if override is not None:
                # Only plain MBConv stages take the override.
                for entry in inverted_residual_setting:
                    if isinstance(entry, MBConvConfig):
                        entry.block = override

        norm_layer = nn.BatchNorm2d if norm_layer is None else norm_layer

        # Stem: strided 3x3 convolution from the raw input channels.
        stem_width = inverted_residual_setting[0].input_channels
        layers: List[nn.Module] = [
            Conv2dNormActivation(
                num_channels,
                stem_width,
                kernel_size=3,
                stride=2,
                norm_layer=norm_layer,
                activation_layer=nn.SiLU,
            )
        ]

        # Stages: one nn.Sequential per configuration entry.
        total_stage_blocks = sum(entry.num_layers for entry in inverted_residual_setting)
        block_counter = 0
        for stage_cnf in inverted_residual_setting:
            stage_blocks: List[nn.Module] = []
            for position in range(stage_cnf.num_layers):
                # Copy so per-block tweaks never leak into the stage config.
                layer_cnf = copy.copy(stage_cnf)

                # Every block after the first keeps shape: same width, stride 1.
                if position > 0:
                    layer_cnf.input_channels = layer_cnf.out_channels
                    layer_cnf.stride = 1

                # Stochastic depth probability grows with the global block index.
                drop_prob = (
                    stochastic_depth_prob * float(block_counter) / total_stage_blocks
                )

                stage_blocks.append(layer_cnf.block(layer_cnf, drop_prob, norm_layer))
                block_counter += 1

            layers.append(nn.Sequential(*stage_blocks))

        # Head: 1x1 convolution up to the penultimate width.
        head_in = inverted_residual_setting[-1].out_channels
        head_out = 4 * head_in if last_channel is None else last_channel
        layers.append(
            Conv2dNormActivation(
                head_in,
                head_out,
                kernel_size=1,
                norm_layer=norm_layer,
                activation_layer=nn.SiLU,
            )
        )

        self.features = nn.Sequential(*layers)
        self.avgpool = nn.AdaptiveAvgPool2d(1)
        self.classifier = nn.Sequential(
            nn.Dropout(p=dropout, inplace=True),
            nn.Linear(head_out, num_classes),
        )

        # Weight initialisation, dispatched on module type.
        for module in self.modules():
            if isinstance(module, nn.Conv2d):
                nn.init.kaiming_normal_(module.weight, mode="fan_out")
                if module.bias is not None:
                    nn.init.zeros_(module.bias)
                continue
            if isinstance(module, (nn.BatchNorm2d, nn.GroupNorm)):
                nn.init.ones_(module.weight)
                nn.init.zeros_(module.bias)
                continue
            if isinstance(module, nn.Linear):
                bound = 1.0 / math.sqrt(module.out_features)
                nn.init.uniform_(module.weight, -bound, bound)
                nn.init.zeros_(module.bias)

    def _forward_impl(self, x: Tensor) -> Tensor:
        """Features -> global average pool -> flatten -> classifier."""
        pooled = self.avgpool(self.features(x))
        flat = torch.flatten(pooled, 1)
        return self.classifier(flat)

    def forward(self, x: Tensor) -> Tensor:
        """Return the classifier output for the batch `x`."""
        return self._forward_impl(x)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _efficientnet_conf(
    arch: str,
    **kwargs: Any,
) -> Tuple[Sequence[Union[MBConvConfig, FusedMBConvConfig]], Optional[int]]:
    """Return the stage configuration list and head width for `arch`.

    Args:
        arch: Architecture name; matched by prefix against ``efficientnet_b``,
            ``efficientnet_v2_s``, ``efficientnet_v2_m`` and ``efficientnet_v2_l``.
        **kwargs: ``width_mult`` and ``depth_mult`` are required (popped) for
            the ``efficientnet_b`` family.

    Returns:
        ``(inverted_residual_setting, last_channel)``; ``last_channel`` is
        ``None`` for the ``b`` family and 1280 for the v2 variants.

    Raises:
        ValueError: If `arch` matches none of the supported prefixes.
        KeyError: If a ``b`` architecture is requested without both multipliers.
    """
    if arch.startswith("efficientnet_b"):
        scaled = partial(
            MBConvConfig,
            width_mult=kwargs.pop("width_mult"),
            depth_mult=kwargs.pop("depth_mult"),
        )
        b_stages = [
            scaled(1, 3, 1, 32, 16, 1),
            scaled(6, 3, 2, 16, 24, 2),
            scaled(6, 5, 2, 24, 40, 2),
            scaled(6, 3, 2, 40, 80, 3),
            scaled(6, 5, 1, 80, 112, 3),
            scaled(6, 5, 2, 112, 192, 4),
            scaled(6, 3, 1, 192, 320, 1),
        ]
        return b_stages, None

    # NOTE: the v2 tables below never read width_mult / depth_mult from kwargs;
    # every entry is built with the config classes' default multipliers.
    fused, mb = FusedMBConvConfig, MBConvConfig

    if arch.startswith("efficientnet_v2_s"):
        return [
            fused(1, 3, 1, 24, 24, 2),
            fused(4, 3, 2, 24, 48, 4),
            fused(4, 3, 2, 48, 64, 4),
            mb(4, 3, 2, 64, 128, 6),
            mb(6, 3, 1, 128, 160, 9),
            mb(6, 3, 2, 160, 256, 15),
        ], 1280

    if arch.startswith("efficientnet_v2_m"):
        return [
            fused(1, 3, 1, 24, 24, 3),
            fused(4, 3, 2, 24, 48, 5),
            fused(4, 3, 2, 48, 80, 5),
            mb(4, 3, 2, 80, 160, 7),
            mb(6, 3, 1, 160, 176, 14),
            mb(6, 3, 2, 176, 304, 18),
            mb(6, 3, 1, 304, 512, 5),
        ], 1280

    if arch.startswith("efficientnet_v2_l"):
        return [
            fused(1, 3, 1, 32, 32, 4),
            fused(4, 3, 2, 32, 64, 7),
            fused(4, 3, 2, 64, 96, 7),
            mb(4, 3, 2, 96, 192, 10),
            mb(6, 3, 1, 192, 224, 19),
            mb(6, 3, 2, 224, 384, 25),
            mb(6, 3, 1, 384, 640, 7),
        ], 1280

    raise ValueError(f"Unsupported model type {arch}")
|
|
|
|
|
|
|
|
|
|
|
class FrozenBatchNorm2d(torch.nn.Module):
    """
    BatchNorm2d variant whose statistics and affine terms are buffers, not parameters.

    Args:
        num_features (int): Number of features ``C`` from an expected input of size ``(N, C, H, W)``
        eps (float): a value added to the denominator for numerical stability. Default: 1e-5
    """

    def __init__(
        self,
        num_features: int,
        eps: float = 1e-5,
    ):
        super().__init__()
        self.eps = eps
        # Identity affine transform and unit statistics until a state dict is loaded.
        for buffer_name, factory in (
            ("weight", torch.ones),
            ("bias", torch.zeros),
            ("running_mean", torch.zeros),
            ("running_var", torch.ones),
        ):
            self.register_buffer(buffer_name, factory(num_features))

    def _load_from_state_dict(
        self,
        state_dict: dict,
        prefix: str,
        local_metadata: dict,
        strict: bool,
        missing_keys: List[str],
        unexpected_keys: List[str],
        error_msgs: List[str],
    ):
        """Drop the ``num_batches_tracked`` entry, then defer to the default loader."""
        # This module has no such buffer, so the key is removed before loading.
        state_dict.pop(prefix + "num_batches_tracked", None)

        super()._load_from_state_dict(
            state_dict, prefix, local_metadata, strict, missing_keys, unexpected_keys, error_msgs
        )

    def forward(self, x: Tensor) -> Tensor:
        """Apply the fixed per-channel affine normalisation to `x`."""
        # Reshape every buffer to (1, C, 1, 1) so it broadcasts over N, H, W.
        channel_shape = (1, -1, 1, 1)
        gamma = self.weight.reshape(channel_shape)
        beta = self.bias.reshape(channel_shape)
        variance = self.running_var.reshape(channel_shape)
        mean = self.running_mean.reshape(channel_shape)

        scale = gamma * (variance + self.eps).rsqrt()
        shift = beta - mean * scale
        return x * scale + shift

    def __repr__(self) -> str:
        name = self.__class__.__name__
        return f"{name}({self.weight.shape[0]}, eps={self.eps})"
|
|
|
|
|
class ConvNormActivation(torch.nn.Sequential):
    """Sequential of convolution, optional normalisation and optional activation.

    Base class for the 2d/3d variants; instantiating it directly emits a warning.
    """

    def __init__(
        self,
        in_channels: int,
        out_channels: int,
        kernel_size: int = 3,
        stride: int = 1,
        padding: Optional[int] = None,
        groups: int = 1,
        norm_layer: Optional[Callable[..., torch.nn.Module]] = torch.nn.BatchNorm2d,
        activation_layer: Optional[Callable[..., torch.nn.Module]] = torch.nn.ReLU,
        dilation: int = 1,
        inplace: Optional[bool] = True,
        bias: Optional[bool] = None,
        conv_layer: Callable[..., torch.nn.Module] = torch.nn.Conv2d,
    ) -> None:
        """Assemble the layer stack.

        Args:
            in_channels: Channels of the input.
            out_channels: Channels produced by the convolution.
            kernel_size: Convolution kernel size.
            stride: Convolution stride.
            padding: Explicit padding; when None it is
                ``(kernel_size - 1) // 2 * dilation``.
            groups: Convolution groups.
            norm_layer: Factory called with ``out_channels``; skipped when None.
            activation_layer: Activation factory; skipped when None.
            dilation: Convolution dilation.
            inplace: Passed as ``inplace=`` to the activation unless None.
            bias: Convolution bias flag; when None a bias is used only if there
                is no normalisation layer.
            conv_layer: Convolution class to instantiate.
        """
        resolved_padding = (
            (kernel_size - 1) // 2 * dilation if padding is None else padding
        )
        use_bias = (norm_layer is None) if bias is None else bias

        conv = conv_layer(
            in_channels,
            out_channels,
            kernel_size,
            stride,
            resolved_padding,
            dilation=dilation,
            groups=groups,
            bias=use_bias,
        )
        stack = [conv]

        if norm_layer is not None:
            stack.append(norm_layer(out_channels))

        if activation_layer is not None:
            activation_kwargs = {"inplace": inplace} if inplace is not None else {}
            stack.append(activation_layer(**activation_kwargs))

        super().__init__(*stack)
        self.out_channels = out_channels

        # Subclasses are the supported entry points.
        if type(self) is ConvNormActivation:
            warnings.warn(
                "Don't use ConvNormActivation directly, please use Conv2dNormActivation and Conv3dNormActivation instead."
            )
|
|
|
|
|
class Conv2dNormActivation(ConvNormActivation):
    """
    Convolution2d-Normalization-Activation block built on ``torch.nn.Conv2d``.

    Args:
        in_channels (int): Number of channels in the input image
        out_channels (int): Number of channels produced by the Convolution-Normalization-Activation block
        kernel_size: (int, optional): Size of the convolving kernel. Default: 3
        stride (int, optional): Stride of the convolution. Default: 1
        padding (int, tuple or str, optional): Padding added to all four sides of the input. Default: None, in which case it is calculated as ``padding = (kernel_size - 1) // 2 * dilation``
        groups (int, optional): Number of blocked connections from input channels to output channels. Default: 1
        norm_layer (Callable[..., torch.nn.Module], optional): Norm layer stacked on top of the convolution layer. If ``None`` this layer won't be used. Default: ``torch.nn.BatchNorm2d``
        activation_layer (Callable[..., torch.nn.Module], optional): Activation function stacked on top of the normalization layer (if not None), otherwise on top of the conv layer. If ``None`` this layer won't be used. Default: ``torch.nn.ReLU``
        dilation (int): Spacing between kernel elements. Default: 1
        inplace (bool): Parameter for the activation layer, which can optionally do the operation in-place. Default ``True``
        bias (bool, optional): Whether to use bias in the convolution layer. By default, biases are included if ``norm_layer is None``.
    """

    def __init__(
        self,
        in_channels: int,
        out_channels: int,
        kernel_size: int = 3,
        stride: int = 1,
        padding: Optional[int] = None,
        groups: int = 1,
        norm_layer: Optional[Callable[..., torch.nn.Module]] = torch.nn.BatchNorm2d,
        activation_layer: Optional[Callable[..., torch.nn.Module]] = torch.nn.ReLU,
        dilation: int = 1,
        inplace: Optional[bool] = True,
        bias: Optional[bool] = None,
    ) -> None:
        # Same arguments as the base class, with the convolution type pinned to 2d.
        super().__init__(
            in_channels,
            out_channels,
            kernel_size=kernel_size,
            stride=stride,
            padding=padding,
            groups=groups,
            norm_layer=norm_layer,
            activation_layer=activation_layer,
            dilation=dilation,
            inplace=inplace,
            bias=bias,
            conv_layer=torch.nn.Conv2d,
        )
|
|
|
|
|
class Conv3dNormActivation(ConvNormActivation):
    """
    Convolution3d-Normalization-Activation block built on ``torch.nn.Conv3d``.

    Args:
        in_channels (int): Number of channels in the input video.
        out_channels (int): Number of channels produced by the Convolution-Normalization-Activation block
        kernel_size: (int, optional): Size of the convolving kernel. Default: 3
        stride (int, optional): Stride of the convolution. Default: 1
        padding (int, tuple or str, optional): Padding added to all sides of the input. Default: None, in which case it is calculated as ``padding = (kernel_size - 1) // 2 * dilation``
        groups (int, optional): Number of blocked connections from input channels to output channels. Default: 1
        norm_layer (Callable[..., torch.nn.Module], optional): Norm layer stacked on top of the convolution layer. If ``None`` this layer won't be used. Default: ``torch.nn.BatchNorm3d``
        activation_layer (Callable[..., torch.nn.Module], optional): Activation function stacked on top of the normalization layer (if not None), otherwise on top of the conv layer. If ``None`` this layer won't be used. Default: ``torch.nn.ReLU``
        dilation (int): Spacing between kernel elements. Default: 1
        inplace (bool): Parameter for the activation layer, which can optionally do the operation in-place. Default ``True``
        bias (bool, optional): Whether to use bias in the convolution layer. By default, biases are included if ``norm_layer is None``.
    """

    def __init__(
        self,
        in_channels: int,
        out_channels: int,
        kernel_size: int = 3,
        stride: int = 1,
        padding: Optional[int] = None,
        groups: int = 1,
        norm_layer: Optional[Callable[..., torch.nn.Module]] = torch.nn.BatchNorm3d,
        activation_layer: Optional[Callable[..., torch.nn.Module]] = torch.nn.ReLU,
        dilation: int = 1,
        inplace: Optional[bool] = True,
        bias: Optional[bool] = None,
    ) -> None:
        # Same arguments as the base class, with the convolution type pinned to 3d.
        super().__init__(
            in_channels,
            out_channels,
            kernel_size=kernel_size,
            stride=stride,
            padding=padding,
            groups=groups,
            norm_layer=norm_layer,
            activation_layer=activation_layer,
            dilation=dilation,
            inplace=inplace,
            bias=bias,
            conv_layer=torch.nn.Conv3d,
        )
|
|
|
|
|
class SqueezeExcitation(torch.nn.Module):
    """
    Squeeze-and-Excitation block from https://arxiv.org/abs/1709.01507 (see Fig. 1).
    ``activation`` and ``scale_activation`` play the roles of ``delta`` and ``sigma`` in eq. 3.

    Args:
        input_channels (int): Number of channels in the input image
        squeeze_channels (int): Number of squeeze channels
        activation (Callable[..., torch.nn.Module], optional): ``delta`` activation. Default: ``torch.nn.ReLU``
        scale_activation (Callable[..., torch.nn.Module]): ``sigma`` activation. Default: ``torch.nn.Sigmoid``
    """

    def __init__(
        self,
        input_channels: int,
        squeeze_channels: int,
        activation: Callable[..., torch.nn.Module] = torch.nn.ReLU,
        scale_activation: Callable[..., torch.nn.Module] = torch.nn.Sigmoid,
    ) -> None:
        super().__init__()
        conv1x1 = torch.nn.Conv2d

        self.avgpool = torch.nn.AdaptiveAvgPool2d(1)
        # 1x1 convolutions act as the two fully connected layers.
        self.fc1 = conv1x1(input_channels, squeeze_channels, 1)
        self.fc2 = conv1x1(squeeze_channels, input_channels, 1)
        self.activation = activation()
        self.scale_activation = scale_activation()

    def _scale(self, input: Tensor) -> Tensor:
        """Compute the per-channel gate: pool -> fc1 -> delta -> fc2 -> sigma."""
        gate = input
        for stage in (
            self.avgpool,
            self.fc1,
            self.activation,
            self.fc2,
            self.scale_activation,
        ):
            gate = stage(gate)
        return gate

    def forward(self, input: Tensor) -> Tensor:
        """Return `input` multiplied by its channel gate."""
        return self._scale(input) * input
|
|
|
|
|
class MLP(torch.nn.Sequential):
    """This block implements the multi-layer perceptron (MLP) module.

    Every entry of ``hidden_channels`` except the last produces
    ``Linear -> [norm] -> [activation] -> Dropout``; the last entry produces
    ``Linear -> Dropout``.

    Args:
        in_channels (int): Number of channels of the input
        hidden_channels (List[int]): List of the hidden channel dimensions; must not be empty
        norm_layer (Callable[..., torch.nn.Module], optional): Norm layer stacked on top of the linear layer. If ``None`` this layer won't be used. Default: ``None``
        activation_layer (Callable[..., torch.nn.Module], optional): Activation function stacked on top of the normalization layer (if not None), otherwise on top of the linear layer. If ``None`` this layer won't be used. Default: ``torch.nn.ReLU``
        inplace (bool): Parameter for the activation and dropout layers, which can optionally do the operation in-place. Default ``True``
        bias (bool): Whether to use bias in the linear layer. Default ``True``
        dropout (float): The probability for the dropout layer. Default: 0.0
    """

    def __init__(
        self,
        in_channels: int,
        hidden_channels: List[int],
        norm_layer: Optional[Callable[..., torch.nn.Module]] = None,
        activation_layer: Optional[Callable[..., torch.nn.Module]] = torch.nn.ReLU,
        inplace: Optional[bool] = True,
        bias: bool = True,
        dropout: float = 0.0,
    ):
        # `inplace=None` means "use each layer's own default".
        params = {} if inplace is None else {"inplace": inplace}

        layers = []
        in_dim = in_channels
        for hidden_dim in hidden_channels[:-1]:
            layers.append(torch.nn.Linear(in_dim, hidden_dim, bias=bias))
            if norm_layer is not None:
                layers.append(norm_layer(hidden_dim))
            # Honour the documented contract: None disables the activation
            # instead of failing with "'NoneType' object is not callable".
            if activation_layer is not None:
                layers.append(activation_layer(**params))
            layers.append(torch.nn.Dropout(dropout, **params))
            in_dim = hidden_dim

        # Output layer: no normalisation or activation.
        layers.append(torch.nn.Linear(in_dim, hidden_channels[-1], bias=bias))
        layers.append(torch.nn.Dropout(dropout, **params))

        super().__init__(*layers)
|
|
|
|
|
|
|
class Permute(torch.nn.Module):
    """Module wrapper around ``torch.permute``.

    Args:
        dims (List[int]): The desired ordering of dimensions
    """

    def __init__(self, dims: List[int]):
        super().__init__()
        # Kept as given; it is handed to torch.permute on every forward call.
        self.dims = dims

    def forward(self, x: Tensor) -> Tensor:
        """Return `x` with its dimensions reordered according to ``self.dims``."""
        permuted = torch.permute(x, self.dims)
        return permuted
|
|
|
|
|
|
|
|
|
|
|
def normalize_array(x: list):
    '''Rescales an array linearly so its minimum maps to 0 and its maximum to 1.

    Args:
        x: Array-like of numbers.

    Returns:
        A NumPy array of the same shape as `x`. An empty input yields an empty
        float array, and an input whose values are all equal yields all zeros
        (instead of the NaNs a 0/0 division would produce).
    '''

    values = np.array(x)

    # np.min raises on empty input; there is nothing to rescale anyway.
    if values.size == 0:
        return values.astype(float)

    shifted = values - np.min(values)
    span = np.max(shifted)

    # Constant input: every shifted value is already 0, so skip the 0/0
    # division. Multiplying by 0.0 keeps the floating dtype of the result.
    if span == 0:
        return shifted * 0.0

    return shifted / span
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_activations(model, image: list, model_name: str,
                    layer=None, vmax=2.5, sub_mean=True,
                    channel: int=0):

    '''Gets the prediction and two intermediate activations for an input image.

    Args:
        model: Wrapper whose `model.model` exposes a `features` sequence and is
            itself callable on the image.
        image: Input batch; indexed as [0, channel, :, :], so it is assumed to
            be a (1, C, H, W) tensor -- TODO confirm against callers.
        model_name: Key into `activation_indices` selecting which two feature
            stages to return.
        layer: Channel of the activation maps to return; None sums over all
            channels.
        vmax: Unused; kept for backward compatibility.
        sub_mean: If True, return |activation - mean(activation)|.
        channel: 1-based input channel to show; 0 sums over all channels.

    Returns:
        (output, in_image, activation_1, activation_2) where `output` is the
        1-D array of class probabilities of the first batch element,
        `in_image` is the normalised 2-D input view, and the activations are
        the 2-D maps of the early and late stages.
    '''

    early_index, late_index = activation_indices[model_name]

    with torch.no_grad():
        # Walk the feature stages on a separate variable: the untouched input
        # is still needed for the full forward pass and for the input view.
        features = image
        layer_outputs = {}
        for i, stage in enumerate(model.model.features):
            features = stage(features)
            layer_outputs[i] = features
            print(i, layer_outputs[i].shape)

        logits = model.model(image)

    output_1 = layer_outputs[early_index].detach().cpu().numpy()
    output_2 = layer_outputs[late_index].detach().cpu().numpy()

    # Softmax over the class axis, then drop the batch axis so that callers
    # can read output[0] / output[1] as scalars.
    output = special.softmax(logits.detach().cpu().numpy(), axis=-1)[0]

    if torch.is_tensor(image):
        image_np = image.detach().cpu().numpy()
    else:
        image_np = np.asarray(image)

    if channel == 0:
        in_image = np.sum(image_np[0, :, :, :], axis=0)
    else:
        # Channels are numbered from 1 in the UI.
        in_image = image_np[0, int(channel - 1), :, :]
    in_image = normalize_array(in_image)

    if layer is None:
        # Collapse all channels of each stage into a single map.
        activation_1 = np.sum(output_1[0, :, :, :], axis=0)
        activation_2 = np.sum(output_2[0, :, :, :], axis=0)
    else:
        activation_1 = output_1[0, layer, :, :]
        activation_2 = output_2[0, layer, :, :]

    if sub_mean:
        # Out-of-place on purpose: with `layer` set, the maps are views into
        # arrays that may share memory with the stored feature tensors.
        activation_1 = np.abs(activation_1 - np.mean(activation_1))
        activation_2 = np.abs(activation_2 - np.mean(activation_2))

    return output, in_image, activation_1, activation_2
|
|
|
def plot_input(input_image: list, origin='lower'):

    '''Draws the input image with a colorbar and returns the figure.

    Args:
        input_image: 2-D array handed to `imshow`.
        origin: `origin` argument of `imshow`.
    '''

    # Tick label size is set globally through rcParams.
    plt.rcParams.update({
        'xtick.labelsize': ticks,
        'ytick.labelsize': ticks,
    })

    input_fig, ax = plt.subplots(nrows=1, ncols=1, figsize=(18, 8))
    image_artist = ax.imshow(input_image, cmap=cmap, origin=origin)

    # Put the colorbar in its own axis glued to the right of the image.
    colorbar_axis = make_axes_locatable(ax).append_axes('right', size='5%', pad=0.05)
    input_fig.colorbar(image_artist, cax=colorbar_axis, orientation='vertical')

    ax.set_title('Input', fontsize=titles)

    return input_fig
|
|
|
def plot_activations(activation_1: list, activation_2: list, origin='lower'):

    '''Draws the two activation maps side by side, each with its own colorbar.

    Args:
        activation_1: 2-D array shown in the left panel ('Early Activation').
        activation_2: 2-D array shown in the right panel ('Late Activation').
        origin: `origin` argument of `imshow`.

    Returns:
        The matplotlib figure holding both panels.
    '''

    # Tick label size is set globally through rcParams.
    plt.rcParams.update({
        'xtick.labelsize': ticks,
        'ytick.labelsize': ticks,
    })

    fig, (ax1, ax2) = plt.subplots(nrows=1, ncols=2, figsize=(27, 12))
    panels = (ax1, ax2)

    artists = [
        panel.imshow(data, cmap=cmap, origin=origin)
        for panel, data in zip(panels, (activation_1, activation_2))
    ]

    # One colorbar axis per panel, glued to the panel's right edge.
    for panel, artist in zip(panels, artists):
        colorbar_axis = make_axes_locatable(panel).append_axes(
            'right', size='5%', pad=0.05
        )
        fig.colorbar(artist, cax=colorbar_axis, orientation='vertical')

    ax1.set_title('Early Activation', fontsize=titles)
    ax2.set_title('Late Activation', fontsize=titles)

    return fig
|
|
|
def predict_and_analyze(model_name, num_channels, dim, input_channel, image):

    '''
    Loads a model with activations, passes through image and shows activations

    The image must be a numpy array of shape (C, W, W) or (1, C, W, W)

    Args:
        model_name: Model selection from the UI; compared case-insensitively.
        num_channels: Number of channels (string or number) the data must have.
        dim: Side length W (string or number) the data must have.
        input_channel: Channel of the input to show; 0 sums over all channels.
        image: Uploaded file object whose `.name` is a path readable by np.load.

    Returns:
        (prediction text, input figure, mean-subtracted activation figure,
        raw activation figure).

    Raises:
        ValueError: If the data has the wrong shape, the model name is not
            supported, or no hyperparameters exist for `num_channels`.
    '''

    model_name = model_name.lower()
    num_channels = int(num_channels)
    W = int(dim)

    print("Running %s for %i channels" % (model_name, num_channels))
    print("Loading data")
    print(image)

    # SECURITY NOTE(review): allow_pickle=True lets a crafted upload execute
    # arbitrary code while being unpickled. Plain numeric .npy files do not
    # need it; switch to allow_pickle=False unless object arrays are required.
    image = np.load(image.name, allow_pickle=True)
    image = image.astype(np.float32)

    if len(image.shape) != 4:
        image = image[np.newaxis, :, :, :]

    image = torch.from_numpy(image)

    # Explicit check instead of `assert`, which is stripped under `python -O`.
    expected_shape = (1, num_channels, W, W)
    if tuple(image.shape) != expected_shape:
        raise ValueError(
            "Data is the wrong shape: expected %s, got %s"
            % (expected_shape, tuple(image.shape))
        )
    print("Data loaded")

    print("Loading model")

    model_loading_name = model_path + "%s_%i_planet_detection" % (model_name, num_channels)

    if 'eff' in model_name:
        if num_channels not in effnet_hparams:
            raise ValueError(
                "No EfficientNet hyperparameters for %i channels (available: %s)"
                % (num_channels, sorted(effnet_hparams))
            )
        hparams = SimpleNamespace(**effnet_hparams[num_channels])
        config = EfficientNetConfig(
            dropout=hparams.dropout,
            num_channels=hparams.num_channels,
            num_classes=hparams.num_classes,
            size=hparams.size,
            stochastic_depth_prob=hparams.stochastic_depth_prob,
            width_mult=hparams.width_mult,
            depth_mult=hparams.depth_mult,
        )

        # NOTE(review): this writes the config to a local directory with the
        # same name that from_pretrained is then asked to load -- confirm the
        # weights are expected to be found there.
        config.save_pretrained(save_directory=model_loading_name)

        model = EfficientNetPreTrained.from_pretrained(model_loading_name)
    else:
        # Previously fell through and failed later with a NameError on `model`.
        raise ValueError("Unsupported model: %s" % model_name)

    print("Model loaded")

    print("Looking at activations")
    output, input_image, activation_1, activation_2 = get_activations(model, image, model_name,
                                                                      channel=input_channel,
                                                                      sub_mean=True)
    print("Activations and predictions finished")

    # Flatten so the two class scores are scalars whether or not the batch
    # axis is still present.
    output = np.ravel(output)
    if output[0] < output[1]:
        output = 'Planet predicted with %.3f percent confidence' % (100*output[1])
    else:
        output = 'No planet predicted with %.3f percent confidence' % (100*output[0])

    input_image = normalize_array(input_image)
    activation_1 = normalize_array(activation_1)
    activation_2 = normalize_array(activation_2)

    print("Plotting")

    origin = 'upper'

    input_fig = plot_input(input_image, origin=origin)

    # plot_activations takes (activation_1, activation_2, origin); passing
    # model_name positionally collided with the `origin` keyword.
    fig1 = plot_activations(activation_1, activation_2, origin=origin)

    # Second pass without mean subtraction for the raw activation plot.
    _, _, activation_1, activation_2 = get_activations(model, image, model_name,
                                                       channel=input_channel,
                                                       sub_mean=False)
    activation_1 = normalize_array(activation_1)
    activation_2 = normalize_array(activation_2)
    fig2 = plot_activations(activation_1, activation_2, origin=origin)

    print("Sending to Hugging Face")

    return output, input_fig, fig1, fig2
|
|
|
|
|
if __name__ == "__main__":

    # Input widgets, listed in the positional order of predict_and_analyze's
    # parameters: model, channels, dimensions, channel to show, data file.
    input_widgets = [
        gr.Dropdown(
            ["EfficientNet"],
            value="EfficientNet",
            label="Model Selection",
            show_label=True,
        ),
        gr.Dropdown(
            ["47", "61", "75"],
            value="61",
            label="Number of Velocity Channels",
            show_label=True,
        ),
        gr.Dropdown(
            ["600"],
            value="600",
            label="Image Dimensions",
            show_label=True,
        ),
        gr.Number(
            value=0.,
            label="Input Channel to show (0 = sum over all)",
            show_label=True,
        ),
        gr.File(label="Input Data", show_label=True),
    ]

    # Output widgets, matching the four values predict_and_analyze returns.
    output_widgets = [
        gr.Textbox(lines=1, label="Prediction", show_label=True),
        gr.Plot(label="Input Image", show_label=True),
        gr.Plot(label="Mean-Subtracted Activations", show_label=True),
        gr.Plot(label="Raw Activations", show_label=True),
    ]

    demo = gr.Interface(
        fn=predict_and_analyze,
        inputs=input_widgets,
        outputs=output_widgets,
        title="Kinematic Planet Detector",
    )
    demo.launch()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|