Rename diarization.py to transcription_diarization.py
diarization.py → transcription_diarization.py (RENAMED)
@@ -1,7 +1,7 @@
 import os
 import torch
 import math
-from moviepy.editor import VideoFileClip
+from moviepy.editor import VideoFileClip
 from pyannote.audio import Pipeline
 from transformers import AutoModelForSpeechSeq2Seq, AutoProcessor, pipeline
 import librosa
@@ -21,8 +21,6 @@ class LazyDiarizationPipeline:
         self.pipeline = self.pipeline.to(torch.device("cuda"))
         return self.pipeline
 
-lazy_diarization_pipeline = LazyDiarizationPipeline()
-
 class LazyTranscriptionPipeline:
     def __init__(self):
         self.model = None
@@ -53,6 +51,7 @@ class LazyTranscriptionPipeline:
         )
         return self.pipe
 
+lazy_diarization_pipeline = LazyDiarizationPipeline()
 lazy_transcription_pipeline = LazyTranscriptionPipeline()
 
 def extract_audio(video_path, audio_path):
@@ -77,8 +76,6 @@ def transcribe_audio(audio_path, language):
         start = i * 30 * sr
         end = min((i + 1) * 30 * sr, len(audio))
         audio_chunk = audio[start:end]
-
-        # Convert the audio chunk to float32 numpy array
         audio_chunk = (audio_chunk * 32767).astype(np.float32)
 
         result = pipe(audio_chunk)
@@ -99,87 +96,57 @@ def create_combined_srt(transcription_chunks, diarization, output_path):
     speaker_segments = []
     speaker_durations = defaultdict(float)
 
-    # First pass: calculate durations
     for segment, _, speaker in diarization.itertracks(yield_label=True):
         speaker_durations[speaker] += segment.end - segment.start
         speaker_segments.append((segment.start, segment.end, speaker))
 
-    # Sort speakers by duration
     sorted_speakers = sorted(speaker_durations.items(), key=lambda x: x[1], reverse=True)
 
-    # Create mapping of original labels to Speaker 1, Speaker 2, etc.
     speaker_map = {}
     for i, (speaker, _) in enumerate(sorted_speakers, start=1):
         speaker_map[speaker] = f"Speaker {i}"
 
-    # Write the SRT content
     with open(output_path, 'w', encoding='utf-8') as srt_file:
         for i, chunk in enumerate(transcription_chunks, 1):
             start_time, end_time = chunk["start"], chunk["end"]
             text = chunk["text"]
 
-            # Find the corresponding speaker
             current_speaker = "Unknown"
             for seg_start, seg_end, speaker in speaker_segments:
                 if seg_start <= start_time < seg_end:
                     current_speaker = speaker_map[speaker]
                     break
 
-            # Format timecodes as h:mm:ss (without leading zeros for hours)
             start_str = format_timestamp(start_time).split('.')[0].lstrip('0')
             end_str = format_timestamp(end_time).split('.')[0].lstrip('0')
 
            srt_file.write(f"{i}\n")
            srt_file.write(f"{{{current_speaker}}}\n time: ({start_str} --> {end_str})\n text: {text}\n\n")
 
-    # Add information about the two most frequent speakers
     with open(output_path, 'a', encoding='utf-8') as srt_file:
         for i, (speaker, duration) in enumerate(sorted_speakers[:2], start=1):
             duration_str = format_timestamp(duration).split('.')[0].lstrip('0')
             srt_file.write(f"Speaker {i} (originally {speaker}): total duration {duration_str}\n")
 
-    # Calculate speaker durations
-    speaker_durations = defaultdict(float)
-    for seg_start, seg_end, speaker in speaker_segments:
-        speaker_durations[speaker] += seg_end - seg_start
-
-    # Find the two most frequent speakers
-    sorted_speakers = sorted(speaker_durations.items(), key=lambda x: x[1], reverse=True)
-
-    dominant_speaker, dominant_duration = sorted_speakers[0]
-    second_speaker, second_duration = sorted_speakers[1] if len(sorted_speakers) > 1 else (None, 0)
-
-    with open(output_path, 'a', encoding='utf-8') as srt_file:
-        dominant_duration_str = format_timestamp(dominant_duration).split('.')[0].lstrip('0')
-        srt_file.write(f"\nMost dominant speaker: {dominant_speaker} with total duration {dominant_duration_str}\n")
-
-        if second_speaker:
-            second_duration_str = format_timestamp(second_duration).split('.')[0].lstrip('0')
-            srt_file.write(f"Second most frequent speaker: {second_speaker} with total duration {second_duration_str}\n")
-
 @spaces.GPU(duration=100)
 def process_video(video_path, diarization_access_token, language):
     base_name = os.path.splitext(video_path)[0]
     audio_path = f"{base_name}.wav"
     extract_audio(video_path, audio_path)
 
-    # Diarization
     print("Performing diarization...")
     pipeline = lazy_diarization_pipeline.get_pipeline(diarization_access_token)
     diarization = pipeline(audio_path)
     print("Diarization complete.")
 
-    # Transcription
     print("Performing transcription...")
    transcription, chunks = transcribe_audio(audio_path, language)
     print("Transcription complete.")
 
-    # Create combined SRT file
     combined_srt_path = f"{base_name}_combined.srt"
     create_combined_srt(chunks, diarization, combined_srt_path)
     print(f"Combined SRT file created and saved to {combined_srt_path}")
 
-    # Clean up
     os.remove(audio_path)
 
     return combined_srt_path
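Since the module file was renamed, any script that imports it under the old name needs its import updated. Below is a minimal sketch of such a caller, assuming the module is imported directly by its new file name; the caller script, file path, token, and language value are illustrative assumptions, not part of this commit.

# Hypothetical caller update after the rename; path, token, and language are placeholders.
# Before the rename: from diarization import process_video
from transcription_diarization import process_video

srt_path = process_video(
    "example_video.mp4",                 # video to process (placeholder path)
    diarization_access_token="hf_xxx",   # Hugging Face token passed to the pyannote pipeline (placeholder)
    language="en",                       # language code forwarded to transcribe_audio
)
print(f"Speaker-labelled SRT written to {srt_path}")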