import gradio as gr
import subprocess
import datetime
import tempfile
import requests
import os
import time
from loguru import logger

# Load API keys from environment variables
API_URL = os.getenv("API_URL")
SIEVE_API_KEY = os.getenv("SIEVE_API_KEY")
SIEVE_API_URL = "https://mango.sievedata.com/v2"

headers = {
    "Accept": "application/json",
    "Content-Type": "audio/flac"
}


def format_time(seconds):
    """Convert seconds to SRT time format (HH:MM:SS,mmm).

    Args:
        seconds (float): Time in seconds to convert.

    Returns:
        str: Time formatted as HH:MM:SS,mmm where:
            - HH: Hours (00-99)
            - MM: Minutes (00-59)
            - SS: Seconds (00-59)
            - mmm: Milliseconds (000-999)

    Example:
        >>> format_time(3661.5)
        '01:01:01,500'
    """
    td = datetime.timedelta(seconds=float(seconds))
    total_seconds = int(td.total_seconds())  # use total seconds so durations over 24h keep their hours
    hours = total_seconds // 3600
    minutes = (total_seconds % 3600) // 60
    seconds = total_seconds % 60
    milliseconds = td.microseconds // 1000
    return f"{hours:02d}:{minutes:02d}:{seconds:02d},{milliseconds:03d}"


def generate_srt(chunks):
    """Generate SRT format subtitles from transcription chunks.

    Args:
        chunks (list): List of dictionaries containing transcription chunks.
            Each chunk must have:
            - "timestamp": List of [start_time, end_time] in seconds
            - "text": The transcribed text for that time segment

    Returns:
        str: SRT formatted subtitles string with format:
            ```
            1
            HH:MM:SS,mmm --> HH:MM:SS,mmm
            Text content

            2
            HH:MM:SS,mmm --> HH:MM:SS,mmm
            Text content
            ...
            ```

    Example:
        >>> chunks = [
        ...     {"timestamp": [0.0, 1.5], "text": "Hello"},
        ...     {"timestamp": [1.5, 3.0], "text": "World"}
        ... ]
        >>> generate_srt(chunks)
        '1\\n00:00:00,000 --> 00:00:01,500\\nHello\\n\\n2\\n00:00:01,500 --> 00:00:03,000\\nWorld\\n\\n'
    """
    srt_content = []
    for i, chunk in enumerate(chunks, 1):
        start_time = format_time(chunk["timestamp"][0])
        end_time = format_time(chunk["timestamp"][1])
        text = chunk.get("text", "").strip()
        srt_content.append(f"{i}\n{start_time} --> {end_time}\n{text}\n\n")
    return "".join(srt_content)


def save_srt_to_file(srt_content):
    """Save SRT content to a temporary file.

    Args:
        srt_content (str): The SRT formatted subtitles content to save.

    Returns:
        str or None: Path to the temporary file if content was saved,
            None if srt_content was empty.

    Note:
        The temporary file is created with delete=False to allow it to be used
        after the function returns. The file should be deleted by the caller
        when no longer needed.
    """
    if not srt_content:
        return None

    # Create a temporary file with .srt extension
    temp_file = tempfile.NamedTemporaryFile(suffix='.srt', delete=False)
    temp_file.write(srt_content.encode('utf-8'))
    temp_file.close()
    return temp_file.name


# Check that ffmpeg is installed
def check_ffmpeg():
    try:
        subprocess.run(['ffmpeg', '-version'], capture_output=True, check=True)
        logger.info("ffmpeg check passed successfully")
    except (subprocess.CalledProcessError, FileNotFoundError) as e:
        logger.error(f"ffmpeg check failed: {str(e)}")
        raise gr.Error("ffmpeg is not installed. Please install ffmpeg to use this application.")


# Run the ffmpeg check at import time
check_ffmpeg()

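# Overview of the YouTube download flow implemented below (this only summarizes
# the calls the function already makes against Sieve's v2 API):
#   1. POST {SIEVE_API_URL}/push with the "sieve/youtube-downloader" function and
#      its inputs; the response contains a job id.
#   2. GET {SIEVE_API_URL}/jobs/{job_id} every 2 seconds until the job reports
#      "completed"/"finished" or "failed".
#   3. Download the audio file from the URL in the job's "output_0" and save it
#      to a temporary .mp3 path, which the caller is responsible for deleting.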
def download_youtube_audio(url):
    """Download audio from YouTube using Sieve API.

    Args:
        url (str): YouTube video URL

    Returns:
        str: Path to downloaded audio file

    Raises:
        gr.Error: If download fails or API key is not set
    """
    logger.info(f"Starting YouTube audio download process for URL: {url}")

    if not SIEVE_API_KEY:
        logger.error("SIEVE_API_KEY environment variable is not set")
        raise gr.Error("SIEVE_API_KEY environment variable is not set")

    try:
        # Create a temporary file for the audio
        temp_file = tempfile.NamedTemporaryFile(suffix='.mp3', delete=False)
        temp_file.close()
        output_path = temp_file.name
        logger.info(f"Created temporary file at: {output_path}")

        # Prepare the request to Sieve API with exact parameters
        payload = {
            "function": "sieve/youtube-downloader",
            "inputs": {
                "url": url,
                "download_type": "audio",  # Ensure we're only downloading audio
                "resolution": "highest-available",
                "include_audio": True,
                "start_time": 0,
                "end_time": -1,
                "include_metadata": False,
                "metadata_fields": ["title", "thumbnail", "description", "tags", "duration"],
                "include_subtitles": False,
                "subtitle_languages": ["en"],
                "video_format": "mp4",
                "audio_format": "mp3"
            }
        }
        logger.debug(f"Prepared Sieve API payload: {payload}")

        # Send request to Sieve API with retries
        max_retries = 3
        retry_delay = 5  # seconds

        for attempt in range(max_retries):
            try:
                logger.info(f"Sending request to Sieve API (attempt {attempt + 1}/{max_retries})...")
                response = requests.post(
                    f"{SIEVE_API_URL}/push",
                    headers={"X-API-Key": SIEVE_API_KEY, "Content-Type": "application/json"},
                    json=payload,
                    timeout=1800  # Add timeout
                )
                response.raise_for_status()
                response_data = response.json()
                logger.debug(f"Sieve API response: {response_data}")

                job_id = response_data.get("id")
                if not job_id:
                    logger.error("No job ID received from Sieve API")
                    if attempt < max_retries - 1:
                        logger.warning(f"Retrying in {retry_delay} seconds...")
                        time.sleep(retry_delay)
                        continue
                    raise gr.Error("Failed to get job ID from Sieve API")
                break
            except requests.exceptions.RequestException as e:
                logger.warning(f"Request failed (attempt {attempt + 1}/{max_retries}): {str(e)}")
                if attempt < max_retries - 1:
                    logger.info(f"Retrying in {retry_delay} seconds...")
                    time.sleep(retry_delay)
                    continue
                raise

        logger.info(f"Received job ID: {job_id}")

        # Poll for job completion
        poll_count = 0
        max_polls = 180  # Maximum number of polls (6 minutes with 2-second delay)
        last_status = None

        while True:
            poll_count += 1
            logger.info(f"Polling job status (attempt {poll_count}/{max_polls})...")

            try:
                job_response = requests.get(
                    f"{SIEVE_API_URL}/jobs/{job_id}",
                    headers={"X-API-Key": SIEVE_API_KEY},
                    timeout=1800,
                )
                job_response.raise_for_status()
                job_data = job_response.json()
                # logger.debug(f"Job status response: {job_data}")

                status = job_data.get("status")
                if status != last_status:
                    logger.info(f"Job status changed: {status}")
                    last_status = status

                if status == "completed" or status == "finished":
                    logger.info("Job completed successfully")

                    # Get the output data
                    output_data = job_data.get("output_0", {})
                    if not output_data:
                        logger.error("No output data found in completed job response")
                        raise gr.Error("No output data in job response")

                    # Get the audio URL from the output
                    audio_url = output_data.get("url")
                    if not audio_url:
                        logger.error("No audio URL found in output data")
                        raise gr.Error("No audio URL in output data")

                    logger.info(f"Received audio URL from Sieve: {audio_url}")

                    # Download the audio file
                    logger.info("Downloading audio file from Sieve storage...")
                    audio_response = requests.get(audio_url, timeout=30)
                    audio_response.raise_for_status()

                    file_size = len(audio_response.content)
logger.info(f"Downloaded audio file size: {file_size/1024/1024:.2f} MB") # Save the file with open(output_path, "wb") as f: f.write(audio_response.content) logger.info(f"Successfully saved audio to: {output_path}") # Break out of the polling loop after successful download break elif status == "failed": error_msg = job_data.get("error", "Unknown error") logger.error(f"Job failed with error: {error_msg}") raise gr.Error(f"Job failed: {error_msg}") if poll_count >= max_polls: logger.error("Maximum polling attempts reached") raise gr.Error("Download took too long. Please try again or check if the video is accessible.") logger.info("Job still processing, waiting 2 seconds before next poll...") time.sleep(2) except requests.exceptions.RequestException as e: logger.warning(f"Poll request failed: {str(e)}") if poll_count >= max_polls: raise gr.Error("Failed to check job status. Please try again.") time.sleep(2) except requests.exceptions.RequestException as e: logger.exception(f"Network error during YouTube download: {str(e)}") raise gr.Error(f"Failed to download YouTube audio: Network error - {str(e)}") except Exception as e: logger.exception(f"Unexpected error during YouTube download: {str(e)}") raise gr.Error(f"Failed to download YouTube audio: {str(e)}") return output_path def transcribe_youtube(url, return_timestamps, generate_subs): """Transcribe audio from YouTube video. Args: url (str): YouTube video URL return_timestamps (bool): Whether to include timestamps in output generate_subs (bool): Whether to generate SRT subtitles Returns: tuple: (formatted_result, srt_file, correction_text) """ logger.info(f"Starting YouTube transcription process for URL: {url}") logger.info(f"Options - Timestamps: {return_timestamps}, Generate subtitles: {generate_subs}") try: # Download audio from YouTube logger.info("Step 1: Downloading audio from YouTube...") audio_path = download_youtube_audio(url) logger.info(f"Successfully downloaded audio to: {audio_path}") # Transcribe the downloaded audio logger.info("Step 2: Transcribing downloaded audio...") result = transcribe(audio_path, return_timestamps, generate_subs) logger.info("Successfully completed transcription") # Clean up the temporary file logger.info("Step 3: Cleaning up temporary files...") try: os.unlink(audio_path) logger.info(f"Successfully deleted temporary file: {audio_path}") except Exception as e: logger.warning(f"Failed to delete temporary file: {str(e)}") return result except Exception as e: logger.exception(f"Error in YouTube transcription: {str(e)}") raise gr.Error(f"Failed to transcribe YouTube video: {str(e)}") def transcribe(inputs, return_timestamps, generate_subs): """Transcribe audio input using Whisper model via Hugging Face Inference API. Args: inputs (str): Path to audio file to transcribe. return_timestamps (bool): Whether to include timestamps in output. generate_subs (bool): Whether to generate SRT subtitles. Returns: tuple: (formatted_result, srt_file, correction_text) - formatted_result (dict): Transcription results - srt_file (str): Path to SRT file if generated, None otherwise - correction_text (str): Empty string for corrections Raises: gr.Error: If no audio file is provided or transcription fails. """ logger.info(f"Starting transcription process for file: {inputs}") logger.info(f"Options - Timestamps: {return_timestamps}, Generate subtitles: {generate_subs}") if inputs is None: logger.warning("No audio file submitted") raise gr.Error("No audio file submitted! 
def transcribe(inputs, return_timestamps, generate_subs):
    """Transcribe audio input using Whisper model via Hugging Face Inference API.

    Args:
        inputs (str): Path to audio file to transcribe.
        return_timestamps (bool): Whether to include timestamps in output.
        generate_subs (bool): Whether to generate SRT subtitles.

    Returns:
        tuple: (formatted_result, srt_file, correction_text)
            - formatted_result (dict): Transcription results
            - srt_file (str): Path to SRT file if generated, None otherwise
            - correction_text (str): Empty string for corrections

    Raises:
        gr.Error: If no audio file is provided or transcription fails.
    """
    logger.info(f"Starting transcription process for file: {inputs}")
    logger.info(f"Options - Timestamps: {return_timestamps}, Generate subtitles: {generate_subs}")

    if inputs is None:
        logger.warning("No audio file submitted")
        raise gr.Error(
            "No audio file submitted! Please upload or record an audio file "
            "before submitting your request."
        )

    try:
        # Read the audio file
        logger.info("Step 1: Reading audio file...")
        with open(inputs, "rb") as f:
            data = f.read()
        file_size = len(data)
        logger.info(f"Successfully read audio file, size: {file_size/1024/1024:.2f} MB")

        # Send request to API
        logger.info("Step 2: Sending request to Whisper API...")
        response = requests.post(API_URL, headers=headers, data=data)
        response.raise_for_status()
        result = response.json()
        logger.debug(f"API response: {result}")
        logger.info("Successfully received response from API")

        # Format response as JSON
        logger.info("Step 3: Processing API response...")
        formatted_result = {
            "text": result.get("text", "")
        }
        logger.info(f"Transcribed text length: {len(formatted_result['text'])} characters")

        chunks = []
        if return_timestamps and "chunks" in result:
            logger.info(f"Processing {len(result['chunks'])} chunks for timestamps")
            for i, chunk in enumerate(result["chunks"]):
                logger.debug(f"Processing chunk {i}: {chunk}")
                try:
                    start_time = chunk.get("timestamp", [None, None])[0]
                    end_time = chunk.get("timestamp", [None, None])[1]
                    text = chunk.get("text", "").strip()

                    if start_time is not None and end_time is not None:
                        chunk_data = {
                            "text": text,
                            "timestamp": [start_time, end_time]
                        }
                        chunks.append(chunk_data)
                    else:
                        logger.warning(f"Invalid timestamp in chunk {i}: {chunk}")
                except Exception as chunk_error:
                    logger.error(f"Error processing chunk {i}: {str(chunk_error)}")
                    continue

            formatted_result["chunks"] = chunks
            logger.info(f"Successfully processed {len(chunks)} chunks with timestamps")

        # Generate subtitles if requested
        srt_file = None
        if generate_subs and chunks:
            logger.info("Step 4: Generating SRT subtitles...")
            srt_content = generate_srt(chunks)
            srt_file = save_srt_to_file(srt_content)
            logger.info(f"Successfully generated SRT file: {srt_file}")

        logger.info("Transcription process completed successfully")
        return formatted_result, srt_file, ""  # Return empty string for correction textbox

    except requests.exceptions.RequestException as e:
        logger.exception(f"API request failed: {str(e)}")
        raise gr.Error(f"Failed to transcribe audio: API request failed - {str(e)}")
    except Exception as e:
        logger.exception(f"Error during transcription: {str(e)}")
        raise gr.Error(f"Failed to transcribe audio: {str(e)}")

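# Gradio UI: three tabs share the same transcription backend, covering direct
# audio file upload, microphone recording, and YouTube URLs (audio fetched via
# Sieve before transcription).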
demo = gr.Blocks(theme=gr.themes.Ocean())

# Define interfaces first
youtube_transcribe = gr.Interface(
    fn=transcribe_youtube,
    inputs=[
        gr.Textbox(label="YouTube URL", placeholder="https://www.youtube.com/watch?v=..."),
        gr.Checkbox(label="Include timestamps", value=True),
        gr.Checkbox(label="Generate subtitles", value=True),
    ],
    outputs=[
        gr.JSON(label="Transcription", open=True),
        gr.File(label="Subtitles (SRT)", visible=True),
    ],
    title="Tajik Speech Transcription",
    description=(
        "Transcribe Tajik language audio from YouTube videos. "
        "Paste a YouTube URL and get accurate transcription with optional timestamps "
        "and subtitles.\n\n"
        "⚠️ Note: YouTube downloads may occasionally fail due to YouTube's restrictions "
        "or temporary service issues. If this happens, please try again in a few minutes "
        "or use the audio file upload option instead."
    )
)

mf_transcribe = gr.Interface(
    fn=transcribe,
    inputs=[
        gr.Audio(sources="microphone", type="filepath"),
        gr.Checkbox(label="Include timestamps", value=True),
        gr.Checkbox(label="Generate subtitles", value=True),
    ],
    outputs=[
        gr.JSON(label="Transcription", open=True),
        gr.File(label="Subtitles (SRT)", visible=True),
    ],
    title="Tajik Speech Transcription",
    description=(
        "Transcribe Tajik language audio recorded from your microphone. "
        "Perfect for transcribing Tajik podcasts, interviews, and conversations."
    )
)

file_transcribe = gr.Interface(
    fn=transcribe,
    inputs=[
        gr.Audio(sources="upload", type="filepath", label="Audio file"),
        gr.Checkbox(label="Include timestamps", value=True),
        gr.Checkbox(label="Generate subtitles", value=True),
    ],
    outputs=[
        gr.JSON(label="Transcription", open=True),
        gr.File(label="Subtitles (SRT)", visible=True),
    ],
    title="Tajik Speech Transcription",
    description=(
        "Transcribe Tajik language audio files. "
        "Upload your audio file and get accurate transcription with optional timestamps "
        "and subtitles. Supports various audio formats."
    )
)

with demo:
    gr.TabbedInterface(
        [file_transcribe, mf_transcribe, youtube_transcribe],
        ["Audio file", "Microphone", "YouTube"]
    )

logger.info("Starting Gradio interface")
demo.queue().launch(ssr_mode=False)