import random import threading import time import requests import json import base64 import os import gradio as gr import re from AIGN import AIGN from openAI import openAIChatLLM chatLLM = openAIChatLLM() STREAM_INTERVAL = 0.2 CURRENT_MODEL = "gpt-4o-mini" # 默认模型 # 在文件开头添加以下函数 def clear_console(): # 根据操作系统类型选择清屏命令 if os.name == 'nt': # Windows os.system('cls') else: # Mac and Linux os.system('clear') def get_model_options(): try: response = requests.get("https://api.deem.love/v1/models") data = response.json() return [model["id"] for model in data["data"]] except: return ["gpt-4o-mini", "claude-3-haiku-20240307"] # 默认选项 def set_api_model(model): global CURRENT_MODEL CURRENT_MODEL = model #return f"{model}" return f"选择模型成功" # 修改获取章节名的函数 def get_chapter_name(content): # 查找最后一个章节标题,支持数字和汉字表示的章节 matches = re.findall(r'(?:##?|)?\s*第([一二三四五六七八九十百千万亿\d]+)章[::]?\s*(.+)', content) if matches: chapter_num, chapter_title = matches[-1] # 如果章节号是数字,直接使用;如果是汉字,转换为数字 if chapter_num.isdigit(): return f"第{chapter_num}章:{chapter_title.strip()}" else: # 这里可以添加一个函数来将汉字数字转换为阿拉伯数字 # 为了简单起见,这里仍然使用汉字表示 return f"第{chapter_num}章:{chapter_title.strip()}" return "未知章节" # 修改保存进度函数 def save_progress(aign): chapter_name = get_chapter_name(aign.novel_content) # 移除文件名中的非法字符 filename = re.sub(r'[\\/*?:"<>|]', '', chapter_name) filename = f"{filename}.json" data = { "novel_outline": aign.novel_outline, "paragraph_list": aign.paragraph_list, "novel_content": aign.novel_content, "writing_plan": aign.writing_plan, "temp_setting": aign.temp_setting, "writing_memory": aign.writing_memory, "user_idea": aign.user_idea, "user_requriments": aign.user_requriments, } json_str = json.dumps(data, ensure_ascii=False, indent=2) b64 = base64.b64encode(json_str.encode()).decode() href = f"data:application/json;base64,{b64}" download_link = f'点击下载进度文件' return download_link # 修改加载函数 def load_progress(aign, file): try: if file is None: return aign, "请选择要加载的文件", None, None, None, None, None, None, None, None # 检查 file 是否为字符串(文件路径) if isinstance(file, str): with open(file, 'r', encoding='utf-8') as f: content = f.read() # 检查 file 是否有 name 属性(Gradio File 对象) elif hasattr(file, 'name'): with open(file.name, 'r', encoding='utf-8') as f: content = f.read() else: return aign, f"无法读取文件", None, None, None, None, None, None, None, None data = json.loads(content) aign.novel_outline = data["novel_outline"] aign.paragraph_list = data["paragraph_list"] aign.novel_content = data["novel_content"] aign.writing_plan = data["writing_plan"] aign.temp_setting = data["temp_setting"] aign.writing_memory = data["writing_memory"] aign.user_idea = data["user_idea"] aign.user_requriments = data["user_requriments"] #aign.embellishment_idea = data["embellishment_idea"] aign.update_chapter_list() chapter_choices = [title for title, _ in aign.chapter_list] return aign, f"进度已加载", data["novel_outline"], data["novel_content"], data["writing_plan"], data["temp_setting"], data["writing_memory"], data["user_idea"], data["user_requriments"], gr.Dropdown(choices=chapter_choices, value=chapter_choices[-1] if chapter_choices else None) except Exception as e: return aign, f"加载失败: {str(e)}", None, None, None, None, None, None, None, None # 新增保存正文函数 def save_content(content): if not content: return "正文为空,无需保存" b64 = base64.b64encode(content.encode()).decode() href = f"data:text/plain;base64,{b64}" download_link = f'点击下载正文' return download_link # 添加撤销功能的事件处理 def undo_generation(aign, history): if aign.undo(): return [ aign, history, aign.writing_plan, aign.temp_setting, aign.writing_memory, aign.novel_content, gr.Button(visible=True), ] else: return [ aign, history + [["系统", "无法撤销,已经是最初状态"]], aign.writing_plan, aign.temp_setting, aign.writing_memory, aign.novel_content, gr.Button(visible=True), ] def display_chapter(chapters, page_num): if 1 <= page_num <= len(chapters): chapter_title, chapter_content = chapters[page_num - 1] return f"{chapter_title}\n\n{chapter_content}" else: return "没有更多章节了。" def prev_page(aign, current_page): chapters = aign.chapter_list if current_page > 1: current_page -= 1 return current_page, display_chapter(chapters, current_page) def next_page(aign, current_page): chapters = aign.chapter_list if current_page < len(chapters): current_page += 1 return current_page, display_chapter(chapters, current_page) def select_chapter(aign, chapter_title): chapters = aign.chapter_list for i, (title, _) in enumerate(chapters, start=1): if title == chapter_title: return i, display_chapter(chapters, i) return 1, "章节未找到。" def make_middle_chat(): carrier = threading.Event() carrier.history = [] def middle_chat(messages, temperature=None, top_p=None): nonlocal carrier carrier.history.append([None, ""]) if len(carrier.history) > 20: carrier.history = carrier.history[-16:] try: for resp in chatLLM( messages, temperature=temperature, top_p=top_p, stream=True, model=CURRENT_MODEL ): output_text = resp["content"] total_tokens = resp["total_tokens"] carrier.history[-1][1] = f"total_tokens: {total_tokens}\n{output_text}" return { "content": output_text, "total_tokens": total_tokens, } except Exception as e: carrier.history[-1][1] = f"Error: {e}" raise e return carrier, middle_chat def gen_ouline_button_clicked(aign, user_idea, history): clear_console() # 清空命令行 aign.user_idea = user_idea carrier, middle_chat = make_middle_chat() carrier.history = [] # 清空历史记录 aign.novel_outline_writer.chatLLM = middle_chat gen_ouline_thread = threading.Thread(target=aign.genNovelOutline) gen_ouline_thread.start() while gen_ouline_thread.is_alive(): yield [ aign, carrier.history, aign.novel_outline, gr.Button(visible=False), ] time.sleep(STREAM_INTERVAL) yield [ aign, carrier.history, aign.novel_outline, gr.Button(visible=True), ] def gen_beginning_button_clicked( aign, history, novel_outline, user_requriments#, embellishment_idea ): clear_console() # 清空命令行 aign.novel_outline = novel_outline aign.user_requriments = user_requriments #aign.embellishment_idea = embellishment_idea carrier, middle_chat = make_middle_chat() carrier.history = [] # 清空历史记录 aign.novel_beginning_writer.chatLLM = middle_chat #aign.novel_embellisher.chatLLM = middle_chat gen_beginning_thread = threading.Thread(target=aign.genBeginning) gen_beginning_thread.start() while gen_beginning_thread.is_alive(): yield [ aign, carrier.history, aign.writing_plan, aign.temp_setting, aign.novel_content, gr.Button(visible=False), ] time.sleep(STREAM_INTERVAL) yield [ aign, carrier.history, aign.writing_plan, aign.temp_setting, aign.novel_content, gr.Button(visible=True), ] def gen_next_paragraph_button_clicked( aign, history, user_idea, novel_outline, writing_memory, temp_setting, writing_plan, user_requriments, #embellishment_idea, ): # 生成保存进度链接 save_link = save_progress(aign) # 在函数开头定义保存进度链接 # 清空命令行 clear_console() # 更新 AIGN 对象的各个字段 aign.user_idea = user_idea aign.novel_outline = novel_outline aign.writing_memory = writing_memory aign.temp_setting = temp_setting aign.writing_plan = writing_plan aign.user_requriments = user_requriments #aign.embellishment_idea = embellishment_idea carrier, middle_chat = make_middle_chat() carrier.history = [] # 清空历史记录 aign.novel_writer.chatLLM = middle_chat #aign.novel_embellisher.chatLLM = middle_chat aign.memory_maker.chatLLM = middle_chat gen_next_paragraph_thread = threading.Thread(target=aign.genNextParagraph) gen_next_paragraph_thread.start() while gen_next_paragraph_thread.is_alive(): # 生成保存进度链接 save_link = save_progress(aign) aign.update_chapter_list() # 获取当前章节内容 current_chapter_content = display_chapter(aign.chapter_list, len(aign.chapter_list)) chapter_choices = [title for title, _ in aign.chapter_list] yield [ aign, carrier.history, aign.writing_plan, aign.temp_setting, aign.writing_memory, #aign.novel_content,#全部章节内容 current_chapter_content, # 只更新当前章节内容 gr.Button(visible=False), save_link, # 返回生成的保存进度链接 gr.Dropdown(choices=chapter_choices, value=chapter_choices[-1] if chapter_choices else None), ] time.sleep(STREAM_INTERVAL) # 生成保存进度链接 save_link = save_progress(aign) aign.update_chapter_list() # 获取最终的当前章节内容 current_chapter_content = display_chapter(aign.chapter_list, len(aign.chapter_list)) chapter_choices = [title for title, _ in aign.chapter_list] yield [ aign, carrier.history, aign.writing_plan, aign.temp_setting, aign.writing_memory, #aign.novel_content, #全部章节内容 current_chapter_content, # 只更新当前章节内容 gr.Button(visible=True), save_link, # 返回生成的保存进度链接 gr.Dropdown(choices=chapter_choices, value=chapter_choices[-1] if chapter_choices else None), ] css = """ #row1 { min-width: 200px; height: calc(100vh - 180px); overflow: auto; } #row2 { min-width: 300px; height: calc(100vh - 180px); overflow: auto; } #row3 { min-width: 200px; height: calc(100vh - 180px); overflow: auto; } """ with gr.Blocks(css=css) as demo: aign = gr.State(AIGN(chatLLM)) gr.Markdown("## AI 写小说 Demo") #gr.Markdown("模型服务出问题会导致反复重试,这时切换到可用模型会继续,只要不刷新网页或者保存了进度(至少生成开头之后保存)就可以无限续写(某些模型智能及上下文有限,到极限会导致内容重复,这些模型在这就不适用了,比如gpt-3.5-turbo,当然接着切换到强模型可以再继续)") # 添加模型选择下拉框 # with gr.Row(): 同一行 with gr.Row(): with gr.Column(scale=0, elem_id="row1"): with gr.Tab("⚙"): model_dropdown = gr.Dropdown( choices=get_model_options(), label="选择模型", interactive=True, value=CURRENT_MODEL ) model_output = gr.Textbox(label="模型设置结果", interactive=False) load_status = gr.Textbox(label="加载状态", interactive=False) load_file = gr.File(label="选择加载文件", file_count="single", file_types=[".json"]) load_button = gr.Button("加载进度") with gr.Tab("开始"): gr.Markdown("生成大纲->大纲标签->生成章节->状态标签->生成章节") user_idea_text = gr.Textbox( "架空历史:\n\n1. 选择关键历史节点并改变\n2. 描述由此引发的历史走向变化\n3. 塑造新兴历史人物(可改编或原创)\n4. 构建独特社会结构、文化和思潮\n5. 想象可能出现的科技创新\n6. 概述世界格局重塑(国界、政体、国际关系)\n7. 用生动叙事呈现,让读者身临其境\n\n目标:创造合理又富想象力的平行宇宙,既有趣又引人深思。", label="想法", lines=16, interactive=True, ) user_requriments_text = gr.Textbox( "1. 语言要求:\n - 不直白\n - 句式多变\n - 避免陈词滥调\n - 使用不寻常的词句\n - 运用隐喻和象征\n2. 创作风格:\n - 抽象\n - 富有意境和想象力\n - 具创意个性\n - 有力度\n - 画面感强\n - 音乐感佳\n - 浪漫气息浓厚\n - 语言深邃\n3. 表达目标:\n - 传达独特的神秘和魔幻感\n - 探索和反思自我与世界\n - 表达对自己和社会的孤独与关注\n4. 读者体验:有趣、惊奇、新鲜", label="写作要求", lines=6, interactive=True, ) gen_ouline_button = gr.Button("生成大纲") with gr.Tab("大纲"): novel_outline_text = gr.Textbox( label="大纲", lines=33, interactive=True ) #gen_beginning_button = gr.Button("生成开头") #gen_next_paragraph_button = gr.Button("生成开头") with gr.Tab("状态"): writing_memory_text = gr.Textbox( label="记忆", lines=8, interactive=True, max_lines=8, ) writing_plan_text = gr.Textbox(label="计划", lines=6, interactive=True, max_lines=6) temp_setting_text = gr.Textbox( label="临时设定", lines=5, interactive=True, max_lines=5 ) gen_next_paragraph_button = gr.Button("生成章节") download_link = gr.HTML() save_button = gr.Button("保存进度") with gr.Tab("导航"): save_content_button = gr.Button("保存正文") current_page = gr.Number(value=1, label="当前页码", interactive=False) prev_button = gr.Button("上一页") next_button = gr.Button("下一页") # 添加章节导航下拉列表 chapter_dropdown = gr.Dropdown(label="章节导航", choices=[], interactive=True) # TODO # gen_next_paragraph_button = gr.Button("撤销生成") #undo_button = gr.Button("撤销生成") # TODO # auto_gen_next_checkbox = gr.Checkbox( # label="自动生成下一段", checked=False, interactive=True # ) with gr.Column(scale=3, elem_id="row2"): chatBox = gr.Chatbot(height=f"80vh", label="输出") with gr.Column(scale=0, elem_id="row3"): novel_content_text = gr.Textbox( label="小说正文", lines=35, interactive=True, show_copy_button=True ) # TODO # download_novel_button = gr.Button("下载小说") #gr.Markdown("导航: https://deem.love") prev_button.click( prev_page, inputs=[aign, current_page], outputs=[current_page, novel_content_text], queue=False ) next_button.click( next_page, inputs=[aign, current_page], outputs=[current_page, novel_content_text], queue=False ) # 添加章节下拉列表的事件处理 chapter_dropdown.change( select_chapter, inputs=[aign, chapter_dropdown], outputs=[current_page, novel_content_text], queue=False ) # 添加模型选择的事件处理 model_dropdown.change( set_api_model, inputs=[model_dropdown], outputs=[model_output] ) # 修改按钮事件 save_button.click( save_progress, inputs=[aign], outputs=[download_link] ) # 修改加载按钮事件,包含章节下拉列表的更新 load_button.click( load_progress, inputs=[aign, load_file], outputs=[aign, load_status, novel_outline_text, novel_content_text, writing_plan_text, temp_setting_text, writing_memory_text, user_idea_text, user_requriments_text, chapter_dropdown] ) # 添加保存正文按钮事件 save_content_button.click( save_content, inputs=[novel_content_text], outputs=[download_link] ) #undo_button.click( #undo_generation, #[aign, chatBox], #[aign, chatBox, writing_plan_text, temp_setting_text, writing_memory_text, novel_content_text, undo_button], #) gen_ouline_button.click( gen_ouline_button_clicked, [aign, user_idea_text, chatBox], [aign, chatBox, novel_outline_text, gen_ouline_button], ) #gen_beginning_button.click( #gen_beginning_button_clicked, #[ #aign, #chatBox, #novel_outline_text, #user_requriments_text, #embellishment_idea_text, #], #[ #aign, #chatBox, #writing_plan_text, #temp_setting_text, #novel_content_text, #gen_beginning_button, #], #) # 修改生成下一段按钮事件,包含章节下拉列表的更新 gen_next_paragraph_button.click( gen_next_paragraph_button_clicked, [ aign, chatBox, user_idea_text, novel_outline_text, writing_memory_text, temp_setting_text, writing_plan_text, user_requriments_text, #embellishment_idea_text, ], [ aign, chatBox, writing_plan_text, temp_setting_text, writing_memory_text, novel_content_text, gen_next_paragraph_button, download_link, # 这里添加输出到下载链接 chapter_dropdown ], ) if __name__ == "__main__": demo.queue() port = int(os.environ.get("PORT", 7860)) demo.launch(server_name="0.0.0.0", server_port=port)