|
|
|
import asyncio
import functools
import json
import os
import random
import re
import tempfile

import cohere
import edge_tts
import feedparser
import gradio as gr
import httpx
from moviepy.editor import AudioFileClip, concatenate_audioclips
from pydub import AudioSegment
|
|
|
# System prompt for the two-host ("双人") mode: asks the model for a dialogue
# between Xiao and Yang, returned as JSON with "<Speaker>_<n>" keys.
double_prompt = '''
[SYSTEM_INSTRUCT] You are an insightful podcast generator. You have to create short conversations between Xiao and Yang that gives an overview of the Info given by the user.
Please provide the script and output strictly in the following JSON format:
{
"title": "[string]",
"content": {
"Xiao_0": "[string]",
"Yang_0": "[string]",
...
}
}
#Be concise. No less than five rounds of conversation.
#Please note that the [string] you generate now must be in relaxed and natural Chinese.
'''
|
|
|
# System prompt for the single-host mode: asks the model for a monologue by
# Yang, returned as JSON with "Yang_<n>" keys.
single_prompt = """
You are a podcast generator of sharp opinions and humor. You have to create short scripts for Yang to comment on current events in the news given by the user.
Please provide the script and output strictly in the following JSON format:
{
"title": "[string]",
"content": {
"Yang_0": "[string]",
"Yang_1": "[string]",
...
}
}
#Be concise. No less than five rounds of conversation.
#Please note that the [string] you generate now must be in relaxed and natural Chinese.
"""
|
|
|
# HTML snippet rendered via gr.Markdown at the top of the page.
DESCRIPTION = '''
<div>
<h1 style="text-align: center;">📻听说demo</h1>
<p>一个轻量的中文播客</p>
<p>🔎 输入完整的网页链接发送即可。</p>
<p>🦕 部分网址可能无法解析,请尝试更换。</p>
<p>🍀 点击随机将随机获取科学资讯。</p>
</div>
'''
|
|
|
# Custom stylesheet passed to gr.Blocks: centers headings and paragraphs and
# hides the page footer.
css = """
h1 {
  text-align: center;
  display: block;
}
p {
  text-align: center;
}
footer {
  display:none !important
}
"""
|
|
|
# RSS feed that random_news() samples an article link from.
rss_feed = 'https://www.scmp.com/rss/4/feed'
|
|
|
|
|
# The Cohere API key is read from the API_KEY environment variable; it is None
# when the variable is unset.
apikey = os.environ.get("API_KEY")
# Shared Cohere client used by main() to generate the podcast script.
co = cohere.Client(api_key=apikey)
|
|
|
|
|
|
|
|
|
# Compiled once at import time instead of on every is_url() call.
_URL_PATTERN = re.compile(
    r'^(?:http|ftp)s?://'
    r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+(?:[A-Z]{2,6}\.?|[A-Z0-9-]{2,}\.?)|'
    r'localhost|'
    r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})'
    r'(?::\d+)?'
    r'(?:/?|[/?]\S+)$', re.IGNORECASE)


def is_url(string):
    """Return True if *string* looks like an http(s)/ftp(s) URL.

    The host may be a dotted domain name, ``localhost`` or a dotted-quad
    IPv4 address, optionally followed by a port and a path/query.

    Args:
        string: Candidate URL. Non-string values are treated as "not a URL".

    Returns:
        bool: True when the whole string matches the URL pattern.
    """
    if not isinstance(string, str):
        # re.match would raise TypeError on None / non-text input.
        return False
    return _URL_PATTERN.match(string) is not None
|
|
|
|
|
def validate_url(url):
    """Fetch *url* with a GET request and return the body or an error message.

    Despite the name, this function performs the request itself. It never
    raises: every failure is reported in-band as a string, so callers must
    inspect the returned text to tell success from failure.

    Args:
        url: Absolute URL to request (60 second timeout).

    Returns:
        str: The response text on success. On failure, a message that starts
        with "An error occurred while requesting" (transport error),
        "Error response" (non-success HTTP status) or
        "An unexpected error occurred" (anything else).
    """
    try:
        response = httpx.get(url, timeout=60.0)
        response.raise_for_status()
        return response.text
    except httpx.RequestError as e:
        return f"An error occurred while requesting {url}: {e}"
    except httpx.HTTPStatusError as e:
        return f"Error response {e.response.status_code} while requesting {url}"
    except Exception as e:
        # Last-resort boundary: keep the "never raises" contract.
        return f"An unexpected error occurred: {e}"
