Spaces:
Running
Running
alessandro trinca tornidor
feat: port whisper and faster-whisper support from https://github.com/Thiagohgl/ai-pronunciation-trainer
85b7206
import json | |
import os | |
import platform | |
import unittest | |
## add the directory three levels above this file to sys.path so that project modules can be imported
import sys | |
from pathlib import Path | |
parent = Path(__file__).parent.parent.parent | |
sys.path.append(str(parent)) | |
print(f"## sys.path:{sys.path}.") | |
import lambdaSpeechToScore | |
from constants import app_logger | |
from tests import EVENTS_FOLDER, set_seed | |
from tests.lambdas.constants_test_lambdaSpeechToScore import text_dict | |
def assert_raises_get_speech_to_score_dict(cls, real_text, file_bytes_or_audiotmpfile, language, exc, error_message):
    """Assert that ``get_speech_to_score_dict`` raises ``exc`` with exactly ``error_message``.

    Args:
        cls: the running ``unittest.TestCase`` instance (used for its assertion methods).
        real_text: text forwarded as first argument to ``get_speech_to_score_dict``.
        file_bytes_or_audiotmpfile: audio input forwarded as second argument.
        language: language code forwarded as third argument.
        exc: exception class that must be raised.
        error_message: expected value of ``str()`` applied to the raised exception.
    """
    with cls.assertRaises(exc) as raised:
        lambdaSpeechToScore.get_speech_to_score_dict(
            real_text,
            file_bytes_or_audiotmpfile,
            language,
        )
    # the message is verified once the expected exception type has been captured
    cls.assertEqual(str(raised.exception), error_message)
def check_value_by_field(value, match):
    """Assert that every space-separated word of ``value`` is entirely matched by ``match``.

    Args:
        value: non-blank string; leading/trailing whitespace is ignored and the
            remainder is split on single spaces.
        match: regular expression (string or compiled pattern). For each word,
            ``findall`` must return exactly one result and that result must be
            equal to the whole word.

    Raises:
        AssertionError: if ``value`` is blank or a word is not fully matched;
            the message names the offending word and the pattern.
    """
    import re

    stripped_value = value.strip()
    assert len(stripped_value) > 0, f"expected a non-blank value, got {value!r}."
    pattern = re.compile(match)
    for raw_word in stripped_value.split(" "):
        word = raw_word.strip()
        word_check = pattern.findall(word)
        assert len(word_check) == 1, (
            f"word {word!r} should produce exactly one match for {pattern.pattern!r}, got {word_check!r}."
        )
        assert word_check[0] == word, (
            f"word {word!r} is only partially matched by {pattern.pattern!r}: {word_check[0]!r}."
        )
    print("ok")
def check_output_by_field(output, key, match, expected_output):
    """Validate ``output[key]`` against ``match``, then overwrite it with the expected value.

    The value is first checked with ``check_value_by_field``; afterwards
    ``output[key]`` is replaced in place by ``expected_output[key]``.

    Returns:
        The same ``output`` mapping, mutated in place.
    """
    current_value = output[key]
    check_value_by_field(current_value, match)
    replacement = expected_output[key]
    output[key] = replacement
    return output
def check_output_with_splitted_audio_files(cls, output, expected_output, check_audio_files=False):
    """Compare ``output`` with ``expected_output`` and optionally verify the splitted audio files.

    The dict comparison is delegated to ``assert_without_fields``. When
    ``check_audio_files`` is true, every entry of ``output["audio_durations"]``
    must be a float greater than 0 and every entry of ``output["audio_files"]``
    must be an existing file under ``EVENTS_FOLDER``; each verified file is
    deleted right after its check.

    Any exception (assertion failures included) is logged and re-raised.
    """
    from pathlib import Path

    cls.maxDiff = None
    try:
        assert_without_fields(cls, output, expected_output)
        if not check_audio_files:
            return
        splitted_files = output["audio_files"]
        durations = output["audio_durations"]
        for duration in durations:
            assert isinstance(duration, float)
            assert duration > 0
        for file_name in splitted_files:
            path_audio_file = Path(EVENTS_FOLDER / file_name)
            app_logger.info(f"path_audio_file:{path_audio_file}.")
            assert path_audio_file.is_file()
            # remove the verified file so the test leaves nothing behind
            path_audio_file.unlink()
    except Exception as e:
        app_logger.error(f"e:{e}.")
        raise e
