# NotaGen / app.py
# ElectricAlexis's picture
# Upload app.py
# 42e9a11 verified
# raw
# history blame
# 14.7 kB
import zero
import gradio as gr
import sys
import threading
import queue
from io import TextIOBase
import datetime
import subprocess
import os
from inference import postprocess_inst_names
from inference import inference_patch
from convert import abc2xml, xml2, pdf2img
# Load the allowed prompt combinations. Each usable line of prompts.txt is
# split on "_" and its first three fields are taken as
# (period, composer, instrumentation).
with open('prompts.txt', 'r', encoding='utf-8') as f:
    prompts = f.readlines()

valid_combinations = set()
for prompt in prompts:
    prompt = prompt.strip()
    parts = prompt.split('_')
    # Skip blank or malformed lines (e.g. a trailing empty line) instead of
    # crashing with IndexError while the module is being imported.
    if len(parts) < 3:
        continue
    valid_combinations.add((parts[0], parts[1], parts[2]))

# Choices offered by the dropdowns.
periods = sorted({p for p, _, _ in valid_combinations})
composers = sorted({c for _, c, _ in valid_combinations})
instruments = sorted({i for _, _, i in valid_combinations})
# Dynamically refresh the composer and instrumentation dropdown options.
def update_components(period, composer):
    """Build the updates for the composer and instrumentation dropdowns.

    With no period selected, both dropdowns are emptied and disabled.
    Otherwise the composer choices are restricted to the selected period and
    the instrumentation choices to the selected (period, composer) pair.
    The instrumentation value is always cleared.

    Returns:
        A two-element list: [composer update, instrumentation update].
    """
    if not period:
        return [
            gr.update(choices=[], value=None, interactive=False)
            for _ in range(2)
        ]

    composer_choices = sorted(
        {c for p, c, _ in valid_combinations if p == period}
    )
    if composer:
        instrument_choices = sorted(
            {
                i
                for p, c, i in valid_combinations
                if (p, c) == (period, composer)
            }
        )
    else:
        instrument_choices = []

    # Keep the current composer only if it still belongs to this period.
    kept_composer = composer if composer in composer_choices else None

    composer_update = gr.update(
        choices=composer_choices,
        value=kept_composer,
        interactive=True,
    )
    instrument_update = gr.update(
        choices=instrument_choices,
        value=None,
        interactive=bool(instrument_choices),
    )
    return [composer_update, instrument_update]
# Custom real-time stream used to forward the model's inference output to
# the front end.
class RealtimeStream(TextIOBase):
    """Text stream that pushes everything written to it onto a queue."""

    def __init__(self, queue):
        # Any object with a ``put`` method works; generate_music passes a
        # ``queue.Queue``.
        self.queue = queue

    def write(self, text):
        """Enqueue ``text`` and report its length as the amount written."""
        sink = self.queue
        sink.put(text)
        return len(text)
def convert_files(abc_content, period, composer, instrumentation):
    """Save the generated ABC score and convert it to the other formats.

    Two ABC files are written to the current working directory: the raw score
    and a copy passed through ``postprocess_inst_names``. Both are then handed
    to the ``convert`` helpers (``abc2xml``, ``xml2``, ``pdf2img``), and every
    PDF page image is saved as ``<base>_page_<n>.png``.

    Args:
        abc_content: ABC notation text to save and convert.
        period, composer, instrumentation: Prompt fields; all three must be
            truthy. They become part of the generated file names.

    Returns:
        A dict with the keys 'abc', 'xml', 'pdf', 'mid', 'mp3' (file paths),
        'pages' (number of page images), 'current_page' (always 0) and 'base'
        (the timestamped filename prefix).

    Raises:
        gr.Error: If a prompt field is missing, or if any conversion step
            fails (the underlying exception is chained as the cause).
    """
    if not all([period, composer, instrumentation]):
        raise gr.Error("Please complete a valid generation first before saving")

    timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    prompt_str = f"{period}_{composer}_{instrumentation}"
    filename_base = f"{timestamp}_{prompt_str}"

    abc_filename = f"{filename_base}.abc"
    with open(abc_filename, "w", encoding="utf-8") as f:
        f.write(abc_content)

    # Second copy with the instrument names post-processed.
    postprocessed_inst_abc = postprocess_inst_names(abc_content)
    filename_base_postinst = f"{filename_base}_postinst"
    with open(filename_base_postinst + ".abc", "w", encoding="utf-8") as f:
        f.write(postprocessed_inst_abc)

    file_paths = {'abc': abc_filename}
    try:
        # ABC -> MusicXML for both variants.
        abc2xml(filename_base)
        abc2xml(filename_base_postinst)
        # The PDF is rendered from the raw variant only.
        xml2(filename_base, 'pdf')
        # MIDI and MP3 are produced for both variants.
        xml2(filename_base, 'mid')
        xml2(filename_base_postinst, 'mid')
        xml2(filename_base, 'mp3')
        xml2(filename_base_postinst, 'mp3')

        # Turn the PDF into one PNG per page for the preview widget.
        images = pdf2img(filename_base)
        for page_number, image in enumerate(images, start=1):
            image.save(f"{filename_base}_page_{page_number}.png", "PNG")

        # NOTE(review): the paths below assume the convert helpers write
        # "<base>.<ext>" next to the ABC file — confirm against convert.py.
        file_paths.update({
            'xml': f"{filename_base_postinst}.xml",
            'pdf': f"{filename_base}.pdf",
            'mid': f"{filename_base_postinst}.mid",
            'mp3': f"{filename_base_postinst}.mp3",
            'pages': len(images),
            'current_page': 0,
            'base': filename_base
        })
    except Exception as e:
        # Chain the original exception so the real failure stays visible in
        # the traceback/logs.
        raise gr.Error(f"文件处理失败: {str(e)}") from e
    return file_paths
