# Hugging Face Space - running on ZeroGPU
import queue
import threading
import spaces
import os
import io
import soundfile as sf
import gradio as gr
import numpy as np
import torch
from transformers import set_seed
from huggingface_hub import InferenceClient
from kokoro import KModel, KPipeline
# -----------------------------------------------------------------------------
# Get podcast subject
# -----------------------------------------------------------------------------
from papers import PaperManager

paper_manager = PaperManager()
top_papers = paper_manager.get_top_content()
PODCAST_SUBJECT = list(top_papers.values())[0]
# -----------------------------------------------------------------------------
# LLM that writes the script
# -----------------------------------------------------------------------------
client = InferenceClient(
    "meta-llama/Llama-3.3-70B-Instruct",
    provider="cerebras",
    token=os.getenv("HF_TOKEN"),
)
def generate_podcast_text(subject: str) -> str:
    """Ask the LLM for the script of a podcast hosted by two speakers."""
    prompt = f"""Generate the script of "Open Paper review", a podcast told by 2 hosts.
Here is the topic: it's the top trending paper on Hugging Face daily papers today. You will need to analyze it by bringing profound insights.
{subject}
The podcast should be an insightful discussion, with some amount of playful banter.
Separate the dialogue as follows, one line per utterance, prefaced by the host tag: [S1] for the male host and [S2] for the female host, for instance:
[S1] Hello, how are you?
[S2] I'm good, thank you. How are you?
[S1] I'm good, thank you.
[S2] Great.
The podcast should last around 5 minutes.
"""
    response = client.chat_completion(
        [{"role": "user", "content": prompt[:1000]}],  # prompt is truncated to its first 1000 characters
        max_tokens=8156,
    )
    return response.choices[0].message.content
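
# Quick sanity check (sketch, not part of the app): from a separate script or a Python
# shell you could preview a generated script without launching the UI. The module name
# `app` below is an assumption about how this file is saved; only HF_TOKEN needs to be
# set in the environment, as used by the InferenceClient above.
#
#   from app import generate_podcast_text, PODCAST_SUBJECT
#   print(generate_podcast_text(PODCAST_SUBJECT))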
# -----------------------------------------------------------------------------
# Kokoro TTS
# -----------------------------------------------------------------------------
CUDA_AVAILABLE = torch.cuda.is_available()

kmodel = KModel().to("cuda" if CUDA_AVAILABLE else "cpu").eval()
kpipeline = KPipeline(lang_code="a")  # English voices

MALE_VOICE = "am_michael"   # [S1]
FEMALE_VOICE = "af_heart"   # [S2]

# Pre-warm voices to avoid first-call latency
for v in (MALE_VOICE, FEMALE_VOICE):
    kpipeline.load_voice(v)
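
# Note on ZeroGPU: this Space runs on Zero and `spaces` is imported above. On ZeroGPU
# hardware a GPU is only attached inside functions decorated with `@spaces.GPU`; the
# decorator placement is not shown in this listing, so the following is only a sketch
# of how the GPU-bound synthesis call could be wrapped (hypothetical helper name):
#
#   @spaces.GPU(duration=120)
#   def synthesize(ps, ref_s, speed):
#       return kmodel(ps, ref_s, speed)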
# -----------------------------------------------------------------------------
# Audio generation system with queue
# -----------------------------------------------------------------------------
audio_queue: queue.Queue[tuple[int, np.ndarray] | None] = queue.Queue()
stop_signal = threading.Event()
def process_audio_chunks(podcast_text: str, speed: float = 1.0) -> None:
    """Read each script line, pick the voice from its tag, and push audio chunks to the queue."""
    lines = [l for l in podcast_text.strip().splitlines() if l.strip()]

    pipeline = kpipeline
    pipeline_voice_female = pipeline.load_voice(FEMALE_VOICE)
    pipeline_voice_male = pipeline.load_voice(MALE_VOICE)

    for line in lines:
        if stop_signal.is_set():
            break

        # Expect "[S1] ..." or "[S2] ..."
        if line.startswith("[S1]"):
            pipeline_voice = pipeline_voice_male
            voice = MALE_VOICE
            utterance = line[len("[S1]"):].strip()
        elif line.startswith("[S2]"):
            pipeline_voice = pipeline_voice_female
            voice = FEMALE_VOICE
            utterance = line[len("[S2]"):].strip()
        else:  # untagged line: fall back to the female voice
            pipeline_voice = pipeline_voice_female
            voice = FEMALE_VOICE
            utterance = line

        first = True
        for _, ps, _ in pipeline(utterance, voice, speed):
            ref_s = pipeline_voice[len(ps) - 1]
            audio = kmodel(ps, ref_s, speed)
            audio_queue.put((24000, audio.numpy()))  # Kokoro outputs 24 kHz audio
            if first:
                first = False
                # Push a tiny (one-sample) silence after the first chunk of each utterance
                audio_queue.put((24000, torch.zeros(1).numpy()))

    audio_queue.put(None)  # Signal end of stream
def stream_audio_generator(podcast_text: str):
    """Stream WAV chunks to the Gradio audio component as they are produced."""
    stop_signal.clear()
    threading.Thread(target=process_audio_chunks, args=(podcast_text,)).start()

    while True:
        chunk = audio_queue.get()
        if chunk is None:
            break
        sr, data = chunk
        buf = io.BytesIO()
        sf.write(buf, data, sr, format="wav")
        buf.seek(0)
        yield buf.getvalue(), "Generating podcast..."


def stop_generation():
    stop_signal.set()
    return "Generation stopped"


def generate_podcast():
    return generate_podcast_text(PODCAST_SUBJECT)
# -----------------------------------------------------------------------------
# Gradio UI
# -----------------------------------------------------------------------------
with gr.Blocks(theme=gr.themes.Soft()) as demo:
    gr.Markdown("# NotebookLM Podcast Generator")

    with gr.Row():
        with gr.Column(scale=2):
            # gr.Markdown(f"## Current Topic: {PODCAST_SUBJECT}")
            gr.Markdown(
                "This app generates a podcast discussion between two hosts about the specified topic."
            )
            generate_btn = gr.Button("Generate Podcast Script", variant="primary")
            podcast_output = gr.Textbox(label="Generated Podcast Script", lines=15)

            gr.Markdown("## Audio Preview")
            gr.Markdown("Click below to hear the podcast with realistic voices:")
            with gr.Row():
                start_audio_btn = gr.Button("▶️ Generate Podcast", variant="secondary")
                stop_btn = gr.Button("⏹️ Stop", variant="stop")
            audio_output = gr.Audio(label="Podcast Audio", streaming=True)
            status_text = gr.Textbox(label="Status", visible=True)

    generate_btn.click(fn=generate_podcast, outputs=podcast_output)
    start_audio_btn.click(fn=stream_audio_generator, inputs=podcast_output, outputs=[audio_output, status_text])
    stop_btn.click(fn=stop_generation, outputs=status_text)

if __name__ == "__main__":
    demo.queue().launch()
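
# Running locally (sketch): the dependency list below is inferred from the imports above,
# and the file name `app.py` is an assumption.
#
#   pip install gradio torch transformers huggingface_hub soundfile kokoro spaces
#   export HF_TOKEN=...   # token used by InferenceClient for the Cerebras provider
#   python app.py
#
# The `papers` module (PaperManager) is local to the Space and must sit next to this file.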