|
import gradio as gr |
|
import torch |
|
from transformers import AutoModelForCausalLM, AutoTokenizer |
|
from huggingface_hub import snapshot_download, login |
|
import logging |
|
import os |
|
import spaces |
|
import warnings |
|
from snac import SNAC |
|
import numpy as np |
|
|
|
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') |
|
logger = logging.getLogger(__name__) |
|
|
|
warnings.filterwarnings("ignore", category=UserWarning) |
|
warnings.filterwarnings("ignore", category=RuntimeWarning) |
|
|
|
device = "cuda" if torch.cuda.is_available() else "cpu" |
|
logger.info(f"Using device: {device}") |
|
|
|
model = None |
|
tokenizer = None |
|
snac_model = None |
|
|
|
EMOTIVE_TAGS = ["<laugh>", "<sigh>", "<gasp>", "<cry>", "<yawn>"] |
|
|
|
@spaces.GPU() |
|
def load_model(): |
|
global model, tokenizer, snac_model |
|
try: |
|
logger.info("Loading SNAC model...") |
|
snac_model = SNAC.from_pretrained("hubertsiuzdak/snac_24khz") |
|
snac_model = snac_model.to(device) |
|
|
|
logger.info("Loading Orpheus model...") |
|
model_name = "canopylabs/orpheus-3b-0.1-ft" |
|
|
|
hf_token = os.environ.get("HUGGINGFACE_TOKEN") |
|
if not hf_token: |
|
raise ValueError("HUGGINGFACE_TOKEN environment variable is not set") |
|
|
|
login(token=hf_token) |
|
|
|
snapshot_download( |
|
repo_id=model_name, |
|
use_auth_token=hf_token, |
|
allow_patterns=["config.json", "*.safetensors", "model.safetensors.index.json"], |
|
ignore_patterns=["optimizer.pt", "pytorch_model.bin", "training_args.bin", "scheduler.pt", "tokenizer.*"] |
|
) |
|
|
|
model = AutoModelForCausalLM.from_pretrained(model_name, torch_dtype=torch.bfloat16) |
|
model.to(device) |
|
tokenizer = AutoTokenizer.from_pretrained(model_name) |
|
logger.info(f"Orpheus model and tokenizer loaded to {device}") |
|
except Exception as e: |
|
logger.error(f"Error loading model: {str(e)}") |
|
raise |
|
|
|
def process_prompt(prompt, voice, tokenizer, device): |
|
prompt = f"{voice}: {prompt}" |
|
input_ids = tokenizer(prompt, return_tensors="pt").input_ids |
|
|
|
start_token = torch.tensor([[128259]], dtype=torch.int64) |
|
end_tokens = torch.tensor([[128009, 128260]], dtype=torch.int64) |
|
|
|
modified_input_ids = torch.cat([start_token, input_ids, end_tokens], dim=1) |
|
attention_mask = torch.ones_like(modified_input_ids) |
|
|
|
return modified_input_ids.to(device), attention_mask.to(device) |
|
|
|
def parse_output(generated_ids): |
|
token_to_find = 128257 |
|
token_to_remove = 128258 |
|
|
|
token_indices = (generated_ids == token_to_find).nonzero(as_tuple=True) |
|
|
|
if len(token_indices[1]) > 0: |
|
last_occurrence_idx = token_indices[1][-1].item() |
|
cropped_tensor = generated_ids[:, last_occurrence_idx+1:] |
|
else: |
|
cropped_tensor = generated_ids |
|
|
|
processed_rows = [] |
|
for row in cropped_tensor: |
|
masked_row = row[row != token_to_remove] |
|
processed_rows.append(masked_row) |
|
|
|
code_lists = [] |
|
for row in processed_rows: |
|
row_length = row.size(0) |
|
new_length = (row_length // 7) * 7 |
|
trimmed_row = row[:new_length] |
|
trimmed_row = [t - 128266 for t in trimmed_row] |
|
code_lists.append(trimmed_row) |
|
|
|
return code_lists[0] |
|
|
|
def redistribute_codes(code_list, snac_model): |
|
device = next(snac_model.parameters()).device |
|
|
|
layer_1, layer_2, layer_3 = [], [], [] |
|
for i in range((len(code_list)+1)//7): |
|
layer_1.append(code_list[7*i]) |
|
layer_2.append(code_list[7*i+1]-4096) |
|
layer_3.append(code_list[7*i+2]-(2*4096)) |
|
layer_3.append(code_list[7*i+3]-(3*4096)) |
|
layer_2.append(code_list[7*i+4]-(4*4096)) |
|
layer_3.append(code_list[7*i+5]-(5*4096)) |
|
layer_3.append(code_list[7*i+6]-(6*4096)) |
|
|
|
codes = [ |
|
torch.tensor(layer_1, device=device).unsqueeze(0), |
|
torch.tensor(layer_2, device=device).unsqueeze(0), |
|
torch.tensor(layer_3, device=device).unsqueeze(0) |
|
] |
|
|
|
audio_hat = snac_model.decode(codes) |
|
return audio_hat.detach().squeeze().cpu().numpy() |
|
|
|
@spaces.GPU() |
|
def generate_speech(text, voice, temperature, top_p, repetition_penalty, max_new_tokens, progress=gr.Progress()): |
|
if not text.strip(): |
|
return None |
|
|
|
try: |
|
progress(0.1, "Processing text...") |
|
input_ids, attention_mask = process_prompt(text, voice, tokenizer, device) |
|
|
|
progress(0.3, "Generating speech tokens...") |
|
with torch.no_grad(): |
|
generated_ids = model.generate( |
|
input_ids=input_ids, |
|
attention_mask=attention_mask, |
|
max_new_tokens=max_new_tokens, |
|
do_sample=True, |
|
temperature=temperature, |
|
top_p=top_p, |
|
repetition_penalty=repetition_penalty, |
|
num_return_sequences=1, |
|
eos_token_id=128258, |
|
) |
|
|
|
progress(0.6, "Processing speech tokens...") |
|
code_list = parse_output(generated_ids) |
|
|
|
progress(0.8, "Converting to audio...") |
|
audio_samples = redistribute_codes(code_list, snac_model) |
|
|
|
return (24000, audio_samples) |
|
except Exception as e: |
|
print(f"Error generating speech: {e}") |
|
return None |
|
|
|
with gr.Blocks(title="Orpheus Text-to-Speech") as demo: |
|
gr.Markdown(f""" |
|
# 🎵 [Orpheus Text-to-Speech](https://github.com/canopyai/Orpheus-TTS) |
|
Enter your text below and hear it converted to natural-sounding speech with the Orpheus TTS model. |
|
|
|
## Tips for better prompts: |
|
- Add paralinguistic elements like {", ".join(EMOTIVE_TAGS)} or `uhm` for more human-like speech. |
|
- Longer text prompts generally work better than very short phrases |
|
""") |
|
|
|
with gr.Row(): |
|
with gr.Column(): |
|
text_input = gr.Textbox( |
|
label="Text Input", |
|
placeholder="Enter the text you want to convert to speech...", |
|
lines=8 |
|
) |
|
voice_select = gr.Dropdown( |
|
choices=["tara", "leah", "jess", "leo", "dan", "mia", "zac", "zoe"], |
|
value="tara", |
|
label="Voice" |
|
) |
|
with gr.Accordion("Advanced Options", open=False): |
|
temperature = gr.Slider( |
|
minimum=0.1, maximum=1.0, value=0.6, step=0.1, |
|
label="Temperature", |
|
info="Higher values increase randomness in the output" |
|
) |
|
top_p = gr.Slider( |
|
minimum=0.1, maximum=1.0, value=0.95, step=0.05, |
|
label="Top-p", |
|
info="Lower values increase determinism in the output" |
|
) |
|
repetition_penalty = gr.Slider( |
|
minimum=1.0, maximum=2.0, value=1.1, step=0.1, |
|
label="Repetition Penalty", |
|
info="Higher values discourage repetitive patterns" |
|
) |
|
max_new_tokens = gr.Slider( |
|
minimum=100, maximum=2000, value=1200, step=100, |
|
label="Max Length", |
|
info="Maximum length of generated audio (in tokens)" |
|
) |
|
|
|
with gr.Row(): |
|
submit_btn = gr.Button("Generate Speech", variant="primary") |
|
clear_btn = gr.Button("Clear") |
|
|
|
with gr.Column(): |
|
audio_output = gr.Audio(label="Generated Speech") |
|
|
|
submit_btn.click( |
|
generate_speech, |
|
inputs=[text_input, voice_select, temperature, top_p, repetition_penalty, max_new_tokens], |
|
outputs=audio_output |
|
) |
|
|
|
clear_btn.click(lambda: "", inputs=None, outputs=text_input) |
|
|
|
if __name__ == "__main__": |
|
try: |
|
load_model() |
|
demo.launch() |
|
except Exception as e: |
|
logger.error(f"Error launching the application: {str(e)}") |