import spaces
import gradio as gr
import edge_tts
import asyncio
import tempfile
import os
import re
import numpy as np
from pydub import AudioSegment
from scipy.signal import butter, lfilter  # Used by apply_low_pass_filter below


def apply_low_pass_filter(audio_segment, cutoff_freq, sample_rate, order=5):
    """Applies a Butterworth low-pass filter to a pydub AudioSegment.

    Assumes 16-bit samples (pydub's default for the segments built in this
    app); the output segment is therefore created with sample_width=2.
    """
    # Convert to float32 in [-1.0, 1.0] for filtering.
    audio_np = np.array(audio_segment.get_array_of_samples()).astype(np.float32) / (2**15 - 1)
    if audio_segment.channels == 2:
        audio_np = audio_np.reshape(-1, 2)

    # Design the filter with the cutoff normalized to the Nyquist frequency.
    nyquist_freq = 0.5 * sample_rate
    normalized_cutoff = cutoff_freq / nyquist_freq
    b, a = butter(order, normalized_cutoff, btype='low', analog=False)

    if audio_segment.channels == 1:
        filtered_data = lfilter(b, a, audio_np)
    else:
        filtered_data = np.zeros_like(audio_np, dtype=np.float32)
        for channel in range(audio_segment.channels):
            filtered_data[:, channel] = lfilter(b, a, audio_np[:, channel])

    # Convert back to int16; sample_width must match the 16-bit buffer (2
    # bytes), not the source segment's declared width.
    filtered_data_int16 = (filtered_data * (2**15 - 1)).astype(np.int16)
    filtered_audio = AudioSegment(filtered_data_int16.tobytes(),
                                  frame_rate=sample_rate,
                                  sample_width=2,
                                  channels=audio_segment.channels)
    return filtered_audio
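

# Illustrative usage sketch (not called by the app): run the filter over a
# generated test tone. The 440 Hz sine, 1 s duration, and output filename are
# arbitrary example values, not part of the pipeline above.
def _example_low_pass_usage():
    from pydub.generators import Sine
    tone = Sine(440).to_audio_segment(duration=1000)  # 1 s mono 16-bit tone
    filtered = apply_low_pass_filter(tone, cutoff_freq=3500,
                                     sample_rate=tone.frame_rate)
    filtered.export("filtered_tone.mp3", format="mp3")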


def get_silence(duration_ms=1000):
    """Returns the path of an MP3 file containing silence (helper; currently unused)."""
    # Create a silent segment: 24 kHz sampling rate.
    silent_audio = AudioSegment.silent(
        duration=duration_ms,
        frame_rate=24000
    )
    # Set audio parameters
    silent_audio = silent_audio.set_channels(1)      # Mono
    silent_audio = silent_audio.set_sample_width(4)  # 32-bit (4 bytes per sample)
    with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as tmp_file:
        # Export with explicit bitrate and codec parameters. libmp3lame
        # expects a planar sample format, hence "s32p" rather than "s32".
        silent_audio.export(
            tmp_file.name,
            format="mp3",
            bitrate="48k",
            parameters=[
                "-ac", "1",               # Mono
                "-ar", "24000",           # Sample rate
                "-sample_fmt", "s32p",    # 32-bit planar samples
                "-codec:a", "libmp3lame"  # MP3 codec
            ]
        )
        return tmp_file.name
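

# Illustrative usage sketch (not called by the app): pad a clip with silence.
# "clip.mp3" and "padded_clip.mp3" are hypothetical filenames.
def _example_silence_padding():
    pad = AudioSegment.from_mp3(get_silence(500))  # 500 ms of silence
    clip = AudioSegment.from_mp3("clip.mp3")
    (pad + clip + pad).export("padded_clip.mp3", format="mp3")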

# Get all available voices
async def get_voices():
    try:
        voices = await edge_tts.list_voices()
        return {f"{v['ShortName']} - {v['Locale']} ({v['Gender']})": v['ShortName'] for v in voices}
    except Exception as e:
        print(f"Error listing voices: {e}")
        return {}

async def generate_audio_with_voice_prefix(text_segment, default_voice, rate, pitch, overall_target_duration_ms=None, speed_adjustment_factor=1.0):
    """Generates audio for a text segment, resolving voice prefixes and pitch offsets.

    The duration and speed arguments are accepted for call-site symmetry;
    speed fitting happens per line in transcript_to_speech, not here.
    """
    current_voice_full = default_voice
    current_voice_short = current_voice_full.split(" - ")[0] if current_voice_full else ""
    current_rate = rate
    current_pitch = pitch
    processed_text = text_segment.strip()
    #print(f"Processing this  text segment: '{processed_text}'") # Debug
    voice_map = {
        "1F": "en-GB-SoniaNeural",
        "2M": "en-GB-RyanNeural",
        "3M": "en-US-BrianMultilingualNeural",
        "2F": "en-US-JennyNeural",
        "1M": "en-AU-WilliamNeural",
        "3F": "en-HK-YanNeural",
        "4M": "en-GB-ThomasNeural",
        "4F": "en-US-EmmaNeural",
        "1O": "en-GB-RyanNeural",  # Old Man
        "1C": "en-GB-MaisieNeural",  # Child
        "1V": "vi-VN-HoaiMyNeural",  # Vietnamese (Female)
        "2V": "vi-VN-NamMinhNeural",  # Vietnamese (Male)
        "3V": "vi-VN-HoaiMyNeural",  # Vietnamese (Female)
        "4V": "vi-VN-NamMinhNeural",  # Vietnamese (Male)
    }
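    # Examples of how a segment resolves (illustrative):
    #   '1F Hello there'     -> en-GB-SoniaNeural, pitch reset to 0 Hz
    #   '1O Get off my lawn' -> en-GB-RyanNeural, pitch -20 Hz, rate -10%
    #   '2M -5 Good evening' -> en-GB-RyanNeural, pitch offset of -5 Hz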
    # Iterate through the voice map; if the segment starts with a known
    # prefix, switch to that voice and strip the prefix from the text.
    for prefix, voice_short in voice_map.items():
        if processed_text.startswith(prefix):
            current_voice_short = voice_short
            if prefix in ["1F", "3F", "1V", "3V"]:
                current_pitch = 0
            elif prefix in ["1O", "4V"]:
                current_pitch = -20
                current_rate = -10
            processed_text = processed_text[len(prefix):].strip()
            break
    # An optional signed number after the prefix is a per-segment pitch
    # offset, e.g. "-5 Hello" lowers the pitch by 5 Hz.
    match = re.search(r"^(-?\d+)\s*(.*)", processed_text)
    if match:
        number = match.group(1)
        print(f"Pitch offset found: {number} Hz")  # Debug
        current_pitch += int(number)
        processed_text = match.group(2)

    if processed_text:
        # Cast to int in case the sliders deliver floats.
        rate_str = f"{int(current_rate):+d}%"
        pitch_str = f"{int(current_pitch):+d}Hz"
        print(f"Sending to Edge: '{processed_text}'") # Debug
        try:
            communicate = edge_tts.Communicate(processed_text, current_voice_short, rate=rate_str, pitch=pitch_str)
            with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as tmp_file:
                audio_path = tmp_file.name
                await communicate.save(audio_path)

            if os.path.exists(audio_path):
                audio = AudioSegment.from_mp3(audio_path)
                # Trim leading and trailing silence
                def detect_leading_silence(sound, silence_threshold=-50.0, chunk_size=10):
                    trim_ms = 0
                    assert chunk_size > 0 # to avoid infinite loop
                    while sound[trim_ms:trim_ms+chunk_size].dBFS < silence_threshold and trim_ms < len(sound):
                        trim_ms += chunk_size
                    return trim_ms

                start_trim = detect_leading_silence(audio)
                end_trim = detect_leading_silence(audio.reverse())
                trimmed_audio = audio[start_trim:len(audio)-end_trim]
                trimmed_audio.export(audio_path, format="mp3") # Overwrite with trimmed version
                return audio_path

        except Exception as e:
            print(f"Edge TTS error processing '{processed_text}': {e}")
            return None
    return None

async def process_transcript_line(line, next_line_start_time, default_voice, rate, pitch, overall_duration_ms, speed_adjustment_factor):
    """Processes a single transcript line with HH:MM:SS,milliseconds timestamp."""
    match = re.match(r'(\d{2}):(\d{2}):(\d{2}),(\d{3})\s+(.*)', line)
    if match:
        start_h, start_m, start_s, start_ms, text_parts = match.groups()
        start_time_ms = (
            int(start_h) * 3600000 +
            int(start_m) * 60000 +
            int(start_s) * 1000 +
            int(start_ms)
        )
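        # Worked example: "00:00:05,500" -> 0*3600000 + 0*60000 + 5*1000 + 500 = 5500 ms.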
        audio_segments = []
        # Split on straight or curly quotes. re.split drops the delimiters,
        # so quoted and unquoted runs arrive as alternating parts; every
        # non-empty part is synthesized, and any voice prefix inside a part
        # is resolved by generate_audio_with_voice_prefix.
        split_parts = re.split(r'[“”"]', text_parts)
        for part in split_parts:
            if part.strip():
                audio_path = await generate_audio_with_voice_prefix(part, default_voice, rate, pitch, overall_duration_ms, speed_adjustment_factor)
                if audio_path:
                    audio_segments.append(audio_path)

        if audio_segments:
            combined_audio = AudioSegment.empty()
            for segment_path in audio_segments:
                try:
                    segment = AudioSegment.from_mp3(segment_path)
                    combined_audio += segment
                    os.remove(segment_path) # Clean up individual segment files
                except Exception as e:
                    print(f"Error loading or combining audio segment {segment_path}: {e}")
                    return None, None, None

            combined_audio_path = f"combined_audio_{start_time_ms}.mp3"
            try:
                combined_audio.export(combined_audio_path, format="mp3")
                return start_time_ms, [combined_audio_path], overall_duration_ms
            except Exception as e:
                print(f"Error exporting combined audio: {e}")
                return None, None, None

        return start_time_ms, [], overall_duration_ms # Return empty list if no audio generated

    return None, None, None

async def transcript_to_speech(transcript_text, voice, rate, pitch, speed_adjustment_factor):
    if not transcript_text.strip():
        return None, gr.Warning("Please enter transcript text.")
    if not voice:
        return None, gr.Warning("Please select a voice.")
    lines = transcript_text.strip().split('\n')
    timed_audio_segments = []
    max_end_time_ms = 0

    for i, line in enumerate(lines):
        next_line_start_time = None
        if i < len(lines) - 1:
            next_line_match = re.match(r'(\d{2}):(\d{2}):(\d{2}),(\d{3})\s+.*', lines[i+1])
            if next_line_match:
                nh, nm, ns, nms = next_line_match.groups()
                next_line_start_time = (
                    int(nh) * 3600000 +
                    int(nm) * 60000 +
                    int(ns) * 1000 +
                    int(nms)
                )

        current_line_match = re.match(r'(\d{2}):(\d{2}):(\d{2}),(\d{3})\s+(.*)', line)
        if current_line_match:
            sh, sm, ss, sms, text_content = current_line_match.groups()
            start_time_ms = (
                int(sh) * 3600000 +
                int(sm) * 60000 +
                int(ss) * 1000 +
                int(sms)
            )
            overall_duration_ms = None
            if next_line_start_time is not None:
                overall_duration_ms = next_line_start_time - start_time_ms

            start_time, audio_paths, duration = await process_transcript_line(line, next_line_start_time, voice, rate, pitch, overall_duration_ms, speed_adjustment_factor)

            if start_time is not None and audio_paths:
                combined_line_audio = AudioSegment.empty()
                total_generated_duration_ms = 0
                for path in audio_paths:
                    if path:
                        try:
                            audio = AudioSegment.from_mp3(path)
                            combined_line_audio += audio
                            total_generated_duration_ms += len(audio)
                            os.remove(path)
                        except FileNotFoundError:
                            print(f"Warning: Audio file not found: {path}")

                # If the generated audio overruns its time slot, speed it up
                # to fit; e.g. 6000 ms generated for a 5000 ms slot with
                # factor 1.0 gives a 1.2x speedup. Factors <= 1.0 are a no-op.
                if combined_line_audio and overall_duration_ms is not None and overall_duration_ms > 0 and total_generated_duration_ms > overall_duration_ms:
                    speed_factor = (total_generated_duration_ms / overall_duration_ms) * speed_adjustment_factor
                    if speed_factor > 1.0:
                        combined_line_audio = combined_line_audio.speedup(playback_speed=speed_factor)

                if combined_line_audio:
                    timed_audio_segments.append({'start': start_time, 'audio': combined_line_audio})
                    max_end_time_ms = max(max_end_time_ms, start_time + len(combined_line_audio))

            elif audio_paths:
                for path in audio_paths:
                    if path:
                        try:
                            os.remove(path)
                        except FileNotFoundError:
                            pass # Clean up even if no timestamp

    if not timed_audio_segments:
        return None, "No processable audio segments found."

    final_audio = AudioSegment.silent(duration=max_end_time_ms, frame_rate=24000)
    for segment in timed_audio_segments:
        final_audio = final_audio.overlay(segment['audio'], position=segment['start'])
        
    # Apply the low-pass filter here
    cutoff_frequency = 3500  # 3.5 kHz (you can make this a user-configurable parameter later)
    filtered_final_audio = apply_low_pass_filter(final_audio, cutoff_frequency, final_audio.frame_rate)

    # tempfile.mktemp is deprecated (race-prone); create the file safely instead.
    with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as tmp_file:
        combined_audio_path = tmp_file.name
    # Export the *filtered* audio here
    filtered_final_audio.export(combined_audio_path, format="mp3")
    return combined_audio_path, None

@spaces.GPU
def tts_interface(transcript, voice, rate, pitch, speed_adjustment_factor):
    audio, warning = asyncio.run(transcript_to_speech(transcript, voice, rate, pitch, speed_adjustment_factor))
    return audio, warning

async def create_demo():
    voices = await get_voices()
    default_voice = "en-US-AndrewMultilingualNeural - en-US (Male)"
    description = """
    Process timestamped text (HH:MM:SS,milliseconds) with voice changes within quotes.
    The duration for each line is determined by the timestamp of the following line.
    The speed of the ENTIRE generated audio for a line will be adjusted to fit within this duration.
    If there is no subsequent timestamp, the speed adjustment will be skipped.
    You can control the intensity of the speed adjustment using the "Speed Adjustment Factor" slider.
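    For example, if a line generates 6.0 seconds of audio but the next timestamp allows only 5.0 seconds, the line is sped up by 6.0/5.0 = 1.2x (scaled by the factor).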
    Format: `HH:MM:SS,milliseconds "VoicePrefix Text" more text "AnotherVoicePrefix More Text"`
    Example:
    ```
    00:00:00,000 "This is the default voice." more default. "1F Now a female voice." and back to default.
    00:00:05,500 "1C Yes," said the child, "it is fun!"
    ```
    ***************************************************************************************************
    1M = en-AU-WilliamNeural - en-AU (Male)
    1F = en-GB-SoniaNeural - en-GB (Female)
    2M = en-GB-RyanNeural - en-GB (Male)
    2F = en-US-JennyNeural - en-US (Female)
    3M = en-US-BrianMultilingualNeural - en-US (Male)
    3F = en-HK-YanNeural - en-HK (Female)
    4M = en-GB-ThomasNeural - en-GB (Male)
    4F = en-US-EmmaNeural - en-US (Female)
    1O = en-GB-RyanNeural - en-GB (Male) # Old Man
    1C = en-GB-MaisieNeural - en-GB (Female) # Child
    1V = vi-VN-HoaiMyNeural - vi-VN (Female) # Vietnamese (Female)
    2V = vi-VN-NamMinhNeural - vi-VN (Male) # Vietnamese (Male)
    3V = vi-VN-HoaiMyNeural - vi-VN (Female) # Vietnamese (Female)
    4V = vi-VN-NamMinhNeural - vi-VN (Male) # Vietnamese (Male)
    ****************************************************************************************************
    """
    demo = gr.Interface(
        fn=tts_interface,
        inputs=[
            gr.Textbox(label="Timestamped Text with Voice Changes and Duration", lines=10, placeholder='00:00:00,000 "Text" more text "1F Different Voice"'),
            gr.Dropdown(choices=[""] + list(voices.keys()), label="Select Default Voice", value=default_voice),
            gr.Slider(minimum=-50, maximum=50, value=0, label="Speech Rate Adjustment (%)", step=1),
            gr.Slider(minimum=-50, maximum=50, value=0, label="Pitch Adjustment (Hz)", step=1),
            gr.Slider(minimum=0.5, maximum=1.5, value=1.0, step=0.05, label="Speed Adjustment Factor")
        ],
        outputs=[
            gr.Audio(label="Generated Audio", type="filepath"),
            gr.Markdown(label="Warning", visible=False)
        ],
        title="TTS with Line-Wide Duration Adjustment and In-Quote Voice Switching",
        description=description,
        analytics_enabled=False,
        allow_flagging="never"
    )
    return demo

if __name__ == "__main__":
    demo = asyncio.run(create_demo())
    demo.launch()