"""
Utilities for other scripts
"""
import os
import shutil
import random
import argparse
import torch
import mlflow
import numpy as np
import pandas as pd  # needed for read_pickle in get_mlflow_model_by_name
from IPython.display import display, Markdown
from b2sdk.v1 import InMemoryAccountInfo, B2Api, DownloadDestLocalFile
from torch import nn
def str2bool(string):
    """Interpret the literal string 'True' as True; any other value yields False."""
    return string == 'True'
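# Usage sketch (flag name is hypothetical): str2bool is intended as an argparse
# `type=` converter, since argparse passes flag values through as strings.
#   parser = argparse.ArgumentParser()
#   parser.add_argument('--track_every_epoch', type=str2bool, default=False)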
def np2torch(nparray):
    """Convert a numpy array to a torch tensor.
    Arrays with a batch dimension or multiple channels should be passed
    in BxHxWxC format.
    Args:
        nparray (ndarray): input array, e.g. BxHxWxC
    Returns:
        tensor (torch.Tensor): converted tensor, e.g. BxCxHxW"""
tensor = torch.Tensor(nparray)
if tensor.ndim == 2:
return tensor
    if tensor.ndim == 3:  # single image (HxWxC)
        channels = tensor.shape[-1]
        if channels <= 3:  # channels-last layout; move channels first
            return tensor.permute(2, 0, 1)
    if tensor.ndim == 4:  # batch of images (BxHxWxC)
        return tensor.permute(0, 3, 1, 2)
return tensor
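# Shape sketch (illustrative sizes only):
#   np2torch(np.zeros((64, 64)))        # HxW     -> torch.Size([64, 64])
#   np2torch(np.zeros((64, 64, 3)))     # HxWxC   -> torch.Size([3, 64, 64])
#   np2torch(np.zeros((8, 64, 64, 4)))  # BxHxWxC -> torch.Size([8, 4, 64, 64])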
def torch2np(torchtensor):
    """Convert a torch tensor to a numpy array.
    Tensors with a batch dimension or multiple channels should be passed
    in BxCxHxW format.
    Args:
        torchtensor (torch.Tensor): input tensor, e.g. BxCxHxW
    Returns:
        ndarray (ndarray): converted array, e.g. BxHxWxC"""
ndarray = torchtensor.detach().cpu().numpy().astype(np.float32)
    if ndarray.ndim == 3:  # single image (CxHxW)
        channels = ndarray.shape[0]
        if channels <= 3:  # channels-first layout; move channels last
            return ndarray.transpose(1, 2, 0)
    if ndarray.ndim == 4:  # batch of images (BxCxHxW)
        return ndarray.transpose(0, 2, 3, 1)
return ndarray
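# Shape sketch (illustrative sizes only); inverse of np2torch for typical inputs:
#   torch2np(torch.zeros(3, 64, 64))     # CxHxW   -> shape (64, 64, 3)
#   torch2np(torch.zeros(8, 4, 64, 64))  # BxCxHxW -> shape (8, 64, 64, 4)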
def set_random_seed(seed):
    """Seed Python, NumPy and PyTorch RNGs for reproducible runs."""
    np.random.seed(seed)  # numpy RNG
    torch.manual_seed(seed)  # torch CPU RNG
    random.seed(seed)  # Python stdlib RNG
    if torch.cuda.is_available():
        torch.cuda.manual_seed(seed)
        torch.cuda.manual_seed_all(seed)  # all GPUs
        torch.backends.cudnn.deterministic = True  # trade speed for determinism
        torch.backends.cudnn.benchmark = False
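# Typical use: call once at the start of a script, before building models and
# data loaders, e.g. set_random_seed(42).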
def normalize(img):
    """Standardize an image per channel to zero mean and unit variance.
    Args:
        img (ndarray): image to normalize, shape (Height, Width, Channels)
    Returns:
        img (ndarray): normalized image
        mu (ndarray): per-channel mean
        sigma (ndarray): per-channel standard deviation
    """
img = img.astype(float)
if len(img.shape) == 2:
img = img[:, :, np.newaxis]
height, width, channels = img.shape
mu, sigma = np.empty(channels), np.empty(channels)
for ch in range(channels):
temp_mu = img[:, :, ch].mean()
temp_sigma = img[:, :, ch].std()
        img[:, :, ch] = (img[:, :, ch] - temp_mu) / (temp_sigma + 1e-4)  # epsilon avoids division by zero
mu[ch] = temp_mu
sigma[ch] = temp_sigma
return img, mu, sigma
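# Usage sketch: the returned statistics let you undo the normalization
# (the 1e-4 epsilon must match the one used above).
#   img = np.random.rand(64, 64, 3) * 255
#   normed, mu, sigma = normalize(img)
#   restored = normed * (sigma + 1e-4) + mu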
def b2_list_files(folder=''):
    """Print the names of all files under `folder` in the b2 bucket."""
    bucket = get_b2_bucket()
for file_info, _ in bucket.ls(folder, show_versions=False):
print(file_info.file_name)
def get_b2_bucket():
    """Authorize against Backblaze B2 and return the project bucket."""
    bucket_name = 'perturbed-minds'
application_key_id = '003d6b042de536a0000000008'
application_key = 'K003HMNxnoa91Dy9c0V8JVCKNUnwR9U'
info = InMemoryAccountInfo()
b2_api = B2Api(info)
b2_api.authorize_account('production', application_key_id, application_key)
bucket = b2_api.get_bucket_by_name(bucket_name)
return bucket
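# Usage sketch (folder name is illustrative):
#   b2_list_files('datasets')   # print bucket contents under 'datasets'
#   bucket = get_b2_bucket()    # or work with the bucket object directly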
def b2_download_folder(b2_dir, local_dir, force_download=False, mirror_folder=True):
    """Download a folder from the b2 bucket and optionally clean up
    local files that are no longer on the server.
    Args:
        b2_dir (str): path to the folder on the b2 server
        local_dir (str): path to the folder on the local machine
        force_download (bool, optional): force the download; if `False`,
            an already existing local folder is left untouched and files
            with matching names are skipped
        mirror_folder (bool, optional): if `True`, files found in the local
            directory but not on the server are deleted
    """
bucket = get_b2_bucket()
if not os.path.exists(local_dir):
os.makedirs(local_dir)
    elif not force_download:
        return  # local folder already exists; skip unless forced
download_files = [file_info.file_name.split(b2_dir + '/')[-1]
for file_info, _ in bucket.ls(b2_dir, show_versions=False)]
for file_name in download_files:
if file_name.endswith('/.bzEmpty'): # subdirectory, download recursively
subdir = file_name.replace('/.bzEmpty', '')
if len(subdir) > 0:
b2_subdir = os.path.join(b2_dir, subdir)
local_subdir = os.path.join(local_dir, subdir)
if b2_subdir != b2_dir:
b2_download_folder(b2_subdir, local_subdir, force_download=force_download,
mirror_folder=mirror_folder)
else: # file
b2_file = os.path.join(b2_dir, file_name)
local_file = os.path.join(local_dir, file_name)
if not os.path.exists(local_file) or force_download:
print(f"downloading b2://{b2_file} -> {local_file}")
bucket.download_file_by_name(b2_file, DownloadDestLocalFile(local_file))
    if mirror_folder:  # remove local files that are no longer on the b2 server
        for i, file in enumerate(download_files):
            if file.endswith('/.bzEmpty'):  # strip the subdirectory marker for comparison
                download_files[i] = file.replace('/.bzEmpty', '')
for file_name in os.listdir(local_dir):
if file_name not in download_files:
local_file = os.path.join(local_dir, file_name)
print(f"deleting {local_file}")
if os.path.isdir(local_file):
shutil.rmtree(local_file)
else:
os.remove(local_file)
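# Usage sketch (paths are illustrative); with force_download=False the call
# is a no-op once the local folder exists:
#   b2_download_folder('datasets/example', 'data/example', force_download=False)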
def get_name(obj):
    """Return a readable name for a function, class, or instance."""
    return obj.__name__ if hasattr(obj, '__name__') else type(obj).__name__
def get_mlflow_model_by_name(experiment_name, run_name,
tracking_uri="http://deplo-mlflo-1ssxo94f973sj-890390d809901dbf.elb.eu-central-1.amazonaws.com",
download_model=True):
    # 0. mlflow basics
    mlflow.set_tracking_uri(tracking_uri)
    os.environ["AWS_ACCESS_KEY_ID"] = "#TODO: add your AWS access key if you want to write your results to our collaborative lab server"
    os.environ["AWS_SECRET_ACCESS_KEY"] = "#TODO: add your AWS secret access key if you want to write your results to our collaborative lab server"
    # 1. use get_experiment_by_name to get the experiment object
    experiment = mlflow.get_experiment_by_name(experiment_name)
    # 2. use search_runs with the experiment_id for a string search query
    if os.path.isfile('cache/runs_names.pkl'):
        runs = pd.read_pickle('cache/runs_names.pkl')
        if runs['tags.mlflow.runName'][runs['tags.mlflow.runName'] == run_name].empty:
            # cached run list is stale; fetch a fresh DataFrame (one row per run)
            runs = fetch_runs_list_mlflow(experiment)
    else:
        # no cache yet; fetch a DataFrame with one row per run
        runs = fetch_runs_list_mlflow(experiment)
# 3. get the selected run between all runs inside the selected experiment
run = runs.loc[runs['tags.mlflow.runName'] == run_name]
    # 4. check that exactly one run has that name
    assert len(run) == 1, f"Expected exactly one run named '{run_name}', found {len(run)}"
index_run = run.index[0]
artifact_uri = run.loc[index_run, 'artifact_uri']
# 5. load state_dict of your run
state_dict = mlflow.pytorch.load_state_dict(artifact_uri)
# 6. load model of your run
DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'
# model = mlflow.pytorch.load_model(os.path.join(
# artifact_uri, "model"), map_location=torch.device(DEVICE))
model = fetch_from_mlflow(os.path.join(
artifact_uri, "model"), use_cache=True, download_model=download_model)
return state_dict, model
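# Usage sketch (experiment and run names are placeholders):
#   state_dict, model = get_mlflow_model_by_name('my-experiment', 'my-run')
#   model.eval()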
def data_loader_mean_and_std(data_loader, transform=None):
    """Estimate per-channel mean and std over a data loader.
    Note: the std is approximated by averaging per-batch standard deviations."""
means = []
stds = []
for x, y in data_loader:
if transform is not None:
x = transform(x)
means.append(x.mean(dim=(0, 2, 3)).unsqueeze(0))
stds.append(x.std(dim=(0, 2, 3)).unsqueeze(0))
return torch.cat(means).mean(dim=0), torch.cat(stds).mean(dim=0)
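# Usage sketch (train_loader is assumed; torchvision is assumed available):
#   mean, std = data_loader_mean_and_std(train_loader)
#   norm = torchvision.transforms.Normalize(mean, std)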
def fetch_runs_list_mlflow(experiment):
    runs = mlflow.search_runs(experiment.experiment_id)
    os.makedirs('cache', exist_ok=True)  # ensure the cache directory exists
    runs.to_pickle('cache/runs_names.pkl')  # cache the run list as a .pkl
    return runs
def fetch_from_mlflow(uri, type='', use_cache=True, download_model=True):
    """Load a model from MLflow, optionally via a local cache.
    `type` selects a submodule ('processor' or 'classifier') to return
    instead of the full model."""
cache_loc = os.path.join('cache', uri.split('//')[1]) + '.pt'
if use_cache and os.path.exists(cache_loc):
print(f'loading cached model from {cache_loc} ...')
model = torch.load(cache_loc)
else:
print(f'fetching model from {uri} ...')
model = mlflow.pytorch.load_model(uri)
os.makedirs(os.path.dirname(cache_loc), exist_ok=True)
if download_model:
torch.save(model, cache_loc, pickle_module=mlflow.pytorch.pickle_module)
if type == 'processor':
processor = model.processor
model.processor = None
del model # free up memory space
return processor
if type == 'classifier':
classifier = model.classifier
model.classifier = None
del model # free up memory space
return classifier
return model
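# Usage sketch (URI is a placeholder): pull only the processor submodule
# and drop the rest of the model to free memory.
#   processor = fetch_from_mlflow('s3://<bucket>/<run>/artifacts/model', type='processor')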
def display_mlflow_run_info(run):
uri = mlflow.get_tracking_uri()
experiment_id = run.info.experiment_id
experiment_name = mlflow.get_experiment(experiment_id).name
run_id = run.info.run_id
run_name = run.data.tags['mlflow.runName']
experiment_url = f'{uri}/#/experiments/{experiment_id}'
run_url = f'{experiment_url}/runs/{run_id}'
print(f'view results at {run_url}')
    display(Markdown(
        f"[experiment {experiment_id} '{experiment_name}']({experiment_url})"
        f" > "
        f"[run '{run_name}' {run_id}]({run_url})"
    ))
print('')
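# Usage sketch (inside a notebook, with an active run):
#   with mlflow.start_run(run_name='example') as run:
#       display_mlflow_run_info(run)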
def get_train_test_indices_drone(df, frac, seed=None):
    """Split indices of a DataFrame with binary, balanced labels into balanced subsets.
    Args:
        df (pd.DataFrame): {0,1}-labeled data
        frac (float): fraction of indices in the first (train) subset
        seed (int, optional): random seed used for np.random and pandas sampling
    Returns:
        train_indices (list): balanced subset of indices corresponding to rows in the DataFrame
        test_indices (list): balanced subset of indices corresponding to rows in the DataFrame
    """
split_idx = int(len(df) * frac / 2)
df_with = df[df['label'] == 1]
df_without = df[df['label'] == 0]
np.random.seed(seed)
df_with_train = df_with.sample(n=split_idx, random_state=seed)
df_with_test = df_with.drop(df_with_train.index)
df_without_train = df_without.sample(n=split_idx, random_state=seed)
df_without_test = df_without.drop(df_without_train.index)
train_indices = list(df_without_train.index) + list(df_with_train.index)
test_indices = list(df_without_test.index) + list(df_with_test.index)
""""
print('fraction of 1-label in train set: {}'.format(len(df_with_train)/(len(df_with_train) + len(df_without_train))))
print('fraction of 1-label in test set: {}'.format(len(df_with_test)/(len(df_with_test) + len(df_with_test))))
"""
return train_indices, test_indices
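# Usage sketch (df is assumed to have a binary 'label' column):
#   train_idx, test_idx = get_train_test_indices_drone(df, frac=0.8, seed=42)
#   train_sampler = torch.utils.data.SubsetRandomSampler(train_idx)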
# def smp_get_loss(loss):
# if loss == "Dice":
# return smp.losses.DiceLoss(mode='binary', from_logits=True)
# if loss == "BCE":
# return nn.BCELoss()
# elif loss == "BCEWithLogits":
# return smp.losses.BCEWithLogitsLoss()
# elif loss == "DicyBCE":
# from pytorch_toolbelt import losses as ptbl
# return ptbl.JointLoss(ptbl.DiceLoss(mode='binary', from_logits=False),
# nn.BCELoss(),
# first_weight=args.dice_weight,
# second_weight=args.bce_weight)
# Adversarial setup
def l2_regularization(x, y):
return ((x - y) ** 2).sum()
class AuxLoss(nn.Module):
    """Auxiliary loss that penalizes the adversarial processor's output
    for drifting away from the default processor's output."""
    def __init__(self, loss_aux, processor_adv, processor_default, weight=1):
super().__init__()
self.loss_aux = loss_aux
self.weight = weight
self.processor_adv = processor_adv
self.processor_default = processor_default
    def forward(self, x):
        with torch.no_grad():
            x_reference = self.processor_default(x)  # frozen reference output
        x_processed = self.processor_adv.buffer['processed_rgb']  # output cached by the adversarial processor
        return self.weight * self.loss_aux(x_reference, x_processed)
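# Usage sketch (processors are placeholders; processor_adv is assumed to cache
# its last output in `buffer['processed_rgb']` during its forward pass):
#   aux_loss = AuxLoss(l2_regularization, processor_adv, processor_default, weight=0.1)
#   penalty = aux_loss(raw_batch)   # call after processor_adv has run on raw_batch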
class WeightedLoss(nn.Module):
    """Wrap a loss function and scale its output by a constant weight."""
def __init__(self, loss, weight=1):
super().__init__()
self.loss = loss
self.weight = weight
def forward(self, x, y):
return self.weight * self.loss(x, y)
def __repr__(self):
return f'{self.weight} * {get_name(self.loss)}'
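# Usage sketch: combine with any criterion, e.g.
#   loss_fn = WeightedLoss(nn.BCEWithLogitsLoss(), weight=0.5)
#   loss = loss_fn(logits, targets)   # == 0.5 * BCEWithLogitsLoss()(logits, targets)
# repr(loss_fn) then reads '0.5 * BCEWithLogitsLoss'.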