Spaces:
Runtime error
Runtime error
import subprocess | |
import numpy as np | |
import requests | |
import json | |
from typing import Dict, List | |
import random | |
import torch | |
from joblib import Parallel, delayed | |
import os | |
def random_runner(target_prob, size):
    """
    Helper function to draw one random path through per-row alternatives.

    One column index in [0, size[1]) is picked at random for each of the
    size[0] rows, and the entries of `target_prob` at those positions are
    summed.

    Returns:
        A tuple (picks, score): the list of chosen column indices and the
        sum of the selected entries as a Python number.
    """
    row_count, option_count = size[0], size[1]
    picks = random.choices(range(option_count), k=row_count)
    # Pair row i with its chosen column picks[i].
    selected = target_prob[range(len(picks)), picks]
    score = selected.sum().detach().numpy().item()
    return picks, score
def query(data, model_id, api_token) -> Dict:
    """
    Helper function to POST a payload to the huggingface inference api.

    `data` is sent as the request body to the endpoint of `model_id`,
    authenticated with `api_token` as a bearer token.  The response body is
    decoded as UTF-8 and parsed as JSON.
    """
    endpoint = f"https://api-inference.huggingface.co/models/{model_id}"
    auth_headers = {"Authorization": f"Bearer {api_token}"}
    reply = requests.request("POST", endpoint, headers=auth_headers, data=data)
    payload_text = reply.content.decode("utf-8")
    return json.loads(payload_text)
def query_process(filename, model_id, api_token) -> Dict:
    """
    Helper function to query text from an audio file on disk by huggingface api inference.

    The file is read in binary mode and its bytes are forwarded to `query`,
    which performs the authenticated POST and parses the JSON reply.

    Args:
        filename: path of the file whose bytes are uploaded.
        model_id: model identifier appended to the inference endpoint.
        api_token: token sent as a bearer credential.

    Returns:
        The parsed JSON response, as returned by `query`.
    """
    with open(filename, "rb") as f:
        data = f.read()
    # Delegate to the shared helper so the request logic lives in one place.
    return query(data, model_id, api_token)
def query_dummy(raw_data, processor, model):
    """
    Helper function to transcribe raw audio with a local model by greedy decoding.

    `processor` turns the samples into model inputs (declared as 16 kHz),
    the model is run without gradient tracking, the highest-scoring token id
    is taken per position, and the first decoded string is returned.
    """
    features = processor(raw_data, sampling_rate=16000, return_tensors="pt")
    with torch.no_grad():
        output = model(features.input_values, attention_mask=features.attention_mask)
        scores = output.logits
    best_ids = torch.argmax(scores, dim=-1)
    decoded = processor.batch_decode(best_ids)
    return decoded[0]
def query_raw(raw_data, word, processor, processor_with_lm, model, temperature=15) -> List:
    """
    Helper function to transcribe raw audio with a local model and propose
    alternative transcriptions when the best guess does not match `word`.

    The LM-decoded transcription (spaces removed) is compared with `word`; on
    a match the function returns [(word, 100)].  Otherwise, for every
    position whose greedy prediction is neither the pad token nor the word
    delimiter, the three highest logits are kept, random paths through those
    alternatives are drawn with `random_runner`, and the best-scoring paths
    are decoded.

    Args:
        raw_data: audio samples handed to `processor` (declared as 16 kHz).
        word: expected transcription, compared without spaces.
        processor: callable producing `input_values`; also provides
            `tokenizer` and `decode`.
        processor_with_lm: decoder whose `decode(...)` result has a 'text' entry.
        model: callable returning an object with a `logits` tensor.
        temperature: maximum number of sampled paths that are decoded
            (despite the name it is a count, used as a slice bound).

    Returns:
        A list of (sentence, score) tuples sorted by descending score, where
        score is the sum of the selected logits, or [(word, 100)] when the
        top transcription already equals `word`.
    """
    input_values = processor(raw_data, sampling_rate=16000, return_tensors="pt").input_values
    with torch.no_grad():
        logits = model(input_values).logits
    predicted_ids = torch.argmax(logits, dim=-1)
    top1_prediction = processor_with_lm.decode(logits[0].cpu().numpy())['text']
    if word == top1_prediction.replace(" ", ""):
        return [(word, 100)]

    tokenizer = processor.tokenizer
    # Keep only positions whose greedy token is a real character; the mask is
    # computed once and reused for both the values and the indices.
    keep_mask = (predicted_ids != tokenizer.word_delimiter_token_id) & (
        predicted_ids != tokenizer.pad_token_id
    )
    topk_values, topk_indices = torch.topk(logits, 3)
    target_index = topk_indices[keep_mask]
    target_prob = topk_values[keep_mask]
    size = target_index.size()
    # size[1] is the number of alternatives per kept position (presumably 3,
    # from the top-k above), which gives 3**4 // 2 = 40 sampled paths.
    trial = size[1] ** 4 // 2
    prediction_list = Parallel(n_jobs=1, backend="multiprocessing")(
        delayed(random_runner)(target_prob, size) for _ in range(trial)
    )
    # Index by score so equal-scoring paths collapse, then rank best first.
    ranked_paths = sorted(
        {score: picks for picks, score in prediction_list}.items(), reverse=True
    )
    results = {}
    for score, picks in ranked_paths[:temperature]:
        output_sentence = processor.decode(target_index[range(size[0]), picks]).lower()
        # Paths arrive best-first and several may decode to the same text;
        # keep the first (highest) score instead of letting a weaker
        # duplicate overwrite it.
        results.setdefault(output_sentence, score)
    return sorted(results.items(), key=lambda item: item[1], reverse=True)
