# Hugging Face Spaces page header (scrape residue): "Spaces: Running".
import gradio as gr | |
import subprocess | |
import datetime | |
import tempfile | |
import requests | |
import os | |
import time | |
from loguru import logger | |
# Load API configuration from environment variables.
API_URL = os.getenv("API_URL")  # Whisper inference endpoint (full URL)
SIEVE_API_KEY = os.getenv("SIEVE_API_KEY")  # Sieve API key for YouTube downloads
SIEVE_API_URL = "https://mango.sievedata.com/v2"  # Sieve REST API base URL

# Headers for the Whisper API request. NOTE(review): audio payloads are sent
# as raw bytes with Content-Type audio/flac, yet downloaded YouTube audio is
# .mp3 — confirm the endpoint tolerates the mismatch.
headers = {
    "Accept": "application/json",
    "Content-Type": "audio/flac"
}
def format_time(seconds):
    """Convert seconds to SRT time format (HH:MM:SS,mmm).

    Args:
        seconds (float): Non-negative time in seconds to convert.

    Returns:
        str: Time formatted as HH:MM:SS,mmm where:
            - HH: Hours (zero-padded to 2 digits; not capped at 23)
            - MM: Minutes (00-59)
            - SS: Seconds (00-59)
            - mmm: Milliseconds (000-999, truncated)

    Example:
        >>> format_time(3661.5)
        '01:01:01,500'
    """
    # BUGFIX: the previous implementation used timedelta.seconds, which
    # silently drops whole days, so any timestamp >= 24h wrapped around
    # (e.g. 90000s rendered as 01:00:00 instead of 25:00:00). Work in
    # total milliseconds instead; int() truncates sub-millisecond parts,
    # matching the old microseconds // 1000 behavior.
    total_ms = int(float(seconds) * 1000)
    hours, remainder = divmod(total_ms, 3_600_000)
    minutes, remainder = divmod(remainder, 60_000)
    secs, milliseconds = divmod(remainder, 1000)
    return f"{hours:02d}:{minutes:02d}:{secs:02d},{milliseconds:03d}"
def generate_srt(chunks):
    """Build an SRT subtitle document from transcription chunks.

    Args:
        chunks (list): Transcription chunk dicts. Each chunk must have:
            - "timestamp": [start_time, end_time] in seconds
            - "text": the transcribed text for that time segment

    Returns:
        str: SRT-formatted subtitles, one numbered entry per chunk:
            ``index\\nHH:MM:SS,mmm --> HH:MM:SS,mmm\\ntext\\n\\n``

    Example:
        >>> chunks = [
        ...     {"timestamp": [0.0, 1.5], "text": "Hello"},
        ...     {"timestamp": [1.5, 3.0], "text": "World"}
        ... ]
        >>> generate_srt(chunks)
        '1\\n00:00:00,000 --> 00:00:01,500\\nHello\\n\\n2\\n00:00:01,500 --> 00:00:03,000\\nWorld\\n\\n'
    """
    entries = []
    for index, chunk in enumerate(chunks, start=1):
        begin, finish = chunk["timestamp"][0], chunk["timestamp"][1]
        caption = chunk.get("text", "").strip()
        entries.append(
            f"{index}\n{format_time(begin)} --> {format_time(finish)}\n{caption}\n\n"
        )
    return "".join(entries)
def save_srt_to_file(srt_content):
    """Persist SRT subtitle text to a temporary .srt file.

    Args:
        srt_content (str): SRT-formatted subtitle text to write.

    Returns:
        str or None: Path of the written temporary file, or None when
        srt_content is empty.

    Note:
        The file is created with delete=False so the path remains usable
        after this function returns; the caller is responsible for
        deleting it when no longer needed.
    """
    if not srt_content:
        return None
    # The context manager closes the handle; delete=False keeps the file.
    with tempfile.NamedTemporaryFile(suffix='.srt', delete=False) as handle:
        handle.write(srt_content.encode('utf-8'))
    return handle.name
