whisper-tg / app.py
muhtasham's picture
WIP
f16ee1e
raw
history blame
19.2 kB
import gradio as gr
import subprocess
import datetime
import tempfile
import requests
import os
import time
from loguru import logger
# Runtime configuration, read once at import time.
# API_URL is the endpoint transcribe() POSTs the raw audio bytes to, and
# SIEVE_API_KEY is sent as the X-API-Key header on Sieve requests. Both are
# None when the corresponding environment variable is unset.
API_URL = os.getenv("API_URL")
SIEVE_API_KEY = os.getenv("SIEVE_API_KEY")
# Base URL of the Sieve job API; "/push" and "/jobs/<id>" are appended to it.
SIEVE_API_URL = "https://mango.sievedata.com/v2"
# Headers attached to the transcription request made in transcribe().
# NOTE(review): Content-Type is fixed to audio/flac, while
# download_youtube_audio() saves an .mp3 and uploads may be other formats --
# confirm the endpoint does not rely on this header to pick a decoder.
headers = {
    "Accept": "application/json",
    "Content-Type": "audio/flac"
}
def format_time(seconds):
    """Convert seconds to SRT time format (HH:MM:SS,mmm).

    Args:
        seconds (float): Time in seconds to convert. Any value accepted by
            ``float()`` works. Negative values are clamped to zero because
            SRT has no representation for negative timestamps.

    Returns:
        str: Time formatted as HH:MM:SS,mmm where:
            - HH: Hours, zero-padded to at least two digits. Hours are not
              wrapped at 24, so long recordings keep counting (24, 25, ...).
            - MM: Minutes (00-59)
            - SS: Seconds (00-59)
            - mmm: Milliseconds (000-999), truncated from microseconds

    Example:
        >>> format_time(3661.5)
        '01:01:01,500'
        >>> format_time(90000)
        '25:00:00,000'
    """
    value = float(seconds)
    if value < 0:
        value = 0.0
    td = datetime.timedelta(seconds=value)
    # timedelta normalises into days/seconds/microseconds; fold the days back
    # in so durations of 24 hours or more do not wrap around to 00.
    whole_seconds = td.days * 86400 + td.seconds
    hours, remainder = divmod(whole_seconds, 3600)
    minutes, secs = divmod(remainder, 60)
    milliseconds = td.microseconds // 1000
    return f"{hours:02d}:{minutes:02d}:{secs:02d},{milliseconds:03d}"
def generate_srt(chunks):
    """Render transcription chunks as an SRT subtitle document.

    Args:
        chunks (list): Transcription chunks, each a dictionary with:
            - "timestamp": [start_time, end_time] in seconds
            - "text": The transcribed text for that time segment
              (treated as empty when the key is absent)

    Returns:
        str: The concatenated SRT cues, numbered from 1, each of the form:
            ```
            1
            HH:MM:SS,mmm --> HH:MM:SS,mmm
            Text content

            ```
            An empty string when ``chunks`` is empty.

    Example:
        >>> chunks = [
        ...     {"timestamp": [0.0, 1.5], "text": "Hello"},
        ...     {"timestamp": [1.5, 3.0], "text": "World"}
        ... ]
        >>> generate_srt(chunks)
        '1\\n00:00:00,000 --> 00:00:01,500\\nHello\\n\\n2\\n00:00:01,500 --> 00:00:03,000\\nWorld\\n\\n'
    """
    def render_cue(number, segment):
        # One SRT cue: index line, time range line, text, blank separator.
        begin = format_time(segment["timestamp"][0])
        finish = format_time(segment["timestamp"][1])
        caption = segment.get("text", "").strip()
        return f"{number}\n{begin} --> {finish}\n{caption}\n\n"

    return "".join(
        render_cue(number, segment)
        for number, segment in enumerate(chunks, start=1)
    )
def save_srt_to_file(srt_content):
    """Save SRT content to a temporary file.

    Args:
        srt_content (str): The SRT formatted subtitles content to save.

    Returns:
        str or None: Path to the temporary ``.srt`` file if content was
            saved, None if srt_content was empty.

    Note:
        The temporary file is created with delete=False to allow it to be
        used after the function returns. The file should be deleted by the
        caller when no longer needed.
    """
    if not srt_content:
        return None
    # Encode before creating the file so an encoding failure does not leave
    # an empty temporary file behind.
    payload = srt_content.encode('utf-8')
    # The context manager closes the handle even if the write fails.
    with tempfile.NamedTemporaryFile(suffix='.srt', delete=False) as temp_file:
        temp_file.write(payload)
    return temp_file.name
