# stt.py
# -*- coding: utf-8 -*-
"""
stt.py — a module for Speech-to-Text via pywhispercpp

Classes
-------
SpeechToText
    Encapsulates model loading, recording, saving, and transcription.

Usage (as a script)
-------------------
    python -m stt --model tiny.en --duration 5

or in code
----------
    from stt import SpeechToText

    stt = SpeechToText()
    text = stt.transcribe()
"""
import os
import tempfile
import time
import datetime

import numpy as np
import sounddevice as sd
from scipy.io.wavfile import write as write_wav
import webrtcvad  # imported for future VAD support; not used in this module yet
from pywhispercpp.model import Model as Whisper

# Imported for planned speaker-metadata storage; not yet used by SpeechToText.
from qdrant_client import QdrantClient
from qdrant_client.http.models import Distance, VectorParams


class SpeechToText:
    """
    A Speech-to-Text helper built on pywhispercpp's Whisper (Qdrant is imported
    for speaker metadata but is not yet wired into this class).

    Parameters
    ----------
    model_name : str
        Whisper model to load (e.g. "tiny.en", "base", "small.en", etc.).
    sample_rate : int
        Audio sample rate (must match Whisper's 16 kHz).
    record_duration : float
        Default seconds to record when calling `.record_audio()`.
    temp_dir : str
        Directory for temporary WAV files.
    verbose : bool
        Print progress messages if True.
    """

    def __init__(
        self,
        model_name: str = "tiny.en",
        sample_rate: int = 16_000,
        record_duration: float = 5.0,
        temp_dir: str = None,
        verbose: bool = True,
    ):
        self.model_name = model_name
        self.sample_rate = sample_rate
        self.record_duration = record_duration
        self.temp_dir = temp_dir or tempfile.gettempdir()
        self.verbose = verbose

        # load Whisper model
        if self.verbose:
            print(f"[STT] Loading Whisper model '{self.model_name}'...")
        t0 = time.time()
        self._model = Whisper(model=self.model_name)
        if self.verbose:
            print(f"[STT] Model loaded in {time.time() - t0:.2f}s")

    def record_audio(self, duration: float = None) -> np.ndarray:
        """
        Record from the default mic for `duration` seconds, return float32 mono waveform.
        """
        duration = duration or self.record_duration
        if self.verbose:
            print(f"[STT] Recording for {duration}s at {self.sample_rate}Hz...")
        frames = sd.rec(
            int(duration * self.sample_rate),
            samplerate=self.sample_rate,
            channels=1,
            dtype="int16",
        )
        sd.wait()
        if self.verbose:
            print("[STT] Recording finished.")
        # convert to float32 in [-1, 1]
        return (frames.astype(np.float32) / 32768.0).flatten()

    def save_wav(self, audio: np.ndarray, filename: str = None) -> str:
        """
        Save float32 waveform `audio` to an int16 WAV at `filename`.
        If filename is None, create one in temp_dir.
        Returns the path.
        """
        filename = filename or os.path.join(
            self.temp_dir,
            f"stt_{datetime.datetime.now():%Y%m%d_%H%M%S}.wav",
        )
        # guard against a bare filename with no directory component
        os.makedirs(os.path.dirname(filename) or ".", exist_ok=True)
        # convert back to int16
        int16 = (audio * 32767).astype(np.int16)
        write_wav(filename, self.sample_rate, int16)
        if self.verbose:
            print(f"[STT] Saved WAV to {filename}")
        return filename

    def transcribe_file(self, wav_path: str, n_threads: int = 4, cleanup: bool = True) -> str:
        """
        Transcribe the existing WAV file at `wav_path`. Returns the text.
        If `cleanup` is True, files created in `temp_dir` are removed afterwards.
        """
        if not os.path.isfile(wav_path):
            raise FileNotFoundError(f"No such file: {wav_path}")
        if self.verbose:
            print(f"[STT] Transcribing file {wav_path}…")
        t0 = time.time()
        # pywhispercpp API may return segments or text
        result = self._model.transcribe(wav_path, n_threads=n_threads)
        # remove our own temp file unless the caller wants to keep it
        if cleanup and wav_path.startswith(self.temp_dir):
            try:
                os.remove(wav_path)
            except OSError:
                pass
        # collect text
        if isinstance(result, list):
            text = "".join(seg.text for seg in result)
        else:
            # fall back in case the API returned plain text instead of segments
            text = str(result)
        if self.verbose:
            print(f"[STT] Transcription complete ({time.time() - t0:.2f}s).")
        return text.strip()

    def transcribe(
        self,
        duration: float = None,
        save_temp: bool = False,
        n_threads: int = 4,
    ) -> str:
        """
        Record + save + transcribe in one call. Returns the transcribed text.
        The intermediate WAV is kept only if `save_temp` is True.
        """
        audio = self.record_audio(duration)
        wav_path = self.save_wav(audio)
        return self.transcribe_file(wav_path, n_threads=n_threads, cleanup=not save_temp)
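
# A minimal usage sketch for callers importing this module. The model name and
# duration below are illustrative choices, not requirements, and a working
# default microphone is assumed:
#
#     stt = SpeechToText(model_name="base.en", record_duration=8.0)
#     text = stt.transcribe(save_temp=True)  # keep the intermediate WAV in temp_dir
#     print(text)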

# Optional: make module runnable as a script
if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(description="STT using pywhispercpp")
    parser.add_argument(
        "--model", "-m",
        default="small.en",
        help="Whisper model name (e.g. tiny.en, base, small.en)",
    )
    parser.add_argument(
        "--duration", "-d",
        type=float,
        default=5.0,
        help="Seconds to record",
    )
    parser.add_argument(
        "--no-save", action="store_true",
        help="Do not save the recorded WAV",
    )
    args = parser.parse_args()

    stt = SpeechToText(
        model_name=args.model,
        record_duration=args.duration,
        verbose=True,
    )
    text = stt.transcribe(save_temp=not args.no_save)
    print("\n=== Transcription ===")
    print(text)
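
# Sketch: transcribing an existing recording instead of using the microphone.
# "meeting.wav" is a hypothetical 16 kHz mono WAV used purely for illustration:
#
#     from stt import SpeechToText
#
#     stt = SpeechToText(model_name="tiny.en")
#     print(stt.transcribe_file("meeting.wav", n_threads=8))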