# Check that the ffmpeg binary is available on PATH.
def check_ffmpeg():
    """Raise gr.Error if ffmpeg cannot be executed on this machine."""
    try:
        subprocess.run(['ffmpeg', '-version'], capture_output=True, check=True)
    except (subprocess.CalledProcessError, FileNotFoundError) as e:
        logger.error(f"ffmpeg check failed: {str(e)}")
        raise gr.Error("ffmpeg is not installed. Please install ffmpeg to use this application.")
    else:
        logger.info("ffmpeg check passed successfully")
# Fail fast at import time if ffmpeg is missing.
check_ffmpeg()
def download_youtube_audio(url):
    """Download the audio track of a YouTube video via the Sieve API.

    Submits a ``sieve/youtube-downloader`` job, polls the job until it
    completes, then downloads the resulting audio into a local temp file.

    Args:
        url (str): YouTube video URL.

    Returns:
        str: Path to the downloaded .mp3 temp file. The caller is
        responsible for deleting it.

    Raises:
        gr.Error: If SIEVE_API_KEY is unset, the job fails, polling times
            out, or a network error occurs.
    """
    logger.info(f"Starting YouTube audio download process for URL: {url}")
    if not SIEVE_API_KEY:
        logger.error("SIEVE_API_KEY environment variable is not set")
        raise gr.Error("SIEVE_API_KEY environment variable is not set")
    try:
        # Create a temporary file for the audio; delete=False so the path
        # outlives this function (cleanup happens in the caller).
        temp_file = tempfile.NamedTemporaryFile(suffix='.mp3', delete=False)
        temp_file.close()
        output_path = temp_file.name
        logger.info(f"Created temporary file at: {output_path}")
        # Prepare the request to Sieve API with exact parameters.
        payload = {
            "function": "sieve/youtube-downloader",
            "inputs": {
                "url": url,
                "download_type": "audio",  # Ensure we're only downloading audio
                "resolution": "highest-available",
                "include_audio": True,
                "start_time": 0,
                "end_time": -1,  # presumably "until end of video" — confirm against Sieve docs
                "include_metadata": False,
                "metadata_fields": ["title", "thumbnail", "description", "tags", "duration"],
                "include_subtitles": False,
                "subtitle_languages": ["en"],
                "video_format": "mp4",
                "audio_format": "mp3"
            }
        }
        logger.debug(f"Prepared Sieve API payload: {payload}")
        # Submit the job, retrying transient network errors or missing IDs.
        max_retries = 3
        retry_delay = 5  # seconds
        for attempt in range(max_retries):
            try:
                logger.info(f"Sending request to Sieve API (attempt {attempt + 1}/{max_retries})...")
                response = requests.post(
                    f"{SIEVE_API_URL}/push",
                    headers={"X-API-Key": SIEVE_API_KEY, "Content-Type": "application/json"},
                    json=payload,
                    timeout=1800  # NOTE(review): 30 min is very generous for a submit call — confirm intended
                )
                response.raise_for_status()
                response_data = response.json()
                logger.debug(f"Sieve API response: {response_data}")
                job_id = response_data.get("id")
                if not job_id:
                    logger.error("No job ID received from Sieve API")
                    if attempt < max_retries - 1:
                        logger.warning(f"Retrying in {retry_delay} seconds...")
                        time.sleep(retry_delay)
                        continue
                    raise gr.Error("Failed to get job ID from Sieve API")
                # Got a job ID — stop retrying.
                break
            except requests.exceptions.RequestException as e:
                logger.warning(f"Request failed (attempt {attempt + 1}/{max_retries}): {str(e)}")
                if attempt < max_retries - 1:
                    logger.info(f"Retrying in {retry_delay} seconds...")
                    time.sleep(retry_delay)
                    continue
                raise
        logger.info(f"Received job ID: {job_id}")
        # Poll the job until it finishes (2-second interval, bounded).
        poll_count = 0
        max_polls = 180  # Maximum number of polls (6 minutes with 2-second delay)
        last_status = None
        while True:
            poll_count += 1
            logger.info(f"Polling job status (attempt {poll_count}/{max_polls})...")
            try:
                job_response = requests.get(
                    f"{SIEVE_API_URL}/jobs/{job_id}",
                    headers={"X-API-Key": SIEVE_API_KEY},
                    timeout=1800,
                )
                job_response.raise_for_status()
                job_data = job_response.json()
                # logger.debug(f"Job status response: {job_data}")
                status = job_data.get("status")
                # Only log on status transitions to keep the log readable.
                if status != last_status:
                    logger.info(f"Job status changed: {status}")
                    last_status = status
                # Sieve has reported both "completed" and "finished" as
                # terminal success states — accept either.
                if status == "completed" or status == "finished":
                    logger.info("Job completed successfully")
                    # NOTE(review): assumes the job's first output is under
                    # key "output_0" with a downloadable "url" — confirm
                    # against the Sieve jobs API schema.
                    output_data = job_data.get("output_0", {})
                    if not output_data:
                        logger.error("No output data found in completed job response")
                        raise gr.Error("No output data in job response")
                    # Get the audio URL from the output.
                    audio_url = output_data.get("url")
                    if not audio_url:
                        logger.error("No audio URL found in output data")
                        raise gr.Error("No audio URL in output data")
                    logger.info(f"Received audio URL from Sieve: {audio_url}")
                    # Download the audio file from Sieve's storage.
                    logger.info("Downloading audio file from Sieve storage...")
                    audio_response = requests.get(audio_url, timeout=30)
                    audio_response.raise_for_status()
                    file_size = len(audio_response.content)
                    logger.info(f"Downloaded audio file size: {file_size/1024/1024:.2f} MB")
                    # Save the file to the temp path created above.
                    with open(output_path, "wb") as f:
                        f.write(audio_response.content)
                    logger.info(f"Successfully saved audio to: {output_path}")
                    # Break out of the polling loop after successful download.
                    break
                elif status == "failed":
                    error_msg = job_data.get("error", "Unknown error")
                    logger.error(f"Job failed with error: {error_msg}")
                    raise gr.Error(f"Job failed: {error_msg}")
                if poll_count >= max_polls:
                    logger.error("Maximum polling attempts reached")
                    raise gr.Error("Download took too long. Please try again or check if the video is accessible.")
                logger.info("Job still processing, waiting 2 seconds before next poll...")
                time.sleep(2)
            except requests.exceptions.RequestException as e:
                # Transient poll failure: keep polling until max_polls.
                logger.warning(f"Poll request failed: {str(e)}")
                if poll_count >= max_polls:
                    raise gr.Error("Failed to check job status. Please try again.")
                time.sleep(2)
    except requests.exceptions.RequestException as e:
        logger.exception(f"Network error during YouTube download: {str(e)}")
        raise gr.Error(f"Failed to download YouTube audio: Network error - {str(e)}")
    except Exception as e:
        logger.exception(f"Unexpected error during YouTube download: {str(e)}")
        raise gr.Error(f"Failed to download YouTube audio: {str(e)}")
    return output_path
