|
|
|
|
|
|
|
|
|
import functools |
|
import math |
|
import torch |
|
import torch.nn as nn |
|
|
|
try: |
|
from kornia.filters import filter2d |
|
except: |
|
pass |
|
|
|
|
|
|
|
|
|
|
|
class PatchGANDiscriminator(nn.Module):
    """Defines a PatchGAN discriminator as in Pix2Pix

    The network is a plain ``nn.Sequential`` of 4x4 convolutions that ends in a
    one-channel convolution, so ``forward`` returns a map of values (one per
    spatial location of the last feature map) instead of one value per image.

        --> see https://github.com/junyanz/pytorch-CycleGAN-and-pix2pix/blob/master/models/networks.py
    """

    def __init__(self, input_nc=3, ndf=64, n_layers=3, use_actnorm=False):
        """Construct a PatchGAN discriminator

        Parameters:
            input_nc (int)     -- the number of channels in input images
            ndf (int)          -- the number of filters in the first conv layer;
                                  deeper layers use ndf * min(2**n, 8)
            n_layers (int)     -- the number of stride-2 conv layers (for
                                  n_layers >= 1); two stride-1 conv layers follow
            use_actnorm (bool) -- normalize with ActNorm instead of
                                  nn.BatchNorm2d
        """
        super().__init__()
        norm_layer = ActNorm if use_actnorm else nn.BatchNorm2d
        # Convolutions that feed a norm layer only carry their own bias when
        # that norm layer is not BatchNorm2d.
        use_bias = norm_layer is not nn.BatchNorm2d

        kw = 4
        padw = 1
        # Stem: no normalization after the first convolution.
        sequence = [
            nn.Conv2d(input_nc, ndf, kernel_size=kw, stride=2, padding=padw),
            nn.LeakyReLU(0.2, True),
        ]

        # Strided stages; the channel multiplier doubles each time, capped at 8.
        nf_mult = 1
        for n in range(1, n_layers):
            nf_mult_prev, nf_mult = nf_mult, min(2**n, 8)
            sequence += [
                nn.Conv2d(
                    ndf * nf_mult_prev,
                    ndf * nf_mult,
                    kernel_size=kw,
                    stride=2,
                    padding=padw,
                    bias=use_bias,
                ),
                norm_layer(ndf * nf_mult),
                nn.LeakyReLU(0.2, True),
            ]

        # One more normalized stage, this time with stride 1.
        nf_mult_prev, nf_mult = nf_mult, min(2**n_layers, 8)
        sequence += [
            nn.Conv2d(
                ndf * nf_mult_prev,
                ndf * nf_mult,
                kernel_size=kw,
                stride=1,
                padding=padw,
                bias=use_bias,
            ),
            norm_layer(ndf * nf_mult),
            nn.LeakyReLU(0.2, True),
        ]

        # Project to a single output channel.
        sequence += [
            nn.Conv2d(ndf * nf_mult, 1, kernel_size=kw, stride=1, padding=padw)
        ]
        self.main = nn.Sequential(*sequence)

        self.apply(self._init_weights)

    def _init_weights(self, module):
        """Re-initialize Conv2d weights as N(0, 0.02) and BatchNorm2d weights as
        N(1, 0.02) with zero bias; every other module type is left untouched."""
        if isinstance(module, nn.Conv2d):
            nn.init.normal_(module.weight.data, 0.0, 0.02)
        elif isinstance(module, nn.BatchNorm2d):
            nn.init.normal_(module.weight.data, 1.0, 0.02)
            nn.init.constant_(module.bias.data, 0)

    def forward(self, input):
        """Standard forward: run ``input`` through the sequential stack."""
        return self.main(input)
