import os
import math
import datetime
from collections import defaultdict

import torch
import numpy as np
import librosa
import spaces
from moviepy.editor import VideoFileClip
from pyannote.audio import Pipeline
from transformers import AutoModelForSpeechSeq2Seq, AutoProcessor, pipeline


def extract_audio(video_path, audio_path):
    """Extract the audio track from a video as 16 kHz PCM WAV."""
    video = VideoFileClip(video_path)
    audio = video.audio
    audio.write_audiofile(audio_path, codec='pcm_s16le', fps=16000)


def format_timestamp(seconds):
    """Format seconds as h:mm:ss, dropping fractional seconds."""
    return str(datetime.timedelta(seconds=seconds)).split('.')[0]


@spaces.GPU(duration=300)
def transcribe_audio(audio_path, language):
    device = "cuda:0" if torch.cuda.is_available() else "cpu"
    torch_dtype = torch.float16 if torch.cuda.is_available() else torch.float32

    model_id = "openai/whisper-large-v3"
    model = AutoModelForSpeechSeq2Seq.from_pretrained(
        model_id, torch_dtype=torch_dtype, low_cpu_mem_usage=True, use_safetensors=True
    )
    model.to(device)
    processor = AutoProcessor.from_pretrained(model_id)

    pipe = pipeline(
        "automatic-speech-recognition",
        model=model,
        tokenizer=processor.tokenizer,
        feature_extractor=processor.feature_extractor,
        max_new_tokens=128,
        chunk_length_s=30,
        batch_size=1,
        return_timestamps=True,
        torch_dtype=torch_dtype,
        device=device,
        generate_kwargs={"language": language},
    )

    audio, sr = librosa.load(audio_path, sr=16000)
    duration = len(audio) / sr
    n_chunks = math.ceil(duration / 30)

    transcription_txt = ""
    transcription_chunks = []

    for i in range(n_chunks):
        start = i * 30 * sr
        end = min((i + 1) * 30 * sr, len(audio))
        # librosa already returns float32 samples normalized to [-1, 1],
        # which is what the ASR pipeline expects; no rescaling needed.
        audio_chunk = audio[start:end].astype(np.float32)

        result = pipe(audio_chunk)
        transcription_txt += result["text"]
        for chunk in result["chunks"]:
            start_time, end_time = chunk["timestamp"]
            # Offset chunk-local timestamps by the position of this 30 s window.
            transcription_chunks.append({
                "start": start_time + i * 30,
                "end": end_time + i * 30,
                "text": chunk["text"],
            })

        print(f"Transcription Progress: {int(((i + 1) / n_chunks) * 100)}%")

    return transcription_txt, transcription_chunks


def create_combined_srt(transcription_chunks, diarization, output_path):
    speaker_segments = []
    speaker_map = {}
    current_speaker_num = 1

    # Map pyannote speaker labels to "Speaker N" in order of first appearance.
    for segment, _, speaker in diarization.itertracks(yield_label=True):
        if speaker not in speaker_map:
            speaker_map[speaker] = f"Speaker {current_speaker_num}"
            current_speaker_num += 1
        speaker_segments.append((segment.start, segment.end, speaker_map[speaker]))

    with open(output_path, 'w', encoding='utf-8') as srt_file:
        for i, chunk in enumerate(transcription_chunks, 1):
            start_time, end_time = chunk["start"], chunk["end"]
            text = chunk["text"]

            # Find the diarization segment that contains this chunk's start time.
            current_speaker = "Unknown"
            for seg_start, seg_end, speaker in speaker_segments:
                if seg_start <= start_time < seg_end:
                    current_speaker = speaker
                    break

            # Format timecodes as h:mm:ss (without a leading zero hour).
            start_str = format_timestamp(start_time).lstrip('0')
            end_str = format_timestamp(end_time).lstrip('0')

            srt_file.write(f"{i}\n")
            srt_file.write(f"{{{current_speaker}}}\n time: ({start_str} --> {end_str})\n text: {text}\n\n")

    # Append the dominant speaker (largest total speaking time).
    speaker_durations = defaultdict(float)
    for seg_start, seg_end, speaker in speaker_segments:
        speaker_durations[speaker] += seg_end - seg_start

    dominant_speaker = max(speaker_durations, key=speaker_durations.get)
    dominant_duration = speaker_durations[dominant_speaker]

    with open(output_path, 'a', encoding='utf-8') as srt_file:
        dominant_duration_str = format_timestamp(dominant_duration).lstrip('0')
        srt_file.write(
            f"\nMost dominant speaker: {dominant_speaker} "
            f"with total duration {dominant_duration_str}\n"
        )


@spaces.GPU(duration=600)
def process_video(video_path, diarization_access_token, language):
    base_name = os.path.splitext(video_path)[0]
    audio_path = f"{base_name}.wav"
    extract_audio(video_path, audio_path)

    # Diarization
    print("Performing diarization...")
    diarization_pipeline = Pipeline.from_pretrained(
        "pyannote/speaker-diarization-3.1", use_auth_token=diarization_access_token
    )
    diarization_pipeline.to(torch.device("cuda"))  # pyannote expects a torch.device
    diarization = diarization_pipeline(audio_path)
    print("Diarization complete.")

    # Transcription
    print("Performing transcription...")
    transcription, chunks = transcribe_audio(audio_path, language)
    print("Transcription complete.")

    # Create combined SRT file
    combined_srt_path = f"{base_name}_combined.srt"
    create_combined_srt(chunks, diarization, combined_srt_path)
    print(f"Combined SRT file created and saved to {combined_srt_path}")

    # Clean up the intermediate audio file
    os.remove(audio_path)

    return combined_srt_path


if __name__ == "__main__":
    video_path = r"C:\Users\reab5\Downloads\MediaHuman\Music\test1.mp4"

    # Get the Hugging Face token from the Space secret
    access_token = os.environ.get('hf_secret')
    if not access_token:
        raise ValueError("hf_secret not found in environment variables. Please set it in the Space secrets.")

    language = "en"
    process_video(video_path, access_token, language)