def transcribe_youtube(url, return_timestamps, generate_subs):
    """Download a YouTube video's audio track and transcribe it.

    Args:
        url (str): YouTube video URL.
        return_timestamps (bool): Whether to include timestamps in output.
        generate_subs (bool): Whether to generate SRT subtitles.

    Returns:
        tuple: (formatted_result, srt_file, correction_text), as produced
        by transcribe().

    Raises:
        gr.Error: If the download or transcription step fails.
    """
    logger.info(f"Starting YouTube transcription process for URL: {url}")
    logger.info(f"Options - Timestamps: {return_timestamps}, Generate subtitles: {generate_subs}")
    try:
        # Step 1: fetch the audio via the Sieve downloader.
        logger.info("Step 1: Downloading audio from YouTube...")
        audio_path = download_youtube_audio(url)
        logger.info(f"Successfully downloaded audio to: {audio_path}")

        # Step 2: run the regular file-based transcription pipeline.
        logger.info("Step 2: Transcribing downloaded audio...")
        transcription = transcribe(audio_path, return_timestamps, generate_subs)
        logger.info("Successfully completed transcription")

        # Step 3: best-effort removal of the downloaded temp file.
        logger.info("Step 3: Cleaning up temporary files...")
        try:
            os.unlink(audio_path)
            logger.info(f"Successfully deleted temporary file: {audio_path}")
        except Exception as e:
            logger.warning(f"Failed to delete temporary file: {str(e)}")

        return transcription
    except Exception as e:
        logger.exception(f"Error in YouTube transcription: {str(e)}")
        raise gr.Error(f"Failed to transcribe YouTube video: {str(e)}")