# Page-flip handler for the sheet-music preview.
def update_page(direction, data):
    """Move the preview one page backward or forward.

    Args:
        direction: "prev" or "next"; any other value keeps the current page.
        data: State dict holding 'pages' (page count), 'current_page'
            (0-based index) and 'base' (filename prefix of the page images),
            or a falsy value when nothing has been generated yet.

    Returns:
        A 4-tuple: image path (or None), update for the previous-page button,
        update for the next-page button, and the state. ``data`` is modified
        in place when the page changes.
    """
    if not data:
        return None, gr.update(interactive=False), gr.update(interactive=False), data

    last_index = data.get('pages', 0) - 1
    if last_index < 0:
        # No page images were rendered, so "<base>_page_1.png" does not exist;
        # show nothing and keep both buttons disabled instead of returning a
        # path to a missing file.
        return None, gr.update(interactive=False), gr.update(interactive=False), data

    if direction == "prev" and data['current_page'] > 0:
        data['current_page'] -= 1
    elif direction == "next" and data['current_page'] < last_index:
        data['current_page'] += 1

    current_page_index = data['current_page']
    # Page image files are numbered from 1, the index from 0.
    new_image = f"{data['base']}_page_{current_page_index + 1}.png"

    # "prev" is disabled on the first page, "next" on the last one.
    prev_btn_state = gr.update(interactive=(current_page_index > 0))
    next_btn_state = gr.update(interactive=(current_page_index < last_index))
    return new_image, prev_btn_state, next_btn_state, data
