import numpy as np
import random
import torch
from data.data_processing import transpose, tensor_to_ind_tensor
from data.data_processing_reverse import tuples_to_str
import sys
sys.path.append("..")
from utils import get_n_instruments
import os


class Loader:
    """Main data loader.

    Map-style dataset: ``loader[idx]`` loads ``<data_folder>/<file>.pt`` for
    the ``idx``-th sample, picks a random window of bars, transposes it by a
    random amount and returns ``(input_, condition, target)``.

    Args:
        data_folder: Directory holding one ``<file>.pt`` per sample. The
            vocabulary file ``maps.pt`` is loaded from its parent directory.
        data: List of sample dicts. Each needs a ``"file"`` key; ``"valence"``
            and ``"arousal"`` are read when conditioning or regression is on.
            Samples whose ``.pt`` file is missing from ``data_folder`` are
            dropped.
        input_len: Number of tokens in the returned input. Reduced by two for
            ``conditioning == "continuous_token"`` (two condition tokens are
            concatenated later, outside this class).
        conditioning: ``"discrete_token"``, ``"continuous_token"`` or
            ``"continuous_concat"``; any other value disables conditioning.
        save_input_dir: Stored on the instance; not used by this class.
        pad: Right-pad short sequences with the pad token.
        use_start_token: Prepend the start token when a sample begins at a
            bar boundary.
        use_end_token: Only controls ``self.end_token``; not used by this
            class.
        max_transpose: Transposition amount is drawn uniformly from
            ``[-max_transpose, max_transpose]`` (presumably semitones --
            TODO confirm against ``transpose``).
        n_try: Maximum number of random windows tried per item.
        bar_start_prob: Probability of starting exactly at a bar boundary
            when the window is longer than ``input_len`` (shorter windows
            always start at the boundary).
        debug: See the review note in ``__init__``; also disables
            ``max_samples``.
        overfit: Cache the first successfully built sample and return it for
            every subsequent index. Also disables ``max_samples``.
        regression: Return ``target=None`` and the continuous
            valence/arousal pair as ``condition``.
        max_samples: Keep only the first ``max_samples`` samples.
        min_n_instruments: Minimum instrument count (as reported by
            ``get_n_instruments``) a window must reach to be accepted.
        use_cls_token: With ``regression``, prepend the CLS token.
        always_use_discrete_condition: Prepend the discrete emotion tokens
            even when the sample does not start at a bar boundary.
    """

    def __init__(self, data_folder, data, input_len, conditioning,
                 save_input_dir=None, pad=True, use_start_token=True,
                 use_end_token=False, max_transpose=3, n_try=5,
                 bar_start_prob=0.5, debug=False, overfit=False,
                 regression=False, max_samples=None, min_n_instruments=3,
                 use_cls_token=True, always_use_discrete_condition=False):
        self.data_folder = data_folder
        self.bar_start_prob = bar_start_prob
        self.save_input_dir = save_input_dir
        self.input_len = input_len
        self.n_try = n_try  # max number of trials to find suitable sample
        self.min_n_instruments = min_n_instruments
        self.overfit = overfit
        self.one_sample = None  # cached sample, used only when overfit=True
        self.transpose_options = list(range(-max_transpose, max_transpose + 1))
        self.conditioning = conditioning
        self.regression = regression
        self.use_cls_token = use_cls_token
        self.pad = pad
        self.always_use_discrete_condition = always_use_discrete_condition

        # NOTE(review): every special-token literal below is the empty string,
        # so pad/start/CLS would all resolve to the same vocabulary key. The
        # original names may have been lost upstream -- confirm against the
        # keys stored in maps.pt before relying on these.
        self.pad_token = '' if pad else None
        self.start_token = '' if use_start_token else None
        self.end_token = '' if use_end_token else None
        self.cls_token = ""

        # NOTE(review): the suffix is applied to the local name only, so it
        # affects just the maps.pt lookup below; sample files are still read
        # from self.data_folder -- confirm this is intended.
        if debug or overfit:
            data_folder = data_folder + "_debug"

        # Keep only samples whose .pt file exists; a set makes each membership
        # test O(1) instead of scanning the directory listing per sample.
        available_files = set(os.listdir(self.data_folder))
        self.data = [sample for sample in data
                     if sample["file"] + '.pt' in available_files]

        maps_file = os.path.join(os.path.abspath(data_folder + "/.."), "maps.pt")
        self.maps = torch.load(maps_file)

        extra_tokens = []
        if self.conditioning == "continuous_token":
            # two condition tokens will be concatenated later
            self.input_len -= 2
        elif self.conditioning == "discrete_token":
            # add emotion tokens to mappings
            extra_tokens = sorted({
                sample[label]
                for sample in self.data
                for label in ("valence", "arousal")
            })

        if self.regression and self.use_cls_token:
            extra_tokens.append(self.cls_token)

        if extra_tokens:
            # Append the new tokens after the existing vocabulary and rebuild
            # both directions of the mapping.
            maps_list = list(self.maps["idx2tuple"].values())
            maps_list += extra_tokens
            self.maps["idx2tuple"] = dict(enumerate(maps_list))
            self.maps["tuple2idx"] = {val: i for i, val in enumerate(maps_list)}

        if max_samples is not None and not debug and not overfit:
            self.data = self.data[:max_samples]

        # roughly / 256, but *4 for flexibility. it is later cut anyway
        # (uses the caller's input_len, not the reduced self.input_len)
        self.n_bars = max(round(input_len / 256 * 4), 1)

    def get_vocab_len(self):
        """Return the vocabulary size, including any tokens added in __init__."""
        return len(self.maps["tuple2idx"])

    def get_maps(self):
        """Return the mapping dict loaded from maps.pt (possibly extended)."""
        return self.maps

    def get_pad_idx(self):
        """Return the vocabulary index of the pad token.

        Raises:
            KeyError: If the pad token is not a key of ``maps["tuple2idx"]``
                (this includes ``pad=False``, where the token is ``None``).
        """
        return self.maps["tuple2idx"][self.pad_token]

    def __len__(self):
        """Return the number of usable samples."""
        return len(self.data)

    def _sample_bars(self, all_bars):
        """Pick a random bar window with enough instruments.

        Tries up to ``self.n_try`` random windows of ``self.n_bars`` bars and
        returns the first one, concatenated along dim 0, whose instrument
        count reaches ``self.min_n_instruments``. Returns ``None`` when no
        window qualifies or every window was empty.
        """
        max_bar_start_idx = max(0, len(all_bars) - self.n_bars - 1)
        attempts = 0
        while attempts < self.n_try:
            attempts += 1
            # choose random bar
            bar_start_idx = random.randint(0, max_bar_start_idx)
            bar_end_idx = min(len(all_bars), bar_start_idx + self.n_bars)
            window = all_bars[bar_start_idx:bar_end_idx]
            if len(window) == 0:
                continue
            # flatten
            bars = torch.cat(window, dim=0)
            symbols = tuples_to_str(bars.cpu().numpy(), self.maps["idx2event"])
            if get_n_instruments(symbols) >= self.min_n_instruments:
                return bars
        return None

    def __getitem__(self, idx):
        """Build one training sample.

        Returns:
            ``(input_, condition, target)``:

            * ``input_`` -- int64 token indices, all but the last position of
              the trimmed (and, if enabled, padded) sequence.
            * ``condition`` -- FloatTensor ``[valence, arousal]`` for
              continuous conditioning or regression, otherwise two NaNs.
            * ``target`` -- the sequence shifted by one position, left-padded
              by the number of condition values for ``"continuous_token"``;
              ``None`` when ``regression`` is set.

            ``(None, None, None)`` when no bar window with at least
            ``min_n_instruments`` instruments was found within ``n_try``
            attempts.
        """
        if self.overfit and self.one_sample is not None:
            # sanity check, using one sample repeatedly
            input_, condition, target = self.one_sample
            return input_, condition, target

        sample_info = self.data[idx]
        data_path = os.path.join(self.data_folder, sample_info["file"] + ".pt")
        item = torch.load(data_path)

        # make sure to have n many instruments
        bars = self._sample_bars(item["bars"])
        if bars is None:
            return None, None, None

        # transpose
        if self.transpose_options:
            n_transpose = random.choice(self.transpose_options)
            bars = transpose(bars, n_transpose, self.maps["transposable_event_inds"])

        # convert to indices (final input)
        bars = tensor_to_ind_tensor(bars, self.maps["tuple2idx"])

        # Decide taking the sample from the start of a bar or not. A random
        # offset is only possible when there are more tokens than input_len.
        r = np.random.uniform()
        start_at_beginning = not (r > self.bar_start_prob
                                  and bars.size(0) > self.input_len)

        if start_at_beginning:
            # starts exactly at bar location
            if self.start_token is not None:
                # add start token
                start_idx = torch.ShortTensor(
                    [self.maps["tuple2idx"][self.start_token]])
                bars = torch.cat((start_idx, bars), dim=0)
        else:
            # it doesn't have to start at bar location so shift arbitrarily
            start = np.random.randint(0, bars.size(0) - self.input_len)
            bars = bars[start:start + self.input_len + 1]

        if self.regression and self.use_cls_token:
            # prepend CLS token
            cls_idx = torch.ShortTensor(
                [self.maps["tuple2idx"][self.cls_token]])
            bars = torch.cat((cls_idx, bars), 0)

        # for now, no auxiliary conditions
        condition = torch.FloatTensor([np.nan, np.nan])

        use_discrete = (
            self.conditioning == "discrete_token"
            and (start_at_beginning or self.always_use_discrete_condition)
        )
        if use_discrete:
            # add emotion tokens
            valence = torch.ShortTensor(
                [self.maps["tuple2idx"][sample_info["valence"]]])
            arousal = torch.ShortTensor(
                [self.maps["tuple2idx"][sample_info["arousal"]]])
            bars = torch.cat((valence, arousal, bars), dim=0)
        elif (self.conditioning in ("continuous_token", "continuous_concat")
              or self.regression):
            # continuous conditions
            condition = torch.FloatTensor(
                [sample_info["valence"], sample_info["arousal"]])

        bars = bars[:self.input_len + 1]  # trim to length, +1 to include target

        if self.pad_token is not None:
            n_pad = self.input_len + 1 - bars.shape[0]
            if n_pad > 0:
                # pad if necessary
                bars = torch.nn.functional.pad(
                    bars, (0, n_pad), value=self.get_pad_idx())

        bars = bars.long()  # cast to int64 (torch.long)
        input_ = bars[:-1]

        if self.regression:
            target = None  # will use condition as target
        else:
            target = bars[1:]
            if self.conditioning == "continuous_token":
                # pad target from left, because input will get conditions
                # concatenated; their sizes should match.
                # NOTE(review): get_pad_idx() is called here even when
                # pad=False (pad_token is None) -- confirm that combination
                # is never used.
                target = torch.nn.functional.pad(
                    target, (condition.size(0), 0), value=self.get_pad_idx())

        if self.overfit:
            self.one_sample = [input_, condition, target]

        return input_, condition, target