Spaces:
Paused
Paused
| import gradio as gr | |
| import subprocess | |
| import datetime | |
| import tempfile | |
| import requests | |
| import os | |
| import time | |
| from loguru import logger | |
# Service endpoints and credentials are read from environment variables.
# API_URL falls back to an empty string when the variable is unset: calling
# .rstrip() on the None returned by os.getenv would otherwise abort the import
# with an AttributeError. With an empty base URL, requests fail at call time.
API_URL = (os.getenv("API_URL") or "").rstrip("/")
# May be None; download_youtube_audio checks for it before contacting Sieve.
SIEVE_API_KEY = os.getenv("SIEVE_API_KEY")
SIEVE_API_URL = "https://mango.sievedata.com/v2"

# NOTE(review): not referenced in the visible part of this file; kept in case
# other code relies on it.
headers = {
    "Accept": "application/json",
    "Content-Type": "audio/flac",
}
def format_time(seconds):
    """Convert seconds to SRT time format (HH:MM:SS,mmm).

    Args:
        seconds (float): Time in seconds to convert. Anything accepted by
            ``float()`` works. Negative values are clamped to zero because
            an SRT timestamp cannot be negative.

    Returns:
        str: Time formatted as HH:MM:SS,mmm where:
            - HH: Hours (at least two digits; not wrapped at 24)
            - MM: Minutes (00-59)
            - SS: Seconds (00-59)
            - mmm: Milliseconds (000-999, truncated)

    Example:
        >>> format_time(3661.5)
        '01:01:01,500'
        >>> format_time(90000)
        '25:00:00,000'
    """
    td = datetime.timedelta(seconds=max(float(seconds), 0.0))
    # timedelta.seconds only holds the remainder within one day, so whole
    # days must be folded back in or durations >= 24h would wrap around.
    whole_seconds = td.days * 86400 + td.seconds
    hours, remainder = divmod(whole_seconds, 3600)
    minutes, secs = divmod(remainder, 60)
    milliseconds = td.microseconds // 1000
    return f"{hours:02d}:{minutes:02d}:{secs:02d},{milliseconds:03d}"
def generate_srt(segments):
    """Build SRT subtitle text from transcription segments.

    Args:
        segments: Iterable of mappings. Each one must provide "start" and
            "end" (passed to ``format_time``) and may provide "text".

    Returns:
        str: One cue per segment, numbered from 1, each followed by a blank
        line. An empty string when there are no segments.
    """
    def _render_cue(index, segment):
        # Same evaluation order as the cue layout: start, end, then text.
        begin = format_time(segment["start"])
        finish = format_time(segment["end"])
        caption = segment.get("text", "").strip()
        return f"{index}\n{begin} --> {finish}\n{caption}\n\n"

    return "".join(
        _render_cue(number, segment)
        for number, segment in enumerate(segments, 1)
    )
def save_srt_to_file(srt_content):
    """Write SRT text to a new temporary ``.srt`` file.

    Args:
        srt_content (str): Subtitle text; it is stored UTF-8 encoded.

    Returns:
        str | None: Path of the created file, or ``None`` when
        ``srt_content`` is empty or ``None``.

    The file is created with ``delete=False``, so it stays on disk after
    this function returns.
    """
    if not srt_content:
        return None
    # The context manager closes the handle even when the write raises.
    with tempfile.NamedTemporaryFile(suffix=".srt", delete=False) as temp_file:
        temp_file.write(srt_content.encode("utf-8"))
    return temp_file.name
# Verify that the ffmpeg executable can be run
def check_ffmpeg():
    """Confirm that ``ffmpeg`` is runnable.

    Executes ``ffmpeg -version`` with its output captured.

    Raises:
        gr.Error: If the executable is missing or exits with a non-zero
            status; the underlying failure is logged first.
    """
    version_command = ['ffmpeg', '-version']
    try:
        subprocess.run(version_command, capture_output=True, check=True)
    except (subprocess.CalledProcessError, FileNotFoundError) as exc:
        logger.error(f"ffmpeg check failed: {str(exc)}")
        raise gr.Error("ffmpeg is not installed. Please install ffmpeg to use this application.")
    else:
        logger.info("ffmpeg check passed successfully")
