# chaty / app.py -- hashhac -- "testing" -- a958ea7 -- 7.22 kB
import os
import torch
import numpy as np
import librosa
import gradio as gr
import torchaudio
import asyncio
from gradio_webrtc import (
AsyncAudioVideoStreamHandler,
WebRTC,
get_twilio_turn_credentials,
)
from pathlib import Path
# Make sure the voice sample directory is present before the app starts
Path("voice_samples").mkdir(parents=True, exist_ok=True)
# Voice presets: each entry is (display name, pitch shift in semitone steps,
# playback speed factor). The mapping below is what the rest of the file reads.
_PRESET_TABLE = (
    ("Deep Male", -4, 0.9),
    ("Standard Male", -2, 0.95),
    ("Standard Female", 2, 1.05),
    ("High Female", 4, 1.1),
)
VOICE_PRESETS = {
    label: {"pitch_shift": steps, "speed_factor": speed}
    for label, steps, speed in _PRESET_TABLE
}
# Audio normalisation applied to every incoming frame
def process_audio(waveform, sampling_rate=16000):
    """Normalise a raw audio array to mono 16 kHz, at most 15 seconds long.

    Args:
        waveform: NumPy array of samples. ``int16`` input is scaled by
            1/32768 to floating point; other dtypes are passed through.
        sampling_rate: Sampling rate of ``waveform`` in Hz.

    Returns:
        The processed sample array. When no step applies, the input array
        itself is returned.
    """
    target_rate = 16000
    max_samples = target_rate * 15  # cap length to avoid memory issues

    samples = waveform / 32768.0 if waveform.dtype == np.int16 else waveform

    # Collapse multi-dimensional input to a single channel.
    if samples.ndim > 1:
        samples = librosa.to_mono(samples.T)

    if sampling_rate != target_rate:
        samples = librosa.resample(
            samples, orig_sr=sampling_rate, target_sr=target_rate
        )

    return samples if len(samples) <= max_samples else samples[:max_samples]
# Simple voice conversion using torchaudio effects
def _apply_light_reverb(
    waveform,
    sample_rate,
    taps=((23, 0.25), (37, 0.15), (53, 0.08)),
):
    """Mix a few attenuated, delayed copies of the signal into itself.

    Args:
        waveform: Float tensor whose last dimension is time.
        sample_rate: Sampling rate of ``waveform`` in Hz.
        taps: ``(delay_ms, gain)`` pairs, one per echo.

    Returns:
        A new tensor with the same shape as ``waveform``; the input is not
        modified.
    """
    num_samples = waveform.shape[-1]
    wet = waveform.clone()
    for delay_ms, gain in taps:
        # Integer arithmetic keeps the delay exact (no float rounding).
        delay = sample_rate * delay_ms // 1000
        # Echoes that would start at or beyond the end of the clip are dropped.
        if 0 < delay < num_samples:
            wet[..., delay:] += gain * waveform[..., : num_samples - delay]
    return wet


def convert_voice_simple(waveform, preset):
    """Apply a preset's pitch shift and speed change, plus a light reverb.

    Fixes relative to the previous version:
      * ``torchaudio.functional.speed`` needs the source rate (``orig_freq``)
        and returns a ``(waveform, lengths)`` tuple, which is now unpacked.
      * ``torchaudio.functional`` has no ``add_reverb``; the call always
        raised, so the function silently returned its input. The reverb is
        now a small torch-only echo mix.
      * The result is clamped to [-1, 1] so a caller scaling to int16 does
        not wrap around.

    Args:
        waveform: 1-D sequence/array/tensor of float samples at 16 kHz,
            nominally in [-1, 1].
        preset: Mapping with optional ``"pitch_shift"`` (semitone steps,
            default 0) and ``"speed_factor"`` (default 1.0).

    Returns:
        A NumPy array with the converted audio. Its length differs from the
        input when the speed factor is not 1.0. If any step fails, the error
        is printed and the original ``waveform`` is returned unchanged.
    """
    sample_rate = 16000
    try:
        if torch.is_tensor(waveform):
            signal = waveform.float()
        else:
            signal = torch.tensor(waveform).float()

        # The torchaudio ops below expect a (channel, time) layout.
        if signal.dim() == 1:
            signal = signal.unsqueeze(0)

        pitch_shift = preset.get("pitch_shift", 0)
        if pitch_shift != 0:
            signal = torchaudio.functional.pitch_shift(
                signal,
                sample_rate=sample_rate,
                n_steps=pitch_shift,
            )

        speed_factor = preset.get("speed_factor", 1.0)
        if speed_factor != 1.0:
            # speed() returns (waveform, lengths); lengths is unused here.
            signal, _ = torchaudio.functional.speed(
                signal,
                orig_freq=sample_rate,
                factor=speed_factor,
            )

        # Light reverb for a slightly more natural sound. It is applied per
        # call, so echo tails do not carry over between successive chunks.
        signal = _apply_light_reverb(signal, sample_rate)

        # Effects can push peaks past full scale; keep the output in range.
        signal = signal.clamp(-1.0, 1.0)

        return signal.squeeze().numpy()
    except Exception as e:
        # Best effort: audio keeps flowing unconverted rather than stopping.
        print(f"Error in voice conversion: {e}")
        return waveform
