Spaces:
Sleeping
Sleeping
import random | |
import threading | |
import time | |
import requests | |
import json | |
import base64 | |
import os | |
import gradio as gr | |
import re | |
from AIGN import AIGN | |
from openAI import openAIChatLLM | |
chatLLM = openAIChatLLM() | |
STREAM_INTERVAL = 0.2 | |
CURRENT_MODEL = "gpt-4o-mini" # 默认模型 | |
# 在文件开头添加以下函数 | |
def clear_console(): | |
# 根据操作系统类型选择清屏命令 | |
if os.name == 'nt': # Windows | |
os.system('cls') | |
else: # Mac and Linux | |
os.system('clear') | |
def get_model_options(): | |
try: | |
response = requests.get("https://api.deem.love/v1/models") | |
data = response.json() | |
return [model["id"] for model in data["data"]] | |
except: | |
return ["gpt-4o-mini", "claude-3-haiku-20240307"] # 默认选项 | |
def set_api_model(model): | |
global CURRENT_MODEL | |
CURRENT_MODEL = model | |
#return f"{model}" | |
return f"选择模型成功" | |
# 修改获取章节名的函数 | |
def get_chapter_name(content): | |
# 查找最后一个章节标题,支持数字和汉字表示的章节 | |
matches = re.findall(r'(?:##?|)?\s*第([一二三四五六七八九十百千万亿\d]+)章[::]?\s*(.+)', content) | |
if matches: | |
chapter_num, chapter_title = matches[-1] | |
# 如果章节号是数字,直接使用;如果是汉字,转换为数字 | |
if chapter_num.isdigit(): | |
return f"第{chapter_num}章:{chapter_title.strip()}" | |
else: | |
# 这里可以添加一个函数来将汉字数字转换为阿拉伯数字 | |
# 为了简单起见,这里仍然使用汉字表示 | |
return f"第{chapter_num}章:{chapter_title.strip()}" | |
return "未知章节" | |
# 修改保存进度函数 | |
def save_progress(aign): | |
chapter_name = get_chapter_name(aign.novel_content) | |
# 移除文件名中的非法字符 | |
filename = re.sub(r'[\\/*?:"<>|]', '', chapter_name) | |
filename = f"{filename}.json" | |
data = { | |
"novel_outline": aign.novel_outline, | |
"paragraph_list": aign.paragraph_list, | |
"novel_content": aign.novel_content, | |
"writing_plan": aign.writing_plan, | |
"temp_setting": aign.temp_setting, | |
"writing_memory": aign.writing_memory, | |
"user_idea": aign.user_idea, | |
"user_requriments": aign.user_requriments, | |
} | |
json_str = json.dumps(data, ensure_ascii=False, indent=2) | |
b64 = base64.b64encode(json_str.encode()).decode() | |
href = f"data:application/json;base64,{b64}" | |
download_link = f'<a href="{href}" download="{filename}">点击下载进度文件</a>' | |
return download_link | |
# 修改加载函数 | |
def load_progress(aign, file): | |
try: | |
if file is None: | |
return aign, "请选择要加载的文件", None, None, None, None, None, None, None, None | |
# 检查 file 是否为字符串(文件路径) | |
if isinstance(file, str): | |
with open(file, 'r', encoding='utf-8') as f: | |
content = f.read() | |
# 检查 file 是否有 name 属性(Gradio File 对象) | |
elif hasattr(file, 'name'): | |
with open(file.name, 'r', encoding='utf-8') as f: | |
content = f.read() | |
else: | |
return aign, f"无法读取文件", None, None, None, None, None, None, None, None | |
data = json.loads(content) | |
aign.novel_outline = data["novel_outline"] | |
aign.paragraph_list = data["paragraph_list"] | |
aign.novel_content = data["novel_content"] | |
aign.writing_plan = data["writing_plan"] | |
aign.temp_setting = data["temp_setting"] | |
aign.writing_memory = data["writing_memory"] | |
aign.user_idea = data["user_idea"] | |
aign.user_requriments = data["user_requriments"] | |
#aign.embellishment_idea = data["embellishment_idea"] | |
aign.update_chapter_list() | |
chapter_choices = [title for title, _ in aign.chapter_list] | |
return aign, f"进度已加载", data["novel_outline"], data["novel_content"], data["writing_plan"], data["temp_setting"], data["writing_memory"], data["user_idea"], data["user_requriments"], gr.Dropdown(choices=chapter_choices, value=chapter_choices[-1] if chapter_choices else None) | |
except Exception as e: | |
return aign, f"加载失败: {str(e)}", None, None, None, None, None, None, None, None | |
# 新增保存正文函数 | |
def save_content(content): | |
if not content: | |
return "正文为空,无需保存" | |
b64 = base64.b64encode(content.encode()).decode() | |
href = f"data:text/plain;base64,{b64}" | |
download_link = f'<a href="{href}" download="novel_content.txt">点击下载正文</a>' | |
return download_link | |
# 添加撤销功能的事件处理 | |
def undo_generation(aign, history): | |
if aign.undo(): | |
return [ | |
aign, | |
history, | |
aign.writing_plan, | |
aign.temp_setting, | |
aign.writing_memory, | |
aign.novel_content, | |
gr.Button(visible=True), | |
] | |
else: | |
return [ | |
aign, | |
history + [["系统", "无法撤销,已经是最初状态"]], | |
aign.writing_plan, | |
aign.temp_setting, | |
aign.writing_memory, | |
aign.novel_content, | |
gr.Button(visible=True), | |
] | |
def display_chapter(chapters, page_num): | |
if 1 <= page_num <= len(chapters): | |
chapter_title, chapter_content = chapters[page_num - 1] | |
return f"{chapter_title}\n\n{chapter_content}" | |
else: | |
return "没有更多章节了。" | |
def prev_page(aign, current_page): | |
chapters = aign.chapter_list | |
if current_page > 1: | |
current_page -= 1 | |
return current_page, display_chapter(chapters, current_page) | |
def next_page(aign, current_page): | |
chapters = aign.chapter_list | |
if current_page < len(chapters): | |
current_page += 1 | |
return current_page, display_chapter(chapters, current_page) | |
def select_chapter(aign, chapter_title): | |
chapters = aign.chapter_list | |
for i, (title, _) in enumerate(chapters, start=1): | |
if title == chapter_title: | |
return i, display_chapter(chapters, i) | |
return 1, "章节未找到。" | |
def make_middle_chat(): | |
carrier = threading.Event() | |
carrier.history = [] | |
def middle_chat(messages, temperature=None, top_p=None): | |
nonlocal carrier | |
carrier.history.append([None, ""]) | |
if len(carrier.history) > 20: | |
carrier.history = carrier.history[-16:] | |
try: | |
for resp in chatLLM( | |
messages, temperature=temperature, top_p=top_p, stream=True, model=CURRENT_MODEL | |
): | |
output_text = resp["content"] | |
total_tokens = resp["total_tokens"] | |
carrier.history[-1][1] = f"total_tokens: {total_tokens}\n{output_text}" | |
return { | |
"content": output_text, | |
"total_tokens": total_tokens, | |
} | |
except Exception as e: | |
carrier.history[-1][1] = f"Error: {e}" | |
raise e | |
return carrier, middle_chat | |
def gen_ouline_button_clicked(aign, user_idea, history): | |
clear_console() # 清空命令行 | |
aign.user_idea = user_idea | |
carrier, middle_chat = make_middle_chat() | |
carrier.history = [] # 清空历史记录 | |
aign.novel_outline_writer.chatLLM = middle_chat | |
gen_ouline_thread = threading.Thread(target=aign.genNovelOutline) | |
gen_ouline_thread.start() | |
while gen_ouline_thread.is_alive(): | |
yield [ | |
aign, | |
carrier.history, | |
aign.novel_outline, | |
gr.Button(visible=False), | |
] | |
time.sleep(STREAM_INTERVAL) | |
yield [ | |
aign, | |
carrier.history, | |
aign.novel_outline, | |
gr.Button(visible=True), | |
] | |
def gen_beginning_button_clicked( | |
aign, history, novel_outline, user_requriments#, embellishment_idea | |
): | |
clear_console() # 清空命令行 | |
aign.novel_outline = novel_outline | |
aign.user_requriments = user_requriments | |
#aign.embellishment_idea = embellishment_idea | |
carrier, middle_chat = make_middle_chat() | |
carrier.history = [] # 清空历史记录 | |
aign.novel_beginning_writer.chatLLM = middle_chat | |
#aign.novel_embellisher.chatLLM = middle_chat | |
gen_beginning_thread = threading.Thread(target=aign.genBeginning) | |
gen_beginning_thread.start() | |
while gen_beginning_thread.is_alive(): | |
yield [ | |
aign, | |
carrier.history, | |
aign.writing_plan, | |
aign.temp_setting, | |
aign.novel_content, | |
gr.Button(visible=False), | |
] | |
time.sleep(STREAM_INTERVAL) | |
yield [ | |
aign, | |
carrier.history, | |
aign.writing_plan, | |
aign.temp_setting, | |
aign.novel_content, | |
gr.Button(visible=True), | |
] | |
def gen_next_paragraph_button_clicked( | |
aign, | |
history, | |
user_idea, | |
novel_outline, | |
writing_memory, | |
temp_setting, | |
writing_plan, | |
user_requriments, | |
#embellishment_idea, | |
): | |
# 生成保存进度链接 | |
save_link = save_progress(aign) # 在函数开头定义保存进度链接 | |
# 清空命令行 | |
clear_console() | |
# 更新 AIGN 对象的各个字段 | |
aign.user_idea = user_idea | |
aign.novel_outline = novel_outline | |
aign.writing_memory = writing_memory | |
aign.temp_setting = temp_setting | |
aign.writing_plan = writing_plan | |
aign.user_requriments = user_requriments | |
#aign.embellishment_idea = embellishment_idea | |
carrier, middle_chat = make_middle_chat() | |
carrier.history = [] # 清空历史记录 | |
aign.novel_writer.chatLLM = middle_chat | |
#aign.novel_embellisher.chatLLM = middle_chat | |
aign.memory_maker.chatLLM = middle_chat | |
gen_next_paragraph_thread = threading.Thread(target=aign.genNextParagraph) | |
gen_next_paragraph_thread.start() | |
while gen_next_paragraph_thread.is_alive(): | |
# 生成保存进度链接 | |
save_link = save_progress(aign) | |
aign.update_chapter_list() | |
# 获取当前章节内容 | |
current_chapter_content = display_chapter(aign.chapter_list, len(aign.chapter_list)) | |
chapter_choices = [title for title, _ in aign.chapter_list] | |
yield [ | |
aign, | |
carrier.history, | |
aign.writing_plan, | |
aign.temp_setting, | |
aign.writing_memory, | |
#aign.novel_content,#全部章节内容 | |
current_chapter_content, # 只更新当前章节内容 | |
gr.Button(visible=False), | |
save_link, # 返回生成的保存进度链接 | |
gr.Dropdown(choices=chapter_choices, value=chapter_choices[-1] if chapter_choices else None), | |
] | |
time.sleep(STREAM_INTERVAL) | |
# 生成保存进度链接 | |
save_link = save_progress(aign) | |
aign.update_chapter_list() | |
# 获取最终的当前章节内容 | |
current_chapter_content = display_chapter(aign.chapter_list, len(aign.chapter_list)) | |
chapter_choices = [title for title, _ in aign.chapter_list] | |
yield [ | |
aign, | |
carrier.history, | |
aign.writing_plan, | |
aign.temp_setting, | |
aign.writing_memory, | |
#aign.novel_content, #全部章节内容 | |
current_chapter_content, # 只更新当前章节内容 | |
gr.Button(visible=True), | |
save_link, # 返回生成的保存进度链接 | |
gr.Dropdown(choices=chapter_choices, value=chapter_choices[-1] if chapter_choices else None), | |
] | |
css = """ | |
#row1 { | |
min-width: 200px; | |
height: calc(100vh - 180px); | |
overflow: auto; | |
} | |
#row2 { | |
min-width: 300px; | |
height: calc(100vh - 180px); | |
overflow: auto; | |
} | |
#row3 { | |
min-width: 200px; | |
height: calc(100vh - 180px); | |
overflow: auto; | |
} | |
""" | |
with gr.Blocks(css=css) as demo: | |
aign = gr.State(AIGN(chatLLM)) | |
gr.Markdown("## AI 写小说 Demo") | |
#gr.Markdown("模型服务出问题会导致反复重试,这时切换到可用模型会继续,只要不刷新网页或者保存了进度(至少生成开头之后保存)就可以无限续写(某些模型智能及上下文有限,到极限会导致内容重复,这些模型在这就不适用了,比如gpt-3.5-turbo,当然接着切换到强模型可以再继续)") | |
# 添加模型选择下拉框 | |
# with gr.Row(): 同一行 | |
with gr.Row(): | |
with gr.Column(scale=0, elem_id="row1"): | |
with gr.Tab("⚙"): | |
model_dropdown = gr.Dropdown( | |
choices=get_model_options(), | |
label="选择模型", | |
interactive=True, | |
value=CURRENT_MODEL | |
) | |
model_output = gr.Textbox(label="模型设置结果", interactive=False) | |
load_status = gr.Textbox(label="加载状态", interactive=False) | |
load_file = gr.File(label="选择加载文件", file_count="single", file_types=[".json"]) | |
load_button = gr.Button("加载进度") | |
with gr.Tab("开始"): | |
gr.Markdown("生成大纲->大纲标签->生成章节->状态标签->生成章节") | |
user_idea_text = gr.Textbox( | |
"架空历史:\n\n1. 选择关键历史节点并改变\n2. 描述由此引发的历史走向变化\n3. 塑造新兴历史人物(可改编或原创)\n4. 构建独特社会结构、文化和思潮\n5. 想象可能出现的科技创新\n6. 概述世界格局重塑(国界、政体、国际关系)\n7. 用生动叙事呈现,让读者身临其境\n\n目标:创造合理又富想象力的平行宇宙,既有趣又引人深思。", | |
label="想法", | |
lines=16, | |
interactive=True, | |
) | |
user_requriments_text = gr.Textbox( | |
"1. 语言要求:\n - 不直白\n - 句式多变\n - 避免陈词滥调\n - 使用不寻常的词句\n - 运用隐喻和象征\n2. 创作风格:\n - 抽象\n - 富有意境和想象力\n - 具创意个性\n - 有力度\n - 画面感强\n - 音乐感佳\n - 浪漫气息浓厚\n - 语言深邃\n3. 表达目标:\n - 传达独特的神秘和魔幻感\n - 探索和反思自我与世界\n - 表达对自己和社会的孤独与关注\n4. 读者体验:有趣、惊奇、新鲜", | |
label="写作要求", | |
lines=6, | |
interactive=True, | |
) | |
gen_ouline_button = gr.Button("生成大纲") | |
with gr.Tab("大纲"): | |
novel_outline_text = gr.Textbox( | |
label="大纲", lines=33, interactive=True | |
) | |
#gen_beginning_button = gr.Button("生成开头") | |
#gen_next_paragraph_button = gr.Button("生成开头") | |
with gr.Tab("状态"): | |
writing_memory_text = gr.Textbox( | |
label="记忆", | |
lines=8, | |
interactive=True, | |
max_lines=8, | |
) | |
writing_plan_text = gr.Textbox(label="计划", lines=6, interactive=True, max_lines=6) | |
temp_setting_text = gr.Textbox( | |
label="临时设定", lines=5, interactive=True, max_lines=5 | |
) | |
gen_next_paragraph_button = gr.Button("生成章节") | |
download_link = gr.HTML() | |
save_button = gr.Button("保存进度") | |
with gr.Tab("导航"): | |
save_content_button = gr.Button("保存正文") | |
current_page = gr.Number(value=1, label="当前页码", interactive=False) | |
prev_button = gr.Button("上一页") | |
next_button = gr.Button("下一页") | |
# 添加章节导航下拉列表 | |
chapter_dropdown = gr.Dropdown(label="章节导航", choices=[], interactive=True) | |
# TODO | |
# gen_next_paragraph_button = gr.Button("撤销生成") | |
#undo_button = gr.Button("撤销生成") | |
# TODO | |
# auto_gen_next_checkbox = gr.Checkbox( | |
# label="自动生成下一段", checked=False, interactive=True | |
# ) | |
with gr.Column(scale=3, elem_id="row2"): | |
chatBox = gr.Chatbot(height=f"80vh", label="输出") | |
with gr.Column(scale=0, elem_id="row3"): | |
novel_content_text = gr.Textbox( | |
label="小说正文", lines=35, interactive=True, show_copy_button=True | |
) | |
# TODO | |
# download_novel_button = gr.Button("下载小说") | |
#gr.Markdown("导航: https://deem.love") | |
prev_button.click( | |
prev_page, | |
inputs=[aign, current_page], | |
outputs=[current_page, novel_content_text], | |
queue=False | |
) | |
next_button.click( | |
next_page, | |
inputs=[aign, current_page], | |
outputs=[current_page, novel_content_text], | |
queue=False | |
) | |
# 添加章节下拉列表的事件处理 | |
chapter_dropdown.change( | |
select_chapter, | |
inputs=[aign, chapter_dropdown], | |
outputs=[current_page, novel_content_text], | |
queue=False | |
) | |
# 添加模型选择的事件处理 | |
model_dropdown.change( | |
set_api_model, | |
inputs=[model_dropdown], | |
outputs=[model_output] | |
) | |
# 修改按钮事件 | |
save_button.click( | |
save_progress, | |
inputs=[aign], | |
outputs=[download_link] | |
) | |
# 修改加载按钮事件,包含章节下拉列表的更新 | |
load_button.click( | |
load_progress, | |
inputs=[aign, load_file], | |
outputs=[aign, load_status, novel_outline_text, novel_content_text, writing_plan_text, temp_setting_text, writing_memory_text, user_idea_text, user_requriments_text, chapter_dropdown] | |
) | |
# 添加保存正文按钮事件 | |
save_content_button.click( | |
save_content, | |
inputs=[novel_content_text], | |
outputs=[download_link] | |
) | |
#undo_button.click( | |
#undo_generation, | |
#[aign, chatBox], | |
#[aign, chatBox, writing_plan_text, temp_setting_text, writing_memory_text, novel_content_text, undo_button], | |
#) | |
gen_ouline_button.click( | |
gen_ouline_button_clicked, | |
[aign, user_idea_text, chatBox], | |
[aign, chatBox, novel_outline_text, gen_ouline_button], | |
) | |
#gen_beginning_button.click( | |
#gen_beginning_button_clicked, | |
#[ | |
#aign, | |
#chatBox, | |
#novel_outline_text, | |
#user_requriments_text, | |
#embellishment_idea_text, | |
#], | |
#[ | |
#aign, | |
#chatBox, | |
#writing_plan_text, | |
#temp_setting_text, | |
#novel_content_text, | |
#gen_beginning_button, | |
#], | |
#) | |
# 修改生成下一段按钮事件,包含章节下拉列表的更新 | |
gen_next_paragraph_button.click( | |
gen_next_paragraph_button_clicked, | |
[ | |
aign, | |
chatBox, | |
user_idea_text, | |
novel_outline_text, | |
writing_memory_text, | |
temp_setting_text, | |
writing_plan_text, | |
user_requriments_text, | |
#embellishment_idea_text, | |
], | |
[ | |
aign, | |
chatBox, | |
writing_plan_text, | |
temp_setting_text, | |
writing_memory_text, | |
novel_content_text, | |
gen_next_paragraph_button, | |
download_link, # 这里添加输出到下载链接 | |
chapter_dropdown | |
], | |
) | |
if __name__ == "__main__": | |
demo.queue() | |
port = int(os.environ.get("PORT", 7860)) | |
demo.launch(server_name="0.0.0.0", server_port=port) |