Spaces:
Running
Running
| import gradio as gr | |
| import os | |
| import time | |
| import sys | |
| import tempfile | |
| import subprocess | |
| import requests | |
| from urllib.parse import urlparse | |
| from pydub import AudioSegment | |
| import logging | |
| import torch | |
| from transformers import AutoModelForSpeechSeq2Seq, AutoProcessor, pipeline | |
| import yt_dlp | |
| logging.basicConfig(level=logging.INFO) | |
| # Clone and install faster-whisper from GitHub | |
| # (we should be able to do this in build.sh in a hf space) | |
| try: | |
| subprocess.run(["git", "clone", "https://github.com/SYSTRAN/faster-whisper.git"], check=True) | |
| subprocess.run(["pip", "install", "-e", "./faster-whisper"], check=True) | |
| except subprocess.CalledProcessError as e: | |
| print(f"Error during faster-whisper installation: {e}") | |
| sys.exit(1) | |
| # Add the faster-whisper directory to the Python path | |
| sys.path.append("./faster-whisper") | |
| from faster_whisper import WhisperModel | |
| from faster_whisper.transcribe import BatchedInferencePipeline | |
| device = "cuda:0" if torch.cuda.is_available() else "cpu" | |
| def download_audio(url, method_choice): | |
| parsed_url = urlparse(url) | |
| if parsed_url.netloc in ['www.youtube.com', 'youtu.be', 'youtube.com']: | |
| return download_youtube_audio(url, method_choice) | |
| else: | |
| return download_direct_audio(url, method_choice) | |
| def download_youtube_audio(url, method_choice): | |
| methods = { | |
| 'yt-dlp': youtube_dl_method, | |
| 'pytube': pytube_method, | |
| 'youtube-dl': youtube_dl_classic_method, | |
| 'yt-dlp-alt': youtube_dl_alternative_method, | |
| 'ffmpeg': ffmpeg_method, | |
| 'aria2': aria2_method | |
| } | |
| method = methods.get(method_choice, youtube_dl_method) | |
| try: | |
| return method(url) | |
| except Exception as e: | |
| logging.error(f"Error downloading using {method_choice}: {str(e)}") | |
| return None | |
| def youtube_dl_method(url): | |
| ydl_opts = { | |
| 'format': 'bestaudio/best', | |
| 'postprocessors': [{ | |
| 'key': 'FFmpegExtractAudio', | |
| 'preferredcodec': 'mp3', | |
| 'preferredquality': '192', | |
| }], | |
| 'outtmpl': '%(id)s.%(ext)s', | |
| } | |
| with yt_dlp.YoutubeDL(ydl_opts) as ydl: | |
| info = ydl.extract_info(url, download=True) | |
| return f"{info['id']}.mp3" | |
| def pytube_method(url): | |
| from pytube import YouTube | |
| yt = YouTube(url) | |
| audio_stream = yt.streams.filter(only_audio=True).first() | |
| out_file = audio_stream.download() | |
| base, ext = os.path.splitext(out_file) | |
| new_file = base + '.mp3' | |
| os.rename(out_file, new_file) | |
| return new_file | |
| def youtube_dl_classic_method(url): | |
| ydl_opts = { | |
| 'format': 'bestaudio/best', | |
| 'postprocessors': [{ | |
| 'key': 'FFmpegExtractAudio', | |
| 'preferredcodec': 'mp3', | |
| 'preferredquality': '192', | |
| }], | |
| 'outtmpl': '%(id)s.%(ext)s', | |
| } | |
| with yt_dlp.YoutubeDL(ydl_opts) as ydl: | |
| info = ydl.extract_info(url, download=True) | |
| return f"{info['id']}.mp3" | |
| def youtube_dl_alternative_method(url): | |
| ydl_opts = { | |
| 'format': 'bestaudio/best', | |
| 'postprocessors': [{ | |
| 'key': 'FFmpegExtractAudio', | |
| 'preferredcodec': 'mp3', | |
| 'preferredquality': '192', | |
| }], | |
| 'outtmpl': '%(id)s.%(ext)s', | |
| 'no_warnings': True, | |
| 'quiet': True, | |
| 'no_check_certificate': True, | |
| 'prefer_insecure': True, | |
| } | |
| with yt_dlp.YoutubeDL(ydl_opts) as ydl: | |
| info = ydl.extract_info(url, download=True) | |
| return f"{info['id']}.mp3" | |
| def ffmpeg_method(url): | |
| output_file = tempfile.mktemp(suffix='.mp3') | |
| command = ['ffmpeg', '-i', url, '-vn', '-acodec', 'libmp3lame', '-q:a', '2', output_file] | |
| subprocess.run(command, check=True, capture_output=True) | |
| return output_file | |
| def aria2_method(url): | |
| output_file = tempfile.mktemp(suffix='.mp3') | |
| command = ['aria2c', '--split=4', '--max-connection-per-server=4', '--out', output_file, url] | |
| subprocess.run(command, check=True, capture_output=True) | |
| return output_file | |
| def download_direct_audio(url, method_choice): | |
| if method_choice == 'wget': | |
| return wget_method(url) | |
| else: | |
| try: | |
| response = requests.get(url) | |
| if response.status_code == 200: | |
| with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as temp_file: | |
| temp_file.write(response.content) | |
| return temp_file.name | |
| else: | |
| raise Exception(f"Failed to download audio from {url}") | |
| except Exception as e: | |
| logging.error(f"Error downloading direct audio: {str(e)}") | |
| return None | |
| def wget_method(url): | |
| output_file = tempfile.mktemp(suffix='.mp3') | |
| command = ['wget', '-O', output_file, url] | |
| subprocess.run(command, check=True, capture_output=True) | |
| return output_file | |
| def trim_audio(audio_path, start_time, end_time): | |
| audio = AudioSegment.from_file(audio_path) | |
| trimmed_audio = audio[start_time*1000:end_time*1000] if end_time else audio[start_time*1000:] | |
| trimmed_audio_path = tempfile.mktemp(suffix='.wav') | |
| trimmed_audio.export(trimmed_audio_path, format="wav") | |
| return trimmed_audio_path | |
| def save_transcription(transcription): | |
| file_path = tempfile.mktemp(suffix='.txt') | |
| with open(file_path, 'w') as f: | |
| f.write(transcription) | |
| return file_path | |
| def transcribe_audio(input_source, model_choice, batch_size, download_method, start_time=None, end_time=None, verbose=False): | |
| try: | |
| if model_choice == "faster-whisper": | |
| model = WhisperModel("cstr/whisper-large-v3-turbo-int8_float32", device="auto", compute_type="int8") | |
| batched_model = BatchedInferencePipeline(model=model) | |
| elif model_choice == "primeline/whisper-large-v3-german": | |
| model_id = "primeline/whisper-large-v3-german" | |
| model = AutoModelForSpeechSeq2Seq.from_pretrained( | |
| model_id, torch_dtype=torch_dtype, low_cpu_mem_usage=True, use_safetensors=True | |
| ) | |
| model.to(device) | |
| processor = AutoProcessor.from_pretrained(model_id) | |
| pipe = pipeline( | |
| "automatic-speech-recognition", | |
| model=model, | |
| tokenizer=processor.tokenizer, | |
| feature_extractor=processor.feature_extractor, | |
| max_new_tokens=128, | |
| chunk_length_s=30, | |
| batch_size=batch_size, | |
| return_timestamps=True, | |
| torch_dtype=torch_dtype, | |
| device=device, | |
| ) | |
| elif model_choice == "openai/whisper-large-v3": | |
| model_id = "openai/whisper-large-v3" | |
| model = AutoModelForSpeechSeq2Seq.from_pretrained( | |
| model_id, torch_dtype=torch_dtype, low_cpu_mem_usage=True, use_safetensors=True | |
| ) | |
| model.to(device) | |
| processor = AutoProcessor.from_pretrained(model_id) | |
| pipe = pipeline( | |
| "automatic-speech-recognition", | |
| model=model, | |
| tokenizer=processor.tokenizer, | |
| feature_extractor=processor.feature_extractor, | |
| torch_dtype=torch_dtype, | |
| device=device, | |
| ) | |
| else: | |
| raise ValueError("Invalid model choice") | |
| # Rest of the code remains the same | |
| if isinstance(input_source, str) and (input_source.startswith('http://') or input_source.startswith('https://')): | |
| audio_path = download_audio(input_source, download_method) | |
| if audio_path.startswith("Error"): | |
| yield f"Error: {audio_path}", "", None | |
| return | |
| else: | |
| audio_path = input_source | |
| if start_time is not None or end_time is not None: | |
| trimmed_audio_path = trim_audio(audio_path, start_time or 0, end_time) | |
| audio_path = trimmed_audio_path | |
| if model_choice == "faster-whisper": | |
| start_time_perf = time.time() | |
| segments, info = batched_model.transcribe(audio_path, batch_size=batch_size, initial_prompt=None) | |
| end_time_perf = time.time() | |
| else: | |
| start_time_perf = time.time() | |
| result = pipe(audio_path) | |
| segments = result["chunks"] | |
| end_time_perf = time.time() | |
| transcription_time = end_time_perf - start_time_perf | |
| audio_file_size = os.path.getsize(audio_path) / (1024 * 1024) | |
| metrics_output = ( | |
| f"Transcription time: {transcription_time:.2f} seconds\n" | |
| f"Audio file size: {audio_file_size:.2f} MB\n" | |
| ) | |
| if verbose: | |
| yield metrics_output, "", None | |
| transcription = "" | |
| for segment in segments: | |
| if model_choice == "faster-whisper": | |
| transcription_segment = f"[{segment.start:.2f}s -> {segment.end:.2f}s] {segment.text}\n" | |
| else: | |
| transcription_segment = f"[{segment['timestamp'][0]:.2f}s -> {segment['timestamp'][1]:.2f}s] {segment['text']}\n" | |
| transcription += transcription_segment | |
| if verbose: | |
| yield metrics_output, transcription, None | |
| transcription_file = save_transcription(transcription) | |
| yield metrics_output, transcription, transcription_file | |
| except Exception as e: | |
| yield f"An error occurred: {str(e)}", "", None | |
| finally: | |
| if isinstance(input_source, str) and (input_source.startswith('http://') or input_source.startswith('https://')): | |
| try: | |
| os.remove(audio_path) | |
| except: | |
| pass | |
| if start_time is not None or end_time is not None: | |
| try: | |
| os.remove(trimmed_audio_path) | |
| except: | |
| pass | |
| iface = gr.Interface( | |
| fn=transcribe_audio, | |
| inputs=[ | |
| gr.Textbox(label="Audio Source (Upload, URL, or YouTube URL)"), | |
| gr.Dropdown(choices=["faster-whisper", "primeline/whisper-large-v3-german", "openai/whisper-large-v3"], label="Model Choice", value="faster-whisper"), | |
| gr.Slider(minimum=1, maximum=32, step=1, value=16, label="Batch Size"), | |
| gr.Dropdown(choices=["yt-dlp", "pytube", "youtube-dl", "yt-dlp-alt", "ffmpeg", "aria2", "wget"], label="Download Method", value="yt-dlp"), | |
| gr.Number(label="Start Time (seconds)", value=0), | |
| gr.Number(label="End Time (seconds)", value=0), | |
| gr.Checkbox(label="Verbose Output", value=False) | |
| ], | |
| outputs=[ | |
| gr.Textbox(label="Transcription Metrics and Verbose Messages", lines=10), | |
| gr.Textbox(label="Transcription", lines=10), | |
| gr.File(label="Download Transcription") | |
| ], | |
| title="Multi-Model Transcription", | |
| description="Transcribe audio using multiple models.", | |
| examples=[ | |
| ["https://www.youtube.com/watch?v=daQ_hqA6HDo", "faster-whisper", 16, "yt-dlp", 0, None, False], | |
| ["https://mcdn.podbean.com/mf/web/dir5wty678b6g4vg/HoP_453_-_The_Price_is_Right_-_Law_and_Economics_in_the_Second_Scholastic5yxzh.mp3", "primeline/whisper-large-v3-german", 16, "ffmpeg", 0, 300, True], | |
| ["path/to/local/audio.mp3", "openai/whisper-large-v3", 16, "yt-dlp", 60, 180, False] | |
| ], | |
| cache_examples=False, | |
| live=True | |
| ) | |
| iface.launch() |