# app.py (MP3-robust loader + Luganda FKD commented; minimal feedback)
#
# Gradio demo for multilingual speech-to-text:
#   * loads audio robustly (soundfile -> ffmpeg -> librosa fallbacks),
#   * lazily loads one transformers ASR pipeline per selected language,
#   * optionally logs user feedback rows to a Hugging Face dataset repo.

import os
import json
import time
import uuid
import logging
import shutil
import subprocess
import tempfile

import gradio as gr
from transformers import pipeline
import numpy as np
import soundfile as sf  # librosa depends on this; good for wav/flac/ogg
import librosa  # fallback / resampling

# Optional: modest thread hints for CPU Spaces
try:
    import torch

    torch.set_num_threads(2)
    torch.set_num_interop_threads(1)
except Exception:
    # Deliberately best-effort: the app must still start if torch is missing
    # or refuses the thread settings.
    pass

# Basic logging so we can verify which model is loaded per inference
logging.basicConfig(level=logging.INFO)

# --- External logging: push to a HF Dataset repo on each submit (no local storage) ---
from datasets import Dataset, Features, Value, Audio, load_dataset
from huggingface_hub import HfApi

# -------- CONFIG: Hub dataset target (no persistent storage needed) --------
HF_DATASET_REPO = os.environ.get("HF_DATASET_REPO", "DarliAI/asr-feedback-logs")
HF_TOKEN = os.environ.get("HF_TOKEN")
# Pushing is enabled only when both a token and a repo id are configured.
PUSH_TO_HF = bool(HF_TOKEN and HF_DATASET_REPO)

# Schema of one feedback row as stored in the dataset repo.
HF_FEATURES = Features({
    "timestamp": Value("string"),
    "session_id": Value("string"),
    "language_display": Value("string"),
    "model_id": Value("string"),
    "model_revision": Value("string"),
    "audio": Audio(sampling_rate=None),  # uploaded only if user consents
    "audio_duration_s": Value("float32"),
    "sample_rate": Value("int32"),
    "source": Value("string"),
    "decode_params": Value("string"),
    "transcript_hyp": Value("string"),
    "corrected_text": Value("string"),
    "latency_ms": Value("int32"),
    "rtf": Value("float32"),
    "score_out_of_10": Value("int32"),
    "share_publicly": Value("bool"),
})

# Columns that are coerced to int / float before a row is pushed.
_INT_COLUMNS = ("latency_ms", "score_out_of_10", "sample_rate")
_FLOAT_COLUMNS = ("rtf", "audio_duration_s")

# Substrings that mark a "dataset repo is missing or empty" failure.
_MISSING_DATASET_MARKERS = ("404", "doesn't exist", "EmptyDatasetError")


def _coerce_or_none(value, caster):
    """Return ``caster(value)``, or ``None`` when the value cannot be converted."""
    try:
        return caster(value)
    except (TypeError, ValueError, OverflowError):
        return None


def _looks_like_missing_dataset(err):
    """Return True when *err* indicates the dataset repo is absent or empty.

    The exception's class name is included in the searched text because a
    marker such as "EmptyDatasetError" is a type name and is not guaranteed
    to appear in the exception message itself.
    """
    text = f"{type(err).__name__}: {err}"
    return any(marker in text for marker in _MISSING_DATASET_MARKERS)


