#!/usr/bin/env python3
"""Recipe for training a whisper-based CTC ASR system with LibriSpeech.
The system employs whisper from OpenAI (https://cdn.openai.com/papers/whisper.pdf).
This recipe takes only the whisper encoder and adds a DNN + CTC on top of it
for fine-tuning.

If you want to use the full whisper system, please refer to the recipe
speechbrain/recipes/LibriSpeech/ASR/transformer/train_with_whisper.py

To run this recipe, do the following:
> python train_with_whisper.py hparams/train_hf_whisper_encoder.yaml

Authors
 * Titouan Parcollet 2022
 * Rudolf A Braun 2022
 * Sung-Lin Yeh 2021
 * Ju-Chieh Chou 2020
 * Mirco Ravanelli 2020
 * Abdel Heba 2020
 * Peter Plantinga 2020
 * Samuele Cornell 2020
"""

import logging
import os
import sys
from pathlib import Path

import torch
from hyperpyyaml import load_hyperpyyaml

import speechbrain as sb
from speechbrain.tokenizers.SentencePiece import SentencePiece
from speechbrain.utils.data_utils import undo_padding
from speechbrain.utils.distributed import run_on_main

logger = logging.getLogger(__name__)


# Define training procedure
class ASR(sb.Brain):
    """Brain fine-tuning a Whisper encoder followed by a DNN and a CTC layer.

    Two optimizers are used (see ``init_optimizers``): one for the Whisper
    parameters and one for ``hparams.model``.
    """

    def compute_forward(self, batch, stage):
        """Forward computations from the waveform batches to the output
        probabilities.

        Arguments
        ---------
        batch : object
            Batch exposing ``to(device)`` and a ``sig`` entry that unpacks
            into ``(wavs, wav_lens)``.
        stage : sb.Stage
            Current stage; augmentation is only applied in TRAIN and greedy
            decoding is only performed outside of TRAIN.

        Returns
        -------
        p_ctc
            Output of ``hparams.log_softmax`` applied to the CTC logits.
        wav_lens
            The lengths read from ``batch.sig``.
        p_tokens
            Result of greedy CTC decoding, or ``None`` during TRAIN.
        """
        batch = batch.to(self.device)
        wavs, wav_lens = batch.sig
        wavs, wav_lens = wavs.to(self.device), wav_lens.to(self.device)

        # Add augmentation if specified
        if stage == sb.Stage.TRAIN and hasattr(self.hparams, "augmentation"):
            wavs = self.hparams.augmentation(wavs, wav_lens)

        # Forward pass: encode with Whisper and then with the DNN
        feats = self.modules.whisper(wavs)
        x = self.modules.enc(feats)

        # Compute outputs
        p_tokens = None
        logits = self.modules.ctc_lin(x)
        p_ctc = self.hparams.log_softmax(logits)
        if stage != sb.Stage.TRAIN:
            p_tokens = sb.decoders.ctc_greedy_decode(
                p_ctc, wav_lens, blank_id=self.hparams.blank_index
            )

        return p_ctc, wav_lens, p_tokens

    def compute_objectives(self, predictions, batch, stage):
        """Computes the loss (CTC) given predictions and targets.

        Outside of TRAIN, the predicted and target token sequences are also
        decoded with ``self.tokenizer`` and appended to the WER/CER metrics.

        Arguments
        ---------
        predictions : tuple
            The ``(p_ctc, wav_lens, predicted_tokens)`` tuple returned by
            ``compute_forward``.
        batch : object
            Batch exposing ``id`` and ``tokens`` (unpacking into
            ``(tokens, tokens_lens)``).
        stage : sb.Stage
            Current stage.

        Returns
        -------
        loss
            The value returned by ``hparams.ctc_cost``.
        """
        p_ctc, wav_lens, predicted_tokens = predictions

        ids = batch.id
        tokens, tokens_lens = batch.tokens

        loss = self.hparams.ctc_cost(p_ctc, tokens, wav_lens, tokens_lens)

        if stage != sb.Stage.TRAIN:
            # Decode predicted token ids to words
            predicted_words = self.tokenizer(
                predicted_tokens, task="decode_from_list"
            )

            # Strip the padding from the targets, then decode them to words
            target_words = undo_padding(tokens, tokens_lens)
            target_words = self.tokenizer(target_words, task="decode_from_list")

            self.wer_metric.append(ids, predicted_words, target_words)
            self.cer_metric.append(ids, predicted_words, target_words)

        return loss

    def fit_batch(self, batch):
        """Runs forward/backward on one batch and steps the two optimizers.

        Gradients are accumulated over ``grad_accumulation_factor`` batches.
        The Whisper optimizer is only stepped once ``optimizer_step`` exceeds
        ``hparams.warmup_steps``; the model optimizer is stepped from the
        start.

        Arguments
        ---------
        batch : object
            Batch forwarded to ``compute_forward`` / ``compute_objectives``.

        Returns
        -------
        loss
            The detached loss of this batch, moved to CPU.
        """
        should_step = self.step % self.grad_accumulation_factor == 0

        # Managing automatic mixed precision
        if self.auto_mix_prec:
            with torch.cuda.amp.autocast():
                outputs = self.compute_forward(batch, sb.Stage.TRAIN)
                loss = self.compute_objectives(outputs, batch, sb.Stage.TRAIN)
            self.scaler.scale(loss / self.grad_accumulation_factor).backward()

            if should_step:
                self.scaler.unscale_(self.whisper_optimizer)
                self.scaler.unscale_(self.model_optimizer)
                if self.check_gradients(loss):
                    # Here we added a warmup to the CTC encoder to make sure
                    # that it does not screw the whisper with too large
                    # gradients.
                    if self.optimizer_step > self.hparams.warmup_steps:
                        self.scaler.step(self.whisper_optimizer)
                    self.scaler.step(self.model_optimizer)
                self.scaler.update()
                # Gradients are cleared only after an optimizer step (as in
                # the full-precision branch below). Clearing them at the
                # start of every batch would throw away the gradients
                # accumulated from the previous micro-batches.
                self.whisper_optimizer.zero_grad()
                self.model_optimizer.zero_grad()
                self.optimizer_step += 1
        else:
            outputs = self.compute_forward(batch, sb.Stage.TRAIN)
            loss = self.compute_objectives(outputs, batch, sb.Stage.TRAIN)
            (loss / self.grad_accumulation_factor).backward()

            if should_step:
                if self.check_gradients(loss):
                    # Here we added a warmup to the CTC encoder to make sure
                    # that it does not screw the whisper with too large
                    # gradients.
                    if self.optimizer_step > self.hparams.warmup_steps:
                        self.whisper_optimizer.step()
                    self.model_optimizer.step()
                self.whisper_optimizer.zero_grad()
                self.model_optimizer.zero_grad()
                self.optimizer_step += 1

        return loss.detach().cpu()

    def on_stage_start(self, stage, epoch):
        """Gets called at the beginning of each epoch.

        Fresh CER/WER metric objects are created for every non-TRAIN stage.
        """
        if stage != sb.Stage.TRAIN:
            self.cer_metric = self.hparams.cer_computer()
            self.wer_metric = self.hparams.error_rate_computer()

    def on_stage_end(self, stage, stage_loss, epoch):
        """Gets called at the end of an epoch.

        TRAIN stores its stats for later logging. VALID anneals both learning
        rates on the validation loss, logs, and saves a checkpoint keeping the
        lowest WER. TEST logs the stats and writes the WER report to
        ``hparams.wer_file``.
        """
        # Compute/store important stats
        stage_stats = {"loss": stage_loss}
        if stage == sb.Stage.TRAIN:
            self.train_stats = stage_stats
        else:
            stage_stats["CER"] = self.cer_metric.summarize("error_rate")
            stage_stats["WER"] = self.wer_metric.summarize("error_rate")

        # Perform end-of-iteration things, like annealing, logging, etc.
        if stage == sb.Stage.VALID:
            old_lr_model, new_lr_model = self.hparams.lr_annealing_model(
                stage_stats["loss"]
            )
            old_lr_whisper, new_lr_whisper = self.hparams.lr_annealing_whisper(
                stage_stats["loss"]
            )
            sb.nnet.schedulers.update_learning_rate(
                self.model_optimizer, new_lr_model
            )
            sb.nnet.schedulers.update_learning_rate(
                self.whisper_optimizer, new_lr_whisper
            )
            self.hparams.train_logger.log_stats(
                stats_meta={
                    "epoch": epoch,
                    "lr_model": old_lr_model,
                    # NOTE(review): "lr_whisperc" looks like a typo for
                    # "lr_whisper"; kept as-is because existing log parsers
                    # may rely on this key.
                    "lr_whisperc": old_lr_whisper,
                },
                train_stats=self.train_stats,
                valid_stats=stage_stats,
            )
            self.checkpointer.save_and_keep_only(
                meta={"WER": stage_stats["WER"]}, min_keys=["WER"],
            )
        elif stage == sb.Stage.TEST:
            self.hparams.train_logger.log_stats(
                stats_meta={"Epoch loaded": self.hparams.epoch_counter.current},
                test_stats=stage_stats,
            )
            with open(self.hparams.wer_file, "w", encoding="utf-8") as w:
                self.wer_metric.write_stats(w)

    def init_optimizers(self):
        """Initializes the whisper optimizer and model optimizer.

        Both optimizers are registered with the checkpointer (when there is
        one) so that they are saved and recovered with the checkpoints.
        """
        self.whisper_optimizer = self.hparams.whisper_opt_class(
            self.modules.whisper.parameters()
        )
        self.model_optimizer = self.hparams.model_opt_class(
            self.hparams.model.parameters()
        )

        if self.checkpointer is not None:
            # NOTE(review): the two recoverable names are spelled
            # inconsistently; they are kept unchanged so that existing
            # checkpoints can still be loaded.
            self.checkpointer.add_recoverable(
                "whisper_opt", self.whisper_optimizer
            )
            self.checkpointer.add_recoverable("modelopt", self.model_optimizer)