def check_start_end_time(output):
    """Assert that ``start_time`` and ``end_time`` are space-separated float strings.

    Each field is split on whitespace, every token is converted to ``float``
    and the floats are joined back with single spaces: the result must be
    identical to the original string.

    Args:
        output: mapping with the string fields ``"start_time"`` and ``"end_time"``.

    Raises:
        ValueError: if a token cannot be converted to ``float``.
        AssertionError: if a field does not survive the round trip; the
            message names the field.
    """
    # parse both fields before asserting, so a malformed token is always reported first
    start_time_list = [float(x) for x in output["start_time"].split()]
    end_time_list = [float(x) for x in output["end_time"].split()]
    for field, parsed_values in (("start_time", start_time_list), ("end_time", end_time_list)):
        normalized = " ".join(map(str, parsed_values))
        assert normalized == output[field], (
            f"'{field}' is not a normalized list of floats: expected {normalized!r}, got {output[field]!r}."
        )
    ## for some reason whisper even when seed is set keeps changing the start_time and end_time lists
    # todo: check why the start_time and end_time are not the same as the expected ones.
    # The per-element checks are disabled until then; they asserted that start and end
    # times never decrease from one element to the next, that they are positive
    # (start >= 0 for the first element) and that every start_time is lower than
    # its end_time.
def assert_without_fields(cls, output, expected_output, fields_to_avoid="start_time,end_time,audio_files,audio_durations"):
    """Assert ``output`` equals ``expected_output`` once the ignored fields are removed.

    ``check_start_end_time`` is run on the full ``output`` first, then both
    dicts are filtered and compared with ``cls.assertDictEqual``.

    Args:
        cls: the running ``unittest.TestCase`` instance.
        output: dict produced by the code under test.
        expected_output: dict with the expected values.
        fields_to_avoid: comma-separated field names (or an iterable of names)
            excluded from the comparison.

    Raises:
        Exception: any failure is logged together with both dicts and re-raised.
    """
    try:
        check_start_end_time(output)
        # Build a set of exact field names: testing ``key in "a,b,c"`` would be a
        # substring check and would also drop keys such as "end" or "time".
        if isinstance(fields_to_avoid, str):
            fields_to_avoid = fields_to_avoid.split(",")
        excluded_fields = {field.strip() for field in fields_to_avoid if field.strip()}
        output = {k: v for k, v in output.items() if k not in excluded_fields}
        tmp_expected = {k: v for k, v in expected_output.items() if k not in excluded_fields}
        app_logger.info(f"check_start_end_time>output:{output}.")
        app_logger.info(f"check_start_end_time>tmp_expected:{tmp_expected}.")
        cls.assertDictEqual(output, tmp_expected)
    except Exception as ex:
        app_logger.error(f"ex:{ex}.")
        app_logger.error(f"output:{output}.")
        app_logger.error(f"expected_output (with all the fields):{expected_output}.")
        raise ex
def assert_get_speech_to_score_ok(cls, language):
    """Run ``get_speech_to_score_dict`` on the easy sample and compare with the JSON fixture.

    The input is ``EVENTS_FOLDER / "test_{language}_easy.wav"`` with the text
    ``text_dict[language]``; the expected dict is loaded from
    ``EVENTS_FOLDER / "expected_get_speech_to_score_ok_{language}.json"``.
    The ``random_file_name`` entry is checked separately and excluded from the
    dict comparison, which is delegated to ``assert_without_fields``.

    Args:
        cls: the running ``unittest.TestCase`` instance.
        language: language code used to select sample, text and fixture.
    """
    cls.maxDiff = None
    set_seed()
    path = EVENTS_FOLDER / f"test_{language}_easy.wav"
    output = lambdaSpeechToScore.get_speech_to_score_dict(
        real_text=text_dict[language],
        file_bytes_or_audiotmpfile=str(path),
        language=language,
    )
    random_file_name = output["random_file_name"]
    assert isinstance(random_file_name, str)
    # the result of is_file() used to be discarded, so this check never ran
    # NOTE(review): assumes get_speech_to_score_dict leaves the file on disk - confirm
    assert Path(random_file_name).is_file(), f"random_file_name is not a file: {random_file_name!r}."
    tmp_output = {k: v for k, v in output.items() if k != "random_file_name"}
    json_path = EVENTS_FOLDER / f"expected_get_speech_to_score_ok_{language}.json"
    # to regenerate the fixture:
    # with open(json_path, "w") as dst:
    #     json.dump(tmp_output, dst)
    with open(json_path, "r", encoding="utf-8") as src:
        expected = json.load(src)
    assert_without_fields(cls, tmp_output, expected)