def _push_row_to_hf_dataset(row, audio_file_path):
    """
    Append a single example to the HF dataset repo (train split).

    Args:
        row: Mapping with the feedback fields (see HF_FEATURES). It is copied,
            not mutated.
        audio_file_path: Path of the audio to attach, or a falsy value. The
            'audio' field is set to None when the path is falsy or is not an
            existing file.

    Returns:
        A human-readable status string. Failures inside the push attempts are
        reported through the returned string rather than raised.
    """
    if not PUSH_TO_HF:
        return "HF push disabled (missing HF_TOKEN or repo)."

    example = dict(row)

    # Audio: only include if user consented and file exists
    has_audio = bool(audio_file_path) and os.path.isfile(audio_file_path)
    example["audio"] = audio_file_path if has_audio else None

    # Normalize types
    for key in _INT_COLUMNS:
        example[key] = _coerce_or_none(example.get(key), int)
    for key in _FLOAT_COLUMNS:
        example[key] = _coerce_or_none(example.get(key), float)

    append_message = f"Append feedback row at {example.get('timestamp')}"

    # Create a dataset with single row
    ds_new = Dataset.from_list([example], features=HF_FEATURES)

    try:
        # Try to load existing dataset and append
        try:
            ds_existing = load_dataset(
                HF_DATASET_REPO,
                split="train",
                token=HF_TOKEN,
                download_mode="force_redownload",  # Ensure we get the latest version
            )
            # Concatenate with new data
            ds_combined = ds_existing.add_item(example)
            # Push the combined dataset
            ds_combined.push_to_hub(
                HF_DATASET_REPO,
                split="train",
                private=True,
                token=HF_TOKEN,
                commit_message=append_message,
            )
            return "Successfully appended to existing HF Dataset."
        except Exception as e:
            if _looks_like_missing_dataset(e):
                # Dataset doesn't exist (or is empty), create it
                ds_new.push_to_hub(
                    HF_DATASET_REPO,
                    split="train",
                    private=True,
                    token=HF_TOKEN,
                    commit_message="Initialize dataset with first feedback row",
                )
                return "Created new HF Dataset with first row."

            # Alternative approach: open a PR instead of committing to main.
            # NOTE(review): this PR carries only the single new row in the
            # "train" split; confirm it does not replace existing data before
            # merging it.
            ds_new.push_to_hub(
                HF_DATASET_REPO,
                split="train",
                private=True,
                token=HF_TOKEN,
                commit_message=append_message,
                create_pr=True,  # Create a PR to avoid conflicts
            )
            return "Pushed to HF Dataset via PR (requires manual merge)."
    except Exception as e:
        logging.exception("Failed to push to HF Dataset: %s", e)
        # Final fallback: check the repo exists, then write to a unique split
        try:
            api = HfApi()
            api.dataset_info(HF_DATASET_REPO, token=HF_TOKEN)
            ds_new.push_to_hub(
                HF_DATASET_REPO,
                split=f"train_{int(time.time())}",  # Use unique split name as last resort
                private=True,
                token=HF_TOKEN,
                commit_message=append_message,
            )
            return "Pushed to HF Dataset with unique split."
        except Exception as final_error:
            return f"Failed to push to HF Dataset: {final_error}"


# --- Map display names to your HF Hub model IDs ---
language_models = {
    "Akan (Asante Twi)": "FarmerlineML/w2v-bert-2.0_twi_alpha_v1",
    "Ewe": "FarmerlineML/w2v-bert-2.0_ewe_2",
    "Kiswahili": "FarmerlineML/w2v-bert-2.0_swahili_alpha",
    "Luganda": "FarmerlineML/w2v-bert-2.0_luganda",  # active
    # "Luganda (FKD)": "FarmerlineML/luganda_fkd",  # commented out per request
    "Brazilian Portuguese": "FarmerlineML/w2v-bert-2.0_brazilian_portugese_alpha",
    "Fante": "misterkissi/w2v2-lg-xls-r-300m-fante",
    "Bemba": "DarliAI/kissi-w2v2-lg-xls-r-300m-bemba",
    "Bambara": "DarliAI/kissi-w2v2-lg-xls-r-300m-bambara",
    "Dagaare": "DarliAI/kissi-w2v2-lg-xls-r-300m-dagaare",
    "Kinyarwanda": "misterkissi/w2v2-lg-xls-r-300m-kinyarwanda-v2",
    "Fula": "DarliAI/kissi-wav2vec2-fula-fleurs-full",
    "Oromo": "DarliAI/kissi-w2v-bert-2.0-oromo",
    # NOTE(review): the display key below looks like a typo for "Runyankore";
    # it is left unchanged because it is also what gets logged as
    # language_display.
    "Runynakore": "misterkissi/w2v2-lg-xls-r-300m-runyankore",
    "Ga": "misterkissi/w2v2-lg-xls-r-300m-ga",
    "Vai": "misterkissi/whisper-small-vai",
    "Kasem": "misterkissi/w2v2-lg-xls-r-300m-kasem",
    "Lingala": "misterkissi/w2v2-lg-xls-r-300m-lingala",
    "Fongbe": "misterkissi/whisper-small-fongbe",
    "Amharic": "misterkissi/w2v2-lg-xls-r-1b-amharic",
    "Xhosa": "misterkissi/w2v2-lg-xls-r-300m-xhosa",
    "Tsonga": "misterkissi/w2v2-lg-xls-r-300m-tsonga",
    # "WOLOF": "misterkissi/w2v2-lg-xls-r-1b-wolof",
    # "HAITIAN CREOLE": "misterkissi/whisper-small-haitian-creole",
    # "KABYLE": "misterkissi/w2v2-lg-xls-r-1b-kabyle",
    "Yoruba": "FarmerlineML/w2v-bert-2.0_yoruba_v1",
    "Luo": "FarmerlineML/w2v-bert-2.0_luo_v2",
    "Somali": "FarmerlineML/w2v-bert-2.0_somali_alpha",
    "Pidgin": "FarmerlineML/pidgin_nigerian",
    "Kikuyu": "FarmerlineML/w2v-bert-2.0_kikuyu",
    "Igbo": "FarmerlineML/w2v-bert-2.0_igbo_v1",
    "Krio": "FarmerlineML/w2v-bert-2.0_krio_v3",
    "Dyula": "FarmerlineML/w2v-bert-2.0_dyula",
    "Kamba": "FarmerlineML/w2v-bert-2.0_kamba"
}