# Run the ffmpeg availability check once while the module is loading.
# check_ffmpeg raises gr.Error when ffmpeg cannot be executed, and that
# exception is not caught here.
check_ffmpeg()
def _extract_audio_url(job_data):
    """Return the audio download URL from a finished Sieve job payload.

    The URL is read from ``job_data["outputs"][0]["data"]["url"]``.

    Raises:
        gr.Error: If any level of that structure is missing or has an
            unexpected type.
    """
    outputs = job_data.get("outputs", [])
    if not outputs:
        raise gr.Error("No output data in job response")
    first_output = outputs[0]
    if not isinstance(first_output, dict):
        raise gr.Error("Unexpected output format from job response")
    data = first_output.get("data", {})
    if not isinstance(data, dict):
        raise gr.Error("Unexpected data format from job response")
    audio_url = data.get("url")
    if not audio_url:
        raise gr.Error("No audio URL in output data")
    return audio_url


def download_youtube_audio(url):
    """Download audio from YouTube using Sieve API.

    Submits a ``sieve/youtube-downloader`` job, polls it until it finishes
    and saves the resulting audio to a temporary ``.mp3`` file.

    Args:
        url (str): Address of the YouTube video.

    Returns:
        str: Path of the local temporary file holding the downloaded audio.
        This is a file path, not a URL. The file is created with
        ``delete=False``, so the caller is responsible for removing it.

    Raises:
        gr.Error: If ``SIEVE_API_KEY`` is unset, the job cannot be submitted,
            the job fails or times out, or the audio cannot be fetched.
    """
    logger.info(f"Starting YouTube audio download process for URL: {url}")
    if not SIEVE_API_KEY:
        logger.error("SIEVE_API_KEY environment variable is not set")
        raise gr.Error("SIEVE_API_KEY environment variable is not set")

    output_path = None
    try:
        # Reserve a file name up front; the audio is written into it later.
        temp_file = tempfile.NamedTemporaryFile(suffix='.mp3', delete=False)
        temp_file.close()
        output_path = temp_file.name
        logger.info(f"Created temporary file at: {output_path}")

        payload = {
            "function": "sieve/youtube-downloader",
            "inputs": {
                "url": url,
                "download_type": "audio",
                "resolution": "highest-available",
                "include_audio": True,
                "start_time": 0,
                "end_time": -1,
                "include_metadata": False,
                "metadata_fields": ["title", "thumbnail", "description", "tags", "duration"],
                "include_subtitles": False,
                "subtitle_languages": ["en"],
                "video_format": "mp4",
                "audio_format": "mp3"
            }
        }

        # Send request to Sieve API with retries
        max_retries = 3
        retry_delay = 5
        for attempt in range(max_retries):
            try:
                response = requests.post(
                    f"{SIEVE_API_URL}/push",
                    headers={"X-API-Key": SIEVE_API_KEY, "Content-Type": "application/json"},
                    json=payload,
                    timeout=1800
                )
                response.raise_for_status()
                response_data = response.json()
                job_id = response_data.get("id")
                if not job_id:
                    # A response without an id is retried like a failed request.
                    if attempt < max_retries - 1:
                        time.sleep(retry_delay)
                        continue
                    raise gr.Error("Failed to get job ID from Sieve API")
                break
            except requests.exceptions.RequestException:
                if attempt < max_retries - 1:
                    time.sleep(retry_delay)
                    continue
                raise

        # Poll for job completion
        poll_count = 0
        max_polls = 1800
        while True:
            poll_count += 1
            try:
                job_response = requests.get(
                    f"{SIEVE_API_URL}/jobs/{job_id}",
                    headers={"X-API-Key": SIEVE_API_KEY},
                    timeout=1800,
                )
                job_response.raise_for_status()
                job_data = job_response.json()
                status = job_data.get("status")

                if status in ("completed", "finished"):
                    audio_url = _extract_audio_url(job_data)
                    # Stream to disk so the whole file is never held in memory.
                    with requests.get(audio_url, timeout=1800, stream=True) as audio_response:
                        audio_response.raise_for_status()
                        with open(output_path, "wb") as f:
                            for chunk in audio_response.iter_content(chunk_size=1024 * 1024):
                                f.write(chunk)
                    break
                elif status == "failed":
                    error_msg = job_data.get("error", "Unknown error")
                    raise gr.Error(f"Job failed: {error_msg}")

                if poll_count >= max_polls:
                    raise gr.Error("Download took too long. Please try again or check if the video is accessible.")
                time.sleep(2)
            except requests.exceptions.RequestException as e:
                # Network errors (status check or audio fetch) are retried
                # until the poll budget is exhausted.
                if poll_count >= max_polls:
                    raise gr.Error("Failed to check job status. Please try again.") from e
                time.sleep(2)
    except Exception as e:
        logger.exception(f"Error during YouTube download: {str(e)}")
        # Do not leave the reserved (empty or partial) temp file behind.
        if output_path is not None:
            try:
                os.remove(output_path)
            except OSError:
                pass
        raise gr.Error(f"Failed to download YouTube audio: {str(e)}") from e
    return output_path