# NOTE(review): `spaces` is not among the imports visible at the top of this
# file — confirm it is imported before this decorator is evaluated.
@spaces.GPU
def generate_music(period, composer, instrumentation):
    """Generate a score for the selected prompt, streaming progress to the UI.

    This is a generator. Every yield is a 6-tuple, in the order of the outputs
    wired to the Generate button:
      1) process_output  (text printed during inference)
      2) final_output    (final ABC text, or a status/error message)
      3) pdf_image       (path of the first page PNG, or None)
      4) audio_player    (path of the MP3, or None)
      5) pdf_state       (dict used for page flipping, or None)
      6) download_files  (gr.update for the download list)

    Raises:
        gr.Error: If (period, composer, instrumentation) is not one of the
            combinations loaded from prompts.txt.
    """
    # `random` and `time` are not imported at module level in this file, so
    # import them here (like the optional numpy/torch imports below);
    # otherwise the seeding code raises NameError.
    import random
    import time

    # Use a different random seed for each call, based on the current time.
    random_seed = int(time.time()) % 10000
    random.seed(random_seed)
    # Seed numpy as well, if it is installed.
    try:
        import numpy as np
        np.random.seed(random_seed)
    except ImportError:
        pass
    # Seed torch (and CUDA, when available), if it is installed.
    try:
        import torch
        torch.manual_seed(random_seed)
        if torch.cuda.is_available():
            torch.cuda.manual_seed_all(random_seed)
    except ImportError:
        pass

    if (period, composer, instrumentation) not in valid_combinations:
        # Reject combinations that are not listed in prompts.txt.
        raise gr.Error("Invalid prompt combination! Please re-select from the period options")

    def hidden_downloads():
        # Download list stays empty and hidden until conversion succeeds.
        return gr.update(value=None, visible=False)

    output_queue = queue.Queue()
    result_container = []
    error_container = []

    # NOTE: sys.stdout is process-wide, so everything printed while the worker
    # thread runs is captured; overlapping generations would interfere.
    original_stdout = sys.stdout
    sys.stdout = RealtimeStream(output_queue)

    def run_inference():
        try:
            result = inference_patch(period, composer, instrumentation)
            result_container.append(result)
        except Exception as exc:
            # Remember the failure for the UI, then re-raise so the thread's
            # default exception hook still reports the traceback.
            error_container.append(exc)
            raise
        finally:
            sys.stdout = original_stdout

    thread = threading.Thread(target=run_inference)
    thread.start()

    process_output = ""
    final_output_abc = ""
    pdf_image = None
    audio_file = None
    pdf_state = None

    # Stream intermediate output while inference is running.
    while thread.is_alive():
        try:
            text = output_queue.get(timeout=0.1)
        except queue.Empty:
            continue
        process_output += text
        # No final ABC yet and nothing converted.
        yield process_output, final_output_abc, pdf_image, audio_file, pdf_state, hidden_downloads()

    # The thread has finished: drain whatever is still queued.
    while not output_queue.empty():
        process_output += output_queue.get()

    if error_container:
        # Report the inference failure directly instead of feeding an empty
        # score to the converters and blaming the conversion step.
        yield process_output, f"Error during generation: {error_container[0]}", None, None, None, hidden_downloads()
        return

    final_result = result_container[0] if result_container else ""

    # Tell the user that file conversion is in progress.
    final_output_abc = "Converting files..."
    yield process_output, final_output_abc, pdf_image, audio_file, pdf_state, hidden_downloads()

    try:
        file_paths = convert_files(final_result, period, composer, instrumentation)
        final_output_abc = final_result
        # First page image and the MP3 for the preview widgets.
        if file_paths['pages'] > 0:
            pdf_image = f"{file_paths['base']}_page_1.png"
        audio_file = file_paths['mp3']
        # The conversion info dict doubles as the page-flip state.
        pdf_state = file_paths
        # Offer only the files that actually exist on disk.
        download_list = [
            file_paths[key]
            for key in ('abc', 'xml', 'pdf', 'mid', 'mp3')
            if key in file_paths and os.path.exists(file_paths[key])
        ]
    except Exception as e:
        # On failure, show the error message in the output box.
        yield process_output, f"Error converting files: {str(e)}", None, None, None, hidden_downloads()
        return

    # Final yield with everything filled in; the download list becomes visible.
    yield process_output, final_output_abc, pdf_image, audio_file, pdf_state, gr.update(value=download_list, visible=True)
def get_file(file_type, period, composer, instrumentation):
    """Return the most recently modified ``*.<file_type>`` file for download.

    Only the current working directory is searched. ``period``, ``composer``
    and ``instrumentation`` are accepted but not used: this is a simplified
    example that picks the newest matching file rather than tracking the
    paths saved by an earlier conversion.

    Returns:
        The file name with the latest modification time, or None when no file
        has the requested extension.
    """
    suffix = f'.{file_type}'
    candidates = [name for name in os.listdir('.') if name.endswith(suffix)]
    if candidates:
        # Oldest first, so the newest file ends up last.
        by_mtime = sorted(candidates, key=os.path.getmtime)
        return by_mtime[-1]
    return None
