from __future__ import division

import torch
import numpy as np
import torchvision
import PIL.Image as Image
import cv2
from torch.nn import functional as F


class Compose(object):
    """Composes several co_transforms together.

    For example:
    >>> co_transforms.Compose([
    >>>     co_transforms.CenterCrop(10),
    >>>     co_transforms.ToTensor(),
    >>> ])
    """

    def __init__(self, co_transforms):
        self.co_transforms = co_transforms

    def __call__(self, input, target):
        for t in self.co_transforms:
            input, target = t(input, target)
        return input, target
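

def _demo_compose():
    # Usage sketch (not part of the original module): Compose chains
    # co-transforms, each taking and returning an (inputs, target) pair.
    # The identity transform below is a placeholder assumption for illustration.
    def identity(inputs, target):
        return inputs, target

    aug = Compose([identity, identity])
    ims = [np.random.rand(8, 8, 3), np.random.rand(8, 8, 3)]
    flow = np.zeros((8, 8, 3), dtype=np.float32)
    return aug(ims, flow)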


class Scale(object):
    """Rescales the input images and the target flow by the given factor.

    The first constructor argument is used as a uniform resize ratio for both
    axes (a value of 1 leaves the data untouched); flow values are multiplied
    by the same ratio so they stay consistent with the resized images. The
    validity mask in channel 2 of the target is resized with nearest-neighbour
    interpolation so it remains binary.

    order: interpolation order for the target.
        0: nearest, 1: bilinear (default), 2: bicubic.
    """

    def __init__(self, size, order=1):
        self.ratio = size
        self.order = order
        if order == 0:
            self.code = cv2.INTER_NEAREST
        elif order == 1:
            self.code = cv2.INTER_LINEAR
        elif order == 2:
            self.code = cv2.INTER_CUBIC

    def __call__(self, inputs, target):
        if self.ratio == 1:
            return inputs, target
        h, w, _ = inputs[0].shape
        ratio = self.ratio
        # images are always resized bilinearly; the target uses the configured order
        inputs[0] = cv2.resize(
            inputs[0], None, fx=ratio, fy=ratio, interpolation=cv2.INTER_LINEAR
        )
        inputs[1] = cv2.resize(
            inputs[1], None, fx=ratio, fy=ratio, interpolation=cv2.INTER_LINEAR
        )
        # resize the validity mask with nearest-neighbour interpolation to keep it binary
        tmp = cv2.resize(
            target[:, :, 2], None, fx=ratio, fy=ratio, interpolation=cv2.INTER_NEAREST
        )
        # flow vectors are scaled by the same ratio as the spatial resize
        target = (
            cv2.resize(target, None, fx=ratio, fy=ratio, interpolation=self.code)
            * ratio
        )
        target[:, :, 2] = tmp
        return inputs, target
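

def _demo_scale():
    # Usage sketch (not part of the original module), assuming a pair of
    # HxWx3 float images in [0, 1] and an HxWx3 target holding (u, v, mask).
    ims = [np.random.rand(64, 96, 3).astype(np.float32) for _ in range(2)]
    flow = np.zeros((64, 96, 3), dtype=np.float32)
    flow[:, :, 2] = 1  # mark every pixel as valid
    ims, flow = Scale(0.5, order=1)(ims, flow)
    # ims[k] and flow are now 32x48; flow magnitudes were halved as well
    return ims, flow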


class SpatialAug(object):
    def __init__(
        self,
        crop,
        scale=None,
        rot=None,
        trans=None,
        squeeze=None,
        schedule_coeff=1,
        order=1,
        black=False,
    ):
        self.crop = crop
        self.scale = scale
        self.rot = rot
        self.trans = trans
        self.squeeze = squeeze
        self.t = np.zeros(6)
        self.schedule_coeff = schedule_coeff
        self.order = order
        self.black = black
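
    # Note on the parameterisation (added comment, not original documentation):
    # self.t holds a 2-D affine map in row-vector form,
    #     (x, y) -> (x, y) @ [[t0, t1], [t2, t3]] + (t4, t5).
    # left_multiply composes a new affine map to be applied after the current
    # one, and inverse() returns the six parameters of the inverse map.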

    def to_identity(self):
        self.t[0] = 1
        self.t[2] = 0
        self.t[4] = 0
        self.t[1] = 0
        self.t[3] = 1
        self.t[5] = 0

    def left_multiply(self, u0, u1, u2, u3, u4, u5):
        result = np.zeros(6)
        result[0] = self.t[0] * u0 + self.t[1] * u2
        result[1] = self.t[0] * u1 + self.t[1] * u3
        result[2] = self.t[2] * u0 + self.t[3] * u2
        result[3] = self.t[2] * u1 + self.t[3] * u3
        result[4] = self.t[4] * u0 + self.t[5] * u2 + u4
        result[5] = self.t[4] * u1 + self.t[5] * u3 + u5
        self.t = result

    def inverse(self):
        result = np.zeros(6)
        a = self.t[0]
        c = self.t[2]
        e = self.t[4]
        b = self.t[1]
        d = self.t[3]
        f = self.t[5]
        denom = a * d - b * c
        result[0] = d / denom
        result[1] = -b / denom
        result[2] = -c / denom
        result[3] = a / denom
        result[4] = (c * f - d * e) / denom
        result[5] = (b * e - a * f) / denom
        return result

    def grid_transform(self, meshgrid, t, normalize=True, gridsize=None):
        if gridsize is None:
            h, w = meshgrid[0].shape
        else:
            h, w = gridsize
        vgrid = torch.cat(
            [
                (meshgrid[0] * t[0] + meshgrid[1] * t[2] + t[4])[:, :, np.newaxis],
                (meshgrid[0] * t[1] + meshgrid[1] * t[3] + t[5])[:, :, np.newaxis],
            ],
            -1,
        )
        if normalize:
            vgrid[:, :, 0] = 2.0 * vgrid[:, :, 0] / max(w - 1, 1) - 1.0
            vgrid[:, :, 1] = 2.0 * vgrid[:, :, 1] / max(h - 1, 1) - 1.0
        return vgrid
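
    # Added note: with normalize=True the sampling grid is mapped to [-1, 1]
    # using (size - 1), which matches F.grid_sample's align_corners=True
    # convention; with normalize=False the grid stays in pixel coordinates
    # and is used below to recompose the flow field.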

    def __call__(self, inputs, target):
        h, w, _ = inputs[0].shape
        th, tw = self.crop
        meshgrid = torch.meshgrid(
            [torch.Tensor(range(th)), torch.Tensor(range(tw))], indexing="ij"
        )[::-1]
        cornergrid = torch.meshgrid(
            [torch.Tensor([0, th - 1]), torch.Tensor([0, tw - 1])], indexing="ij"
        )[::-1]
        for i in range(50):
            # im0
            self.to_identity()
            # random horizontal mirroring
            mirror = bool(np.random.binomial(1, 0.5))
            if mirror:
                self.left_multiply(-1, 0, 0, 1, 0.5 * tw, -0.5 * th)
            else:
                self.left_multiply(1, 0, 0, 1, -0.5 * tw, -0.5 * th)
            scale0 = 1
            scale1 = 1
            squeeze0 = 1
            squeeze1 = 1
            if self.rot is not None:
                rot0 = np.random.uniform(-self.rot[0], +self.rot[0])
                rot1 = (
                    np.random.uniform(
                        -self.rot[1] * self.schedule_coeff,
                        self.rot[1] * self.schedule_coeff,
                    )
                    + rot0
                )
                self.left_multiply(
                    np.cos(rot0), np.sin(rot0), -np.sin(rot0), np.cos(rot0), 0, 0
                )
            if self.trans is not None:
                trans0 = np.random.uniform(-self.trans[0], +self.trans[0], 2)
                trans1 = (
                    np.random.uniform(
                        -self.trans[1] * self.schedule_coeff,
                        +self.trans[1] * self.schedule_coeff,
                        2,
                    )
                    + trans0
                )
                self.left_multiply(1, 0, 0, 1, trans0[0] * tw, trans0[1] * th)
            if self.squeeze is not None:
                squeeze0 = np.exp(np.random.uniform(-self.squeeze[0], self.squeeze[0]))
                squeeze1 = (
                    np.exp(
                        np.random.uniform(
                            -self.squeeze[1] * self.schedule_coeff,
                            self.squeeze[1] * self.schedule_coeff,
                        )
                    )
                    * squeeze0
                )
            if self.scale is not None:
                scale0 = np.exp(
                    np.random.uniform(
                        self.scale[2] - self.scale[0], self.scale[2] + self.scale[0]
                    )
                )
                scale1 = (
                    np.exp(
                        np.random.uniform(
                            -self.scale[1] * self.schedule_coeff,
                            self.scale[1] * self.schedule_coeff,
                        )
                    )
                    * scale0
                )
            self.left_multiply(
                1.0 / (scale0 * squeeze0), 0, 0, 1.0 / (scale0 / squeeze0), 0, 0
            )
            self.left_multiply(1, 0, 0, 1, 0.5 * w, 0.5 * h)
            transmat0 = self.t.copy()
            # im1
            self.to_identity()
            if mirror:
                self.left_multiply(-1, 0, 0, 1, 0.5 * tw, -0.5 * th)
            else:
                self.left_multiply(1, 0, 0, 1, -0.5 * tw, -0.5 * th)
            if self.rot is not None:
                self.left_multiply(
                    np.cos(rot1), np.sin(rot1), -np.sin(rot1), np.cos(rot1), 0, 0
                )
            if self.trans is not None:
                self.left_multiply(1, 0, 0, 1, trans1[0] * tw, trans1[1] * th)
            self.left_multiply(
                1.0 / (scale1 * squeeze1), 0, 0, 1.0 / (scale1 / squeeze1), 0, 0
            )
            self.left_multiply(1, 0, 0, 1, 0.5 * w, 0.5 * h)
            transmat1 = self.t.copy()
            transmat1_inv = self.inverse()
            if self.black:
                # black augmentation, allowing 0 values in the input images
                # https://github.com/lmb-freiburg/flownet2/blob/master/src/caffe/layers/black_augmentation_layer.cu
                break
            else:
                # retry unless all four crop corners map inside both source images
                if (
                    (
                        self.grid_transform(
                            cornergrid, transmat0, gridsize=[float(h), float(w)]
                        ).abs()
                        > 1
                    ).sum()
                    + (
                        self.grid_transform(
                            cornergrid, transmat1, gridsize=[float(h), float(w)]
                        ).abs()
                        > 1
                    ).sum()
                ) == 0:
                    break
        if i == 49:
            # fall back to a plain centre crop if no valid sample was found
            print("max_iter in augmentation")
            self.to_identity()
            self.left_multiply(1, 0, 0, 1, -0.5 * tw, -0.5 * th)
            self.left_multiply(1, 0, 0, 1, 0.5 * w, 0.5 * h)
            transmat0 = self.t.copy()
            transmat1 = self.t.copy()
            transmat1_inv = self.inverse()  # keep the inverse consistent with the fallback
        # do the real work
        vgrid = self.grid_transform(meshgrid, transmat0, gridsize=[float(h), float(w)])
        # warp frame 0 and the target into the crop
        inputs_0 = F.grid_sample(
            torch.Tensor(inputs[0]).permute(2, 0, 1)[np.newaxis], vgrid[np.newaxis]
        )[0].permute(1, 2, 0)
        if self.order == 0:
            target_0 = F.grid_sample(
                torch.Tensor(target).permute(2, 0, 1)[np.newaxis],
                vgrid[np.newaxis],
                mode="nearest",
            )[0].permute(1, 2, 0)
        else:
            target_0 = F.grid_sample(
                torch.Tensor(target).permute(2, 0, 1)[np.newaxis], vgrid[np.newaxis]
            )[0].permute(1, 2, 0)
        # warp the validity mask; NaNs mark samples that touch invalid pixels
        mask_0 = target[:, :, 2:3].copy()
        mask_0[mask_0 == 0] = np.nan
        if self.order == 0:
            mask_0 = F.grid_sample(
                torch.Tensor(mask_0).permute(2, 0, 1)[np.newaxis],
                vgrid[np.newaxis],
                mode="nearest",
            )[0].permute(1, 2, 0)
        else:
            mask_0 = F.grid_sample(
                torch.Tensor(mask_0).permute(2, 0, 1)[np.newaxis], vgrid[np.newaxis]
            )[0].permute(1, 2, 0)
        mask_0[torch.isnan(mask_0)] = 0
        # warp frame 1 with its own transform
        vgrid = self.grid_transform(meshgrid, transmat1, gridsize=[float(h), float(w)])
        inputs_1 = F.grid_sample(
            torch.Tensor(inputs[1]).permute(2, 0, 1)[np.newaxis], vgrid[np.newaxis]
        )[0].permute(1, 2, 0)
        # flow: map crop pixels through transmat0, add the warped flow, then
        # map back through the inverse of transmat1 to get the new displacement
        pos = target_0[:, :, :2] + self.grid_transform(
            meshgrid, transmat0, normalize=False
        )
        pos = self.grid_transform(pos.permute(2, 0, 1), transmat1_inv, normalize=False)
        if target_0.shape[2] >= 4:
            # scale channel, adjusted by the relative zoom between the two crops
            exp = target_0[:, :, 3:] * scale1 / scale0
            target = torch.cat(
                [
                    (pos[:, :, 0] - meshgrid[0]).unsqueeze(-1),
                    (pos[:, :, 1] - meshgrid[1]).unsqueeze(-1),
                    mask_0,
                    exp,
                ],
                -1,
            )
        else:
            target = torch.cat(
                [
                    (pos[:, :, 0] - meshgrid[0]).unsqueeze(-1),
                    (pos[:, :, 1] - meshgrid[1]).unsqueeze(-1),
                    mask_0,
                ],
                -1,
            )
        inputs = [np.asarray(inputs_0), np.asarray(inputs_1)]
        target = np.asarray(target)
        return inputs, target
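

def _demo_spatial_aug():
    # Usage sketch (not part of the original module). The parameter ranges are
    # illustrative assumptions, not values from any particular training recipe;
    # inputs are two HxWx3 float frames and the target holds (u, v, mask).
    ims = [np.random.rand(128, 160, 3).astype(np.float32) for _ in range(2)]
    flow = np.zeros((128, 160, 3), dtype=np.float32)
    flow[:, :, 2] = 1
    aug = SpatialAug(
        crop=(96, 128),
        scale=[0.2, 0.03, 0.1],
        rot=[0.2, 0.03],
        trans=[0.05, 0.01],
        squeeze=[0.1, 0.0],
        order=1,
    )
    ims, flow = aug(ims, flow)
    # both frames and the target are now 96x128 crops with a consistent flow
    return ims, flow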


class pseudoPCAAug(object):
    """
    Chromatic Eigen Augmentation: https://github.com/lmb-freiburg/flownet2/blob/master/src/caffe/layers/data_augmentation_layer.cu
    This version approximates it with torchvision's ColorJitter and is faster.
    """

    def __init__(self, schedule_coeff=1):
        self.augcolor = torchvision.transforms.ColorJitter(
            brightness=0.4, contrast=0.4, saturation=0.5, hue=0.5 / 3.14
        )

    def __call__(self, inputs, target):
        inputs[0] = (
            np.asarray(self.augcolor(Image.fromarray(np.uint8(inputs[0] * 255))))
            / 255.0
        )
        inputs[1] = (
            np.asarray(self.augcolor(Image.fromarray(np.uint8(inputs[1] * 255))))
            / 255.0
        )
        return inputs, target
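

def _demo_pseudo_pca_aug():
    # Usage sketch (not part of the original module), assuming images in [0, 1].
    # Only the two frames are modified; the target passes through unchanged.
    ims = [np.random.rand(64, 96, 3) for _ in range(2)]
    flow = np.zeros((64, 96, 3), dtype=np.float32)
    return pseudoPCAAug()(ims, flow)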


class PCAAug(object):
    """
    Chromatic Eigen Augmentation: https://github.com/lmb-freiburg/flownet2/blob/master/src/caffe/layers/data_augmentation_layer.cu
    """

    def __init__(
        self,
        lmult_pow=[0.4, 0, -0.2],
        lmult_mult=[0.4, 0, 0],
        lmult_add=[0.03, 0, 0],
        sat_pow=[0.4, 0, 0],
        sat_mult=[0.5, 0, -0.3],
        sat_add=[0.03, 0, 0],
        col_pow=[0.4, 0, 0],
        col_mult=[0.2, 0, 0],
        col_add=[0.02, 0, 0],
        ladd_pow=[0.4, 0, 0],
        ladd_mult=[0.4, 0, 0],
        ladd_add=[0.04, 0, 0],
        col_rotate=[1.0, 0, 0],
        schedule_coeff=1,
    ):
        # no mean
        self.pow_nomean = [1, 1, 1]
        self.add_nomean = [0, 0, 0]
        self.mult_nomean = [1, 1, 1]
        self.pow_withmean = [1, 1, 1]
        self.add_withmean = [0, 0, 0]
        self.mult_withmean = [1, 1, 1]
        self.lmult_pow = 1
        self.lmult_mult = 1
        self.lmult_add = 0
        self.col_angle = 0
        if ladd_pow is not None:
            self.pow_nomean[0] = np.exp(np.random.normal(ladd_pow[2], ladd_pow[0]))
        if col_pow is not None:
            self.pow_nomean[1] = np.exp(np.random.normal(col_pow[2], col_pow[0]))
            self.pow_nomean[2] = np.exp(np.random.normal(col_pow[2], col_pow[0]))
        if ladd_add is not None:
            self.add_nomean[0] = np.random.normal(ladd_add[2], ladd_add[0])
        if col_add is not None:
            self.add_nomean[1] = np.random.normal(col_add[2], col_add[0])
            self.add_nomean[2] = np.random.normal(col_add[2], col_add[0])
        if ladd_mult is not None:
            self.mult_nomean[0] = np.exp(np.random.normal(ladd_mult[2], ladd_mult[0]))
        if col_mult is not None:
            self.mult_nomean[1] = np.exp(np.random.normal(col_mult[2], col_mult[0]))
            self.mult_nomean[2] = np.exp(np.random.normal(col_mult[2], col_mult[0]))
        # with mean
        if sat_pow is not None:
            self.pow_withmean[1] = np.exp(
                np.random.uniform(sat_pow[2] - sat_pow[0], sat_pow[2] + sat_pow[0])
            )
            self.pow_withmean[2] = self.pow_withmean[1]
        if sat_add is not None:
            self.add_withmean[1] = np.random.uniform(
                sat_add[2] - sat_add[0], sat_add[2] + sat_add[0]
            )
            self.add_withmean[2] = self.add_withmean[1]
        if sat_mult is not None:
            self.mult_withmean[1] = np.exp(
                np.random.uniform(sat_mult[2] - sat_mult[0], sat_mult[2] + sat_mult[0])
            )
            self.mult_withmean[2] = self.mult_withmean[1]
        if lmult_pow is not None:
            self.lmult_pow = np.exp(
                np.random.uniform(
                    lmult_pow[2] - lmult_pow[0], lmult_pow[2] + lmult_pow[0]
                )
            )
        if lmult_mult is not None:
            self.lmult_mult = np.exp(
                np.random.uniform(
                    lmult_mult[2] - lmult_mult[0], lmult_mult[2] + lmult_mult[0]
                )
            )
        if lmult_add is not None:
            self.lmult_add = np.random.uniform(
                lmult_add[2] - lmult_add[0], lmult_add[2] + lmult_add[0]
            )
        if col_rotate is not None:
            self.col_angle = np.random.uniform(
                col_rotate[2] - col_rotate[0], col_rotate[2] + col_rotate[0]
            )
        # eigen vectors
        self.eigvec = np.reshape(
            [0.51, 0.56, 0.65, 0.79, 0.01, -0.62, 0.35, -0.83, 0.44], [3, 3]
        ).transpose()

    def __call__(self, inputs, target):
        inputs[0] = self.pca_image(inputs[0])
        inputs[1] = self.pca_image(inputs[1])
        return inputs, target

    def pca_image(self, rgb):
        eig = np.dot(rgb, self.eigvec)
        mean_rgb = rgb.mean((0, 1))
        max_abs_eig = np.abs(eig).max((0, 1))
        max_l = np.sqrt(np.sum(max_abs_eig * max_abs_eig))
        mean_eig = np.dot(mean_rgb, self.eigvec)
        # no-mean stuff
        eig -= mean_eig[np.newaxis, np.newaxis]
        for c in range(3):
            if max_abs_eig[c] > 1e-2:
                mean_eig[c] /= max_abs_eig[c]
                eig[:, :, c] = eig[:, :, c] / max_abs_eig[c]
                eig[:, :, c] = (
                    np.power(np.abs(eig[:, :, c]), self.pow_nomean[c])
                    * ((eig[:, :, c] > 0) - 0.5)
                    * 2
                )
                eig[:, :, c] = eig[:, :, c] + self.add_nomean[c]
                eig[:, :, c] = eig[:, :, c] * self.mult_nomean[c]
        eig += mean_eig[np.newaxis, np.newaxis]
        # withmean stuff
        if max_abs_eig[0] > 1e-2:
            eig[:, :, 0] = (
                np.power(np.abs(eig[:, :, 0]), self.pow_withmean[0])
                * ((eig[:, :, 0] > 0) - 0.5)
                * 2
            )
            eig[:, :, 0] = eig[:, :, 0] + self.add_withmean[0]
            eig[:, :, 0] = eig[:, :, 0] * self.mult_withmean[0]
        s = np.sqrt(eig[:, :, 1] * eig[:, :, 1] + eig[:, :, 2] * eig[:, :, 2])
        smask = s > 1e-2
        s1 = np.power(s, self.pow_withmean[1])
        s1 = np.clip(s1 + self.add_withmean[1], 0, np.inf)
        s1 = s1 * self.mult_withmean[1]
        s1 = s1 * smask + s * (1 - smask)
        # color angle
        if self.col_angle != 0:
            temp1 = (
                np.cos(self.col_angle) * eig[:, :, 1]
                - np.sin(self.col_angle) * eig[:, :, 2]
            )
            temp2 = (
                np.sin(self.col_angle) * eig[:, :, 1]
                + np.cos(self.col_angle) * eig[:, :, 2]
            )
            eig[:, :, 1] = temp1
            eig[:, :, 2] = temp2
        # to origin magnitude
        for c in range(3):
            if max_abs_eig[c] > 1e-2:
                eig[:, :, c] = eig[:, :, c] * max_abs_eig[c]
        if max_l > 1e-2:
            l1 = np.sqrt(
                eig[:, :, 0] * eig[:, :, 0]
                + eig[:, :, 1] * eig[:, :, 1]
                + eig[:, :, 2] * eig[:, :, 2]
            )
            l1 = l1 / max_l
        eig[:, :, 1][smask] = (eig[:, :, 1] / s * s1)[smask]
        eig[:, :, 2][smask] = (eig[:, :, 2] / s * s1)[smask]
        if max_l > 1e-2:
            l = np.sqrt(
                eig[:, :, 0] * eig[:, :, 0]
                + eig[:, :, 1] * eig[:, :, 1]
                + eig[:, :, 2] * eig[:, :, 2]
            )
            l1 = np.power(l1, self.lmult_pow)
            l1 = np.clip(l1 + self.lmult_add, 0, np.inf)
            l1 = l1 * self.lmult_mult
            l1 = l1 * max_l
            lmask = l > 1e-2
            eig[lmask] = (eig / l[:, :, np.newaxis] * l1[:, :, np.newaxis])[lmask]
            for c in range(3):
                eig[:, :, c][lmask] = (np.clip(eig[:, :, c], -np.inf, max_abs_eig[c]))[
                    lmask
                ]
        return np.clip(np.dot(eig, self.eigvec.transpose()), 0, 1)
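

def _demo_pca_aug():
    # Usage sketch (not part of the original module), assuming images in [0, 1].
    # PCAAug draws its random photometric parameters in __init__, so a fresh
    # instance is typically constructed for every sample.
    ims = [np.random.rand(64, 96, 3) for _ in range(2)]
    flow = np.zeros((64, 96, 3), dtype=np.float32)
    return PCAAug()(ims, flow)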


class ChromaticAug(object):
    """
    Chromatic augmentation: https://github.com/lmb-freiburg/flownet2/blob/master/src/caffe/layers/data_augmentation_layer.cu
    """

    def __init__(
        self,
        noise=0.06,
        gamma=0.02,
        brightness=0.02,
        contrast=0.02,
        color=0.02,
        schedule_coeff=1,
    ):
        self.noise = np.random.uniform(0, noise)
        self.gamma = np.exp(np.random.normal(0, gamma * schedule_coeff))
        self.brightness = np.random.normal(0, brightness * schedule_coeff)
        self.contrast = np.exp(np.random.normal(0, contrast * schedule_coeff))
        self.color = np.exp(np.random.normal(0, color * schedule_coeff, 3))

    def __call__(self, inputs, target):
        # chromatic changes are applied to the second frame only
        inputs[1] = self.chrom_aug(inputs[1])
        # additive Gaussian noise on both frames
        inputs[0] += np.random.normal(0, self.noise, inputs[0].shape)
        inputs[1] += np.random.normal(0, self.noise, inputs[1].shape)
        return inputs, target

    def chrom_aug(self, rgb):
        # color change
        mean_in = rgb.sum(-1)
        rgb = rgb * self.color[np.newaxis, np.newaxis]
        brightness_coeff = mean_in / (rgb.sum(-1) + 0.01)
        rgb = np.clip(rgb * brightness_coeff[:, :, np.newaxis], 0, 1)
        # gamma
        rgb = np.power(rgb, self.gamma)
        # brightness
        rgb += self.brightness
        # contrast
        rgb = 0.5 + (rgb - 0.5) * self.contrast
        rgb = np.clip(rgb, 0, 1)
        return rgb
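

def _demo_chromatic_aug():
    # Usage sketch (not part of the original module), assuming images in [0, 1].
    # As with PCAAug, the random parameters are drawn in __init__, so a new
    # instance is built per sample.
    ims = [np.random.rand(64, 96, 3) for _ in range(2)]
    flow = np.zeros((64, 96, 3), dtype=np.float32)
    return ChromaticAug()(ims, flow)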