# def assert_get_speech_to_score_ok_remove_input_file(cls, language, expected): | |
# import shutil | |
# cls.maxDiff = None | |
# | |
# set_seed() | |
# path = EVENTS_FOLDER / f"test_{language}_easy.wav" | |
# path2 = EVENTS_FOLDER / f"test2_{language}_easy.wav" | |
# shutil.copy(path, path2) | |
# assert path2.exists() and path2.is_file() | |
# output = lambdaSpeechToScore.get_speech_to_score_dict( | |
# real_text=text_dict[language], | |
# file_bytes_or_audiotmpfile=str(path2), | |
# language=language, | |
# ) | |
# assert not path2.exists() | |
# assert_without_fields(cls, output, expected) | |
def assert_get_speech_to_score_tuple_ok(cls, language, expected_num_words, remove_random_file: bool):
    """Run ``get_speech_to_score_tuple`` on the easy sample and check every returned element.

    The expected values are loaded from
    ``EVENTS_FOLDER / "expected_get_speech_to_score_tuple_ok_{language}.json"``.
    The JSON string returned as ``dumped`` is compared through
    ``check_output_with_splitted_audio_files`` (audio files included).

    Args:
        cls: the running ``unittest.TestCase`` instance.
        language: language code used to select sample, text and fixture.
        expected_num_words: expected value of the returned ``num_words``.
        remove_random_file: forwarded to ``get_speech_to_score_tuple``; when
            true the test works on a ``test2_*`` copy of the sample and every
            ``test2*`` file in ``EVENTS_FOLDER`` is deleted afterwards.
    """
    import glob
    import shutil

    set_seed()
    path = EVENTS_FOLDER / f"test_{language}_easy.wav"
    path2 = EVENTS_FOLDER / f"test2_{language}_easy.wav"
    if remove_random_file:
        # presumably the callee deletes its input file in this mode, so the
        # original sample is protected by working on a copy - confirm
        shutil.copy(path, path2)
        path = path2
    try:
        (
            real_transcripts,
            is_letter_correct_all_words,
            pronunciation_accuracy,
            ipa_transcript,
            real_transcripts_ipa,
            num_words,
            first_audio_file,
            dumped,
            random_file_name,
        ) = lambdaSpeechToScore.get_speech_to_score_tuple(
            real_text=text_dict[language],
            file_bytes_or_audiotmpfile=str(path),
            language=language,
            remove_random_file=remove_random_file,
        )
        json_path = EVENTS_FOLDER / f"expected_get_speech_to_score_tuple_ok_{language}.json"
        app_logger.info(f"json_path:{json_path} .")
        # to regenerate the fixture:
        # with open(json_path, "w") as dst:
        #     json_loaded = {k:v for k, v in json.loads(dumped).items() if k != "random_file_name"}
        #     expected = {
        #         "real_transcripts": real_transcripts,
        #         "is_letter_correct_all_words": is_letter_correct_all_words,
        #         "pronunciation_accuracy": pronunciation_accuracy,
        #         "ipa_transcript": ipa_transcript,
        #         "real_transcripts_ipa": real_transcripts_ipa,
        #         "num_words": num_words,
        #         "dumped": json_loaded
        #     }
        #     json.dump(expected, dst)
        assert real_transcripts == text_dict[language]
        with open(json_path, "r", encoding="utf-8") as src:
            expected_output = json.load(src)
        assert is_letter_correct_all_words.strip() == expected_output["is_letter_correct_all_words"].strip()
        assert pronunciation_accuracy == expected_output["pronunciation_accuracy"]
        assert ipa_transcript.strip() == expected_output["ipa_transcript"].strip()
        # both sides are stripped, consistently with the other string comparisons
        assert real_transcripts_ipa.strip() == expected_output["real_transcripts_ipa"].strip()
        assert num_words == expected_num_words
        first_audio_file_path = Path(first_audio_file)
        assert first_audio_file_path.exists() and first_audio_file_path.is_file()
        if not remove_random_file:
            assert Path(random_file_name).is_file()
        json_loaded = json.loads(dumped)
        check_output_with_splitted_audio_files(cls, json_loaded, expected_output["dumped"], check_audio_files=True)
    finally:
        # workaround to remove the tmp audio files created during these assertion tests;
        # executed in ``finally`` so a failed assertion does not leave test2* copies behind
        if remove_random_file:
            files_to_remove_list = str(EVENTS_FOLDER / "test2*")
            for file_to_del in glob.glob(files_to_remove_list):
                assert Path(file_to_del).is_file()
                Path(file_to_del).unlink()