|
|
|
def fetch_text(url):
    """Fetch the readable text of a web page through the r.jina.ai proxy.

    The target URL is appended to ``https://r.jina.ai/`` and the combined URL
    is requested via validate_url().

    Args:
        url: Absolute URL of the page to extract.

    Returns:
        str: Whatever validate_url() returns for the proxied URL (the page
        text, or one of its error messages).
    """
    print("Entered Webpage Extraction")
    prefix_url = "https://r.jina.ai/"
    full_url = prefix_url + url
    print(full_url)
    result = validate_url(full_url)
    # Log the exit only after the request has actually completed.
    print("Exited Webpage Extraction")
    return result
|
|
|
|
|
async def text_to_speech(text, voice, filename):
    """Synthesize *text* with edge-tts and save the audio to *filename*.

    Args:
        text: The text to speak.
        voice: edge-tts voice name (e.g. "zh-CN-XiaoxiaoNeural").
        filename: Path of the audio file to write.
    """
    communicate = edge_tts.Communicate(text, voice)
    await communicate.save(filename)
|
|
|
|
|
async def gen_show(script):
    """Render a podcast script to a single MP3 file.

    Each entry of ``script['content']`` is synthesized to its own temporary
    MP3 (all entries concurrently), the segments are concatenated in the
    mapping's iteration order, and the per-segment files are removed.

    Args:
        script: Mapping with a ``content`` mapping of ``"<Speaker>_<n>"`` keys
            to the text spoken. Keys whose speaker part is ``"Xiao"`` use the
            Xiaoxiao voice; every other key uses the Yunyang voice.

    Returns:
        str: Path of the combined MP3. The file is created with
        ``delete=False``, so the caller owns it.

    Raises:
        ValueError: If the script has no content entries.
    """
    content = script['content']
    if not content:
        raise ValueError("Script has no content to synthesize.")

    temp_files = []
    audio_clips = []
    try:
        tasks = []
        for key, text in content.items():
            # partition() tolerates keys without an underscore.
            speaker = key.partition('_')[0]
            voice = "zh-CN-XiaoxiaoNeural" if speaker == "Xiao" else "zh-CN-YunyangNeural"

            # Only the path is needed; close the handle right away so the
            # descriptor is not leaked and the TTS writer can reopen the file.
            with tempfile.NamedTemporaryFile(suffix='.mp3', delete=False) as temp_file:
                filename = temp_file.name
            temp_files.append(filename)
            tasks.append(text_to_speech(text, voice, filename))

        await asyncio.gather(*tasks)
        for key, filename in zip(content, temp_files):
            print(f"Generated audio for {key}: {filename}")

        # Append one by one so clips opened before a failure still get closed.
        for path in temp_files:
            audio_clips.append(AudioFileClip(path))
        combined = concatenate_audioclips(audio_clips)

        with tempfile.NamedTemporaryFile(suffix='.mp3', delete=False) as out_file:
            output_filename = out_file.name

        combined.write_audiofile(output_filename)
        print(f"Combined audio saved as: {output_filename}")
        return output_filename
    finally:
        # Release the readers first, then delete the segment files, even when
        # synthesis or concatenation failed part-way through.
        for clip in audio_clips:
            clip.close()
        for path in temp_files:
            try:
                os.remove(path)
                print(f"Deleted temporary file: {path}")
            except OSError as err:
                print(f"Could not delete temporary file {path}: {err}")
|
|
|
|
|
def extract_content(text):
    """Extract the first JSON object embedded in *text*.

    Each ``{`` is tried in turn as the start of a JSON document using the
    real JSON parser, so nesting of any depth and braces inside string values
    are handled correctly. If nothing parses, the function falls back to a
    brace-matching regex (at most two nesting levels) so callers still receive
    the best-effort candidate for loosely formatted output.

    Args:
        text: Arbitrary text, e.g. an LLM reply that wraps JSON in prose.

    Returns:
        str | None: The substring holding the JSON object, or None when no
        candidate is found (including for empty input).
    """
    if not text:
        return None

    decoder = json.JSONDecoder()
    start = text.find('{')
    while start != -1:
        try:
            _, end = decoder.raw_decode(text, start)
        except json.JSONDecodeError:
            start = text.find('{', start + 1)
        else:
            return text[start:end]

    # Fallback: not valid JSON anywhere; return the first brace-balanced span.
    match = re.search(r'\{(?:[^{}]|\{[^{}]*\})*\}', text, re.DOTALL)
    return match.group(0) if match else None
|
|
|
# Prefixes of the in-band failure messages produced by validate_url().
_FETCH_ERROR_PREFIXES = (
    "An error occurred while requesting",
    "Error response",
    "An unexpected error occurred",
)


