import os
import shutil
import tempfile
import subprocess
from pathlib import Path

import numpy as np
import soundfile as sf
from pydub import AudioSegment
from faster_whisper import WhisperModel
from openai import OpenAI
import httpx
import asyncio
import gradio as gr
import requests
# --- Demucs-based vocal separation ---
def separate_vocals(input_path, progress=gr.Progress()):
    """Use Demucs to separate vocals and background music."""
    progress(0.1, desc="Separating vocals and music (Demucs)")
    temp_dir = tempfile.mkdtemp()
    try:
        output_dir = os.path.join(temp_dir, "separated")
        os.makedirs(output_dir, exist_ok=True)

        # Run Demucs through its CLI entry point by temporarily swapping sys.argv.
        from demucs.separate import main as demucs_main
        import sys
        original_argv = sys.argv
        sys.argv = [
            "demucs",
            "--two-stems", "vocals",
            "-o", output_dir,
            input_path
        ]
        try:
            demucs_main()
        finally:
            sys.argv = original_argv

        # Demucs writes its stems under <output_dir>/<model_name>/<track_name>/.
        base_name = Path(input_path).stem
        vocals_path = os.path.join(output_dir, "htdemucs", base_name, "vocals.wav")
        noise_path = os.path.join(output_dir, "htdemucs", base_name, "no_vocals.wav")
        if not os.path.exists(vocals_path) or not os.path.exists(noise_path):
            raise FileNotFoundError("Demucs output missing")

        progress(0.3, desc="Vocals separated")
        return vocals_path, noise_path, temp_dir
    except Exception as e:
        print(f"Demucs error: {e}")
        shutil.rmtree(temp_dir, ignore_errors=True)
        return None, None, None
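# Alternative invocation (a sketch, not used above): Demucs exposes the same
# separation through its command-line interface, which avoids patching sys.argv.
# This assumes the `demucs` package is installed in the current interpreter.
#
#   import sys, subprocess
#   subprocess.run(
#       [sys.executable, "-m", "demucs", "--two-stems", "vocals", "-o", output_dir, input_path],
#       check=True,
#   )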
# --- AudioProcessor class ---
class AudioProcessor:
    def __init__(self, device="cpu"):
        self.whisper_model = WhisperModel("small", device=device)
        # Read the OpenRouter key from the environment rather than hardcoding a secret.
        self.openrouter_api_key = os.environ.get("OPENROUTER_API_KEY", "")
        self.client = OpenAI(
            base_url="https://openrouter.ai/api/v1",
            api_key=self.openrouter_api_key,
            http_client=httpx.Client(headers={
                "Authorization": f"Bearer {self.openrouter_api_key}",
                "HTTP-Referer": "https://github.com",
                "X-Title": "Audio Translation App"
            })
        )
    def transcribe_audio_with_pauses(self, audio_path, progress):
        """Transcribe vocals and record gaps longer than 0.5 s as pauses (text=None)."""
        progress(0.35, desc="Transcribing audio (Whisper)")
        segments, _ = self.whisper_model.transcribe(audio_path, word_timestamps=True)
        previous_end = 0.0
        results = []
        for segment in segments:
            # Insert a pause entry when there is a noticeable gap before this segment.
            if segment.start > previous_end + 0.5:
                results.append((previous_end, segment.start, None))
            results.append((segment.start, segment.end, segment.text.strip()))
            previous_end = segment.end
        # Pad a trailing pause so the rebuilt track matches the original length.
        audio_duration = get_audio_duration(audio_path)
        if audio_duration and audio_duration > previous_end + 0.5:
            results.append((previous_end, audio_duration, None))
        progress(0.5, desc="Transcription complete")
        return results
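    # Illustrative only (not executed): the method returns a list of
    # (start_seconds, end_seconds, text_or_None) tuples, e.g.
    #   [(0.0, 1.2, None), (1.2, 3.4, "Hello world"), (3.9, 6.0, "How are you?")]
    # where None marks a pause that is later rendered as silence.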
    def translate_segments_batch(self, segments, target_language, progress):
        """Translate all text segments in a single batched request."""
        progress(0.55, desc="Translating segments")
        try:
            # Filter out None entries (pauses); only real text goes to the model.
            text_segments = [seg for seg in segments if seg is not None]
            if not text_segments:
                return segments  # Return original if there is no text to translate
            print(f"Translating {len(text_segments)} segments in batch...")

            # Prompt with explicit formatting instructions so the reply can be
            # split back into one translation per input line.
            prompt = f"""Translate the following text segments to {target_language} while maintaining EXACTLY the same format and order:

{chr(10).join(text_segments)}

IMPORTANT INSTRUCTIONS:
1. Maintain the EXACT same order and number of segments
2. Each line must be a separate translation
3. Use natural conversational {target_language}
4. Preserve meaning/context
5. Leave proper nouns unchanged
6. Make sure each translated sentence is meaningful
7. Match original word count where possible
8. Output ONLY the translations, one per line, no numbers or bullet points
9. Do not add any additional text or explanations

Example Input:
Hello world
How are you?

Example Output:
नमस्ते दुनिया
आप कैसे हैं?
"""
            completion = self.client.chat.completions.create(
                model="gpt-3.5-turbo",
                messages=[
                    {
                        "role": "system",
                        "content": f"You are a professional translator from English to {target_language}. Translate exactly as requested."
                    },
                    {
                        "role": "user",
                        "content": prompt
                    }
                ],
                temperature=0.1,  # Low temperature for more consistent formatting
                max_tokens=2000
            )
            translated_text = completion.choices[0].message.content.strip()
            translations = translated_text.split('\n')

            # Re-interleave the translations with the pause (None) entries.
            translated_segments = []
            translation_idx = 0
            for seg in segments:
                if seg is None:
                    translated_segments.append(None)
                else:
                    if translation_idx < len(translations):
                        translated_segments.append(translations[translation_idx])
                        translation_idx += 1
                    else:
                        translated_segments.append(seg)  # Fall back to the original text if a translation is missing
            progress(0.7, desc="Translation complete")
            return translated_segments
        except Exception as e:
            print(f"Batch translation error: {e}")
            return segments  # Return original segments if translation fails
# --- Helper functions ---
def get_audio_duration(audio_path):
    try:
        with sf.SoundFile(audio_path) as f:
            return len(f) / f.samplerate
    except Exception as e:
        print(f"Duration error: {e}")
        return None
async def synthesize_tts_to_wav(text, voice, target_language):
    """Synthesize `text` with edge-tts and convert it to a mono 22.05 kHz WAV."""
    import edge_tts
    temp_mp3 = "temp_tts.mp3"
    communicate = edge_tts.Communicate(text, voice)
    await communicate.save(temp_mp3)
    audio = AudioSegment.from_file(temp_mp3)
    audio = audio.set_channels(1).set_frame_rate(22050)
    output_wav = "temp_tts.wav"
    audio.export(output_wav, format="wav")
    os.remove(temp_mp3)
    return output_wav
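# Illustrative usage (not executed here; chunks are synthesized inside
# process_audio_chunks). A single clip could be produced standalone with, e.g.:
#   wav_path = asyncio.run(synthesize_tts_to_wav("नमस्ते दुनिया", "hi-IN-SwaraNeural", "Hindi"))
# The voice name must be one of the edge-tts voices listed in voice_options below.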
def stretch_audio(input_wav, target_duration, api_url="https://sox-api.onrender.com/stretch"):
    """Time-stretch a WAV to `target_duration` seconds via the remote SoX service."""
    # Upload the input audio and desired duration to the stretch API.
    with open(input_wav, "rb") as f:
        files = {"file": f}
        data = {"target_duration": str(target_duration)}
        response = requests.post(api_url, files=files, data=data)

    # Check whether the request was successful.
    if response.status_code != 200:
        raise RuntimeError(f"API error: {response.status_code} - {response.text}")

    # Save the response content to a temporary file, closing the mkstemp handle properly.
    fd, output_wav = tempfile.mkstemp(suffix=".wav")
    with os.fdopen(fd, "wb") as out:
        out.write(response.content)
    return output_wav
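# Optional local fallback (a sketch, not part of the pipeline above): if the hosted
# SoX endpoint is unreachable, roughly the same stretch can be done with ffmpeg's
# atempo filter, assuming an ffmpeg binary is available on PATH. The helper name
# stretch_audio_ffmpeg is hypothetical; the pipeline still calls stretch_audio().
def stretch_audio_ffmpeg(input_wav, target_duration):
    current_duration = get_audio_duration(input_wav)
    if not current_duration or target_duration <= 0:
        return input_wav
    # atempo changes playback speed: output_duration = input_duration / factor,
    # so the required factor is current / target. Each atempo instance only
    # accepts 0.5-2.0, so chain instances for larger ratios.
    tempo = current_duration / target_duration
    factors = []
    while tempo > 2.0:
        factors.append(2.0)
        tempo /= 2.0
    while tempo < 0.5:
        factors.append(0.5)
        tempo /= 0.5
    factors.append(tempo)
    filter_chain = ",".join(f"atempo={f:.6f}" for f in factors)
    fd, output_wav = tempfile.mkstemp(suffix=".wav")
    os.close(fd)
    subprocess.run(
        ["ffmpeg", "-y", "-i", input_wav, "-filter:a", filter_chain, output_wav],
        check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE
    )
    return output_wav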
def generate_silence_wav(duration_s, output_path, sample_rate=22050):
    samples = np.zeros(int(duration_s * sample_rate), dtype=np.float32)
    sf.write(output_path, samples, sample_rate)

def cleanup_files(file_list):
    for file in file_list:
        if os.path.exists(file):
            os.remove(file)
# --- Main Process Function ---
async def process_audio_chunks(input_audio_path, voice, target_language, progress):
    audio_processor = AudioProcessor()

    print("🔎 Separating vocals and music using Demucs...")
    vocals_path, background_path, temp_dir = separate_vocals(input_audio_path, progress)
    if not vocals_path:
        return None, None

    print("🔎 Transcribing vocals...")
    segments = audio_processor.transcribe_audio_with_pauses(vocals_path, progress)
    print(f"Transcribed {len(segments)} segments.")

    # Extract just the text (or None for pauses) for batch translation.
    segment_texts = [seg[2] if seg[2] is not None else None for seg in segments]
    translated_texts = audio_processor.translate_segments_batch(segment_texts, target_language, progress)

    chunk_files = []
    chunk_idx = 0
    total_segments = len(segments)
    for (start, end, _), translated in zip(segments, translated_texts):
        duration = end - start
        chunk_idx += 1
        progress(0.7 + (chunk_idx / total_segments) * 0.15, desc=f"Processing chunk {chunk_idx}/{total_segments}")
        if translated is None:
            # Pause: render silence of the same duration.
            filename = f"chunk_{chunk_idx:03d}_pause.wav"
            generate_silence_wav(duration, filename)
            chunk_files.append(filename)
        else:
            print(f"🔤 {chunk_idx}: Translated: {translated}")
            # Synthesize TTS audio, then stretch it to match the original segment length.
            raw_tts = await synthesize_tts_to_wav(translated, voice, target_language)
            stretched = stretch_audio(raw_tts, duration)
            chunk_files.append(stretched)
            os.remove(raw_tts)

    # Concatenate all chunks, then lay the original background music underneath.
    combined_tts = AudioSegment.empty()
    for f in chunk_files:
        combined_tts += AudioSegment.from_wav(f)

    print("🎼 Adding original background music...")
    background_music = AudioSegment.from_wav(background_path)
    background_music = background_music[:len(combined_tts)]
    final_mix = combined_tts.overlay(background_music)

    output_path = "final_translated_with_music.wav"
    final_mix.export(output_path, format="wav")
    print(f"✅ Output saved as: {output_path}")

    final_audio_path = output_path
    final_background_path = background_path  # Lives in temp_dir (removed below); callers only check it for None

    cleanup_files(chunk_files)
    shutil.rmtree(temp_dir, ignore_errors=True)
    progress(0.9, desc="Audio processing complete")
    return final_audio_path, final_background_path
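# Note (an optional tweak, not part of the flow above): AudioSegment.overlay()
# keeps both tracks at their existing levels, so if the music competes with the
# dubbed speech it can be ducked before mixing, e.g. `background_music - 6`,
# which lowers it by roughly 6 dB (pydub applies gain in dB via the arithmetic
# operators).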
# --- Gradio Interface ---
def gradio_interface(video_file, voice, target_language, progress=gr.Progress()):
    try:
        progress(0.05, desc="Starting video dubbing process")
        # Create a temporary directory for processing
        temp_dir = Path(tempfile.mkdtemp())
        input_video_path = temp_dir / "input_video.mp4"

        # Check that the uploaded file is a supported video format
        if not os.path.splitext(video_file.name)[1].lower() in ['.mp4', '.mov', '.avi', '.mkv']:
            raise ValueError("Invalid file type. Please upload a video file.")

        # Save the uploaded file to the temporary directory
        shutil.copyfile(video_file.name, input_video_path)

        # Extract audio from the video
        progress(0.1, desc="Extracting audio from video")
        audio_path, audio_temp_dir = extract_audio_from_video(str(input_video_path))
        if not audio_path:
            return None

        # Process the audio (separation, transcription, translation, TTS, mixing)
        audio_output_path, background_path = asyncio.run(process_audio_chunks(audio_path, voice, target_language, progress))
        if audio_output_path is None or background_path is None:
            return None

        # Combine the new audio track with the original video
        progress(0.95, desc="Combining video and new audio")
        output_video_path = temp_dir / "translated_video.mp4"
        success = combine_video_audio(str(input_video_path), audio_output_path, str(output_video_path))
        if success:
            progress(1.0, desc="Dubbing complete!")
            return str(output_video_path)
        else:
            return None
    except Exception as e:
        print(f"Error processing video: {e}")
        return None
    finally:
        # Cleanup of temporary files is disabled for debugging purposes
        # shutil.rmtree(temp_dir, ignore_errors=True)
        pass
def extract_audio_from_video(video_path):
    """Extract audio from video file using ffmpeg"""
    temp_dir = tempfile.mkdtemp()
    audio_path = os.path.join(temp_dir, "extracted_audio.wav")
    try:
        subprocess.run([
            "ffmpeg", "-y", "-i", video_path,
            "-vn", "-acodec", "pcm_s16le", "-ar", "44100", "-ac", "2",
            audio_path
        ], check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        if not os.path.exists(audio_path):
            raise FileNotFoundError("Audio extraction failed")
        return audio_path, temp_dir
    except Exception as e:
        print(f"Audio extraction error: {e}")
        shutil.rmtree(temp_dir, ignore_errors=True)
        return None, None
def combine_video_audio(video_path, audio_path, output_path):
    """Combine the original video with the new audio track."""
    try:
        # Copy the video stream unchanged, take the audio from the second input,
        # and stop at the shorter of the two streams.
        subprocess.run([
            "ffmpeg", "-y", "-i", video_path,
            "-i", audio_path,
            "-c:v", "copy", "-map", "0:v:0", "-map", "1:a:0",
            "-shortest", output_path
        ], check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        return True
    except Exception as e:
        print(f"Video combining error: {e}")
        return False
# Voice options for each language (edge-tts voice names)
voice_options = {
    "Hindi": [
        "hi-IN-MadhurNeural",   # Male
        "hi-IN-SwaraNeural"     # Female
    ],
    "English": [
        "en-US-GuyNeural",          # Male
        "en-US-ChristopherNeural",  # Male
        "en-US-AriaNeural",         # Female
        "en-US-JessaNeural",        # Female
        "en-US-JennyNeural"         # Female
    ],
    "Spanish": [
        "es-ES-AlvaroNeural",   # Male
        "es-MX-JorgeNeural",    # Male
        "es-US-AlonsoNeural",   # Male
        "es-MX-DaliaNeural",    # Female
        "es-US-PalomaNeural"    # Female
    ],
    "French": [
        "fr-FR-HenriNeural",                # Male
        "fr-FR-RemyMultilingualNeural",     # Male
        "fr-CA-AntoineNeural",              # Male
        "fr-FR-DeniseNeural",               # Female
        "fr-FR-VivienneMultilingualNeural"  # Female
    ],
    "Japanese": [
        "ja-JP-KeitaNeural",    # Male
        "ja-JP-NanamiNeural"    # Female
    ],
    "Korean": [
        "ko-KR-InJoonNeural",   # Male
        "ko-KR-SunHiNeural"     # Female
    ]
}
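# Other voice names can be enumerated with the edge-tts CLI, e.g.:
#   edge-tts --list-voices
# (kept here as a comment; the dictionary above is the set this app exposes).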
custom_css = """
/* Overall Body Background - Deep & Vibrant Gradient */
body {
    background: linear-gradient(135deg, #1A202C, #2D3748, #4A5568) !important; /* Dark blue-grey gradient */
    font-family: 'Inter', sans-serif; /* Modern font, ensure it's available or use fallback */
    color: #E2E8F0; /* Light text color for contrast */
    overflow-x: hidden;
}

/* --- Core Gradio Block Blending --- */
/* Make Gradio's main container transparent to show body background */
.gradio-container {
    background: transparent !important;
    box-shadow: none !important;
    border: none !important;
    padding: 0 !important;
}

/* Specific Gradio block elements - subtle transparency */
.block {
    background-color: hsla(210, 20%, 25%, 0.5) !important; /* Semi-transparent dark blue-grey */
    backdrop-filter: blur(8px); /* Frosted glass effect */
    border: 1px solid hsla(210, 20%, 35%, 0.6) !important; /* Subtle border */
    border-radius: 20px !important; /* Rounded corners for the block */
    box-shadow: 0 8px 30px hsla(0, 0%, 0%, 0.3) !important; /* Stronger shadow for depth */
    margin-bottom: 25px !important;
    padding: 25px !important; /* Add internal padding to blocks */
}

/* Remove default Gradio layout wrappers' backgrounds */
.main-wrapper, .panel-container {
    background: transparent !important;
    box-shadow: none !important;
    border: none !important;
}

/* --- Application Title and Description --- */
.gradio-header h1 {
    color: #8D5BFC !important; /* Vibrant purple for main title */
    font-size: 3em !important;
    text-shadow: 0 0 15px hsla(260, 90%, 70%, 0.5); /* Glowing effect */
    margin-bottom: 10px !important;
    font-weight: 700 !important;
    text-align: center;
}

.gradio-markdown p {
    color: #CBD5E0 !important; /* Lighter text for description */
    font-size: 1.25em !important;
    text-align: center;
    margin-bottom: 40px !important;
    font-weight: 300;
}

/* --- Input Components (File, Dropdowns) --- */
.gradio-file, .gradio-dropdown {
    background-color: hsla(210, 20%, 18%, 0.7) !important; /* Darker, slightly transparent */
    border: 1px solid hsla(240, 60%, 70%, 0.4) !important; /* Subtle blue border */
    border-radius: 15px !important;
    padding: 12px 18px !important;
    color: #E2E8F0 !important; /* Light text for input */
    font-size: 1.1em !important;
    transition: all 0.3s ease;
    box-shadow: 0 4px 15px hsla(0, 0%, 0%, 0.2);
}

.gradio-file input[type="file"] {
    color: #E2E8F0 !important;
}

.gradio-file:hover, .gradio-dropdown:hover {
    border-color: #A78BFA !important; /* Lighter purple on hover */
    box-shadow: 0 6px 20px hsla(0, 0%, 0%, 0.3);
}

/* Focus state for inputs */
.gradio-dropdown.gr-text-input:focus,
.gradio-file input:focus {
    border-color: #8D5BFC !important; /* Vibrant purple on focus */
    box-shadow: 0 0 20px hsla(260, 90%, 70%, 0.5);
    background-color: hsla(210, 20%, 20%, 0.9) !important; /* Slightly less transparent */
}

/* Labels for inputs */
.gradio-label {
    color: #A78BFA !important; /* Soft purple for labels */
    font-weight: 600 !important;
    font-size: 1.15em !important;
    margin-bottom: 8px !important;
    text-align: left;
    width: 100%;
}

/* --- Submit Button --- */
.gradio-button {
    background: linear-gradient(90deg, #FF6B8B, #FF8E53) !important; /* Vibrant pink-to-orange gradient */
    color: white !important;
    border: none !important;
    border-radius: 30px !important;
    padding: 15px 35px !important;
    font-size: 1.3em !important;
    font-weight: bold !important;
    cursor: pointer !important;
    transition: all 0.3s ease !important;
    box-shadow: 0 8px 25px hsla(0, 0%, 0%, 0.4) !important;
    margin-top: 35px !important;
    min-width: 220px;
    align-self: center;
    text-transform: uppercase; /* Uppercase button text */
    letter-spacing: 1px;
}

.gradio-button:hover {
    background: linear-gradient(90deg, #FF4B7B, #FF7E43) !important;
    box-shadow: 0 10px 30px hsla(0, 0%, 0%, 0.5) !important;
    transform: translateY(-3px) !important;
}

/* --- Output Video Player --- */
.gradio-video {
    background-color: hsla(210, 20%, 15%, 0.8) !important; /* Darker, more opaque background for video */
    border: 2px solid #8D5BFC !important; /* Vibrant purple border for the video player */
    border-radius: 20px !important;
    padding: 15px !important;
    box-shadow: 0 10px 40px hsla(0, 0%, 0%, 0.5) !important; /* Stronger shadow */
    margin-top: 40px !important;
}

/* --- Translated Text Output --- */
.gradio-markdown-output, .gradio-textbox {
    background-color: hsla(210, 20%, 18%, 0.7) !important;
    border: 1px solid hsla(240, 60%, 70%, 0.4) !important;
    border-radius: 15px !important;
    padding: 20px !important;
    color: #E2E8F0 !important;
    font-size: 1.0em !important;
    min-height: 200px; /* Give it some height */
    overflow-y: auto; /* Enable scrolling for long text */
    white-space: pre-wrap; /* Preserve line breaks */
    box-shadow: 0 4px 15px hsla(0, 0%, 0%, 0.2);
}

/* Flexbox for the Row to control spacing and alignment */
.gradio-row {
    display: flex;
    justify-content: space-around; /* Distribute items with space around */
    align-items: flex-start; /* Align items to the start of the cross-axis */
    gap: 20px; /* Space between items in the row */
    flex-wrap: wrap; /* Allow items to wrap on smaller screens */
}

/* Ensure individual components in a row take up appropriate space */
.gradio-row > .gradio-component {
    flex: 1; /* Allow components to grow and shrink */
    min-width: 250px; /* Minimum width for components in a row */
}

/* Adjust padding for gr.Blocks content */
.gr-box {
    padding: 0 !important; /* Remove internal padding to let elements breathe */
    background: transparent !important;
    box-shadow: none !important;
}
"""
# Create the Gradio interface with radio buttons for both language and voice selection
with gr.Blocks(css=custom_css, theme=gr.themes.Soft(
    primary_hue=gr.themes.Color(
        c50='#e6e9ff', c100='#c2c9ff', c200='#9faaff', c300='#7c8bff', c400='#5a6bff',
        c500='#384aff', c600='#2c38cc', c700='#202b99', c800='#141d66', c900='#080e33',
        c950='#04071a'
    ),
    secondary_hue=gr.themes.Color(
        c50='#fff0e6', c100='#ffe0cc', c200='#ffb380', c300='#ff8533', c400='#ff5700',
        c500='#cc4600', c600='#993400', c700='#662200', c800='#331100', c900='#1a0900',
        c950='#0d0500'
    ),
    neutral_hue=gr.themes.Color(
        c50='#f8f8fa', c100='#f1f5f9', c200='#e2e8f0', c300='#cbd5e1', c400='#94a3b8',
        c500='#64748b', c600='#475569', c700='#334155', c800='#1e293b', c900='#0f172a',
        c950='#020617'
    )
)) as demo:
    gr.Markdown("# DeepDub: A Video Dubbing Application")
    gr.Markdown("Upload a video and get a dubbed version with translated audio")

    with gr.Row():
        video_input = gr.File(label="Upload Video", file_types=[".mp4", ".mov", ".avi", ".mkv"])
        # Radio buttons for language selection
        language_radio = gr.Radio(
            list(voice_options.keys()),
            label="Target Language",
            value="Hindi",
            interactive=True
        )
        # Radio buttons for voice selection
        voice_radio = gr.Radio(
            voice_options["Hindi"],
            label="Select Voice",
            value=voice_options["Hindi"][0],
            interactive=True
        )

    gr.Markdown("Note: if a queue is shown, another job is currently being processed; please wait.")
    output_video = gr.Video(label="Dubbed Video")
    submit_btn = gr.Button("Start Dubbing")

    def update_voice_options(language):
        # Refresh the voice radio choices whenever the language changes
        return gr.update(choices=voice_options[language], value=voice_options[language][0])

    # Update voice options when the language changes
    language_radio.change(
        update_voice_options,
        inputs=[language_radio],
        outputs=[voice_radio]
    )

    submit_btn.click(
        gradio_interface,
        inputs=[video_input, voice_radio, language_radio],
        outputs=output_video,
        api_name="dub_video"
    )

demo.queue().launch(server_name="0.0.0.0", debug=True, share=True)