#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright (c) 2023 Intel Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# SPDX-License-Identifier: Apache-2.0
#

import os
import dill  # nosec: B403
import time
import tensorflow as tf
import tensorflow_hub as hub
import numpy as np

from transformers import TFBertModel, BertConfig
from pydoc import locate

from tlt.utils.dataset_utils import prepare_huggingface_input_data
from tlt.models.model_factory import get_model_info

# This needs to be imported last to avoid "free(): invalid pointer" error
import horovod.tensorflow.keras as hvd


class DistributedTrainingArguments:
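    """Container for the model, optimizer, loss, datasets, and training hyperparameters
    that are passed to DistributedTF.launch_distributed_job().
    """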

    def __init__(self, use_case, train_data, model, optimizer, loss, test_data=None, val_data=None,
                 epochs=1, global_batch_size=128, shuffle=True, scaling='weak', **kwargs) -> None:
        self.use_case = use_case

        # Model related arguments
        self.model = model
        self.optimizer = optimizer
        self.loss = loss

        # Data related arguments
        self.train_data = train_data
        self.test_data = test_data
        self.val_data = val_data
        self.num_classes = kwargs.get('num_classes', None)

        # Training related arguments
        self.epochs = epochs
        self.scaling = scaling
        self.global_batch_size = global_batch_size
        self.batch_denom = kwargs.get('batch_denom', 1)
        self.shuffle = shuffle

        # Use case related arguments
        # For image classification
        self.image_size = kwargs.get('image_size', None)
        self.image_shape = kwargs.get('image_shape', None)

        # For text classification
        self.max_seq_length = kwargs.get('max_seq_length', None)
        self.padding = kwargs.get('padding', None)
        self.truncation = kwargs.get('truncation', None)
        self.hf_bert_tokenizer = kwargs.get('hf_bert_tokenizer', None)


class DistributedTF:
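    """Horovod-based helper that prepares datasets and models and runs distributed training
    for TensorFlow image classification and text classification workloads.
    """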

    def __init__(self) -> None:
        hvd.init()

    def prepare_dataset(self, dataset, use_case, global_batch_size, scaling, **kwargs):
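        """Shard, batch, and prefetch `dataset` for this worker according to the scaling strategy.

        For text classification, the raw sentences are tokenized only on worker 0 and the
        resulting tensors are synced to the other workers before sharding.
        """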
        if scaling.lower() == 'weak':
            batch_size = global_batch_size
        elif scaling.lower() == 'strong':
            batch_size = global_batch_size // hvd.size()
        else:
            raise ValueError("Unsupported scaling type '{}', expected 'weak' or 'strong'".format(scaling))

        if use_case == 'image_classification':
            dataset = dataset.shard(num_shards=hvd.size(), index=hvd.rank())
            dataset = dataset.cache()
            if 'map_func' in kwargs:
                dataset = dataset.map(map_func=kwargs.get('map_func'), num_parallel_calls=tf.data.AUTOTUNE)
            dataset = dataset.batch(batch_size)
            dataset = dataset.prefetch(tf.data.AUTOTUNE)
        elif use_case == 'text_classification':
            max_seq_length = kwargs.get('max_seq_length')
            bert_tokenizer = kwargs.get('hf_bert_tokenizer')

            # Placeholder tensors; only worker 0 fills them with the tokenized data below
            input_ids_shape = (len(dataset), max_seq_length)
            attention_mask_shape = (len(dataset), max_seq_length)
            input_ids = tf.zeros(input_ids_shape, dtype=tf.int32)
            attention_mask = tf.zeros(attention_mask_shape, dtype=tf.int32)
            labels = tf.zeros(len(dataset), dtype=tf.int32)

            # Text preprocessing only needs to be done on one worker; the tensors are synced
            # to the other workers afterwards
            if hvd.rank() == 0:
                dataset = [(sentence.numpy().decode(), label.numpy()) for sentence, label in dataset]
                sentences = [x[0] for x in dataset]
                labels = [x[1] for x in dataset]
                print('Tokenizing the dataset...')
                tokenized_dataset = bert_tokenizer(sentences, padding='max_length', max_length=max_seq_length,
                                                   truncation=True, return_tensors='tf')
                input_ids = tokenized_dataset['input_ids']
                attention_mask = tokenized_dataset['attention_mask']
                labels = tf.convert_to_tensor(labels, dtype=tf.int32)

            # The sum allreduce acts as a broadcast: every worker other than rank 0 contributes
            # all-zero tensors, so each worker ends up with worker 0's tokenized data
            input_ids = hvd.allreduce(input_ids, average=False, name='barrier1')
            attention_mask = hvd.allreduce(attention_mask, average=False, name='barrier2')
            labels = hvd.allreduce(labels, average=False, name='labels')

            dataset = ({
                'input_ids': input_ids,
                'attention_mask': attention_mask
            }, labels)
            dataset = tf.data.Dataset.from_tensor_slices(dataset)
            dataset = dataset.shard(hvd.size(), hvd.rank())
            dataset = dataset.cache()
            dataset = dataset.batch(batch_size)
            dataset = dataset.prefetch(tf.data.AUTOTUNE)
        return dataset

    def prepare_model(self, model_name, use_case, input_shape, num_classes, **kwargs):
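        """Build a Keras model for the given use case: a TF Hub feature vector with a dense
        classification head for image classification, or a TFBertModel with a dense classifier
        on the pooled output for text classification.
        """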
        # Try to get model url from TLT supported models
        model_info = get_model_info(model_name, 'tensorflow', use_case)
        if model_info != {}:
            fw_enum = list(model_info.keys())[0]
            model_name = model_info[fw_enum]['tensorflow']['feature_vector']

        if use_case == 'image_classification':
            model = tf.keras.models.Sequential([
                hub.KerasLayer(model_name, input_shape=input_shape),
                tf.keras.layers.Dense(num_classes, activation='softmax')
            ])
        elif use_case == 'text_classification':
            bert_config = BertConfig.from_pretrained(model_name, output_hidden_states=True)
            bert_model = TFBertModel.from_pretrained(model_name, config=bert_config, from_pt=True)

            dense_layer_dims = 1 if num_classes == 2 else num_classes

            input_ids = tf.keras.layers.Input(input_shape, dtype=tf.int32, name='input_ids')
            attention_mask = tf.keras.layers.Input(input_shape, dtype=tf.int32, name='attention_mask')
            bert_output = bert_model.bert(input_ids, attention_mask=attention_mask)[1]
            classifier = tf.keras.layers.Dense(dense_layer_dims, activation=None, name='classifier')(bert_output)
            model = tf.keras.Model(inputs=[input_ids, attention_mask], outputs=classifier)
        return model

    def launch_distributed_job(self, training_args: DistributedTrainingArguments):
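        """Scale the learning rate for the chosen scaling strategy, wrap the optimizer with
        Horovod's DistributedOptimizer, and run model.fit() with the Horovod callbacks
        (variable broadcast, metric averaging, and learning rate warmup).
        """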
        model = training_args.model
        optimizer = training_args.optimizer
        loss = training_args.loss

        # This is required if using intel-tensorflow==2.12.0
        optimizer = self._get_legacy_optimizer(optimizer)

        # Horovod: pin GPU to be used to process local rank (one GPU per process)
        gpus = tf.config.experimental.list_physical_devices('GPU')
        for gpu in gpus:
            tf.config.experimental.set_memory_growth(gpu, True)
        if gpus:
            tf.config.experimental.set_visible_devices(gpus[hvd.local_rank()], 'GPU')

        if training_args.scaling.lower() == 'weak':
            multiplier = np.sqrt(training_args.global_batch_size // training_args.batch_denom)
            optimizer.lr = optimizer.lr * multiplier
            batch_size = training_args.global_batch_size
        elif training_args.scaling.lower() == 'strong':
            optimizer.lr = optimizer.lr * hvd.size()
            batch_size = training_args.global_batch_size // hvd.size()
        else:
            raise ValueError("Unsupported scaling type '{}', expected 'weak' or 'strong'".format(
                training_args.scaling))
        if training_args.use_case == 'image_classification':
            hvd_optimizer = hvd.DistributedOptimizer(
                optimizer, backward_passes_per_step=5, average_aggregated_gradients=True)
        elif training_args.use_case == 'text_classification':
            hvd_optimizer = hvd.DistributedOptimizer(
                optimizer, backward_passes_per_step=1, average_aggregated_gradients=True)

        model.compile(
            loss=loss,
            optimizer=hvd_optimizer,
            metrics=['acc'],
            experimental_run_tf_function=False
        )

        warmup = 3
        if hvd.size() == 1:
            warmup = 1

        callbacks = []

        # Horovod: broadcast initial variable states from rank 0 to all other processes.
        callbacks.append(hvd.callbacks.BroadcastGlobalVariablesCallback(0))

        # Horovod: average metrics among workers at the end of every epoch.
        callbacks.append(hvd.callbacks.MetricAverageCallback())

        # Horovod: using `lr = 1.0 * hvd.size()` from the very beginning leads to worse final accuracy.
        callbacks.append(hvd.callbacks.LearningRateWarmupCallback(
            initial_lr=optimizer.lr, warmup_epochs=warmup, verbose=1))

        # Horovod: save checkpoints only on worker 0 to prevent other workers from corrupting them.
        if hvd.rank() == 0:
            model_checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(filepath=os.path.join(
                os.environ['HOME'], 'model_checkpoints'), save_weights_only=False, monitor='val_acc',
                mode='max', save_best_only=True)
            callbacks.append(model_checkpoint_callback)

        # Horovod: write logs on worker 0.
        verbose = 1 if hvd.rank() == 0 else 0

        x_input_data = training_args.train_data
        y_target_data = None
        val_data = training_args.val_data

        # Prepare dataset for Hugging Face text classification
        if training_args.hf_bert_tokenizer:
            bert_tokenizer_name = training_args.hf_bert_tokenizer
            max_seq_length = training_args.max_seq_length

            tokenized_data, labels = prepare_huggingface_input_data(x_input_data, bert_tokenizer_name, max_seq_length)
            x_input_data = [tokenized_data['input_ids'], tokenized_data['attention_mask']]
            y_target_data = tf.convert_to_tensor(labels)

            if training_args.val_data:
                tokenized_val_data, val_labels = prepare_huggingface_input_data(training_args.val_data,
                                                                                bert_tokenizer_name, max_seq_length)
                val_data = ([tokenized_val_data['input_ids'], tokenized_val_data['attention_mask']],
                            tf.convert_to_tensor(val_labels))

        start = time.time()
        steps_per_epoch_per_worker = len(training_args.train_data) // batch_size
        steps_per_epoch_per_worker = steps_per_epoch_per_worker // hvd.size()
        if hvd.size() > 2:
            steps_per_epoch_per_worker += 1

        self.history = model.fit(
            x=x_input_data,
            y=y_target_data,
            validation_data=val_data,
            callbacks=callbacks,
            steps_per_epoch=steps_per_epoch_per_worker,
            epochs=training_args.epochs,
            initial_epoch=0,
            verbose=verbose
        )
        end = time.time()

        if hvd.rank() == 0:
            print("Total elapsed time in seconds = ", end - start)
            print("Total elapsed time in minutes = ", ((end - start) / 60))
            print("Total epochs = ", len(self.history.history['loss']))
            print("Time per epoch in seconds = ", ((end - start) / len(self.history.history['loss'])))
            print("Maximum validation accuracy = ", np.max(self.history.history['val_acc']))

    def _get_legacy_optimizer(self, optimizer):
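        """Return the tf.keras.optimizers.legacy counterpart of `optimizer` (carrying over any
        shared config values), or the original optimizer if no legacy match is found.
        """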
        optimizer_config = optimizer.get_config()
        optimizer_name = optimizer_config['name']
        legacy_optimizer_class = locate('tensorflow.keras.optimizers.legacy.{}'.format(optimizer_name))
        if legacy_optimizer_class is None:
            # No matching legacy optimizer is found.
            return optimizer

        legacy_optimizer_config = legacy_optimizer_class().get_config()
        legacy_optimizer = legacy_optimizer_class.from_config(
            {k: v for k, v in optimizer_config.items() if k in legacy_optimizer_config}
        )
        return legacy_optimizer

    def load_saved_objects(self, saved_objects_dir):
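        """Reload the model, optimizer state, loss, and dataset(s) that were previously saved
        under `saved_objects_dir`, returning them as a tuple.
        """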
        # Load the saved_model.pb
        model = tf.keras.models.load_model(filepath=saved_objects_dir, compile=False)

        # Load the optimizer and restore its state
        checkpoint = tf.train.Checkpoint(optimizer=tf.optimizers.Adam())
        checkpoint.restore(os.path.join(saved_objects_dir, 'saved_optimizer-1'))

        # Load the saved loss class name and instantiate the loss
        with open(os.path.join(saved_objects_dir, 'saved_loss'), 'rb') as f:
            loss_class, loss_args = dill.load(f)  # nosec: B301

        # Load the dataset(s)
        train_data = tf.data.Dataset.load(os.path.join(saved_objects_dir, 'train_data'))
        try:
            val_data = tf.data.Dataset.load(os.path.join(saved_objects_dir, 'val_data'))
        except FileNotFoundError:
            val_data = None

        if loss_class is None:
            # No loss was saved; infer a default from the number of distinct labels
            dataset = train_data.unbatch()
            dataset = list(dataset.as_numpy_iterator())
            labels = list()
            for _, label in dataset:
                labels.append(label)
            loss = tf.keras.losses.BinaryCrossentropy(from_logits=True) if len(set(labels)) == 2 else \
                tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
        else:
            loss = loss_class(**loss_args)

        return (model, checkpoint.optimizer, loss, train_data, val_data)