import os
import torch
import numpy as np
import librosa
import gradio as gr
import torchaudio
import asyncio
from gradio_webrtc import (
    AsyncAudioVideoStreamHandler,
    WebRTC,
    get_twilio_turn_credentials,
)
# Create directories
os.makedirs("voice_samples", exist_ok=True)

# Voice presets (simple pitch and speed modifications)
VOICE_PRESETS = {
    "Deep Male": {"pitch_shift": -4, "speed_factor": 0.9},
    "Standard Male": {"pitch_shift": -2, "speed_factor": 0.95},
    "Standard Female": {"pitch_shift": 2, "speed_factor": 1.05},
    "High Female": {"pitch_shift": 4, "speed_factor": 1.1},
}
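
# For reference: pitch_shift is in semitones, and a shift of n semitones
# corresponds to a frequency ratio of 2 ** (n / 12). So "Deep Male"
# (-4 semitones) lowers the fundamental by a factor of about 0.79, while
# "High Female" (+4) raises it by about 1.26. speed_factor is the playback
# speed multiplier passed to torchaudio.functional.speed below.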

# Audio processing function
def process_audio(waveform, sampling_rate=16000):
    # Convert from int16 to float32 if needed
    if waveform.dtype == np.int16:
        waveform = waveform.astype(np.float32) / 32768.0

    # Make sure input is mono
    if len(waveform.shape) > 1:
        waveform = librosa.to_mono(waveform.T)

    # Resample to 16 kHz if needed
    if sampling_rate != 16000:
        waveform = librosa.resample(waveform, orig_sr=sampling_rate, target_sr=16000)

    # Limit length to 15 seconds to avoid memory issues
    max_length = 16000 * 15
    if len(waveform) > max_length:
        waveform = waveform[:max_length]

    return waveform

# Simple voice conversion using torchaudio effects
def convert_voice_simple(waveform, preset):
    try:
        # Convert to tensor
        if not torch.is_tensor(waveform):
            waveform_tensor = torch.tensor(waveform).float()
        else:
            waveform_tensor = waveform

        # Ensure tensor is shaped (channels, samples)
        if waveform_tensor.dim() == 1:
            waveform_tensor = waveform_tensor.unsqueeze(0)

        # Apply pitch shift (in semitones)
        pitch_shift = preset.get("pitch_shift", 0)
        if pitch_shift != 0:
            waveform_tensor = torchaudio.functional.pitch_shift(
                waveform_tensor,
                sample_rate=16000,
                n_steps=pitch_shift,
            )

        # Apply speed change; torchaudio.functional.speed returns
        # (waveform, lengths), so unpack and keep only the audio
        speed_factor = preset.get("speed_factor", 1.0)
        if speed_factor != 1.0:
            waveform_tensor, _ = torchaudio.functional.speed(
                waveform_tensor,
                orig_freq=16000,
                factor=speed_factor,
            )

        # Light reverb for a more natural sound: torchaudio.functional has no
        # reverb op, so approximate it with a single quiet 30 ms echo, mixed
        # so the result cannot clip
        delay = int(0.03 * 16000)
        echo = torch.zeros_like(waveform_tensor)
        if waveform_tensor.shape[-1] > delay:
            echo[..., delay:] = waveform_tensor[..., :-delay]
        waveform_tensor = 0.85 * waveform_tensor + 0.15 * echo

        return waveform_tensor.squeeze().numpy()
    except Exception as e:
        print(f"Error in voice conversion: {e}")
        return waveform
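
# Optional offline sanity check (a sketch, not used by the streaming app):
# run a local WAV file through the same process_audio -> convert_voice_simple
# pipeline and write the result with torchaudio. The file paths below are
# placeholders, not files shipped with this Space.
def convert_file_demo(input_path="voice_samples/input.wav",
                      output_path="voice_samples/converted.wav",
                      preset_name="Deep Male"):
    # Load at the native rate; process_audio handles resampling to 16 kHz
    audio, sr = librosa.load(input_path, sr=None, mono=True)
    audio = process_audio(audio, sampling_rate=sr)
    converted = convert_voice_simple(audio, VOICE_PRESETS[preset_name])
    # Save as a mono 16 kHz WAV
    torchaudio.save(output_path, torch.from_numpy(converted).float().unsqueeze(0), 16000)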

class VoiceConversionHandler(AsyncAudioVideoStreamHandler):
    def __init__(
        self, expected_layout="mono", output_sample_rate=16000, output_frame_size=1024
    ) -> None:
        super().__init__(
            expected_layout,
            output_sample_rate,
            output_frame_size,
            input_sample_rate=16000,
        )
        self.audio_queue = asyncio.Queue()
        self.quit = asyncio.Event()
        self.voice_preset = None
        self.buffer = np.array([])
        self.buffer_size = 4096  # samples per processed chunk (~0.26 s at 16 kHz)

    def copy(self) -> "VoiceConversionHandler":
        return VoiceConversionHandler(
            expected_layout=self.expected_layout,
            output_sample_rate=self.output_sample_rate,
            output_frame_size=self.output_frame_size,
        )

    async def receive(self, frame: tuple[int, np.ndarray]) -> None:
        sample_rate, array = frame
        array = array.squeeze()

        # Add new audio to the buffer
        self.buffer = np.append(self.buffer, process_audio(array, sample_rate))

        # Process once the buffer holds a full chunk
        if len(self.buffer) >= self.buffer_size:
            if self.voice_preset:
                preset = VOICE_PRESETS.get(self.voice_preset, VOICE_PRESETS["Standard Male"])
                processed_audio = convert_voice_simple(self.buffer[:self.buffer_size], preset)
                # Clip to [-1, 1] before int16 conversion to avoid overflow wrap-around
                result = (np.clip(processed_audio, -1.0, 1.0) * 32767).astype(np.int16)
            else:
                # Pass the audio through unchanged if no voice preset is selected
                result = (self.buffer[:self.buffer_size] * 32767).astype(np.int16)
            self.audio_queue.put_nowait((16000, result))

            # Keep the remainder for the next chunk
            self.buffer = self.buffer[self.buffer_size:]

    async def emit(self):
        if not self.args_set.is_set():
            await self.wait_for_args()

        # Read the selected voice preset from the component inputs
        if self.latest_args and len(self.latest_args) > 1:
            self.voice_preset = self.latest_args[1]

        # If no converted chunk is ready yet, return a frame of silence
        if self.audio_queue.empty():
            return (16000, np.zeros(self.output_frame_size, dtype=np.int16))
        return await self.audio_queue.get()

    def shutdown(self) -> None:
        self.quit.set()
        self.args_set.clear()
        self.quit.clear()
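
# Minimal offline smoke test (a sketch, not part of the app): feed half a
# second of synthetic audio through receive() and inspect the converted chunk
# on the queue. The preset is set directly here; in the app it arrives via
# latest_args. Run with: asyncio.run(_handler_smoke_test())
async def _handler_smoke_test():
    handler = VoiceConversionHandler()
    handler.voice_preset = "Deep Male"
    tone = 0.2 * np.sin(2 * np.pi * 220.0 * np.arange(8000) / 16000.0)
    await handler.receive((16000, (tone * 32767).astype(np.int16)))
    sample_rate, chunk = handler.audio_queue.get_nowait()
    # Expect 16000 Hz int16 audio; the chunk length differs from buffer_size
    # because speed_factor stretches or shrinks the audio
    print(sample_rate, chunk.dtype, chunk.shape)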

# CSS for styling
css = """
.container {
    max-width: 800px;
    margin: 0 auto;
    padding: 20px;
}
.header {
    text-align: center;
    margin-bottom: 20px;
}
.voice-controls {
    padding: 15px;
    border-radius: 8px;
    background-color: #f5f5f5;
    margin-bottom: 20px;
}
"""

# Main application
def main():
    with gr.Blocks(css=css) as demo:
        gr.Markdown(
            """
            <div class="header">
                <h1>Real-time Voice Conversion</h1>
                <p>Speak into your microphone to convert your voice in real time using audio effects.</p>
            </div>
            """
        )
        with gr.Row(equal_height=True):
            with gr.Column():
                webrtc = WebRTC(
                    label="Voice Chat",
                    modality="audio",
                    mode="send-receive",
                    rtc_configuration=get_twilio_turn_credentials(),
                    pulse_color="rgb(35, 157, 225)",
                )
            with gr.Column(elem_classes="voice-controls"):
                voice_preset = gr.Radio(
                    choices=list(VOICE_PRESETS.keys()),
                    value="Standard Male",
                    label="Target Voice",
                )
                gr.Markdown(
                    """
                    ### How to use:
                    1. Allow microphone access
                    2. Select your target voice style
                    3. Click the microphone button and start speaking
                    4. Your voice will be converted in real time

                    Note: This version uses basic audio effects and does not require SentencePiece.
                    """
                )

        webrtc.stream(
            VoiceConversionHandler(),
            inputs=[webrtc, voice_preset],
            outputs=[webrtc],
            concurrency_limit=2,
        )
    return demo


if __name__ == "__main__":
    demo = main()
    demo.launch()