def dataio_prepare(hparams, tokenizer):
    """This function prepares the datasets to be used in the brain class.
    It also defines the data processing pipeline through user-defined
    functions.

    Arguments
    ---------
    hparams : dict
        Hyperparameters; reads ``data_folder``, ``train_csv``, ``valid_csv``,
        ``test_csv``, ``sorting`` and may set
        ``train_dataloader_opts["shuffle"]`` to False.
    tokenizer : object
        Tokenizer whose ``sp.encode_as_ids`` turns a transcript into ids.

    Returns
    -------
    train_data, valid_data : DynamicItemDataset
        Training and validation datasets.
    test_datasets : dict
        Test datasets keyed by the stem of their csv file name.

    Raises
    ------
    NotImplementedError
        If ``hparams["sorting"]`` is not random, ascending or descending.
    """
    data_folder = hparams["data_folder"]

    # 1. Define datasets:
    train_data = sb.dataio.dataset.DynamicItemDataset.from_csv(
        csv_path=hparams["train_csv"], replacements={"data_root": data_folder},
    )

    if hparams["sorting"] == "ascending":
        # we sort training data to speed up training and get better results.
        train_data = train_data.filtered_sorted(sort_key="duration")
        # when sorting do not shuffle in dataloader ! otherwise is pointless
        hparams["train_dataloader_opts"]["shuffle"] = False

    elif hparams["sorting"] == "descending":
        train_data = train_data.filtered_sorted(
            sort_key="duration", reverse=True
        )
        # when sorting do not shuffle in dataloader ! otherwise is pointless
        hparams["train_dataloader_opts"]["shuffle"] = False

    elif hparams["sorting"] == "random":
        pass

    else:
        raise NotImplementedError(
            "sorting must be random, ascending or descending"
        )

    valid_data = sb.dataio.dataset.DynamicItemDataset.from_csv(
        csv_path=hparams["valid_csv"], replacements={"data_root": data_folder},
    )
    valid_data = valid_data.filtered_sorted(sort_key="duration")

    # test is separate
    test_datasets = {}
    for csv_file in hparams["test_csv"]:
        name = Path(csv_file).stem
        test_datasets[name] = sb.dataio.dataset.DynamicItemDataset.from_csv(
            csv_path=csv_file, replacements={"data_root": data_folder}
        )
        test_datasets[name] = test_datasets[name].filtered_sorted(
            sort_key="duration"
        )

    datasets = [train_data, valid_data] + list(test_datasets.values())

    # 2. Define audio pipeline:
    @sb.utils.data_pipeline.takes("wav")
    @sb.utils.data_pipeline.provides("sig")
    def audio_pipeline(wav):
        sig = sb.dataio.dataio.read_audio(wav)
        return sig

    sb.dataio.dataset.add_dynamic_item(datasets, audio_pipeline)

    # 3. Define text pipeline:
    @sb.utils.data_pipeline.takes("wrd")
    @sb.utils.data_pipeline.provides(
        "wrd", "char_list", "tokens_list", "tokens"
    )
    def text_pipeline(wrd):
        yield wrd
        char_list = list(wrd)
        yield char_list
        tokens_list = tokenizer.sp.encode_as_ids(wrd)
        yield tokens_list
        tokens = torch.LongTensor(tokens_list)
        yield tokens

    sb.dataio.dataset.add_dynamic_item(datasets, text_pipeline)

    # 4. Set output:
    sb.dataio.dataset.set_output_keys(
        datasets, ["id", "sig", "wrd", "char_list", "tokens"],
    )

    return train_data, valid_data, test_datasets


