|
import io
import logging
import os
import queue
import re
import threading

import google.generativeai as genai
import gradio as gr
import numpy as np
import torch
import torchaudio
from huggingface_hub import login, snapshot_download
from torchaudio.functional import resample
from transformers import AutoModelForCausalLM, AutoTokenizer
|
# Module-wide logging: INFO and above go to the root handler.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Optional process-wide Gemini key. It is read from the environment so that
# no credential (or placeholder) lives in the source file.
# generate_podcast_script() calls genai.configure() again with the key the
# user types into the UI, so leaving this unset is fine.
_default_gemini_key = os.environ.get("GEMINI_API_KEY")
if _default_gemini_key:
    genai.configure(api_key=_default_gemini_key)

# Run the speech model on the GPU when one is available, otherwise on the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
|
|
|
|
|
print("Loading Orpheus model...")
model_name = "canopylabs/orpheus-3b-0.1-ft"

# The Hugging Face token comes from the environment rather than a hard-coded
# placeholder, which would make login() fail as soon as the module is imported.
HF_TOKEN = os.environ.get("HF_TOKEN")
if HF_TOKEN:
    login(token=HF_TOKEN)
else:
    logger.warning(
        "HF_TOKEN is not set; falling back to any locally cached "
        "Hugging Face credentials."
    )

# Pre-fetch only the weight shards and model config; optimizer/scheduler state
# and tokenizer files are excluded from this download.
snapshot_download(
    repo_id=model_name,
    # `token` replaces the deprecated `use_auth_token` keyword; None means
    # "use whatever credentials are already stored locally".
    token=HF_TOKEN,
    allow_patterns=[
        "config.json",
        "*.safetensors",
        "model.safetensors.index.json",
    ],
    ignore_patterns=[
        "optimizer.pt",
        "pytorch_model.bin",
        "training_args.bin",
        "scheduler.pt",
        "tokenizer.json",
        "tokenizer_config.json",
        "special_tokens_map.json",
        "vocab.json",
        "merges.txt",
        "tokenizer.*",
    ],
)

# Load the weights in bfloat16 and move them to the selected device.
model = AutoModelForCausalLM.from_pretrained(model_name, torch_dtype=torch.bfloat16)
model.to(device)
tokenizer = AutoTokenizer.from_pretrained(model_name)
print(f"Orpheus model loaded to {device}")
|
|
|
def _sanitize_script_text(text):
    """Strip markup from *text* while keeping contractions and hyphenated words."""
    # Typographic apostrophes are mapped to ASCII first so that contractions
    # written with them are not destroyed by the filter below.
    text = text.replace("\u2019", "'").replace("\u2018", "'")
    # Keep letters, digits, whitespace, basic punctuation, apostrophes, hyphens.
    text = re.sub(r"[^a-zA-Z0-9\s.,?!'\-]", "", text)
    # Apostrophes/hyphens that are not inside a word (quote marks, list
    # bullets, dash runs) are still removed.
    return re.sub(r"(?<![a-zA-Z0-9])['\-]|['\-](?![a-zA-Z0-9])", "", text)


def generate_podcast_script(api_key, content, duration, num_hosts):
    """Ask Gemini for a podcast script about *content* and return it sanitized.

    Args:
        api_key: Gemini API key used to configure the client for this call.
        content: Source material the podcast should discuss.
        duration: Human-readable target length (e.g. "1-5 min"). A falsy
            value falls back to "1-5 min" so the prompt never contains "None".
        num_hosts: 1 produces a monologue prompt; any other value produces
            the two-host dialogue prompt.

    Returns:
        The generated script with every character removed except ASCII
        letters, digits, whitespace, ``. , ? !`` and in-word apostrophes and
        hyphens.
    """
    genai.configure(api_key=api_key)
    # Named gemini_model so it does not shadow the module-level speech `model`.
    gemini_model = genai.GenerativeModel('gemini-2.5-pro-preview-03-25')

    # A radio with nothing selected delivers None; use the shortest option.
    duration = duration or "1-5 min"

    if num_hosts == 1:
        prompt = f"""
Create a podcast script for one person discussing the following content:
{content}

The podcast should last approximately {duration}. Include natural speech patterns,
humor, and occasional off-topic thoughts. Use occasional speech fillers like um, ah,
yes, I see, Ok now. Vary the emotional tone.
Format the script as a monologue without speaker labels.
Separate each paragraph with a blank line.
Do not use any special characters or markdown. Only include the monologue with proper punctuation.
Ensure the content flows naturally and stays relevant to the topic.
Limit the script length to match the requested duration of {duration}.
"""
    else:
        prompt = f"""
Create a podcast script for two people discussing the following content:
{content}

The podcast should last approximately {duration}. Include natural speech patterns,
humor, and occasional off-topic chit-chat. Use occasional speech fillers like um, ah,
yes, I see, Ok now. Vary the emotional tone.
Format the script as alternating lines of dialogue without speaker labels.
Separate each line with a blank line.
Do not use any special characters or markdown. Only include the alternating dialogue lines with proper punctuation.
Ensure the conversation flows naturally and stays relevant to the topic.
Limit the script length to match the requested duration of {duration}.
"""

    response = gemini_model.generate_content(prompt)
    # Previously apostrophes and hyphens were stripped too, turning "we're"
    # into "were" and "twenty-five" into "twentyfive" before synthesis.
    return _sanitize_script_text(response.text)