|
|
|
|
|
class ActNorm(nn.Module):
    """Per-channel affine layer ``scale * (input + loc)`` with data-dependent init.

    ``loc`` and ``scale`` are learnable parameters of shape
    ``(1, num_features, 1, 1)``. The first call made in training mode sets
    them from that batch: ``loc`` becomes minus the per-channel mean and
    ``scale`` becomes ``1 / (std + 1e-6)``. Inputs may be 4-D, or 2-D
    ``(N, C)`` tensors, which are temporarily given two trailing singleton
    dimensions.
    """

    def __init__(
        self, num_features, logdet=False, affine=True, allow_reverse_init=False
    ):
        """
        Parameters:
            num_features (int)        -- size of the channel dimension (dim 1)
            logdet (bool)             -- if True, forward also returns a
                                         log-determinant term per batch element
            affine (bool)             -- must be truthy; anything else is rejected
            allow_reverse_init (bool) -- permit the data-dependent init to be
                                         triggered by a reverse pass
        """
        assert affine, "ActNorm only supports affine=True"
        super().__init__()
        self.logdet = logdet
        self.loc = nn.Parameter(torch.zeros(1, num_features, 1, 1))
        self.scale = nn.Parameter(torch.ones(1, num_features, 1, 1))
        self.allow_reverse_init = allow_reverse_init

        # 0 until the data-dependent initialization has run, then 1.
        self.register_buffer("initialized", torch.tensor(0, dtype=torch.uint8))

    @staticmethod
    def _as_4d(tensor):
        """Return ``(tensor, was_2d)``, giving 2-D input two trailing unit dims."""
        if len(tensor.shape) == 2:
            return tensor[:, :, None, None], True
        return tensor, False

    def initialize(self, input):
        """Set ``loc``/``scale`` from the per-channel statistics of a 4-D batch."""
        with torch.no_grad():
            # Bring channels to the front and flatten everything else so the
            # statistics are taken over batch and spatial positions together.
            per_channel = (
                input.permute(1, 0, 2, 3).contiguous().view(input.shape[1], -1)
            )
            mean = per_channel.mean(1).view(1, -1, 1, 1)
            std = per_channel.std(1).view(1, -1, 1, 1)

            self.loc.data.copy_(-mean)
            # The epsilon guards against division by a zero std.
            self.scale.data.copy_(1 / (std + 1e-6))

    def forward(self, input, reverse=False):
        """Apply ``scale * (input + loc)``, or the inverse when ``reverse`` is True.

        Returns the transformed tensor (same rank as ``input``); when
        ``self.logdet`` is set, returns ``(tensor, logdet)`` where ``logdet``
        has one entry per batch element.
        """
        if reverse:
            return self.reverse(input)

        input, squeeze = self._as_4d(input)
        _, _, height, width = input.shape

        if self.training and self.initialized.item() == 0:
            self.initialize(input)
            self.initialized.fill_(1)

        h = self.scale * (input + self.loc)

        if squeeze:
            h = h.squeeze(-1).squeeze(-1)

        if self.logdet:
            log_abs = torch.log(torch.abs(self.scale))
            # The same scale is applied at every spatial position.
            logdet = height * width * torch.sum(log_abs)
            logdet = logdet * torch.ones(input.shape[0]).to(input)
            return h, logdet

        return h

    def reverse(self, output):
        """Invert the forward transform: ``output / scale - loc``.

        Raises:
            RuntimeError -- if the layer is in training mode, not yet
                            initialized, and ``allow_reverse_init`` is False.
        """
        # Promote 2-D input before any initialization: ``initialize`` permutes
        # four dimensions and would otherwise fail on an (N, C) tensor.
        output, squeeze = self._as_4d(output)

        if self.training and self.initialized.item() == 0:
            if not self.allow_reverse_init:
                raise RuntimeError(
                    "Initializing ActNorm in reverse direction is "
                    "disabled by default. Use allow_reverse_init=True to enable."
                )
            self.initialize(output)
            self.initialized.fill_(1)

        h = output / self.scale - self.loc

        if squeeze:
            h = h.squeeze(-1).squeeze(-1)
        return h
