File size: 7,221 Bytes
a958ea7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
import os
import torch
import numpy as np
import librosa
import gradio as gr
import torchaudio
import asyncio
from gradio_webrtc import (
    AsyncAudioVideoStreamHandler,
    WebRTC,
    get_twilio_turn_credentials,
)
from pathlib import Path

# Create directories
# Runs at import time; the directory is created relative to the current
# working directory. Nothing in the visible part of this file writes to it.
os.makedirs("voice_samples", exist_ok=True)

# Voice presets (simple pitch and speed modifications)
# Keys are the labels shown in the "Target Voice" radio group (see main()).
#   pitch_shift  -> passed as `n_steps` to torchaudio.functional.pitch_shift
#   speed_factor -> passed to torchaudio.functional.speed
# Dict order is the order in which the choices are displayed.
VOICE_PRESETS = {
    "Deep Male": {"pitch_shift": -4, "speed_factor": 0.9},
    "Standard Male": {"pitch_shift": -2, "speed_factor": 0.95},
    "Standard Female": {"pitch_shift": 2, "speed_factor": 1.05},
    "High Female": {"pitch_shift": 4, "speed_factor": 1.1},
}

# Audio processing function
def process_audio(waveform, sampling_rate=16000):
    """Normalise an audio frame for the 16 kHz conversion pipeline.

    The steps run in this order:

    1. int16 input is scaled to floating point by dividing by 32768.
    2. Input with more than one dimension is transposed and collapsed to
       mono with ``librosa.to_mono``.
    3. The signal is resampled to 16 kHz when ``sampling_rate`` differs.
    4. The result is truncated to at most 15 seconds (240000 samples).

    Args:
        waveform: NumPy array of audio samples.
        sampling_rate: Sample rate of ``waveform`` in Hz.

    Returns:
        The processed NumPy array.
    """
    target_rate = 16000
    sample_cap = target_rate * 15  # cap the length to avoid memory issues

    samples = waveform

    # int16 PCM -> floats in [-1.0, 1.0)
    if samples.dtype == np.int16:
        samples = samples / 32768.0

    # Anything that is not 1-D is treated as multi-channel and mixed down.
    is_multichannel = len(samples.shape) > 1
    if is_multichannel:
        samples = librosa.to_mono(samples.T)

    if sampling_rate != target_rate:
        samples = librosa.resample(
            samples, orig_sr=sampling_rate, target_sr=target_rate
        )

    if len(samples) > sample_cap:
        samples = samples[:sample_cap]

    return samples

# Simple voice conversion using torchaudio effects
def convert_voice_simple(waveform, preset):
    """Apply a preset's pitch shift and speed change to 16 kHz audio.

    Fixes over the previous version, which always fell through to the
    ``except`` branch and returned the input unconverted:

    * ``torchaudio.functional.speed`` requires ``orig_freq`` and returns a
      ``(waveform, lengths)`` tuple; both are now handled.
    * ``torchaudio.functional.add_reverb`` does not exist, so that call has
      been removed.

    Args:
        waveform: Audio samples as a NumPy array or ``torch.Tensor``. 1-D
            input is given a leading channel dimension for processing.
        preset: Mapping with optional ``"pitch_shift"`` (default 0) and
            ``"speed_factor"`` (default 1.0) entries.

    Returns:
        A NumPy array with the converted audio. If any effect raises, the
        error is printed and the input is returned unconverted (as a NumPy
        array when it was a tensor) so a live stream keeps running.
    """
    sample_rate = 16000  # process_audio() always delivers 16 kHz audio

    try:
        # Convert to tensor
        if torch.is_tensor(waveform):
            waveform_tensor = waveform
        else:
            waveform_tensor = torch.tensor(waveform).float()

        # torchaudio effects expect a (channel, time) layout
        if waveform_tensor.dim() == 1:
            waveform_tensor = waveform_tensor.unsqueeze(0)

        # Apply pitch shift
        pitch_shift = preset.get("pitch_shift", 0)
        if pitch_shift != 0:
            waveform_tensor = torchaudio.functional.pitch_shift(
                waveform_tensor,
                sample_rate=sample_rate,
                n_steps=pitch_shift,
            )

        # Apply speed change.
        # NOTE: this changes the number of samples (factor < 1 lengthens the
        # audio, factor > 1 shortens it), so streamed output duration will
        # not match input duration exactly.
        speed_factor = preset.get("speed_factor", 1.0)
        if speed_factor != 1.0:
            waveform_tensor, _ = torchaudio.functional.speed(
                waveform_tensor,
                orig_freq=sample_rate,
                factor=speed_factor,
            )

        return waveform_tensor.squeeze().detach().cpu().numpy()

    except Exception as e:
        # Best-effort: never let an effect failure kill the audio stream.
        print(f"Error in voice conversion: {e}")
        if torch.is_tensor(waveform):
            # Keep the return type consistent for callers that use NumPy ops.
            return waveform.detach().cpu().numpy()
        return waveform

