#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright (c) 2022 Intel Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# SPDX-License-Identifier: Apache-2.0
#
import re
import torch
import random
import time

from requests.adapters import ProxyError
from transformers import AutoTokenizer
from torch.utils.data import DataLoader as loader

from tlt.datasets.dataset import BaseDataset


class HFDataset(BaseDataset):
    """
    Base class to represent a Hugging Face dataset.
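
    This base class is not usually instantiated directly; TLT dataset subclasses handle loading.
    A minimal usage sketch through such a subclass (``dataset`` is an illustrative name):
        >>> dataset.preprocess('bert-base-uncased', batch_size=32)  # doctest: +SKIP
        >>> dataset.shuffle_split(train_pct=.8, val_pct=.2)  # doctest: +SKIP
        >>> batch = dataset.get_batch(subset='train')  # doctest: +SKIP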
""" | |
def __init__(self, dataset_dir, dataset_name="", dataset_catalog=""): | |
BaseDataset.__init__(self, dataset_dir, dataset_name, dataset_catalog) | |

    def get_batch(self, subset='all'):
        """
        Get a single batch of examples and labels from the dataset.

        Args:
            subset (str): default "all", can also be "train", "validation", or "test"

        Returns:
            (examples, labels)

        Raises:
            ValueError: if the dataset is not defined yet or the given subset is not valid
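
        Example (a minimal sketch; ``dataset`` is an illustrative name for an already loaded
        and preprocessed HFDataset subclass):
            >>> batch = dataset.get_batch(subset='train')  # doctest: +SKIP
            >>> batch['input_ids']  # doctest: +SKIP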
""" | |
if subset == 'all' and self._dataset is not None: | |
return next(iter(self._data_loader)) | |
elif subset == 'train' and self.train_subset is not None: | |
return next(iter(self._train_loader)) | |
elif subset == 'validation' and self.validation_subset is not None: | |
return next(iter(self._validation_loader)) | |
elif subset == 'test' and self.test_subset is not None: | |
return next(iter(self._test_loader)) | |
else: | |
raise ValueError("Unable to return a batch, because the dataset or subset hasn't been defined.") | |

    def preprocess(
            self,
            model_name: str,
            batch_size: int = 32,
            padding: str = "max_length",
            truncation: bool = True,
            max_length: int = 64,
            **kwargs
    ) -> None:
        """
        Preprocess the text dataset by tokenizing it with a model-matched tokenizer and
        applying padding and truncation.

        Args:
            model_name (str): Name of the model, used to load a matching tokenizer.
            batch_size (int): Number of samples per batch. (default: 32)
            padding (str): Desired padding strategy. (default: "max_length")
            truncation (bool): Whether to truncate token sequences longer than max_length. (default: True)
            max_length (int): Maximum sequence length. (default: 64)

        Raises:
            ValueError: if the data has already been preprocessed, a non-integer batch size is
                given, or the given dataset hasn't been implemented in the API yet.
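
        Example (a minimal sketch; ``dataset`` is an illustrative name for a loaded HFDataset
        subclass, and the model name is only an example):
            >>> dataset.preprocess('bert-base-uncased', batch_size=16, max_length=128)  # doctest: +SKIP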
""" | |
        # Sanity checks
        if not isinstance(batch_size, int) or batch_size < 1:
            raise ValueError("batch_size should be a positive integer")
        if self._preprocessed:
            raise ValueError("Data has already been preprocessed: {}".format(self._preprocessed))

        column_names = self._dataset.column_names

        # There must be at least one feature named 'label' in self._dataset. The remaining features
        # become the text columns, provided they contain only strings
        text_column_names = [col_name for col_name in column_names if col_name != 'label' and
                             all(isinstance(s, str) for s in self._dataset[col_name])]

        # Get the tokenizer, retrying once if a proxy error interrupts the download
        try:
            self._tokenizer = AutoTokenizer.from_pretrained(model_name)
        except ProxyError:
            print("Max retries reached. Sleeping for 10 sec...")
            time.sleep(10)
            self._tokenizer = AutoTokenizer.from_pretrained(model_name)
        # Define a tokenize function to map the text columns to the tokenizer
        def tokenize_function(examples):
            # Pass every text column in the dataset to the tokenizer as a positional argument
            args = (examples[text_column_name] for text_column_name in text_column_names)
            result = self._tokenizer(*args, padding=padding, max_length=max_length, truncation=truncation)
            return result

        self._dataset = self._dataset.map(tokenize_function, batched=True)

        # Prepare the tokenized dataset in the format expected by the model.
        # Remove all of the original features from the tokenized dataset except 'label'
        self._dataset = self._dataset.remove_columns([col for col in column_names if col != 'label'])

        # Set format to torch
        self._dataset.set_format("torch")

        self._preprocessed = {
            'padding': padding,
            'truncation': truncation,
            'batch_size': batch_size,
        }
        self._make_data_loaders(batch_size=batch_size)
        print("Tokenized Dataset:", self._dataset)

    def shuffle_split(self, train_pct=.75, val_pct=.25, test_pct=0., shuffle_files=True, seed=None):
        """
        Randomly split the dataset into train, validation, and test subsets with a pseudo-random seed option.

        Args:
            train_pct (float): default .75, percentage of dataset to use for training
            val_pct (float): default .25, percentage of dataset to use for validation
            test_pct (float): default 0.0, percentage of dataset to use for testing
            shuffle_files (bool): default True, optionally control whether shuffling occurs
            seed (None or int): default None, can be set for pseudo-randomization

        Raises:
            ValueError: if the percentage arguments are not floats or sum to greater than 1
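
        Example (a minimal sketch; ``dataset`` is an illustrative name for a loaded HFDataset subclass):
            >>> dataset.shuffle_split(train_pct=.8, val_pct=.1, test_pct=.1, seed=10)  # doctest: +SKIP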
""" | |
        # Sanity checks
        if not (isinstance(train_pct, float) and isinstance(val_pct, float) and isinstance(test_pct, float)):
            raise ValueError("Percentage arguments must be floats.")
        if train_pct + val_pct + test_pct > 1.0:
            raise ValueError("Sum of percentage arguments must be less than or equal to 1.")

        self._validation_type = 'shuffle_split'

        # Calculate the splits
        length = len(self._dataset)
        train_size = int(train_pct * length)
        val_size = int(val_pct * length)
        test_size = int(test_pct * length)

        # Check 'seed is not None' so that a seed of 0 is still honored
        generator = torch.Generator().manual_seed(seed) if seed is not None else None
        if shuffle_files:
            dataset_indices = torch.randperm(length, generator=generator).tolist()
        else:
            dataset_indices = range(length)

        self._train_indices = dataset_indices[:train_size]
        self._validation_indices = dataset_indices[train_size:train_size + val_size]
        if test_pct:
            self._test_indices = dataset_indices[train_size + val_size:train_size + val_size + test_size]
        else:
            self._test_indices = None

        if self._preprocessed and 'batch_size' in self._preprocessed:
            self._make_data_loaders(batch_size=self._preprocessed['batch_size'], generator=generator)

        print("Dataset split into:")
        print("-------------------")
        print("{} train samples".format(train_size))
        print("{} test samples".format(test_size))
        print("{} validation samples".format(val_size))

    def _make_data_loaders(self, batch_size, generator=None):
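        """
        Create DataLoaders for whichever subsets are defined. Each worker is re-seeded from the
        torch initial seed so that numpy- and random-based transforms stay reproducible across workers.
        """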
        def seed_worker(worker_id):
            import numpy as np
            worker_seed = torch.initial_seed() % 2**32
            np.random.seed(worker_seed)
            random.seed(worker_seed)

        if self._validation_type == 'shuffle_split':
            self._train_loader = loader(self.train_subset, batch_size=batch_size, shuffle=self._shuffle,
                                        num_workers=self._num_workers, worker_init_fn=seed_worker,
                                        generator=generator)
            self._validation_loader = loader(self.validation_subset, batch_size=batch_size, shuffle=self._shuffle,
                                             num_workers=self._num_workers, worker_init_fn=seed_worker,
                                             generator=generator)
            if self._test_indices:
                self._test_loader = loader(self.test_subset, batch_size=batch_size, shuffle=self._shuffle,
                                           num_workers=self._num_workers, worker_init_fn=seed_worker,
                                           generator=generator)
        elif self._validation_type == 'defined_split':
            if 'train' in self._split:
                self._train_loader = loader(self.train_subset, batch_size=batch_size, shuffle=self._shuffle,
                                            num_workers=self._num_workers, worker_init_fn=seed_worker,
                                            generator=generator)
            if 'test' in self._split:
                self._test_loader = loader(self.test_subset, batch_size=batch_size, shuffle=self._shuffle,
                                           num_workers=self._num_workers, worker_init_fn=seed_worker,
                                           generator=generator)
            if 'validation' in self._split:
                self._validation_loader = loader(self.validation_subset, batch_size=batch_size,
                                                 shuffle=self._shuffle, num_workers=self._num_workers,
                                                 worker_init_fn=seed_worker, generator=generator)
        elif self._validation_type is None:
            # No split is defined, so a single DataLoader over the whole dataset serves all three roles
            self._data_loader = loader(self._dataset, batch_size=batch_size, shuffle=self._shuffle,
                                       num_workers=self._num_workers, worker_init_fn=seed_worker,
                                       generator=generator)
            self._train_loader = self._data_loader
            self._test_loader = self._data_loader
            self._validation_loader = self._data_loader

    def get_text(self, input_ids):
        """
        Helper function to decode the given input_ids back to text. Assumes a BERT-style
        tokenizer that wraps each sequence in [CLS] and [SEP] tokens.
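
        Example (a minimal sketch; ``dataset`` is illustrative and must already be preprocessed
        with a BERT-style tokenizer):
            >>> batch = dataset.get_batch(subset='train')  # doctest: +SKIP
            >>> dataset.get_text(batch['input_ids'])  # doctest: +SKIP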
""" | |
        decoded_text = []
        if isinstance(input_ids, list):
            input_ids = torch.tensor(input_ids)
        if input_ids.ndim > 1:
            decoded_tokens = self._tokenizer.batch_decode(input_ids)
            for t in decoded_tokens:
                decoded_text.append(re.search(r'\[CLS\] (.*?) \[SEP\]', t).group(1))
        else:
            decoded_tokens = self._tokenizer.decode(input_ids)
            decoded_text.append(re.search(r'\[CLS\] (.*?) \[SEP\]', decoded_tokens).group(1))
        return decoded_text

    @property
    def train_subset(self):
        """Train subset of the dataset, or None if it is not defined."""
        train_ds = None
        if self._validation_type == 'shuffle_split':
            train_ds = self._dataset.select(self._train_indices)
        elif self._validation_type == 'defined_split':
            if 'train' in self._split:
                train_ds = self._dataset.select(self._train_indices)
            else:
                raise ValueError("train split not specified")
        elif self._validation_type is None:
            train_ds = self._dataset
        return train_ds

    @property
    def test_subset(self):
        """Test subset of the dataset, or None if it is not defined."""
        test_ds = None
        if self._validation_type == 'shuffle_split':
            if self._test_indices:
                test_ds = self._dataset.select(self._test_indices)
        elif self._validation_type == 'defined_split':
            if 'test' in self._split:
                test_ds = self._dataset.select(self._test_indices)
            else:
                raise ValueError("test split not specified")
        elif self._validation_type is None:
            test_ds = self._dataset
        return test_ds

    @property
    def validation_subset(self):
        """Validation subset of the dataset, or None if it is not defined."""
        validation_ds = None
        if self._validation_type == 'shuffle_split':
            validation_ds = self._dataset.select(self._validation_indices)
        elif self._validation_type == 'defined_split':
            if 'validation' in self._split:
                validation_ds = self._dataset.select(self._validation_indices)
            else:
                raise ValueError("validation split not specified")
        elif self._validation_type is None:
            validation_ds = self._dataset
        return validation_ds

    @property
    def train_loader(self):
        """DataLoader for the train subset; raises a ValueError if it is not defined."""
        if self._train_loader:
            return self._train_loader
        else:
            raise ValueError("train split not specified")

    @property
    def test_loader(self):
        """DataLoader for the test subset; raises a ValueError if it is not defined."""
        if self._test_loader:
            return self._test_loader
        else:
            raise ValueError("test split not specified")

    @property
    def validation_loader(self):
        """DataLoader for the validation subset; raises a ValueError if it is not defined."""
        if self._validation_loader:
            return self._validation_loader
        else:
            raise ValueError("validation split not specified")

    def get_inc_dataloaders(self):
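        """
        Build a (calibration, evaluation) DataLoader pair for use with Intel Neural Compressor.
        The calibration loader comes from the train subset; evaluation prefers the validation
        subset, then the test subset, and falls back to the train subset. Label columns are
        dropped from both, since INC expects features only.

        A minimal usage sketch (``dataset`` is an illustrative, preprocessed HFDataset):
            >>> calib_dataloader, eval_dataloader = dataset.get_inc_dataloaders()  # doctest: +SKIP
        """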
        calib_dataset = self.train_subset
        # Check the private attributes here, since the loader properties raise a ValueError
        # when the corresponding split is not defined
        if getattr(self, '_validation_loader', None) is not None:
            eval_dataset = self.validation_subset
        elif getattr(self, '_test_loader', None) is not None:
            eval_dataset = self.test_subset
        else:
            eval_dataset = self.train_subset

        # Drop the label column, because Intel Neural Compressor does not like it embedded with the features.
        # If we need to compute metrics from the labels, we can improve this with a subclass of
        # torch.utils.data.Dataset or neural_compressor.data.datasets.bert_dataset.PytorchBertDataset that
        # also returns the labels from __getitem__
        for label_col_name in ['labels', 'label']:
            if label_col_name in self._dataset.features.keys():
                calib_dataset = calib_dataset.remove_columns(label_col_name)
                eval_dataset = eval_dataset.remove_columns(label_col_name)

        calib_dataloader = loader(calib_dataset, batch_size=self._preprocessed['batch_size'])
        eval_dataloader = loader(eval_dataset, batch_size=self._preprocessed['batch_size'])
        return calib_dataloader, eval_dataloader