import gradio as gr
import subprocess
import datetime
import tempfile
import requests
import os
import time
from loguru import logger
# Load API keys from environment variables
API_URL = os.getenv("API_URL")
SIEVE_API_KEY = os.getenv("SIEVE_API_KEY")
SIEVE_API_URL = "https://mango.sievedata.com/v2"
headers = {
"Accept": "application/json",
"Content-Type": "audio/flac"
}
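# Note on the Content-Type above: the endpoint behind API_URL receives raw audio bytes
# with this header, while the YouTube path below saves MP3 audio. This relies on the
# deployed endpoint sniffing the actual format rather than trusting the declared type
# (an assumption about that endpoint, not something enforced in this file).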
def format_time(seconds):
"""Convert seconds to SRT time format (HH:MM:SS,mmm).
Args:
seconds (float): Time in seconds to convert.
Returns:
str: Time formatted as HH:MM:SS,mmm where:
- HH: Hours (00-99)
- MM: Minutes (00-59)
- SS: Seconds (00-59)
- mmm: Milliseconds (000-999)
Example:
>>> format_time(3661.5)
'01:01:01,500'
"""
    td = datetime.timedelta(seconds=float(seconds))
    total_seconds = int(td.total_seconds())  # use total_seconds so hours don't wrap at 24
    hours = total_seconds // 3600
    minutes = (total_seconds % 3600) // 60
    secs = total_seconds % 60
    milliseconds = td.microseconds // 1000
    return f"{hours:02d}:{minutes:02d}:{secs:02d},{milliseconds:03d}"
def generate_srt(chunks):
"""Generate SRT format subtitles from transcription chunks.
Args:
chunks (list): List of dictionaries containing transcription chunks.
Each chunk must have:
- "timestamp": List of [start_time, end_time] in seconds
- "text": The transcribed text for that time segment
Returns:
str: SRT formatted subtitles string with format:
```
1
HH:MM:SS,mmm --> HH:MM:SS,mmm
Text content
2
HH:MM:SS,mmm --> HH:MM:SS,mmm
Text content
...
```
Example:
>>> chunks = [
... {"timestamp": [0.0, 1.5], "text": "Hello"},
... {"timestamp": [1.5, 3.0], "text": "World"}
... ]
>>> generate_srt(chunks)
'1\\n00:00:00,000 --> 00:00:01,500\\nHello\\n\\n2\\n00:00:01,500 --> 00:00:03,000\\nWorld\\n\\n'
"""
srt_content = []
for i, chunk in enumerate(chunks, 1):
start_time = format_time(chunk["timestamp"][0])
end_time = format_time(chunk["timestamp"][1])
text = chunk.get("text", "").strip()
srt_content.append(f"{i}\n{start_time} --> {end_time}\n{text}\n\n")
return "".join(srt_content)
def save_srt_to_file(srt_content):
"""Save SRT content to a temporary file.
Args:
srt_content (str): The SRT formatted subtitles content to save.
Returns:
str or None: Path to the temporary file if content was saved,
None if srt_content was empty.
Note:
The temporary file is created with delete=False to allow it to be
used after the function returns. The file should be deleted by the
caller when no longer needed.
"""
if not srt_content:
return None
# Create a temporary file with .srt extension
temp_file = tempfile.NamedTemporaryFile(suffix='.srt', delete=False)
temp_file.write(srt_content.encode('utf-8'))
temp_file.close()
return temp_file.name
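# Minimal caller-side sketch (illustrative only, not wired into the app): the docstring
# above notes that the caller owns the temporary file, so a typical pattern would be:
#
#   srt_path = save_srt_to_file(generate_srt(chunks))
#   try:
#       ...  # hand srt_path to gr.File, copy it elsewhere, etc.
#   finally:
#       if srt_path:
#           os.unlink(srt_path)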
# Check if ffmpeg is installed
def check_ffmpeg():
    """Verify that ffmpeg is installed and runnable; raise a gr.Error if it is not."""
    try:
subprocess.run(['ffmpeg', '-version'], capture_output=True, check=True)
logger.info("ffmpeg check passed successfully")
except (subprocess.CalledProcessError, FileNotFoundError) as e:
logger.error(f"ffmpeg check failed: {str(e)}")
raise gr.Error("ffmpeg is not installed. Please install ffmpeg to use this application.")
# Initialize ffmpeg check
check_ffmpeg()
def download_youtube_audio(url):
"""Download audio from YouTube using Sieve API.
Args:
url (str): YouTube video URL
Returns:
str: Path to downloaded audio file
Raises:
gr.Error: If download fails or API key is not set
"""
logger.info(f"Starting YouTube audio download process for URL: {url}")
if not SIEVE_API_KEY:
logger.error("SIEVE_API_KEY environment variable is not set")
raise gr.Error("SIEVE_API_KEY environment variable is not set")
try:
# Create a temporary file for the audio
temp_file = tempfile.NamedTemporaryFile(suffix='.mp3', delete=False)
temp_file.close()
output_path = temp_file.name
logger.info(f"Created temporary file at: {output_path}")
# Prepare the request to Sieve API with exact parameters
payload = {
"function": "sieve/youtube-downloader",
"inputs": {
"url": url,
"download_type": "audio", # Ensure we're only downloading audio
"resolution": "highest-available",
"include_audio": True,
"start_time": 0,
"end_time": -1,
"include_metadata": False,
"metadata_fields": ["title", "thumbnail", "description", "tags", "duration"],
"include_subtitles": False,
"subtitle_languages": ["en"],
"video_format": "mp4",
"audio_format": "mp3"
}
}
logger.debug(f"Prepared Sieve API payload: {payload}")
# Send request to Sieve API with retries
max_retries = 3
retry_delay = 5 # seconds
for attempt in range(max_retries):
try:
logger.info(f"Sending request to Sieve API (attempt {attempt + 1}/{max_retries})...")
response = requests.post(
f"{SIEVE_API_URL}/push",
headers={"X-API-Key": SIEVE_API_KEY, "Content-Type": "application/json"},
json=payload,
timeout=1800 # Add timeout
)
response.raise_for_status()
response_data = response.json()
logger.debug(f"Sieve API response: {response_data}")
job_id = response_data.get("id")
if not job_id:
logger.error("No job ID received from Sieve API")
if attempt < max_retries - 1:
logger.warning(f"Retrying in {retry_delay} seconds...")
time.sleep(retry_delay)
continue
raise gr.Error("Failed to get job ID from Sieve API")
break
except requests.exceptions.RequestException as e:
logger.warning(f"Request failed (attempt {attempt + 1}/{max_retries}): {str(e)}")
if attempt < max_retries - 1:
logger.info(f"Retrying in {retry_delay} seconds...")
time.sleep(retry_delay)
continue
raise
logger.info(f"Received job ID: {job_id}")
# Poll for job completion
poll_count = 0
max_polls = 180 # Maximum number of polls (6 minutes with 2-second delay)
last_status = None
while True:
poll_count += 1
logger.info(f"Polling job status (attempt {poll_count}/{max_polls})...")
try:
job_response = requests.get(
f"{SIEVE_API_URL}/jobs/{job_id}",
headers={"X-API-Key": SIEVE_API_KEY},
timeout=1800,
)
job_response.raise_for_status()
job_data = job_response.json()
# logger.debug(f"Job status response: {job_data}")
status = job_data.get("status")
if status != last_status:
logger.info(f"Job status changed: {status}")
last_status = status
if status == "completed" or status == "finished":
logger.info("Job completed successfully")
# Get the output data
output_data = job_data.get("output_0", {})
if not output_data:
logger.error("No output data found in completed job response")
raise gr.Error("No output data in job response")
# Get the audio URL from the output
audio_url = output_data.get("url")
if not audio_url:
logger.error("No audio URL found in output data")
raise gr.Error("No audio URL in output data")
logger.info(f"Received audio URL from Sieve: {audio_url}")
# Download the audio file
logger.info("Downloading audio file from Sieve storage...")
audio_response = requests.get(audio_url, timeout=30)
audio_response.raise_for_status()
file_size = len(audio_response.content)
logger.info(f"Downloaded audio file size: {file_size/1024/1024:.2f} MB")
# Save the file
with open(output_path, "wb") as f:
f.write(audio_response.content)
logger.info(f"Successfully saved audio to: {output_path}")
# Break out of the polling loop after successful download
break
elif status == "failed":
error_msg = job_data.get("error", "Unknown error")
logger.error(f"Job failed with error: {error_msg}")
raise gr.Error(f"Job failed: {error_msg}")
if poll_count >= max_polls:
logger.error("Maximum polling attempts reached")
raise gr.Error("Download took too long. Please try again or check if the video is accessible.")
logger.info("Job still processing, waiting 2 seconds before next poll...")
time.sleep(2)
except requests.exceptions.RequestException as e:
logger.warning(f"Poll request failed: {str(e)}")
if poll_count >= max_polls:
raise gr.Error("Failed to check job status. Please try again.")
time.sleep(2)
except requests.exceptions.RequestException as e:
logger.exception(f"Network error during YouTube download: {str(e)}")
raise gr.Error(f"Failed to download YouTube audio: Network error - {str(e)}")
except Exception as e:
logger.exception(f"Unexpected error during YouTube download: {str(e)}")
raise gr.Error(f"Failed to download YouTube audio: {str(e)}")
return output_path
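# Illustrative usage (assumes SIEVE_API_KEY is set; the URL is a placeholder):
#
#   audio_path = download_youtube_audio("https://www.youtube.com/watch?v=...")
#   try:
#       ...  # transcribe or inspect the downloaded MP3
#   finally:
#       os.unlink(audio_path)  # the caller cleans up the temporary file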
def transcribe_youtube(url, return_timestamps, generate_subs):
"""Transcribe audio from YouTube video.
Args:
url (str): YouTube video URL
return_timestamps (bool): Whether to include timestamps in output
generate_subs (bool): Whether to generate SRT subtitles
Returns:
tuple: (formatted_result, srt_file, correction_text)
"""
logger.info(f"Starting YouTube transcription process for URL: {url}")
logger.info(f"Options - Timestamps: {return_timestamps}, Generate subtitles: {generate_subs}")
try:
# Download audio from YouTube
logger.info("Step 1: Downloading audio from YouTube...")
audio_path = download_youtube_audio(url)
logger.info(f"Successfully downloaded audio to: {audio_path}")
# Transcribe the downloaded audio
logger.info("Step 2: Transcribing downloaded audio...")
result = transcribe(audio_path, return_timestamps, generate_subs)
logger.info("Successfully completed transcription")
# Clean up the temporary file
logger.info("Step 3: Cleaning up temporary files...")
try:
os.unlink(audio_path)
logger.info(f"Successfully deleted temporary file: {audio_path}")
except Exception as e:
logger.warning(f"Failed to delete temporary file: {str(e)}")
return result
except Exception as e:
logger.exception(f"Error in YouTube transcription: {str(e)}")
raise gr.Error(f"Failed to transcribe YouTube video: {str(e)}")
def transcribe(inputs, return_timestamps, generate_subs):
"""Transcribe audio input using Whisper model via Hugging Face Inference API.
Args:
inputs (str): Path to audio file to transcribe.
return_timestamps (bool): Whether to include timestamps in output.
generate_subs (bool): Whether to generate SRT subtitles.
Returns:
tuple: (formatted_result, srt_file, correction_text)
- formatted_result (dict): Transcription results
- srt_file (str): Path to SRT file if generated, None otherwise
- correction_text (str): Empty string for corrections
Raises:
gr.Error: If no audio file is provided or transcription fails.
"""
logger.info(f"Starting transcription process for file: {inputs}")
logger.info(f"Options - Timestamps: {return_timestamps}, Generate subtitles: {generate_subs}")
if inputs is None:
logger.warning("No audio file submitted")
raise gr.Error("No audio file submitted! Please upload or record an audio file before submitting your request.")
try:
# Read the audio file
logger.info("Step 1: Reading audio file...")
with open(inputs, "rb") as f:
data = f.read()
file_size = len(data)
logger.info(f"Successfully read audio file, size: {file_size/1024/1024:.2f} MB")
# Send request to API
logger.info("Step 2: Sending request to Whisper API...")
response = requests.post(API_URL, headers=headers, data=data)
response.raise_for_status()
result = response.json()
logger.debug(f"API response: {result}")
logger.info("Successfully received response from API")
# Format response as JSON
logger.info("Step 3: Processing API response...")
formatted_result = {
"text": result.get("text", "")
}
logger.info(f"Transcribed text length: {len(formatted_result['text'])} characters")
chunks = []
if return_timestamps and "chunks" in result:
logger.info(f"Processing {len(result['chunks'])} chunks for timestamps")
for i, chunk in enumerate(result["chunks"]):
logger.debug(f"Processing chunk {i}: {chunk}")
try:
start_time = chunk.get("timestamp", [None, None])[0]
end_time = chunk.get("timestamp", [None, None])[1]
text = chunk.get("text", "").strip()
if start_time is not None and end_time is not None:
chunk_data = {
"text": text,
"timestamp": [start_time, end_time]
}
chunks.append(chunk_data)
else:
logger.warning(f"Invalid timestamp in chunk {i}: {chunk}")
except Exception as chunk_error:
logger.error(f"Error processing chunk {i}: {str(chunk_error)}")
continue
formatted_result["chunks"] = chunks
logger.info(f"Successfully processed {len(chunks)} chunks with timestamps")
# Generate subtitles if requested
srt_file = None
if generate_subs and chunks:
logger.info("Step 4: Generating SRT subtitles...")
srt_content = generate_srt(chunks)
srt_file = save_srt_to_file(srt_content)
logger.info(f"Successfully generated SRT file: {srt_file}")
logger.info("Transcription process completed successfully")
return formatted_result, srt_file, "" # Return empty string for correction textbox
except requests.exceptions.RequestException as e:
logger.exception(f"API request failed: {str(e)}")
raise gr.Error(f"Failed to transcribe audio: API request failed - {str(e)}")
except Exception as e:
logger.exception(f"Error during transcription: {str(e)}")
raise gr.Error(f"Failed to transcribe audio: {str(e)}")
demo = gr.Blocks(theme=gr.themes.Ocean())
# Define interfaces first
youtube_transcribe = gr.Interface(
fn=transcribe_youtube,
inputs=[
gr.Textbox(label="YouTube URL", placeholder="https://www.youtube.com/watch?v=..."),
gr.Checkbox(label="Include timestamps", value=True),
gr.Checkbox(label="Generate subtitles", value=True),
],
outputs=[
gr.JSON(label="Transcription", open=True),
gr.File(label="Subtitles (SRT)", visible=True),
],
title="Tajik Speech Transcription",
description=(
"Transcribe Tajik language audio from YouTube videos. "
"Paste a YouTube URL and get accurate transcription with optional timestamps "
"and subtitles.\n\n"
"⚠️ Note: YouTube downloads may occasionally fail due to YouTube's restrictions "
"or temporary service issues. If this happens, please try again in a few minutes "
"or use the audio file upload option instead."
)
)
mf_transcribe = gr.Interface(
fn=transcribe,
inputs=[
gr.Audio(sources="microphone", type="filepath"),
gr.Checkbox(label="Include timestamps", value=True),
gr.Checkbox(label="Generate subtitles", value=True),
],
outputs=[
gr.JSON(label="Transcription", open=True),
gr.File(label="Subtitles (SRT)", visible=True),
],
title="Tajik Speech Transcription",
description=(
"Transcribe Tajik language audio from microphone or file upload. "
"Perfect for transcribing Tajik podcasts, interviews, and conversations. "
"Supports both microphone recording and file uploads."
)
)
file_transcribe = gr.Interface(
fn=transcribe,
inputs=[
gr.Audio(sources="upload", type="filepath", label="Audio file"),
gr.Checkbox(label="Include timestamps", value=True),
gr.Checkbox(label="Generate subtitles", value=True),
],
outputs=[
gr.JSON(label="Transcription", open=True),
gr.File(label="Subtitles (SRT)", visible=True),
],
title="Tajik Speech Transcription",
description=(
"Transcribe Tajik language audio files. "
"Upload your audio file and get accurate transcription with optional timestamps "
"and subtitles. Supports various audio formats."
)
)
with demo:
gr.TabbedInterface(
[file_transcribe, mf_transcribe, youtube_transcribe],
["Audio file", "Microphone", "YouTube"]
)
logger.info("Starting Gradio interface")
demo.queue().launch(ssr_mode=False)