async def main(link, peoples="双人"):
    """Turn the web page at *link* into a podcast audio file.

    Pipeline: fetch the page text, ask the Cohere model for a JSON script,
    parse it, then synthesize and concatenate the audio.

    Args:
        link: Page URL; must start with ``http://`` or ``https://``.
        peoples: ``"双人"`` selects the two-host prompt; any other value
            selects the single-host prompt.

    Returns:
        str: Path of the generated MP3 (a single value, matching the single
        Gradio output component this handler is wired to).

    Raises:
        gr.Error: If the URL is malformed or the page could not be fetched.
            Gradio displays the message to the user.
        ValueError: If the model reply does not contain a usable script.
    """
    if not link or not link.startswith(("http://", "https://")):
        raise gr.Error("URL must start with 'http://' or 'https://'")

    loop = asyncio.get_running_loop()

    # fetch_text() does blocking HTTP; run it off the event loop.
    text = await loop.run_in_executor(None, fetch_text, link)

    # Match the exact failure messages rather than searching for "Error",
    # which misses two of them and misfires on pages that contain the word.
    if text.startswith(_FETCH_ERROR_PREFIXES):
        raise gr.Error(text)

    prompt = f"Info: {text}"
    system_prompt = double_prompt if peoples == "双人" else single_prompt
    messages = system_prompt + "\n\n\n" + prompt

    # The Cohere client call is synchronous as well.
    completion = await loop.run_in_executor(
        None,
        functools.partial(co.chat, model="command-r", message=messages),
    )
    print(completion)

    generated_script = extract_content(completion.text)
    if not generated_script or not generated_script.strip().startswith('{'):
        raise ValueError("Failed to generate a valid script.")

    try:
        script_json = json.loads(generated_script)
    except json.JSONDecodeError as err:
        raise ValueError("Failed to generate a valid script.") from err

    content = script_json.get("content") if isinstance(script_json, dict) else None
    if not isinstance(content, dict) or not content:
        raise ValueError("Failed to generate a valid script.")
    # A missing title is harmless; make sure the key exists for gen_show().
    script_json.setdefault("title", "")

    output_filename = await gen_show(script_json)
    print("Output File:" + output_filename)
    return output_filename
|
|
|
|
|
async def random_news():
    """Generate a podcast from a randomly chosen entry of the RSS feed.

    Returns:
        Whatever main() returns for the chosen article link.

    Raises:
        ValueError: If ``rss_feed`` is not a valid URL, or the feed yields no
            entries with a link (e.g. the feed could not be fetched).
    """
    if not is_url(rss_feed):
        raise ValueError(f"{rss_feed} is not a valid RSS feed.")

    feed = feedparser.parse(rss_feed)
    news = [entry.link for entry in feed.entries if entry.get("link")]
    if not news:
        # random.choice() would raise an opaque IndexError on an empty list.
        raise ValueError(f"No entries found in RSS feed {rss_feed}.")

    random_url = random.choice(news)
    print(random_url)
    return await main(random_url)
|
|
|
# Sample article URLs offered in the UI's examples panel.
Examples = [
    ["https://www.yahoo.com/news/shes-worlds-most-expensive-cow-040156493.html"],
    ["https://www.yahoo.com/news/fact-check-rumor-says-ukraines-001900679.html"],
    ["https://www.yahoo.com/tech/super-hornet-armed-sm-6-180853983.html"],
]
|
# Gradio UI: description, audio player, URL input, host-count selector,
# action buttons and example links.
# NOTE(review): the nesting below was reconstructed from a paste that lost its
# indentation; gr.Examples is assumed to sit outside the button row — confirm.
with gr.Blocks(theme='soft', css=css, title="听说") as iface:
    with gr.Accordion(""):
        gr.Markdown(DESCRIPTION)
    with gr.Row():
        output_box = gr.Audio(label="播客", type="filepath", interactive=False, autoplay=True, elem_classes="audio")
    with gr.Row():
        input_box = gr.Textbox(label="网址", placeholder="请输入https开头的网址")
    with gr.Row():
        peoples = gr.Radio(["单人", "双人"], value="双人", label="播音员人数")
    with gr.Row():
        submit_btn = gr.Button("🚀 发送")
        random_btn = gr.Button("🤙 随机")
        clear_btn = gr.ClearButton(output_box, value="🗑️ 清除")
    gr.Examples(examples=Examples, inputs=input_box, outputs=output_box, fn=main, label="示例", cache_examples="lazy")

    # Submit: generate from the typed URL with the selected number of hosts.
    submit_btn.click(main, inputs=[input_box, peoples], outputs=output_box)
    # Random: generate from a random RSS entry (always uses main()'s default mode).
    random_btn.click(fn=random_news, outputs=output_box)

# Enable request queuing and start the server without exposing the API page.
iface.queue().launch(show_api=False)