def assert_get_selected_word_valid_index_ok(language):
    """Check ``get_selected_word`` with a valid index on the easy sample.

    The sample ``EVENTS_FOLDER / "test_{language}_easy.wav"`` is scored with
    ``get_speech_to_score_tuple``; the word at index 2 is then requested and
    the returned audio file, word and duration are verified. The duration must
    be a float strictly between 0 and the value computed here as number of
    loaded samples divided by the sample rate. On success the files listed in
    ``audio_files`` are deleted; any exception is logged and re-raised.
    """
    import numpy as np

    set_seed()
    path = EVENTS_FOLDER / f"test_{language}_easy.wav"
    loaded_audio, samplerate = lambdaSpeechToScore.soundfile_load(path)
    loaded_audio = np.asarray(loaded_audio)
    duration_tot = loaded_audio.size / samplerate
    input_text = text_dict[language]
    # only the eighth element (the JSON string) of the returned tuple is needed
    _, _, _, _, _, _, _, output_json, _ = lambdaSpeechToScore.get_speech_to_score_tuple(
        input_text, str(path), language, False
    )
    idx_recorded_word = 2
    output_loaded = json.loads(output_json)
    audio_file, word, duration = lambdaSpeechToScore.get_selected_word(
        idx_recorded_word, output_json
    )
    try:
        audio_file_path = Path(audio_file)
        assert audio_file_path.exists() and audio_file_path.is_file()
        assert isinstance(duration, float)
        assert 0 < duration < duration_tot
        # assert duration == expected_end_time
        expected_word = text_dict[language].split()[idx_recorded_word]
        assert word == expected_word
        for splitted_file in output_loaded["audio_files"]:
            Path(splitted_file).unlink()
    except Exception as ex:
        app_logger.error(f"ex:{ex}.")
        raise ex
def assert_get_selected_word_invalid_index(cls, language):
    """Check that ``get_selected_word`` raises ``IndexError`` for an out-of-range index.

    The easy sample is scored with ``get_speech_to_score_tuple``, then index
    120 is requested: an ``IndexError`` with message
    ``"list index out of range"`` is expected. Finally every file matching
    ``EVENTS_FOLDER / "<sample stem>_*"`` is deleted.

    Args:
        cls: the running ``unittest.TestCase`` instance.
        language: language code used to select sample and text.
    """
    import glob

    set_seed()
    path = EVENTS_FOLDER / f"test_{language}_easy.wav"
    _, _, _, _, _, _, _, output_json, _ = lambdaSpeechToScore.get_speech_to_score_tuple(
        text_dict[language], str(path), language, False
    )
    with cls.assertRaises(IndexError) as raised:
        lambdaSpeechToScore.get_selected_word(120, output_json)
    assert str(raised.exception) == "list index out of range"
    # remove the files generated next to the sample during this check
    for leftover_name in glob.glob(f"{EVENTS_FOLDER / path.stem}_*"):
        leftover = Path(leftover_name)
        assert leftover.is_file()
        leftover.unlink()