|
|
|
|
|
|
|
|
|
|
|
class StyleGANDiscriminator(nn.Module):
    """Discriminator built from a stem convolution, a chain of
    ``DiscriminatorBlock`` modules, a final convolution and a two-layer linear
    head that outputs one value per sample.
    """

    def __init__(
        self, input_nc=3, ndf=64, n_layers=3, channel_multiplier=1, image_size=256
    ):
        """
        Parameters:
            input_nc (int)           -- the number of channels in input images
            ndf (int)                -- accepted but not used by this class
            n_layers (int)           -- accepted but not used by this class
            channel_multiplier (int) -- scales the channel widths used at
                                        resolutions 64 and above
            image_size (int)         -- input resolution; must be a key of the
                                        channel table below (4, 8, ..., 1024),
                                        otherwise the lookup raises KeyError
        """
        super().__init__()
        # Channel width used at each feature-map resolution.
        channels = {
            4: 512,
            8: 512,
            16: 512,
            32: 512,
            64: 256 * channel_multiplier,
            128: 128 * channel_multiplier,
            256: 64 * channel_multiplier,
            512: 32 * channel_multiplier,
            1024: 16 * channel_multiplier,
        }

        # log2 is the dedicated base-2 routine; log(x, 2) goes through a
        # floating-point division that int() could truncate one step low.
        log_size = int(math.log2(image_size))
        in_channel = channels[image_size]

        blocks = [nn.Conv2d(input_nc, in_channel, 3, padding=1), leaky_relu()]
        # One block per resolution step from image_size down to 4.
        for i in range(log_size, 2, -1):
            out_channel = channels[2 ** (i - 1)]
            blocks.append(DiscriminatorBlock(in_channel, out_channel))
            in_channel = out_channel
        self.blocks = nn.ModuleList(blocks)

        self.final_conv = nn.Sequential(
            nn.Conv2d(in_channel, channels[4], 3, padding=1),
            leaky_relu(),
        )
        # The head is sized for a 4x4 feature map with channels[4] channels.
        self.final_linear = nn.Sequential(
            nn.Linear(channels[4] * 4 * 4, channels[4]),
            leaky_relu(),
            nn.Linear(channels[4], 1),
        )

    def forward(self, x):
        """Return a tensor with one value per sample, shape ``(batch, 1)``.

        Assumes ``x`` is ``(batch, input_nc, image_size, image_size)`` so the
        feature map reaching the linear head is 4x4 -- TODO confirm with callers.
        """
        for block in self.blocks:
            x = block(x)
        x = self.final_conv(x)
        # reshape (not view) so a non-contiguous feature map is flattened too.
        x = x.reshape(x.shape[0], -1)
        x = self.final_linear(x)
        return x
|
|
|
|
|
class DiscriminatorBlock(nn.Module):
    """Residual block: two 3x3 conv + leaky-ReLU layers on the main path and a
    1x1 convolution on the skip path, optionally halving the resolution.
    """

    def __init__(self, input_channels, filters, downsample=True):
        """
        Parameters:
            input_channels (int) -- channels of the incoming feature map
            filters (int)        -- channels produced by the block
            downsample (bool)    -- when truthy, the skip conv uses stride 2 and
                                    the main path ends in Blur + stride-2 conv
        """
        super().__init__()
        skip_stride = 2 if downsample else 1
        self.conv_res = nn.Conv2d(input_channels, filters, 1, stride=skip_stride)

        main_layers = [
            nn.Conv2d(input_channels, filters, 3, padding=1),
            leaky_relu(),
            nn.Conv2d(filters, filters, 3, padding=1),
            leaky_relu(),
        ]
        self.net = nn.Sequential(*main_layers)

        if downsample:
            self.downsample = nn.Sequential(
                Blur(), nn.Conv2d(filters, filters, 3, padding=1, stride=2)
            )
        else:
            self.downsample = None

    def forward(self, x):
        """Return ``(main(x) + skip(x)) / sqrt(2)``."""
        skip = self.conv_res(x)
        out = self.net(x)
        if exists(self.downsample):
            out = self.downsample(out)
        # Scale the sum of the two branches by 1/sqrt(2).
        return (out + skip) * (1 / math.sqrt(2))
|
|
|
|
|
class Blur(nn.Module):
    """Filter the input with the 3x3 kernel formed by the outer product of
    ``[1, 2, 1]`` with itself, using ``filter2d`` with ``normalized=True``.
    """

    def __init__(self):
        super().__init__()
        # Kept as a buffer named "f" holding the 1-D taps.
        self.register_buffer("f", torch.Tensor([1, 2, 1]))

    def forward(self, x):
        """Build the (1, 3, 3) kernel from the stored taps and apply it to ``x``."""
        taps = self.f
        as_row = taps[None, None, :]
        as_col = taps[None, :, None]
        kernel = as_row * as_col
        return filter2d(x, kernel, normalized=True)
|
|
|
|
|
def leaky_relu(p=0.2):
    """Build an in-place ``nn.LeakyReLU`` whose negative slope is ``p``."""
    activation = nn.LeakyReLU(negative_slope=p, inplace=True)
    return activation
|
|
|
|
|
def exists(val):
    """Tell whether ``val`` is anything other than ``None``."""
    if val is None:
        return False
    return True
|
|