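"""Speaker-diarized video transcription.

Extracts the audio track from a video, runs pyannote speaker diarization and
Whisper large-v3 transcription, and writes a combined, speaker-labelled
SRT-style file that ends with a dominant-speaker summary.
"""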
import os
import torch
import torchvision
import math
from moviepy.editor import VideoFileClip
from pyannote.audio import Pipeline
from transformers import AutoModelForSpeechSeq2Seq, AutoProcessor, pipeline
import librosa
import datetime
from collections import defaultdict
import numpy as np
import spaces

def extract_audio(video_path, audio_path):
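    """Extract the video's audio track as 16 kHz 16-bit PCM WAV for ASR/diarization."""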
    video = VideoFileClip(video_path)
    audio = video.audio
    audio.write_audiofile(audio_path, codec='pcm_s16le', fps=16000)

def format_timestamp(seconds):
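    """Format a duration in seconds as H:MM:SS, dropping any fractional part."""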
    return str(datetime.timedelta(seconds=seconds)).split('.')[0]

@spaces.GPU(duration=300)
def transcribe_audio(audio_path, language):
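    """Transcribe audio with Whisper large-v3, processing it in 30-second chunks.

    Returns the full transcript plus a list of {"start", "end", "text"} chunks
    whose timestamps are offset to absolute positions in the audio.
    """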
    device = "cuda:0" if torch.cuda.is_available() else "cpu"
    torch_dtype = torch.float16 if torch.cuda.is_available() else torch.float32
    model_id = "openai/whisper-large-v3"

    model = AutoModelForSpeechSeq2Seq.from_pretrained(
        model_id, torch_dtype=torch_dtype, low_cpu_mem_usage=True, use_safetensors=True
    )
    model.to(device)

    processor = AutoProcessor.from_pretrained(model_id)

    pipe = pipeline(
        "automatic-speech-recognition",
        model=model,
        tokenizer=processor.tokenizer,
        feature_extractor=processor.feature_extractor,
        max_new_tokens=128,
        chunk_length_s=30,
        batch_size=1,
        return_timestamps=True,
        torch_dtype=torch_dtype,
        device=device,
        generate_kwargs={"language": language}
    )

    audio, sr = librosa.load(audio_path, sr=16000)
    duration = len(audio) / sr
    n_chunks = math.ceil(duration / 30)
    transcription_txt = ""
    transcription_chunks = []

    for i in range(n_chunks):
        start = i * 30 * sr
        end = min((i + 1) * 30 * sr, len(audio))
        audio_chunk = audio[start:end]

        # The ASR pipeline expects a float32 waveform scaled to [-1, 1]; librosa
        # already returns that, so only make the dtype explicit (no int16 rescaling).
        audio_chunk = audio_chunk.astype(np.float32)

        result = pipe(audio_chunk)
        transcription_txt += result["text"]
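        # Shift chunk-relative timestamps to absolute times using this chunk's 30 s offset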
        for chunk in result["chunks"]:
            start_time, end_time = chunk["timestamp"]
            # Whisper can return an open-ended timestamp (None) for the final
            # segment; fall back to the end of this window in that case.
            if end_time is None:
                end_time = min(30.0, duration - i * 30)
            transcription_chunks.append({
                "start": start_time + i * 30,
                "end": end_time + i * 30,
                "text": chunk["text"]
            })

        print(f"Transcription Progress: {int(((i + 1) / n_chunks) * 100)}%")

    return transcription_txt, transcription_chunks

def create_combined_srt(transcription_chunks, diarization, output_path):
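    """Write an SRT-style file pairing each transcription chunk with the diarized
    speaker active at its start time, and append a dominant-speaker summary."""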
    speaker_segments = []
    speaker_map = {}
    current_speaker_num = 1

    for segment, _, speaker in diarization.itertracks(yield_label=True):
        if speaker not in speaker_map:
            speaker_map[speaker] = f"Speaker {current_speaker_num}"
            current_speaker_num += 1
        speaker_segments.append((segment.start, segment.end, speaker_map[speaker]))

    with open(output_path, 'w', encoding='utf-8') as srt_file:
        for i, chunk in enumerate(transcription_chunks, 1):
            start_time, end_time = chunk["start"], chunk["end"]
            text = chunk["text"]

            # Find the corresponding speaker
            current_speaker = "Unknown"
            for seg_start, seg_end, speaker in speaker_segments:
                if seg_start <= start_time < seg_end:
                    current_speaker = speaker
                    break

            # Format timecodes as h:mm:ss (format_timestamp already drops fractional seconds)
            start_str = format_timestamp(start_time)
            end_str = format_timestamp(end_time)

            srt_file.write(f"{i}\n")
            srt_file.write(f"{{{current_speaker}}}\n time: ({start_str} --> {end_str})\n text: {text}\n\n")

    # Add dominant speaker information
    speaker_durations = defaultdict(float)
    for seg_start, seg_end, speaker in speaker_segments:
        speaker_durations[speaker] += seg_end - seg_start

    dominant_speaker = max(speaker_durations, key=speaker_durations.get)
    dominant_duration = speaker_durations[dominant_speaker]

    with open(output_path, 'a', encoding='utf-8') as srt_file:
        dominant_duration_str = format_timestamp(dominant_duration)
        srt_file.write(f"\nMost dominant speaker: {dominant_speaker} with total duration {dominant_duration_str}\n")

@spaces.GPU(duration=600)
def process_video(video_path, diarization_access_token, language):
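    """Run the full pipeline: extract audio, diarize with pyannote 3.1,
    transcribe with Whisper, write the combined SRT, and delete the temp WAV."""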
    base_name = os.path.splitext(video_path)[0]
    audio_path = f"{base_name}.wav"
    extract_audio(video_path, audio_path)

    # Diarization
    print("Performing diarization...")
    pipeline = Pipeline.from_pretrained("pyannote/speaker-diarization-3.1", use_auth_token=diarization_access_token)
    # pyannote 3.x expects a torch.device instance rather than a device string
    pipeline = pipeline.to(torch.device("cuda" if torch.cuda.is_available() else "cpu"))
    diarization = pipeline(audio_path)
    print("Diarization complete.")

    # Transcription
    print("Performing transcription...")
    transcription, chunks = transcribe_audio(audio_path, language)
    print("Transcription complete.")

    # Create combined SRT file
    combined_srt_path = f"{base_name}_combined.srt"
    create_combined_srt(chunks, diarization, combined_srt_path)
    print(f"Combined SRT file created and saved to {combined_srt_path}")

    # Clean up
    os.remove(audio_path)

    return combined_srt_path

if __name__ == "__main__":
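    # Hard-coded example input; point this at the video you want to process.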
    video_path = r"C:\Users\reab5\Downloads\MediaHuman\Music\test1.mp4"
    # Get Hugging Face token from Space secret
    access_token = os.environ.get('hf_secret')
    if not access_token:
        raise ValueError("'hf_secret' not found in environment variables. Please set it in the Space secrets.")

    language = "en"
    process_video(video_path, access_token, language)