|
|
|
def text_to_speech(text, voice):
    """Run the speech model on *text* for the given *voice* and return audio.

    Args:
        text: The line of script to synthesize.
        voice: Speaker name. When truthy it is prepended to the prompt as
            "<voice>: "; previously this argument was silently ignored.

    Returns:
        The ``audio`` attribute of the generation output, moved to the CPU
        and converted to a NumPy array.

    Raises:
        RuntimeError: If the generation output carries no ``audio`` attribute.
    """
    # NOTE(review): the "<voice>: <text>" prompt layout follows the Orpheus
    # fine-tuned model card; confirm whether extra control tokens are needed.
    prompt = f"{voice}: {text}" if voice else text
    inputs = tokenizer(prompt, return_tensors="pt").to(device)

    with torch.no_grad():
        output = model.generate(**inputs, max_new_tokens=256)

    # NOTE(review): a standard transformers generate() call returns token ids,
    # which have no `audio` attribute, so this likely needs a separate
    # decoding step. Fail with an explicit message instead of a bare
    # AttributeError until that is confirmed.
    audio = getattr(output, "audio", None)
    if audio is None:
        raise RuntimeError(
            "model.generate() output has no 'audio' attribute; the generated "
            "token ids must be decoded to a waveform first."
        )
    return audio.cpu().numpy()
|
|
|
def process_audio_segment(line, voice, result_queue):
    """Synthesize one script line and publish the audio on *result_queue*.

    Intended as a thread target: the audio returned by text_to_speech() for
    (*line*, *voice*) is handed back to the caller via ``result_queue.put``.
    """
    result_queue.put(text_to_speech(line, voice))
|
|
|
def render_podcast(api_key, script, voice1, voice2, num_hosts):
    """Synthesize every non-blank script line and join the audio in script order.

    Args:
        api_key: Not used by this function; accepted so the signature matches
            the inputs wired to the render button.
        script: Script text, one utterance per line. None or an empty string
            yields the silent fallback.
        voice1: Voice for every line when ``num_hosts == 1``, otherwise for
            the even-indexed lines (0, 2, ...).
        voice2: Voice for the odd-indexed lines when there are two hosts.
        num_hosts: Number of hosts (1 or 2).

    Returns:
        A ``(24000, samples)`` tuple. When no segment could be produced,
        ``samples`` is 24000 float32 zeros.
    """
    sample_rate = 24000
    lines = [line for line in (script or "").split('\n') if line.strip()]

    # One worker thread per line, each with its own queue. A single shared
    # queue yields results in completion order, which can shuffle the dialogue.
    jobs = []
    for i, line in enumerate(lines):
        voice = voice1 if num_hosts == 1 or i % 2 == 0 else voice2
        segment_queue = queue.Queue()
        worker = threading.Thread(
            target=process_audio_segment,
            args=(line, voice, segment_queue),
        )
        worker.start()
        jobs.append((worker, segment_queue))

    # Collect results in the order the lines appear in the script.
    audio_segments = []
    for i, (worker, segment_queue) in enumerate(jobs):
        worker.join()
        try:
            audio_segments.append(segment_queue.get_nowait())
        except queue.Empty:
            # The worker ended without publishing audio (e.g. it raised).
            logger.warning("No audio was produced for script line %d; skipping it.", i + 1)

    if not audio_segments:
        logger.warning("No valid audio segments were generated.")
        return (sample_rate, np.zeros(sample_rate, dtype=np.float32))

    # The former 24000 -> 24000 resample round-trip through torch was an
    # identity operation and has been dropped.
    podcast_audio = np.concatenate(audio_segments)

    return (sample_rate, podcast_audio)
|
|
|
|
|
# Gradio UI: script generation followed by audio rendering.
# Components appear on the page in the order they are created below.
with gr.Blocks() as demo:
    gr.Markdown("# AI Podcast Generator")

    api_key_input = gr.Textbox(label="Enter your Gemini API Key", type="password")

    with gr.Row():
        content_input = gr.Textbox(label="Paste your content or upload a document")
        # NOTE(review): document_upload is not connected to any event handler
        # in this file, so an uploaded file currently has no effect.
        document_upload = gr.File(label="Upload Document")

    # A default selection is set so the script prompt never receives None.
    duration = gr.Radio(
        ["1-5 min", "5-10 min", "10-15 min"],
        label="Estimated podcast duration",
        value="1-5 min",
    )

    num_hosts = gr.Radio([1, 2], label="Number of podcast hosts", value=2)

    voice_options = ["tara", "leah", "jess", "leo", "dan", "mia", "zac", "zoe"]

    with gr.Row():
        voice1_select = gr.Dropdown(label="Select Voice 1", choices=voice_options, value="tara")

    with gr.Row():
        voice2_select = gr.Dropdown(label="Select Voice 2", choices=voice_options, value="leo")

    generate_btn = gr.Button("Generate Script")
    script_output = gr.Textbox(label="Generated Script", lines=10)

    render_btn = gr.Button("Render Podcast")
    audio_output = gr.Audio(label="Generated Podcast")

    def generate_script_wrapper(api_key, content, duration, num_hosts):
        """Forward the UI values to generate_podcast_script()."""
        return generate_podcast_script(api_key, content, duration, num_hosts)

    def render_podcast_wrapper(api_key, script, voice1, voice2, num_hosts):
        """Forward the UI values to render_podcast()."""
        return render_podcast(api_key, script, voice1, voice2, num_hosts)

    generate_btn.click(
        generate_script_wrapper,
        inputs=[api_key_input, content_input, duration, num_hosts],
        outputs=script_output,
    )
    render_btn.click(
        render_podcast_wrapper,
        inputs=[api_key_input, script_output, voice1_select, voice2_select, num_hosts],
        outputs=audio_output,
    )

    def update_second_voice_visibility(num_hosts):
        """Show the second voice selector only when two hosts are chosen."""
        return gr.update(visible=num_hosts == 2)

    num_hosts.change(update_second_voice_visibility, inputs=[num_hosts], outputs=[voice2_select])

# Start the web server only when the file is run as a script.
if __name__ == "__main__":
    demo.launch()