def check_api_health():
    """Check if the API is healthy before making requests.

    Queries ``{API_URL}/health`` and inspects the JSON body.

    Returns:
        bool: ``True`` when the service reports ``"status": "healthy"``.
        CPU or memory usage above 90% is only logged as a warning.

    Raises:
        gr.Error: If the service reports any other status, or if the
            request fails.
    """
    try:
        # Bounded timeout: without one, a stalled service would block the
        # calling handler indefinitely.
        response = requests.get(f"{API_URL}/health", timeout=30)
        response.raise_for_status()
        health_data = response.json()

        # Check if service is healthy
        if health_data.get("status") != "healthy":
            raise gr.Error("API service is not healthy. Please try again later.")

        # Check resource usage
        cpu_percent = health_data.get("cpu_percent", 0)
        memory_percent = health_data.get("memory_percent", 0)
        if cpu_percent > 90 or memory_percent > 90:
            logger.warning(f"High resource usage detected - CPU: {cpu_percent}%, Memory: {memory_percent}%")

        logger.info("API health check passed successfully")
        return True
    except requests.exceptions.RequestException as e:
        logger.error(f"Health check failed: {str(e)}")
        raise gr.Error("Failed to connect to the API service. Please try again later.") from e
def transcribe_youtube(url, return_timestamps, generate_subs, chunk_length_s=30, batch_size=128):
    """Transcribe the audio track of a YouTube video.

    The audio is fetched with ``download_youtube_audio`` and then sent to the
    transcription API. ``download_youtube_audio`` returns a local file path,
    so the file is uploaded to ``{API_URL}/transcribe``. If an ``http(s)``
    URL is returned instead, it is forwarded to ``{API_URL}/transcribe/url``.

    Args:
        url (str): Video address; must start with ``http://`` or ``https://``.
        return_timestamps (bool): Request sentence-level timestamps and
            include segments in the result.
        generate_subs (bool): Also write an SRT file (needs timestamps).
        chunk_length_s (int): Forwarded as the ``chunk_length_s`` parameter.
        batch_size (int): Forwarded as the ``batch_size`` parameter.

    Returns:
        tuple: ``(result, srt_file, "")`` where ``result`` is a dict with
        "text" and "segments" (``None`` without timestamps) and ``srt_file``
        is a file path or ``None``.

    Raises:
        gr.Error: On any failure; the original exception is chained.
    """
    logger.info(f"Starting YouTube transcription process for URL: {url}")
    logger.info(f"Parameters - return_timestamps: {return_timestamps}, generate_subs: {generate_subs}, chunk_length_s: {chunk_length_s}, batch_size: {batch_size}")

    remote_schemes = ('http://', 'https://')
    audio_source = None
    try:
        # Check API health first
        logger.info("Performing API health check...")
        check_api_health()

        # Validate URL scheme
        if not url or not url.startswith(remote_schemes):
            logger.error(f"Invalid URL scheme: {url}")
            raise gr.Error("URL must start with http:// or https://")

        # Fetch the audio through Sieve
        logger.info("Starting YouTube audio download via Sieve API...")
        audio_source = download_youtube_audio(url)
        logger.info(f"Successfully obtained audio from Sieve: {audio_source}")

        # Prepare request parameters (requests omits parameters set to None)
        params = {
            "source_language": "tg",  # Tajik language
            "timestamp_level": "sentence" if return_timestamps else None,
            "task": "transcribe",
            "chunk_length_s": chunk_length_s,
            "batch_size": batch_size
        }
        logger.info(f"Prepared API request parameters: {params}")

        # Send request to API
        logger.info("Sending transcription request to API...")
        if audio_source.startswith(remote_schemes):
            response = requests.post(
                f"{API_URL}/transcribe/url",
                json={"url": audio_source},
                params=params,
                timeout=1800
            )
        else:
            # Local file: upload it the same way the file-based handler does.
            with open(audio_source, "rb") as audio_file:
                response = requests.post(
                    f"{API_URL}/transcribe",
                    files={"file": audio_file},
                    params=params,
                    timeout=1800
                )
        response.raise_for_status()
        result = response.json()
        logger.info("Successfully received response from API")

        # Log metadata
        metadata = result.get("metadata", {})
        logger.info(f"Transcription metadata: {metadata}")
        logger.info(f"Transcription completed in {metadata.get('timing', {}).get('total_time', 0):.2f} seconds")

        # Format response with segments (without id)
        logger.info("Formatting response...")
        transcription = result["transcription"]
        # "segments" may be absent from the response; treat that as empty.
        segments = transcription.get("segments")
        formatted_result = {
            "text": transcription["text"],
            "segments": [
                {
                    "start": segment["start"],
                    "end": segment["end"],
                    "text": segment["text"]
                }
                for segment in (segments or [])
            ] if return_timestamps else None
        }
        logger.info(f"Formatted result contains {len(formatted_result['segments'] or [])} segments")

        # Generate subtitles if requested
        srt_file = None
        if generate_subs and return_timestamps and segments is not None:
            logger.info("Generating SRT subtitles...")
            srt_content = generate_srt(segments)
            srt_file = save_srt_to_file(srt_content)
            logger.info(f"Generated SRT file: {srt_file}")

        logger.info("YouTube transcription process completed successfully")
        # NOTE(review): the gr.Interface using this handler declares two
        # outputs; confirm whether the trailing "" is still needed.
        return formatted_result, srt_file, ""
    except Exception as e:
        logger.exception(f"Error in YouTube transcription: {str(e)}")
        raise gr.Error(f"Failed to transcribe YouTube video: {str(e)}") from e
    finally:
        # The downloaded audio is a temp file created for this call only.
        if audio_source and not audio_source.startswith(remote_schemes):
            try:
                os.remove(audio_source)
            except OSError:
                pass