def find_different(target, prediction):
    """
    Helper function to flag the positions where two sequences disagree.

    Elements are compared pairwise from the start; the comparison stops at
    the end of the shorter input.

    Returns:
        A string with one character per compared position: "1" where the
        elements differ and "0" where they are equal.
    """
    flags = (
        str(0) if expected == actual else str(1)
        for expected, actual in zip(target, prediction)
    )
    return "".join(flags)
def ffmpeg_read(bpayload: bytes, sampling_rate: int) -> np.ndarray:
    """
    Helper function to read an audio file through ffmpeg.

    The payload is piped to ffmpeg's stdin, converted to a single channel at
    `sampling_rate`, and read back from stdout as raw 32-bit floats
    ("f32le").

    Args:
        bpayload: encoded audio file content.
        sampling_rate: target sampling rate passed to ffmpeg's "-ar" option.

    Returns:
        The decoded samples as a float32 array built from ffmpeg's output.
        No emptiness check is performed, so an empty output yields an empty
        array.

    Raises:
        ValueError: if the ffmpeg executable cannot be found.
    """
    ffmpeg_command = [
        "ffmpeg",
        "-i",
        "pipe:0",
        "-ac",
        "1",
        "-ar",
        f"{sampling_rate}",
        "-f",
        "f32le",
        "-hide_banner",
        "-loglevel",
        "quiet",
        "pipe:1",
    ]
    try:
        ffmpeg_process = subprocess.Popen(ffmpeg_command, stdin=subprocess.PIPE, stdout=subprocess.PIPE)
    except FileNotFoundError as err:
        # Chain the original error so the missing-executable cause stays visible.
        raise ValueError("ffmpeg was not found but is required to load audio files from filename") from err
    # communicate() feeds stdin, drains stdout and waits for ffmpeg to exit.
    out_bytes, _ = ffmpeg_process.communicate(bpayload)
    return np.frombuffer(out_bytes, np.float32)
def get_model_size(model):
    """
    Helper function to measure the serialized size of a model's state dict.

    The state dict is saved with `torch.save` into a private temporary
    directory, so the current working directory is never written to, an
    existing 'temp_saved_model.pt' there is left alone, and the scratch file
    is removed even if an error occurs.

    Args:
        model: object exposing `state_dict()`.

    Returns:
        The size of the saved file in whole mebibytes (rounded down).
    """
    import tempfile  # local import: only this helper needs it

    with tempfile.TemporaryDirectory() as scratch_dir:
        # Same basename as before so the serialized archive is unchanged.
        checkpoint_path = os.path.join(scratch_dir, 'temp_saved_model.pt')
        torch.save(model.state_dict(), checkpoint_path)
        # >> 20 is floor division by 2**20, i.e. bytes -> MiB.
        return os.path.getsize(checkpoint_path) >> 20