optical-flow-MEMFOF / core /utils /flow_transforms.py
egorchistov's picture
Initial release
ac59957
from __future__ import division
import torch
import numpy as np
import torchvision
import PIL.Image as Image
import cv2
from torch.nn import functional as F
class Compose(object):
    """Chain several co-transforms into one callable.

    Every co-transform receives the pair ``(input, target)`` and must return
    the transformed pair; the pair is passed through the transforms in the
    order they appear in ``co_transforms``.

    For example:
    >>> co_transforms.Compose([
    >>>     co_transforms.CenterCrop(10),
    >>>     co_transforms.ToTensor(),
    >>> ])
    """

    def __init__(self, co_transforms):
        # Sequence of callables, each mapping (input, target) -> (input, target).
        self.co_transforms = co_transforms

    def __call__(self, input, target):
        """Apply every stored co-transform in turn and return the final pair."""
        state = (input, target)
        for stage in self.co_transforms:
            # Unpack explicitly so a stage returning anything other than a
            # two-element result fails here rather than further downstream.
            first, second = stage(*state)
            state = (first, second)
        return state
class Scale(object):
    """Rescale both input images and the target by a constant factor.

    Despite its name, ``size`` is used as a multiplicative scale factor: it is
    handed to ``cv2.resize`` as ``fx``/``fy``. A value of 1 is a no-op.

    size: scale factor applied to both spatial dimensions
    order: interpolation used for the target.
        0: nearest, 1: bilinear (default), 2: bicubic.
        The two input images are always resized bilinearly.

    Raises ValueError if ``order`` is not 0, 1 or 2.
    """

    def __init__(self, size, order=1):
        self.ratio = size
        self.order = order
        if order == 0:
            self.code = cv2.INTER_NEAREST
        elif order == 1:
            self.code = cv2.INTER_LINEAR
        elif order == 2:
            self.code = cv2.INTER_CUBIC
        else:
            # Previously an unknown order left ``self.code`` unset and only
            # surfaced as an AttributeError deep inside ``__call__``.
            raise ValueError(
                "Scale: unsupported interpolation order %r (expected 0, 1 or 2)"
                % (order,)
            )

    def __call__(self, inputs, target):
        """Resize ``inputs[0]``, ``inputs[1]`` (in place in the list) and ``target``.

        Returns the ``inputs`` list and the resized target. When the ratio is 1
        both arguments are returned untouched.
        """
        if self.ratio == 1:
            return inputs, target
        ratio = self.ratio
        for idx in (0, 1):
            inputs[idx] = cv2.resize(
                inputs[idx], None, fx=ratio, fy=ratio, interpolation=cv2.INTER_LINEAR
            )
        # keep the mask same: channel 2 is resized on its own with nearest
        # neighbour so it is neither blended nor multiplied by the ratio.
        mask = cv2.resize(
            target[:, :, 2], None, fx=ratio, fy=ratio, interpolation=cv2.INTER_NEAREST
        )
        # Every target channel is multiplied by the ratio after resizing
        # (presumably because displacements scale with resolution -- verify for
        # targets carrying more than flow + mask); channel 2 is restored below.
        target = (
            cv2.resize(target, None, fx=ratio, fy=ratio, interpolation=self.code)
            * ratio
        )
        target[:, :, 2] = mask
        return inputs, target
