import random
import threading
import time
import requests
import json
import base64
import os
import gradio as gr
import re
from AIGN import AIGN
from openAI import openAIChatLLM
chatLLM = openAIChatLLM()
STREAM_INTERVAL = 0.2
CURRENT_MODEL = "gpt-4o-mini" # 默认模型
# 在文件开头添加以下函数
def clear_console():
# 根据操作系统类型选择清屏命令
if os.name == 'nt': # Windows
os.system('cls')
else: # Mac and Linux
os.system('clear')
def get_model_options():
try:
response = requests.get("https://api.deem.love/v1/models")
data = response.json()
return [model["id"] for model in data["data"]]
except:
return ["gpt-4o-mini", "claude-3-haiku-20240307"] # 默认选项
def set_api_model(model):
global CURRENT_MODEL
CURRENT_MODEL = model
#return f"{model}"
return f"选择模型成功"
# 修改获取章节名的函数
def get_chapter_name(content):
# 查找最后一个章节标题,支持数字和汉字表示的章节
matches = re.findall(r'(?:##?|)?\s*第([一二三四五六七八九十百千万亿\d]+)章[::]?\s*(.+)', content)
if matches:
chapter_num, chapter_title = matches[-1]
# 如果章节号是数字,直接使用;如果是汉字,转换为数字
if chapter_num.isdigit():
return f"第{chapter_num}章:{chapter_title.strip()}"
else:
# 这里可以添加一个函数来将汉字数字转换为阿拉伯数字
# 为了简单起见,这里仍然使用汉字表示
return f"第{chapter_num}章:{chapter_title.strip()}"
return "未知章节"
# 修改保存进度函数
def save_progress(aign):
chapter_name = get_chapter_name(aign.novel_content)
# 移除文件名中的非法字符
filename = re.sub(r'[\\/*?:"<>|]', '', chapter_name)
filename = f"{filename}.json"
data = {
"novel_outline": aign.novel_outline,
"paragraph_list": aign.paragraph_list,
"novel_content": aign.novel_content,
"writing_plan": aign.writing_plan,
"temp_setting": aign.temp_setting,
"writing_memory": aign.writing_memory,
"user_idea": aign.user_idea,
"user_requriments": aign.user_requriments,
}
json_str = json.dumps(data, ensure_ascii=False, indent=2)
b64 = base64.b64encode(json_str.encode()).decode()
href = f"data:application/json;base64,{b64}"
download_link = f'点击下载进度文件'
return download_link
# 修改加载函数
def load_progress(aign, file):
try:
if file is None:
return aign, "请选择要加载的文件", None, None, None, None, None, None, None, None
# 检查 file 是否为字符串(文件路径)
if isinstance(file, str):
with open(file, 'r', encoding='utf-8') as f:
content = f.read()
# 检查 file 是否有 name 属性(Gradio File 对象)
elif hasattr(file, 'name'):
with open(file.name, 'r', encoding='utf-8') as f:
content = f.read()
else:
return aign, f"无法读取文件", None, None, None, None, None, None, None, None
data = json.loads(content)
aign.novel_outline = data["novel_outline"]
aign.paragraph_list = data["paragraph_list"]
aign.novel_content = data["novel_content"]
aign.writing_plan = data["writing_plan"]
aign.temp_setting = data["temp_setting"]
aign.writing_memory = data["writing_memory"]
aign.user_idea = data["user_idea"]
aign.user_requriments = data["user_requriments"]
#aign.embellishment_idea = data["embellishment_idea"]
aign.update_chapter_list()
chapter_choices = [title for title, _ in aign.chapter_list]
return aign, f"进度已加载", data["novel_outline"], data["novel_content"], data["writing_plan"], data["temp_setting"], data["writing_memory"], data["user_idea"], data["user_requriments"], gr.Dropdown(choices=chapter_choices, value=chapter_choices[-1] if chapter_choices else None)
except Exception as e:
return aign, f"加载失败: {str(e)}", None, None, None, None, None, None, None, None
# 新增保存正文函数
def save_content(content):
if not content:
return "正文为空,无需保存"
b64 = base64.b64encode(content.encode()).decode()
href = f"data:text/plain;base64,{b64}"
download_link = f'点击下载正文'
return download_link
# 添加撤销功能的事件处理
def undo_generation(aign, history):
if aign.undo():
return [
aign,
history,
aign.writing_plan,
aign.temp_setting,
aign.writing_memory,
aign.novel_content,
gr.Button(visible=True),
]
else:
return [
aign,
history + [["系统", "无法撤销,已经是最初状态"]],
aign.writing_plan,
aign.temp_setting,
aign.writing_memory,
aign.novel_content,
gr.Button(visible=True),
]
def display_chapter(chapters, page_num):
if 1 <= page_num <= len(chapters):
chapter_title, chapter_content = chapters[page_num - 1]
return f"{chapter_title}\n\n{chapter_content}"
else:
return "没有更多章节了。"
def prev_page(aign, current_page):
chapters = aign.chapter_list
if current_page > 1:
current_page -= 1
return current_page, display_chapter(chapters, current_page)
def next_page(aign, current_page):
chapters = aign.chapter_list
if current_page < len(chapters):
current_page += 1
return current_page, display_chapter(chapters, current_page)
def select_chapter(aign, chapter_title):
chapters = aign.chapter_list
for i, (title, _) in enumerate(chapters, start=1):
if title == chapter_title:
return i, display_chapter(chapters, i)
return 1, "章节未找到。"
def make_middle_chat():
carrier = threading.Event()
carrier.history = []
def middle_chat(messages, temperature=None, top_p=None):
nonlocal carrier
carrier.history.append([None, ""])
if len(carrier.history) > 20:
carrier.history = carrier.history[-16:]
try:
for resp in chatLLM(
messages, temperature=temperature, top_p=top_p, stream=True, model=CURRENT_MODEL
):
output_text = resp["content"]
total_tokens = resp["total_tokens"]
carrier.history[-1][1] = f"total_tokens: {total_tokens}\n{output_text}"
return {
"content": output_text,
"total_tokens": total_tokens,
}
except Exception as e:
carrier.history[-1][1] = f"Error: {e}"
raise e
return carrier, middle_chat
def gen_ouline_button_clicked(aign, user_idea, history):
clear_console() # 清空命令行
aign.user_idea = user_idea
carrier, middle_chat = make_middle_chat()
carrier.history = [] # 清空历史记录
aign.novel_outline_writer.chatLLM = middle_chat
gen_ouline_thread = threading.Thread(target=aign.genNovelOutline)
gen_ouline_thread.start()
while gen_ouline_thread.is_alive():
yield [
aign,
carrier.history,
aign.novel_outline,
gr.Button(visible=False),
]
time.sleep(STREAM_INTERVAL)
yield [
aign,
carrier.history,
aign.novel_outline,
gr.Button(visible=True),
]
def gen_beginning_button_clicked(
aign, history, novel_outline, user_requriments#, embellishment_idea
):
clear_console() # 清空命令行
aign.novel_outline = novel_outline
aign.user_requriments = user_requriments
#aign.embellishment_idea = embellishment_idea
carrier, middle_chat = make_middle_chat()
carrier.history = [] # 清空历史记录
aign.novel_beginning_writer.chatLLM = middle_chat
#aign.novel_embellisher.chatLLM = middle_chat
gen_beginning_thread = threading.Thread(target=aign.genBeginning)
gen_beginning_thread.start()
while gen_beginning_thread.is_alive():
yield [
aign,
carrier.history,
aign.writing_plan,
aign.temp_setting,
aign.novel_content,
gr.Button(visible=False),
]
time.sleep(STREAM_INTERVAL)
yield [
aign,
carrier.history,
aign.writing_plan,
aign.temp_setting,
aign.novel_content,
gr.Button(visible=True),
]
def gen_next_paragraph_button_clicked(
aign,
history,
user_idea,
novel_outline,
writing_memory,
temp_setting,
writing_plan,
user_requriments,
#embellishment_idea,
):
# 生成保存进度链接
save_link = save_progress(aign) # 在函数开头定义保存进度链接
# 清空命令行
clear_console()
# 更新 AIGN 对象的各个字段
aign.user_idea = user_idea
aign.novel_outline = novel_outline
aign.writing_memory = writing_memory
aign.temp_setting = temp_setting
aign.writing_plan = writing_plan
aign.user_requriments = user_requriments
#aign.embellishment_idea = embellishment_idea
carrier, middle_chat = make_middle_chat()
carrier.history = [] # 清空历史记录
aign.novel_writer.chatLLM = middle_chat
#aign.novel_embellisher.chatLLM = middle_chat
aign.memory_maker.chatLLM = middle_chat
gen_next_paragraph_thread = threading.Thread(target=aign.genNextParagraph)
gen_next_paragraph_thread.start()
while gen_next_paragraph_thread.is_alive():
# 生成保存进度链接
save_link = save_progress(aign)
aign.update_chapter_list()
# 获取当前章节内容
current_chapter_content = display_chapter(aign.chapter_list, len(aign.chapter_list))
chapter_choices = [title for title, _ in aign.chapter_list]
yield [
aign,
carrier.history,
aign.writing_plan,
aign.temp_setting,
aign.writing_memory,
#aign.novel_content,#全部章节内容
current_chapter_content, # 只更新当前章节内容
gr.Button(visible=False),
save_link, # 返回生成的保存进度链接
gr.Dropdown(choices=chapter_choices, value=chapter_choices[-1] if chapter_choices else None),
]
time.sleep(STREAM_INTERVAL)
# 生成保存进度链接
save_link = save_progress(aign)
aign.update_chapter_list()
# 获取最终的当前章节内容
current_chapter_content = display_chapter(aign.chapter_list, len(aign.chapter_list))
chapter_choices = [title for title, _ in aign.chapter_list]
yield [
aign,
carrier.history,
aign.writing_plan,
aign.temp_setting,
aign.writing_memory,
#aign.novel_content, #全部章节内容
current_chapter_content, # 只更新当前章节内容
gr.Button(visible=True),
save_link, # 返回生成的保存进度链接
gr.Dropdown(choices=chapter_choices, value=chapter_choices[-1] if chapter_choices else None),
]
css = """
#row1 {
min-width: 200px;
height: calc(100vh - 180px);
overflow: auto;
}
#row2 {
min-width: 300px;
height: calc(100vh - 180px);
overflow: auto;
}
#row3 {
min-width: 200px;
height: calc(100vh - 180px);
overflow: auto;
}
"""
with gr.Blocks(css=css) as demo:
aign = gr.State(AIGN(chatLLM))
gr.Markdown("## AI 写小说 Demo")
#gr.Markdown("模型服务出问题会导致反复重试,这时切换到可用模型会继续,只要不刷新网页或者保存了进度(至少生成开头之后保存)就可以无限续写(某些模型智能及上下文有限,到极限会导致内容重复,这些模型在这就不适用了,比如gpt-3.5-turbo,当然接着切换到强模型可以再继续)")
# 添加模型选择下拉框
# with gr.Row(): 同一行
with gr.Row():
with gr.Column(scale=0, elem_id="row1"):
with gr.Tab("⚙"):
model_dropdown = gr.Dropdown(
choices=get_model_options(),
label="选择模型",
interactive=True,
value=CURRENT_MODEL
)
model_output = gr.Textbox(label="模型设置结果", interactive=False)
load_status = gr.Textbox(label="加载状态", interactive=False)
load_file = gr.File(label="选择加载文件", file_count="single", file_types=[".json"])
load_button = gr.Button("加载进度")
with gr.Tab("开始"):
gr.Markdown("生成大纲->大纲标签->生成章节->状态标签->生成章节")
user_idea_text = gr.Textbox(
"架空历史:\n\n1. 选择关键历史节点并改变\n2. 描述由此引发的历史走向变化\n3. 塑造新兴历史人物(可改编或原创)\n4. 构建独特社会结构、文化和思潮\n5. 想象可能出现的科技创新\n6. 概述世界格局重塑(国界、政体、国际关系)\n7. 用生动叙事呈现,让读者身临其境\n\n目标:创造合理又富想象力的平行宇宙,既有趣又引人深思。",
label="想法",
lines=16,
interactive=True,
)
user_requriments_text = gr.Textbox(
"1. 语言要求:\n - 不直白\n - 句式多变\n - 避免陈词滥调\n - 使用不寻常的词句\n - 运用隐喻和象征\n2. 创作风格:\n - 抽象\n - 富有意境和想象力\n - 具创意个性\n - 有力度\n - 画面感强\n - 音乐感佳\n - 浪漫气息浓厚\n - 语言深邃\n3. 表达目标:\n - 传达独特的神秘和魔幻感\n - 探索和反思自我与世界\n - 表达对自己和社会的孤独与关注\n4. 读者体验:有趣、惊奇、新鲜",
label="写作要求",
lines=6,
interactive=True,
)
gen_ouline_button = gr.Button("生成大纲")
with gr.Tab("大纲"):
novel_outline_text = gr.Textbox(
label="大纲", lines=33, interactive=True
)
#gen_beginning_button = gr.Button("生成开头")
#gen_next_paragraph_button = gr.Button("生成开头")
with gr.Tab("状态"):
writing_memory_text = gr.Textbox(
label="记忆",
lines=8,
interactive=True,
max_lines=8,
)
writing_plan_text = gr.Textbox(label="计划", lines=6, interactive=True, max_lines=6)
temp_setting_text = gr.Textbox(
label="临时设定", lines=5, interactive=True, max_lines=5
)
gen_next_paragraph_button = gr.Button("生成章节")
download_link = gr.HTML()
save_button = gr.Button("保存进度")
with gr.Tab("导航"):
save_content_button = gr.Button("保存正文")
current_page = gr.Number(value=1, label="当前页码", interactive=False)
prev_button = gr.Button("上一页")
next_button = gr.Button("下一页")
# 添加章节导航下拉列表
chapter_dropdown = gr.Dropdown(label="章节导航", choices=[], interactive=True)
# TODO
# gen_next_paragraph_button = gr.Button("撤销生成")
#undo_button = gr.Button("撤销生成")
# TODO
# auto_gen_next_checkbox = gr.Checkbox(
# label="自动生成下一段", checked=False, interactive=True
# )
with gr.Column(scale=3, elem_id="row2"):
chatBox = gr.Chatbot(height=f"80vh", label="输出")
with gr.Column(scale=0, elem_id="row3"):
novel_content_text = gr.Textbox(
label="小说正文", lines=35, interactive=True, show_copy_button=True
)
# TODO
# download_novel_button = gr.Button("下载小说")
#gr.Markdown("导航: https://deem.love")
prev_button.click(
prev_page,
inputs=[aign, current_page],
outputs=[current_page, novel_content_text],
queue=False
)
next_button.click(
next_page,
inputs=[aign, current_page],
outputs=[current_page, novel_content_text],
queue=False
)
# 添加章节下拉列表的事件处理
chapter_dropdown.change(
select_chapter,
inputs=[aign, chapter_dropdown],
outputs=[current_page, novel_content_text],
queue=False
)
# 添加模型选择的事件处理
model_dropdown.change(
set_api_model,
inputs=[model_dropdown],
outputs=[model_output]
)
# 修改按钮事件
save_button.click(
save_progress,
inputs=[aign],
outputs=[download_link]
)
# 修改加载按钮事件,包含章节下拉列表的更新
load_button.click(
load_progress,
inputs=[aign, load_file],
outputs=[aign, load_status, novel_outline_text, novel_content_text, writing_plan_text, temp_setting_text, writing_memory_text, user_idea_text, user_requriments_text, chapter_dropdown]
)
# 添加保存正文按钮事件
save_content_button.click(
save_content,
inputs=[novel_content_text],
outputs=[download_link]
)
#undo_button.click(
#undo_generation,
#[aign, chatBox],
#[aign, chatBox, writing_plan_text, temp_setting_text, writing_memory_text, novel_content_text, undo_button],
#)
gen_ouline_button.click(
gen_ouline_button_clicked,
[aign, user_idea_text, chatBox],
[aign, chatBox, novel_outline_text, gen_ouline_button],
)
#gen_beginning_button.click(
#gen_beginning_button_clicked,
#[
#aign,
#chatBox,
#novel_outline_text,
#user_requriments_text,
#embellishment_idea_text,
#],
#[
#aign,
#chatBox,
#writing_plan_text,
#temp_setting_text,
#novel_content_text,
#gen_beginning_button,
#],
#)
# 修改生成下一段按钮事件,包含章节下拉列表的更新
gen_next_paragraph_button.click(
gen_next_paragraph_button_clicked,
[
aign,
chatBox,
user_idea_text,
novel_outline_text,
writing_memory_text,
temp_setting_text,
writing_plan_text,
user_requriments_text,
#embellishment_idea_text,
],
[
aign,
chatBox,
writing_plan_text,
temp_setting_text,
writing_memory_text,
novel_content_text,
gen_next_paragraph_button,
download_link, # 这里添加输出到下载链接
chapter_dropdown
],
)
if __name__ == "__main__":
demo.queue()
port = int(os.environ.get("PORT", 7860))
demo.launch(server_name="0.0.0.0", server_port=port)