# -------- Robust audio loader (handles MP3/M4A via ffmpeg; wav/flac via soundfile) --------
TARGET_SR = 16000

# Extensions that are read directly with soundfile.
_SOUNDFILE_EXTS = {".wav", ".flac", ".ogg", ".opus"}


def _has_ffmpeg():
    """Return True when an ``ffmpeg`` executable is found on PATH."""
    return shutil.which("ffmpeg") is not None


def _to_mono_float32(data):
    """Average the columns of a multi-dimensional array and cast to float32."""
    if isinstance(data, np.ndarray) and data.ndim > 1:
        data = data.mean(axis=1)
    return data.astype(np.float32)


def _load_with_soundfile(path):
    """Read *path* with soundfile and return ``(mono float32 samples, sample rate)``."""
    data, sr = sf.read(path, always_2d=False)
    return _to_mono_float32(data), sr


def _load_with_ffmpeg(path, target_sr=TARGET_SR):
    """Convert *path* to a mono WAV at *target_sr* with ffmpeg and read it back.

    Returns:
        ``(float32 samples, sample rate)`` as read from the converted file.

    Raises:
        RuntimeError: if ffmpeg is not available.
        subprocess.CalledProcessError: if ffmpeg exits with a non-zero status.
    """
    if not _has_ffmpeg():
        raise RuntimeError("ffmpeg not available")

    # delete=False + close(): only the name is needed; ffmpeg writes the file.
    tmp_wav = tempfile.NamedTemporaryFile(suffix=".wav", delete=False)
    tmp_wav.close()
    cmd = [
        "ffmpeg", "-hide_banner", "-loglevel", "error", "-y",
        "-i", path,
        "-ac", "1",
        "-ar", str(target_sr),
        tmp_wav.name,
    ]
    try:
        subprocess.run(cmd, check=True)
        data, sr = sf.read(tmp_wav.name, always_2d=False)
    finally:
        # Always clean up, including when ffmpeg or the read fails, so failed
        # conversions do not leave temp files behind.
        try:
            os.remove(tmp_wav.name)
        except OSError:
            pass
    return _to_mono_float32(data), sr


def _resample_if_needed(y, sr, target_sr=TARGET_SR):
    """Return ``(float32 samples, rate)``, resampling with librosa when rates differ."""
    if sr == target_sr:
        return y.astype(np.float32), sr
    y_rs = librosa.resample(y.astype(np.float32), orig_sr=sr, target_sr=target_sr)
    return y_rs.astype(np.float32), target_sr


def load_audio_any(path, target_sr=TARGET_SR):
    """Robust loader: wav/flac/ogg via soundfile; mp3/m4a via ffmpeg; fallback to librosa.

    Returns:
        ``(float32 samples, sample rate)``.

    Raises:
        Whatever ``librosa.load`` raises if the final fallback also fails.
    """
    ext = os.path.splitext(path)[1].lower()
    try:
        if ext in _SOUNDFILE_EXTS:
            y, sr = _load_with_soundfile(path)
        elif _has_ffmpeg():
            # ffmpeg was asked for mono at target_sr, so no resampling step.
            return _load_with_ffmpeg(path, target_sr=target_sr)
        else:
            # Fallback to librosa for formats like mp3/m4a when ffmpeg isn't present
            y, sr = librosa.load(path, sr=None, mono=True)
        return _resample_if_needed(y, sr, target_sr)
    except Exception as e:
        logging.warning(
            "[AUDIO] Primary load failed for %s (%s). Falling back to librosa.", path, e
        )
        y, sr = librosa.load(path, sr=target_sr, mono=True)
        return y.astype(np.float32), sr