# Check if ffmpeg is installed
def check_ffmpeg():
    """Verify that the ``ffmpeg`` executable can be run.

    Runs ``ffmpeg -version`` with its output captured and logs the outcome.

    Raises:
        gr.Error: If the executable cannot be found or the command exits
            with a non-zero status.
    """
    probe_command = ['ffmpeg', '-version']
    try:
        subprocess.run(probe_command, capture_output=True, check=True)
        logger.info("ffmpeg check passed successfully")
    except (FileNotFoundError, subprocess.CalledProcessError) as err:
        logger.error(f"ffmpeg check failed: {str(err)}")
        raise gr.Error("ffmpeg is not installed. Please install ffmpeg to use this application.")
# Run the ffmpeg check once at import time. check_ffmpeg() raises gr.Error
# when the binary is missing or fails, so nothing below this line runs then.
check_ffmpeg()
def _discard_temp_file(path):
    """Best-effort removal of a temporary file; failures are logged, not raised."""
    if not path:
        return
    try:
        os.unlink(path)
    except OSError as cleanup_error:
        logger.warning(f"Failed to delete temporary file {path}: {cleanup_error}")


def download_youtube_audio(url):
    """Download audio from YouTube using Sieve API.

    Submits a ``sieve/youtube-downloader`` job, polls it until it reports
    completion, then downloads the resulting audio into a temporary ``.mp3``
    file.

    Args:
        url (str): YouTube video URL

    Returns:
        str: Path to downloaded audio file. The file is created with
            delete=False, so the caller is responsible for removing it.

    Raises:
        gr.Error: If download fails or API key is not set. On failure the
            temporary file created for the download is removed first.
    """
    logger.info(f"Starting YouTube audio download process for URL: {url}")
    if not SIEVE_API_KEY:
        logger.error("SIEVE_API_KEY environment variable is not set")
        raise gr.Error("SIEVE_API_KEY environment variable is not set")
    # Tracked outside the try block so the error handlers can clean it up.
    output_path = None
    try:
        # Create a temporary file for the audio
        temp_file = tempfile.NamedTemporaryFile(suffix='.mp3', delete=False)
        temp_file.close()
        output_path = temp_file.name
        logger.info(f"Created temporary file at: {output_path}")
        # Prepare the request to Sieve API with exact parameters
        payload = {
            "function": "sieve/youtube-downloader",
            "inputs": {
                "url": url,
                "download_type": "audio",  # Ensure we're only downloading audio
                "resolution": "highest-available",
                "include_audio": True,
                "start_time": 0,
                "end_time": -1,
                "include_metadata": False,
                "metadata_fields": ["title", "thumbnail", "description", "tags", "duration"],
                "include_subtitles": False,
                "subtitle_languages": ["en"],
                "video_format": "mp4",
                "audio_format": "mp3"
            }
        }
        logger.debug(f"Prepared Sieve API payload: {payload}")
        # Send request to Sieve API with retries
        max_retries = 3
        retry_delay = 5  # seconds
        for attempt in range(max_retries):
            try:
                logger.info(f"Sending request to Sieve API (attempt {attempt + 1}/{max_retries})...")
                response = requests.post(
                    f"{SIEVE_API_URL}/push",
                    headers={"X-API-Key": SIEVE_API_KEY, "Content-Type": "application/json"},
                    json=payload,
                    timeout=1800  # Add timeout
                )
                response.raise_for_status()
                response_data = response.json()
                logger.debug(f"Sieve API response: {response_data}")
                job_id = response_data.get("id")
                if not job_id:
                    logger.error("No job ID received from Sieve API")
                    if attempt < max_retries - 1:
                        logger.warning(f"Retrying in {retry_delay} seconds...")
                        time.sleep(retry_delay)
                        continue
                    raise gr.Error("Failed to get job ID from Sieve API")
                break
            except requests.exceptions.RequestException as e:
                logger.warning(f"Request failed (attempt {attempt + 1}/{max_retries}): {str(e)}")
                if attempt < max_retries - 1:
                    logger.info(f"Retrying in {retry_delay} seconds...")
                    time.sleep(retry_delay)
                    continue
                # Last attempt: let the outer handler report the network error.
                raise
        logger.info(f"Received job ID: {job_id}")
        # Poll for job completion
        poll_count = 0
        max_polls = 180  # Maximum number of polls (6 minutes with 2-second delay)
        last_status = None
        while True:
            poll_count += 1
            logger.info(f"Polling job status (attempt {poll_count}/{max_polls})...")
            try:
                job_response = requests.get(
                    f"{SIEVE_API_URL}/jobs/{job_id}",
                    headers={"X-API-Key": SIEVE_API_KEY},
                    timeout=1800,
                )
                job_response.raise_for_status()
                job_data = job_response.json()
                status = job_data.get("status")
                if status != last_status:
                    logger.info(f"Job status changed: {status}")
                    last_status = status
                if status in ("completed", "finished"):
                    logger.info("Job completed successfully")
                    # Get the output data
                    output_data = job_data.get("output_0", {})
                    if not output_data:
                        logger.error("No output data found in completed job response")
                        raise gr.Error("No output data in job response")
                    # Get the audio URL from the output
                    audio_url = output_data.get("url")
                    if not audio_url:
                        logger.error("No audio URL found in output data")
                        raise gr.Error("No audio URL in output data")
                    logger.info(f"Received audio URL from Sieve: {audio_url}")
                    # Download the audio file. A network failure here is caught
                    # by the handler below, so the job is simply polled again.
                    logger.info("Downloading audio file from Sieve storage...")
                    audio_response = requests.get(audio_url, timeout=30)
                    audio_response.raise_for_status()
                    file_size = len(audio_response.content)
                    logger.info(f"Downloaded audio file size: {file_size/1024/1024:.2f} MB")
                    # Save the file
                    with open(output_path, "wb") as f:
                        f.write(audio_response.content)
                    logger.info(f"Successfully saved audio to: {output_path}")
                    # Break out of the polling loop after successful download
                    break
                elif status == "failed":
                    error_msg = job_data.get("error", "Unknown error")
                    logger.error(f"Job failed with error: {error_msg}")
                    raise gr.Error(f"Job failed: {error_msg}")
                if poll_count >= max_polls:
                    logger.error("Maximum polling attempts reached")
                    raise gr.Error("Download took too long. Please try again or check if the video is accessible.")
                logger.info("Job still processing, waiting 2 seconds before next poll...")
                time.sleep(2)
            except requests.exceptions.RequestException as e:
                logger.warning(f"Poll request failed: {str(e)}")
                if poll_count >= max_polls:
                    raise gr.Error("Failed to check job status. Please try again.") from e
                time.sleep(2)
    except requests.exceptions.RequestException as e:
        logger.exception(f"Network error during YouTube download: {str(e)}")
        # The temp file was created with delete=False; remove it so failed
        # downloads do not accumulate on disk.
        _discard_temp_file(output_path)
        raise gr.Error(f"Failed to download YouTube audio: Network error - {str(e)}") from e
    except Exception as e:
        logger.exception(f"Unexpected error during YouTube download: {str(e)}")
        _discard_temp_file(output_path)
        raise gr.Error(f"Failed to download YouTube audio: {str(e)}") from e
    return output_path
