## Fix overlap, remove silence, and leave a small gap of silence between segments.
## Simplified version.
## Append "0" after a voice prefix to make that voice the permanent default.
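##
## Example input (one line per timestamp; quoted spans may start with a
## voice prefix from the table in the UI description below):
##   00:00:00,000 "This is the default voice." more default. "1F Now a female voice."
##   00:00:05,000 "1C0 Yes," said the child, "it is fun!"
## Here "1C0" switches the default voice to en-GB-MaisieNeural for the rest
## of the job.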

import spaces
import gradio as gr
import edge_tts
import asyncio
import tempfile
import os
import re
from pathlib import Path
from pydub.silence import detect_nonsilent
from pydub import AudioSegment

default_voice_short = ""  # set when a "<prefix>0" suffix makes a prefixed voice the permanent default
check1 = False  # global flag meant to mark the last segment of a transcript line (controls trailing-silence padding)

def strip_silence(audio: AudioSegment, silence_thresh=-40, min_silence_len=100, silence_padding_ms=100):
    """Trim leading/trailing silence, keeping up to silence_padding_ms of padding on each side."""
    # Detect non-silent regions (detect_nonsilent is imported at module level)
    nonsilent = detect_nonsilent(audio, min_silence_len=min_silence_len, silence_thresh=silence_thresh)
    # If no speech is detected, return a small silent audio (not totally empty)
    if not nonsilent:
        return AudioSegment.silent(duration=silence_padding_ms)
    # Start and end of the first and last non-silent segments
    start_trim = nonsilent[0][0]
    end_trim = nonsilent[-1][1]
    # Add padding before and after the trimmed audio
    # Ensure the padding doesn't exceed the trimmed boundaries

    if not check1:
        silence_padding_ms = 0  # not the last segment of the line: no extra padding
    start_trim = max(0, start_trim - silence_padding_ms)  # Ensure no negative start
    end_trim = min(len(audio), end_trim + silence_padding_ms)  # Ensure end doesn't go past audio length
    # Return the trimmed and padded audio
    # Debugging: print input arguments
    print(f"Audio length: {len(audio)} ms")
    print(f"Silence threshold: {silence_thresh} dB")
    print(f"Minimum silence length: {min_silence_len} ms")
    print(f"Silence padding: {silence_padding_ms} ms")
    print(f"Check1: {check1}**")
    return audio[start_trim:end_trim]
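# Usage sketch for strip_silence (hypothetical file name): trim a rendered
# clip so only the speech plus at most silence_padding_ms of padding remains:
#   clip = AudioSegment.from_mp3("segment.mp3")
#   clip = strip_silence(clip, silence_thresh=-40, min_silence_len=100, silence_padding_ms=100)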


def get_silence(duration_ms=1000):
    # Create silent audio segment with specified parameters
    silent_audio = AudioSegment.silent(
        duration=duration_ms,
        frame_rate=24000  # 24kHz sampling rate
    )

    # Set audio parameters
    silent_audio = silent_audio.set_channels(1)  # Mono
    silent_audio = silent_audio.set_sample_width(4)  # 32-bit (4 bytes per sample)

    with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as tmp_file:
        # Export with specific bitrate and codec parameters
        silent_audio.export(
            tmp_file.name,
            format="mp3",
            bitrate="48k",
            parameters=[
                "-ac", "1",  # Mono
                "-ar", "24000",  # Sample rate
                "-sample_fmt", "s32",  # 32-bit samples
                "-codec:a", "libmp3lame"  # MP3 codec
            ]
        )
        return tmp_file.name
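# Note: get_silence() builds a standalone silent MP3 intended to match the
# edge-tts output format (24 kHz, mono); e.g. get_silence(500) returns the
# path of a 500 ms silent file. It is not called elsewhere in this script.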

# Get all available voices
async def get_voices():
    voices = await edge_tts.list_voices()
    return {f"{v['ShortName']} - {v['Locale']} ({v['Gender']})": v['ShortName'] for v in voices}

async def generate_audio_with_voice_prefix(text_segment, default_voice, rate, pitch):
    """Generate audio for a text segment, handling voice prefixes, pitch suffixes, retries, and fallback."""
    global default_voice_short  # persists a voice selected with the "0" suffix
    global check1               # controls trailing-silence padding in strip_silence()
    print(f"Text: {text_segment}")  # Debug
    voice_map = {
        "1F": ("en-GB-SoniaNeural", 25, 0),
        "2F": ("en-US-JennyNeural", 0, 0),
        "3F": ("en-HK-YanNeural", 0, 0),
        "4F": ("en-US-EmmaNeural", 0, 0),
        "1M": ("en-AU-WilliamNeural", 0, 0),
        "2M": ("en-GB-RyanNeural", 0, 0),
        "3M": ("en-US-BrianMultilingualNeural", 0, 0),
        "4M": ("en-GB-ThomasNeural", 0, 0),
        "1O": ("en-GB-RyanNeural", -20, -10),
        "1C": ("en-GB-MaisieNeural", 0, 0),
        "1V": ("vi-VN-HoaiMyNeural", 0, 0),
        "2V": ("vi-VN-NamMinhNeural", 0, 0),
        "3V": ("en-US-EmmaMultilingualNeural", 0, 0),
        "4V": ("en-US-BrianMultilingualNeural", 0, 0),
        "5V": ("en-US-AvaMultilingualNeural", 0, 0),
        "6V": ("en-US-AndrewMultilingualNeural", 0, 0),
        "7V": ("de-DE-SeraphinaMultilingualNeural", 0, 0),
        "8V": ("ko-KR-HyunsuMultilingualNeural", 0, 0),
    }
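    # Prefix grammar: a segment may start with "<digit><letter>" (e.g. "1F"),
    # optionally followed by a signed pitch offset in Hz and/or a literal 0:
    #   "1F Hello"    -> en-GB-SoniaNeural, pitch +25 Hz (from voice_map)
    #   "1F-10 Hello" -> en-GB-SoniaNeural, pitch +25 - 10 = +15 Hz
    #   "1F0 Hello"   -> en-GB-SoniaNeural, and it becomes the default voice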
    if default_voice_short == "":
        current_voice_full = default_voice
        current_voice_short = current_voice_full.split(" - ")[0] if current_voice_full else ""
    else:
        current_voice_short = default_voice_short
    current_rate = rate
    current_pitch = pitch
    processed_text = text_segment.strip()
    
    detect = False

    prefix = processed_text[:2]  # a leading two-character code like "1F" selects a voice
    if prefix in voice_map:
        current_voice_short, pitch_adj, rate_adj = voice_map[prefix]
        current_pitch += pitch_adj
        current_rate += rate_adj
        detect = True

    # A run of letters followed by an optional sign and digits (e.g. "F-10" in
    # "1F-10") is a pitch offset in Hz; a value of 0 instead locks the current
    # voice in as the new default. This assumes the first such run in the
    # segment belongs to the voice prefix.
    match = re.search(r'[A-Za-z]+-?\d+', processed_text)
    if match:
        group = match.group()
        prefix_only = ''.join(filter(str.isalpha, group))
        number = int(''.join(ch for ch in group if ch.isdigit() or ch == '-'))
        if number == 0:
            default_voice_short = current_voice_short  # "0" suffix: make this voice permanent
        current_pitch += number
        processed_text = re.sub(r'[A-Za-z]+-?\d+', '', processed_text, count=1).strip()
        processed_text = processed_text[len(prefix_only):].strip()  # drop the leading digit(s) of the prefix
    elif detect:
        processed_text = processed_text[2:].strip()

    if processed_text:
        rate_str = f"{current_rate:+d}%"
        pitch_str = f"{current_pitch:+d}Hz"

        # Retry logic
        for attempt in range(3):
            try:
                communicate = edge_tts.Communicate(processed_text, current_voice_short, rate=rate_str, pitch=pitch_str)
                with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as tmp_file:
                    audio_path = tmp_file.name
                    await communicate.save(audio_path)

                audio = AudioSegment.from_mp3(audio_path)
                if not check1:
                    print("Not the last part of the line - short trailing silence")
                    audio = strip_silence(audio, silence_thresh=-40, min_silence_len=50, silence_padding_ms=50)  # padding is zeroed inside strip_silence when check1 is False
                else:
                    audio = strip_silence(audio, silence_thresh=-40, min_silence_len=50, silence_padding_ms=100)  # longer trailing silence at the end of the line
                    print("Last part of the line - long trailing silence")
                stripped_path = tempfile.mktemp(suffix=".mp3")
                audio.export(stripped_path, format="mp3")
                return stripped_path
            except Exception as e:
                print(f"Edge TTS attempt {attempt + 1} failed: {e}")  # Debug
                if attempt == 2:
                    # Final failure: return 500ms of silence
                    silent_audio = AudioSegment.silent(duration=500)
                    fallback_path = tempfile.mktemp(suffix=".mp3")
                    silent_audio.export(fallback_path, format="mp3")
                    return fallback_path
                await asyncio.sleep(0.5)  # brief wait before retry

    return None

async def process_transcript_line(line, default_voice, rate, pitch):
    """Process a single transcript line: an HH:MM:SS,mmm timestamp followed by text with quoted segments."""
    match = re.match(r'(\d{2}):(\d{2}):(\d{2}),(\d{3})\s+(.*)', line)  # comma before the milliseconds, SRT-style
    if match:
        hours, minutes, seconds, milliseconds, text_parts = match.groups()
        start_time_ms = (
            int(hours) * 3600000 +
            int(minutes) * 60000 +
            int(seconds) * 1000 +
            int(milliseconds)
        )
        audio_segments = []
        split_parts = re.split(r'(")', text_parts)  # Split by quote marks, keeping the quotes
        # check1 is meant to flag the last segment of a line so it gets a
        # longer trailing silence; in this simplified version it is only
        # ever reset to False (here and at each quote boundary), so the
        # short-silence path is always taken.
        global check1
        check1 = False
        process_next = False
        for part in split_parts:
            if part == '"':  # quote mark: toggle whether we are inside a quoted span
                process_next = not process_next
                check1 = False  # reset the trailing-silence flag at each quote boundary
                continue
            if process_next and part.strip():
                audio_path = await generate_audio_with_voice_prefix(part, default_voice, rate, pitch)
                if audio_path:
                    audio_segments.append(audio_path)
            elif not process_next and part.strip():
                audio_path = await generate_audio_with_voice_prefix(part, default_voice, rate, pitch) # Process unquoted text with default voice
                if audio_path:
                    audio_segments.append(audio_path)
        
        return start_time_ms, audio_segments
    return None, None
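# Example: for the line
#   00:00:05,000 "1C Yes," said the child, "it is fun!"
# this returns start_time_ms == 5000 and three segments: "Yes," in the 1C
# voice (en-GB-MaisieNeural), 'said the child,' in the default voice, and
# "it is fun!" also in the default voice (no prefix inside that quote).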

async def transcript_to_speech(transcript_text, voice, rate, pitch):
    if not transcript_text.strip():
        return None, gr.Warning("Please enter transcript text.")
    if not voice:
        return None, gr.Warning("Please select a voice.")

    lines = transcript_text.strip().split('\n')
    timed_audio_segments = []
    max_end_time_ms = 0
    previous_end_time_ms = 0
    i = 0

    while i < len(lines):
        start_time, audio_paths = await process_transcript_line(lines[i], voice, rate, pitch)
        if start_time is not None and audio_paths:
            combined_line_audio = AudioSegment.empty()
            for path in audio_paths:
                try:
                    audio = AudioSegment.from_mp3(path)
                    #audio = strip_silence(audio, silence_thresh=-40, min_silence_len=100)
                    combined_line_audio += audio
                    #combined_line_audio = strip_silence(combined_line_audio, silence_thresh=-40, min_silence_len=100)
                    os.remove(path)
                except FileNotFoundError:
                    print(f"Warning: Audio file not found: {path}")

            current_audio_duration = len(combined_line_audio)
            intended_start_time = start_time
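            # Overlap handling: if this line's audio runs past the next
            # line's timestamp, the loop below keeps rendering and appending
            # subsequent lines into this same segment until the combined
            # audio fits before the following timestamp, then advances i so
            # those lines are not rendered twice.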
            
            # Check duration until the next timestamp
            if i + 1 < len(lines):
                next_start_time_line = lines[i+1]
                next_start_time_match = re.match(r'(\d{2}):(\d{2}):(\d{2}),(\d{3})\s+.*', next_start_time_line)
                if next_start_time_match:
                    next_h, next_m, next_s, next_ms = next_start_time_match.groups()
                    next_start_time_ms = (int(next_h) * 3600000 + int(next_m) * 60000 + int(next_s) * 1000 + int(next_ms))
                    duration_to_next = next_start_time_ms - start_time
                else:
                    duration_to_next = float('inf')  # next line has no timestamp; never trigger the overlap path

                if current_audio_duration > duration_to_next:
                    # Hold and append audio from subsequent lines
                    j = i + 1
                    while j < len(lines):
                        next_start_time, next_audio_paths = await process_transcript_line(lines[j], voice, rate, pitch)
                        if next_start_time is not None and next_audio_paths:
                            for next_path in next_audio_paths:
                                try:
                                    next_audio = AudioSegment.from_mp3(next_path)
                                    combined_line_audio += next_audio
                                    os.remove(next_path)
                                except FileNotFoundError:
                                    print(f"Warning: Audio file not found: {next_path}")
                            current_audio_duration = len(combined_line_audio)
                            
                            # Check whether the combined audio now fits before the timestamp after line j
                            if j + 1 < len(lines):
                                next_start_time_line_2 = lines[j+1]
                                next_start_time_match_2 = re.match(r'(\d{2}):(\d{2}):(\d{2}),(\d{3})\s+.*', next_start_time_line_2)
                                if next_start_time_match_2:
                                    next_h_2, next_m_2, next_s_2, next_ms_2 = next_start_time_match_2.groups()
                                    next_start_time_ms_2 = (int(next_h_2) * 3600000 + int(next_m_2) * 60000 + int(next_s_2) * 1000 + int(next_ms_2))
                                    duration_to_next_2 = next_start_time_ms_2 - start_time
                                    if current_audio_duration <= duration_to_next_2:
                                        break
                                else:
                                    break
                            j += 1
                        else:
                            break
                    i = j  # skip the lines that were merged into this segment

            # Appended outside the "is there a next line?" check so the last
            # transcript line's audio is not dropped.
            timed_audio_segments.append({'start': intended_start_time, 'audio': combined_line_audio})
            previous_end_time_ms = max(previous_end_time_ms, intended_start_time + current_audio_duration)
            max_end_time_ms = max(max_end_time_ms, previous_end_time_ms)
        elif audio_paths:
            for path in audio_paths:
                try:
                    os.remove(path)
                except FileNotFoundError:
                    pass # Clean up even if no timestamp
        i += 1

    if not timed_audio_segments:
        return None, "No processable audio segments found."

    print(f"Combining Audio - final stage.")    
    final_audio = AudioSegment.silent(duration=max_end_time_ms, frame_rate=24000)
    for segment in timed_audio_segments:
        final_audio = final_audio.overlay(segment['audio'], position=segment['start'])

    combined_audio_path = tempfile.mktemp(suffix=".mp3")
    final_audio.export(combined_audio_path, format="mp3")
    global default_voice_short  # reset the permanent-voice override for the next job
    default_voice_short = ""
    print("Job done! Reset voice back to default.")
    return combined_audio_path, None

@spaces.GPU
def tts_interface(transcript, voice, rate, pitch):
    audio, warning = asyncio.run(transcript_to_speech(transcript, voice, rate, pitch))
    return audio, warning
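# Gradio calls tts_interface synchronously, so asyncio.run() bridges to the
# async pipeline above. @spaces.GPU requests a GPU slot on Hugging Face
# Spaces (ZeroGPU); edge-tts itself is network-bound, so the decorator is
# presumably only kept for Spaces compatibility.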

async def create_demo():
    voices = await get_voices()
    default_voice = "en-US-AndrewMultilingualNeural - en-US (Male)"
    description = """
    Process timestamped text (HH:MM:SS,milliseconds) with voice changes within quotes.
    Format: `HH:MM:SS,milliseconds "VoicePrefix Text" more text "1F Different Voice"`
    Example:
    ```
    00:00:00,000 "This is the default voice." more default. "1F Now a female voice." and back to default.
    00:00:05,000 "1C Yes," said the child, "it is fun!"
    ```
    ***************************************************************************************************
     <b>   1F : en-GB-SoniaNeural
        2F : en-US-JennyNeural
        3F : en-HK-YanNeural
        4F : en-US-EmmaNeural
        1M : en-AU-WilliamNeural
        2M : en-GB-RyanNeural
        3M : en-US-BrianMultilingualNeural
        4M : en-GB-ThomasNeural
        1O : en-GB-RyanNeural (pitch -20 Hz, rate -10%)
        1C : en-GB-MaisieNeural
        1V : vi-VN-HoaiMyNeural
        2V : vi-VN-NamMinhNeural
        3V : en-US-EmmaMultilingualNeural
        4V : en-US-BrianMultilingualNeural
        5V : en-US-AvaMultilingualNeural
        6V : en-US-AndrewMultilingualNeural
        7V : de-DE-SeraphinaMultilingualNeural
        8V : ko-KR-HyunsuMultilingualNeural  </b>
    ****************************************************************************************************
    """
    demo = gr.Interface(
        fn=tts_interface,
        inputs=[
            gr.Textbox(label="Timestamped Text with Voice Changes", lines=10, placeholder='00:00:00,000 "Text" more text "1F Different Voice"'),
            gr.Dropdown(choices=[""] + list(voices.keys()), label="Select Default Voice", value=default_voice),
            gr.Slider(minimum=-50, maximum=50, value=0, label="Speech Rate Adjustment (%)", step=1),
            gr.Slider(minimum=-50, maximum=50, value=0, label="Pitch Adjustment (Hz)", step=1) # Removed the duplicate value argument
        ],
        outputs=[
            gr.Audio(label="Generated Audio", type="filepath"),
            gr.Markdown(label="Warning", visible=False)
        ],
        title="TTS with HH:MM:SS,milliseconds and In-Quote Voice Switching",
        description=description,
        analytics_enabled=False,
        allow_flagging="never"
    )
    return demo

if __name__ == "__main__":
    demo = asyncio.run(create_demo())
    demo.launch()