import os
import re
import json
import time
import pickle
import logging

import numpy as np
import tensorflow
import matplotlib.pyplot as plt

# Import callbacks from tensorflow.keras rather than standalone keras so the
# Callback base class matches the models built below.
from tensorflow.keras.callbacks import Callback, ReduceLROnPlateau
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.layers import Input, LSTM, Dense, Embedding
from tensorflow.keras.regularizers import l2
from tensorflow.keras.models import Model, load_model, model_from_json
from tensorflow.keras.optimizers import Adam

# Run compute in float16 with float32 variables to reduce GPU memory use.
# The softmax output layer stays in float32 (see build_model).
tensorflow.keras.mixed_precision.set_global_policy('mixed_float16')


class BeamSearchHelper:
    def __init__(self, model, tokenizer, max_seq_length, encoder_filename, decoder_filename, top_k=5,
                 temperature=1.0, top_p=0.9, beam_width=3, scaling_factor=10, min_word=3):
        self.model = model
        self.tokenizer = tokenizer
        self.max_seq_length = max_seq_length
        self.top_k = top_k
        self.encoder_filename = encoder_filename
        self.decoder_filename = decoder_filename
        self.temperature = temperature
        self.scaling_factor = scaling_factor
        self.top_p = top_p
        self.beam_width = beam_width
        self.min_word = min_word
        self.logger = self.setup_logger()

    def setup_logger(self):
        logger = logging.getLogger("ChatbotBeamSearch")
        logger.setLevel(logging.DEBUG)
        # Guard against attaching duplicate handlers when the helper is
        # constructed more than once in the same process.
        if not logger.handlers:
            console_handler = logging.StreamHandler()
            console_handler.setLevel(logging.INFO)
            console_handler.setFormatter(logging.Formatter('%(levelname)s: %(message)s'))
            logger.addHandler(console_handler)

            file_handler = logging.FileHandler("chatbotBeam.log")
            file_handler.setLevel(logging.DEBUG)
            file_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
            logger.addHandler(file_handler)
        return logger

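    # Beam search keeps the `beam_width` best partial hypotheses at each step,
    # ranked by length-normalised negative log-likelihood:
    #     score(y_1..y_t) = -(1/t) * sum_i log p(y_i | y_<i, x)
    # Lower is better; the normalisation keeps longer hypotheses from being
    # penalised purely for their length.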
    def beam_search(self, input_text):
        # The encoder/decoder are reloaded from disk on every call; cache them
        # outside this method if generation speed matters.
        encoder_model = load_model(self.encoder_filename)
        decoder_model = load_model(self.decoder_filename)

        input_seqs = self.tokenizer.texts_to_sequences([input_text])
        input_seqs = pad_sequences(input_seqs, maxlen=self.max_seq_length, padding='post')

        state_h, state_c = encoder_model.predict(input_seqs)
        state_h = state_h[0:1, :]
        state_c = state_c[0:1, :]

        start_token_index = self.tokenizer.word_index.get('<start>', 1)
        target_seq = np.zeros((1, 1))
        target_seq[0, 0] = start_token_index

        # Each hypothesis: (next input token, decoder states, cumulative
        # negative log-likelihood, decoded token indices so far).
        sequences = [(target_seq, state_h, state_c, 0.0, [])]

        for _ in range(self.max_seq_length):
            all_candidates = []

            for seq, state_h, state_c, score, decoded_words in sequences:
                output_tokens, state_h, state_c = decoder_model.predict([seq, state_h, state_c])

                # Temperature-scaled, numerically stabilised softmax over the
                # final timestep.
                logits = output_tokens[0, -1, :] * self.scaling_factor
                logits = logits / self.temperature
                exp_logits = np.exp(logits - np.max(logits))
                probabilities = exp_logits / np.sum(exp_logits)

                top_indices = np.argsort(probabilities)[-self.beam_width:]

                for idx in top_indices:
                    prob = probabilities[idx]
                    # Accumulate the raw negative log-likelihood; normalise by
                    # length only when ranking, so the running score is not
                    # divided repeatedly across steps.
                    candidate_score = score - np.log(prob + 1e-8)

                    new_decoded_words = decoded_words + [idx]
                    new_seq = np.copy(seq)
                    new_seq[0, 0] = idx

                    if idx == self.tokenizer.word_index.get('<end>', -1):
                        if len(new_decoded_words) < self.min_word:
                            continue
                        return " ".join(self.tokenizer.index_word[i]
                                        for i in new_decoded_words
                                        if i in self.tokenizer.index_word)

                    all_candidates.append((new_seq, state_h, state_c, candidate_score, new_decoded_words))

            if not all_candidates:
                break

            sequences = sorted(all_candidates,
                               key=lambda x: x[3] / max(len(x[4]), 1))[:self.beam_width]

        best_sequence = sequences[0][4]
        return " ".join(self.tokenizer.index_word[idx] for idx in best_sequence if idx in self.tokenizer.index_word)


class BeamState:
    """Container for one beam hypothesis: its token sequence, score, and decoder state."""

    def __init__(self, sequence, score, state, logger):
        self.sequence = sequence
        self.score = score
        self.state = state
        self.logger = logger

    def __lt__(self, other):
        # Allow BeamState objects to be ordered (e.g., in a heap) by score.
        return self.score < other.score

    def log(self, message):
        self.logger.debug(message)


class MonitorEarlyStopping(Callback):
    def __init__(self, monitor='val_loss', patience=3, mode='min', restore_best_weights=True, verbose=1):
        super(MonitorEarlyStopping, self).__init__()
        self.monitor = monitor
        self.patience = patience
        self.mode = mode
        self.restore_best_weights = restore_best_weights
        self.verbose = verbose
        self.best_weights = None
        self.best_epoch = None
        self.wait = 0
        self.best_value = float('inf') if mode == 'min' else -float('inf')
        # Records every epoch (1-based) at which the monitored metric failed
        # to improve; train_model uses this to flag troublesome speakers.
        self.stopped_epoch_list = []

    def on_epoch_end(self, epoch, logs=None):
        logs = logs or {}
        current_value = logs.get(self.monitor)
        if current_value is None:
            if self.verbose > 0:
                print(f"Warning: Metric '{self.monitor}' is not available in logs.")
            return

        improved = (self.mode == 'min' and current_value < self.best_value) or \
                   (self.mode == 'max' and current_value > self.best_value)
        if improved:
            self.best_value = current_value
            self.best_weights = self.model.get_weights()
            self.best_epoch = epoch
            self.wait = 0
            if self.verbose > 0:
                print(f"Epoch {epoch + 1}: {self.monitor} improved to {self.best_value:.4f}")
        else:
            self.wait += 1
            if self.verbose > 0:
                print(f"Epoch {epoch + 1}: {self.monitor} did not improve. Patience: {self.wait}/{self.patience}")
            self.stopped_epoch_list.append(epoch + 1)

        if self.wait >= self.patience:
            if self.verbose > 0:
                print(f"Stopping early at epoch {epoch + 1}. Best {self.monitor}: {self.best_value:.4f} at epoch {self.best_epoch + 1}")
            self.model.stop_training = True
            # Only restore if the metric improved at least once.
            if self.restore_best_weights and self.best_weights is not None:
                if self.verbose > 0:
                    print(f"Restoring best model weights from epoch {self.best_epoch + 1}.")
                self.model.set_weights(self.best_weights)
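

# A minimal usage sketch for the callback above (assuming `model`, `x`, and
# `y` already exist); train_model below wires it up the same way:
#
#     early = MonitorEarlyStopping(monitor='val_loss', patience=5)
#     model.fit(x, y, validation_split=0.2, callbacks=[early])
#     print(early.stopped_epoch_list)  # epochs where val_loss stalled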


class ChatbotTrainer:
    def __init__(self):
        self.corpus = None
        self.all_vocab_size = 0

        self.model = None
        self.name = "Alex"
        self.model_filename = f"{self.name}_model.keras"
        self.encoder_filename = "encoder.keras"
        self.decoder_filename = "decoder.keras"
        self.tokenizer_save_path = "chatBotTokenizer.pkl"
        self.tokenizer = None
        self.reverse_tokenizer = None
        self.embedding_dim = 64
        self.max_seq_length = 64
        self.learning_rate = 0.0013
        self.optimizer = Adam(learning_rate=self.learning_rate, clipnorm=1.0)
        self.batch_size = 16
        self.epochs = 30
        self.early_patience = self.epochs // 2
        self.lstm_units = 128
        self.dropout = 0.1
        self.recurrent_dropout = 0.1
        self.test_size = 0.2
        self.max_vocabulary = 69000

        self.encoder_model = None
        self.encoder_inputs = None
        self.decoder_inputs = None
        self.decoder_outputs = None
        self.decoder_model = None
        self.max_vocab_size = None
        self.config = None

        self.vocabularyList = []
        self.troubleList = []
        self.running_trouble = []

        self.min_word = 10
        self.temperature = 1
        self.scaling_factor = 1
        self.logger = self.setup_logger()
        self.beam_width = 9
        self.top_p = 0.7
        self.top_k = 3

        self.logger.info(
            "Metrics:\n"
            f"Embedding/MaxSeqLength: ({self.embedding_dim}, {self.max_seq_length})\n"
            f"Batch Size: {self.batch_size}\n"
            f"LSTM Units: {self.lstm_units}\n"
            f"Epochs: {self.epochs}\n"
            f"Dropout: ({self.dropout}, {self.recurrent_dropout})\n"
            f"Test Split: {self.test_size}\n"
        )

        if os.path.exists(self.tokenizer_save_path):
            with open(self.tokenizer_save_path, 'rb') as tokenizer_load_file:
                self.tokenizer = pickle.load(tokenizer_load_file)
            self.reverse_tokenizer = {index: word for word, index in self.tokenizer.word_index.items()}
            self.all_vocab_size = len(self.tokenizer.word_index)
            for word in self.tokenizer.word_index:
                if word not in self.vocabularyList:
                    self.vocabularyList.append(word)
            self.logger.info("Tokenizer loaded successfully.")
        else:
            self.logger.warning("Tokenizer not found, making now... ")
            # The apostrophe is kept out of the filter list so contractions
            # survive until clean_text expands them.
            self.tokenizer = Tokenizer(num_words=None, filters='!"#$%&()*+,-/.:;=?@[\\]^_`{|}~\t\n')

            # Index 0 is reserved for padding (the embeddings use mask_zero=True),
            # so hand-assigned indices must start at 1.
            self.tokenizer.num_words = 1
            self.vocabularyList = ['<start>', '<end>']
            for token in self.vocabularyList:
                if token not in self.tokenizer.word_index:
                    self.tokenizer.word_index[token] = self.tokenizer.num_words
                    self.tokenizer.index_word[self.tokenizer.num_words] = token
                    self.all_vocab_size += 1
                    self.tokenizer.num_words += 1

            # texts_to_sequences drops indices >= num_words, so num_words must
            # be one past the highest assigned index.
            self.tokenizer.num_words = len(self.tokenizer.word_index) + 1
            self.tokenizer.oov_token = "<oov>"

            self.logger.info(f"New Tokenizer Indices: {self.tokenizer.word_index}")

        if os.path.exists(self.model_filename) and os.path.exists(self.encoder_filename) and os.path.exists(self.decoder_filename):
            self.model, self.encoder_model, self.decoder_model = self.load_model_file()

    def save_full_weights(self, encoder_path="encoder.weights.h5", decoder_path="decoder.weights.h5"):
        if self.encoder_model is not None and self.decoder_model is not None:
            if os.path.exists(encoder_path):
                os.remove(encoder_path)
            if os.path.exists(decoder_path):
                os.remove(decoder_path)
            self.encoder_model.save_weights(encoder_path)
            self.decoder_model.save_weights(decoder_path)
            self.logger.info(f"Encoder weights saved at {encoder_path}.")
            self.logger.info(f"Decoder weights saved at {decoder_path}.")
        else:
            self.logger.warning(
                "Encoder or Decoder model does not exist. Ensure models are initialized before saving weights.")

    def load_corpus(self, corpus_path):
        # Imported lazily so the rest of the module works without convokit installed.
        import convokit
        self.logger.info("Loading and preprocessing corpus...")
        self.corpus = convokit.Corpus(filename=corpus_path)
        self.logger.info("Corpus loaded and preprocessed successfully.")

    def load_full_weights(self, encoder_path="encoder.weights.h5", decoder_path="decoder.weights.h5"):
        if self.encoder_model is not None and self.decoder_model is not None:
            self.encoder_model.load_weights(encoder_path)
            self.decoder_model.load_weights(decoder_path)
            self.logger.info(f"Encoder weights loaded from {encoder_path}.")
            self.logger.info(f"Decoder weights loaded from {decoder_path}.")
        else:
            self.logger.warning(
                "Encoder or Decoder model does not exist. Ensure models are initialized before loading weights.")

    def plot_and_save_training_metrics(self, history, speaker):
        plt.figure(figsize=(10, 6))

        plt.subplot(1, 2, 1)
        plt.plot(history.history['loss'], label='Training Loss')
        plt.plot(history.history['val_loss'], label='Validation Loss')
        plt.title('Training and Validation Loss')
        plt.xlabel('Epoch')
        plt.ylabel('Loss')
        plt.legend()

        plt.subplot(1, 2, 2)
        plt.plot(history.history['accuracy'], label='Training Accuracy')
        plt.plot(history.history['val_accuracy'], label='Validation Accuracy')
        plt.title('Training and Validation Accuracy')
        plt.xlabel('Epoch')
        plt.ylabel('Accuracy')
        plt.legend()

        # Save to disk so train_model can log the filename; the naming scheme
        # here is an assumption, adjust as needed.
        plot_filename = f"training_metrics_{speaker}.png"
        plt.tight_layout()
        plt.savefig(plot_filename)
        plt.close()
        return plot_filename

    def setup_logger(self):
        logger = logging.getLogger("ChatbotTrainer")
        logger.setLevel(logging.DEBUG)

        # Guard against duplicate handlers across repeated instantiations.
        if not logger.handlers:
            console_handler = logging.StreamHandler()
            console_handler.setLevel(logging.INFO)
            console_handler.setFormatter(logging.Formatter('%(levelname)s: %(message)s'))
            logger.addHandler(console_handler)

            file_handler = logging.FileHandler("chatbot.log")
            file_handler.setLevel(logging.DEBUG)
            file_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
            logger.addHandler(file_handler)

        return logger

    def redo_embeddings(self):
        # Grow the embedding matrix to max_vocabulary rows, keeping the rows
        # already learned. Assumes the target layer is named "embedding"
        # (Keras's default name for the first Embedding layer).
        old_embedding_weights = self.model.get_layer("embedding").get_weights()[0]

        new_vocab_size = self.max_vocabulary
        embedding_dim = old_embedding_weights.shape[1]

        # New rows are drawn from a small-variance normal so they start at
        # roughly the same scale as freshly initialised embeddings.
        new_embedding_weights = np.random.normal(scale=0.05, size=(new_vocab_size, embedding_dim))
        new_embedding_weights[:old_embedding_weights.shape[0], :] = old_embedding_weights

        self.model.get_layer("embedding").set_weights([new_embedding_weights])

    def save_tokenizer(self, texts=None):
        if self.tokenizer:
            if texts:
                for token in texts:
                    if token not in self.tokenizer.word_index and self.tokenizer.num_words < self.max_vocabulary:
                        # Keep word_index and index_word in sync so decoding
                        # can recover the new words.
                        self.tokenizer.word_index[token] = self.tokenizer.num_words
                        self.tokenizer.index_word[self.tokenizer.num_words] = token
                        self.all_vocab_size += 1
                        self.tokenizer.num_words += 1

                self.max_vocab_size = self.tokenizer.num_words

            # Note: fit_on_texts is deliberately not called here; it rebuilds
            # word_index from scratch by frequency and would clobber the
            # hand-assigned indices above. num_words is one past the highest
            # index so texts_to_sequences keeps every word.
            self.tokenizer.num_words = len(self.tokenizer.word_index) + 1

            with open(self.tokenizer_save_path, 'wb') as tokenizer_save_file:
                pickle.dump(self.tokenizer, tokenizer_save_file)
        else:
            self.logger.warning("No tokenizer to save.")

    def save_embedding_weights(self, filepath="embedding_weights.npy"):
        if self.model is not None:
            embedding_layer = self.model.get_layer('embedding')
            embedding_weights = embedding_layer.get_weights()[0]

            if os.path.exists(filepath):
                os.remove(filepath)

            np.save(filepath, embedding_weights)
            self.logger.info(f"Embedding weights saved successfully at {filepath}.")
        else:
            self.logger.warning("No model exists to extract embedding weights.")

    def load_embedding_weights(self, filepath="embedding_weights.npy"):
        if self.model is not None:
            embedding_layer = self.model.get_layer('embedding')
            embedding_weights = np.load(filepath)

            if (embedding_layer.input_dim == embedding_weights.shape[0]
                    and embedding_layer.output_dim == embedding_weights.shape[1]):
                embedding_layer.set_weights([embedding_weights])
                self.logger.info(f"Embedding weights loaded successfully from {filepath}.")
            else:
                self.logger.error("Mismatch in embedding weights shape. Ensure the model and weights are compatible.")
        else:
            self.logger.warning("No model exists to load embedding weights into.")

    def clean_text(self, text):
        txt = text.lower().strip()

        contractions = {
            "i'm": "i am", "he's": "he is", "she's": "she is", "that's": "that is",
            "what's": "what is", "where's": "where is", "who's": "who is", "how's": "how is",
            "it's": "it is", "let's": "let us", "they're": "they are", "we're": "we are",
            "you're": "you are", "i've": "i have", "you've": "you have", "we've": "we have",
            "they've": "they have", "i'd": "i would", "you'd": "you would", "he'd": "he would",
            "she'd": "she would", "we'd": "we would", "they'd": "they would", "i'll": "i will",
            "you'll": "you will", "he'll": "he will", "she'll": "she will", "we'll": "we will",
            "they'll": "they will", "don't": "do not", "doesn't": "does not", "didn't": "did not",
            "won't": "will not", "wouldn't": "would not", "can't": "cannot", "couldn't": "could not",
            "shouldn't": "should not", "mightn't": "might not", "mustn't": "must not",
            "isn't": "is not", "aren't": "are not", "wasn't": "was not", "weren't": "were not",
            "haven't": "have not", "hasn't": "has not", "hadn't": "had not"
        }

        for contraction, expansion in contractions.items():
            txt = re.sub(r"\b" + re.escape(contraction) + r"\b", expansion, txt)

        # Strip everything except letters, digits, apostrophes, and spaces,
        # then collapse runs of whitespace.
        txt = re.sub(r"[^a-zA-Z0-9' ]", " ", txt)
        txt = re.sub(r"\s+", " ", txt).strip()

        # Track new words and persist them into the tokenizer. Note: this
        # re-pickles the tokenizer on every call, which is expensive.
        for word in txt.split():
            if word not in self.vocabularyList:
                self.vocabularyList.append(word)

        self.save_tokenizer(self.vocabularyList)

        return txt

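    # For example, clean_text("I'm here!") expands the contraction, drops the
    # punctuation, and returns "i am here".
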
    def preprocess_texts(self, input_texts, target_texts):
        # Clean each whitespace-separated token, drop empties, and wrap each
        # one in <start>/<end> markers.
        input_texts = [self.clean_text(text) for text in input_texts.split(" ")]
        target_texts = [self.clean_text(text) for text in target_texts.split(" ")]

        input_texts = [f"<start> {texts} <end>" for texts in input_texts if texts]
        target_texts = [f"<start> {texts} <end>" for texts in target_texts if texts]

        input_sequences = self.tokenizer.texts_to_sequences(input_texts)
        target_sequences = self.tokenizer.texts_to_sequences(target_texts)

        input_sequences = pad_sequences(input_sequences, maxlen=self.max_seq_length, padding='post')
        target_sequences = pad_sequences(target_sequences, maxlen=self.max_seq_length, padding='post')

        # The two sides can end up with different row counts when cleaning
        # drops tokens; resize the targets to match the inputs.
        if target_sequences.shape[0] != input_sequences.shape[0]:
            self.logger.warning(
                f"Padding mismatch! Input: {input_sequences.shape}, Target: {target_sequences.shape}")
            target_sequences = np.resize(target_sequences, input_sequences.shape)

        return input_sequences, target_sequences

    def preprocess_input(self, texts):
        texts = self.clean_text(texts)

        preprocessed_input = ["<start>"]
        preprocessed_input.extend(texts.split(" "))
        preprocessed_input.append("<end>")

        # texts_to_sequences accepts a pre-tokenised list of words.
        preprocessed_input = self.tokenizer.texts_to_sequences([preprocessed_input])
        preprocessed_input = [item for sublist in preprocessed_input for item in sublist]

        preprocessed_input = np.array(preprocessed_input).reshape(1, -1)
        preprocessed_input = pad_sequences(preprocessed_input, maxlen=self.max_seq_length, padding='post')

        return preprocessed_input

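    # build_model wires a standard seq2seq LSTM:
    #   encoder: tokens -> embedding -> LSTM -> (state_h, state_c)
    #   decoder: tokens -> embedding -> LSTM (seeded with encoder states) -> softmax
    # Three Keras models share these layers: the joint training model plus
    # separate encoder/decoder models for step-by-step inference.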
    def build_model(self):
        if not self.model:
            # --- Encoder ---
            self.encoder_inputs = Input(shape=(self.max_seq_length,))
            encoder_embedding = Embedding(
                input_dim=self.max_vocabulary,
                output_dim=self.embedding_dim,
                mask_zero=True,
                embeddings_regularizer=l2(0.01)
            )(self.encoder_inputs)
            encoder_lstm = LSTM(
                self.lstm_units,
                return_state=True,
                return_sequences=False,
                dropout=self.dropout,
                recurrent_dropout=self.recurrent_dropout
            )
            _, state_h, state_c = encoder_lstm(encoder_embedding)
            encoder_states = [state_h, state_c]
            self.encoder_model = Model(self.encoder_inputs, encoder_states)

            # --- Decoder (inference model with explicit state inputs) ---
            self.decoder_inputs = Input(shape=(None,), name='decoder_input')
            decoder_embedding = Embedding(
                input_dim=self.max_vocabulary,
                output_dim=self.embedding_dim,
                mask_zero=True
            )(self.decoder_inputs)
            decoder_lstm = LSTM(
                self.lstm_units,
                return_sequences=True,
                return_state=True,
                dropout=self.dropout,
                recurrent_dropout=self.recurrent_dropout,
                kernel_regularizer=l2(0.001)
            )
            decoder_state_input_h = Input(shape=(self.lstm_units,))
            decoder_state_input_c = Input(shape=(self.lstm_units,))
            decoder_states_inputs = [decoder_state_input_h, decoder_state_input_c]
            decoder_lstm_output, state_h, state_c = decoder_lstm(decoder_embedding, initial_state=decoder_states_inputs)
            decoder_states = [state_h, state_c]
            # Keep the softmax in float32: under the mixed_float16 policy a
            # float16 softmax is numerically unstable.
            decoder_dense = Dense(self.max_vocabulary, activation='softmax', dtype='float32',
                                  kernel_regularizer=l2(0.001), bias_regularizer=l2(0.001))
            self.decoder_outputs = decoder_dense(decoder_lstm_output)
            self.decoder_model = Model([self.decoder_inputs] + decoder_states_inputs,
                                       [self.decoder_outputs] + decoder_states)

            # --- Joint training model: decoder seeded with encoder states ---
            decoder_lstm_output, _, _ = decoder_lstm(decoder_embedding, initial_state=encoder_states)
            self.decoder_outputs = decoder_dense(decoder_lstm_output)
            self.model = Model([self.encoder_inputs, self.decoder_inputs], self.decoder_outputs)
            self.model.compile(
                optimizer=self.optimizer,
                loss='sparse_categorical_crossentropy',
                metrics=['accuracy']
            )
        return self.model, self.encoder_model, self.decoder_model

    def load_model_config(self, config_filename="model_config.json"):
        if os.path.exists(config_filename):
            with open(config_filename, "r", encoding="utf-8") as f:
                data = json.load(f)
            self.logger.info(f"Loading model config from {config_filename}")

            self.model = model_from_json(data["model_config"])
            self.optimizer = Adam.from_config(data["optimizer"])

            self.model.compile(
                optimizer=self.optimizer,
                loss='sparse_categorical_crossentropy',
                metrics=['accuracy']
            )
            self.logger.info("Model compiled successfully after loading config.")
            return self.model
        return None

    def train_model(self, input_texts, target_texts, conversation_id, speaker):
        self.running_trouble = []

        # Prefer fully saved models; fall back to a JSON config, then to a
        # freshly built model.
        loaded_model = self.load_model_config(config_filename="model_config.json")
        if os.path.exists(self.model_filename) and os.path.exists(self.encoder_filename) and os.path.exists(
                self.decoder_filename):
            self.model, self.encoder_model, self.decoder_model = self.load_model_file()
            self.logger.info("Loaded full model from saved files.")
        elif (not os.path.exists(self.model_filename)
              and not os.path.exists(self.encoder_filename)
              and not os.path.exists(self.decoder_filename)
              and loaded_model):
            self.model = loaded_model
        elif not self.model and not self.encoder_model and not self.decoder_model:
            self.logger.info("Building new model...")
            self.model, self.encoder_model, self.decoder_model = self.build_model()

        self.logger.info(f"Training Model for ConversationID: {conversation_id}")

        if self.corpus is None or self.tokenizer is None:
            raise ValueError("Corpus or tokenizer is not initialized.")

        input_sequences, target_sequences = self.preprocess_texts(input_texts, target_texts)

        self.logger.info(f"Num Words: {self.tokenizer.num_words}")
        self.logger.info(f"Vocabulary Size: {len(self.tokenizer.word_index)}")
        self.logger.info(f"Length of Vocabulary List: {len(self.vocabularyList)}")

        # Teacher forcing: the decoder sees the target shifted right by one.
        encoder_input_data = input_sequences
        decoder_input_data = target_sequences[:, :-1]
        decoder_target_data = target_sequences[:, 1:]

        self.logger.info(f"Encoder Input Data Shape: {encoder_input_data.shape}")
        self.logger.info(f"Decoder Input Data Shape: {decoder_input_data.shape}")
        self.logger.info(f"Decoder Target Data Shape: {decoder_target_data.shape}")

        early_stopping = MonitorEarlyStopping(
            monitor='val_loss',
            patience=self.early_patience,
            mode='min',
            restore_best_weights=True,
            verbose=1
        )

        lr_patience = self.early_patience // 3
        lr_scheduler = ReduceLROnPlateau(monitor='val_loss', factor=0.3, patience=lr_patience, verbose=1)

        history = self.model.fit(
            [encoder_input_data, decoder_input_data],
            np.expand_dims(decoder_target_data, -1),
            batch_size=self.batch_size,
            epochs=self.epochs,
            validation_split=self.test_size,
            callbacks=[early_stopping, lr_scheduler]
        )

        if len(early_stopping.stopped_epoch_list) > 0:
            self.troubleList.append(speaker)

        self.running_trouble = list(early_stopping.stopped_epoch_list)
        early_stopping.stopped_epoch_list = []

        test_loss, test_accuracy = self.model.evaluate(
            [encoder_input_data, decoder_input_data],
            np.expand_dims(decoder_target_data, -1),
            batch_size=self.batch_size
        )

        plot_filename = self.plot_and_save_training_metrics(history, speaker)
        self.logger.info(f"Training metrics plot saved as {plot_filename}")
        self.logger.info(f"Test loss for Conversation {speaker}: {test_loss}")
        self.logger.info(f"Test accuracy for Conversation {speaker}: {test_accuracy}")
        self.logger.info(f"Model trained and saved successfully for speaker: {speaker}")

        self.model.compile(
            optimizer=self.optimizer,
            loss='sparse_categorical_crossentropy',
            metrics=['accuracy']
        )

        self.save_tokenizer(self.vocabularyList)
        self.save_model(self.model, self.encoder_model, self.decoder_model)

    def save_model(self, model, encoder_model, decoder_model):
        self.logger.info("Saving Model...")
        if model:
            self.encoder_model.save(self.encoder_filename)
            self.logger.info("Encoder saved.")
            time.sleep(1)
            self.decoder_model.save(self.decoder_filename)
            self.logger.info("Decoder saved.")
            time.sleep(1)
            self.model.save(self.model_filename)
            self.logger.info("Model saved.")
            time.sleep(1)
            self.save_full_weights()
            self.save_embedding_weights()
        else:
            self.logger.warning("No model to save.")

    def load_model_file(self):
        self.logger.info("Loading Model and Tokenizer...")

        # Compile manually after loading so the optimizer settings stay in
        # sync with this class rather than whatever was serialized.
        model = load_model(self.model_filename, compile=False)
        self.optimizer = Adam(learning_rate=self.learning_rate, clipnorm=1.0)
        model.compile(optimizer=self.optimizer, loss='sparse_categorical_crossentropy', metrics=['accuracy'])

        self.logger.info("Model loaded. Now loading encoder/decoder models...")

        encoder_model = load_model(self.encoder_filename)
        decoder_model = load_model(self.decoder_filename)

        self.logger.info("Decoder and Encoder loaded.")

        # Assign the models before loading weights; the weight loaders read
        # self.encoder_model / self.decoder_model / self.model.
        self.model = model
        self.encoder_model = encoder_model
        self.decoder_model = decoder_model
        self.load_full_weights()
        self.load_embedding_weights()

        return model, encoder_model, decoder_model

    def beam_search(self, input_text):
        beam_search_helper = BeamSearchHelper(
            model=self.model,
            tokenizer=self.tokenizer,
            max_seq_length=self.max_seq_length,
            encoder_filename=self.encoder_filename,
            decoder_filename=self.decoder_filename,
            top_k=self.top_k,
            temperature=self.temperature,
            top_p=self.top_p,
            beam_width=self.beam_width,
            scaling_factor=self.scaling_factor,
            min_word=self.min_word
        )

        # The helper expects raw text (it tokenises internally) and already
        # returns a decoded string, so no further index lookup is needed.
        return beam_search_helper.beam_search(input_text)

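    # generate_response samples from a temperature-scaled softmax:
    #     p_i = exp(z_i / T) / sum_j exp(z_j / T)
    # T < 1 sharpens the distribution toward greedy decoding; T > 1 flattens
    # it and increases diversity.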
    def generate_response(self, input_seq):
        try:
            input_seqs = self.preprocess_input(input_seq)

            state_h, state_c = self.encoder_model.predict(input_seqs)
            state_h = state_h[0:1, :]
            state_c = state_c[0:1, :]

            start_token_index = self.tokenizer.word_index.get('<start>', 1)
            target_seq = np.zeros((1, 1))
            target_seq[0, 0] = start_token_index

            decoded_sentence = []

            for _ in range(self.max_seq_length):
                output_tokens, state_h, state_c = self.decoder_model.predict([target_seq, state_h, state_c])

                # Temperature-scaled, numerically stabilised softmax.
                logits = output_tokens[0, -1, :] * self.scaling_factor
                logits = logits / self.temperature
                logits = np.clip(logits, -50, 50)

                exp_logits = np.exp(logits - np.max(logits))
                probabilities = exp_logits / (np.sum(exp_logits) + 1e-8)
                # np.random.choice requires probabilities that sum to exactly 1.
                probabilities = probabilities / probabilities.sum()

                predicted_token_index = np.random.choice(len(probabilities), p=probabilities)
                predicted_word = self.reverse_tokenizer.get(predicted_token_index, '<oov>')

                self.logger.debug(f"Logits: {logits[:10]}")
                self.logger.debug(f"Softmax Probabilities: {probabilities[:10]}")

                # Resample if <end> arrives before the minimum response length.
                if predicted_word == "<end>" and len(decoded_sentence) < self.min_word:
                    continue
                elif predicted_word == "<end>":
                    break

                if predicted_word not in ["<oov>", "<start>", "<end>"]:
                    decoded_sentence.append(predicted_word)

                target_seq[0, 0] = predicted_token_index

            return " ".join(decoded_sentence).strip()

        except Exception as e:
            self.logger.error(f"Error in generate_response: {str(e)}")
            return "Error"
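

# A minimal usage sketch. The corpus path and the sample utterance pair are
# illustrative assumptions, not part of the saved training pipeline.
if __name__ == "__main__":
    trainer = ChatbotTrainer()
    trainer.load_corpus("movie-corpus")  # hypothetical convokit corpus path
    trainer.train_model("hello there", "hi how are you",
                        conversation_id="demo-0", speaker="demo")
    print(trainer.generate_response("hello there"))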