def transcribe(inputs, return_timestamps, generate_subs):
    """Transcribe audio input using Whisper model via Hugging Face Inference API.

    Args:
        inputs (str): Path to audio file to transcribe.
        return_timestamps (bool): Whether to include timestamps in output.
        generate_subs (bool): Whether to generate SRT subtitles.

    Returns:
        tuple: (formatted_result, srt_file, correction_text)
            - formatted_result (dict): Transcription results
            - srt_file (str): Path to SRT file if generated, None otherwise
            - correction_text (str): Empty string for corrections

    Raises:
        gr.Error: If no audio file is provided or transcription fails.
    """
    logger.info(f"Starting transcription process for file: {inputs}")
    logger.info(f"Options - Timestamps: {return_timestamps}, Generate subtitles: {generate_subs}")
    if inputs is None:
        logger.warning("No audio file submitted")
        raise gr.Error("No audio file submitted! Please upload or record an audio file before submitting your request.")
    try:
        # Step 1: read the entire audio file into memory as raw bytes.
        logger.info("Step 1: Reading audio file...")
        with open(inputs, "rb") as f:
            data = f.read()
        file_size = len(data)
        logger.info(f"Successfully read audio file, size: {file_size/1024/1024:.2f} MB")
        # Step 2: POST the raw bytes to the inference endpoint using the
        # module-level headers (Content-Type: audio/flac).
        logger.info("Step 2: Sending request to Whisper API...")
        response = requests.post(API_URL, headers=headers, data=data)
        response.raise_for_status()
        result = response.json()
        logger.debug(f"API response: {result}")
        logger.info("Successfully received response from API")
        # Step 3: normalize the response. NOTE(review): assumes the API
        # returns {"text": ..., "chunks": [{"timestamp": [s, e], "text": ...}]}
        # — confirm against the deployed endpoint's response schema.
        logger.info("Step 3: Processing API response...")
        formatted_result = {
            "text": result.get("text", "")
        }
        logger.info(f"Transcribed text length: {len(formatted_result['text'])} characters")
        chunks = []
        if return_timestamps and "chunks" in result:
            logger.info(f"Processing {len(result['chunks'])} chunks for timestamps")
            for i, chunk in enumerate(result["chunks"]):
                logger.debug(f"Processing chunk {i}: {chunk}")
                try:
                    start_time = chunk.get("timestamp", [None, None])[0]
                    end_time = chunk.get("timestamp", [None, None])[1]
                    text = chunk.get("text", "").strip()
                    # Keep only chunks with both boundary timestamps present.
                    if start_time is not None and end_time is not None:
                        chunk_data = {
                            "text": text,
                            "timestamp": [start_time, end_time]
                        }
                        chunks.append(chunk_data)
                    else:
                        logger.warning(f"Invalid timestamp in chunk {i}: {chunk}")
                except Exception as chunk_error:
                    # A malformed chunk is skipped rather than failing the run.
                    logger.error(f"Error processing chunk {i}: {str(chunk_error)}")
                    continue
            formatted_result["chunks"] = chunks
            logger.info(f"Successfully processed {len(chunks)} chunks with timestamps")
        # Step 4 (optional): write the chunks out as an SRT subtitle file.
        srt_file = None
        if generate_subs and chunks:
            logger.info("Step 4: Generating SRT subtitles...")
            srt_content = generate_srt(chunks)
            srt_file = save_srt_to_file(srt_content)
            logger.info(f"Successfully generated SRT file: {srt_file}")
        logger.info("Transcription process completed successfully")
        return formatted_result, srt_file, ""  # Return empty string for correction textbox
    except requests.exceptions.RequestException as e:
        logger.exception(f"API request failed: {str(e)}")
        raise gr.Error(f"Failed to transcribe audio: API request failed - {str(e)}")
    except Exception as e:
        logger.exception(f"Error during transcription: {str(e)}")
        raise gr.Error(f"Failed to transcribe audio: {str(e)}")