if __name__ == "__main__":

    # CLI:
    hparams_file, run_opts, overrides = sb.parse_arguments(sys.argv[1:])

    # If distributed_launch=True then
    # create ddp_group with the right communication protocol
    sb.utils.distributed.ddp_init_group(run_opts)

    with open(hparams_file, encoding="utf-8") as fin:
        hparams = load_hyperpyyaml(fin, overrides)

    # Create experiment directory
    sb.create_experiment_directory(
        experiment_directory=hparams["output_folder"],
        hyperparams_to_save=hparams_file,
        overrides=overrides,
    )

    # Dataset prep (parsing Librispeech)
    from librispeech_prepare import prepare_librispeech  # noqa

    # multi-gpu (ddp) save data preparation
    run_on_main(
        prepare_librispeech,
        kwargs={
            "data_folder": hparams["data_folder"],
            "tr_splits": hparams["train_splits"],
            "dev_splits": hparams["dev_splits"],
            "te_splits": hparams["test_splits"],
            "save_folder": hparams["output_folder"],
            "merge_lst": hparams["train_splits"],
            "merge_name": "train.csv",
            "skip_prep": hparams["skip_prep"],
        },
    )

    # Defining tokenizer and loading it
    tokenizer = SentencePiece(
        model_dir=hparams["save_folder"],
        vocab_size=hparams["output_neurons"],
        annotation_train=hparams["train_csv"],
        annotation_read="wrd",
        model_type=hparams["token_type"],
        character_coverage=hparams["character_coverage"],
    )

    # here we create the datasets objects as well as tokenization and encoding
    train_data, valid_data, test_datasets = dataio_prepare(hparams, tokenizer)

    # Trainer initialization
    asr_brain = ASR(
        modules=hparams["modules"],
        hparams=hparams,
        run_opts=run_opts,
        checkpointer=hparams["checkpointer"],
    )

    # We load the pretrained whisper model
    if "pretrainer" in hparams:
        run_on_main(hparams["pretrainer"].collect_files)
        hparams["pretrainer"].load_collected(asr_brain.device)

    # We dynamically add the tokenizer to our brain class; it is used in
    # compute_objectives to decode token ids back to words.
    asr_brain.tokenizer = tokenizer

    # Training
    asr_brain.fit(
        asr_brain.hparams.epoch_counter,
        train_data,
        valid_data,
        train_loader_kwargs=hparams["train_dataloader_opts"],
        valid_loader_kwargs=hparams["valid_dataloader_opts"],
    )

    # Testing
    for name, test_set in test_datasets.items():
        # names are test_clean, test_other etc
        asr_brain.hparams.wer_file = os.path.join(
            hparams["output_folder"], f"wer_{name}.txt"
        )
        asr_brain.evaluate(
            test_set, test_loader_kwargs=hparams["test_dataloader_opts"]
        )