class SpatialAug(object):
    """Random affine augmentation applied jointly to an image pair and its target.

    For each call a pair of affine maps (one per image) is sampled that send
    pixel coordinates of the output crop to coordinates in the source images.
    Both images are resampled through their map and the flow stored in the
    target is re-expressed in the coordinate frame of the crop.

    An affine map is stored as six numbers ``t`` such that
    ``x' = x * t[0] + y * t[2] + t[4]`` and ``y' = x * t[1] + y * t[3] + t[5]``.

    crop: (height, width) of the output crop
    scale: ``[range0, range1, center]``; the first image's log-scale is drawn
        uniformly from ``center +/- range0``, the second image's log-scale
        additionally gets a uniform offset within ``+/- range1 * schedule_coeff``
    rot: ``[range0, range1]`` rotation ranges (radians), same absolute/relative
        scheme as ``scale``
    trans: ``[range0, range1]`` translation ranges as a fraction of the crop
        size, same absolute/relative scheme
    squeeze: ``[range0, range1]`` log-range of the anisotropic stretch, same
        absolute/relative scheme
    schedule_coeff: multiplier on all *relative* (second image) ranges
    order: 0 samples the target and mask with nearest neighbour, any other
        value uses bilinear sampling; images are always sampled bilinearly
    black: if True, accept the first sampled transform even when the crop
        reaches outside the source image (zero padding shows up as black)

    Target layout as used by ``__call__``: channels 0-1 are the flow, channel 2
    is a validity mask, any channel from index 3 on is rescaled by
    ``scale1 / scale0`` (the code calls it ``exp``).
    """

    # Number of random draws tried before falling back to a plain centred crop.
    _MAX_TRIES = 50

    def __init__(
        self,
        crop,
        scale=None,
        rot=None,
        trans=None,
        squeeze=None,
        schedule_coeff=1,
        order=1,
        black=False,
    ):
        self.crop = crop
        self.scale = scale
        self.rot = rot
        self.trans = trans
        self.squeeze = squeeze
        self.t = np.zeros(6)
        self.schedule_coeff = schedule_coeff
        self.order = order
        self.black = black

    def to_identity(self):
        """Reset the working matrix ``self.t`` to the identity map (in place)."""
        self.t[0] = 1
        self.t[1] = 0
        self.t[2] = 0
        self.t[3] = 1
        self.t[4] = 0
        self.t[5] = 0

    def left_multiply(self, u0, u1, u2, u3, u4, u5):
        """Compose ``self.t`` with the map ``u``: apply ``self.t`` first, then ``u``."""
        result = np.zeros(6)
        result[0] = self.t[0] * u0 + self.t[1] * u2
        result[1] = self.t[0] * u1 + self.t[1] * u3
        result[2] = self.t[2] * u0 + self.t[3] * u2
        result[3] = self.t[2] * u1 + self.t[3] * u3
        result[4] = self.t[4] * u0 + self.t[5] * u2 + u4
        result[5] = self.t[4] * u1 + self.t[5] * u3 + u5
        self.t = result

    def inverse(self):
        """Return the inverse of ``self.t`` as a new 6-vector (``self.t`` is kept)."""
        result = np.zeros(6)
        a = self.t[0]
        c = self.t[2]
        e = self.t[4]
        b = self.t[1]
        d = self.t[3]
        f = self.t[5]
        denom = a * d - b * c
        result[0] = d / denom
        result[1] = -b / denom
        result[2] = -c / denom
        result[3] = a / denom
        result[4] = (c * f - d * e) / denom
        result[5] = (b * e - a * f) / denom
        return result

    def grid_transform(self, meshgrid, t, normalize=True, gridsize=None):
        """Apply the affine map ``t`` to a coordinate grid.

        meshgrid: pair ``(x, y)`` of equally shaped 2-D tensors
        t: six affine coefficients
        normalize: if True, map pixel coordinates to [-1, 1] so that -1 and +1
            fall on the centres of the first and last pixel
        gridsize: (height, width) used for normalisation; defaults to the
            shape of the grid itself

        Returns a tensor of shape ``(rows, cols, 2)`` holding ``(x, y)``.
        """
        if gridsize is None:
            h, w = meshgrid[0].shape
        else:
            h, w = gridsize
        vgrid = torch.cat(
            [
                (meshgrid[0] * t[0] + meshgrid[1] * t[2] + t[4])[:, :, np.newaxis],
                (meshgrid[0] * t[1] + meshgrid[1] * t[3] + t[5])[:, :, np.newaxis],
            ],
            -1,
        )
        if normalize:
            vgrid[:, :, 0] = 2.0 * vgrid[:, :, 0] / max(w - 1, 1) - 1.0
            vgrid[:, :, 1] = 2.0 * vgrid[:, :, 1] / max(h - 1, 1) - 1.0
        return vgrid

    def _build_matrix(self, mirror, rot, trans, scale, squeeze, h, w):
        """Assemble crop -> source map in ``self.t`` and return a copy of it.

        Steps, in order: centre the crop (optionally mirroring x), rotate,
        translate, scale/squeeze, then move the origin to the source centre.
        ``rot`` / ``trans`` may be None to skip that step.
        """
        th, tw = self.crop
        self.to_identity()
        if mirror:
            self.left_multiply(-1, 0, 0, 1, 0.5 * tw, -0.5 * th)
        else:
            self.left_multiply(1, 0, 0, 1, -0.5 * tw, -0.5 * th)
        if rot is not None:
            self.left_multiply(np.cos(rot), np.sin(rot), -np.sin(rot), np.cos(rot), 0, 0)
        if trans is not None:
            self.left_multiply(1, 0, 0, 1, trans[0] * tw, trans[1] * th)
        self.left_multiply(
            1.0 / (scale * squeeze), 0, 0, 1.0 / (scale / squeeze), 0, 0
        )
        self.left_multiply(1, 0, 0, 1, 0.5 * w, 0.5 * h)
        return self.t.copy()

    def _leaves_image(self, cornergrid, transmat, src_size):
        """Return True if any crop corner maps outside the source image."""
        corners = self.grid_transform(cornergrid, transmat, gridsize=src_size)
        return bool((corners.abs() > 1).any())

    @staticmethod
    def _warp(array, vgrid, mode="bilinear"):
        """Resample an ``(H, W, C)`` array at ``vgrid``; returns an ``(h, w, C)`` tensor.

        ``grid_transform`` normalises with ``size - 1`` (corner pixel centres sit
        at -1 / +1), which is the ``align_corners=True`` convention, so it must
        be requested explicitly: the library default is False since PyTorch 1.3
        and would sample up to half a pixel away from where the flow update
        assumes.
        """
        batch = torch.Tensor(array).permute(2, 0, 1)[np.newaxis]
        warped = F.grid_sample(
            batch, vgrid[np.newaxis], mode=mode, align_corners=True
        )
        return warped[0].permute(1, 2, 0)

    def __call__(self, inputs, target):
        """Augment ``inputs`` (two ``(H, W, C)`` images) and ``target``.

        Returns ``([image0, image1], target)`` as NumPy arrays of spatial size
        ``self.crop``.
        """
        h, w, _ = inputs[0].shape
        th, tw = self.crop
        # [::-1] turns the (row, col) output of meshgrid into (x, y) order.
        meshgrid = torch.meshgrid(
            [torch.Tensor(range(th)), torch.Tensor(range(tw))], indexing="ij"
        )[::-1]
        cornergrid = torch.meshgrid(
            [torch.Tensor([0, th - 1]), torch.Tensor([0, tw - 1])], indexing="ij"
        )[::-1]
        src_size = [float(h), float(w)]
        coeff = self.schedule_coeff

        for _attempt in range(self._MAX_TRIES):
            # NOTE: the order of the random draws below is kept as in the
            # original implementation so seeded runs sample the same values.
            # TODO add mirror
            mirror = bool(np.random.binomial(1, 0.5))
            rot0 = rot1 = None
            trans0 = trans1 = None
            scale0 = scale1 = 1
            squeeze0 = squeeze1 = 1
            if self.rot is not None:
                rot0 = np.random.uniform(-self.rot[0], +self.rot[0])
                rot1 = (
                    np.random.uniform(-self.rot[1] * coeff, self.rot[1] * coeff)
                    + rot0
                )
            if self.trans is not None:
                trans0 = np.random.uniform(-self.trans[0], +self.trans[0], 2)
                trans1 = (
                    np.random.uniform(
                        -self.trans[1] * coeff, +self.trans[1] * coeff, 2
                    )
                    + trans0
                )
            if self.squeeze is not None:
                squeeze0 = np.exp(
                    np.random.uniform(-self.squeeze[0], self.squeeze[0])
                )
                squeeze1 = (
                    np.exp(
                        np.random.uniform(
                            -self.squeeze[1] * coeff, self.squeeze[1] * coeff
                        )
                    )
                    * squeeze0
                )
            if self.scale is not None:
                scale0 = np.exp(
                    np.random.uniform(
                        self.scale[2] - self.scale[0], self.scale[2] + self.scale[0]
                    )
                )
                scale1 = (
                    np.exp(
                        np.random.uniform(
                            -self.scale[1] * coeff, self.scale[1] * coeff
                        )
                    )
                    * scale0
                )

            # im0, then im1; self.t holds the im1 matrix when inverse() runs.
            transmat0 = self._build_matrix(
                mirror, rot0, trans0, scale0, squeeze0, h, w
            )
            transmat1 = self._build_matrix(
                mirror, rot1, trans1, scale1, squeeze1, h, w
            )
            transmat1_inv = self.inverse()

            if self.black:
                # black augmentation, allowing 0 values in the input images
                # https://github.com/lmb-freiburg/flownet2/blob/master/src/caffe/layers/black_augmentation_layer.cu
                break
            if not (
                self._leaves_image(cornergrid, transmat0, src_size)
                or self._leaves_image(cornergrid, transmat1, src_size)
            ):
                break
        else:
            # Every draw was rejected: fall back to an un-augmented centred
            # crop. The inverse and the scale factors are reset as well, so the
            # flow / exp channels are consistent with the matrices actually
            # used (they used to keep the values of the last rejected draw).
            print("max_iter in augmentation")
            scale0 = scale1 = 1
            transmat0 = self._build_matrix(False, None, None, 1, 1, h, w)
            transmat1 = transmat0.copy()
            transmat1_inv = self.inverse()

        # do the real work
        label_mode = "nearest" if self.order == 0 else "bilinear"
        vgrid = self.grid_transform(meshgrid, transmat0, gridsize=src_size)
        inputs_0 = self._warp(inputs[0], vgrid)
        target_0 = self._warp(target, vgrid, mode=label_mode)

        # Invalid pixels become NaN before sampling so that every output pixel
        # that touches an invalid source pixel ends up invalid as well.
        mask_0 = target[:, :, 2:3].copy()
        mask_0[mask_0 == 0] = np.nan
        mask_0 = self._warp(mask_0, vgrid, mode=label_mode)
        mask_0[torch.isnan(mask_0)] = 0

        vgrid = self.grid_transform(meshgrid, transmat1, gridsize=src_size)
        inputs_1 = self._warp(inputs[1], vgrid)

        # flow: crop pixel -> source position in im0 -> add the source flow ->
        # position in im1 -> back into crop coordinates via the inverse map.
        pos = target_0[:, :, :2] + self.grid_transform(
            meshgrid, transmat0, normalize=False
        )
        pos = self.grid_transform(pos.permute(2, 0, 1), transmat1_inv, normalize=False)
        channels = [
            (pos[:, :, 0] - meshgrid[0]).unsqueeze(-1),
            (pos[:, :, 1] - meshgrid[1]).unsqueeze(-1),
            mask_0,
        ]
        if target_0.shape[2] >= 4:
            # scale
            channels.append(target_0[:, :, 3:] * scale1 / scale0)
        target = torch.cat(channels, -1)

        inputs = [np.asarray(inputs_0), np.asarray(inputs_1)]
        target = np.asarray(target)
        return inputs, target
