# app.py (MP3-robust loader + Luganda FKD commented; minimal feedback)
import os
import json
import time
import uuid
import logging
import shutil
import subprocess
import tempfile

import gradio as gr
from transformers import pipeline
import numpy as np
import soundfile as sf  # librosa depends on this; good for wav/flac/ogg
import librosa  # fallback / resampling

# Optional: modest thread hints for CPU Spaces
try:
    import torch
    torch.set_num_threads(2)
    torch.set_num_interop_threads(1)
except Exception:
    pass

# Basic logging so we can verify which model is loaded per inference
logging.basicConfig(level=logging.INFO)

# --- External logging: push to a HF Dataset repo on each submit (no local storage) ---
from datasets import Dataset, Features, Value, Audio, load_dataset
from huggingface_hub import HfApi

# -------- CONFIG: Hub dataset target (no persistent storage needed) --------
HF_DATASET_REPO = os.environ.get("HF_DATASET_REPO", "DarliAI/asr-feedback-logs")
HF_TOKEN = os.environ.get("HF_TOKEN")
PUSH_TO_HF = bool(HF_TOKEN and HF_DATASET_REPO)

HF_FEATURES = Features({
    "timestamp": Value("string"),
    "session_id": Value("string"),
    "language_display": Value("string"),
    "model_id": Value("string"),
    "model_revision": Value("string"),
    "audio": Audio(sampling_rate=None),  # uploaded only if user consents
    "audio_duration_s": Value("float32"),
    "sample_rate": Value("int32"),
    "source": Value("string"),
    "decode_params": Value("string"),
    "transcript_hyp": Value("string"),
    "corrected_text": Value("string"),
    "latency_ms": Value("int32"),
    "rtf": Value("float32"),
    "score_out_of_10": Value("int32"),
    "share_publicly": Value("bool"),
})
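
# A minimal sketch of one logged row matching HF_FEATURES above. All values
# here are illustrative, not from a real run; "audio" stays None unless the
# user consents to storage:
#
#   example_row = {
#       "timestamp": "2024-01-01T12:00:00Z",
#       "session_id": "anon-...",
#       "language_display": "Luganda",
#       "model_id": "FarmerlineML/w2v-bert-2.0_luganda",
#       "model_revision": "unknown",
#       "audio": None,
#       "audio_duration_s": 4.2,
#       "sample_rate": 16000,
#       "source": "upload",
#       "decode_params": '{"chunk_length_s": 30}',
#       "transcript_hyp": "...",
#       "corrected_text": "",
#       "latency_ms": 1300,
#       "rtf": 0.31,
#       "score_out_of_10": 7,
#       "share_publicly": False,
#   }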

def _push_row_to_hf_dataset(row, audio_file_path):
    """
    Append a single example to the HF dataset repo (train split).
    If the user didn't consent or there is no audio path, the 'audio' field is None.
    Uses the modern datasets-library approach with proper appending.
    """
    if not PUSH_TO_HF:
        return "HF push disabled (missing HF_TOKEN or repo)."

    example = dict(row)
    # Audio: only include if the user consented and the file exists
    example["audio"] = audio_file_path if (audio_file_path and os.path.isfile(audio_file_path)) else None

    # Normalize types
    def _to_int(v):
        try:
            return int(v)
        except Exception:
            return None

    def _to_float(v):
        try:
            return float(v)
        except Exception:
            return None

    for k in ["latency_ms", "score_out_of_10", "sample_rate"]:
        example[k] = _to_int(example.get(k))
    for k in ["rtf", "audio_duration_s"]:
        example[k] = _to_float(example.get(k))

    # Create a dataset with the single new row
    ds_new = Dataset.from_list([example], features=HF_FEATURES)

    try:
        # Try to load the existing dataset and append
        try:
            ds_existing = load_dataset(
                HF_DATASET_REPO,
                split="train",
                token=HF_TOKEN,
                download_mode="force_redownload",  # ensure we get the latest version
            )
            # Append the new row
            ds_combined = ds_existing.add_item(example)
            # Push the combined dataset
            ds_combined.push_to_hub(
                HF_DATASET_REPO,
                split="train",
                private=True,
                token=HF_TOKEN,
                commit_message=f"Append feedback row at {example['timestamp']}",
            )
            return "Successfully appended to existing HF Dataset."
        except Exception as e:
            # If the dataset doesn't exist or fails to load, create it
            if "404" in str(e) or "doesn't exist" in str(e) or "EmptyDatasetError" in str(e):
                ds_new.push_to_hub(
                    HF_DATASET_REPO,
                    split="train",
                    private=True,
                    token=HF_TOKEN,
                    commit_message="Initialize dataset with first feedback row",
                )
                return "Created new HF Dataset with first row."
            else:
                # Alternative approach: push via a PR to avoid commit conflicts
                ds_new.push_to_hub(
                    HF_DATASET_REPO,
                    split="train",
                    private=True,
                    token=HF_TOKEN,
                    commit_message=f"Append feedback row at {example['timestamp']}",
                    create_pr=True,
                )
                return "Pushed to HF Dataset via PR (will auto-merge)."
    except Exception as e:
        logging.error(f"Failed to push to HF Dataset: {e}")
        # Final fallback: confirm the repo exists, then push under a unique split
        try:
            api = HfApi()
            api.dataset_info(HF_DATASET_REPO, token=HF_TOKEN)
            ds_new.push_to_hub(
                HF_DATASET_REPO,
                split=f"train_{int(time.time())}",  # unique split name as a last resort
                private=True,
                token=HF_TOKEN,
                commit_message=f"Append feedback row at {example['timestamp']}",
            )
            return "Pushed to HF Dataset with unique split."
        except Exception as final_error:
            return f"Failed to push to HF Dataset: {final_error}"

# --- Map display names to your HF Hub model IDs ---
language_models = {
    "Akan (Asante Twi)": "FarmerlineML/w2v-bert-2.0_twi_alpha_v1",
    "Ewe": "FarmerlineML/w2v-bert-2.0_ewe_2",
    "Kiswahili": "FarmerlineML/w2v-bert-2.0_swahili_alpha",
    "Luganda": "FarmerlineML/w2v-bert-2.0_luganda",  # active
    # "Luganda (FKD)": "FarmerlineML/luganda_fkd",  # commented out per request
    "Brazilian Portuguese": "FarmerlineML/w2v-bert-2.0_brazilian_portugese_alpha",
    "Fante": "misterkissi/w2v2-lg-xls-r-300m-fante",
    "Bemba": "DarliAI/kissi-w2v2-lg-xls-r-300m-bemba",
    "Bambara": "DarliAI/kissi-w2v2-lg-xls-r-300m-bambara",
    "Dagaare": "DarliAI/kissi-w2v2-lg-xls-r-300m-dagaare",
    "Kinyarwanda": "misterkissi/w2v2-lg-xls-r-300m-kinyarwanda-v2",
    "Fula": "DarliAI/kissi-wav2vec2-fula-fleurs-full",
    "Oromo": "DarliAI/kissi-w2v-bert-2.0-oromo",
    "Runyankore": "misterkissi/w2v2-lg-xls-r-300m-runyankore",
    "Ga": "misterkissi/w2v2-lg-xls-r-300m-ga",
    "Vai": "misterkissi/whisper-small-vai",
    "Kasem": "misterkissi/w2v2-lg-xls-r-300m-kasem",
    "Lingala": "misterkissi/w2v2-lg-xls-r-300m-lingala",
    "Fongbe": "misterkissi/whisper-small-fongbe",
    "Amharic": "misterkissi/w2v2-lg-xls-r-1b-amharic",
    "Xhosa": "misterkissi/w2v2-lg-xls-r-300m-xhosa",
    "Tsonga": "misterkissi/w2v2-lg-xls-r-300m-tsonga",
    # "WOLOF": "misterkissi/w2v2-lg-xls-r-1b-wolof",
    # "HAITIAN CREOLE": "misterkissi/whisper-small-haitian-creole",
    # "KABYLE": "misterkissi/w2v2-lg-xls-r-1b-kabyle",
    "Yoruba": "FarmerlineML/w2v-bert-2.0_yoruba_v1",
    "Luo": "FarmerlineML/w2v-bert-2.0_luo_v2",
    "Somali": "FarmerlineML/w2v-bert-2.0_somali_alpha",
    "Pidgin": "FarmerlineML/pidgin_nigerian",
    "Kikuyu": "FarmerlineML/w2v-bert-2.0_kikuyu",
    "Igbo": "FarmerlineML/w2v-bert-2.0_igbo_v1",
    "Krio": "FarmerlineML/w2v-bert-2.0_krio_v3",
    "Dyula": "FarmerlineML/w2v-bert-2.0_dyula",
    "Kamba": "FarmerlineML/w2v-bert-2.0_kamba",
}

# -------- Robust audio loader (handles MP3/M4A via ffmpeg; wav/flac via soundfile) --------
TARGET_SR = 16000

def _has_ffmpeg():
    return shutil.which("ffmpeg") is not None

def _load_with_soundfile(path):
    data, sr = sf.read(path, always_2d=False)
    if isinstance(data, np.ndarray) and data.ndim > 1:
        data = data.mean(axis=1)
    return data.astype(np.float32), sr

def _load_with_ffmpeg(path, target_sr=TARGET_SR):
    # Convert to mono 16 kHz wav in a temp file using ffmpeg
    if not _has_ffmpeg():
        raise RuntimeError("ffmpeg not available")
    tmp_wav = tempfile.NamedTemporaryFile(suffix=".wav", delete=False)
    tmp_wav.close()
    cmd = [
        "ffmpeg", "-hide_banner", "-loglevel", "error",
        "-y", "-i", path,
        "-ac", "1", "-ar", str(target_sr),
        tmp_wav.name,
    ]
    subprocess.run(cmd, check=True)
    data, sr = sf.read(tmp_wav.name, always_2d=False)
    try:
        os.remove(tmp_wav.name)
    except Exception:
        pass
    if isinstance(data, np.ndarray) and data.ndim > 1:
        data = data.mean(axis=1)
    return data.astype(np.float32), sr

def _resample_if_needed(y, sr, target_sr=TARGET_SR):
    if sr == target_sr:
        return y.astype(np.float32), sr
    y_rs = librosa.resample(y.astype(np.float32), orig_sr=sr, target_sr=target_sr)
    return y_rs.astype(np.float32), target_sr

def load_audio_any(path, target_sr=TARGET_SR):
    """Robust loader: wav/flac/ogg via soundfile; mp3/m4a via ffmpeg; fallback to librosa."""
    ext = os.path.splitext(path)[1].lower()
    try:
        if ext in {".wav", ".flac", ".ogg", ".opus"}:
            y, sr = _load_with_soundfile(path)
        elif _has_ffmpeg():
            y, sr = _load_with_ffmpeg(path, target_sr=target_sr)
            return y, sr  # already mono + 16 kHz
        else:
            # Fallback to librosa for formats like mp3/m4a when ffmpeg isn't present
            y, sr = librosa.load(path, sr=None, mono=True)
        y, sr = _resample_if_needed(y, sr, target_sr)
        return y, sr
    except Exception as e:
        logging.warning(f"[AUDIO] Primary load failed for {path} ({e}). Falling back to librosa.")
        y, sr = librosa.load(path, sr=target_sr, mono=True)
        return y.astype(np.float32), sr
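
# Usage sketch (assumes some local "clip.mp3" exists): whichever branch runs,
# the caller always gets mono float32 samples at TARGET_SR.
#
#   y, sr = load_audio_any("clip.mp3")
#   assert sr == TARGET_SR and y.dtype == np.float32 and y.ndim == 1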

# -------- Lazy-load pipeline cache (Space-safe) --------
_PIPELINE_CACHE = {}
_CACHE_ORDER = []    # usage order, most recent first
_CACHE_MAX_SIZE = 3  # tune for RAM

def _touch_cache(key):
    if key in _CACHE_ORDER:
        _CACHE_ORDER.remove(key)
    _CACHE_ORDER.insert(0, key)

def _evict_if_needed():
    while len(_PIPELINE_CACHE) > _CACHE_MAX_SIZE:
        oldest = _CACHE_ORDER.pop()
        try:
            del _PIPELINE_CACHE[oldest]
        except KeyError:
            pass

def get_asr_pipeline(language_display: str):
    if language_display not in language_models:
        raise ValueError(f"Unknown language selection: {language_display}")
    if language_display in _PIPELINE_CACHE:
        _touch_cache(language_display)
        return _PIPELINE_CACHE[language_display]
    model_id = language_models[language_display]
    logging.info(f"[ASR] Loading pipeline for '{language_display}' -> {model_id}")
    pipe = pipeline(
        task="automatic-speech-recognition",
        model=model_id,
        device=-1,  # CPU on Spaces (explicit)
        chunk_length_s=30,
    )
    _PIPELINE_CACHE[language_display] = pipe
    _touch_cache(language_display)
    _evict_if_needed()
    return pipe
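
# Cache behavior sketch: loading a fourth language evicts the least recently
# used pipeline, so switching among up to _CACHE_MAX_SIZE languages stays warm
# (the names below are illustrative picks from language_models):
#
#   get_asr_pipeline("Luganda")    # load -> cache: [Luganda]
#   get_asr_pipeline("Ewe")        # load -> cache: [Ewe, Luganda]
#   get_asr_pipeline("Luganda")    # hit  -> cache: [Luganda, Ewe]
#   get_asr_pipeline("Kiswahili")  # load -> cache: [Kiswahili, Luganda, Ewe]
#   get_asr_pipeline("Yoruba")     # load -> evicts Ewe (least recently used)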

# -------- Helpers --------
def _model_revision_from_pipeline(pipe) -> str:
    # Best-effort capture of revision/hash for reproducibility
    for attr in ("hub_revision", "revision", "_commit_hash"):
        val = getattr(getattr(pipe, "model", None), attr, None)
        if val:
            return str(val)
    try:
        return str(getattr(pipe.model.config, "_name_or_path", "unknown"))
    except Exception:
        return "unknown"

# -------- Inference --------
def transcribe(audio_path: str, language: str):
    """
    Robustly load the audio (mp3/m4a friendly), resample to 16 kHz mono,
    then run it through the chosen ASR pipeline.
    Returns the transcript and a meta dict for feedback.
    """
    if not audio_path:
        return "⚠️ Please upload or record an audio clip.", None
    speech, sr = load_audio_any(audio_path, target_sr=TARGET_SR)
    duration_s = float(len(speech) / float(sr))
    pipe = get_asr_pipeline(language)
    decode_params = {"chunk_length_s": getattr(pipe, "chunk_length_s", 30)}
    t0 = time.time()
    result = pipe({"sampling_rate": sr, "raw": speech})
    latency_ms = int((time.time() - t0) * 1000.0)
    hyp_text = result.get("text", "")
    rtf = (latency_ms / 1000.0) / max(duration_s, 1e-9)
    meta = {
        "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
        "session_id": f"anon-{uuid.uuid4()}",
        "language_display": language,
        "model_id": language_models.get(language, "unknown"),
        "model_revision": _model_revision_from_pipeline(pipe),
        "audio_duration_s": duration_s,
        "sample_rate": sr,
        "source": "upload",
        "decode_params": json.dumps(decode_params),
        "transcript_hyp": hyp_text,
        "latency_ms": latency_ms,
        "rtf": rtf,
    }
    return hyp_text, meta
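
# Usage sketch outside Gradio (assumes a local "clip.wav" exists); the meta
# dict carries everything submit_feedback later needs:
#
#   text, meta = transcribe("clip.wav", "Luganda")
#   print(text, meta["latency_ms"], meta["rtf"])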

# -------- Feedback submit (minimal) --------
def submit_feedback(meta, corrected_text, score, store_audio, share_publicly, audio_file_path):
    """
    Push a minimal row to the HF Dataset: model info, language, transcript,
    optional corrected text, and score.
    """
    if not meta:
        return {"status": "No transcription metadata available. Please transcribe first."}
    row = dict(meta)
    row.update({
        "corrected_text": (corrected_text or "").strip(),
        "score_out_of_10": int(score) if score is not None else None,
        "share_publicly": bool(share_publicly),
    })
    try:
        audio_to_push = audio_file_path if store_audio else None
        hf_status = _push_row_to_hf_dataset(row, audio_to_push)
        status = f"Feedback saved. {hf_status}"
    except Exception as e:
        status = f"Failed to push to HF Dataset: {e}"
        logging.error(f"Push error: {e}")
    return {
        "status": status,
        "latency_ms": row["latency_ms"],
        "rtf": row["rtf"],
        "model_id": row["model_id"],
        "model_revision": row["model_revision"],
        "language": row["language_display"],
    }

# -------- UI (original preserved; additions appended) --------
with gr.Blocks(title="🎙️ Multilingual ASR Demo") as demo:
    gr.Markdown(
        """
        ## 🎙️ Multilingual Speech-to-Text
        Upload an audio file (MP3, WAV, FLAC, M4A, OGG, …) or record via your microphone.
        Then choose the language/model and hit **Transcribe**.
        """
    )
    with gr.Row():
        lang = gr.Dropdown(
            choices=list(language_models.keys()),
            value=list(language_models.keys())[0],
            label="Select Language / Model",
        )
    with gr.Row():
        audio = gr.Audio(
            sources=["upload", "microphone"],
            type="filepath",
            label="Upload or Record Audio",
        )
    btn = gr.Button("Transcribe")
    output = gr.Textbox(label="Transcription")

    # Hidden state to carry metadata from transcribe -> feedback
    meta_state = gr.State(value=None)

    # Keep original behavior: output shows the transcript.
    # Also capture meta into the hidden state.
    def _transcribe_and_store(audio_path, language):
        hyp, meta = transcribe(audio_path, language)
        # Pre-fill the corrected-text box with the hypothesis for easy edits
        return hyp, meta, hyp

    # --- Minimal evaluation (score + optional corrected text) ---
    with gr.Accordion("Evaluation", open=False):
        with gr.Row():
            corrected_tb = gr.Textbox(label="Corrected transcript (optional)", lines=4, value="")
        with gr.Row():
            score_slider = gr.Slider(minimum=0, maximum=10, step=1, label="Score out of 10", value=7)
        with gr.Row():
            store_audio_cb = gr.Checkbox(label="Allow storing my audio for research/eval", value=False)
            share_cb = gr.Checkbox(label="Allow sharing this example publicly", value=False)
        submit_btn = gr.Button("Submit")
        results_json = gr.JSON(label="Status")

    # Wire events
    btn.click(
        fn=_transcribe_and_store,
        inputs=[audio, lang],
        outputs=[output, meta_state, corrected_tb],
    )
    submit_btn.click(
        fn=submit_feedback,
        inputs=[
            meta_state,
            corrected_tb,
            score_slider,
            store_audio_cb,
            share_cb,
            audio,  # raw file path from gr.Audio
        ],
        outputs=results_json,
    )

# Keep Spaces stable under load
if __name__ == "__main__":
    demo.queue()
    demo.launch()
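
# To run locally (a sketch, assuming the dependencies imported above are
# installed, plus ffmpeg on PATH for mp3/m4a support):
#
#   pip install gradio transformers datasets huggingface_hub soundfile librosa numpy torch
#   HF_TOKEN=... HF_DATASET_REPO=... python app.py
#
# Without HF_TOKEN set, the app still transcribes; feedback pushes are disabled.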