"""Streamlit app: transcribe an English video, translate it to Arabic, burn in subtitles."""

import os
import subprocess
import tempfile
from datetime import timedelta

import ffmpeg
import streamlit as st
import torch
from deep_translator import GoogleTranslator
from transformers import AutoModelForSpeechSeq2Seq, AutoProcessor, pipeline

# Font used to render the Arabic subtitles. libass selects fonts by *family
# name*; the file's directory is handed to the filter separately (fontsdir).
ARABIC_FONT_PATH = "/usr/share/fonts/truetype/Amiri-Regular.ttf"
ARABIC_FONT_NAME = "Amiri"

# Display duration (seconds) used when a segment has no usable end timestamp.
FALLBACK_SEGMENT_SECONDS = 2.0

# Streamlit setup
st.title("Video Translator (English to Arabic)")
st.write("Upload an English video to extract speech, translate it into Arabic, and burn the subtitles into the video.")


def format_time(seconds):
    """Convert an offset in seconds to an SRT timestamp (HH:MM:SS,mmm).

    ``None`` and negative offsets are clamped to zero. Hours are not wrapped
    at 24, and the value is truncated (not rounded) to whole milliseconds.
    """
    offset = timedelta(seconds=max(float(seconds or 0), 0.0))
    total_ms = offset // timedelta(milliseconds=1)
    total_seconds, milliseconds = divmod(total_ms, 1000)
    total_minutes, secs = divmod(total_seconds, 60)
    hours, minutes = divmod(total_minutes, 60)
    return f"{hours:02d}:{minutes:02d}:{secs:02d},{milliseconds:03d}"


def extract_audio(video_path):
    """Extract the audio track of ``video_path`` into a WAV file via ffmpeg.

    Returns the path of a freshly created temporary WAV file; the caller is
    responsible for deleting it. Raises ``ffmpeg.Error`` if ffmpeg fails.
    """
    # A unique file per call keeps reruns and concurrent sessions from
    # clobbering each other's audio.
    fd, audio_path = tempfile.mkstemp(prefix="extracted_audio_", suffix=".wav")
    os.close(fd)
    try:
        # mkstemp already created the file, so ffmpeg must be allowed to overwrite it.
        ffmpeg.input(video_path).output(audio_path, format='wav').run(overwrite_output=True)
    except Exception:
        _remove_quietly(audio_path)
        raise
    return audio_path


def _remove_quietly(path):
    """Delete ``path``, ignoring the case where it cannot be removed."""
    try:
        os.remove(path)
    except OSError:
        pass


def _segment_field(segment, key, default=None):
    """Read ``key`` from a segment that is either a dict or an attribute object."""
    if isinstance(segment, dict):
        return segment.get(key, default)
    return getattr(segment, key, default)


def _normalize_segments(raw_segments):
    """Return segments as dicts that always carry 'start', 'end' and 'text'.

    Accepts Whisper-style segments (``start``/``end`` keys) as well as
    transformers pipeline chunks (``timestamp`` = ``(start, end)``). Any other
    keys present on dict segments are preserved.
    """
    normalized = []
    for raw in raw_segments or []:
        segment = dict(raw) if isinstance(raw, dict) else {}
        start = _segment_field(raw, "start")
        end = _segment_field(raw, "end")
        if start is None and end is None:
            stamp = _segment_field(raw, "timestamp") or (None, None)
            start, end = stamp[0], stamp[1]
        start = float(start) if start is not None else 0.0
        # The pipeline can leave the last chunk's end unset; give the cue a
        # short visible duration rather than a zero-length one.
        end = float(end) if end is not None else start + FALLBACK_SEGMENT_SECONDS
        segment["start"] = start
        segment["end"] = max(end, start)
        segment["text"] = _segment_field(raw, "text", "") or ""
        normalized.append(segment)
    return normalized


def transcribe_audio(audio_path):
    """Transcribe audio to timestamped text segments.

    Tries the fractalego/personal-speech-to-text-model through the
    transformers pipeline first and falls back to the ``whisper`` package if
    that fails for any reason. Returns a list of dicts with ``start``,
    ``end`` (seconds) and ``text`` keys.
    """
    try:
        # Try using fractalego/personal-speech-to-text-model
        device = "cuda" if torch.cuda.is_available() else "cpu"
        model_id = "fractalego/personal-speech-to-text-model"

        model = AutoModelForSpeechSeq2Seq.from_pretrained(model_id)
        processor = AutoProcessor.from_pretrained(model_id)
        model.to(device)

        pipe = pipeline(
            "automatic-speech-recognition",
            model=model,
            tokenizer=processor.tokenizer,
            feature_extractor=processor.feature_extractor,
            max_new_tokens=128,
            chunk_length_s=30,
            batch_size=16,
            return_timestamps=True,
            device=device,
        )

        result = pipe(audio_path)
        return _normalize_segments(result["chunks"])
    except Exception as e:  # deliberate best-effort: any failure triggers the fallback
        print(f"Error using fractalego model: {e}")
        print("Using whisper model as fallback...")

        # Use whisper as fallback (imported lazily: optional dependency)
        import whisper

        model = whisper.load_model("base")
        result = model.transcribe(audio_path, word_timestamps=True)
        return _normalize_segments(result["segments"])


def translate_text(text):
    """Translate text from English to Arabic.

    Empty or whitespace-only input returns ``""`` without calling the
    translation service. If the service returns nothing, the original text
    is returned so the caption is not lost.
    """
    if not text or not text.strip():
        return ""
    translator = GoogleTranslator(source='en', target='ar')
    translated = translator.translate(text)
    return translated if translated else text