def transcribe_youtube(url, return_timestamps, generate_subs):
    """Transcribe audio from YouTube video.

    Downloads the audio with download_youtube_audio(), passes the resulting
    file to transcribe(), and removes the downloaded file afterwards whether
    or not the transcription succeeded.

    Args:
        url (str): YouTube video URL
        return_timestamps (bool): Whether to include timestamps in output
        generate_subs (bool): Whether to generate SRT subtitles

    Returns:
        tuple: (formatted_result, srt_file, correction_text), exactly as
            returned by transcribe().

    Raises:
        gr.Error: If the download or the transcription fails.
    """
    logger.info(f"Starting YouTube transcription process for URL: {url}")
    logger.info(f"Options - Timestamps: {return_timestamps}, Generate subtitles: {generate_subs}")
    try:
        # Download audio from YouTube
        logger.info("Step 1: Downloading audio from YouTube...")
        audio_path = download_youtube_audio(url)
        logger.info(f"Successfully downloaded audio to: {audio_path}")
        try:
            # Transcribe the downloaded audio
            logger.info("Step 2: Transcribing downloaded audio...")
            result = transcribe(audio_path, return_timestamps, generate_subs)
            logger.info("Successfully completed transcription")
        finally:
            # Clean up the temporary file even when transcription fails, so
            # failed requests do not leave downloaded audio on disk.
            logger.info("Step 3: Cleaning up temporary files...")
            try:
                os.unlink(audio_path)
                logger.info(f"Successfully deleted temporary file: {audio_path}")
            except OSError as e:
                logger.warning(f"Failed to delete temporary file: {str(e)}")
        return result
    except Exception as e:
        logger.exception(f"Error in YouTube transcription: {str(e)}")
        raise gr.Error(f"Failed to transcribe YouTube video: {str(e)}") from e
