# raw2logit/utils/dataset_utils.py
# Author: marco.aversa ("updated libraries", commit a43a0ab)
"""
Dataset Import/Download Tools
"""
import os
import random
import numpy as np
import rawpy
from PIL import Image
from sklearn.model_selection import StratifiedShuffleSplit
import torch
from skimage.util.shape import view_as_windows
# File extensions (lowercase, without the dot) accepted by load_image and
# list_images_in_dir.
IMAGE_FILE_TYPES = ['dng', 'png', 'tif', 'tiff']
def load_image(path):
    """Load a single image from *path* into a numpy array.

    Args:
        path (str): path to the image file; the extension selects the reader.

    Returns:
        ndarray: for '.dng' files, the raw visible sensor data in rawpy's
        native dtype; for every other supported type (png/tif/tiff), a
        float32 array decoded via PIL.
    """
    file_type = path.split('.')[-1].lower()
    if file_type == 'dng':
        # Raw sensor data; kept in rawpy's native dtype (no float conversion).
        img = rawpy.imread(path).raw_image_visible
    else:
        # Bug fix: the tif/tiff branch previously called `tiff.imread`, but no
        # `tiff` module was ever imported, so every TIFF load raised NameError.
        # PIL (already imported) decodes TIFF as well as PNG.
        # NOTE(review): assumes single-page, PIL-decodable TIFFs — confirm if
        # exotic/multi-page TIFFs are expected.
        img = np.array(Image.open(path), dtype=np.float32)
    return img
def list_images_in_dir(path, file_types=None):
    """Return sorted full paths of the image files directly inside *path*.

    Args:
        path (str): directory to scan (non-recursive).
        file_types (iterable of str, optional): accepted lowercase extensions
            (without the dot). Defaults to the module-level IMAGE_FILE_TYPES,
            preserving the original behavior.

    Returns:
        list[str]: full paths, in lexicographic filename order.
    """
    # Generalization: the accepted extensions were hard-coded; they are now a
    # backward-compatible keyword argument.
    if file_types is None:
        file_types = IMAGE_FILE_TYPES
    image_list = [os.path.join(path, img_name)
                  for img_name in sorted(os.listdir(path))
                  if img_name.split('.')[-1].lower() in file_types]
    return image_list
def k_fold(dataset, n_splits: int, seed: int, train_size: float):
    """Split a dataset into train/test index pairs for cross-validation.

    Args:
        dataset: dataset to split; must expose ``.task`` ('classification' or
            'segmentation') and, for stratified splitting, ``.images`` plus
            either ``.labels`` or ``.masks``.
        n_splits (int): number of re-shuffling & splitting iterations.
        seed (int): random seed; used by both splitting strategies.
        train_size (float): between 0.0 and 1.0, the proportion of the dataset
            to include in the train split.

    Returns:
        list: n_splits pairs of (train_indices, test_indices) lists.
    """
    x = y = None
    if hasattr(dataset, 'labels'):
        x = dataset.images
        y = dataset.labels
    elif hasattr(dataset, 'masks'):
        x = dataset.images
        y = dataset.masks
    idxs = []
    if dataset.task == 'classification':
        sss = StratifiedShuffleSplit(n_splits=n_splits, train_size=train_size, random_state=seed)
        for idxs_train, idxs_test in sss.split(x, y):
            idxs.append((idxs_train.tolist(), idxs_test.tolist()))
    elif dataset.task == 'segmentation':
        # Bug fix: the segmentation path previously called
        # np.random.permutation unseeded, silently ignoring `seed` and making
        # folds non-reproducible. A seeded Generator fixes that.
        rng = np.random.default_rng(seed)
        split_idx = int(len(dataset) * train_size)  # loop-invariant, hoisted
        for _ in range(n_splits):
            indices = rng.permutation(len(dataset))
            idxs.append((indices[:split_idx].tolist(), indices[split_idx:].tolist()))
    return idxs
def split_img(imgs, ROIs=(3, 3), step=(1, 1)):
    """Split the imgs in regions of size ROIs.

    Args:
        imgs (ndarray): images to split; supported layouts are HxW, HxWxC
            (C <= 3), BxHxW (when the last axis is > 3), and BxHxWxC.
        ROIs (tuple): (height, width) of each sub-region (region of interest).
        step (tuple): stride from one sub-region to the next along (y, x).

    Returns:
        ndarray: stacked sub-images, shape (num_windows, ROIs[0], ROIs[1][, C]).

    Raises:
        ValueError: if ROIs or step is not a 2-element sequence, or the input
            rank is unsupported.
        TypeError: if imgs is not a numpy ndarray.

    Example:
        >>> from dataset_generator import split
        >>> imgs_splitted = split(imgs, ROI_size = (5,5), step=(2,3))
    """
    # Bug fix: these guards previously did `return print(...)`, which printed a
    # message and returned None instead of signalling an error, and `len > 2`
    # let a 1-element tuple through to a later IndexError.
    if len(ROIs) != 2:
        raise ValueError("ROIs must be a 2-element sequence (height, width)")
    if len(step) != 2:
        raise ValueError("step must be a 2-element sequence (y, x)")
    if not isinstance(imgs, np.ndarray):
        raise TypeError("imgs must be a numpy ndarray")
    if imgs.ndim == 2:  # single image, one channel (HxW)
        splitted = view_as_windows(imgs, (ROIs[0], ROIs[1]), (step[0], step[1]))
        return splitted.reshape((-1, ROIs[0], ROIs[1]))
    if imgs.ndim == 3:
        channels = imgs.shape[-1]
        if channels <= 3:  # single image, multiple channels (HxWxC)
            splitted = view_as_windows(imgs, (ROIs[0], ROIs[1], channels), (step[0], step[1], channels))
            return splitted.reshape((-1, ROIs[0], ROIs[1], channels))
        # last axis > 3: treated as a batch of single-channel images (BxHxW)
        splitted = view_as_windows(imgs, (1, ROIs[0], ROIs[1]), (1, step[0], step[1]))
        return splitted.reshape((-1, ROIs[0], ROIs[1]))
    if imgs.ndim == 4:  # batch of multi-channel images (BxHxWxC)
        channels = imgs.shape[-1]
        splitted = view_as_windows(imgs, (1, ROIs[0], ROIs[1], channels), (1, step[0], step[1], channels))
        return splitted.reshape((-1, ROIs[0], ROIs[1], channels))
    raise ValueError(f"unsupported input rank: {imgs.ndim}")