def create_srt(segments, output_path):
    """Write ``segments`` to ``output_path`` as an SRT file.

    Each segment may be a dict or an object exposing ``start``, ``end``,
    ``text`` and optionally ``translation``. The translation is used as the
    caption, falling back to the original text when it is missing. Segments
    with no caption are skipped and the remaining cues are numbered
    consecutively from 1.
    """
    with open(output_path, 'w', encoding='utf-8-sig') as srt_file:  # UTF-8 with BOM for compatibility
        cue_number = 0
        for segment in segments:
            start_time = _segment_field(segment, 'start', 0)
            end_time = _segment_field(segment, 'end', 0)
            caption = _segment_field(segment, 'translation') or _segment_field(segment, 'text', '') or ''

            # A blank line terminates an SRT cue, so drop empty lines inside the caption.
            caption = "\n".join(line.strip() for line in str(caption).splitlines() if line.strip())
            if not caption:
                continue

            cue_number += 1
            srt_file.write(f"{cue_number}\n")
            srt_file.write(f"{format_time(start_time)} --> {format_time(end_time)}\n")
            srt_file.write(f"{caption}\n\n")


def _quote_filter_value(value):
    """Quote ``value`` for use as an option value inside an ffmpeg filtergraph.

    Two escaping levels apply: backslash, colon and single quote are escaped
    for the filter's option parser, then the result is wrapped in single
    quotes for the filtergraph parser. Plain paths come out as ``'path'``.
    """
    if os.sep == "\\":
        # Forward slashes avoid backslash escaping on Windows paths.
        value = value.replace("\\", "/")
    for char in ("\\", ":", "'"):
        value = value.replace(char, "\\" + char)
    # Inside single quotes a quote cannot be escaped: close, escape, reopen.
    return "'" + value.replace("'", "'\\''") + "'"


def burn_subtitles(video_path, srt_path, output_path):
    """Burn subtitles into a video using FFmpeg with Arabic support.

    Returns ``output_path`` on success, or ``None`` if ffmpeg exits with an
    error. The SRT file is expected to be UTF-8 (as written by ``create_srt``).
    """
    style = (
        f"FontName={ARABIC_FONT_NAME},FontSize=24,PrimaryColour=&HFFFFFF,"
        "OutlineColour=&H000000,BorderStyle=3,Alignment=2,Encoding=1"
    )
    subtitle_filter = f"subtitles={_quote_filter_value(srt_path)}"
    if os.path.isfile(ARABIC_FONT_PATH):
        # Point libass at the font's directory; otherwise rely on system fonts.
        subtitle_filter += f":fontsdir={_quote_filter_value(os.path.dirname(ARABIC_FONT_PATH))}"
    subtitle_filter += f":force_style='{style}'"

    cmd = [
        'ffmpeg', '-y',
        '-i', video_path,
        '-vf', subtitle_filter,
        '-c:v', 'libx264', '-crf', '18',
        '-c:a', 'copy',
        output_path
    ]

    try:
        subprocess.run(cmd, check=True)
        return output_path
    except subprocess.CalledProcessError as e:
        print(f"FFmpeg error: {e}")
        return None


def process_video(video_path):
    """Process the video: extract audio, transcribe, translate, create SRT, burn subtitles.

    Returns ``(result_path, srt_path)``. ``result_path`` is ``None`` when
    burning the subtitles failed; the SRT file is still written in that case.
    """
    temp_dir = tempfile.gettempdir()
    file_name = os.path.splitext(os.path.basename(video_path))[0]

    audio_path = extract_audio(video_path)
    try:
        segments = transcribe_audio(audio_path)
    finally:
        # The intermediate audio is only needed for transcription.
        _remove_quietly(audio_path)

    translated_segments = []
    for segment in segments:
        translated_text = translate_text(_segment_field(segment, 'text', ''))
        # Segments are plain dicts; attribute assignment only works on objects.
        if isinstance(segment, dict):
            segment['translation'] = translated_text
        else:
            segment.translation = translated_text
        translated_segments.append(segment)

    srt_path = os.path.join(temp_dir, f"{file_name}.srt")
    create_srt(translated_segments, srt_path)

    output_path = os.path.join(temp_dir, f"{file_name}_translated.mp4")
    result_path = burn_subtitles(video_path, srt_path, output_path)

    return result_path, srt_path


# Streamlit UI
uploaded_video = st.file_uploader("Upload your video", type=["mp4", "mov", "avi"])

if uploaded_video:
    # Use only the base name so the upload cannot escape the temp directory.
    upload_name = os.path.basename(uploaded_video.name)
    temp_video_path = os.path.join(tempfile.gettempdir(), upload_name)

    # Streamlit reruns the script on every interaction (including a click on
    # the download button); remember the result so the video is processed once.
    upload_key = (upload_name, uploaded_video.size)
    if st.session_state.get("processed_key") != upload_key:
        # Save the uploaded video temporarily
        with open(temp_video_path, "wb") as f:
            f.write(uploaded_video.getvalue())

        with st.spinner("Processing your video..."):
            st.session_state["processed_paths"] = process_video(temp_video_path)
        st.session_state["processed_key"] = upload_key

    result_path, srt_path = st.session_state["processed_paths"]

    # Show the processed video and a download link for the subtitle file
    if result_path and os.path.exists(result_path):
        st.video(result_path)
    else:
        st.error("Could not burn the subtitles into the video.")

    if os.path.exists(srt_path):
        with open(srt_path, "rb") as srt_file:
            srt_bytes = srt_file.read()
        # Pass the file contents, not the path string, as the download payload.
        st.download_button(
            "Download SRT File",
            data=srt_bytes,
            file_name=os.path.basename(srt_path),
            mime="application/x-subrip",
        )