# -------- Lazy-load pipeline cache (Space-safe) --------
_PIPELINE_CACHE = {}
_CACHE_ORDER = []  # usage order, most recently used first
_CACHE_MAX_SIZE = 3  # tune for RAM


def _touch_cache(key):
    """Move *key* to the front of the usage order."""
    if key in _CACHE_ORDER:
        _CACHE_ORDER.remove(key)
    _CACHE_ORDER.insert(0, key)


def _evict_if_needed():
    """Drop least recently used pipelines until the cache fits _CACHE_MAX_SIZE."""
    # The second condition stops the loop instead of raising IndexError if the
    # order list and the cache ever get out of sync.
    while len(_PIPELINE_CACHE) > _CACHE_MAX_SIZE and _CACHE_ORDER:
        oldest = _CACHE_ORDER.pop()
        _PIPELINE_CACHE.pop(oldest, None)


def get_asr_pipeline(language_display: str):
    """Return the ASR pipeline for *language_display*, loading it on first use.

    Raises:
        ValueError: if *language_display* is not a key of ``language_models``.
    """
    if language_display not in language_models:
        raise ValueError(f"Unknown language selection: {language_display}")

    if language_display in _PIPELINE_CACHE:
        _touch_cache(language_display)
        return _PIPELINE_CACHE[language_display]

    model_id = language_models[language_display]
    logging.info("[ASR] Loading pipeline for '%s' -> %s", language_display, model_id)
    pipe = pipeline(
        task="automatic-speech-recognition",
        model=model_id,
        device=-1,  # CPU on Spaces (explicit)
        chunk_length_s=30
    )
    _PIPELINE_CACHE[language_display] = pipe
    _touch_cache(language_display)
    _evict_if_needed()
    return pipe


# -------- Helpers --------
def _model_revision_from_pipeline(pipe) -> str:
    """Best-effort capture of revision/hash for reproducibility.

    Returns the first truthy value among a few candidate attributes of
    ``pipe.model``, otherwise the config's ``_name_or_path``, otherwise
    "unknown".
    """
    model = getattr(pipe, "model", None)
    for attr in ("hub_revision", "revision", "_commit_hash"):
        val = getattr(model, attr, None)
        if val:
            return str(val)
    try:
        return str(getattr(pipe.model.config, "_name_or_path", "unknown"))
    except Exception:
        return "unknown"


# -------- Inference --------
def transcribe(audio_path: str, language: str):
    """
    Robust audio load (mp3/m4a friendly), resample to 16 kHz mono,
    then run it through the chosen ASR pipeline.

    Returns:
        ``(transcript, meta)`` where *meta* is a dict for feedback logging.
        When there is no audio path, or the decoded audio is empty, a warning
        message is returned in place of the transcript and *meta* is None.

    Raises:
        ValueError: if *language* is not a known selection.
    """
    if not audio_path:
        return "⚠️ Please upload or record an audio clip.", None

    speech, sr = load_audio_any(audio_path, target_sr=TARGET_SR)
    if len(speech) == 0:
        # Nothing to transcribe; skip the pipeline rather than feed it an
        # empty array.
        return "⚠️ The audio clip appears to be empty. Please upload or record again.", None

    duration_s = float(len(speech) / float(sr))

    pipe = get_asr_pipeline(language)
    decode_params = {"chunk_length_s": getattr(pipe, "chunk_length_s", 30)}

    t0 = time.time()
    result = pipe({"sampling_rate": sr, "raw": speech})
    latency_ms = int((time.time() - t0) * 1000.0)

    hyp_text = result.get("text", "")
    # Real-time factor: processing seconds per second of audio.
    rtf = (latency_ms / 1000.0) / max(duration_s, 1e-9)

    meta = {
        "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
        "session_id": f"anon-{uuid.uuid4()}",
        "language_display": language,
        "model_id": language_models.get(language, "unknown"),
        "model_revision": _model_revision_from_pipeline(pipe),
        "audio_duration_s": duration_s,
        "sample_rate": sr,
        "source": "upload",
        "decode_params": json.dumps(decode_params),
        "transcript_hyp": hyp_text,
        "latency_ms": latency_ms,
        "rtf": rtf,
    }
    return hyp_text, meta