# Custom stylesheet handed to gr.Blocks(css=css). The selectors target the
# elem_id / elem_classes values set on the components below ("pdf-preview",
# "page-btn", "download-files"). The "#audio-preview" rule only takes effect
# if the commented-out elem_id on the audio player is re-enabled.
css = """
/* 紧凑按钮样式 */
button[size="sm"] {
padding: 4px 8px !important;
margin: 2px !important;
min-width: 60px;
}
/* PDF预览区 */
#pdf-preview {
border-radius: 8px; /* 圆角 */
box-shadow: 0 2px 8px rgba(0,0,0,0.1); /* 阴影 */
}
.page-btn {
padding: 12px !important; /* 增大点击区域 */
margin: auto !important; /* 垂直居中 */
}
/* 按钮悬停效果 */
.page-btn:hover {
background: #f0f0f0 !important;
transform: scale(1.05);
}
/* 布局调整 */
.gr-row {
gap: 10px !important; /* 元素间距 */
}
/* 音频播放器 */
.audio-panel {
margin-top: 15px !important;
max-width: 400px;
}
#audio-preview audio {
height: 200px !important;
}
/* 保存功能区 */
.save-as-row {
margin-top: 15px;
padding: 10px;
border-top: 1px solid #eee;
}
.save-as-label {
font-weight: bold;
margin-right: 10px;
align-self: center;
}
.save-buttons {
gap: 5px; /* 按钮间距 */
}
/* Download files styling */
.download-files {
margin-top: 15px;
border-radius: 8px;
box-shadow: 0 2px 8px rgba(0,0,0,0.1);
}
"""
# Build the Gradio interface: prompt dropdowns, generation output, audio
# preview, sheet-music preview with page buttons, and a download list, then
# wire the event handlers.
# NOTE(review): leading indentation is missing in this copy of the file, so
# the nesting of the `with` containers below has to be restored by hand —
# confirm the intended layout against the original app.
with gr.Blocks(css=css) as demo:
gr.Markdown("## NotaGen")
# Holds the PDF page count, the current page, and related paging info
pdf_state = gr.State()
with gr.Column():
with gr.Row():
# Left column
with gr.Column():
with gr.Row():
period_dd = gr.Dropdown(
choices=periods,
value=None,
label="Period",
interactive=True
)
composer_dd = gr.Dropdown(
choices=[],
value=None,
label="Composer",
interactive=False
)
instrument_dd = gr.Dropdown(
choices=[],
value=None,
label="Instrumentation",
interactive=False
)
generate_btn = gr.Button("Generate!", variant="primary")
process_output = gr.Textbox(
label="Generation process",
interactive=False,
lines=2,
max_lines=2,
placeholder="Generation progress will be shown here..."
)
final_output = gr.Textbox(
label="Post-processed ABC notation scores",
interactive=True,
lines=8,
max_lines=8,
placeholder="Post-processed ABC scores will be shown here..."
)
# Audio playback
audio_player = gr.Audio(
label="Audio Preview",
format="mp3",
interactive=False,
# container=False,
# elem_id="audio-preview"
)
# Right column
with gr.Column():
# Image container for the rendered sheet-music page
pdf_image = gr.Image(
label="Sheet Music Preview",
show_label=False,
height=650,
type="filepath",
elem_id="pdf-preview",
interactive=False,
show_download_button=False
)
# Page-flip buttons
with gr.Row():
prev_btn = gr.Button(
"⬅️ Last Page",
variant="secondary",
size="sm",
elem_classes="page-btn"
)
next_btn = gr.Button(
"Next Page ➡️",
variant="secondary",
size="sm",
elem_classes="page-btn"
)
with gr.Column():
gr.Markdown("**Download Files:**")
download_files = gr.Files(
label="Generated Files",
visible=False,
elem_classes="download-files",
type="filepath" # Make sure this is set to filepath
)
# Link the dropdowns: changing the period or the composer refreshes the
# composer and instrumentation choices
period_dd.change(
update_components,
inputs=[period_dd, composer_dd],
outputs=[composer_dd, instrument_dd]
)
composer_dd.change(
update_components,
inputs=[period_dd, composer_dd],
outputs=[composer_dd, instrument_dd]
)
# Generate button: the outputs must line up with every yield in generate_music
generate_btn.click(
generate_music,
inputs=[period_dd, composer_dd, instrument_dd],
outputs=[process_output, final_output, pdf_image, audio_player, pdf_state, download_files]
)
# Page flipping: hidden textboxes carry the constant direction strings
# ("prev" / "next") to update_page as regular component inputs
prev_signal = gr.Textbox(value="prev", visible=False)
next_signal = gr.Textbox(value="next", visible=False)
prev_btn.click(
update_page,
inputs=[prev_signal, pdf_state], # ✅ pass the direction through a component
outputs=[pdf_image, prev_btn, next_btn, pdf_state]
)
next_btn.click(
update_page,
inputs=[next_signal, pdf_state], # ✅ pass the direction through a component
outputs=[pdf_image, prev_btn, next_btn, pdf_state]
)
if __name__ == "__main__":
    # Serve the app on all network interfaces, port 7860.
    demo.launch(server_name="0.0.0.0", server_port=7860)