def helper_get_accuracy_from_recorded_audio(cls, source, use_dtw: bool = None):
    """Run ``lambda_handler`` on every recorded event and compare with the expected JSON.

    The events are read from ``EVENTS_FOLDER / "GetAccuracyFromRecordedAudio.json"``
    (key ``"inputs"``, one event per language). For each event the ``useDTW``
    entry of the JSON body is set to ``use_dtw``, or removed when ``use_dtw``
    is ``None``; the handler output (without ``random_file_name``) must equal
    the content of
    ``expected_get_accuracy_from_recorded_audio_{language}_{source}_dtw{use_dtw}.json``.

    Args:
        cls: the running ``unittest.TestCase`` instance.
        source: label used only to select the expected-output file.
        use_dtw: value for the ``useDTW`` body field; ``None`` removes the field.

    Raises:
        Exception: a failed comparison is logged with its context and re-raised.
    """
    with open(EVENTS_FOLDER / "GetAccuracyFromRecordedAudio.json", "r", encoding="utf-8") as src:
        inputs_outputs = json.load(src)
    inputs = inputs_outputs["inputs"]
    for language, event_content in inputs.items():
        set_seed()
        event_content_loaded = json.loads(event_content["body"])
        if use_dtw is not None:
            event_content_loaded["useDTW"] = use_dtw
        else:
            # pop() instead of del: an event body without "useDTW" must not raise KeyError
            event_content_loaded.pop("useDTW", None)
        event_content["body"] = json.dumps(event_content_loaded)
        output = lambdaSpeechToScore.lambda_handler(event_content, {})
        output = {k: v for k, v in json.loads(output).items() if k != "random_file_name"}
        json_path = EVENTS_FOLDER / f"expected_get_accuracy_from_recorded_audio_{language}_{source}_dtw{use_dtw}.json"
        app_logger.info(f"json_path:{json_path} .")
        # to regenerate the fixture:
        # with open(json_path, "w") as dst:
        #     json.dump(output, dst)
        with open(json_path, "r", encoding="utf-8") as src:
            current_expected_output_loaded = json.load(src)
        try:
            cls.assertDictEqual(output, current_expected_output_loaded)
        except Exception as ex:
            app_logger.error(f"# output, source:{source}, use_dtw:{use_dtw}, language:{language}.")
            app_logger.error(output)
            app_logger.error(f"ex.args:{ex.args} .")
            app_logger.error(f"ex:{ex} .")
            raise ex