def transcribe(inputs, return_timestamps, generate_subs, chunk_length_s=30, batch_size=128):
    """Transcribe a local audio file through the transcription API.

    Args:
        inputs (str | None): Path of the audio file to upload.
        return_timestamps (bool): Request sentence-level timestamps and
            include segments in the result.
        generate_subs (bool): Also write an SRT file (needs timestamps).
        chunk_length_s (int): Forwarded as the ``chunk_length_s`` parameter.
        batch_size (int): Forwarded as the ``batch_size`` parameter.

    Returns:
        tuple: ``(result, srt_file, "")`` where ``result`` is a dict with
        "text" and "segments" (``None`` without timestamps) and ``srt_file``
        is a file path or ``None``.

    Raises:
        gr.Error: If no file was given or any step fails; the original
            exception is chained.
    """
    logger.info(f"Starting transcription process for file: {inputs}")
    logger.info(f"Parameters - return_timestamps: {return_timestamps}, generate_subs: {generate_subs}, chunk_length_s: {chunk_length_s}, batch_size: {batch_size}")

    if inputs is None:
        logger.error("No audio file submitted")
        raise gr.Error("No audio file submitted! Please upload or record an audio file before submitting your request.")

    try:
        # Check API health first
        logger.info("Performing API health check...")
        check_api_health()

        # Read the audio file; it must stay open while the upload runs.
        logger.info(f"Reading audio file: {inputs}")
        with open(inputs, "rb") as f:
            files = {"file": f}

            # Prepare request parameters (requests omits parameters set to None)
            params = {
                "source_language": "tg",  # Tajik language
                "timestamp_level": "sentence" if return_timestamps else None,
                "task": "transcribe",
                "chunk_length_s": chunk_length_s,
                "batch_size": batch_size
            }
            logger.info(f"Prepared API request parameters: {params}")

            # Send request to API
            logger.info("Sending transcription request to API...")
            response = requests.post(
                f"{API_URL}/transcribe",
                files=files,
                params=params,
                timeout=1800
            )
        response.raise_for_status()
        result = response.json()
        logger.info("Successfully received response from API")

        # Log metadata
        metadata = result.get("metadata", {})
        logger.info(f"Transcription metadata: {metadata}")
        logger.info(f"Transcription completed in {metadata.get('timing', {}).get('total_time', 0):.2f} seconds")

        # Format response with segments (without id)
        logger.info("Formatting response...")
        transcription = result["transcription"]
        # "segments" may be absent from the response; treat that as empty
        # instead of failing with a KeyError.
        segments = transcription.get("segments")
        formatted_result = {
            "text": transcription["text"],
            "segments": [
                {
                    "start": segment["start"],
                    "end": segment["end"],
                    "text": segment["text"]
                }
                for segment in (segments or [])
            ] if return_timestamps else None
        }
        logger.info(f"Formatted result contains {len(formatted_result['segments'] or [])} segments")

        # Generate subtitles if requested
        srt_file = None
        if generate_subs and return_timestamps and segments is not None:
            logger.info("Generating SRT subtitles...")
            srt_content = generate_srt(segments)
            srt_file = save_srt_to_file(srt_content)
            logger.info(f"Generated SRT file: {srt_file}")

        logger.info("Transcription process completed successfully")
        # NOTE(review): the gr.Interface objects using this handler declare
        # two outputs; confirm whether the trailing "" is still needed.
        return formatted_result, srt_file, ""
    except requests.exceptions.RequestException as e:
        logger.exception(f"API request failed: {str(e)}")
        raise gr.Error(f"Failed to transcribe audio: API request failed - {str(e)}") from e
    except Exception as e:
        logger.exception(f"Error during transcription: {str(e)}")
        raise gr.Error(f"Failed to transcribe audio: {str(e)}") from e