def transcribe(inputs, return_timestamps, generate_subs):
    """Transcribe an audio file by POSTing it to the endpoint in API_URL.

    Args:
        inputs (str): Path to audio file to transcribe.
        return_timestamps (bool): Whether to include timestamps in output.
        generate_subs (bool): Whether to generate SRT subtitles. Subtitles
            are built from the timestamped chunks in the API response and do
            not require return_timestamps to be set.

    Returns:
        tuple: (formatted_result, srt_file, correction_text)
            - formatted_result (dict): Transcription results: "text", plus
              "chunks" when return_timestamps is set and the response
              contains chunks
            - srt_file (str): Path to SRT file if generated, None otherwise
            - correction_text (str): Empty string for corrections

    Raises:
        gr.Error: If no audio file is provided, API_URL is not configured,
            or transcription fails.
    """
    logger.info(f"Starting transcription process for file: {inputs}")
    logger.info(f"Options - Timestamps: {return_timestamps}, Generate subtitles: {generate_subs}")
    if inputs is None:
        logger.warning("No audio file submitted")
        raise gr.Error("No audio file submitted! Please upload or record an audio file before submitting your request.")
    # Fail early with a clear message instead of an "Invalid URL 'None'" error.
    if not API_URL:
        logger.error("API_URL environment variable is not set")
        raise gr.Error("API_URL environment variable is not set")
    try:
        # Read the audio file
        logger.info("Step 1: Reading audio file...")
        with open(inputs, "rb") as f:
            data = f.read()
        file_size = len(data)
        logger.info(f"Successfully read audio file, size: {file_size/1024/1024:.2f} MB")
        # Send request to API. The timeout matches the other long-running
        # requests in this file; without one a stalled connection would block
        # the worker indefinitely.
        logger.info("Step 2: Sending request to Whisper API...")
        response = requests.post(API_URL, headers=headers, data=data, timeout=1800)
        response.raise_for_status()
        result = response.json()
        logger.debug(f"API response: {result}")
        logger.info("Successfully received response from API")
        # Format response as JSON
        logger.info("Step 3: Processing API response...")
        formatted_result = {
            "text": result.get("text", "")
        }
        logger.info(f"Transcribed text length: {len(formatted_result['text'])} characters")
        chunks = []
        # Chunks are needed for the JSON output and for subtitle generation,
        # so parse them when either option is enabled.
        if (return_timestamps or generate_subs) and "chunks" in result:
            logger.info(f"Processing {len(result['chunks'])} chunks for timestamps")
            for i, chunk in enumerate(result["chunks"]):
                logger.debug(f"Processing chunk {i}: {chunk}")
                try:
                    timestamp = chunk.get("timestamp", [None, None])
                    start_time = timestamp[0]
                    end_time = timestamp[1]
                    text = chunk.get("text", "").strip()
                    if start_time is not None and end_time is not None:
                        chunks.append({
                            "text": text,
                            "timestamp": [start_time, end_time]
                        })
                    else:
                        logger.warning(f"Invalid timestamp in chunk {i}: {chunk}")
                except Exception as chunk_error:
                    # Best effort: a malformed chunk is skipped rather than
                    # failing the whole transcription.
                    logger.error(f"Error processing chunk {i}: {str(chunk_error)}")
                    continue
            if return_timestamps:
                formatted_result["chunks"] = chunks
            logger.info(f"Successfully processed {len(chunks)} chunks with timestamps")
        # Generate subtitles if requested
        srt_file = None
        if generate_subs and chunks:
            logger.info("Step 4: Generating SRT subtitles...")
            srt_content = generate_srt(chunks)
            srt_file = save_srt_to_file(srt_content)
            logger.info(f"Successfully generated SRT file: {srt_file}")
        elif generate_subs:
            logger.warning("Subtitles were requested but the API response contained no usable timestamped chunks")
        logger.info("Transcription process completed successfully")
        return formatted_result, srt_file, ""  # Return empty string for correction textbox
    except requests.exceptions.RequestException as e:
        logger.exception(f"API request failed: {str(e)}")
        raise gr.Error(f"Failed to transcribe audio: API request failed - {str(e)}") from e
    except Exception as e:
        logger.exception(f"Error during transcription: {str(e)}")
        raise gr.Error(f"Failed to transcribe audio: {str(e)}") from e