class pseudoPCAAug(object):
    """
    Chromatic Eigen Augmentation: https://github.com/lmb-freiburg/flownet2/blob/master/src/caffe/layers/data_augmentation_layer.cu
    This version is faster.

    Implemented with torchvision's ``ColorJitter``. ``schedule_coeff`` is
    accepted but not used by this class.
    """

    def __init__(self, schedule_coeff=1):
        self.augcolor = torchvision.transforms.ColorJitter(
            brightness=0.4, contrast=0.4, saturation=0.5, hue=0.5 / 3.14
        )

    def __call__(self, inputs, target):
        """Jitter ``inputs[0]`` and ``inputs[1]``; ``target`` is passed through.

        Each image is multiplied by 255 and cast to uint8 (so it is presumably
        expected in [0, 1] -- confirm with the caller), run through the jitter
        as a PIL image, and divided by 255.0 again. The jitter is invoked
        separately for each image.
        """
        for idx in range(2):
            as_bytes = np.uint8(inputs[idx] * 255)
            jittered = self.augcolor(Image.fromarray(as_bytes))
            inputs[idx] = np.asarray(jittered) / 255.0
        return inputs, target
class PCAAug(object):
    """
    Chromatic Eigen Augmentation: https://github.com/lmb-freiburg/flownet2/blob/master/src/caffe/layers/data_augmentation_layer.cu

    All random coefficients are drawn once, in the constructor; every image
    later passed through the same instance is transformed with the same
    coefficients.

    Each ``*_pow`` / ``*_mult`` / ``*_add`` / ``col_rotate`` argument is either
    None (keep the neutral value) or a triple of which index 0 is the spread
    and index 2 the centre of the distribution (index 1 is not read here):
    the ``ladd_*`` and ``col_*`` values are drawn from a normal distribution
    (mean = centre, std = spread), the ``sat_*``, ``lmult_*`` and
    ``col_rotate`` values from a uniform one (centre +/- spread). ``pow`` and
    ``mult`` draws are exponentiated. ``schedule_coeff`` is accepted but not
    used by this class.
    """

    def __init__(
        self,
        lmult_pow=[0.4, 0, -0.2],
        lmult_mult=[0.4, 0, 0],
        lmult_add=[0.03, 0, 0],
        sat_pow=[0.4, 0, 0],
        sat_mult=[0.5, 0, -0.3],
        sat_add=[0.03, 0, 0],
        col_pow=[0.4, 0, 0],
        col_mult=[0.2, 0, 0],
        col_add=[0.02, 0, 0],
        ladd_pow=[0.4, 0, 0],
        ladd_mult=[0.4, 0, 0],
        ladd_add=[0.04, 0, 0],
        col_rotate=[1.0, 0, 0],
        schedule_coeff=1,
    ):
        def gauss(spec):
            # One normal draw: mean = spec[2], std = spec[0].
            return np.random.normal(spec[2], spec[0])

        def flat(spec):
            # One uniform draw from spec[2] +/- spec[0].
            return np.random.uniform(spec[2] - spec[0], spec[2] + spec[0])

        # Neutral coefficients; overwritten below wherever a spec is given.
        # The draws must stay in exactly this order to keep seeded runs stable.
        # no mean
        self.pow_nomean = [1, 1, 1]
        self.add_nomean = [0, 0, 0]
        self.mult_nomean = [1, 1, 1]
        self.pow_withmean = [1, 1, 1]
        self.add_withmean = [0, 0, 0]
        self.mult_withmean = [1, 1, 1]
        self.lmult_pow = 1
        self.lmult_mult = 1
        self.lmult_add = 0
        self.col_angle = 0

        if ladd_pow is not None:
            self.pow_nomean[0] = np.exp(gauss(ladd_pow))
        if col_pow is not None:
            for c in (1, 2):
                self.pow_nomean[c] = np.exp(gauss(col_pow))
        if ladd_add is not None:
            self.add_nomean[0] = gauss(ladd_add)
        if col_add is not None:
            for c in (1, 2):
                self.add_nomean[c] = gauss(col_add)
        if ladd_mult is not None:
            self.mult_nomean[0] = np.exp(gauss(ladd_mult))
        if col_mult is not None:
            for c in (1, 2):
                self.mult_nomean[c] = np.exp(gauss(col_mult))

        # with mean: channels 1 and 2 share a single draw
        if sat_pow is not None:
            shared = np.exp(flat(sat_pow))
            self.pow_withmean[1] = shared
            self.pow_withmean[2] = shared
        if sat_add is not None:
            shared = flat(sat_add)
            self.add_withmean[1] = shared
            self.add_withmean[2] = shared
        if sat_mult is not None:
            shared = np.exp(flat(sat_mult))
            self.mult_withmean[1] = shared
            self.mult_withmean[2] = shared

        if lmult_pow is not None:
            self.lmult_pow = np.exp(flat(lmult_pow))
        if lmult_mult is not None:
            self.lmult_mult = np.exp(flat(lmult_mult))
        if lmult_add is not None:
            self.lmult_add = flat(lmult_add)
        if col_rotate is not None:
            self.col_angle = flat(col_rotate)

        # eigen vectors
        self.eigvec = np.reshape(
            [0.51, 0.56, 0.65, 0.79, 0.01, -0.62, 0.35, -0.83, 0.44], [3, 3]
        ).transpose()

    def __call__(self, inputs, target):
        """Transform ``inputs[0]`` and ``inputs[1]``; ``target`` is passed through."""
        for idx in (0, 1):
            inputs[idx] = self.pca_image(inputs[idx])
        return inputs, target

    @staticmethod
    def _signed_power(values, exponent):
        """Return ``|values| ** exponent`` carrying the sign of ``values``.

        Non-positive entries get a negative sign, exactly as in the inline
        expression this helper replaces.
        """
        return np.power(np.abs(values), exponent) * ((values > 0) - 0.5) * 2

    def pca_image(self, rgb):
        """Apply the stored chromatic transform to one ``(H, W, 3)`` image.

        The image is projected onto ``self.eigvec``, modified there, projected
        back and clipped to [0, 1]. ``rgb`` itself is not modified.
        """
        eig = np.dot(rgb, self.eigvec)
        mean_rgb = rgb.mean((0, 1))
        peak = np.abs(eig).max((0, 1))  # per-channel maximum, before centring
        max_l = np.sqrt(np.sum(peak * peak))
        mean_eig = np.dot(mean_rgb, self.eigvec)
        # Channels whose peak magnitude is negligible are left unnormalised.
        active = [peak[c] > 1e-2 for c in range(3)]
        has_energy = max_l > 1e-2

        def magnitude():
            # Per-pixel length of the current eigen-space vector.
            return np.sqrt(
                eig[:, :, 0] * eig[:, :, 0]
                + eig[:, :, 1] * eig[:, :, 1]
                + eig[:, :, 2] * eig[:, :, 2]
            )

        # no-mean stuff
        eig -= mean_eig[np.newaxis, np.newaxis]
        for c in range(3):
            if not active[c]:
                continue
            mean_eig[c] /= peak[c]
            chan = eig[:, :, c] / peak[c]
            chan = self._signed_power(chan, self.pow_nomean[c])
            chan = chan + self.add_nomean[c]
            eig[:, :, c] = chan * self.mult_nomean[c]
        eig += mean_eig[np.newaxis, np.newaxis]

        # withmean stuff
        if active[0]:
            lum = self._signed_power(eig[:, :, 0], self.pow_withmean[0])
            lum = lum + self.add_withmean[0]
            eig[:, :, 0] = lum * self.mult_withmean[0]

        # s: length of the (channel 1, channel 2) part; s1: its new length
        s = np.sqrt(eig[:, :, 1] * eig[:, :, 1] + eig[:, :, 2] * eig[:, :, 2])
        smask = s > 1e-2
        s1 = np.power(s, self.pow_withmean[1])
        s1 = np.clip(s1 + self.add_withmean[1], 0, np.inf)
        s1 = s1 * self.mult_withmean[1]
        s1 = s1 * smask + s * (1 - smask)

        # color angle
        angle = self.col_angle
        if angle != 0:
            rotated_1 = np.cos(angle) * eig[:, :, 1] - np.sin(angle) * eig[:, :, 2]
            rotated_2 = np.sin(angle) * eig[:, :, 1] + np.cos(angle) * eig[:, :, 2]
            eig[:, :, 1] = rotated_1
            eig[:, :, 2] = rotated_2

        # to origin magnitude
        for c in range(3):
            if active[c]:
                eig[:, :, c] = eig[:, :, c] * peak[c]

        if has_energy:
            # Relative magnitude, taken before the (1, 2) rescale below.
            l1 = magnitude() / max_l

        for c in (1, 2):
            eig[:, :, c][smask] = (eig[:, :, c] / s * s1)[smask]

        if has_energy:
            norm = magnitude()
            l1 = np.power(l1, self.lmult_pow)
            l1 = np.clip(l1 + self.lmult_add, 0, np.inf)
            l1 = l1 * self.lmult_mult
            l1 = l1 * max_l
            lmask = norm > 1e-2
            eig[lmask] = (eig / norm[:, :, np.newaxis] * l1[:, :, np.newaxis])[lmask]
            for c in range(3):
                eig[:, :, c][lmask] = (np.clip(eig[:, :, c], -np.inf, peak[c]))[
                    lmask
                ]

        return np.clip(np.dot(eig, self.eigvec.transpose()), 0, 1)