# Container that later hosts the tabbed layout.
demo = gr.Blocks(theme=gr.themes.Ocean())

# The per-tab interfaces are built first and mounted into tabs afterwards.
# YouTube tab: components are created in the same order as they are listed
# (inputs first, then outputs).
_youtube_inputs = [
    gr.Textbox(label="YouTube URL", placeholder="https://www.youtube.com/watch?v=..."),
    gr.Checkbox(label="Include timestamps", value=True),
    gr.Checkbox(label="Generate subtitles", value=True),
    gr.Slider(minimum=10, maximum=60, value=30, step=5, label="Chunk Length (seconds)"),
    gr.Slider(minimum=32, maximum=256, value=128, step=32, label="Batch Size"),
]
_youtube_outputs = [
    gr.JSON(label="Transcription", open=True),
    gr.File(label="Subtitles (SRT)", visible=True),
]
_youtube_description = (
    "Transcribe Tajik language audio from YouTube videos. "
    "Paste a YouTube URL and get accurate transcription with optional timestamps "
    "and subtitles.\n\n"
    "⚠️ Note: YouTube downloads may occasionally fail due to YouTube's restrictions "
    "or temporary service issues. If this happens, please try again in a few minutes "
    "or use the audio file upload option instead."
)
youtube_transcribe = gr.Interface(
    fn=transcribe_youtube,
    inputs=_youtube_inputs,
    outputs=_youtube_outputs,
    title="Tajik Speech Transcription",
    description=_youtube_description,
)
# Microphone tab: records audio and passes its file path to `transcribe`.
# Components are created in the same order as they are listed (inputs first,
# then outputs).
_microphone_inputs = [
    gr.Audio(sources="microphone", type="filepath"),
    gr.Checkbox(label="Include timestamps", value=True),
    gr.Checkbox(label="Generate subtitles", value=True),
    gr.Slider(minimum=10, maximum=60, value=30, step=5, label="Chunk Length (seconds)"),
    gr.Slider(minimum=32, maximum=256, value=128, step=32, label="Batch Size"),
]
_microphone_outputs = [
    gr.JSON(label="Transcription", open=True),
    gr.File(label="Subtitles (SRT)", visible=True),
]
_microphone_description = (
    "Transcribe Tajik language audio from microphone or file upload. "
    "Perfect for transcribing Tajik podcasts, interviews, and conversations. "
    "Supports both microphone recording and file uploads."
)
mf_transcribe = gr.Interface(
    fn=transcribe,
    inputs=_microphone_inputs,
    outputs=_microphone_outputs,
    title="Tajik Speech Transcription",
    description=_microphone_description,
)
# File-upload tab: passes the uploaded file's path to `transcribe`.
file_transcribe = gr.Interface(
    fn=transcribe,
    inputs=[
        gr.Audio(sources="upload", type="filepath", label="Audio file"),
        gr.Checkbox(label="Include timestamps", value=True),
        gr.Checkbox(label="Generate subtitles", value=True),
        gr.Slider(minimum=10, maximum=60, value=15, step=5, label="Chunk Length (seconds)"),
        # The default of 8 used to sit below the slider's minimum of 32 and
        # off its step-32 grid. Lowering minimum and step to 8 keeps the same
        # default and still allows every previously selectable value.
        gr.Slider(minimum=8, maximum=256, value=8, step=8, label="Batch Size")
    ],
    outputs=[
        gr.JSON(label="Transcription", open=True),
        gr.File(label="Subtitles (SRT)", visible=True),
    ],
    title="Tajik Speech Transcription",
    description=(
        "Transcribe Tajik language audio files. "
        "Upload your audio file and get accurate transcription with optional timestamps "
        "and subtitles. Supports various audio formats."
    )
)
# Mount the three interfaces as tabs inside the shared Blocks container;
# the tab titles are matched to the interfaces by position.
with demo:
    gr.TabbedInterface(
        [file_transcribe, mf_transcribe, youtube_transcribe],
        ["Audio file", "Microphone", "YouTube"]
    )
# Enable the request queue, then start the app with ssr_mode turned off.
logger.info("Starting Gradio interface")
demo.queue().launch(ssr_mode=False)