# Generative-Sequence-Chatbot / chatbotTrainer.py
import os
import re
import numpy as np
import tensorflow
from tensorflow.keras.callbacks import Callback, ReduceLROnPlateau
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.layers import Input, LSTM, Dense, Embedding, Dropout, Flatten
from tensorflow.keras.regularizers import l2
from tensorflow.keras.models import Model, load_model, model_from_json
from tensorflow.keras.optimizers import Adam
import matplotlib.pyplot as plt
import logging
import heapq
import pickle
import time
import json
import pdb
# Run layer computations in float16 while keeping variables in float32
tensorflow.keras.mixed_precision.set_global_policy('mixed_float16')
class BeamSearchHelper:
def __init__(self, model, tokenizer, max_seq_length, encoder_filename, decoder_filename, top_k=5,
temperature=1.0, top_p=0.9, beam_width=3, scaling_factor=10, min_word=3):
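        """Helper that runs length-normalized beam-search decoding using the
        saved encoder/decoder model files. Note: top_k and top_p are stored
        but not currently used by beam_search."""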
self.model = model
self.tokenizer = tokenizer
self.max_seq_length = max_seq_length
self.top_k = top_k
self.encoder_filename = encoder_filename
self.decoder_filename = decoder_filename
self.temperature = temperature
self.scaling_factor = scaling_factor
self.top_p = top_p
self.beam_width = beam_width
self.min_word = min_word
self.logger = self.setup_logger()
def setup_logger(self):
        logger = logging.getLogger("ChatbotBeamSearch")
        logger.setLevel(logging.DEBUG)
        # Guard against duplicate handlers when the helper is constructed repeatedly
        if not logger.handlers:
            console_handler = logging.StreamHandler()
            console_handler.setLevel(logging.INFO)
            console_formatter = logging.Formatter('%(levelname)s: %(message)s')
            console_handler.setFormatter(console_formatter)
            logger.addHandler(console_handler)
            file_handler = logging.FileHandler("chatbotBeam.log")
            file_handler.setLevel(logging.DEBUG)
            file_formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
            file_handler.setFormatter(file_formatter)
            logger.addHandler(file_handler)
        return logger
def beam_search(self, input_text):
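        """Decode input_text with beam search. The encoder/decoder models are
        reloaded from disk on every call; scores are length-normalized negative
        log-probabilities (lower is better), and a beam may only stop at <end>
        once it has at least min_word tokens."""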
# Load encoder and decoder models
encoder_model = load_model(self.encoder_filename)
decoder_model = load_model(self.decoder_filename)
# Preprocess input
input_seqs = self.tokenizer.texts_to_sequences([input_text])
input_seqs = pad_sequences(input_seqs, maxlen=self.max_seq_length, padding='post')
# Encode input sequence
encoder_states = encoder_model.predict(input_seqs)
state_h, state_c = encoder_states
# Ensure batch size of 1
state_h = state_h[0:1, :]
state_c = state_c[0:1, :]
# Initialize decoder with <start> token
start_token_index = self.tokenizer.word_index.get('<start>', 1)
target_seq = np.zeros((1, 1))
target_seq[0, 0] = start_token_index
# Initialize beam search candidates
sequences = [(target_seq, state_h, state_c, 0.0, [])] # (seq, h, c, score, decoded_words)
for _ in range(self.max_seq_length):
all_candidates = []
for seq, state_h, state_c, score, decoded_words in sequences:
# Predict the next token
output_tokens, state_h, state_c = decoder_model.predict([seq, state_h, state_c])
logits = output_tokens[0, -1, :] * self.scaling_factor
logits = logits / self.temperature
exp_logits = np.exp(logits - np.max(logits)) # Prevent overflow
probabilities = exp_logits / np.sum(exp_logits)
# Get the top beam_width candidate indices
top_indices = np.argsort(probabilities)[-self.beam_width:]
for idx in top_indices:
prob = probabilities[idx]
candidate_score = (score - np.log(prob + 1e-8)) / (len(decoded_words) + 1) # Normalize by length
# Append predicted token
new_decoded_words = decoded_words + [idx]
new_seq = np.copy(seq)
new_seq[0, 0] = idx # Set new token in sequence
                    # Enforce min_word before stopping at <end>
                    if idx == self.tokenizer.word_index.get('<end>', -1):
                        if len(new_decoded_words) < self.min_word:
                            continue  # Ignore <end> if min_word isn't reached
                        # Return the words decoded so far, excluding the <end> token itself
                        return " ".join(self.tokenizer.index_word[i]
                                        for i in decoded_words
                                        if i in self.tokenizer.index_word)
# Add to candidate list
all_candidates.append((new_seq, state_h, state_c, candidate_score, new_decoded_words))
# Select best beam_width sequences
if not all_candidates: # If no valid candidates, exit early
break
sequences = sorted(all_candidates, key=lambda x: x[3])[:self.beam_width]
# Convert token indices back to words
best_sequence = sequences[0][4] # Get best decoded words
return " ".join(self.tokenizer.index_word[idx] for idx in best_sequence if idx in self.tokenizer.index_word)
class BeamState:
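    """Candidate container for beam search; __lt__ orders instances by score so
    they can be pushed onto a heap (heapq). Not referenced elsewhere in this module."""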
def __init__(self, sequence, score, state, logger):
self.sequence = sequence
self.score = score
self.state = state
self.logger = logger
def __lt__(self, other):
return self.score < other.score
def log(self, message):
self.logger.debug(message)
class MonitorEarlyStopping(Callback):
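    """Early-stopping callback with verbose progress reports. Records every
    epoch that fails to improve in stopped_epoch_list (the trainer uses this to
    flag troublesome speakers) and can restore the best weights seen."""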
def __init__(self, monitor='val_loss', patience=3, mode='min', restore_best_weights=True, verbose=1):
super(MonitorEarlyStopping, self).__init__()
self.monitor = monitor
self.patience = patience
self.mode = mode
self.restore_best_weights = restore_best_weights
self.verbose = verbose
self.best_weights = None
self.best_epoch = None
self.wait = 0
self.best_value = float('inf') if mode == 'min' else -float('inf')
self.stopped_epoch_list = [] # List to track stopped epochs
def on_epoch_end(self, epoch, logs=None):
current_value = logs.get(self.monitor)
if current_value is None:
if self.verbose > 0:
print(f"Warning: Metric '{self.monitor}' is not available in logs.")
return
# Check for improvement based on mode
if (self.mode == 'min' and current_value < self.best_value) or (self.mode == 'max' and current_value > self.best_value):
self.best_value = current_value
self.best_weights = self.model.get_weights()
self.best_epoch = epoch
self.wait = 0
if self.verbose > 0:
print(f"Epoch {epoch + 1}: {self.monitor} improved to {self.best_value:.4f}")
else:
self.wait += 1
if self.verbose > 0:
print(f"Epoch {epoch + 1}: {self.monitor} did not improve. Patience: {self.wait}/{self.patience}")
self.stopped_epoch_list.append(epoch + 1)
# Stop training if patience is exceeded
if self.wait >= self.patience:
if self.verbose > 0:
print(f"Stopping early at epoch {epoch + 1}. Best {self.monitor}: {self.best_value:.4f} at epoch {self.best_epoch + 1}")
self.model.stop_training = True
if self.restore_best_weights:
if self.verbose > 0:
print(f"Restoring best model weights from epoch {self.best_epoch + 1}.")
self.model.set_weights(self.best_weights)
class ChatbotTrainer:
def __init__(self):
# Corpus Setup
self.corpus = None
self.all_vocab_size = 0
# Model Setup
self.model = None
self.name = "Alex"
self.model_filename = f"{self.name}_model.keras"
        self.encoder_filename = "encoder.keras"
self.decoder_filename = "decoder.keras"
self.tokenizer_save_path = "chatBotTokenizer.pkl"
self.tokenizer = None
self.reverse_tokenizer = None
self.embedding_dim = 64
self.max_seq_length = 64
self.learning_rate = 0.0013
self.optimizer = Adam(learning_rate=self.learning_rate, clipnorm=1.0)
self.batch_size = 16
self.epochs = 30
self.early_patience = self.epochs // 2
self.lstm_units = 128
self.dropout = 0.1
self.recurrent_dropout = 0.1
self.test_size = 0.2
self.max_vocabulary = 69000
        # Model components instantiated here but filled later
self.encoder_model = None
self.encoder_inputs = None
self.decoder_inputs = None
self.decoder_outputs = None
self.decoder_model = None
self.max_vocab_size = None
self.config = None
# Training Setup
self.vocabularyList = []
self.troubleList = []
self.running_trouble = []
# Prediction Setup (Everything here will take priority)
self.min_word = 10 # Only for generate_response
self.temperature = 1
self.scaling_factor = 1
self.logger = self.setup_logger() # Initialize your logger here
self.beam_width = 9
self.top_p = 0.7
self.top_k = 3
# Log Metrics...
self.logger.info(f"""Metrics:\n
Embedding/MaxSeqLength:({self.embedding_dim}, {self.max_seq_length})\n
Batch Size: {self.batch_size}\n
LSTM Units: {self.lstm_units}\n
Epochs: {self.epochs}\n
Dropout: ({self.dropout}, {self.recurrent_dropout})\n
Test Split: {self.test_size}\n\n""")
# Tokenizer setup & propagation
if os.path.exists(self.tokenizer_save_path):
with open(self.tokenizer_save_path, 'rb') as tokenizer_load_file:
self.tokenizer = pickle.load(tokenizer_load_file)
self.reverse_tokenizer = {index: word for word, index in self.tokenizer.word_index.items()}
self.all_vocab_size = self.tokenizer.num_words
            for word in self.tokenizer.word_index:
                if word not in self.vocabularyList:
                    self.vocabularyList.append(word)
self.logger.info("Tokenizer loaded successfully.")
# print(f"Number of words in loaded tokenizer: {len(self.tokenizer.word_index)}")
# print(f"Number of words in the Vocab List: {len(self.vocabularyList)}")
else:
self.logger.warning("Tokenizer not found, making now... ")
self.tokenizer = Tokenizer(num_words=None, filters='!"#$%&()*+,-/.:;=?@[\\]^_`{|}~\t\n')
            # Register '<start>' and '<end>' in the word index. Index 0 is
            # reserved for padding (the Embedding layers use mask_zero=True),
            # so manual indices start at 1; num_words tracks the next free index.
            self.tokenizer.num_words = 1
            self.vocabularyList = ['<start>', '<end>']
            for token in self.vocabularyList:
                if token not in self.tokenizer.word_index:
                    self.tokenizer.word_index[token] = self.tokenizer.num_words
                    self.tokenizer.index_word[self.tokenizer.num_words] = token
                    self.all_vocab_size += 1
                    self.tokenizer.num_words += 1
            # Keep num_words as the next free index (manual indices are 1-based)
            self.tokenizer.num_words = len(self.tokenizer.word_index) + 1
            self.tokenizer.oov_token = "<oov>"
            self.logger.info(f"New Tokenizer Indices: {self.tokenizer.word_index}")
# Debug Lines
# for token in ['<start>', '<end>', '<oov>']:
# print(f"Index of {token}: {self.tokenizer.word_index.get(token)}")
# Debug Line
# print(list(self.tokenizer.word_index.keys()))
if os.path.exists(self.model_filename) and os.path.exists(self.encoder_filename) and os.path.exists(self.decoder_filename):
            self.model, self.encoder_model, self.decoder_model = self.load_model_file()
def save_full_weights(self, encoder_path="encoder.weights.h5", decoder_path="decoder.weights.h5"):
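        """Save encoder/decoder weights to .h5 files, deleting any existing files first."""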
if self.encoder_model is not None and self.decoder_model is not None:
if os.path.exists(encoder_path):
os.remove(encoder_path)
if os.path.exists(decoder_path):
os.remove(decoder_path)
self.encoder_model.save_weights(encoder_path)
self.decoder_model.save_weights(decoder_path)
self.logger.info(f"Encoder weights saved at {encoder_path}.")
self.logger.info(f"Decoder weights saved at {decoder_path}.")
else:
self.logger.warning(
"Encoder or Decoder model does not exist. Ensure models are initialized before saving weights.")
def load_corpus(self, corpus_path):
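        """Load a ConvoKit corpus from disk (convokit is imported lazily here
        to keep module import light)."""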
import convokit
self.logger.info("Loading and preprocessing corpus...")
self.corpus = convokit.Corpus(filename=corpus_path)
self.logger.info("Corpus loaded and preprocessed successfully.")
def load_full_weights(self, encoder_path="encoder.weights.h5", decoder_path="decoder.weights.h5"):
if self.encoder_model is not None and self.decoder_model is not None:
self.encoder_model.load_weights(encoder_path)
self.decoder_model.load_weights(decoder_path)
self.logger.info(f"Encoder weights loaded from {encoder_path}.")
self.logger.info(f"Decoder weights loaded from {decoder_path}.")
else:
self.logger.warning(
"Encoder or Decoder model does not exist. Ensure models are initialized before loading weights.")
def plot_and_save_training_metrics(self, history, speaker):
# Plot training metrics such as loss and accuracy
plt.figure(figsize=(10, 6))
# Plot training loss
plt.subplot(1, 2, 1)
plt.plot(history.history['loss'], label='Training Loss')
plt.plot(history.history['val_loss'], label='Validation Loss')
plt.title('Training and Validation Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()
# Plot training accuracy
plt.subplot(1, 2, 2)
plt.plot(history.history['accuracy'], label='Training Accuracy')
plt.plot(history.history['val_accuracy'], label='Validation Accuracy')
plt.title('Training and Validation Accuracy')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.legend()
        # Saving to disk is disabled while running in a Jupyter notebook; re-enable to write a PNG:
        # plot_filename = f"{speaker}_training_metrics.png"
        # plt.tight_layout()
        # plt.savefig(plot_filename)  # Save the plot as an image
        # plt.close()  # Close the plot to free up memory
        return "not saved (savefig disabled; see plot_and_save_training_metrics)"
def setup_logger(self):
        logger = logging.getLogger("ChatbotTrainer")
        logger.setLevel(logging.DEBUG)
        # Guard against duplicate handlers if setup_logger is called more than once
        if not logger.handlers:
            # Console handler: INFO for progress reports
            console_handler = logging.StreamHandler()
            console_handler.setLevel(logging.INFO)
            console_formatter = logging.Formatter('%(levelname)s: %(message)s')
            console_handler.setFormatter(console_formatter)
            logger.addHandler(console_handler)
            # File handler: DEBUG captures progress reports as well as error notifications
            file_handler = logging.FileHandler("chatbot.log")
            file_handler.setLevel(logging.DEBUG)
            file_formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
            file_handler.setFormatter(file_formatter)
            logger.addHandler(file_handler)
        return logger
    # Reformats the embedding weights to a new max_vocabulary, e.g. when
    # max_vocabulary (used in build_model) is grown incrementally.
def redo_embeddings(self):
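        """Expand the encoder embedding matrix to self.max_vocabulary rows while
        keeping the trained rows. Keras set_weights requires matching shapes, so
        the model must already be (re)built with the enlarged input_dim."""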
# Get current embedding weights
old_embedding_weights = self.model.get_layer("embedding").get_weights()[0]
# Define new max vocabulary size
new_vocab_size = self.max_vocabulary # Set this to the updated size
embedding_dim = old_embedding_weights.shape[1]
# Expand the embedding matrix
new_embedding_weights = np.random.normal(size=(new_vocab_size, embedding_dim)) # Initialize new words randomly
new_embedding_weights[:old_embedding_weights.shape[0], :] = old_embedding_weights # Keep old weights
# Replace the embedding layer
self.model.get_layer("embedding").set_weights([new_embedding_weights])
def save_tokenizer(self, texts=None):
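        """Register any new tokens in the manual word index (capped at
        max_vocabulary) and pickle the tokenizer to disk."""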
        if self.tokenizer:
            if texts:
                for token in texts:
                    if token not in self.tokenizer.word_index and self.tokenizer.num_words < self.max_vocabulary:
                        self.tokenizer.word_index[token] = self.tokenizer.num_words
                        self.tokenizer.index_word[self.tokenizer.num_words] = token
                        self.all_vocab_size += 1
                        self.tokenizer.num_words += 1
                self.max_vocab_size = self.tokenizer.num_words
                # fit_on_texts is intentionally not called here: it rebuilds
                # word_index by frequency and would discard the manual indices above.
            with open(self.tokenizer_save_path, 'wb') as tokenizer_save_file:
                pickle.dump(self.tokenizer, tokenizer_save_file)
            # Keep num_words as the next free index (manual indices are 1-based)
            self.tokenizer.num_words = len(self.tokenizer.word_index) + 1
        else:
            self.logger.warning("No tokenizer to save.")
def save_embedding_weights(self, filepath="embedding_weights.npy"):
if self.model is not None:
            # The first Embedding layer is auto-named 'embedding' (the encoder's);
            # the decoder embedding ('embedding_1') is not handled here.
            embedding_layer = self.model.get_layer('embedding')
            # Extract the weights (stored as a single-element list)
            embedding_weights = embedding_layer.get_weights()[0]
# Save weights to a file
if os.path.exists(filepath):
os.remove(filepath)
np.save(filepath, embedding_weights)
self.logger.info(f"Embedding weights saved successfully at {filepath}.")
else:
self.logger.warning("No model exists to extract embedding weights.")
def load_embedding_weights(self, filepath="embedding_weights.npy"):
if self.model is not None:
embedding_layer = self.model.get_layer('embedding')
# Load weights from the file
embedding_weights = np.load(filepath)
# Ensure the weights shape matches the layer's expected shape
if embedding_layer.input_dim == embedding_weights.shape[0] and embedding_layer.output_dim == \
embedding_weights.shape[1]:
embedding_layer.set_weights([embedding_weights])
self.logger.info(f"Embedding weights loaded successfully from {filepath}.")
else:
self.logger.error("Mismatch in embedding weights shape. Ensure the model and weights are compatible.")
else:
self.logger.warning("No model exists to load embedding weights into.")
def clean_text(self, text):
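        """Lowercase, expand contractions, strip punctuation (keeping apostrophes
        and alphanumerics), and register any new words with the tokenizer."""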
txt = text.lower().strip()
# Contraction mapping (expanded)
contractions = {
"i'm": "i am", "he's": "he is", "she's": "she is", "that's": "that is",
"what's": "what is", "where's": "where is", "who's": "who is", "how's": "how is",
"it's": "it is", "let's": "let us", "they're": "they are", "we're": "we are",
"you're": "you are", "i've": "i have", "you've": "you have", "we've": "we have",
"they've": "they have", "i'd": "i would", "you'd": "you would", "he'd": "he would",
"she'd": "she would", "we'd": "we would", "they'd": "they would", "i'll": "i will",
"you'll": "you will", "he'll": "he will", "she'll": "she will", "we'll": "we will",
"they'll": "they will", "don't": "do not", "doesn't": "does not", "didn't": "did not",
"won't": "will not", "wouldn't": "would not", "can't": "cannot", "couldn't": "could not",
"shouldn't": "should not", "mightn't": "might not", "mustn't": "must not",
"isn't": "is not", "aren't": "are not", "wasn't": "was not", "weren't": "were not",
"haven't": "have not", "hasn't": "has not", "hadn't": "had not"
}
# Expand contractions
for contraction, expansion in contractions.items():
txt = re.sub(r"\b" + re.escape(contraction) + r"\b", expansion, txt)
# Remove unwanted characters but keep apostrophes
txt = re.sub(r"[^a-zA-Z0-9' ]", " ", txt) # Keep words, numbers, and apostrophes
txt = re.sub(r"\s+", " ", txt).strip() # Remove extra spaces
        # Track any new words in the vocabulary list and persist the tokenizer
        for word in txt.split():
            if word not in self.vocabularyList:
                self.vocabularyList.append(word)
        self.save_tokenizer(self.vocabularyList)
        return txt
# Training
def preprocess_texts(self, input_texts, target_texts):
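        """Tokenize and pad input/target texts into aligned sequence arrays.
        Texts are split on spaces first, so each cleaned word is wrapped in
        <start>/<end> and becomes an individual training sample."""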
        # Clean word-by-word; each cleaned word becomes its own training sample
        input_texts = [self.clean_text(text) for text in input_texts.split(" ")]
        target_texts = [self.clean_text(text) for text in target_texts.split(" ")]
        # Wrap each sample with the special tokens, dropping empty strings
        input_texts = [f"<start> {text} <end>" for text in input_texts if text]
        target_texts = [f"<start> {text} <end>" for text in target_texts if text]
        input_sequences = self.tokenizer.texts_to_sequences(input_texts)
        target_sequences = self.tokenizer.texts_to_sequences(target_texts)
        input_sequences = pad_sequences(input_sequences, maxlen=self.max_seq_length, padding='post')
        target_sequences = pad_sequences(target_sequences, maxlen=self.max_seq_length, padding='post')
        # Ensure target_sequences has the same number of samples as input_sequences
        if target_sequences.shape[0] != input_sequences.shape[0]:
            print(f"Padding mismatch! Input: {input_sequences.shape}, Target: {target_sequences.shape}")
            if target_sequences.shape[0] < input_sequences.shape[0]:
                target_sequences = np.resize(target_sequences, input_sequences.shape)
            else:
                target_sequences = target_sequences[:input_sequences.shape[0], :]
        return input_sequences, target_sequences
# Prediction
def preprocess_input(self, texts):
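        """Clean raw text, wrap it with <start>/<end>, and return a padded
        (1, max_seq_length) array of token ids for the encoder."""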
preprocessed_input = ["<start>"]
texts = self.clean_text(texts)
preprocessed_text = texts.lower().split(" ")
preprocessed_input.extend(preprocessed_text)
preprocessed_input.append("<end>")
# Convert words to token IDs
preprocessed_input = self.tokenizer.texts_to_sequences([preprocessed_input])
preprocessed_input = [item for sublist in preprocessed_input for item in sublist] # Flatten
preprocessed_input = np.array(preprocessed_input).reshape(1, -1) # (1, length)
preprocessed_input = pad_sequences(preprocessed_input, maxlen=self.max_seq_length, padding='post')
# ("Final Input Shape:", preprocessed_input.shape) # Debugging
return preprocessed_input
def build_model(self):
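        """Build the seq2seq network: an LSTM encoder whose final (h, c) states
        initialize an LSTM decoder, plus standalone encoder/decoder models for
        step-by-step inference. Returns (model, encoder_model, decoder_model)."""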
if not self.model:
# Encoder
self.encoder_inputs = Input(shape=(self.max_seq_length,))
encoder_embedding = Embedding(
input_dim=self.max_vocabulary,
output_dim=self.embedding_dim,
mask_zero=True,
embeddings_regularizer=l2(0.01)
)(self.encoder_inputs)
encoder_lstm = LSTM(
self.lstm_units,
return_state=True,
return_sequences=False,
dropout=self.dropout,
recurrent_dropout=self.recurrent_dropout
)
_, state_h, state_c = encoder_lstm(encoder_embedding)
encoder_states = [state_h, state_c]
self.encoder_model = Model(self.encoder_inputs, encoder_states)
# Decoder
self.decoder_inputs = Input(shape=(None,), name='decoder_input')
decoder_embedding = Embedding(
input_dim=self.max_vocabulary,
output_dim=self.embedding_dim,
mask_zero=True
)(self.decoder_inputs)
decoder_lstm = LSTM(
self.lstm_units,
return_sequences=True,
return_state=True,
dropout=self.dropout,
recurrent_dropout=self.recurrent_dropout,
kernel_regularizer=l2(0.001)
)
decoder_state_input_h = Input(shape=(self.lstm_units,))
decoder_state_input_c = Input(shape=(self.lstm_units,))
decoder_states_inputs = [decoder_state_input_h, decoder_state_input_c]
decoder_lstm_output, state_h, state_c = decoder_lstm(decoder_embedding, initial_state=decoder_states_inputs)
decoder_states = [state_h, state_c]
            # Keep the softmax in float32 for numerical stability under the mixed_float16 policy
            decoder_dense = Dense(self.max_vocabulary, activation='softmax', dtype='float32',
                                  kernel_regularizer=l2(0.001), bias_regularizer=l2(0.001))
self.decoder_outputs = decoder_dense(decoder_lstm_output)
self.decoder_model = Model([self.decoder_inputs] + decoder_states_inputs,
[self.decoder_outputs] + decoder_states)
# Combine encoder and decoder into the full model
decoder_lstm_output, _, _ = decoder_lstm(decoder_embedding, initial_state=encoder_states)
self.decoder_outputs = decoder_dense(decoder_lstm_output)
self.model = Model([self.encoder_inputs, self.decoder_inputs], self.decoder_outputs)
self.model.compile(
optimizer=self.optimizer,
loss='sparse_categorical_crossentropy',
metrics=['accuracy']
)
return self.model, self.encoder_model, self.decoder_model
def load_model_config(self, config_filename="model_config.json"):
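        """Rebuild the architecture and optimizer from a JSON config file,
        recompile, and return the model; returns None if no config exists."""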
if os.path.exists(config_filename):
with open(config_filename, "r", encoding="utf-8") as f:
data = json.load(f)
self.logger.info(f"Loading model config from {config_filename}")
# Rebuild model from config
self.model = model_from_json(data["model_config"])
# Rebuild optimizer
self.optimizer = Adam.from_config(data["optimizer"])
# Compile model with restored optimizer
self.model.compile(
optimizer=self.optimizer,
loss='sparse_categorical_crossentropy',
metrics=['accuracy']
)
self.logger.info("Model compiled successfully after loading config.")
return self.model
return None
def train_model(self, input_texts, target_texts, conversation_id, speaker):
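        """Load or build the model, preprocess one conversation into padded
        sequences (decoder targets are the decoder inputs shifted one step),
        train with early stopping and LR scheduling, then save everything."""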
        # Reset running_trouble at the start of each training run
self.running_trouble = []
        # Load the model from disk if present, else rebuild from config, else build fresh
loaded_model = self.load_model_config(config_filename="model_config.json")
if os.path.exists(self.model_filename) and os.path.exists(self.encoder_filename) and os.path.exists(
self.decoder_filename):
self.model, self.encoder_model, self.decoder_model = self.load_model_file()
self.logger.info("Loaded full model from saved files.")
        elif (not os.path.exists(self.model_filename)
              and not os.path.exists(self.encoder_filename)
              and not os.path.exists(self.decoder_filename)
              and loaded_model):
            # Architecture restored from JSON config; the encoder/decoder
            # inference models still need to be built or loaded separately.
            self.model = loaded_model
        elif not self.model and not self.encoder_model and not self.decoder_model:
            self.logger.info("Building new model...")
            self.model, self.encoder_model, self.decoder_model = self.build_model()
# Once everything loads properly we start training:
self.logger.info(f"Training Model for ConversationID: {conversation_id}")
if self.corpus is None or self.tokenizer is None:
raise ValueError("Corpus or tokenizer is not initialized.")
# Preprocess the texts into sequences
input_sequences, target_sequences = self.preprocess_texts(input_texts, target_texts)
# Debug Lines
# for token in ['<start>', '<end>', '<oov>']:
# print(f"Index of {token}: {self.tokenizer.word_index.get(token)}")
# Stats
self.logger.info(f"Num Words: {self.tokenizer.num_words}")
self.logger.info(f"Vocabulary Size: {len(self.tokenizer.word_index)}")
self.logger.info(f"Length of Vocabulary List: {len(self.vocabularyList)}")
# Prepare training data
encoder_input_data = input_sequences
decoder_input_data = target_sequences[:, :-1]
decoder_target_data = target_sequences[:, 1:]
self.logger.info(f"Encoder Input Data Shape: {encoder_input_data.shape}")
self.logger.info(f"Decoder Input Data Shape: {decoder_input_data.shape}")
self.logger.info(f"Decoder Target Data Shape: {decoder_target_data.shape}")
# Instantiate the callback
early_stopping = MonitorEarlyStopping(
monitor='val_loss',
patience=self.early_patience,
mode='min',
restore_best_weights=True,
verbose=1
)
lr_patience = self.early_patience // 3
lr_scheduler = ReduceLROnPlateau(monitor='val_loss', factor=0.3, patience=lr_patience, verbose=1)
# Train the model
history = self.model.fit(
[encoder_input_data, decoder_input_data],
np.expand_dims(decoder_target_data, -1),
batch_size=self.batch_size,
epochs=self.epochs,
validation_split=self.test_size,
callbacks=[early_stopping, lr_scheduler]
)
# Log any early stopping events
if len(early_stopping.stopped_epoch_list) > 0:
self.troubleList.append(speaker)
            # Save the stopped-epoch list to running trouble, then reset it
            self.running_trouble = list(early_stopping.stopped_epoch_list)
            early_stopping.stopped_epoch_list = []
# Evaluate the model on the training data
test_loss, test_accuracy = self.model.evaluate(
[encoder_input_data, decoder_input_data],
np.expand_dims(decoder_target_data, -1),
batch_size=self.batch_size
)
        # Plot training metrics (saving to disk is currently disabled)
        plot_status = self.plot_and_save_training_metrics(history, speaker)
        self.logger.info(f"Training metrics plot: {plot_status}")
        self.logger.info(f"Evaluation loss (on training data) for {speaker}: {test_loss}")
        self.logger.info(f"Evaluation accuracy (on training data) for {speaker}: {test_accuracy}")
self.logger.info(f"Model trained and saved successfully for speaker: {speaker}")
# Compile the model before saving
self.model.compile(
optimizer=self.optimizer,
loss='sparse_categorical_crossentropy',
metrics=['accuracy']
)
# Save the model after training
self.save_tokenizer(self.vocabularyList)
self.save_model(self.model, self.encoder_model, self.decoder_model)
def save_model(self, model, encoder_model, decoder_model):
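        """Persist the full model, the inference encoder/decoder, and all weights."""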
self.logger.info("Saving Model...")
if model:
self.encoder_model.save(self.encoder_filename)
self.logger.info("Encoder saved.")
time.sleep(1)
self.decoder_model.save(self.decoder_filename)
self.logger.info("Decoder saved.")
time.sleep(1)
self.model.save(self.model_filename)
self.logger.info("Model saved.")
time.sleep(1)
self.save_full_weights()
self.save_embedding_weights()
else:
self.logger.warning("No model to save.")
def load_model_file(self):
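        """Load the full model (recompiled with a fresh Adam optimizer) plus the
        inference encoder/decoder models and their saved weights."""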
self.logger.info("Loading Model and Tokenizer...")
# Load model without the optimizer first
model = load_model(self.model_filename, compile=False)
# Manually recompile with a fresh Adam optimizer
self.optimizer = Adam(learning_rate=self.learning_rate, clipnorm=1.0)
model.compile(optimizer=self.optimizer, loss='sparse_categorical_crossentropy', metrics=['accuracy'])
print("Model Loaded... \nNow loading encoder/decoder models... ")
encoder_model = load_model(self.encoder_filename)
decoder_model = load_model(self.decoder_filename)
print("Decoder and Encoder Loaded... ")
self.load_full_weights()
self.load_embedding_weights()
return model, encoder_model, decoder_model
    def beam_search(self, input_text):
        """Generate a reply with beam search, delegating decoding to BeamSearchHelper."""
        # BeamSearchHelper.beam_search expects raw text and does its own
        # tokenization and padding, so input_text is passed through unchanged
        # (preprocessing it here as well would tokenize the input twice).
        beam_search_helper = BeamSearchHelper(
            model=self.model,
            tokenizer=self.tokenizer,
            max_seq_length=self.max_seq_length,
            encoder_filename=self.encoder_filename,
            decoder_filename=self.decoder_filename,
            top_k=self.top_k,
            temperature=self.temperature,
            top_p=self.top_p,
            beam_width=self.beam_width,
            scaling_factor=self.scaling_factor
        )
        # The helper already returns a decoded string of words
        return beam_search_helper.beam_search(input_text)
def generate_response(self, input_seq):
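        """Sample a reply token by token using temperature/scaling-adjusted
        softmax sampling. Despite its name, input_seq is raw text; it is
        cleaned and tokenized here before encoding."""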
try:
# Clean and tokenize input text
input_seqs = self.preprocess_input(input_seq)
# Encode the input sequence using the encoder model
encoder_states = self.encoder_model.predict(input_seqs)
state_h, state_c = encoder_states
state_h = state_h[0:1, :] # Ensure batch size 1
state_c = state_c[0:1, :]
# Initialize the decoder input with the <start> token
start_token_index = self.tokenizer.word_index.get('<start>', 1)
target_seq = np.zeros((1, 1))
target_seq[0, 0] = start_token_index
# Debugging before passing to the decoder
# print(f"Initial Target Seq Shape: {target_seq.shape}, state_h Shape: {state_h.shape}, state_c Shape: {state_c.shape}")
# Decode the sequence
decoded_sentence = []
for _ in range(self.max_seq_length):
output_tokens, state_h, state_c = self.decoder_model.predict([target_seq, state_h, state_c])
                # Scale the (possibly float16) outputs in float64 to avoid
                # precision loss under the mixed-precision policy
                logits = output_tokens[0, -1, :].astype("float64") * self.scaling_factor
                logits = logits / self.temperature
                logits = np.clip(logits, -50, 50)
                # Compute a numerically stable softmax
                exp_logits = np.exp(logits - np.max(logits))  # Subtract max to prevent overflow
                probabilities = exp_logits / np.sum(exp_logits)
                probabilities /= probabilities.sum()  # Renormalize so np.random.choice accepts p
                predicted_token_index = np.random.choice(len(probabilities), p=probabilities)
                predicted_word = self.reverse_tokenizer.get(predicted_token_index, '<oov>')
                # self.logger.debug(f"Logits: {logits[:10]}, probs: {probabilities[:10]}")  # Debugging
if predicted_word == "<end>" and len(
decoded_sentence) < self.min_word:
continue
elif predicted_word == "<end>":
break
if predicted_word not in ["<oov>", "<start>", "<end>"]:
decoded_sentence.append(predicted_word)
# Update target sequence for the next iteration
target_seq[0, 0] = predicted_token_index
return " ".join(decoded_sentence).strip()
except Exception as e:
self.logger.error(f"Error in generate_response: {str(e)}")
return "Error"