class TestGetAccuracyFromRecordedAudio(unittest.TestCase):
    """Tests for ``lambdaSpeechToScore``: lambda handler, speech-to-score helpers,
    selected-word extraction and audio splitting."""

    def setUp(self):
        """Set the environment variables used while the tests run."""
        if platform.system() in ("Windows", "Win32"):
            os.environ["PYTHONUTF8"] = "1"
        os.environ["IS_TESTING"] = "TRUE"

    def tearDown(self):
        """Remove the environment variables set by ``setUp``.

        ``pop`` with a default is used so a variable that is already missing
        never raises ``KeyError``. The previous condition mixed ``or`` and
        ``and`` without parentheses, so the ``"PYTHONUTF8" in os.environ``
        guard was skipped on Windows.
        """
        if platform.system() in ("Windows", "Win32"):
            os.environ.pop("PYTHONUTF8", None)
        os.environ.pop("IS_TESTING", None)

    def _check_accuracy_cmd_then_gui(self, use_dtw):
        """Compare against the "cmd" expected outputs, falling back to the "gui" ones on failure."""
        self.maxDiff = None
        try:
            helper_get_accuracy_from_recorded_audio(self, "cmd", use_dtw)
        except AssertionError:
            helper_get_accuracy_from_recorded_audio(self, "gui", use_dtw)

    def test_GetAccuracyFromRecordedAudio_dtw_none(self):
        """Handler output without the ``useDTW`` body field."""
        self._check_accuracy_cmd_then_gui(None)

    def test_GetAccuracyFromRecordedAudio_dtw_false(self):
        """Handler output with ``useDTW`` set to ``False``."""
        self._check_accuracy_cmd_then_gui(False)

    def test_GetAccuracyFromRecordedAudio_dtw_true(self):
        """Handler output with ``useDTW`` set to ``True``."""
        self._check_accuracy_cmd_then_gui(True)

    def test_lambda_handler_empty_text(self):
        """An event with an empty ``title`` must produce a flask ``Response`` with an empty JSON body."""
        from flask import Response

        with open(EVENTS_FOLDER / "GetAccuracyFromRecordedAudio.json", "r", encoding="utf-8") as src:
            inputs_outputs = json.load(src)
        event = inputs_outputs["inputs"]["en"]
        event_body_loaded = json.loads(event["body"])
        event_body_loaded["title"] = ""
        event["body"] = json.dumps(event_body_loaded)
        output = lambdaSpeechToScore.lambda_handler(event, {})
        self.assertIsInstance(output, Response)
        self.assertDictEqual(output.json, {})

    def test_get_speech_to_score_en_ok(self):
        """``get_speech_to_score_dict`` on the English easy sample."""
        assert_get_speech_to_score_ok(self, "en")

    def test_get_speech_to_score_de_ok1(self):
        """``get_speech_to_score_dict`` on the German easy sample."""
        assert_get_speech_to_score_ok(self, "de")

    # def test_get_speech_to_score_en_ok_remove_input_file(self):
    #     assert_get_speech_to_score_ok_remove_input_file(
    #         self, "en", expected_get_speech_to_score["en"]
    #     )
    #
    # def test_get_speech_to_score_de_ok_remove_input_file(self):
    #     assert_get_speech_to_score_ok_remove_input_file(
    #         self, "de", expected_get_speech_to_score["de"]
    #     )

    def test_get_speech_to_score_tuple_de_ok_not_remove_random_file(self):
        """``get_speech_to_score_tuple`` (German), keeping the random file."""
        assert_get_speech_to_score_tuple_ok(
            self, "de", expected_num_words=5, remove_random_file=False
        )

    def test_get_speech_to_score_tuple_en_ok_not_remove_random_file(self):
        """``get_speech_to_score_tuple`` (English), keeping the random file."""
        assert_get_speech_to_score_tuple_ok(
            self, "en", expected_num_words=5, remove_random_file=False
        )

    def test_get_speech_to_score_tuple_de_ok_remove_random_file(self):
        """``get_speech_to_score_tuple`` (German), removing the random file."""
        assert_get_speech_to_score_tuple_ok(
            self, "de", expected_num_words=5, remove_random_file=True
        )

    def test_get_speech_to_score_tuple_en_ok_remove_random_file(self):
        """``get_speech_to_score_tuple`` (English), removing the random file."""
        assert_get_speech_to_score_tuple_ok(
            self, "en", expected_num_words=5, remove_random_file=True
        )

    def test_get_speech_to_score_dict__de_empty_input_text(self):
        """An empty text must raise ``ValueError`` (German sample)."""
        language = "de"
        path = EVENTS_FOLDER / f"test_{language}_easy.wav"
        assert_raises_get_speech_to_score_dict(
            self,
            "",
            str(path),
            language,
            ValueError,
            "cannot read an empty/None text: ''...",
        )

    def test_get_speech_to_score_dict__en_empty_input_text(self):
        """An empty text must raise ``ValueError`` (English sample)."""
        language = "en"
        path = EVENTS_FOLDER / f"test_{language}_easy.wav"
        assert_raises_get_speech_to_score_dict(
            self,
            "",
            str(path),
            language,
            ValueError,
            "cannot read an empty/None text: ''...",
        )

    def test_get_speech_to_score_dict__de_empty_input_file(self):
        """An empty file name must raise ``OSError`` (German)."""
        language = "de"
        assert_raises_get_speech_to_score_dict(
            self,
            "text fake",
            "",
            language,
            OSError,
            "cannot read an empty/None file: ''...",
        )

    def test_get_speech_to_score_dict__en_empty_input_file(self):
        """An empty file name must raise ``OSError`` (English)."""
        language = "en"
        assert_raises_get_speech_to_score_dict(
            self,
            "text fake",
            "",
            language,
            OSError,
            "cannot read an empty/None file: ''...",
        )

    def test_get_speech_to_score_dict__empty_language1(self):
        """An empty language must raise ``NotImplementedError``."""
        assert_raises_get_speech_to_score_dict(
            self,
            "text fake",
            "fake_file",
            "",
            NotImplementedError,
            "Not tested/supported with '' language...",
        )

    def test_get_speech_to_score_dict__empty_language2(self):
        """The ``empty_file.wav`` input must raise ``OSError``.

        Despite the method name, the language is valid here ("en"): the
        error message checked is the one about an empty file.
        """
        language = "en"
        path_file = str(EVENTS_FOLDER / "empty_file.wav")
        assert_raises_get_speech_to_score_dict(
            self,
            "text fake",
            path_file,
            language,
            OSError,
            f"cannot read an empty file: '{path_file}'...",
        )

    def test_get_selected_word_valid_index_de_ok(self):
        """``get_selected_word`` with a valid index (German)."""
        assert_get_selected_word_valid_index_ok("de")

    def test_get_selected_word_valid_index_en_ok(self):
        """``get_selected_word`` with a valid index (English)."""
        assert_get_selected_word_valid_index_ok("en")

    def test_get_selected_word_invalid_index_de(self):
        """``get_selected_word`` with an out-of-range index (German)."""
        assert_get_selected_word_invalid_index(self, "de")

    def test_get_selected_word_invalid_index_en(self):
        """``get_selected_word`` with an out-of-range index (English)."""
        assert_get_selected_word_invalid_index(self, "en")

    def test_get_selected_word_empty_transcripts(self):
        """``get_selected_word`` on empty lists must raise ``IndexError``."""
        raw_json_output = json.dumps(
            {"audio_files": [], "real_transcripts": "", "audio_durations": []}
        )
        idx_recorded_word = 0
        with self.assertRaises(IndexError):
            lambdaSpeechToScore.get_selected_word(idx_recorded_word, raw_json_output)

    def test_get_splitted_audio_file_valid_input(self):
        """Splitting the hard English sample returns one existing file and one positive duration per interval."""
        language = "en"
        path = str(EVENTS_FOLDER / f"test_{language}_hard.wav")
        start_time = [0.0, 1.0, 2.0]
        end_time = [1.0, 2.0, 2.5]
        audio_files, audio_durations = lambdaSpeechToScore.get_splitted_audio_file(
            audiotmpfile=path, start_time=start_time, end_time=end_time
        )
        assert len(audio_files) == len(start_time)
        assert len(audio_durations) == len(start_time)
        for audio_file, duration in zip(audio_files, audio_durations):
            audio_file_path = Path(audio_file)
            assert audio_file_path.exists() and audio_file_path.is_file()
            assert duration > 0
            # remove the generated file once verified
            audio_file_path.unlink()

    def test_get_splitted_audio_file_invalid_input(self):
        """An empty audio file name must raise ``LibsndfileError``."""
        from soundfile import LibsndfileError

        start_time = [0.0, 1.0, 2.0]
        end_time = [1.0, 2.0, 2.5]
        with self.assertRaises(LibsndfileError) as raised:
            lambdaSpeechToScore.get_splitted_audio_file(
                audiotmpfile="", start_time=start_time, end_time=end_time
            )
        self.assertEqual(str(raised.exception), "Error opening '': System error.")

    def test_get_splitted_audio_file_mismatched_times(self):
        """Intervals whose end precedes the start must raise ``LibsndfileError``."""
        from soundfile import LibsndfileError

        language = "en"
        path = EVENTS_FOLDER / f"test_{language}_easy.wav"
        start_time = [4.0, 5.0, 7.0]
        end_time = [3.0, 4.0, 5.5]
        with self.assertRaises(LibsndfileError) as raised:
            lambdaSpeechToScore.get_splitted_audio_file(
                audiotmpfile=str(path), start_time=start_time, end_time=end_time
            )
        self.assertEqual(str(raised.exception), "Internal psf_fseek() failed.")
# Run the test cases of this module when the file is executed directly as a script.
if __name__ == "__main__":
    unittest.main()