ai-pronunciation-trainer / tests /lambdas /test_lambdaSpeechToScore.py
alessandro trinca tornidor
feat: port whisper and faster-whisper support from https://github.com/Thiagohgl/ai-pronunciation-trainer
85b7206
import json
import os
import platform
import unittest
## permit to import from parent directory also in
import sys
from pathlib import Path
parent = Path(__file__).parent.parent.parent
sys.path.append(str(parent))
print(f"## sys.path:{sys.path}.")
import lambdaSpeechToScore
from constants import app_logger
from tests import EVENTS_FOLDER, set_seed
from tests.lambdas.constants_test_lambdaSpeechToScore import text_dict
def assert_raises_get_speech_to_score_dict(cls, real_text, file_bytes_or_audiotmpfile, language, exc, error_message):
with cls.assertRaises(exc):
try:
lambdaSpeechToScore.get_speech_to_score_dict(
real_text,
file_bytes_or_audiotmpfile,
language,
)
except exc as e:
cls.assertEqual(str(e), error_message)
raise e
def check_value_by_field(value, match):
import re
assert len(value.strip()) > 0
for word in value.lstrip().rstrip().split(" "):
word_check = re.findall(match, word.strip())
assert len(word_check) == 1
assert word_check[0] == word.strip()
print("ok")
def check_output_by_field(output, key, match, expected_output):
check_value_by_field(output[key], match)
output[key] = expected_output[key]
return output
def check_output_with_splitted_audio_files(cls, output, expected_output, check_audio_files=False):
from pathlib import Path
cls.maxDiff = None
try:
assert_without_fields(cls, output, expected_output)
if check_audio_files:
audio_files = output["audio_files"]
audio_durations = output["audio_durations"]
for audio_duration in audio_durations:
assert isinstance(audio_duration, float)
assert audio_duration > 0
for audio_file in audio_files:
audio_file = EVENTS_FOLDER / audio_file
path_audio_file = Path(audio_file)
app_logger.info(f"path_audio_file:{path_audio_file}.")
assert path_audio_file.is_file()
path_audio_file.unlink()
except Exception as e:
app_logger.error(f"e:{e}.")
raise e
def check_start_end_time(output):
# split the string list into a list of floats then assert the values are greater than 0
start_time_list = [float(x) for x in output["start_time"].split()]
end_time_list = [float(x) for x in output["end_time"].split()]
assert " ".join([str(x) for x in start_time_list]) == output["start_time"]
assert " ".join([str(x) for x in end_time_list]) == output["end_time"]
## for some reason whisper even when seed is set keeps changing the start_time and end_time lists
# todo: check why the start_time and end_time are not the same as the expected ones
# count = 0
# for start_time, end_time in zip(start_time_list, end_time_list):
# if count > 0:
# assert start_time >= start_time_list[count - 1], f"start_time:{start_time} (idx:{count}) should be greater than previous element:{start_time_list[count - 1]}, start_time_list:{start_time_list}."
# assert end_time >= end_time_list[count - 1], f"end_time:{end_time} (idx:{count}) should be greater than previous element:{end_time_list[count - 1]}, end_time_list:{end_time_list}."
# assert start_time > 0, f"start_time:{start_time} (idx:{count}) should be greater or EQUAL than 0, start_time_list:{start_time_list}."
# if count == 0:
# assert start_time >= 0, f"start_time:{start_time} (idx:{count}) should be greater than 0, start_time_list:{start_time_list}."
# assert end_time > 0, f"end_time:{end_time} (idx:{count}) should be greater than 0, end_time_list:{end_time_list}."
# assert start_time < end_time, f"start_time:{start_time} should be less than end_time:{end_time}."
# count += 1
def assert_without_fields(cls, output, expected_output, fields_to_avoid="start_time,end_time,audio_files,audio_durations"):
try:
check_start_end_time(output)
fields_to_avoid = fields_to_avoid.strip()
output = {k:v for k, v in output.items() if k not in fields_to_avoid}
tmp_expected = {k:v for k, v in expected_output.items() if k not in fields_to_avoid}
app_logger.info(f"check_start_end_time>output:{output}.")
app_logger.info(f"check_start_end_time>tmp_expected:{tmp_expected}.")
cls.assertDictEqual(output, tmp_expected)
except Exception as ex:
app_logger.error(f"ex:{ex}.")
app_logger.error(f"output:{output}.")
app_logger.error(f"expected_output (with all the fields):{expected_output}.")
raise ex
def assert_get_speech_to_score_ok(cls, language):
cls.maxDiff = None
set_seed()
path = EVENTS_FOLDER / f"test_{language}_easy.wav"
output = lambdaSpeechToScore.get_speech_to_score_dict(
real_text=text_dict[language],
file_bytes_or_audiotmpfile=str(path),
language=language,
)
random_file_name = output["random_file_name"]
assert isinstance(random_file_name, str)
Path(random_file_name).is_file()
tmp_output = {k:v for k, v in output.items() if k != "random_file_name"}
json_path = EVENTS_FOLDER / f"expected_get_speech_to_score_ok_{language}.json"
# with open(json_path, "w") as dst:
# json.dump(tmp_output, dst)
with open(json_path, "r") as src:
expected = json.load(src)
assert_without_fields(cls, tmp_output, expected)
# def assert_get_speech_to_score_ok_remove_input_file(cls, language, expected):
# import shutil
# cls.maxDiff = None
#
# set_seed()
# path = EVENTS_FOLDER / f"test_{language}_easy.wav"
# path2 = EVENTS_FOLDER / f"test2_{language}_easy.wav"
# shutil.copy(path, path2)
# assert path2.exists() and path2.is_file()
# output = lambdaSpeechToScore.get_speech_to_score_dict(
# real_text=text_dict[language],
# file_bytes_or_audiotmpfile=str(path2),
# language=language,
# )
# assert not path2.exists()
# assert_without_fields(cls, output, expected)
def assert_get_speech_to_score_tuple_ok(cls, language, expected_num_words, remove_random_file: bool):
import glob
import shutil
set_seed()
path = EVENTS_FOLDER / f"test_{language}_easy.wav"
path2 = EVENTS_FOLDER / f"test2_{language}_easy.wav"
if remove_random_file:
shutil.copy(path, path2)
path = path2
(
real_transcripts,
is_letter_correct_all_words,
pronunciation_accuracy,
ipa_transcript,
real_transcripts_ipa,
num_words,
first_audio_file,
dumped,
random_file_name
) = lambdaSpeechToScore.get_speech_to_score_tuple(
real_text=text_dict[language],
file_bytes_or_audiotmpfile=str(path),
language=language,
remove_random_file=remove_random_file,
)
json_path = EVENTS_FOLDER / f"expected_get_speech_to_score_tuple_ok_{language}.json"
app_logger.info(f"json_path:{json_path} .")
# with open(json_path, "w") as dst:
# json_loaded = {k:v for k, v in json.loads(dumped).items() if k != "random_file_name"}
# expected = {
# "real_transcripts": real_transcripts,
# "is_letter_correct_all_words": is_letter_correct_all_words,
# "pronunciation_accuracy": pronunciation_accuracy,
# "ipa_transcript": ipa_transcript,
# "real_transcripts_ipa": real_transcripts_ipa,
# "num_words": num_words,
# "dumped": json_loaded
# }
# json.dump(expected, dst)
assert real_transcripts == text_dict[language]
with open(json_path, "r") as src:
expected_output = json.load(src)
assert is_letter_correct_all_words.strip() == expected_output["is_letter_correct_all_words"].strip()
assert pronunciation_accuracy == expected_output["pronunciation_accuracy"]
assert ipa_transcript.strip() == expected_output["ipa_transcript"].strip()
assert real_transcripts_ipa.strip() == expected_output["real_transcripts_ipa"]
assert num_words == expected_num_words
first_audio_file_path = Path(first_audio_file)
assert first_audio_file_path.exists() and first_audio_file_path.is_file()
if not remove_random_file:
assert Path(random_file_name).is_file()
json_loaded = json.loads(dumped)
check_output_with_splitted_audio_files(cls, json_loaded, expected_output["dumped"], check_audio_files=True)
# workaround to remove the tmp audio files created during these assertion tests
files_to_remove_list = str(EVENTS_FOLDER / "test2*")
if remove_random_file:
for file_to_del in glob.glob(files_to_remove_list):
assert Path(file_to_del).is_file()
Path(file_to_del).unlink()
def assert_get_selected_word_valid_index_ok(language):
import numpy as np
set_seed()
path = EVENTS_FOLDER / f"test_{language}_easy.wav"
tmp_audio_loaded, tmp_audio_samplerate = lambdaSpeechToScore.soundfile_load(path)
tmp_audio_loaded = np.asarray(tmp_audio_loaded)
duration_tot = tmp_audio_loaded.size / tmp_audio_samplerate
input_text = text_dict[language]
_, _, _, _, _, _, _, output_json, _ = lambdaSpeechToScore.get_speech_to_score_tuple(
input_text, str(path), language, False
)
idx_recorded_word = 2
output_loaded = json.loads(output_json)
audio_file, word, duration = lambdaSpeechToScore.get_selected_word(
idx_recorded_word, output_json
)
try:
audio_file_path = Path(audio_file)
assert audio_file_path.exists() and audio_file_path.is_file()
assert isinstance(duration, float)
assert 0 < duration < duration_tot
# assert duration == expected_end_time
words_list = text_dict[language].split()
assert word == words_list[idx_recorded_word]
for file_to_del in output_loaded["audio_files"]:
Path(file_to_del).unlink()
except Exception as ex:
app_logger.error(f"ex:{ex}.")
raise ex
def assert_get_selected_word_invalid_index(cls, language):
import glob
set_seed()
path = EVENTS_FOLDER / f"test_{language}_easy.wav"
_, _, _, _, _, _, _, output_json, _ = lambdaSpeechToScore.get_speech_to_score_tuple(
text_dict[language], str(path), language, False
)
with cls.assertRaises(IndexError):
try:
lambdaSpeechToScore.get_selected_word(120, output_json)
except IndexError as ie:
msg = str(ie)
assert msg == "list index out of range"
raise ie
for file_to_del in glob.glob(f"{EVENTS_FOLDER / path.stem}_*"):
assert Path(file_to_del).is_file()
Path(file_to_del).unlink()
def helper_get_accuracy_from_recorded_audio(cls, source, use_dtw: bool = None):
with open(EVENTS_FOLDER / "GetAccuracyFromRecordedAudio.json", "r") as src:
inputs_outputs = json.load(src)
inputs = inputs_outputs["inputs"]
for language, event_content in inputs.items():
set_seed()
event_content_loaded = json.loads(event_content["body"])
if use_dtw is not None:
event_content_loaded["useDTW"] = use_dtw
else:
del event_content_loaded["useDTW"]
event_content["body"] = json.dumps(event_content_loaded)
output = lambdaSpeechToScore.lambda_handler(event_content, {})
output = {k:v for k, v in json.loads(output).items() if k != "random_file_name"}
json_path = EVENTS_FOLDER / f"expected_get_accuracy_from_recorded_audio_{language}_{source}_dtw{use_dtw}.json"
app_logger.info(f"json_path:{json_path} .")
# with open(json_path, "w") as dst:
# json.dump(output, dst)
with open(json_path, "r") as src:
current_expected_output_loaded = json.load(src)
try:
cls.assertDictEqual(output, current_expected_output_loaded)
except Exception as ex:
app_logger.error(f"# output, source:{source}, use_dtw:{use_dtw}, language:{language}.")
app_logger.error(output)
app_logger.error(f"ex.args:{ex.args} .")
app_logger.error(f"ex:{ex} .")
raise ex
class TestGetAccuracyFromRecordedAudio(unittest.TestCase):
def setUp(self):
if platform.system() == "Windows" or platform.system() == "Win32":
os.environ["PYTHONUTF8"] = "1"
os.environ["IS_TESTING"] = "TRUE"
def tearDown(self):
if platform.system() == "Windows" or platform.system() == "Win32" and "PYTHONUTF8" in os.environ:
del os.environ["PYTHONUTF8"]
del os.environ["IS_TESTING"]
def test_GetAccuracyFromRecordedAudio_dtw_none(self):
self.maxDiff = None
try:
helper_get_accuracy_from_recorded_audio(self, "cmd", None)
except AssertionError:
helper_get_accuracy_from_recorded_audio(self, "gui", None)
def test_GetAccuracyFromRecordedAudio_dtw_false(self):
self.maxDiff = None
try:
helper_get_accuracy_from_recorded_audio(self, "cmd", False)
except AssertionError:
helper_get_accuracy_from_recorded_audio(self, "gui", False)
def test_GetAccuracyFromRecordedAudio_dtw_true(self):
self.maxDiff = None
try:
helper_get_accuracy_from_recorded_audio(self, "cmd", True)
except AssertionError:
helper_get_accuracy_from_recorded_audio(self, "gui", True)
def test_lambda_handler_empty_text(self):
from flask import Response
with open(EVENTS_FOLDER / "GetAccuracyFromRecordedAudio.json", "r") as src:
inputs_outputs = json.load(src)
event = inputs_outputs["inputs"]["en"]
event_body = event["body"]
event_body_loaded = json.loads(event_body)
event_body_loaded["title"] = ""
event_body = json.dumps(event_body_loaded)
event["body"] = event_body
output = lambdaSpeechToScore.lambda_handler(event, {})
self.assertIsInstance(output, Response)
self.assertDictEqual(output.json, {})
def test_get_speech_to_score_en_ok(self):
assert_get_speech_to_score_ok(self, "en")
def test_get_speech_to_score_de_ok1(self):
assert_get_speech_to_score_ok(self, "de")
# def test_get_speech_to_score_en_ok_remove_input_file(self):
# assert_get_speech_to_score_ok_remove_input_file(
# self, "en", expected_get_speech_to_score["en"]
# )
#
# def test_get_speech_to_score_de_ok_remove_input_file(self):
# assert_get_speech_to_score_ok_remove_input_file(
# self, "de", expected_get_speech_to_score["de"]
# )
def test_get_speech_to_score_tuple_de_ok_not_remove_random_file(self):
assert_get_speech_to_score_tuple_ok(
self, "de", expected_num_words=5, remove_random_file=False
)
def test_get_speech_to_score_tuple_en_ok_not_remove_random_file(self):
assert_get_speech_to_score_tuple_ok(
self, "en", expected_num_words=5, remove_random_file=False
)
def test_get_speech_to_score_tuple_de_ok_remove_random_file(self):
assert_get_speech_to_score_tuple_ok(
self, "de", expected_num_words=5, remove_random_file=True
)
def test_get_speech_to_score_tuple_en_ok_remove_random_file(self):
assert_get_speech_to_score_tuple_ok(
self, "en", expected_num_words=5, remove_random_file=True
)
def test_get_speech_to_score_dict__de_empty_input_text(self):
language = "de"
path = EVENTS_FOLDER / f"test_{language}_easy.wav"
assert_raises_get_speech_to_score_dict(
self,
"",
str(path),
language,
ValueError,
"cannot read an empty/None text: ''...",
)
def test_get_speech_to_score_dict__en_empty_input_text(self):
language = "en"
path = EVENTS_FOLDER / f"test_{language}_easy.wav"
assert_raises_get_speech_to_score_dict(
self,
"",
str(path),
language,
ValueError,
"cannot read an empty/None text: ''...",
)
def test_get_speech_to_score_dict__de_empty_input_file(self):
language = "de"
assert_raises_get_speech_to_score_dict(
self,
"text fake",
"",
language,
OSError,
"cannot read an empty/None file: ''...",
)
def test_get_speech_to_score_dict__en_empty_input_file(self):
language = "en"
assert_raises_get_speech_to_score_dict(
self,
"text fake",
"",
language,
OSError,
"cannot read an empty/None file: ''...",
)
def test_get_speech_to_score_dict__empty_language1(self):
assert_raises_get_speech_to_score_dict(
self,
"text fake",
"fake_file",
"",
NotImplementedError,
"Not tested/supported with '' language...",
)
def test_get_speech_to_score_dict__empty_language2(self):
language = "en"
path_file = str(EVENTS_FOLDER / "empty_file.wav")
assert_raises_get_speech_to_score_dict(
self,
"text fake",
path_file,
language,
OSError,
f"cannot read an empty file: '{path_file}'...",
)
def test_get_selected_word_valid_index_de_ok(self):
assert_get_selected_word_valid_index_ok("de")
def test_get_selected_word_valid_index_en_ok(self):
assert_get_selected_word_valid_index_ok("en")
def test_get_selected_word_invalid_index_de(self):
assert_get_selected_word_invalid_index(self, "de")
def test_get_selected_word_invalid_index_en(self):
assert_get_selected_word_invalid_index(self, "en")
def test_get_selected_word_empty_transcripts(self):
raw_json_output = json.dumps(
{"audio_files": [], "real_transcripts": "", "audio_durations": []}
)
idx_recorded_word = 0
with self.assertRaises(IndexError):
lambdaSpeechToScore.get_selected_word(idx_recorded_word, raw_json_output)
def test_get_splitted_audio_file_valid_input(self):
language = "en"
path = str(EVENTS_FOLDER / f"test_{language}_hard.wav")
start_time = [0.0, 1.0, 2.0]
end_time = [1.0, 2.0, 2.5]
audio_files, audio_durations = lambdaSpeechToScore.get_splitted_audio_file(
audiotmpfile=path, start_time=start_time, end_time=end_time
)
assert len(audio_files) == len(start_time)
assert len(audio_durations) == len(start_time)
for audio_file, duration in zip(audio_files, audio_durations):
audio_file_path = Path(audio_file)
assert audio_file_path.exists() and audio_file_path.is_file()
assert duration > 0
audio_file_path.unlink()
def test_get_splitted_audio_file_invalid_input(self):
from soundfile import LibsndfileError
start_time = [0.0, 1.0, 2.0]
end_time = [1.0, 2.0, 2.5]
with self.assertRaises(LibsndfileError):
try:
lambdaSpeechToScore.get_splitted_audio_file(
audiotmpfile="", start_time=start_time, end_time=end_time
)
except LibsndfileError as lsfe:
msg = str(lsfe)
assert msg == "Error opening '': System error."
raise lsfe
def test_get_splitted_audio_file_mismatched_times(self):
from soundfile import LibsndfileError
language = "en"
path = EVENTS_FOLDER / f"test_{language}_easy.wav"
start_time = [4.0, 5.0, 7.0]
end_time = [3.0, 4.0, 5.5]
with self.assertRaises(LibsndfileError):
try:
lambdaSpeechToScore.get_splitted_audio_file(
audiotmpfile=str(path), start_time=start_time, end_time=end_time
)
except LibsndfileError as lsfe:
msg = str(lsfe)
assert msg == "Internal psf_fseek() failed."
raise lsfe
if __name__ == "__main__":
unittest.main()