class ChromaticAug(object):
    """
    Chromatic augmentation: https://github.com/lmb-freiburg/flownet2/blob/master/src/caffe/layers/data_augmentation_layer.cu

    All coefficients are drawn once, in the constructor, and reused for every
    call on the same instance.

    noise: upper bound of the uniformly drawn noise standard deviation
    gamma, contrast, color: std of the normally distributed log-coefficient
        (``color`` draws one coefficient per channel)
    brightness: std of the normally distributed additive offset
    schedule_coeff: multiplier on the gamma / brightness / contrast / color stds
    """

    def __init__(
        self,
        noise=0.06,
        gamma=0.02,
        brightness=0.02,
        contrast=0.02,
        color=0.02,
        schedule_coeff=1,
    ):
        self.noise = np.random.uniform(0, noise)
        self.gamma = np.exp(np.random.normal(0, gamma * schedule_coeff))
        self.brightness = np.random.normal(0, brightness * schedule_coeff)
        self.contrast = np.exp(np.random.normal(0, contrast * schedule_coeff))
        self.color = np.exp(np.random.normal(0, color * schedule_coeff, 3))

    def __call__(self, inputs, target):
        """Colour-shift the second image and add Gaussian noise to both.

        Only ``inputs[1]`` goes through ``chrom_aug``. The noise is added in
        place to the arrays stored in ``inputs`` and the result is not clipped.
        ``target`` is returned unchanged.
        """
        inputs[1] = self.chrom_aug(inputs[1])
        # noise (each image gets noise of its own shape)
        inputs[0] += np.random.normal(0, self.noise, inputs[0].shape)
        inputs[1] += np.random.normal(0, self.noise, inputs[1].shape)
        return inputs, target

    def chrom_aug(self, rgb):
        """Return a colour / gamma / brightness / contrast adjusted copy of ``rgb``.

        ``rgb`` is an ``(H, W, 3)`` array; the result is clipped to [0, 1].
        """
        # color change: per-channel gain, then rescale each pixel so that its
        # channel sum stays (approximately) what it was before the gain.
        mean_in = rgb.sum(-1)
        rgb = rgb * self.color[np.newaxis, np.newaxis]
        brightness_coeff = mean_in / (rgb.sum(-1) + 0.01)
        rgb = np.clip(rgb * brightness_coeff[:, :, np.newaxis], 0, 1)
        # gamma
        rgb = np.power(rgb, self.gamma)
        # brightness
        rgb += self.brightness
        # contrast
        rgb = 0.5 + (rgb - 0.5) * self.contrast
        rgb = np.clip(rgb, 0, 1)
        # The result has to be returned: __call__ stores it back into inputs[1]
        # and adds noise to it; a bare ``return`` made that fail on None.
        return rgb