class VoiceConversionHandler(AsyncAudioVideoStreamHandler):
    """Stream handler that applies the selected voice preset to incoming audio.

    ``receive`` normalises each incoming frame with ``process_audio`` and
    appends it to an internal buffer. Every full ``buffer_size`` chunk is
    converted with ``convert_voice_simple`` (when a preset is selected) and
    queued as int16 PCM. ``emit`` hands queued chunks back to the stream, or
    a frame of silence when nothing is ready.

    NOTE(review): the base class is the audio+video handler although the UI
    uses ``modality="audio"``; confirm that no video callbacks are required,
    or whether an audio-only handler base would be more appropriate.
    """

    def __init__(
        self, expected_layout="mono", output_sample_rate=16000, output_frame_size=1024
    ) -> None:
        super().__init__(
            expected_layout,
            output_sample_rate,
            output_frame_size,
            input_sample_rate=16000,
        )
        self.audio_queue = asyncio.Queue()  # (sample_rate, int16 array) tuples
        self.quit = asyncio.Event()
        self.voice_preset = None  # key into VOICE_PRESETS, set by emit()
        self.buffer = np.array([])  # float samples not yet processed
        self.buffer_size = 4096  # Buffer size for processing

    def copy(self) -> "VoiceConversionHandler":
        """Return a fresh handler with the same layout and output settings."""
        return VoiceConversionHandler(
            expected_layout=self.expected_layout,
            output_sample_rate=self.output_sample_rate,
            output_frame_size=self.output_frame_size,
        )

    @staticmethod
    def _to_int16(samples) -> np.ndarray:
        """Convert float samples to int16 PCM, clipping to [-1, 1] first.

        Without clipping, values outside the range overflow when cast to
        int16; pitch shifting and resampling can overshoot slightly.
        """
        clipped = np.clip(np.asarray(samples), -1.0, 1.0)
        return (clipped * 32767).astype(np.int16)

    async def receive(self, frame: tuple[int, np.ndarray]) -> None:
        """Buffer an incoming ``(sample_rate, samples)`` frame and queue output.

        All complete ``buffer_size`` chunks currently in the buffer are
        processed, so a large frame cannot leave a growing backlog behind.
        """
        sample_rate, array = frame
        # atleast_1d guards against a single-sample frame squeezing to 0-d.
        array = np.atleast_1d(array.squeeze())

        # Add new audio to buffer
        self.buffer = np.append(self.buffer, process_audio(array, sample_rate))

        # Process every full chunk that is available
        while len(self.buffer) >= self.buffer_size:
            chunk = self.buffer[:self.buffer_size]
            # Keep remainder
            self.buffer = self.buffer[self.buffer_size:]

            if self.voice_preset:
                preset = VOICE_PRESETS.get(
                    self.voice_preset, VOICE_PRESETS["Standard Male"]
                )
                chunk = convert_voice_simple(chunk, preset)
            # else: pass the original audio through when no preset is selected

            self.audio_queue.put_nowait((16000, self._to_int16(chunk)))

    async def emit(self):
        """Return the next ``(sample_rate, int16 samples)`` chunk to play.

        Also refreshes ``voice_preset`` from the latest component arguments.
        """
        if not self.args_set.is_set():
            await self.wait_for_args()

        # Get selected voice preset.
        # Presumably index 0 is the WebRTC component's own value and index 1
        # the radio selection, matching inputs=[webrtc, voice_preset] in
        # main() -- verify against the gradio_webrtc version in use.
        if self.latest_args and len(self.latest_args) > 1:
            self.voice_preset = self.latest_args[1]

        # If queue is empty, return silence
        if self.audio_queue.empty():
            return (16000, np.zeros(self.output_frame_size, dtype=np.int16))

        return await self.audio_queue.get()

    def shutdown(self) -> None:
        """Reset the handler's events when the stream ends."""
        # quit is set and then cleared again immediately; nothing in this
        # class waits on it.
        self.quit.set()
        self.args_set.clear()
        self.quit.clear()

# CSS for styling
# Passed to gr.Blocks(css=...) in main().
#   .header         -> the <div class="header"> in the title Markdown
#   .voice-controls -> the column created with elem_classes="voice-controls"
#   .container      -> not referenced elsewhere in the visible part of this file
css = """
.container {
    max-width: 800px;
    margin: 0 auto;
    padding: 20px;
}
.header {
    text-align: center;
    margin-bottom: 20px;
}
.voice-controls {
    padding: 15px;
    border-radius: 8px;
    background-color: #f5f5f5;
    margin-bottom: 20px;
}
"""

# Main application
def main():
    """Build and return the Gradio Blocks app for real-time voice conversion.

    Layout: a title block, then a row with the WebRTC audio component on the
    left and the preset selector plus usage notes on the right. The WebRTC
    component streams through a ``VoiceConversionHandler`` and receives the
    selected preset as an extra input.

    Returns:
        The constructed ``gr.Blocks`` instance (not yet launched).
    """
    header_html = """
            <div class="header">
                <h1>Real-time Voice Conversion</h1>
                <p>Speak into your microphone to convert your voice in real-time using audio effects.</p>
            </div>
            """
    usage_notes = """
                    ### How to use:
                    1. Allow microphone access
                    2. Select your target voice style
                    3. Click the microphone button and start speaking
                    4. Your voice will be converted in real-time
                    
                    Note: This version uses basic audio effects without SentencePiece.
                    """
    preset_names = list(VOICE_PRESETS)

    with gr.Blocks(css=css) as app:
        gr.Markdown(header_html)

        with gr.Row(equal_height=True):
            # Left column: microphone in / converted audio out
            with gr.Column():
                audio_stream = WebRTC(
                    label="Voice Chat",
                    modality="audio",
                    mode="send-receive",
                    rtc_configuration=get_twilio_turn_credentials(),
                    pulse_color="rgb(35, 157, 225)",
                )

            # Right column: preset selection and instructions
            with gr.Column(elem_classes="voice-controls"):
                preset_picker = gr.Radio(
                    choices=preset_names,
                    value="Standard Male",
                    label="Target Voice",
                )
                gr.Markdown(usage_notes)

        # The component is both source and sink; the radio value is passed
        # along as an additional handler argument.
        handler = VoiceConversionHandler()
        audio_stream.stream(
            handler,
            inputs=[audio_stream, preset_picker],
            outputs=[audio_stream],
            concurrency_limit=2,
        )

    return app

# Script entry point: build the UI and start the Gradio server.
if __name__ == "__main__":
    demo = main()
    demo.launch()