class VoiceConversionHandler(AsyncAudioVideoStreamHandler):
    """Stream handler that applies the selected voice preset to incoming audio.

    Incoming frames are normalised with ``process_audio`` and accumulated in
    ``self.buffer``. Every full ``buffer_size`` chunk is converted (when a
    preset is selected) and queued as int16 audio for ``emit`` to return.

    NOTE(review): this subclasses the audio+video handler but defines no
    video methods, and the stream below is created with ``modality="audio"``
    -- confirm this base class is the intended one.
    """

    def __init__(
        self, expected_layout="mono", output_sample_rate=16000, output_frame_size=1024
    ) -> None:
        super().__init__(
            expected_layout,
            output_sample_rate,
            output_frame_size,
            input_sample_rate=16000,
        )
        self.audio_queue = asyncio.Queue()  # (sample_rate, int16 array) items
        self.quit = asyncio.Event()
        self.voice_preset = None  # name of a VOICE_PRESETS key, set in emit()
        self.buffer = np.array([])  # float samples awaiting processing
        self.buffer_size = 4096  # Buffer size for processing

    def copy(self) -> "VoiceConversionHandler":
        """Return a fresh handler configured with this handler's layout/rates."""
        return VoiceConversionHandler(
            expected_layout=self.expected_layout,
            output_sample_rate=self.output_sample_rate,
            output_frame_size=self.output_frame_size,
        )

    @staticmethod
    def _to_int16(samples: np.ndarray) -> np.ndarray:
        """Scale float samples to int16, clipping to [-1, 1] first.

        Without the clip, values outside the nominal range wrap around when
        cast to int16.
        """
        return (np.clip(samples, -1.0, 1.0) * 32767).astype(np.int16)

    async def receive(self, frame: tuple[int, np.ndarray]) -> None:
        """Buffer one incoming frame and queue every complete chunk.

        Args:
            frame: ``(sample_rate, samples)`` as delivered by the stream.
        """
        sample_rate, array = frame
        array = array.squeeze()

        # Add new audio to buffer
        self.buffer = np.append(self.buffer, process_audio(array, sample_rate))

        # Drain every full chunk. Handling only one chunk per frame would let
        # the backlog grow whenever a frame holds more than buffer_size samples.
        while len(self.buffer) >= self.buffer_size:
            chunk = self.buffer[: self.buffer_size]
            if self.voice_preset:
                preset = VOICE_PRESETS.get(
                    self.voice_preset, VOICE_PRESETS["Standard Male"]
                )
                chunk = convert_voice_simple(chunk, preset)
            # With no preset selected the original audio is passed through.
            # process_audio always resamples to 16 kHz, hence the fixed rate.
            self.audio_queue.put_nowait((16000, self._to_int16(chunk)))

            # Keep remainder
            self.buffer = self.buffer[self.buffer_size :]

    async def emit(self):
        """Return the next queued audio chunk, or silence if none is ready."""
        if not self.args_set.is_set():
            await self.wait_for_args()

        # The second stream input carries the selected voice preset.
        if self.latest_args and len(self.latest_args) > 1:
            self.voice_preset = self.latest_args[1]

        # If queue is empty, return silence
        if self.audio_queue.empty():
            return (16000, np.zeros(self.output_frame_size, dtype=np.int16))

        return await self.audio_queue.get()

    def shutdown(self) -> None:
        """Set and then clear ``quit``, and clear the ``args_set`` flag."""
        self.quit.set()
        self.args_set.clear()
        self.quit.clear()
# Custom stylesheet handed to gr.Blocks(css=...) in main(). The .header class
# is used by the title markup and .voice-controls by the preset column.
css = """
.container {
max-width: 800px;
margin: 0 auto;
padding: 20px;
}
.header {
text-align: center;
margin-bottom: 20px;
}
.voice-controls {
padding: 15px;
border-radius: 8px;
background-color: #f5f5f5;
margin-bottom: 20px;
}
"""
# Main application
def main():
    """Assemble the Gradio Blocks interface for real-time voice conversion.

    Returns:
        The constructed ``gr.Blocks`` app; the caller is responsible for
        launching it.
    """
    # Markup literals are kept flush-left so their text stays unchanged.
    header_markup = """
<div class="header">
<h1>Real-time Voice Conversion</h1>
<p>Speak into your microphone to convert your voice in real-time using audio effects.</p>
</div>
"""
    usage_markup = """
### How to use:
1. Allow microphone access
2. Select your target voice style
3. Click the microphone button and start speaking
4. Your voice will be converted in real-time
Note: This version uses basic audio effects without SentencePiece.
"""

    with gr.Blocks(css=css) as app:
        gr.Markdown(header_markup)

        with gr.Row(equal_height=True):
            # Left column: the send/receive audio stream widget.
            with gr.Column():
                audio_stream = WebRTC(
                    label="Voice Chat",
                    modality="audio",
                    mode="send-receive",
                    rtc_configuration=get_twilio_turn_credentials(),
                    pulse_color="rgb(35, 157, 225)",
                )

            # Right column: preset selector and usage notes.
            with gr.Column(elem_classes="voice-controls"):
                preset_picker = gr.Radio(
                    choices=list(VOICE_PRESETS.keys()),
                    value="Standard Male",
                    label="Target Voice",
                )
                gr.Markdown(usage_markup)

        # The handler receives the stream plus the selected preset as inputs.
        audio_stream.stream(
            VoiceConversionHandler(),
            inputs=[audio_stream, preset_picker],
            outputs=[audio_stream],
            concurrency_limit=2,
        )

    return app
# Build the interface and start the Gradio server only when this file is
# executed as a script.
if __name__ == "__main__":
    demo = main()
    demo.launch()