Evaluation-2 / app.py
misterkissi's picture
Update app.py
13b25b1 verified
# app.py (MP3-robust loader + Luganda FKD commented; minimal feedback)
import os
import json
import time
import uuid
import logging
import shutil
import subprocess
import tempfile
import gradio as gr
from transformers import pipeline
import numpy as np
import soundfile as sf # librosa depends on this; good for wav/flac/ogg
import librosa # fallback / resampling
# Optional: modest thread hints for CPU Spaces
try:
import torch
torch.set_num_threads(2)
torch.set_num_interop_threads(1)
except Exception:
pass
# Basic logging so we can verify which model is loaded per inference
logging.basicConfig(level=logging.INFO)
# --- External logging: push to a HF Dataset repo on each submit (no local storage) ---
from datasets import Dataset, Features, Value, Audio, load_dataset
from huggingface_hub import HfApi
# -------- CONFIG: Hub dataset target (no persistent storage needed) --------
HF_DATASET_REPO = os.environ.get("HF_DATASET_REPO", "DarliAI/asr-feedback-logs")
HF_TOKEN = os.environ.get("HF_TOKEN")
PUSH_TO_HF = bool(HF_TOKEN and HF_DATASET_REPO)
HF_FEATURES = Features({
"timestamp": Value("string"),
"session_id": Value("string"),
"language_display": Value("string"),
"model_id": Value("string"),
"model_revision": Value("string"),
"audio": Audio(sampling_rate=None), # uploaded only if user consents
"audio_duration_s": Value("float32"),
"sample_rate": Value("int32"),
"source": Value("string"),
"decode_params": Value("string"),
"transcript_hyp": Value("string"),
"corrected_text": Value("string"),
"latency_ms": Value("int32"),
"rtf": Value("float32"),
"score_out_of_10": Value("int32"),
"share_publicly": Value("bool"),
})
def _push_row_to_hf_dataset(row, audio_file_path):
"""
Append a single example to the HF dataset repo (train split).
If user didn't consent or no audio path, 'audio' field is None.
Uses the modern datasets library approach with proper appending.
"""
if not PUSH_TO_HF:
return "HF push disabled (missing HF_TOKEN or repo)."
example = dict(row)
# Audio: only include if user consented and file exists
example["audio"] = audio_file_path if (audio_file_path and os.path.isfile(audio_file_path)) else None
# Normalize types
def _to_int(v):
try:
return int(v)
except Exception:
return None
def _to_float(v):
try:
return float(v)
except Exception:
return None
for k in ["latency_ms", "score_out_of_10", "sample_rate"]:
example[k] = _to_int(example.get(k))
for k in ["rtf", "audio_duration_s"]:
example[k] = _to_float(example.get(k))
# Create a dataset with single row
ds_new = Dataset.from_list([example], features=HF_FEATURES)
try:
# Try to load existing dataset and append
try:
# Load existing dataset
ds_existing = load_dataset(
HF_DATASET_REPO,
split="train",
token=HF_TOKEN,
download_mode="force_redownload" # Ensure we get the latest version
)
# Concatenate with new data
ds_combined = ds_existing.add_item(example)
# Push the combined dataset
ds_combined.push_to_hub(
HF_DATASET_REPO,
split="train",
private=True,
token=HF_TOKEN,
commit_message=f"Append feedback row at {example['timestamp']}"
)
return "Successfully appended to existing HF Dataset."
except Exception as e:
# If dataset doesn't exist or error loading, create new
if "404" in str(e) or "doesn't exist" in str(e) or "EmptyDatasetError" in str(e):
# Dataset doesn't exist, create it
ds_new.push_to_hub(
HF_DATASET_REPO,
split="train",
private=True,
token=HF_TOKEN,
commit_message="Initialize dataset with first feedback row"
)
return "Created new HF Dataset with first row."
else:
# Try alternative approach: push with create_pr=True to avoid conflicts
ds_new.push_to_hub(
HF_DATASET_REPO,
split="train",
private=True,
token=HF_TOKEN,
commit_message=f"Append feedback row at {example['timestamp']}",
create_pr=True # Create a PR to avoid conflicts
)
return "Pushed to HF Dataset via PR (will auto-merge)."
except Exception as e:
logging.error(f"Failed to push to HF Dataset: {e}")
# Final fallback: try using HfApi to check if repo exists
try:
api = HfApi()
api.dataset_info(HF_DATASET_REPO, token=HF_TOKEN)
# Repo exists, try one more time with force push
ds_new.push_to_hub(
HF_DATASET_REPO,
split=f"train_{int(time.time())}", # Use unique split name as last resort
private=True,
token=HF_TOKEN,
commit_message=f"Append feedback row at {example['timestamp']}"
)
return f"Pushed to HF Dataset with unique split."
except Exception as final_error:
return f"Failed to push to HF Dataset: {final_error}"
# --- Map display names to your HF Hub model IDs ---
language_models = {
"Wolof": "misterkissi/w2v2-lg-xls-r-1b-wolof",
"Haitian Creole": "misterkissi/whisper-small-haitian-creole",
"Kabyle": "misterkissi/w2v2-lg-xls-r-1b-kabyle",
"Zulu": "misterkissi/w2v2-lg-xls-r-300m-zulu",
"Deg": "misterkissi/w2v2-lg-xls-r-300m-deg",
"Vagla": "misterkissi/w2v2-lg-xls-r-300m-vagla",
"Haitian Creole": "misterkissi/whisper-small-haitian-creole",
"Malagasy": "misterkissi/whisper-small-malagasy",
"Shona": "misterkissi/whisper-small-shona",
"Umbundu": "misterkissi/whisper-small-umbundu",
"Sesotho": "misterkissi/whisper-small-sesotho",
"Setswama": "misterkissi/whisper-small-setswana",
}
# -------- Robust audio loader (handles MP3/M4A via ffmpeg; wav/flac via soundfile) --------
TARGET_SR = 16000
def _has_ffmpeg():
return shutil.which("ffmpeg") is not None
def _load_with_soundfile(path):
data, sr = sf.read(path, always_2d=False)
if isinstance(data, np.ndarray) and data.ndim > 1:
data = data.mean(axis=1)
return data.astype(np.float32), sr
def _load_with_ffmpeg(path, target_sr=TARGET_SR):
# Convert to mono 16k wav in a temp file using ffmpeg
if not _has_ffmpeg():
raise RuntimeError("ffmpeg not available")
tmp_wav = tempfile.NamedTemporaryFile(suffix=".wav", delete=False)
tmp_wav.close()
cmd = [
"ffmpeg", "-hide_banner", "-loglevel", "error",
"-y", "-i", path,
"-ac", "1", "-ar", str(target_sr),
tmp_wav.name,
]
subprocess.run(cmd, check=True)
data, sr = sf.read(tmp_wav.name, always_2d=False)
try:
os.remove(tmp_wav.name)
except Exception:
pass
if isinstance(data, np.ndarray) and data.ndim > 1:
data = data.mean(axis=1)
return data.astype(np.float32), sr
def _resample_if_needed(y, sr, target_sr=TARGET_SR):
if sr == target_sr:
return y.astype(np.float32), sr
y_rs = librosa.resample(y.astype(np.float32), orig_sr=sr, target_sr=target_sr)
return y_rs.astype(np.float32), target_sr
def load_audio_any(path, target_sr=TARGET_SR):
"""Robust loader: wav/flac/ogg via soundfile; mp3/m4a via ffmpeg; fallback to librosa."""
ext = os.path.splitext(path)[1].lower()
try:
if ext in {".wav", ".flac", ".ogg", ".opus"}:
y, sr = _load_with_soundfile(path)
elif _has_ffmpeg():
y, sr = _load_with_ffmpeg(path, target_sr=target_sr)
return y, sr # already mono+16k
else:
# Fallback to librosa for formats like mp3/m4a when ffmpeg isn't present
y, sr = librosa.load(path, sr=None, mono=True)
y, sr = _resample_if_needed(y, sr, target_sr)
return y, sr
except Exception as e:
logging.warning(f"[AUDIO] Primary load failed for {path} ({e}). Falling back to librosa.")
y, sr = librosa.load(path, sr=target_sr, mono=True)
return y.astype(np.float32), sr
# -------- Lazy-load pipeline cache (Space-safe) --------
_PIPELINE_CACHE = {}
_CACHE_ORDER = [] # usage order
_CACHE_MAX_SIZE = 3 # tune for RAM
def _touch_cache(key):
if key in _CACHE_ORDER:
_CACHE_ORDER.remove(key)
_CACHE_ORDER.insert(0, key)
def _evict_if_needed():
while len(_PIPELINE_CACHE) > _CACHE_MAX_SIZE:
oldest = _CACHE_ORDER.pop()
try:
del _PIPELINE_CACHE[oldest]
except KeyError:
pass
def get_asr_pipeline(language_display: str):
if language_display not in language_models:
raise ValueError(f"Unknown language selection: {language_display}")
if language_display in _PIPELINE_CACHE:
_touch_cache(language_display)
return _PIPELINE_CACHE[language_display]
model_id = language_models[language_display]
logging.info(f"[ASR] Loading pipeline for '{language_display}' -> {model_id}")
pipe = pipeline(
task="automatic-speech-recognition",
model=model_id,
device=-1, # CPU on Spaces (explicit)
chunk_length_s=30
)
_PIPELINE_CACHE[language_display] = pipe
_touch_cache(language_display)
_evict_if_needed()
return pipe
# -------- Helpers --------
def _model_revision_from_pipeline(pipe) -> str:
# Best-effort capture of revision/hash for reproducibility
for attr in ("hub_revision", "revision", "_commit_hash"):
val = getattr(getattr(pipe, "model", None), attr, None)
if val:
return str(val)
try:
return str(getattr(pipe.model.config, "_name_or_path", "unknown"))
except Exception:
return "unknown"
# -------- Inference --------
def transcribe(audio_path: str, language: str):
"""
Robust audio load (mp3/m4a friendly), resample to 16 kHz mono,
then run it through the chosen ASR pipeline.
Returns transcript and a meta dict for feedback.
"""
if not audio_path:
return "⚠️ Please upload or record an audio clip.", None
speech, sr = load_audio_any(audio_path, target_sr=TARGET_SR)
duration_s = float(len(speech) / float(sr))
pipe = get_asr_pipeline(language)
decode_params = {"chunk_length_s": getattr(pipe, "chunk_length_s", 30)}
t0 = time.time()
result = pipe({"sampling_rate": sr, "raw": speech})
latency_ms = int((time.time() - t0) * 1000.0)
hyp_text = result.get("text", "")
rtf = (latency_ms / 1000.0) / max(duration_s, 1e-9)
meta = {
"timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
"session_id": f"anon-{uuid.uuid4()}",
"language_display": language,
"model_id": language_models.get(language, "unknown"),
"model_revision": _model_revision_from_pipeline(pipe),
"audio_duration_s": duration_s,
"sample_rate": sr,
"source": "upload",
"decode_params": json.dumps(decode_params),
"transcript_hyp": hyp_text,
"latency_ms": latency_ms,
"rtf": rtf,
}
return hyp_text, meta
# -------- Feedback submit (minimal) --------
def submit_feedback(meta, corrected_text, score, store_audio, share_publicly, audio_file_path):
"""
Push a minimal row to HF Dataset: model info, language, transcript, optional corrected text, score.
"""
if not meta:
return {"status": "No transcription metadata available. Please transcribe first."}
row = dict(meta)
row.update({
"corrected_text": (corrected_text or "").strip(),
"score_out_of_10": int(score) if score is not None else None,
"share_publicly": bool(share_publicly),
})
try:
audio_to_push = audio_file_path if store_audio else None
hf_status = _push_row_to_hf_dataset(row, audio_to_push)
status = f"Feedback saved. {hf_status}"
except Exception as e:
status = f"Failed to push to HF Dataset: {e}"
logging.error(f"Push error: {e}")
return {
"status": status,
"latency_ms": row["latency_ms"],
"rtf": row["rtf"],
"model_id": row["model_id"],
"model_revision": row["model_revision"],
"language": row["language_display"],
}
# -------- UI (original preserved; additions appended) --------
with gr.Blocks(title="🌐 Multilingual ASR Demo") as demo:
gr.Markdown(
"""
## 🎙️ Multilingual Speech-to-Text
Upload an audio file (MP3, WAV, FLAC, M4A, OGG,…) or record via your microphone.
Then choose the language/model and hit **Transcribe**.
"""
)
with gr.Row():
lang = gr.Dropdown(
choices=list(language_models.keys()),
value=list(language_models.keys())[0],
label="Select Language / Model"
)
with gr.Row():
audio = gr.Audio(
sources=["upload", "microphone"],
type="filepath",
label="Upload or Record Audio"
)
btn = gr.Button("Transcribe")
output = gr.Textbox(label="Transcription")
# Hidden state to carry metadata from transcribe -> feedback
meta_state = gr.State(value=None)
# Keep original behavior: output shows transcript
# Also capture meta into the hidden state
def _transcribe_and_store(audio_path, language):
hyp, meta = transcribe(audio_path, language)
# Pre-fill corrected with hypothesis for easy edits
return hyp, meta, hyp
# --- Minimal Evaluation (score + optional corrected text) ---
with gr.Accordion("Evaluation", open=False):
with gr.Row():
corrected_tb = gr.Textbox(label="Corrected transcript (optional)", lines=4, value="")
with gr.Row():
score_slider = gr.Slider(minimum=0, maximum=10, step=1, label="Score out of 10", value=7)
with gr.Row():
store_audio_cb = gr.Checkbox(label="Allow storing my audio for research/eval", value=False)
share_cb = gr.Checkbox(label="Allow sharing this example publicly", value=False)
submit_btn = gr.Button("Submit")
results_json = gr.JSON(label="Status")
# Wire events
btn.click(
fn=_transcribe_and_store,
inputs=[audio, lang],
outputs=[output, meta_state, corrected_tb]
)
submit_btn.click(
fn=submit_feedback,
inputs=[
meta_state,
corrected_tb,
score_slider,
store_audio_cb,
share_cb,
audio # raw file path from gr.Audio
],
outputs=results_json
)
# Keep Spaces stable under load
if __name__ == "__main__":
demo.queue()
demo.launch()