""" 多模态大语言模型聊天Demo - 网络优化版本 主要优化: 1. 缓冲机制:积累多个chunk后再yield,减少网络交互次数(50-80%) 2. State更新优化:降低state更新频率,减少数据传输量 3. 超时配置优化:增加代理超时时间,提高网络容错性 4. 图像质量优化:保持原始尺寸和高质量编码,不进行缩放 这些优化可显著改善网络延迟高时的前端卡顿问题,同时保证图像质量。 """ import os import uuid import json import base64 import io import gradio as gr import modelscope_studio.components.antd as antd import modelscope_studio.components.antdx as antdx import modelscope_studio.components.base as ms from openai import OpenAI import requests from typing import Generator, Dict, Any, List, Union import logging import time from PIL import Image import datetime # =========== Configuration # MODEL NAME model = os.getenv("MODEL_NAME") # 代理服务器配置 - 支持多个URL用逗号分隔 PROXY_BASE_URLS = [url.strip() for url in os.getenv("PROXY_API_BASE", "http://localhost:8000").split(",") if url.strip()] PROXY_TIMEOUT = int(os.getenv("PROXY_TIMEOUT", 300)) # 增加超时时间从30秒到60秒 MAX_RETRIES = int(os.getenv("MAX_RETRIES", 5)) # 负载均衡配置 current_proxy_index = 0 # 用于轮询的当前索引 # 保存历史 save_history = True # 保存对话日志 save_conversation = False # =========== Configuration # 配置日志 logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # =========== 负载均衡机制 def get_next_proxy_url(): """获取下一个代理URL(轮询方式)""" global current_proxy_index if not PROXY_BASE_URLS: raise Exception("No proxy URLs configured") proxy_url = PROXY_BASE_URLS[current_proxy_index] current_proxy_index = (current_proxy_index + 1) % len(PROXY_BASE_URLS) logger.info(f"Selected proxy URL: {proxy_url} (index: {current_proxy_index-1 if current_proxy_index > 0 else len(PROXY_BASE_URLS)-1})") return proxy_url def get_all_proxy_urls(): """获取所有代理URL列表""" return PROXY_BASE_URLS.copy() # =========== 负载均衡机制 # =========== 对话日志功能 # 创建对话日志文件夹 if save_conversation: CONVERSATION_LOG_DIR = "conversation_logs" os.makedirs(CONVERSATION_LOG_DIR, exist_ok=True) def save_conversation_log(history_messages, assistant_content, metadata=None): """保存对话日志到JSON文件""" if not save_conversation: return try: timestamp = datetime.datetime.now() filename = f"gradio_app_{timestamp.strftime('%Y%m%d_%H%M%S_%f')}.json" filepath = os.path.join(CONVERSATION_LOG_DIR, filename) log_data = { "timestamp": timestamp.isoformat(), "history_messages": history_messages, # 原封不动保存发送给模型的消息 "assistant_content": assistant_content, "metadata": metadata or {} } with open(filepath, 'w', encoding='utf-8') as f: json.dump(log_data, f, ensure_ascii=False, indent=2) logger.info(f"对话日志已保存: {filepath}") except Exception as e: logger.error(f"保存对话日志失败: {str(e)}") # =========== 图像处理工具函数 def encode_image_to_base64(image_path_or_pil: Union[str, Image.Image]) -> str: """将图像文件或PIL图像对象转换为base64编码字符串""" try: if isinstance(image_path_or_pil, str): # 如果是文件路径 with open(image_path_or_pil, "rb") as image_file: return base64.b64encode(image_file.read()).decode('utf-8') else: # 如果是PIL图像对象,尽量保持原始格式和质量 buffer = io.BytesIO() # 检测原始格式,优先保持原格式 original_format = getattr(image_path_or_pil, 'format', None) if image_path_or_pil.mode == 'RGBA': # 如果是RGBA模式且原格式支持透明度,优先保存为PNG if original_format in ['PNG', 'WEBP'] or original_format is None: image_path_or_pil.save(buffer, format="PNG") # PNG无损保存 else: # 否则转换为RGB并保存为高质量JPEG rgb_image = Image.new('RGB', image_path_or_pil.size, (255, 255, 255)) rgb_image.paste(image_path_or_pil, mask=image_path_or_pil.split()[-1]) rgb_image.save(buffer, format="JPEG", quality=95) else: # 非RGBA模式,根据原格式选择保存方式 if original_format == 'PNG': image_path_or_pil.save(buffer, format="PNG") # PNG无损保存 elif original_format in ['WEBP', 'BMP', 'TIFF']: # 其他格式转为高质量JPEG image_path_or_pil.save(buffer, format="JPEG", quality=95) else: # 默认保存为高质量JPEG image_path_or_pil.save(buffer, format="JPEG", quality=95) image_base64 = base64.b64encode(buffer.getvalue()).decode('utf-8') return image_base64 except Exception as e: logger.error(f"Error encoding image to base64: {str(e)}") raise def create_multimodal_content(text: str, images: List[Union[str, Image.Image]] = None) -> List[Dict]: """创建多模态内容格式,兼容OpenAI API""" content = [] # 添加文本内容 if text and text.strip(): content.append({ "type": "text", "text": text }) # 添加图像内容 if images: for i, image in enumerate(images): try: image_base64 = encode_image_to_base64(image) content.append({ "type": "image_url", "image_url": { "url": f"data:image/jpeg;base64,{image_base64}" } }) logger.info(f"Added image {i+1}/{len(images)} to multimodal content") except Exception as e: logger.error(f"Failed to process image {i+1}: {str(e)}") continue return content if content else [{"type": "text", "text": text or ""}] def convert_images_to_base64_list(images: List[Union[str, Image.Image]]) -> List[str]: """将图片列表转换为base64字符串列表,用于持久化存储""" base64_images = [] for i, image in enumerate(images): try: base64_str = encode_image_to_base64(image) base64_images.append(base64_str) logger.info(f"Converted image {i+1}/{len(images)} to base64 for storage") except Exception as e: logger.error(f"Failed to convert image {i+1} to base64: {str(e)}") continue return base64_images def restore_images_from_base64_list(base64_images: List[str]) -> List[Image.Image]: """从base64字符串列表恢复图片对象""" images = [] for i, base64_str in enumerate(base64_images): try: image_data = base64.b64decode(base64_str) image = Image.open(io.BytesIO(image_data)) images.append(image) logger.info(f"Restored image {i+1}/{len(base64_images)} from base64") except Exception as e: logger.error(f"Failed to restore image {i+1} from base64: {str(e)}") continue return images class DeltaObject: """模拟OpenAI Delta对象""" def __init__(self, data: dict): self.content = data.get('content') self.role = data.get('role') class ChoiceObject: """模拟OpenAI Choice对象""" def __init__(self, choice_data: dict): delta_data = choice_data.get('delta', {}) self.delta = DeltaObject(delta_data) self.finish_reason = choice_data.get('finish_reason') self.index = choice_data.get('index', 0) class ChunkObject: """模拟OpenAI Chunk对象""" def __init__(self, chunk_data: dict): choices_data = chunk_data.get('choices', []) self.choices = [ChoiceObject(choice) for choice in choices_data] self.id = chunk_data.get('id', '') self.object = chunk_data.get('object', 'chat.completion.chunk') self.created = chunk_data.get('created', 0) self.model = chunk_data.get('model', '') class ProxyClient: """代理客户端,用于与中间服务通信,支持负载均衡""" def __init__(self, timeout: int = 30): self.timeout = timeout self.session = requests.Session() def chat_completions_create(self, model: str, messages: list, stream: bool = True, **kwargs): """创建聊天完成请求 - 使用负载均衡选择代理""" base_url = get_next_proxy_url().rstrip('/') # 动态获取下一个代理URL if base_url.endswith('/v1'): url = f"{base_url}/chat/completions" else: url = f"{base_url}/v1/chat/completions" payload = { "model": model, "messages": messages, "stream": stream, **kwargs } try: response = self.session.post( url, json=payload, stream=stream, timeout=self.timeout, headers={"Content-Type": "application/json"} ) response.raise_for_status() if stream: return self._parse_stream_response(response) else: return response.json() except requests.exceptions.RequestException as e: logger.error(f"Request failed: {str(e)}") raise Exception(f"Failed to connect to proxy server: {str(e)}") def _parse_stream_response(self, response) -> Generator[ChunkObject, None, None]: """解析流式响应""" try: # 确保响应编码正确 response.encoding = 'utf-8' for line in response.iter_lines(decode_unicode=True): if not line: continue line = line.strip() if line.startswith('data: '): data = line[6:] # 移除 'data: ' 前缀 if data == '[DONE]': break try: chunk_data = json.loads(data) # 检查是否是错误响应 if 'error' in chunk_data: raise Exception(f"Stream error: {chunk_data.get('detail', chunk_data['error'])}") # 创建与OpenAI客户端兼容的响应对象 yield ChunkObject(chunk_data) except json.JSONDecodeError as e: logger.warning(f"Failed to parse JSON: {data}, error: {str(e)}") continue except Exception as e: logger.error(f"Error parsing stream response: {str(e)}") raise def health_check(self, specific_url: str = None) -> dict: """健康检查 - 可以检查特定URL或使用负载均衡选择""" if specific_url: base_url = specific_url.rstrip('/') else: base_url = get_next_proxy_url().rstrip('/') try: url = f"{base_url}/health" response = self.session.get(url, timeout=self.timeout) response.raise_for_status() # 处理空响应体的情况 if response.text.strip(): result = response.json() else: # 如果响应体为空但状态码是200,认为服务健康 logger.info(f"Health check for {base_url} returned empty response with 200 status, assuming healthy") result = {"status": "healthy"} result["proxy_url"] = base_url # 添加代理URL信息 return result except Exception as e: logger.error(f"Health check failed for {base_url}: {str(e)}") return {"status": "unhealthy", "error": str(e), "proxy_url": base_url} def health_check_all(self) -> dict: """检查所有代理服务器的健康状态""" results = {} all_urls = get_all_proxy_urls() for i, url in enumerate(all_urls): results[f"proxy_{i}"] = self.health_check(specific_url=url) # 统计健康状态 healthy_count = sum(1 for result in results.values() if result.get("status") == "healthy") total_count = len(results) return { "overall_status": "healthy" if healthy_count > 0 else "unhealthy", "healthy_proxies": healthy_count, "total_proxies": total_count, "proxy_details": results } # 初始化代理客户端 client = ProxyClient(PROXY_TIMEOUT) # 显示代理配置信息 logger.info("=== 代理服务器配置 ===") logger.info(f"配置的代理服务器数量: {len(PROXY_BASE_URLS)}") for i, url in enumerate(PROXY_BASE_URLS): logger.info(f"代理 {i+1}: {url}") logger.info(f"代理超时时间: {PROXY_TIMEOUT}秒") logger.info(f"最大重试次数: {MAX_RETRIES}") if len(PROXY_BASE_URLS) > 1: logger.info("负载均衡模式: 轮询 (Round-robin)") logger.info("========================") def chat_with_retry(history_messages, max_retries=MAX_RETRIES): """带重试机制的聊天函数,支持代理故障转移""" last_exception = None failed_proxies = set() # 记录失败的代理 # 如果有多个代理,每个代理都尝试一次,然后再进行常规重试 total_proxies = len(PROXY_BASE_URLS) max_proxy_attempts = min(total_proxies, max_retries) for attempt in range(max_retries): try: logger.info(f"Chat attempt {attempt + 1}/{max_retries}") # 如果在前几次尝试中,且有多个代理可用,进行健康检查 if attempt < max_proxy_attempts and total_proxies > 1: # 检查当前要使用的代理是否健康 current_proxy_to_check = PROXY_BASE_URLS[current_proxy_index % total_proxies] if current_proxy_to_check not in failed_proxies: health = client.health_check(specific_url=current_proxy_to_check) if health.get("status") != "healthy": logger.warning(f"Proxy {current_proxy_to_check} is unhealthy, marking as failed") failed_proxies.add(current_proxy_to_check) # 跳过此代理,尝试下一个 continue response = client.chat_completions_create( model=model, messages=history_messages, stream=True, temperature=0.7, top_p=0.9, max_tokens=50000 ) return response except Exception as e: last_exception = e current_failed_proxy = None # 尝试从错误信息中识别失败的代理 try: current_failed_proxy = PROXY_BASE_URLS[(current_proxy_index - 1) % total_proxies] failed_proxies.add(current_failed_proxy) logger.warning(f"Attempt {attempt + 1} failed with proxy {current_failed_proxy}: {str(e)}") except: logger.warning(f"Attempt {attempt + 1} failed: {str(e)}") if attempt < max_retries - 1: # 如果还有未尝试的代理,立即尝试下一个 if len(failed_proxies) < total_proxies and attempt < max_proxy_attempts: logger.info(f"Trying next proxy immediately...") continue else: # 指数退避 wait_time = min(2 ** (attempt - max_proxy_attempts + 1), 4) if attempt >= max_proxy_attempts else 1 logger.info(f"Retrying in {wait_time} seconds...") time.sleep(wait_time) else: logger.error(f"All {max_retries} attempts failed across {len(failed_proxies)}/{total_proxies} proxies") raise last_exception is_modelscope_studio = os.getenv('MODELSCOPE_ENVIRONMENT') == 'studio' def get_text(text: str, cn_text: str): if is_modelscope_studio: return cn_text return text logo_img = os.path.join(os.path.dirname(__file__), "rednote_hilab.png") DEFAULT_CONVERSATIONS_HISTORY = [{"role": "placeholder"}] DEFAULT_LOCALE = 'zh_CN' if is_modelscope_studio else 'en_US' DEFAULT_THEME = { "token": { "colorPrimary": "#6A57FF", } } def format_history(history): messages = [{ "role": "system", "content": "", }] for item in history: if item["role"] == "user": # 支持多模态内容格式 content = item["content"] if isinstance(content, dict): if "multimodal" in content: # 如果是保存的多模态内容,直接使用 messages.append({ "role": "user", "content": content["multimodal"] }) logger.info(f"Added multimodal message with {content.get('images_count', 0)} images to context") elif "images_base64" in content: # 如果有base64图片数据,重新构建多模态内容 text = content.get("text", "") images_base64 = content.get("images_base64", []) if images_base64: # 从base64恢复图片并创建多模态内容 restored_images = restore_images_from_base64_list(images_base64) multimodal_content = create_multimodal_content(text, restored_images) messages.append({ "role": "user", "content": multimodal_content }) logger.info(f"Restored and added multimodal message with {len(restored_images)} images to context") else: # 没有图片,只有文本 messages.append({"role": "user", "content": text}) else: # 如果content是复杂对象,提取text字段 text_content = content.get("text", str(content)) messages.append({"role": "user", "content": text_content}) else: # 传统文本内容 messages.append({"role": "user", "content": content}) elif item["role"] == "assistant": # 助手消息:合并thinking内容和content,保持原始格式 assistant_content = item["content"] or "" # 检查是否有thinking内容需要合并 thinking_content = item.get("meta", {}).get("thinking_content", "") if thinking_content: # 重建完整的原始输出,不添加额外换行符 # thinking_content 和 assistant_content 都已包含原始的换行符 full_content = f"{thinking_content}{assistant_content}" else: full_content = assistant_content messages.append({"role": "assistant", "content": full_content}) return messages class Gradio_Events: @staticmethod def _submit(state_value): history = state_value["conversations_history"][ state_value["conversation_id"]] # submit history_messages = format_history(history) history.append({ "role": "assistant", "content": "", "key": str(uuid.uuid4()), "meta": { "reason_content": "", "thinking_content": "", # 添加thinking内容存储 "is_thinking": False, # 添加thinking状态 "thinking_done": False # 添加thinking完成状态 }, "loading": True, }) yield { chatbot: gr.update(items=history), state: gr.update(value=state_value), } try: response = chat_with_retry(history_messages) thought_done = False in_thinking = False accumulated_content = "" # 缓冲逻辑变量 buffer_content = "" # 临时缓冲内容 last_yield_time = time.time() chunk_count = 0 state_update_count = 0 # state更新计数器 BUFFER_INTERVAL = 0.5 # 秒 - 缓冲时间间隔,减少网络交互频率 BUFFER_CHUNKS = 5 # 每5个chunk强制yield - 平衡实时性和性能 STATE_UPDATE_INTERVAL = 3 # 每3次yield更新一次state - 减少state传输频率 # 优化state更新策略:减少不必要的历史数据传输 for chunk in response: # 安全地访问chunk属性 if chunk.choices and len(chunk.choices) > 0: content = chunk.choices[0].delta.content else: content = None raise ValueError('Content is None') history[-1]["loading"] = False print(content, end='') if content: accumulated_content += content buffer_content += content # 添加到缓冲 chunk_count += 1 # 检查是否进入thinking模式 if "" in accumulated_content and not in_thinking: in_thinking = True history[-1]["meta"]["is_thinking"] = True # 提取thinking标签之前的内容并保存 before_think = accumulated_content.split("")[0] if before_think.strip(): # 保存thinking之前的内容 history[-1]["content"] = before_think # 重置accumulated_content为thinking标签后的内容 think_parts = accumulated_content.split("", 1) if len(think_parts) > 1: accumulated_content = think_parts[1] else: accumulated_content = "" # 立即yield thinking状态变化,这种重要状态变化总是需要更新state yield { chatbot: gr.update(items=history), state: gr.update(value=state_value) } buffer_content = "" # 重置缓冲 last_yield_time = time.time() chunk_count = 0 continue # 检查是否退出thinking模式 if "" in accumulated_content and in_thinking: in_thinking = False history[-1]["meta"]["is_thinking"] = False history[-1]["meta"]["thinking_done"] = True history[-1]["meta"]["just_finished_thinking"] = True # 标记刚完成thinking # 分离thinking内容和后续内容 think_parts = accumulated_content.split("", 1) thinking_content = think_parts[0] history[-1]["meta"]["thinking_content"] = thinking_content # 处理thinking后的内容 - 追加而不是覆盖 if len(think_parts) > 1: after_think_content = think_parts[1] if after_think_content.strip(): # 如果之前已有内容,则追加;否则直接设置 current_content = history[-1]["content"] or "" history[-1]["content"] = current_content + after_think_content accumulated_content = "" # 重置累积内容 # 立即yield thinking完成状态,这种重要状态变化总是需要更新state yield { chatbot: gr.update(items=history), state: gr.update(value=state_value) } buffer_content = "" # 重置缓冲 last_yield_time = time.time() chunk_count = 0 continue # 缓冲检查:时间或chunk数达到时 yield current_time = time.time() should_yield = False if (current_time - last_yield_time >= BUFFER_INTERVAL) or (chunk_count >= BUFFER_CHUNKS): should_yield = True # 在thinking模式中,更新thinking内容 if in_thinking: # 检查是否包含完整的thinking结束标签 if "" not in accumulated_content: history[-1]["meta"]["thinking_content"] = accumulated_content if should_yield: state_update_count += 1 # 条件更新state:只在特定间隔更新 should_update_state = (state_update_count % STATE_UPDATE_INTERVAL == 0) yield { chatbot: gr.update(items=history), state: gr.update(value=state_value) if should_update_state else gr.skip() } buffer_content = "" last_yield_time = current_time chunk_count = 0 else: # 如果不在thinking模式中,正常添加内容到content if not thought_done: thought_done = True if not history[-1]["content"]: # 如果content为空才初始化 history[-1]["content"] = "" # 应用缓冲内容到history if should_yield: # 将缓冲的内容添加到content中 history[-1]["content"] += buffer_content # 清除"刚完成thinking"标记,因为现在在正常输出内容 if history[-1]["meta"].get("just_finished_thinking"): history[-1]["meta"]["just_finished_thinking"] = False state_update_count += 1 # 条件更新state:只在特定间隔更新 should_update_state = (state_update_count % STATE_UPDATE_INTERVAL == 0) yield { chatbot: gr.update(items=history), state: gr.update(value=state_value) if should_update_state else gr.skip() } # 重置缓冲 buffer_content = "" last_yield_time = current_time chunk_count = 0 else: # 不yield,但需要更新content以保持逻辑一致性 # 注意:这里不直接添加content,而是等待缓冲yield时一起添加 pass # 循环结束后,处理剩余的缓冲内容 if buffer_content: if in_thinking: # 如果还在thinking模式中,更新thinking内容 history[-1]["meta"]["thinking_content"] = accumulated_content else: # 如果不在thinking模式中,添加剩余内容 if not history[-1]["content"]: history[-1]["content"] = "" history[-1]["content"] += buffer_content # 清除"刚完成thinking"标记 if history[-1]["meta"].get("just_finished_thinking"): history[-1]["meta"]["just_finished_thinking"] = False # 最终yield,确保所有内容都被发送并强制更新state yield { chatbot: gr.update(items=history), state: gr.update(value=state_value) # 最终总是更新state } history[-1]["meta"]["end"] = True print("Answer: ", history[-1]["content"]) # 保存对话日志(如果启用) if save_conversation: # 获取用户消息(倒数第二个消息) user_message = None for i in range(len(history) - 2, -1, -1): if history[i]["role"] == "user": user_message = history[i] break if user_message: save_conversation_log( history_messages=history_messages, # 这是发送给模型的原始数据 assistant_content=history[-1]["content"], metadata={ "model": model, "proxy_base_urls": PROXY_BASE_URLS, "conversation_id": state_value["conversation_id"], "thinking_content": history[-1]["meta"].get("thinking_content", ""), "has_thinking": bool(history[-1]["meta"].get("thinking_content")) } ) except Exception as e: history[-1]["loading"] = False history[-1]["meta"]["end"] = True history[-1]["meta"]["error"] = True history[-1]["content"] = "Failed to respond, please try again." yield { chatbot: gr.update(items=history), state: gr.update(value=state_value) } print('Error: ',e) raise e @staticmethod def submit(sender_value, state_value): if not state_value["conversation_id"]: random_id = str(uuid.uuid4()) history = [] state_value["conversation_id"] = random_id state_value["conversations_history"][random_id] = history # 使用文本内容作为对话标签 label = sender_value if isinstance(sender_value, str) else "New Chat" state_value["conversations"].append({ "label": label, "key": random_id }) history = state_value["conversations_history"][ state_value["conversation_id"]] # 处理多模态内容 uploaded_images = state_value.get("uploaded_images", []) if uploaded_images: # 创建多模态内容 multimodal_content = create_multimodal_content(sender_value, uploaded_images) # 转换图片为base64用于持久化存储 images_base64 = convert_images_to_base64_list(uploaded_images) message_content = { "text": sender_value, "images_count": len(uploaded_images), # 保存图片数量 "images_base64": images_base64, # 保存base64图片数据 "multimodal": multimodal_content # 用于API调用的多模态内容 } logger.info(f"Saving message with {len(uploaded_images)} images to history") # 清空已上传的图片 state_value["uploaded_images"] = [] state_value["image_file_paths"] = [] else: # 纯文本内容 message_content = sender_value history.append({ "role": "user", "meta": {}, "key": str(uuid.uuid4()), "content": message_content }) # preprocess submit yield Gradio_Events.preprocess_submit()(state_value) try: for chunk in Gradio_Events._submit(state_value): yield chunk except Exception as e: raise e finally: # postprocess submit - 包括清空图片上传组件 yield Gradio_Events.postprocess_submit(state_value) @staticmethod def regenerate_message(state_value, e: gr.EventData): conversation_key = e._data["component"]["conversationKey"] history = state_value["conversations_history"][ state_value["conversation_id"]] index = -1 for i, conversation in enumerate(history): if conversation["key"] == conversation_key: index = i break if index == -1: yield gr.skip() history = history[:index] state_value["conversations_history"][ state_value["conversation_id"]] = history yield { chatbot:gr.update(items=history), state: gr.update(value=state_value) } # preprocess submit yield Gradio_Events.preprocess_submit(clear_input=False)(state_value) try: for chunk in Gradio_Events._submit(state_value): yield chunk except Exception as e: raise e finally: # postprocess submit yield Gradio_Events.postprocess_submit(state_value) @staticmethod def preprocess_submit(clear_input=True): def preprocess_submit_handler(state_value): history = state_value["conversations_history"][ state_value["conversation_id"]] for conversation in history: if "meta" in conversation: conversation["meta"]["disabled"] = True return { sender: gr.update(value=None, loading=True) if clear_input else gr.update(loading=True), conversations: gr.update(active_key=state_value["conversation_id"], items=list( map( lambda item: { **item, "disabled": True if item["key"] != state_value[ "conversation_id"] else False, }, state_value["conversations"]))), add_conversation_btn: gr.update(disabled=True), clear_btn: gr.update(disabled=True), conversation_delete_menu_item: gr.update(disabled=True), chatbot: gr.update(items=history), state: gr.update(value=state_value), image_upload: gr.update(value=None), # 发送消息时立即清空图片上传组件 green_image_indicator: gr.update(count=0, elem_style=dict(display="block")), # 左侧绿色指示器显示0 trash_button: gr.update(elem_style=dict(display="none")), # 隐藏垃圾桶按钮 stop_btn: gr.update(visible=True) # 显示停止按钮 } return preprocess_submit_handler @staticmethod def postprocess_submit(state_value): history = state_value["conversations_history"][ state_value["conversation_id"]] for conversation in history: if "meta" in conversation: conversation["meta"]["disabled"] = False return { sender: gr.update(loading=False), conversation_delete_menu_item: gr.update(disabled=False), clear_btn: gr.update(disabled=False), conversations: gr.update(items=state_value["conversations"]), add_conversation_btn: gr.update(disabled=False), chatbot: gr.update(items=history), state: gr.update(value=state_value), stop_btn: gr.update(visible=False) # 隐藏停止按钮 } @staticmethod def cancel(state_value): history = state_value["conversations_history"][ state_value["conversation_id"]] history[-1]["loading"] = False history[-1]["meta"]["end"] = True history[-1]["meta"]["canceled"] = True return Gradio_Events.postprocess_submit(state_value) @staticmethod def delete_message(state_value, e: gr.EventData): conversation_key = e._data["component"]["conversationKey"] history = state_value["conversations_history"][ state_value["conversation_id"]] history = [item for item in history if item["key"] != conversation_key] state_value["conversations_history"][ state_value["conversation_id"]] = history return gr.update(items=history if len(history) > 0 else DEFAULT_CONVERSATIONS_HISTORY), gr.update( value=state_value) @staticmethod def edit_message(state_value, e: gr.EventData): conversation_key = e._data["component"]["conversationKey"] history = state_value["conversations_history"][ state_value["conversation_id"]] index = -1 for i, conversation in enumerate(history): if conversation["key"] == conversation_key: index = i break if index == -1: return gr.skip() state_value["editing_message_index"] = index text = '' if isinstance(history[index]["content"], str): text = history[index]["content"] else: text = history[index]["content"]["text"] return gr.update(value=text), gr.update(value=state_value) @staticmethod def confirm_edit_message(edit_textarea_value, state_value): history = state_value["conversations_history"][ state_value["conversation_id"]] message = history[state_value["editing_message_index"]] if isinstance(message["content"], str): message["content"] = edit_textarea_value else: message["content"]["text"] = edit_textarea_value return gr.update(items=history), gr.update(value=state_value) @staticmethod def select_suggestion(sender_value, e: gr.EventData): return gr.update(value=sender_value[:-1] + e._data["payload"][0]) @staticmethod def new_chat(state_value): if not state_value["conversation_id"]: return gr.skip() state_value["conversation_id"] = "" # 清空上传的图片(修复新对话图片泄露bug) state_value["uploaded_images"] = [] state_value["image_file_paths"] = [] return gr.update(active_key=state_value["conversation_id"]), gr.update( items=DEFAULT_CONVERSATIONS_HISTORY), gr.update(value=state_value) @staticmethod def select_conversation(state_value, e: gr.EventData): active_key = e._data["payload"][0] if state_value["conversation_id"] == active_key or ( active_key not in state_value["conversations_history"]): return gr.skip() state_value["conversation_id"] = active_key # 切换对话时清空上传的图片(避免图片泄露到其他对话) state_value["uploaded_images"] = [] state_value["image_file_paths"] = [] return gr.update(active_key=active_key), gr.update( items=state_value["conversations_history"][active_key]), gr.update( value=state_value) @staticmethod def click_conversation_menu(state_value, e: gr.EventData): conversation_id = e._data["payload"][0]["key"] operation = e._data["payload"][1]["key"] if operation == "delete": del state_value["conversations_history"][conversation_id] state_value["conversations"] = [ item for item in state_value["conversations"] if item["key"] != conversation_id ] if state_value["conversation_id"] == conversation_id: state_value["conversation_id"] = "" # 删除当前对话时清空上传的图片 state_value["uploaded_images"] = [] state_value["image_file_paths"] = [] return gr.update( items=state_value["conversations"], active_key=state_value["conversation_id"]), gr.update( items=DEFAULT_CONVERSATIONS_HISTORY), gr.update( value=state_value) else: return gr.update( items=state_value["conversations"]), gr.skip(), gr.update( value=state_value) return gr.skip() @staticmethod def clear_conversation_history(state_value): if not state_value["conversation_id"]: return gr.skip() state_value["conversations_history"][ state_value["conversation_id"]] = [] # 清空对话历史时也清空上传的图片 state_value["uploaded_images"] = [] state_value["image_file_paths"] = [] return gr.update(items=DEFAULT_CONVERSATIONS_HISTORY), gr.update( value=state_value) @staticmethod def close_modal(): return gr.update(open=False) @staticmethod def open_modal(): return gr.update(open=True) @staticmethod def update_browser_state(state_value): return gr.update(value=dict( conversations=state_value["conversations"], conversations_history=state_value["conversations_history"])) @staticmethod def apply_browser_state(browser_state_value, state_value): state_value["conversations"] = browser_state_value["conversations"] state_value["conversations_history"] = browser_state_value[ "conversations_history"] return gr.update( items=browser_state_value["conversations"]), gr.update( value=state_value) @staticmethod def handle_image_upload(files, state_value): """处理图片上传 - 支持拖拽和粘贴功能""" logger.info(f"handle_image_upload called with files: {files}, type: {type(files)}") if not files: # 没有文件时重置为默认状态 logger.info("No files provided, resetting to default state") return ( gr.update(value=state_value), gr.update(count=0, elem_style=dict(display="block")), # 左侧绿色指示器显示0 gr.update(elem_style=dict(display="none")), # 隐藏垃圾桶按钮 ) # 显示上传中状态 logger.info("Upload in progress...") try: # 处理上传的文件 uploaded_images = [] image_file_paths = [] # 确保files是列表格式 if not isinstance(files, list): files = [files] if files else [] for i, file_info in enumerate(files): logger.info(f"Processing file {i}: {file_info}, type: {type(file_info)}") file_path = None if isinstance(file_info, dict): # 如果是文件信息字典(Gradio上传格式) file_path = file_info.get('name') or file_info.get('path') logger.info(f"Extracted path from dict: {file_path}") elif isinstance(file_info, str): # 如果直接是文件路径 file_path = file_info logger.info(f"Direct file path: {file_path}") elif hasattr(file_info, 'name') and hasattr(file_info, 'read'): # 如果是文件对象(拖拽/粘贴可能产生) logger.info(f"File object detected: {file_info.name if hasattr(file_info, 'name') else 'unnamed'}") # 对于文件对象,我们需要特殊处理 try: if hasattr(file_info, 'name'): file_path = file_info.name else: # 创建临时文件名 import tempfile with tempfile.NamedTemporaryFile(suffix='.png', delete=False) as tmp_file: if hasattr(file_info, 'read'): tmp_file.write(file_info.read()) file_path = tmp_file.name logger.info(f"Created temporary file: {file_path}") except Exception as file_error: logger.error(f"Error processing file object: {str(file_error)}") continue else: logger.warning(f"Unknown file format: {type(file_info)}") continue if file_path: try: # 保存文件路径 image_file_paths.append(file_path) logger.info(f"Added to image_file_paths: {file_path}") # 使用PIL加载图片 image = Image.open(file_path) logger.info(f"Loaded image with size: {image.size} (原始尺寸,不进行缩放)") # 保持原始图片尺寸,不进行任何缩放处理 uploaded_images.append(image) except Exception as img_error: logger.error(f"Error processing image {file_path}: {str(img_error)}") continue # 替换而不是追加图片(修复累积bug) state_value["uploaded_images"] = uploaded_images state_value["image_file_paths"] = image_file_paths logger.info(f"Successfully uploaded {len(uploaded_images)} images via drag/paste/upload") # 显示状态指示器,显示图片数量 return ( gr.update(value=state_value), gr.update(count=len(uploaded_images), elem_style=dict(display="block")), # 左侧绿色指示器 gr.update(elem_style=dict(display="block")), # 显示垃圾桶按钮 ) except Exception as e: logger.error(f"Error handling image upload: {str(e)}") import traceback logger.error(f"Full traceback: {traceback.format_exc()}") return ( gr.update(value=state_value), gr.update(count=0, elem_style=dict(display="block")), # 左侧绿色指示器显示0 gr.update(elem_style=dict(display="none")), # 隐藏垃圾桶按钮 ) @staticmethod def clear_images(state_value): """清空上传的图片""" state_value["uploaded_images"] = [] state_value["image_file_paths"] = [] logger.info("Cleared all uploaded images") return ( gr.update(value=state_value), gr.update(count=0, elem_style=dict(display="block")), # 左侧绿色指示器显示0 gr.update(elem_style=dict(display="none")), # 隐藏垃圾桶按钮 gr.update(value=None), # 清空图片上传组件 ) css = """ .gradio-container { padding: 0 !important; } .gradio-container > main.fillable { padding: 0 !important; } #chatbot { height: calc(100vh - 21px - 16px); } #chatbot .chatbot-conversations { height: 100%; background-color: var(--ms-gr-ant-color-bg-layout); } #chatbot .chatbot-conversations .chatbot-conversations-list { padding-left: 0; padding-right: 0; } #chatbot .chatbot-chat { padding: 32px; height: 100%; } @media (max-width: 768px) { #chatbot .chatbot-chat { padding: 0; } } #chatbot .chatbot-chat .chatbot-chat-messages { flex: 1; } #chatbot .chatbot-chat .chatbot-chat-messages .chatbot-chat-message .chatbot-chat-message-footer { visibility: hidden; opacity: 0; transition: opacity 0.2s; } #chatbot .chatbot-chat .chatbot-chat-message:last-child .chatbot-chat-message-footer { visibility: visible; opacity: 1; } #chatbot .chatbot-chat .chatbot-chat-message:hover .chatbot-chat-message-footer { visibility: visible; opacity: 1; } /* Thinking区域样式 */ .thinking-content .ant-collapse { background: linear-gradient(135deg, #f8f9fc 0%, #f2f5f8 100%); border: 1px solid #e1e8ed; border-radius: 8px; margin-bottom: 12px; } .thinking-content .ant-collapse > .ant-collapse-item > .ant-collapse-header { padding: 8px 12px; font-size: 13px; color: #5a6c7d; font-weight: 500; } .thinking-content .ant-collapse-content > .ant-collapse-content-box { padding: 12px; background: #fafbfc; border-radius: 0 0 6px 6px; font-size: 13px; color: #667788; line-height: 1.5; white-space: pre-wrap; font-family: 'SF Mono', 'Monaco', 'Inconsolata', 'Roboto Mono', monospace; } .thinking-content .ant-collapse-content-box .markdown-body { font-size: 13px; line-height: 1.5; color: #667788; } .thinking-content .ant-collapse-content-box pre { background: #f6f8fa; padding: 8px; border-radius: 4px; overflow: auto; } .thinking-content .ant-collapse-content-box h1, .thinking-content .ant-collapse-content-box h2, .thinking-content .ant-collapse-content-box h3, .thinking-content .ant-collapse-content-box h4, .thinking-content .ant-collapse-content-box h5, .thinking-content .ant-collapse-content-box h6 { margin-top: 16px; margin-bottom: 8px; font-weight: 600; } .thinking-content .ant-collapse-content-box ul, .thinking-content .ant-collapse-content-box ol { margin: 8px 0; padding-left: 20px; } .thinking-content .ant-collapse-content-box li { margin: 4px 0; } .thinking-content .ant-collapse-content-box code { background: #f1f3f4; padding: 2px 4px; border-radius: 3px; font-size: 85%; } /* 图片预览和展示样式 */ .image-preview-container { background: #fafafa; border: 1px solid #d9d9d9; border-radius: 8px; padding: 12px; margin-bottom: 12px; } .image-gallery img { transition: all 0.2s ease; border-radius: 4px; } .image-gallery img:hover { transform: scale(1.05); box-shadow: 0 4px 12px rgba(0, 0, 0, 0.15); z-index: 10; position: relative; } .image-thumbnail { position: relative; display: inline-block; margin: 4px; border-radius: 6px; overflow: hidden; box-shadow: 0 2px 8px rgba(0, 0, 0, 0.1); transition: all 0.2s ease; } .image-thumbnail:hover { box-shadow: 0 4px 16px rgba(0, 0, 0, 0.2); transform: translateY(-2px); } .image-upload-preview { background: linear-gradient(135deg, #f5f7fa 0%, #c3cfe2 100%); border: 2px dashed #d9d9d9; border-radius: 8px; padding: 16px; margin-bottom: 16px; text-align: center; transition: all 0.3s ease; } .image-upload-preview.has-images { border-style: solid; border-color: #6A57FF; background: linear-gradient(135deg, #f6f9fc 0%, #f0f4f8 100%); } /* 拖拽区域样式 */ .drop-zone { position: relative; transition: all 0.3s ease; } .drop-zone.drag-over { background: linear-gradient(135deg, #e6f7ff 0%, #d6f7ff 100%); border: 2px dashed #1890ff; border-radius: 8px; } .drop-zone.drag-over::before { content: "释放以上传图片"; position: absolute; top: 50%; left: 50%; transform: translate(-50%, -50%); background: rgba(24, 144, 255, 0.9); color: white; padding: 12px 24px; border-radius: 6px; font-size: 16px; font-weight: 500; z-index: 1000; pointer-events: none; } /* 响应式图片展示 */ @media (max-width: 768px) { .image-gallery img { width: 80px !important; height: 60px !important; } .image-thumbnail { width: 80px; height: 60px; } } /* 图片加载动画 */ @keyframes imageLoad { from { opacity: 0; transform: scale(0.8); } to { opacity: 1; transform: scale(1); } } .image-gallery img { animation: imageLoad 0.3s ease; } /* 粘贴提示样式 */ .paste-hint { position: fixed; top: 20px; right: 20px; background: rgba(24, 144, 255, 0.9); color: white; padding: 8px 16px; border-radius: 6px; font-size: 14px; z-index: 1001; opacity: 0; transform: translateY(-10px); transition: all 0.3s ease; } .paste-hint.show { opacity: 1; transform: translateY(0); } """ # 添加JavaScript代码来处理拖拽和粘贴 drag_and_paste_js = """ """ def logo(): with antd.Typography.Title(level=1, elem_style=dict(fontSize=24, padding=8, margin=0)): with antd.Flex(align="center", gap="small", justify="center"): antd.Image(logo_img, preview=False, alt="logo", width=24, height=24) ms.Span("dots.vlm1.inst") with gr.Blocks(css=css, fill_width=True, head=drag_and_paste_js) as demo: state = gr.State({ "conversations_history": {}, "conversations": [], "conversation_id": "", "editing_message_index": -1, "uploaded_images": [], # 存储当前上传的图片 "image_file_paths": [], # 存储图片文件路径用于预览 }) with ms.Application(), antdx.XProvider( theme=DEFAULT_THEME, locale=DEFAULT_LOCALE), ms.AutoLoading(): with antd.Row(gutter=[20, 20], wrap=False, elem_id="chatbot"): # Left Column with antd.Col(md=dict(flex="0 0 260px", span=24, order=0), span=0, order=1, elem_classes="chatbot-conversations", elem_style=dict( maxWidth="260px", minWidth="260px", overflow="hidden")): with antd.Flex(vertical=True, gap="small", elem_style=dict(height="100%", width="100%", minWidth="0")): # Logo logo() # New Conversation Button with antd.Button(value=None, color="primary", variant="filled", block=True, elem_style=dict(maxWidth="100%")) as add_conversation_btn: ms.Text(get_text("New Conversation", "新建对话")) with ms.Slot("icon"): antd.Icon("PlusOutlined") # Conversations List with antdx.Conversations( elem_classes="chatbot-conversations-list", elem_style=dict( width="100%", minWidth="0", overflow="hidden", flex="1" ) ) as conversations: with ms.Slot('menu.items'): with antd.Menu.Item( label="Delete", key="delete", danger=True ) as conversation_delete_menu_item: with ms.Slot("icon"): antd.Icon("DeleteOutlined") # Right Column with antd.Col(flex=1, elem_style=dict(height="100%")): with antd.Flex(vertical=True, gap="middle", elem_classes="chatbot-chat"): # Chatbot with antdx.Bubble.List( items=DEFAULT_CONVERSATIONS_HISTORY, elem_classes="chatbot-chat-messages") as chatbot: # Define Chatbot Roles with ms.Slot("roles"): # Placeholder Role with antdx.Bubble.List.Role( role="placeholder", styles=dict(content=dict(width="100%")), variant="borderless"): with ms.Slot("messageRender"): with antd.Space( direction="vertical", size=16, elem_style=dict(width="100%")): with antdx.Welcome( styles=dict(icon=dict( flexShrink=0)), variant="borderless", title=get_text( "Hello, I'm dots.", "你好,我是 dots."), description=get_text( "", ""), ): with ms.Slot("icon"): antd.Image(logo_img, preview=False) # User Role with antdx.Bubble.List.Role( role="user", placement="end", elem_classes="chatbot-chat-message", class_names=dict( footer="chatbot-chat-message-footer"), styles=dict(content=dict( maxWidth="100%", overflow='auto', ))): with ms.Slot( "messageRender", params_mapping="""(content) => { // 检查多种图片存储格式 let imageCount = 0; let textContent = ''; let imagesBase64 = []; if (typeof content === 'object') { // 新格式:检查 images_count if (content.images_count && content.images_count > 0) { imageCount = content.images_count; textContent = content.text || ''; imagesBase64 = content.images_base64 || []; } // 旧格式:检查 images 数组 else if (content.images && content.images.length > 0) { imageCount = content.images.length; textContent = content.text || ''; imagesBase64 = content.images || []; } // 纯文本格式 else { textContent = content.text || content; } } else { // 字符串格式 textContent = content; } if (imageCount > 0 && imagesBase64.length > 0) { const imageHtml = imagesBase64.map((base64, index) => `Image ${index + 1}` ).join(''); return { image_info: { style: { marginBottom: '8px', fontSize: '13px', color: '#666' }, value: `📷 包含 ${imageCount} 张图片` }, text_content: { value: textContent }, image_gallery: { value: `
${imageHtml}
` } }; } return { text_content: { value: textContent }, image_info: { style: { display: 'none' } }, image_gallery: { value: '' } }; }"""): # 图片信息提示 antd.Typography.Text(as_item="image_info", type="secondary") # 图片展示区域 - 使用Markdown组件显示HTML ms.Markdown(as_item="image_gallery") # 文本内容 ms.Markdown(as_item="text_content") with ms.Slot("footer", params_mapping="""(bubble) => { return { copy_btn: { copyable: { text: typeof bubble.content === 'string' ? bubble.content : bubble.content?.text, tooltips: false }, }, edit_btn: { conversationKey: bubble.key, disabled: bubble.meta.disabled }, delete_btn: { conversationKey: bubble.key, disabled: bubble.meta.disabled }, }; }"""): with antd.Typography.Text( copyable=dict(tooltips=False), as_item="copy_btn"): with ms.Slot("copyable.icon"): with antd.Button(value=None, size="small", color="default", variant="text"): with ms.Slot("icon"): antd.Icon("CopyOutlined") with antd.Button(value=None, size="small", color="default", variant="text"): with ms.Slot("icon"): antd.Icon("CheckOutlined") with antd.Button(value=None, size="small", color="default", variant="text", as_item="edit_btn" ) as user_edit_btn: with ms.Slot("icon"): antd.Icon("EditOutlined") with antd.Popconfirm( title="Delete the message", description= "Are you sure to delete this message?", ok_button_props=dict(danger=True), as_item="delete_btn" ) as user_delete_popconfirm: with antd.Button(value=None, size="small", color="default", variant="text", as_item="delete_btn"): with ms.Slot("icon"): antd.Icon("DeleteOutlined") # Chatbot Role with antdx.Bubble.List.Role( role="assistant", placement="start", elem_classes="chatbot-chat-message", class_names=dict( footer="chatbot-chat-message-footer"), styles=dict(content=dict( maxWidth="100%", overflow='auto'))): with ms.Slot("avatar"): antd.Avatar( os.path.join(os.path.dirname(__file__), "rednote_hilab.png")) with ms.Slot( "messageRender", params_mapping="""(content, bubble) => { const has_error = bubble?.meta?.error const thinking_content = bubble?.meta?.thinking_content || "" const is_thinking = bubble?.meta?.is_thinking || false const thinking_done = bubble?.meta?.thinking_done || false const just_finished_thinking = bubble?.meta?.just_finished_thinking || false // 改进的自动折叠逻辑: // 1. 刚完成thinking且有回答内容时自动折叠 // 2. 考虑用户交互状态,避免频繁重置 const shouldAutoCollapse = just_finished_thinking && content && content.trim().length > 0 // 动态生成唯一key以触发组件重新渲染,但保持用户控制能力 let collapseKey = 'thinking' let collapseProps = {} if (shouldAutoCollapse) { // 刚完成thinking且有内容时,设置为折叠状态 // 使用时间戳确保key的唯一性,触发折叠 collapseKey = 'thinking-auto-collapsed-' + Date.now() collapseProps.active_key = [] // 强制折叠 } else if (thinking_done) { // thinking完成但用户可能已经手动展开,使用稳定key collapseKey = 'thinking-user-controlled' // 不设置active_key,让用户控制 } else { // thinking进行中,默认展开 collapseKey = 'thinking-active' collapseProps.default_active_key = ['1'] } return { thinking_collapse_props: Object.assign({ key: collapseKey, style: { display: (thinking_content || is_thinking) ? 'block' : 'none', marginBottom: thinking_content || is_thinking ? '12px' : '0' } }, collapseProps), thinking_label: is_thinking ? '🤔 正在思考...' : '🤔 思考过程', thinking_markdown: { value: thinking_content || '思考中...' }, answer: { value: content }, canceled: bubble.meta?.canceled ? undefined : { style: { display: 'none' } } } }"""): # Thinking区域 - 可折叠 + Markdown 渲染 with antd.Collapse( size='small', ghost=True, elem_classes="thinking-content", as_item="thinking_collapse_props" # 动态控制所有属性 ): with antd.Collapse.Item( as_item="thinking_label", # 动态 label 作为 header key='1', force_render=True # 确保即使折叠也预渲染内容(可选,提高性能) ): ms.Markdown(as_item="thinking_markdown") # 动态 value,支持 Markdown 渲染 # 回答内容 ms.Markdown( as_item="answer", elem_classes="answer-content") antd.Divider(as_item="canceled") antd.Typography.Text(get_text( "Chat completion paused.", "聊天已暂停。"), as_item="canceled", type="warning") with ms.Slot("footer", params_mapping="""(bubble) => { if (bubble?.meta?.end) { return { copy_btn: { copyable: { text: bubble.content, tooltips: false }, }, regenerate_btn: { conversationKey: bubble.key, disabled: bubble.meta.disabled }, delete_btn: { conversationKey: bubble.key, disabled: bubble.meta.disabled }, edit_btn: { conversationKey: bubble.key, disabled: bubble.meta.disabled }, }; } return { actions_container: { style: { display: 'none' } } }; }"""): with ms.Div(as_item="actions_container"): with antd.Typography.Text( copyable=dict(tooltips=False), as_item="copy_btn"): with ms.Slot("copyable.icon"): with antd.Button( value=None, size="small", color="default", variant="text"): with ms.Slot("icon"): antd.Icon( "CopyOutlined") with antd.Button( value=None, size="small", color="default", variant="text"): with ms.Slot("icon"): antd.Icon( "CheckOutlined") with antd.Popconfirm( title=get_text( "Regenerate the message", "重新生成消息"), description=get_text( "Regenerate the message will also delete all subsequent messages.", "重新生成消息将会删除所有的后续消息。"), ok_button_props=dict( danger=True), as_item="regenerate_btn" ) as chatbot_regenerate_popconfirm: with antd.Button( value=None, size="small", color="default", variant="text", as_item="regenerate_btn", ): with ms.Slot("icon"): antd.Icon("SyncOutlined") with antd.Button(value=None, size="small", color="default", variant="text", as_item="edit_btn" ) as chatbot_edit_btn: with ms.Slot("icon"): antd.Icon("EditOutlined") with antd.Popconfirm( title=get_text("Delete the message", "删除消息"), description=get_text( "Are you sure to delete this message?", "确定要删除这条消息吗?"), ok_button_props=dict( danger=True), as_item="delete_btn" ) as chatbot_delete_popconfirm: with antd.Button( value=None, size="small", color="default", variant="text", as_item="delete_btn"): with ms.Slot("icon"): antd.Icon("DeleteOutlined") # Sender with antdx.Suggestion( # onKeyDown Handler in Javascript should_trigger="""(e, { onTrigger, onKeyDown }) => { switch(e.key) { case '/': onTrigger() break case 'ArrowRight': case 'ArrowLeft': case 'ArrowUp': case 'ArrowDown': break; default: onTrigger(false) } onKeyDown(e) }""") as suggestion: with ms.Slot("children"): with antdx.Sender(placeholder=get_text( "Enter Prompt (Drag & Drop or Ctrl+V to paste images)", "输入内容(可拖拽图片或 Ctrl+V 粘贴图片)"), ) as sender: with ms.Slot("actions"): # 停止生成按钮 with antd.Button( type="text", size="large", visible=False, # 初始隐藏 elem_style=dict( color="#ff4d4f", # 红色 border="none", background="transparent" ) ) as stop_btn: with ms.Slot("icon"): antd.Icon("PauseCircleOutlined") with ms.Slot("prefix"): # Image Upload Button with Counter - 图片上传按钮 with antd.Space(size="small"): with antd.Tooltip(title="点击上传图片", color="green"): with antd.Upload( accept="image/*", multiple=True, show_upload_list=False, elem_style=dict(display="inline-block") ) as image_upload: with antd.Badge( count=0, # 默认显示0 size="small", color="#52c41a", # 绿色 elem_style=dict(display="block") # 默认显示 ) as green_image_indicator: with antd.Button( type="text", size="large", elem_style=dict( color="#52c41a", # 绿色图标 border="none", background="transparent" ) ): with ms.Slot("icon"): antd.Icon("PictureOutlined") # Trash Button - 垃圾桶清理按钮 with antd.Tooltip(title="清除已上传的图片", color="red"): with antd.Button( type="text", size="large", elem_style=dict( color="#ff4d4f", # 红色图标 border="none", background="transparent", display="none" # 默认隐藏,有图片时显示 ) ) as trash_button: with ms.Slot("icon"): antd.Icon("DeleteOutlined") # Clear Button - 清空对话历史按钮 with antd.Tooltip(title=get_text( "Clear Conversation History", "清空对话历史"), ): with antd.Button( value=None, type="text") as clear_btn: with ms.Slot("icon"): antd.Icon("ClearOutlined") # Modals with antd.Modal(title=get_text("Edit Message", "编辑消息"), open=False, centered=True, width="60%") as edit_modal: edit_textarea = antd.Input.Textarea(auto_size=dict(minRows=2, maxRows=6), elem_style=dict(width="100%")) # Events Handler if save_history: browser_state = gr.BrowserState( { "conversations_history": {}, "conversations": [], }, storage_key="dots_chatbot_storage") state.change(fn=Gradio_Events.update_browser_state, inputs=[state], outputs=[browser_state]) demo.load(fn=Gradio_Events.apply_browser_state, inputs=[browser_state, state], outputs=[conversations, state]) add_conversation_btn.click(fn=Gradio_Events.new_chat, inputs=[state], outputs=[conversations, chatbot, state]) conversations.active_change(fn=Gradio_Events.select_conversation, inputs=[state], outputs=[conversations, chatbot, state]) conversations.menu_click(fn=Gradio_Events.click_conversation_menu, inputs=[state], outputs=[conversations, chatbot, state]) clear_btn.click(fn=Gradio_Events.clear_conversation_history, inputs=[state], outputs=[chatbot, state]) suggestion.select(fn=Gradio_Events.select_suggestion, inputs=[sender], outputs=[sender]) gr.on(triggers=[user_edit_btn.click, chatbot_edit_btn.click], fn=Gradio_Events.edit_message, inputs=[state], outputs=[edit_textarea, state]).then(fn=Gradio_Events.open_modal, outputs=[edit_modal]) edit_modal.ok(fn=Gradio_Events.confirm_edit_message, inputs=[edit_textarea, state], outputs=[chatbot, state]).then(fn=Gradio_Events.close_modal, outputs=[edit_modal]) edit_modal.cancel(fn=Gradio_Events.close_modal, outputs=[edit_modal]) gr.on(triggers=[ chatbot_delete_popconfirm.confirm, user_delete_popconfirm.confirm ], fn=Gradio_Events.delete_message, inputs=[state], outputs=[chatbot, state]) regenerating_event = chatbot_regenerate_popconfirm.confirm( fn=Gradio_Events.regenerate_message, inputs=[state], outputs=[sender, clear_btn, conversation_delete_menu_item, add_conversation_btn, conversations, chatbot, state, image_upload, green_image_indicator, trash_button, stop_btn]) # 图片上传事件 image_upload.change(fn=Gradio_Events.handle_image_upload, inputs=[image_upload, state], outputs=[state, green_image_indicator, trash_button]) # 清空图片事件 - 垃圾桶按钮 trash_button.click(fn=Gradio_Events.clear_images, inputs=[state], outputs=[state, green_image_indicator, trash_button, image_upload]) submit_event = sender.submit(fn=Gradio_Events.submit, inputs=[sender, state], outputs=[sender, clear_btn, conversation_delete_menu_item, add_conversation_btn, conversations, chatbot, state, image_upload, green_image_indicator, trash_button, stop_btn]) # 停止按钮点击事件 stop_btn.click(fn=None, cancels=[submit_event, regenerating_event]) stop_btn.click(fn=Gradio_Events.cancel, inputs=[state], outputs=[ sender, conversation_delete_menu_item, clear_btn, conversations, add_conversation_btn, chatbot, state, stop_btn ]) sender.cancel(fn=None, cancels=[submit_event, regenerating_event]) sender.cancel(fn=Gradio_Events.cancel, inputs=[state], outputs=[ sender, conversation_delete_menu_item, clear_btn, conversations, add_conversation_btn, chatbot, state, stop_btn ]) if __name__ == "__main__": import sys import argparse parser = argparse.ArgumentParser(description="启动 Gradio Demo") parser.add_argument("--port", type=int, default=7860, help="指定服务端口,默认为7960") args = parser.parse_args() demo.queue(default_concurrency_limit=200).launch( ssr_mode=False, max_threads=200, server_port=args.port, server_name="0.0.0.0" )