# Root Blocks container, using the Ocean theme. The tabbed interface is
# mounted inside it and it is the object that gets queued and launched at
# the bottom of the file.
demo = gr.Blocks(theme=gr.themes.Ocean())
# Define interfaces first
# YouTube tab: a URL textbox plus the two option checkboxes, handled by
# transcribe_youtube.
# NOTE(review): transcribe_youtube returns the three values produced by
# transcribe (result, SRT path, correction text), but only two output
# components are declared here -- confirm whether a third is still to be added.
_youtube_inputs = [
    gr.Textbox(label="YouTube URL", placeholder="https://www.youtube.com/watch?v=..."),
    gr.Checkbox(label="Include timestamps", value=True),
    gr.Checkbox(label="Generate subtitles", value=True),
]
_youtube_outputs = [
    gr.JSON(label="Transcription", open=True),
    gr.File(label="Subtitles (SRT)", visible=True),
]
_youtube_description = (
    "Transcribe Tajik language audio from YouTube videos. "
    "Paste a YouTube URL and get accurate transcription with optional timestamps "
    "and subtitles.\n\n"
    "⚠️ Note: YouTube downloads may occasionally fail due to YouTube's restrictions "
    "or temporary service issues. If this happens, please try again in a few minutes "
    "or use the audio file upload option instead."
)
youtube_transcribe = gr.Interface(
    fn=transcribe_youtube,
    inputs=_youtube_inputs,
    outputs=_youtube_outputs,
    title="Tajik Speech Transcription",
    description=_youtube_description,
)
# Microphone tab: records audio to a file path and passes it to transcribe.
# NOTE(review): the description mentions file uploads, but this Audio
# component only enables the "microphone" source -- confirm the wording.
# NOTE(review): transcribe returns three values, while two output components
# are declared here -- confirm whether a third is still to be added.
_mic_inputs = [
    gr.Audio(sources="microphone", type="filepath"),
    gr.Checkbox(label="Include timestamps", value=True),
    gr.Checkbox(label="Generate subtitles", value=True),
]
_mic_outputs = [
    gr.JSON(label="Transcription", open=True),
    gr.File(label="Subtitles (SRT)", visible=True),
]
_mic_description = (
    "Transcribe Tajik language audio from microphone or file upload. "
    "Perfect for transcribing Tajik podcasts, interviews, and conversations. "
    "Supports both microphone recording and file uploads."
)
mf_transcribe = gr.Interface(
    fn=transcribe,
    inputs=_mic_inputs,
    outputs=_mic_outputs,
    title="Tajik Speech Transcription",
    description=_mic_description,
)
# Upload tab: accepts an uploaded audio file (as a file path) and passes it
# to transcribe.
# NOTE(review): transcribe returns three values, while two output components
# are declared here -- confirm whether a third is still to be added.
_file_inputs = [
    gr.Audio(sources="upload", type="filepath", label="Audio file"),
    gr.Checkbox(label="Include timestamps", value=True),
    gr.Checkbox(label="Generate subtitles", value=True),
]
_file_outputs = [
    gr.JSON(label="Transcription", open=True),
    gr.File(label="Subtitles (SRT)", visible=True),
]
_file_description = (
    "Transcribe Tajik language audio files. "
    "Upload your audio file and get accurate transcription with optional timestamps "
    "and subtitles. Supports various audio formats."
)
file_transcribe = gr.Interface(
    fn=transcribe,
    inputs=_file_inputs,
    outputs=_file_outputs,
    title="Tajik Speech Transcription",
    description=_file_description,
)
# Mount the three interfaces as tabs inside the shared Blocks container,
# pairing each interface with its tab label.
_tabs = [
    (file_transcribe, "Audio file"),
    (mf_transcribe, "Microphone"),
    (youtube_transcribe, "YouTube"),
]
with demo:
    gr.TabbedInterface(
        [interface for interface, _ in _tabs],
        [label for _, label in _tabs],
    )

# Enable the request queue and start the server with SSR mode turned off.
logger.info("Starting Gradio interface")
demo.queue().launch(ssr_mode=False)