import gradio as gr
from matplotlib import cm
import matplotlib.pyplot as plt
from mpl_toolkits.axes_grid1 import make_axes_locatable
import numpy as np
# import onnxruntime as ort
from PIL import Image
from scipy import special
import sys
# import timm
from types import SimpleNamespace
# from transformers import AutoModel, pipeline
from transformers import AutoModelForImageClassification
import torch
from torch import Tensor, nn
from torchvision.models._utils import _make_divisible
from torchvision.ops import StochasticDepth
# sys.path.insert(1, "../")
# from utils import model_utils, train_utils, data_utils, run_utils
# from model_utils import jason_regnet_maker, jason_efficientnet_maker
# from model_utils.efficientnet_config import EfficientNetConfig, EfficientNetPreTrained
from transformers import PretrainedConfig, PreTrainedModel
import copy
import math
import warnings
from dataclasses import dataclass
from functools import partial
from typing import Any, Callable, List, Optional, Sequence, Tuple, Union
# sys.path.insert(1, "../")
# from utils.vision_modifications import Conv2dNormActivation, SqueezeExcitation
interpolate = torch.nn.functional.interpolate
model_path = 'chlab/'
# model_path = './models/'
# plotting parameters
labels = 20
ticks = 14
legends = 14
text = 14
titles = 22
lw = 3
ps = 200
cmap = 'magma'
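# Tuned EfficientNet hyperparameters keyed by the number of input velocity
# channels (presumably from a hyperparameter search; only the 61-channel
# configuration is bundled here)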
effnet_hparams = {61: {
"num_classes": 2,
"gamma": 0.032606396652426956,
"lr": 0.008692971067922545,
"weight_decay": 0.00008348389688708425,
"batch_size": 23,
"num_channels": 61,
"stochastic_depth_prob": 0.003581930052432713,
"dropout": 0.027804120950575217,
"width_mult": 1.060782511229692,
"depth_mult": 0.7752918857163054,
"size": "v2_s",
}}
# effnet_config = SimpleNamespace(**effnet_hparams)
# which stages of model.features to tap for the "early" and "late" activation maps
activation_indices = {'efficientnet': [0, 3]}
########## EfficientNet ############
@dataclass
class _MBConvConfig:
expand_ratio: float
kernel: int
stride: int
input_channels: int
out_channels: int
num_layers: int
block: Callable[..., nn.Module]
@staticmethod
def adjust_channels(
channels: int, width_mult: float, min_value: Optional[int] = None
) -> int:
return _make_divisible(channels * width_mult, 8, min_value)
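        # Illustrative example: adjust_channels(32, 1.5) scales 32 -> 48, which
        # _make_divisible keeps at 48 (already a multiple of 8), while
        # adjust_channels(16, 1.1) rounds 17.6 back down to 16.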
class MBConvConfig(_MBConvConfig):
# Stores information listed at Table 1 of the EfficientNet paper & Table 4 of the EfficientNetV2 paper
def __init__(
self,
expand_ratio: float,
kernel: int,
stride: int,
input_channels: int,
out_channels: int,
num_layers: int,
width_mult: float = 1.0,
depth_mult: float = 1.0,
block: Optional[Callable[..., nn.Module]] = None,
) -> None:
input_channels = self.adjust_channels(input_channels, width_mult)
out_channels = self.adjust_channels(out_channels, width_mult)
num_layers = self.adjust_depth(num_layers, depth_mult)
if block is None:
block = MBConv
super().__init__(
expand_ratio,
kernel,
stride,
input_channels,
out_channels,
num_layers,
block,
)
@staticmethod
def adjust_depth(num_layers: int, depth_mult: float):
return int(math.ceil(num_layers * depth_mult))
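        # Illustrative example: with depth_mult = 0.775 (as in the 61-channel
        # hyperparameters above), adjust_depth(4, 0.775) = ceil(3.1) = 4 and
        # adjust_depth(2, 0.775) = ceil(1.55) = 2.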
class FusedMBConvConfig(_MBConvConfig):
# Stores information listed at Table 4 of the EfficientNetV2 paper
def __init__(
self,
expand_ratio: float,
kernel: int,
stride: int,
input_channels: int,
out_channels: int,
num_layers: int,
block: Optional[Callable[..., nn.Module]] = None,
) -> None:
if block is None:
block = FusedMBConv
super().__init__(
expand_ratio,
kernel,
stride,
input_channels,
out_channels,
num_layers,
block,
)
class MBConv(nn.Module):
def __init__(
self,
cnf: MBConvConfig,
stochastic_depth_prob: float,
norm_layer: Callable[..., nn.Module],
        se_layer: Optional[Callable[..., nn.Module]] = None,
    ) -> None:
        super().__init__()
        if se_layer is None:
            # SqueezeExcitation is defined further down in this module, so it
            # cannot be referenced as a default argument (defaults are evaluated
            # at class-definition time); resolve it here at call time instead.
            se_layer = SqueezeExcitation
        if not (1 <= cnf.stride <= 2):
            raise ValueError("illegal stride value")
self.use_res_connect = (
cnf.stride == 1 and cnf.input_channels == cnf.out_channels
)
layers: List[nn.Module] = []
activation_layer = nn.SiLU
# expand
expanded_channels = cnf.adjust_channels(cnf.input_channels, cnf.expand_ratio)
if expanded_channels != cnf.input_channels:
layers.append(
Conv2dNormActivation(
cnf.input_channels,
expanded_channels,
kernel_size=1,
norm_layer=norm_layer,
activation_layer=activation_layer,
)
)
# depthwise
layers.append(
Conv2dNormActivation(
expanded_channels,
expanded_channels,
kernel_size=cnf.kernel,
stride=cnf.stride,
groups=expanded_channels,
norm_layer=norm_layer,
activation_layer=activation_layer,
)
)
# squeeze and excitation
squeeze_channels = max(1, cnf.input_channels // 4)
layers.append(
se_layer(
expanded_channels,
squeeze_channels,
activation=partial(nn.SiLU, inplace=True),
)
)
# project
layers.append(
Conv2dNormActivation(
expanded_channels,
cnf.out_channels,
kernel_size=1,
norm_layer=norm_layer,
activation_layer=None,
)
)
self.block = nn.Sequential(*layers)
self.stochastic_depth = StochasticDepth(stochastic_depth_prob, "row")
self.out_channels = cnf.out_channels
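    # In forward() below, the block output passes through StochasticDepth before
    # the residual add: when the block is residual (stride 1, matching channel
    # counts), whole per-sample contributions are randomly dropped during training.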
def forward(self, input: Tensor) -> Tensor:
result = self.block(input)
if self.use_res_connect:
result = self.stochastic_depth(result)
result += input
return result
class FusedMBConv(nn.Module):
def __init__(
self,
cnf: FusedMBConvConfig,
stochastic_depth_prob: float,
norm_layer: Callable[..., nn.Module],
) -> None:
super().__init__()
if not (1 <= cnf.stride <= 2):
raise ValueError("illegal stride value")
self.use_res_connect = (
cnf.stride == 1 and cnf.input_channels == cnf.out_channels
)
layers: List[nn.Module] = []
activation_layer = nn.SiLU
expanded_channels = cnf.adjust_channels(cnf.input_channels, cnf.expand_ratio)
if expanded_channels != cnf.input_channels:
# fused expand
layers.append(
Conv2dNormActivation(
cnf.input_channels,
expanded_channels,
kernel_size=cnf.kernel,
stride=cnf.stride,
norm_layer=norm_layer,
activation_layer=activation_layer,
)
)
# project
layers.append(
Conv2dNormActivation(
expanded_channels,
cnf.out_channels,
kernel_size=1,
norm_layer=norm_layer,
activation_layer=None,
)
)
else:
layers.append(
Conv2dNormActivation(
cnf.input_channels,
cnf.out_channels,
kernel_size=cnf.kernel,
stride=cnf.stride,
norm_layer=norm_layer,
activation_layer=activation_layer,
)
)
self.block = nn.Sequential(*layers)
self.stochastic_depth = StochasticDepth(stochastic_depth_prob, "row")
self.out_channels = cnf.out_channels
def forward(self, input: Tensor) -> Tensor:
result = self.block(input)
if self.use_res_connect:
result = self.stochastic_depth(result)
result += input
return result
class EfficientNetConfig(PretrainedConfig):
model_type = "efficientnet"
def __init__(
self,
# inverted_residual_setting: Sequence[Union[MBConvConfig, FusedMBConvConfig]],
dropout: float=0.25,
num_channels: int = 61,
stochastic_depth_prob: float = 0.2,
num_classes: int = 2,
norm_layer: Optional[Callable[..., nn.Module]] = None,
# last_channel: Optional[int] = None,
size: str='v2_s',
width_mult: float = 1.0,
depth_mult: float = 1.0,
**kwargs: Any,
) -> None:
"""
        Configuration wrapper for the EfficientNet V1 and V2 models
        Args:
            dropout (float): The dropout probability
            num_channels (int): Number of input (velocity) channels
            stochastic_depth_prob (float): The stochastic depth probability
            num_classes (int): Number of classes
            norm_layer (Optional[Callable[..., nn.Module]]): Module specifying the normalization layer to use
            size (str): Which EfficientNet variant to build (e.g. 'v2_s')
            width_mult (float): Multiplier for channel widths
            depth_mult (float): Multiplier for the number of layers per stage
        """
# self.model = EfficientNet(
# dropout=dropout,
# num_channels=num_channels,
# num_classes=num_classes,
# size=size,
# stochastic_depth_prob=stochastic_depth_prob,
# width_mult=width_mult,
# depth_mult=depth_mult,
# )
#
        self.dropout = dropout
        self.num_channels = num_channels
        self.num_classes = num_classes
        self.size = size
        self.stochastic_depth_prob = stochastic_depth_prob
        self.width_mult = width_mult
        self.depth_mult = depth_mult
super().__init__(**kwargs)
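# EfficientNetConfig inherits save_pretrained() / from_pretrained() from
# PretrainedConfig, so the fields set above round-trip through a config.json
# file; that is what lets predict_and_analyze() below rebuild the model config.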
class EfficientNetPreTrained(PreTrainedModel):
config_class = EfficientNetConfig
def __init__(
self,
config
):
super().__init__(config)
        self.model = EfficientNet(
            dropout=config.dropout,
            num_channels=config.num_channels,
            num_classes=config.num_classes,
            size=config.size,
            stochastic_depth_prob=config.stochastic_depth_prob,
            width_mult=config.width_mult,
            depth_mult=config.depth_mult,
        )
def forward(self, tensor):
return self.model.forward(tensor)
class EfficientNet(nn.Module):
def __init__(
self,
# inverted_residual_setting: Sequence[Union[MBConvConfig, FusedMBConvConfig]],
dropout: float=0.25,
num_channels: int = 61,
stochastic_depth_prob: float = 0.2,
num_classes: int = 2,
norm_layer: Optional[Callable[..., nn.Module]] = None,
# last_channel: Optional[int] = None,
size: str='v2_s',
width_mult: float = 1.0,
depth_mult: float = 1.0,
**kwargs: Any,
) -> None:
"""
EfficientNet V1 and V2 main class
Args:
            dropout (float): The dropout probability
            num_channels (int): Number of input (velocity) channels
            stochastic_depth_prob (float): The stochastic depth probability
            num_classes (int): Number of classes
            norm_layer (Optional[Callable[..., nn.Module]]): Module specifying the normalization layer to use
            size (str): Which EfficientNet variant to build (e.g. 'v2_s')
            width_mult (float): Multiplier for channel widths
            depth_mult (float): Multiplier for the number of layers per stage
        """
super().__init__()
# _log_api_usage_once(self)
inverted_residual_setting, last_channel = _efficientnet_conf(
"efficientnet_%s" % (size), width_mult=width_mult, depth_mult=depth_mult
)
if not inverted_residual_setting:
raise ValueError("The inverted_residual_setting should not be empty")
elif not (
isinstance(inverted_residual_setting, Sequence)
and all([isinstance(s, _MBConvConfig) for s in inverted_residual_setting])
):
raise TypeError(
"The inverted_residual_setting should be List[MBConvConfig]"
)
if "block" in kwargs:
            warnings.warn(
                "The parameter 'block' is deprecated since 0.13 and will be removed in 0.15. "
                "Please pass this information via 'MBConvConfig.block' instead."
            )
if kwargs["block"] is not None:
for s in inverted_residual_setting:
if isinstance(s, MBConvConfig):
s.block = kwargs["block"]
if norm_layer is None:
norm_layer = nn.BatchNorm2d
layers: List[nn.Module] = []
# building first layer
firstconv_output_channels = inverted_residual_setting[0].input_channels
layers.append(
Conv2dNormActivation(
num_channels,
firstconv_output_channels,
kernel_size=3,
stride=2,
norm_layer=norm_layer,
activation_layer=nn.SiLU,
)
)
# building inverted residual blocks
total_stage_blocks = sum(cnf.num_layers for cnf in inverted_residual_setting)
stage_block_id = 0
for cnf in inverted_residual_setting:
stage: List[nn.Module] = []
for _ in range(cnf.num_layers):
# copy to avoid modifications. shallow copy is enough
block_cnf = copy.copy(cnf)
# overwrite info if not the first conv in the stage
if stage:
block_cnf.input_channels = block_cnf.out_channels
block_cnf.stride = 1
# adjust stochastic depth probability based on the depth of the stage block
sd_prob = (
stochastic_depth_prob * float(stage_block_id) / total_stage_blocks
)
stage.append(block_cnf.block(block_cnf, sd_prob, norm_layer))
stage_block_id += 1
layers.append(nn.Sequential(*stage))
# building last several layers
lastconv_input_channels = inverted_residual_setting[-1].out_channels
lastconv_output_channels = (
last_channel if last_channel is not None else 4 * lastconv_input_channels
)
layers.append(
Conv2dNormActivation(
lastconv_input_channels,
lastconv_output_channels,
kernel_size=1,
norm_layer=norm_layer,
activation_layer=nn.SiLU,
)
)
self.features = nn.Sequential(*layers)
self.avgpool = nn.AdaptiveAvgPool2d(1)
self.classifier = nn.Sequential(
nn.Dropout(p=dropout, inplace=True),
nn.Linear(lastconv_output_channels, num_classes),
)
for m in self.modules():
if isinstance(m, nn.Conv2d):
nn.init.kaiming_normal_(m.weight, mode="fan_out")
if m.bias is not None:
nn.init.zeros_(m.bias)
elif isinstance(m, (nn.BatchNorm2d, nn.GroupNorm)):
nn.init.ones_(m.weight)
nn.init.zeros_(m.bias)
elif isinstance(m, nn.Linear):
init_range = 1.0 / math.sqrt(m.out_features)
nn.init.uniform_(m.weight, -init_range, init_range)
nn.init.zeros_(m.bias)
# super().__init__(**kwargs)
def _forward_impl(self, x: Tensor) -> Tensor:
x = self.features(x)
x = self.avgpool(x)
x = torch.flatten(x, 1)
x = self.classifier(x)
return x
def forward(self, x: Tensor) -> Tensor:
return self._forward_impl(x)
# def _efficientnet(
# inverted_residual_setting: Sequence[Union[MBConvConfig, FusedMBConvConfig]],
# dropout: float,
# last_channel: Optional[int],
# weights=None,
# num_channels: int = 61,
# stochastic_depth_prob: float = 0.2,
# progress: bool = True,
# num_classes: int = 2,
# **kwargs: Any,
# ) -> EfficientNetConfig:
# model = EfficientNetConfig(
# inverted_residual_setting,
# dropout,
# num_classes=num_classes,
# num_channels=num_channels,
# stochastic_depth_prob=stochastic_depth_prob,
# last_channel=last_channel,
# **kwargs,
# )
# return model
def _efficientnet_conf(
arch: str,
**kwargs: Any,
) -> Tuple[Sequence[Union[MBConvConfig, FusedMBConvConfig]], Optional[int]]:
inverted_residual_setting: Sequence[Union[MBConvConfig, FusedMBConvConfig]]
if arch.startswith("efficientnet_b"):
bneck_conf = partial(
MBConvConfig,
width_mult=kwargs.pop("width_mult"),
depth_mult=kwargs.pop("depth_mult"),
)
inverted_residual_setting = [
bneck_conf(1, 3, 1, 32, 16, 1),
bneck_conf(6, 3, 2, 16, 24, 2),
bneck_conf(6, 5, 2, 24, 40, 2),
bneck_conf(6, 3, 2, 40, 80, 3),
bneck_conf(6, 5, 1, 80, 112, 3),
bneck_conf(6, 5, 2, 112, 192, 4),
bneck_conf(6, 3, 1, 192, 320, 1),
]
last_channel = None
elif arch.startswith("efficientnet_v2_s"):
inverted_residual_setting = [
FusedMBConvConfig(1, 3, 1, 24, 24, 2),
FusedMBConvConfig(4, 3, 2, 24, 48, 4),
FusedMBConvConfig(4, 3, 2, 48, 64, 4),
MBConvConfig(4, 3, 2, 64, 128, 6),
MBConvConfig(6, 3, 1, 128, 160, 9),
MBConvConfig(6, 3, 2, 160, 256, 15),
]
last_channel = 1280
elif arch.startswith("efficientnet_v2_m"):
inverted_residual_setting = [
FusedMBConvConfig(1, 3, 1, 24, 24, 3),
FusedMBConvConfig(4, 3, 2, 24, 48, 5),
FusedMBConvConfig(4, 3, 2, 48, 80, 5),
MBConvConfig(4, 3, 2, 80, 160, 7),
MBConvConfig(6, 3, 1, 160, 176, 14),
MBConvConfig(6, 3, 2, 176, 304, 18),
MBConvConfig(6, 3, 1, 304, 512, 5),
]
last_channel = 1280
elif arch.startswith("efficientnet_v2_l"):
inverted_residual_setting = [
FusedMBConvConfig(1, 3, 1, 32, 32, 4),
FusedMBConvConfig(4, 3, 2, 32, 64, 7),
FusedMBConvConfig(4, 3, 2, 64, 96, 7),
MBConvConfig(4, 3, 2, 96, 192, 10),
MBConvConfig(6, 3, 1, 192, 224, 19),
MBConvConfig(6, 3, 2, 224, 384, 25),
MBConvConfig(6, 3, 1, 384, 640, 7),
]
last_channel = 1280
else:
raise ValueError(f"Unsupported model type {arch}")
return inverted_residual_setting, last_channel
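# Illustrative example: _efficientnet_conf("efficientnet_v2_s") returns the six
# stage configs listed above (three FusedMBConv stages followed by three MBConv
# stages) together with last_channel = 1280.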
#### extra torchvision stuff ####
class FrozenBatchNorm2d(torch.nn.Module):
"""
BatchNorm2d where the batch statistics and the affine parameters are fixed
Args:
num_features (int): Number of features ``C`` from an expected input of size ``(N, C, H, W)``
eps (float): a value added to the denominator for numerical stability. Default: 1e-5
"""
def __init__(
self,
num_features: int,
eps: float = 1e-5,
):
super().__init__()
# _log_api_usage_once(self)
self.eps = eps
self.register_buffer("weight", torch.ones(num_features))
self.register_buffer("bias", torch.zeros(num_features))
self.register_buffer("running_mean", torch.zeros(num_features))
self.register_buffer("running_var", torch.ones(num_features))
def _load_from_state_dict(
self,
state_dict: dict,
prefix: str,
local_metadata: dict,
strict: bool,
missing_keys: List[str],
unexpected_keys: List[str],
error_msgs: List[str],
):
num_batches_tracked_key = prefix + "num_batches_tracked"
if num_batches_tracked_key in state_dict:
del state_dict[num_batches_tracked_key]
super()._load_from_state_dict(
state_dict, prefix, local_metadata, strict, missing_keys, unexpected_keys, error_msgs
)
def forward(self, x: Tensor) -> Tensor:
# move reshapes to the beginning
# to make it fuser-friendly
w = self.weight.reshape(1, -1, 1, 1)
b = self.bias.reshape(1, -1, 1, 1)
rv = self.running_var.reshape(1, -1, 1, 1)
rm = self.running_mean.reshape(1, -1, 1, 1)
scale = w * (rv + self.eps).rsqrt()
bias = b - rm * scale
return x * scale + bias
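    # Algebra check: standard batch norm computes (x - rm) / sqrt(rv + eps) * w + b,
    # which factors into x * scale + bias with scale = w * rsqrt(rv + eps) and
    # bias = b - rm * scale, exactly as computed above.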
def __repr__(self) -> str:
return f"{self.__class__.__name__}({self.weight.shape[0]}, eps={self.eps})"
class ConvNormActivation(torch.nn.Sequential):
def __init__(
self,
in_channels: int,
out_channels: int,
kernel_size: int = 3,
stride: int = 1,
padding: Optional[int] = None,
groups: int = 1,
norm_layer: Optional[Callable[..., torch.nn.Module]] = torch.nn.BatchNorm2d,
activation_layer: Optional[Callable[..., torch.nn.Module]] = torch.nn.ReLU,
dilation: int = 1,
inplace: Optional[bool] = True,
bias: Optional[bool] = None,
conv_layer: Callable[..., torch.nn.Module] = torch.nn.Conv2d,
) -> None:
if padding is None:
padding = (kernel_size - 1) // 2 * dilation
if bias is None:
bias = norm_layer is None
layers = [
conv_layer(
in_channels,
out_channels,
kernel_size,
stride,
padding,
dilation=dilation,
groups=groups,
bias=bias,
)
]
if norm_layer is not None:
layers.append(norm_layer(out_channels))
if activation_layer is not None:
params = {} if inplace is None else {"inplace": inplace}
layers.append(activation_layer(**params))
super().__init__(*layers)
# _log_api_usage_once(self)
self.out_channels = out_channels
if self.__class__ == ConvNormActivation:
warnings.warn(
"Don't use ConvNormActivation directly, please use Conv2dNormActivation and Conv3dNormActivation instead."
)
class Conv2dNormActivation(ConvNormActivation):
"""
Configurable block used for Convolution2d-Normalization-Activation blocks.
Args:
in_channels (int): Number of channels in the input image
out_channels (int): Number of channels produced by the Convolution-Normalization-Activation block
kernel_size: (int, optional): Size of the convolving kernel. Default: 3
stride (int, optional): Stride of the convolution. Default: 1
padding (int, tuple or str, optional): Padding added to all four sides of the input. Default: None, in which case it will calculated as ``padding = (kernel_size - 1) // 2 * dilation``
groups (int, optional): Number of blocked connections from input channels to output channels. Default: 1
norm_layer (Callable[..., torch.nn.Module], optional): Norm layer that will be stacked on top of the convolution layer. If ``None`` this layer wont be used. Default: ``torch.nn.BatchNorm2d``
activation_layer (Callable[..., torch.nn.Module], optional): Activation function which will be stacked on top of the normalization layer (if not None), otherwise on top of the conv layer. If ``None`` this layer wont be used. Default: ``torch.nn.ReLU``
dilation (int): Spacing between kernel elements. Default: 1
inplace (bool): Parameter for the activation layer, which can optionally do the operation in-place. Default ``True``
bias (bool, optional): Whether to use bias in the convolution layer. By default, biases are included if ``norm_layer is None``.
"""
def __init__(
self,
in_channels: int,
out_channels: int,
kernel_size: int = 3,
stride: int = 1,
padding: Optional[int] = None,
groups: int = 1,
norm_layer: Optional[Callable[..., torch.nn.Module]] = torch.nn.BatchNorm2d,
activation_layer: Optional[Callable[..., torch.nn.Module]] = torch.nn.ReLU,
dilation: int = 1,
inplace: Optional[bool] = True,
bias: Optional[bool] = None,
) -> None:
super().__init__(
in_channels,
out_channels,
kernel_size,
stride,
padding,
groups,
norm_layer,
activation_layer,
dilation,
inplace,
bias,
torch.nn.Conv2d,
)
class Conv3dNormActivation(ConvNormActivation):
"""
Configurable block used for Convolution3d-Normalization-Activation blocks.
Args:
in_channels (int): Number of channels in the input video.
out_channels (int): Number of channels produced by the Convolution-Normalization-Activation block
kernel_size: (int, optional): Size of the convolving kernel. Default: 3
stride (int, optional): Stride of the convolution. Default: 1
padding (int, tuple or str, optional): Padding added to all four sides of the input. Default: None, in which case it will calculated as ``padding = (kernel_size - 1) // 2 * dilation``
groups (int, optional): Number of blocked connections from input channels to output channels. Default: 1
norm_layer (Callable[..., torch.nn.Module], optional): Norm layer that will be stacked on top of the convolution layer. If ``None`` this layer wont be used. Default: ``torch.nn.BatchNorm3d``
activation_layer (Callable[..., torch.nn.Module], optional): Activation function which will be stacked on top of the normalization layer (if not None), otherwise on top of the conv layer. If ``None`` this layer wont be used. Default: ``torch.nn.ReLU``
dilation (int): Spacing between kernel elements. Default: 1
inplace (bool): Parameter for the activation layer, which can optionally do the operation in-place. Default ``True``
bias (bool, optional): Whether to use bias in the convolution layer. By default, biases are included if ``norm_layer is None``.
"""
def __init__(
self,
in_channels: int,
out_channels: int,
kernel_size: int = 3,
stride: int = 1,
padding: Optional[int] = None,
groups: int = 1,
norm_layer: Optional[Callable[..., torch.nn.Module]] = torch.nn.BatchNorm3d,
activation_layer: Optional[Callable[..., torch.nn.Module]] = torch.nn.ReLU,
dilation: int = 1,
inplace: Optional[bool] = True,
bias: Optional[bool] = None,
) -> None:
super().__init__(
in_channels,
out_channels,
kernel_size,
stride,
padding,
groups,
norm_layer,
activation_layer,
dilation,
inplace,
bias,
torch.nn.Conv3d,
)
class SqueezeExcitation(torch.nn.Module):
"""
This block implements the Squeeze-and-Excitation block from https://arxiv.org/abs/1709.01507 (see Fig. 1).
Parameters ``activation``, and ``scale_activation`` correspond to ``delta`` and ``sigma`` in eq. 3.
Args:
input_channels (int): Number of channels in the input image
squeeze_channels (int): Number of squeeze channels
activation (Callable[..., torch.nn.Module], optional): ``delta`` activation. Default: ``torch.nn.ReLU``
scale_activation (Callable[..., torch.nn.Module]): ``sigma`` activation. Default: ``torch.nn.Sigmoid``
"""
def __init__(
self,
input_channels: int,
squeeze_channels: int,
activation: Callable[..., torch.nn.Module] = torch.nn.ReLU,
scale_activation: Callable[..., torch.nn.Module] = torch.nn.Sigmoid,
) -> None:
super().__init__()
# _log_api_usage_once(self)
self.avgpool = torch.nn.AdaptiveAvgPool2d(1)
self.fc1 = torch.nn.Conv2d(input_channels, squeeze_channels, 1)
self.fc2 = torch.nn.Conv2d(squeeze_channels, input_channels, 1)
self.activation = activation()
self.scale_activation = scale_activation()
def _scale(self, input: Tensor) -> Tensor:
scale = self.avgpool(input)
scale = self.fc1(scale)
scale = self.activation(scale)
scale = self.fc2(scale)
return self.scale_activation(scale)
def forward(self, input: Tensor) -> Tensor:
scale = self._scale(input)
return scale * input
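# In summary, SqueezeExcitation computes a per-channel gate: global average
# pool -> 1x1 conv (squeeze) -> activation -> 1x1 conv (excite) -> sigmoid,
# then rescales the input channel-wise by that gate.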
class MLP(torch.nn.Sequential):
"""This block implements the multi-layer perceptron (MLP) module.
Args:
in_channels (int): Number of channels of the input
hidden_channels (List[int]): List of the hidden channel dimensions
norm_layer (Callable[..., torch.nn.Module], optional): Norm layer that will be stacked on top of the convolution layer. If ``None`` this layer wont be used. Default: ``None``
activation_layer (Callable[..., torch.nn.Module], optional): Activation function which will be stacked on top of the normalization layer (if not None), otherwise on top of the conv layer. If ``None`` this layer wont be used. Default: ``torch.nn.ReLU``
inplace (bool): Parameter for the activation layer, which can optionally do the operation in-place. Default ``True``
bias (bool): Whether to use bias in the linear layer. Default ``True``
dropout (float): The probability for the dropout layer. Default: 0.0
"""
def __init__(
self,
in_channels: int,
hidden_channels: List[int],
norm_layer: Optional[Callable[..., torch.nn.Module]] = None,
activation_layer: Optional[Callable[..., torch.nn.Module]] = torch.nn.ReLU,
inplace: Optional[bool] = True,
bias: bool = True,
dropout: float = 0.0,
):
# The addition of `norm_layer` is inspired from the implementation of TorchMultimodal:
# https://github.com/facebookresearch/multimodal/blob/5dec8a/torchmultimodal/modules/layers/mlp.py
params = {} if inplace is None else {"inplace": inplace}
layers = []
in_dim = in_channels
for hidden_dim in hidden_channels[:-1]:
layers.append(torch.nn.Linear(in_dim, hidden_dim, bias=bias))
if norm_layer is not None:
layers.append(norm_layer(hidden_dim))
layers.append(activation_layer(**params))
layers.append(torch.nn.Dropout(dropout, **params))
in_dim = hidden_dim
layers.append(torch.nn.Linear(in_dim, hidden_channels[-1], bias=bias))
layers.append(torch.nn.Dropout(dropout, **params))
super().__init__(*layers)
# _log_api_usage_once(self)
class Permute(torch.nn.Module):
"""This module returns a view of the tensor input with its dimensions permuted.
Args:
dims (List[int]): The desired ordering of dimensions
"""
def __init__(self, dims: List[int]):
super().__init__()
self.dims = dims
def forward(self, x: Tensor) -> Tensor:
return torch.permute(x, self.dims)
def normalize_array(x: list):
    '''Scales an array linearly so its values run from 0 to 1'''
x = np.array(x)
return (x - np.min(x)) / np.max(x - np.min(x))
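# e.g. normalize_array([2.0, 4.0, 6.0]) -> array([0. , 0.5, 1. ])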
# def load_model(model: str, activation: bool=True):
# if activation:
# model += '_w_activation'
# # set options for onnx runtime
# options = ort.SessionOptions()
# options.intra_op_num_threads = 1
# options.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL
# provider = "CPUExecutionProvider"
# # start session
# ort_session = ort.InferenceSession(model_path + '%s.onnx' % (model), options, providers=[provider])
# # ort_session = ORTModel.load_model(model_path + '%s.onnx' % (model))
# return ort_session
def get_activations(model, image: Tensor, model_name: str,
                    layer=None, vmax=2.5, sub_mean=True,
                    channel: int = 0):
    '''Runs the model on an input image and returns the softmax prediction,
    the input image, and feature maps from an early and a late stage of the
    network'''
    # run the feature extractor stage by stage, keeping every stage's output;
    # iterate on a separate variable so the original input is preserved
    layer_outputs = {}
    x = image
    with torch.no_grad():
        for i in range(len(model.model.features)):
            x = model.model.features[i](x)
            layer_outputs[i] = x
            print(i, layer_outputs[i].shape)
        output = model.model(image).detach().cpu().numpy()[0]
    # pull the early and late feature maps chosen in activation_indices
    idx_1, idx_2 = activation_indices[model_name]
    output_1 = layer_outputs[idx_1].detach().cpu().numpy()
    output_2 = layer_outputs[idx_2].detach().cpu().numpy()
    # get prediction as class probabilities
    output = special.softmax(output)
    # sum over velocity channels (channel == 0) or show a single channel
    image = image.detach().cpu().numpy()
    if channel == 0:
        in_image = np.sum(image[0, :, :, :], axis=0)
    else:
        in_image = image[0, int(channel - 1), :, :]
    in_image = normalize_array(in_image)
if layer is None:
# sum over all velocity channels
activation_1 = np.sum(output_1[0, :, :, :], axis=0)
activation_2 = np.sum(output_2[0, :, :, :], axis=0)
else:
# select a single channel
activation_1 = output_1[0, layer, :, :]
activation_2 = output_2[0, layer, :, :]
if sub_mean:
# y = |x - <x>|
activation_1 -= np.mean(activation_1)
activation_1 = np.abs(activation_1)
activation_2 -= np.mean(activation_2)
activation_2 = np.abs(activation_2)
return output, in_image, activation_1, activation_2
def plot_input(input_image: list, origin='lower'):
##### make the figure for the input image #####
plt.rcParams['xtick.labelsize'] = ticks
plt.rcParams['ytick.labelsize'] = ticks
input_fig, ax = plt.subplots(nrows=1, ncols=1, figsize=(18, 8))
im0 = ax.imshow(input_image, cmap=cmap,
origin=origin)
divider = make_axes_locatable(ax)
cax = divider.append_axes('right', size='5%', pad=0.05)
input_fig.colorbar(im0, cax=cax, orientation='vertical')
ax.set_title('Input', fontsize=titles)
return input_fig
def plot_activations(activation_1: list, activation_2: list, origin='lower'):
##### Make the activation figure ######
plt.rcParams['xtick.labelsize'] = ticks
plt.rcParams['ytick.labelsize'] = ticks
fig, axs = plt.subplots(nrows=1, ncols=2, figsize=(27, 12))
ax1, ax2 = axs[0], axs[1]
im1 = ax1.imshow(activation_1, cmap=cmap,
origin=origin)
im2 = ax2.imshow(activation_2, cmap=cmap,
origin=origin)
ims = [im1, im2]
for (i, ax) in enumerate(axs):
divider = make_axes_locatable(ax)
cax = divider.append_axes('right', size='5%', pad=0.05)
fig.colorbar(ims[i], cax=cax, orientation='vertical')
# ax0.set_title('Input', fontsize=titles)
ax1.set_title('Early Activation', fontsize=titles)
ax2.set_title('Late Activation', fontsize=titles)
return fig
def predict_and_analyze(model_name, num_channels, dim, input_channel, image):
'''
    Loads a model, runs the image through it, and visualizes the prediction
    along with intermediate activations.
    The image must be a numpy array of shape (C, W, W) or (1, C, W, W)
'''
model_name = model_name.lower()
num_channels = int(num_channels)
W = int(dim)
print("Running %s for %i channels" % (model_name, num_channels))
print("Loading data")
print(image)
image = np.load(image.name, allow_pickle=True)
image = image.astype(np.float32)
if len(image.shape) != 4:
image = image[np.newaxis, :, :, :]
image = torch.from_numpy(image)
assert image.shape == (1, num_channels, W, W), "Data is the wrong shape"
print("Data loaded")
print("Loading model")
model_loading_name = model_path + "%s_%i_planet_detection" % (model_name, num_channels)
if 'eff' in model_name:
hparams = effnet_hparams[num_channels]
hparams = SimpleNamespace(**hparams)
config = EfficientNetConfig(
dropout=hparams.dropout,
num_channels=hparams.num_channels,
num_classes=hparams.num_classes,
size=hparams.size,
stochastic_depth_prob=hparams.stochastic_depth_prob,
width_mult=hparams.width_mult,
depth_mult=hparams.depth_mult,
)
config.save_pretrained(save_directory=model_loading_name)
# config = EfficientNetConfig.from_pretrained(model_loading_name)
model = EfficientNetPreTrained.from_pretrained(model_loading_name)
# model = EfficientNetPreTrained(config)
# config.register_for_auto_class()
# model.register_for_auto_class("AutoModelForImageClassification")
# pretrained_model = timm.create_model(model_loading_name, pretrained=True)
# model.model.load_state_dict(pretrained_model.state_dict())
# pipeline = pipeline(task="image-classification", model=model_loading_name)
# model = load_model(model_name, activation=True)
# model = AutoModel.from_pretrained(model_loading_name)
print("Model loaded")
print("Looking at activations")
output, input_image, activation_1, activation_2 = get_activations(model, image, model_name,
channel=input_channel,
sub_mean=True)
print("Activations and predictions finished")
if output[0] < output[1]:
output = 'Planet predicted with %.3f percent confidence' % (100*output[1])
else:
output = 'No planet predicted with %.3f percent confidence' % (100*output[0])
input_image = normalize_array(input_image)
activation_1 = normalize_array(activation_1)
activation_2 = normalize_array(activation_2)
# convert input image to RGB (unused for now since not outputting actual image)
# input_pil_image = Image.fromarray(np.uint8(cm.magma(input_image)*255))
print("Plotting")
origin = 'upper'
# plot input image
input_fig = plot_input(input_image, origin=origin)
# plot mean subtracted activations
    fig1 = plot_activations(activation_1, activation_2, origin=origin)
# plot raw activations
_, _, activation_1, activation_2 = get_activations(model, image, model_name,
channel=input_channel,
sub_mean=False)
activation_1 = normalize_array(activation_1)
activation_2 = normalize_array(activation_2)
    fig2 = plot_activations(activation_1, activation_2, origin=origin)
print("Sending to Hugging Face")
return output, input_fig, fig1, fig2
if __name__ == "__main__":
demo = gr.Interface(
fn=predict_and_analyze,
inputs=[gr.Dropdown(["EfficientNet"],
# "RegNet"],
value="EfficientNet",
label="Model Selection",
show_label=True),
gr.Dropdown(["47", "61", "75"],
value="61",
label="Number of Velocity Channels",
show_label=True),
gr.Dropdown(["600"],
value="600",
label="Image Dimensions",
show_label=True),
gr.Number(value=0.,
label="Input Channel to show (0 = sum over all)",
show_label=True),
gr.File(label="Input Data", show_label=True)],
outputs=[gr.Textbox(lines=1, label="Prediction", show_label=True),
# gr.Image(label="Input Image", show_label=True),
gr.Plot(label="Input Image", show_label=True),
gr.Plot(label="Mean-Subtracted Activations", show_label=True),
gr.Plot(label="Raw Activations", show_label=True)
],
title="Kinematic Planet Detector"
)
demo.launch()
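
# Example (illustrative, not executed): preparing a compatible input file.
# The app expects a NumPy array of shape (num_channels, 600, 600) saved as
# .npy, e.g. for the 61-channel model:
#
#     import numpy as np
#     cube = np.random.rand(61, 600, 600).astype(np.float32)  # stand-in cube
#     np.save("example_cube.npy", cube)
#
# Upload example_cube.npy through the "Input Data" file widget.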