def join_blocks(splitted, final_shape):
    """Reassemble an image from its blocks (inverse of a tiling split).

    Attribute:
        splitted (tensor) = blocks in row-major order, size = (N_blocks, Channels, Height, Width)
        final_shape (tuple) = (Height, Width) of the reconstructed image

    Return:
        tensor: restored image, size = (Channels, Height, Width). Values pass
        through a default-dtype buffer, as in the original implementation.
    """
    _, channels, block_h, block_w = splitted.shape
    rows = final_shape[0] // block_h
    cols = final_shape[1] // block_w
    # Assemble each row by concatenating its blocks along the width axis.
    row_buf = torch.empty(rows, channels, block_h, block_w * cols)
    for r in range(rows):
        row_blocks = [splitted[r * cols + c] for c in range(cols)]
        row_buf[r] = torch.cat(row_blocks, axis=2)
    # Stack the assembled rows along the height axis.
    return torch.cat(list(row_buf), axis=1)
def random_ROI(X, Y, ROIs=(512, 512)):
    """Return one random region per input/target image pair of the dataset.

    Args:
        X (ndarray): dataset inputs, size (Batch, Height, Width, Channels)
        Y (ndarray): dataset targets, size (Batch, Height, Width, Channels)
        ROIs (tuple): (height, width) of the random region (region of interest)

    Returns:
        X_cut (ndarray): random input crops, size (Batch, ROIs[0], ROIs[1], Channels)
        Y_cut (ndarray): random target crops, size (Batch, ROIs[0], ROIs[1], Channels)
        The same offsets are applied to X[i] and Y[i] so the pair stays aligned.

    Example:
        >>> from dataset_generator import random_ROI
        >>> X,Y = random_ROI(X,Y, ROIs = (10,10))
    """
    # Bug fix: the shape was unpacked as (batch, channels, height, width), but
    # the slicing below (and the docstring) treats X as BxHxWxC — any image
    # with C != H crashed or produced wrongly sized buffers.
    batch, height, width, channels = X.shape
    X_cut = np.empty((batch, ROIs[0], ROIs[1], channels))
    Y_cut = np.empty((batch, ROIs[0], ROIs[1], channels))
    for i in range(batch):
        # Bug fix: randrange(height - ROIs[0] + 1) covers every valid origin,
        # including the last one, and handles height == ROIs[0] (crop == image);
        # the old int(random.random() * (height - ROIs[0] - 1)) excluded it.
        top = random.randrange(height - ROIs[0] + 1)
        left = random.randrange(width - ROIs[1] + 1)
        X_cut[i] = X[i, top:top + ROIs[0], left:left + ROIs[1], :]
        Y_cut[i] = Y[i, top:top + ROIs[0], left:left + ROIs[1], :]
    return X_cut, Y_cut
def one2many_random_ROI(X, Y, datasize=1000, ROIs=(512, 512)):
    """Build a dataset of N random sub-regions cropped from the same image pair.

    Args:
        X (ndarray): dataset input, size (1, Height, Width, Channels)
        Y (ndarray): dataset target, size (1, Height, Width, Channels)
        datasize (int): number of random ROIs to generate
        ROIs (tuple): (height, width) of each random region (region of interest)

    Returns:
        X_cut (ndarray): input crops, size (datasize, ROIs[0], ROIs[1], Channels)
        Y_cut (ndarray): target crops, size (datasize, ROIs[0], ROIs[1], Channels)
    """
    # Bug fix: the shape was unpacked as (batch, channels, height, width), but
    # the data layout here is BxHxWxC (see docstring and random_ROI), so the
    # output buffers were allocated with the wrong channel count.
    _, height, width, channels = X.shape
    X_cut = np.empty((datasize, ROIs[0], ROIs[1], channels))
    Y_cut = np.empty((datasize, ROIs[0], ROIs[1], channels))
    for i in range(datasize):
        # random_ROI returns batch-of-1 arrays; assignment broadcasts them
        # into the (ROIs[0], ROIs[1], channels) slots.
        X_cut[i], Y_cut[i] = random_ROI(X, Y, ROIs)
    return X_cut, Y_cut