# -------- Feedback submit (minimal) --------
def submit_feedback(meta, corrected_text, score, store_audio, share_publicly, audio_file_path):
    """
    Push a minimal row to HF Dataset: model info, language, transcript,
    optional corrected text, score.

    Args:
        meta: The metadata dict produced by ``transcribe`` (or None).
        corrected_text: User-edited transcript; stored stripped.
        score: Score out of 10, or None.
        store_audio: When falsy, no audio path is forwarded for upload.
        share_publicly: Stored as a bool flag on the row.
        audio_file_path: Path of the audio currently held by the UI.

    Returns:
        A dict with a "status" message and, when *meta* is present, a few
        fields echoed from the row.
    """
    if not meta:
        return {"status": "No transcription metadata available. Please transcribe first."}

    row = dict(meta)
    row.update({
        "corrected_text": (corrected_text or "").strip(),
        "score_out_of_10": int(score) if score is not None else None,
        "share_publicly": bool(share_publicly),
    })

    try:
        audio_to_push = audio_file_path if store_audio else None
        hf_status = _push_row_to_hf_dataset(row, audio_to_push)
        status = f"Feedback saved. {hf_status}"
    except Exception as e:
        status = f"Failed to push to HF Dataset: {e}"
        logging.exception("Push error: %s", e)

    return {
        "status": status,
        "latency_ms": row["latency_ms"],
        "rtf": row["rtf"],
        "model_id": row["model_id"],
        "model_revision": row["model_revision"],
        "language": row["language_display"],
    }


# -------- UI (original preserved; additions appended) --------
with gr.Blocks(title="🌐 Multilingual ASR Demo") as demo:
    gr.Markdown(
        """
        ## 🎙️ Multilingual Speech-to-Text
        Upload an audio file (MP3, WAV, FLAC, M4A, OGG,…) or record via your microphone.
        Then choose the language/model and hit **Transcribe**.
        """
    )

    with gr.Row():
        lang = gr.Dropdown(
            choices=list(language_models.keys()),
            value=list(language_models.keys())[0],
            label="Select Language / Model"
        )

    with gr.Row():
        audio = gr.Audio(
            sources=["upload", "microphone"],
            type="filepath",
            label="Upload or Record Audio"
        )

    btn = gr.Button("Transcribe")
    output = gr.Textbox(label="Transcription")

    # Hidden state to carry metadata from transcribe -> feedback
    meta_state = gr.State(value=None)

    # Keep original behavior: output shows transcript
    # Also capture meta into the hidden state
    def _transcribe_and_store(audio_path, language):
        """Run ``transcribe`` and fan the result out to the three outputs."""
        hyp, meta = transcribe(audio_path, language)
        # Pre-fill corrected with hypothesis for easy edits. When there is no
        # meta, `hyp` is a warning message, which must not end up in the
        # "corrected transcript" box.
        prefill = hyp if meta is not None else ""
        return hyp, meta, prefill

    # --- Minimal Evaluation (score + optional corrected text) ---
    with gr.Accordion("Evaluation", open=False):
        with gr.Row():
            corrected_tb = gr.Textbox(label="Corrected transcript (optional)", lines=4, value="")
        with gr.Row():
            score_slider = gr.Slider(minimum=0, maximum=10, step=1, label="Score out of 10", value=7)
        with gr.Row():
            store_audio_cb = gr.Checkbox(label="Allow storing my audio for research/eval", value=False)
            share_cb = gr.Checkbox(label="Allow sharing this example publicly", value=False)
        submit_btn = gr.Button("Submit")
        results_json = gr.JSON(label="Status")

    # Wire events
    btn.click(
        fn=_transcribe_and_store,
        inputs=[audio, lang],
        outputs=[output, meta_state, corrected_tb]
    )

    submit_btn.click(
        fn=submit_feedback,
        inputs=[
            meta_state,
            corrected_tb,
            score_slider,
            store_audio_cb,
            share_cb,
            audio  # raw file path from gr.Audio
        ],
        outputs=results_json
    )

# Keep Spaces stable under load
if __name__ == "__main__":
    demo.queue()
    demo.launch()