demo = gr.Blocks(theme=gr.themes.Ocean())

# Define interfaces first.
# FIX: transcribe() / transcribe_youtube() return a 3-tuple
# (result dict, SRT file path, correction text), but each interface
# previously declared only two outputs, so Gradio received one more
# return value than it had components for. A hidden correction textbox
# is added as the third output of every interface.
youtube_transcribe = gr.Interface(
    fn=transcribe_youtube,
    inputs=[
        gr.Textbox(label="YouTube URL", placeholder="https://www.youtube.com/watch?v=..."),
        gr.Checkbox(label="Include timestamps", value=True),
        gr.Checkbox(label="Generate subtitles", value=True),
    ],
    outputs=[
        gr.JSON(label="Transcription", open=True),
        gr.File(label="Subtitles (SRT)", visible=True),
        gr.Textbox(label="Correction", visible=False),
    ],
    title="Tajik Speech Transcription",
    description=(
        "Transcribe Tajik language audio from YouTube videos. "
        "Paste a YouTube URL and get accurate transcription with optional timestamps "
        "and subtitles.\n\n"
        "⚠️ Note: YouTube downloads may occasionally fail due to YouTube's restrictions "
        "or temporary service issues. If this happens, please try again in a few minutes "
        "or use the audio file upload option instead."
    )
)

mf_transcribe = gr.Interface(
    fn=transcribe,
    inputs=[
        gr.Audio(sources="microphone", type="filepath"),
        gr.Checkbox(label="Include timestamps", value=True),
        gr.Checkbox(label="Generate subtitles", value=True),
    ],
    outputs=[
        gr.JSON(label="Transcription", open=True),
        gr.File(label="Subtitles (SRT)", visible=True),
        gr.Textbox(label="Correction", visible=False),
    ],
    title="Tajik Speech Transcription",
    description=(
        "Transcribe Tajik language audio from microphone or file upload. "
        "Perfect for transcribing Tajik podcasts, interviews, and conversations. "
        "Supports both microphone recording and file uploads."
    )
)

file_transcribe = gr.Interface(
    fn=transcribe,
    inputs=[
        gr.Audio(sources="upload", type="filepath", label="Audio file"),
        gr.Checkbox(label="Include timestamps", value=True),
        gr.Checkbox(label="Generate subtitles", value=True),
    ],
    outputs=[
        gr.JSON(label="Transcription", open=True),
        gr.File(label="Subtitles (SRT)", visible=True),
        gr.Textbox(label="Correction", visible=False),
    ],
    title="Tajik Speech Transcription",
    description=(
        "Transcribe Tajik language audio files. "
        "Upload your audio file and get accurate transcription with optional timestamps "
        "and subtitles. Supports various audio formats."
    )
)

# Assemble the three interfaces into tabs inside the themed Blocks app.
with demo:
    gr.TabbedInterface(
        [file_transcribe, mf_transcribe, youtube_transcribe],
        ["Audio file", "Microphone", "YouTube"]
    )

logger.info("Starting Gradio interface")
demo.queue().launch(ssr_mode=False)