""" | |
Chat handling logic for Universal MCP Client - Fixed Version with File Upload Support | |
""" | |

import json
import logging
import re
import time
import traceback
from datetime import datetime
from typing import Dict, Any, List, Tuple, Optional

import gradio as gr
import httpx
from gradio import ChatMessage
from gradio_client import Client

from config import AppConfig
from mcp_client import UniversalMCPClient

logger = logging.getLogger(__name__)


class ChatHandler:
    """Handles chat interactions with HF Inference Providers and MCP servers using ChatMessage dataclass"""

    def __init__(self, mcp_client: UniversalMCPClient):
        self.mcp_client = mcp_client
        # Initialize the file uploader client for converting local files to public URLs
        try:
            self.uploader_client = Client("abidlabs/file-uploader")
            logger.info("✅ File uploader client initialized")
        except Exception as e:
            logger.error(f"Failed to initialize file uploader: {e}")
            self.uploader_client = None

    def _upload_file_to_gradio_server(self, file_path: str) -> str:
        """Upload a file to the Gradio server and get a public URL"""
        if not self.uploader_client:
            logger.error("File uploader client not initialized")
            return file_path
        try:
            # Open the file in binary mode for the multipart upload
            with open(file_path, "rb") as f_:
                files = [("files", (file_path.split("/")[-1], f_))]
                r = httpx.post(
                    self.uploader_client.upload_url,
                    files=files,
                )
                r.raise_for_status()
                result = r.json()

            uploaded_path = result[0]
            # Construct the full public URL
            public_url = f"{self.uploader_client.src}/gradio_api/file={uploaded_path}"
            logger.info(f"✅ Uploaded {file_path} -> {public_url}")
            return public_url
        except Exception as e:
            logger.error(f"Failed to upload file {file_path}: {e}")
            return file_path  # Return original path as fallback
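
    # Illustrative note on the upload flow above: the upload endpoint returns a JSON
    # list of server-side paths, e.g. (hypothetical) ["/tmp/gradio/abc123/report.pdf"],
    # and result[0] is combined with the uploader Space's base URL into a public URL
    # such as https://<uploader-space>.hf.space/gradio_api/file=/tmp/gradio/abc123/report.pdf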

    def process_multimodal_message(self, message: Dict[str, Any], history: List) -> Tuple[List[ChatMessage], Dict[str, Any]]:
        """Enhanced MCP chat function with multimodal input support and ChatMessage formatting"""
        if not self.mcp_client.hf_client:
            error_msg = "❌ HuggingFace token not configured. Please set HF_TOKEN environment variable or login."
            history.append(ChatMessage(role="user", content=error_msg))
            history.append(ChatMessage(role="assistant", content=error_msg))
            return history, gr.MultimodalTextbox(value=None, interactive=False)

        if not self.mcp_client.current_provider or not self.mcp_client.current_model:
            error_msg = "❌ Please select an inference provider and model first."
            history.append(ChatMessage(role="user", content=error_msg))
            history.append(ChatMessage(role="assistant", content=error_msg))
            return history, gr.MultimodalTextbox(value=None, interactive=False)

        # Initialize variables for error handling
        user_text = ""
        user_files = []
        uploaded_file_urls = []  # Store uploaded file URLs
        self.file_url_mapping = {}  # Map local paths to uploaded URLs

        try:
            # Handle multimodal input - message is a dict with 'text' and 'files'
            user_text = message.get("text", "") if message else ""
            user_files = message.get("files", []) if message else []

            # Handle case where message might be a string (backward compatibility)
            if isinstance(message, str):
                user_text = message
                user_files = []

            logger.info("💬 Processing multimodal message:")
            logger.info(f"   📝 Text: {user_text}")
            logger.info(f"   📎 Files: {len(user_files)} files uploaded")
            logger.info(f"   📊 History type: {type(history)}, length: {len(history)}")

            # Convert history to ChatMessage objects if needed
            converted_history = []
            for i, msg in enumerate(history):
                try:
                    if isinstance(msg, dict):
                        # Convert dict to ChatMessage for internal processing
                        logger.info(f"   🔄 Converting dict message {i}: {msg.get('role', 'unknown')}")
                        converted_history.append(ChatMessage(
                            role=msg.get('role', 'assistant'),
                            content=msg.get('content', ''),
                            metadata=msg.get('metadata', None)
                        ))
                    else:
                        # Already a ChatMessage
                        logger.info(f"   ✅ ChatMessage {i}: {getattr(msg, 'role', 'unknown')}")
                        converted_history.append(msg)
                except Exception as conv_error:
                    logger.error(f"Error converting message {i}: {conv_error}")
                    logger.error(f"Message content: {msg}")
                    # Skip problematic messages
                    continue
            history = converted_history

            # Upload files and get public URLs
            for file_path in user_files:
                logger.info(f"   📁 Local File: {file_path}")
                try:
                    # Upload file to get public URL
                    uploaded_url = self._upload_file_to_gradio_server(file_path)
                    # Store the mapping and collect the public URL
                    self.file_url_mapping[file_path] = uploaded_url
                    uploaded_file_urls.append(uploaded_url)
                    logger.info(f"   ✅ Uploaded File URL: {uploaded_url}")
                    # Add to history with public URL
                    history.append(ChatMessage(role="user", content={"path": uploaded_url}))
                except Exception as upload_error:
                    logger.error(f"Failed to upload file {file_path}: {upload_error}")
                    # Fallback to local path with warning
                    history.append(ChatMessage(role="user", content={"path": file_path}))
                    logger.warning(f"⚠️ Using local path for {file_path} - MCP servers may not be able to access it")

            # Add text message if provided
            if user_text and user_text.strip():
                history.append(ChatMessage(role="user", content=user_text))

            # If no text and no files, return early
            if not user_text.strip() and not user_files:
                return history, gr.MultimodalTextbox(value=None, interactive=False)

            # Create messages for HF Inference API
            messages = self._prepare_hf_messages(history, uploaded_file_urls)

            # Process the chat and get structured responses
            response_messages = self._call_hf_api(messages, uploaded_file_urls)

            # Add all response messages to history
            history.extend(response_messages)

            return history, gr.MultimodalTextbox(value=None, interactive=False)

        except Exception as e:
            error_msg = f"❌ Error: {str(e)}"
            logger.error(f"Chat error: {e}")
            logger.error(traceback.format_exc())

            # Add user input to history if it exists
            if user_text and user_text.strip():
                history.append(ChatMessage(role="user", content=user_text))
            if user_files:
                for file_path in user_files:
                    history.append(ChatMessage(role="user", content={"path": file_path}))

            history.append(ChatMessage(role="assistant", content=error_msg))
            return history, gr.MultimodalTextbox(value=None, interactive=False)
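
    # Example `message` payload received from gr.MultimodalTextbox (illustrative values):
    #   {"text": "describe this image", "files": ["/tmp/gradio/xyz/photo.png"]}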

    def _prepare_hf_messages(self, history: List, uploaded_file_urls: List[str] = None) -> List[Dict[str, Any]]:
        """Convert history (ChatMessage or dict) to HuggingFace Inference API format"""
        messages = []

        # Get optimal context settings for current model/provider
        if self.mcp_client.current_model and self.mcp_client.current_provider:
            context_settings = AppConfig.get_optimal_context_settings(
                self.mcp_client.current_model,
                self.mcp_client.current_provider,
                len(self.mcp_client.get_enabled_servers())
            )
            max_history = context_settings['recommended_history_limit']
        else:
            max_history = 20  # Fallback

        # Convert history to HF API format (text only for context)
        recent_history = history[-max_history:] if len(history) > max_history else history

        for msg in recent_history:
            # Handle both ChatMessage objects and dictionary format for backward compatibility
            if hasattr(msg, 'role'):  # ChatMessage object
                role = msg.role
                content = msg.content
            elif isinstance(msg, dict) and 'role' in msg:  # Dictionary format
                role = msg.get('role')
                content = msg.get('content')
            else:
                continue  # Skip invalid messages

            if role in ["user", "assistant"]:
                # Convert any non-string content to a string description for context
                if isinstance(content, dict):
                    if "path" in content:
                        file_path = content.get('path', 'unknown')
                        # Check if it's a public URL or local path
                        if file_path.startswith('http'):
                            # It's already a public URL
                            if AppConfig.is_image_file(file_path):
                                content = f"[User uploaded an image: {file_path}]"
                            elif AppConfig.is_audio_file(file_path):
                                content = f"[User uploaded an audio file: {file_path}]"
                            elif AppConfig.is_video_file(file_path):
                                content = f"[User uploaded a video file: {file_path}]"
                            else:
                                content = f"[User uploaded a file: {file_path}]"
                        else:
                            # Local path - mention it's not accessible to remote servers
                            content = f"[User uploaded a file (local path, not accessible to remote servers): {file_path}]"
                    else:
                        content = f"[Object: {str(content)[:50]}...]"
                elif isinstance(content, (list, tuple)):
                    content = f"[List: {str(content)[:50]}...]"
                elif content is None:
                    content = "[Empty]"
                else:
                    content = str(content)

                messages.append({
                    "role": role,
                    "content": content
                })

        return messages

    def _call_hf_api(self, messages: List[Dict[str, Any]], uploaded_file_urls: List[str] = None) -> List[ChatMessage]:
        """Call HuggingFace Inference API and return structured ChatMessage responses"""
        # Check if we have enabled MCP servers to use
        enabled_servers = self.mcp_client.get_enabled_servers()
        if not enabled_servers:
            return self._call_hf_without_mcp(messages)
        else:
            return self._call_hf_with_mcp(messages, uploaded_file_urls)

    def _call_hf_without_mcp(self, messages: List[Dict[str, Any]]) -> List[ChatMessage]:
        """Call HF Inference API without MCP servers"""
        logger.info("💬 No MCP servers available, using regular HF Inference chat")

        system_prompt = self._get_native_system_prompt()

        # Add system prompt to messages
        if messages and messages[0].get("role") == "system":
            messages[0]["content"] = system_prompt + "\n\n" + messages[0]["content"]
        else:
            messages.insert(0, {"role": "system", "content": system_prompt})

        # Get optimal token settings
        if self.mcp_client.current_model and self.mcp_client.current_provider:
            context_settings = AppConfig.get_optimal_context_settings(
                self.mcp_client.current_model,
                self.mcp_client.current_provider,
                0  # No MCP servers
            )
            max_tokens = context_settings['max_response_tokens']
        else:
            max_tokens = 8192

        # Use HF Inference API
        try:
            response = self.mcp_client.generate_chat_completion(messages, max_tokens=max_tokens)
            response_text = response.choices[0].message.content
            if not response_text:
                response_text = "I understand your request and I'm here to help."
            return [ChatMessage(role="assistant", content=response_text)]
        except Exception as e:
            logger.error(f"HF Inference API call failed: {e}")
            return [ChatMessage(role="assistant", content=f"❌ API call failed: {str(e)}")]

    def _call_hf_with_mcp(self, messages: List[Dict[str, Any]], uploaded_file_urls: List[str] = None) -> List[ChatMessage]:
        """Call HF Inference API with MCP servers and return structured responses"""
        # Enhanced system prompt with multimodal and MCP instructions
        system_prompt = self._get_mcp_system_prompt(uploaded_file_urls)

        # Add system prompt to messages
        if messages and messages[0].get("role") == "system":
            messages[0]["content"] = system_prompt + "\n\n" + messages[0]["content"]
        else:
            messages.insert(0, {"role": "system", "content": system_prompt})

        # Get optimal token settings
        enabled_servers = self.mcp_client.get_enabled_servers()
        if self.mcp_client.current_model and self.mcp_client.current_provider:
            context_settings = AppConfig.get_optimal_context_settings(
                self.mcp_client.current_model,
                self.mcp_client.current_provider,
                len(enabled_servers)
            )
            max_tokens = context_settings['max_response_tokens']
        else:
            max_tokens = 8192

        # Debug logging
        logger.info(f"📤 Sending {len(messages)} messages to HF Inference API")
        logger.info(f"🔧 Using {len(self.mcp_client.servers)} MCP servers")
        logger.info(f"🤖 Model: {self.mcp_client.current_model} via {self.mcp_client.current_provider}")
        logger.info(f"📊 Max tokens: {max_tokens}")

        start_time = time.time()

        try:
            # Pass file mapping to MCP client
            if hasattr(self, 'file_url_mapping'):
                self.mcp_client.chat_handler_file_mapping = self.file_url_mapping

            # Call HF Inference with MCP tool support, using optimal max_tokens
            response = self.mcp_client.generate_chat_completion_with_mcp_tools(messages, max_tokens=max_tokens)
            return self._process_hf_response(response, start_time)
        except Exception as e:
            logger.error(f"HF Inference API call with MCP failed: {e}")
            return [ChatMessage(role="assistant", content=f"❌ API call failed: {str(e)}")]

    def _process_hf_response(self, response, start_time: float) -> List[ChatMessage]:
        """Process HF Inference response with simplified media handling and nested errors"""
        chat_messages = []

        try:
            response_text = response.choices[0].message.content
            if not response_text:
                response_text = "I understand your request and I'm here to help."

            # Check if this response includes tool execution info
            if hasattr(response, '_tool_execution'):
                tool_info = response._tool_execution
                logger.info(f"🔧 Processing response with tool execution: {tool_info}")

                duration = round(time.time() - start_time, 2)
                tool_id = f"tool_{tool_info['tool']}_{int(time.time())}"

                if tool_info['success']:
                    tool_result = str(tool_info['result'])
                    # Extract media URL if present
                    media_url = self._extract_media_url(tool_result, tool_info.get('server', ''))

                    # Create tool usage metadata message
                    chat_messages.append(ChatMessage(
                        role="assistant",
                        content="",
                        metadata={
                            "title": f"🔧 Used {tool_info['tool']}",
                            "status": "done",
                            "duration": duration,
                            "id": tool_id
                        }
                    ))

                    # Add nested success message with the raw result
                    if media_url:
                        result_preview = f"✅ Successfully generated media\nURL: {media_url[:100]}..."
                    else:
                        result_preview = f"✅ Tool executed successfully\nResult: {tool_result[:200]}..."
                    chat_messages.append(ChatMessage(
                        role="assistant",
                        content=result_preview,
                        metadata={
                            "title": "📋 Server Response",
                            "parent_id": tool_id,
                            "status": "done"
                        }
                    ))

                    # Add LLM's descriptive text if present (before media)
                    if response_text and not response_text.startswith('{"use_tool"'):
                        # Clean the response text by removing URLs and tool JSON
                        clean_response = response_text
                        if media_url and media_url in clean_response:
                            clean_response = clean_response.replace(media_url, "").strip()
                        # Remove any remaining JSON tool call patterns
                        clean_response = re.sub(r'\{"use_tool"[^}]+\}', '', clean_response).strip()
                        # Remove all markdown link/image syntax completely
                        clean_response = re.sub(r'!\[([^\]]*)\]\([^)]*\)', '', clean_response)  # Remove image markdown
                        clean_response = re.sub(r'\[([^\]]*)\]\([^)]*\)', '', clean_response)  # Remove link markdown
                        clean_response = re.sub(r'!\[([^\]]*)\]', '', clean_response)  # Remove broken image refs
                        clean_response = re.sub(r'\[([^\]]*)\]', '', clean_response)  # Remove broken link refs
                        clean_response = re.sub(r'\(\s*\)', '', clean_response)  # Remove empty parentheses
                        clean_response = clean_response.strip()  # Final strip

                        # Only add if there's meaningful text left after cleaning
                        if clean_response and len(clean_response) > 10:
                            chat_messages.append(ChatMessage(
                                role="assistant",
                                content=clean_response
                            ))

                    # Handle media content if present
                    if media_url:
                        # Add media as a separate message - Gradio will auto-detect type
                        chat_messages.append(ChatMessage(
                            role="assistant",
                            content={"path": media_url}
                        ))
                    else:
                        # No media URL found, check if we need to show non-media result
                        if not response_text or response_text.startswith('{"use_tool"'):
                            # Only show result if there wasn't descriptive text from LLM
                            if len(tool_result) > 500:
                                result_preview = f"Operation completed successfully. Result preview: {tool_result[:500]}..."
                            else:
                                result_preview = f"Operation completed successfully. Result: {tool_result}"
                            chat_messages.append(ChatMessage(
                                role="assistant",
                                content=result_preview
                            ))
                else:
                    # Tool execution failed
                    error_details = tool_info['result']

                    # Create main tool message with error status
                    chat_messages.append(ChatMessage(
                        role="assistant",
                        content="",
                        metadata={
                            "title": f"❌ Used {tool_info['tool']}",
                            "status": "error",
                            "duration": duration,
                            "id": tool_id
                        }
                    ))

                    # Add nested error response from server
                    chat_messages.append(ChatMessage(
                        role="assistant",
                        content=f"❌ Tool execution failed\n```\n{error_details}\n```",
                        metadata={
                            "title": "📋 Server Response",
                            "parent_id": tool_id,
                            "status": "error"
                        }
                    ))

                    # Add suggestions as another nested message
                    chat_messages.append(ChatMessage(
                        role="assistant",
                        content="**Suggestions:**\n• Try modifying your request slightly\n• Wait a moment and try again\n• Use a different MCP server if available",
                        metadata={
                            "title": "💡 Possible Solutions",
                            "parent_id": tool_id,
                            "status": "info"
                        }
                    ))
            else:
                # No tool usage, just return the response
                chat_messages.append(ChatMessage(
                    role="assistant",
                    content=response_text
                ))
        except Exception as e:
            logger.error(f"Error processing HF response: {e}")
            logger.error(traceback.format_exc())
            chat_messages.append(ChatMessage(
                role="assistant",
                content="I understand your request and I'm here to help."
            ))

        return chat_messages
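
    # Note on the metadata used above: in Gradio's ChatMessage conventions, a message
    # carrying metadata={"title": ..., "id": ...} renders as a collapsible tool bubble
    # in gr.Chatbot, and messages whose metadata includes "parent_id" are nested under
    # the bubble with the matching "id".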

    def _extract_media_url(self, result_text: str, server_name: str) -> Optional[str]:
        """Extract media URL from MCP response with improved pattern matching"""
        if not isinstance(result_text, str):
            return None

        logger.info(f"🔍 Extracting media from result: {result_text[:500]}...")

        # Try JSON parsing first
        try:
            if result_text.strip().startswith('[') or result_text.strip().startswith('{'):
                data = json.loads(result_text.strip())

                # Handle array format
                if isinstance(data, list) and len(data) > 0:
                    item = data[0]
                    if isinstance(item, dict):
                        # Check for nested media structure
                        for media_type in ['audio', 'video', 'image']:
                            if media_type in item and isinstance(item[media_type], dict):
                                if 'url' in item[media_type]:
                                    url = item[media_type]['url'].strip('\'"')
                                    logger.info(f"🎯 Found {media_type} URL in JSON: {url}")
                                    return url
                        # Check for direct URL
                        if 'url' in item:
                            url = item['url'].strip('\'"')
                            logger.info(f"🎯 Found direct URL in JSON: {url}")
                            return url

                # Handle object format
                elif isinstance(data, dict):
                    # Check for nested media structure
                    for media_type in ['audio', 'video', 'image']:
                        if media_type in data and isinstance(data[media_type], dict):
                            if 'url' in data[media_type]:
                                url = data[media_type]['url'].strip('\'"')
                                logger.info(f"🎯 Found {media_type} URL in JSON: {url}")
                                return url
                    # Check for direct URL
                    if 'url' in data:
                        url = data['url'].strip('\'"')
                        logger.info(f"🎯 Found direct URL in JSON: {url}")
                        return url
        except json.JSONDecodeError:
            pass

        # Check for Gradio file URLs (common pattern)
        gradio_patterns = [
            r'https://[^/]+\.hf\.space/gradio_api/file=/[^/]+/[^/]+/[^\s"\'<>,]+',
            r'https://[^/]+\.hf\.space/file=[^\s"\'<>,]+',
            r'/gradio_api/file=/[^\s"\'<>,]+'
        ]
        for pattern in gradio_patterns:
            match = re.search(pattern, result_text)
            if match:
                url = match.group(0).rstrip('\'",:;')
                logger.info(f"🎯 Found Gradio file URL: {url}")
                return url

        # Check for any HTTP URLs with media extensions
        url_pattern = r'https?://[^\s"\'<>]+\.(?:mp3|wav|ogg|m4a|flac|aac|opus|wma|mp4|webm|avi|mov|mkv|m4v|wmv|png|jpg|jpeg|gif|webp|bmp|svg)'
        match = re.search(url_pattern, result_text, re.IGNORECASE)
        if match:
            url = match.group(0)
            logger.info(f"🎯 Found media URL by extension: {url}")
            return url

        # Check for data URLs
        if result_text.startswith('data:'):
            logger.info("🎯 Found data URL")
            return result_text

        logger.info("❌ No media URL found in result")
        return None
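
    # Illustrative result shapes handled by _extract_media_url (URLs are hypothetical):
    #   '[{"audio": {"url": "https://some-space.hf.space/gradio_api/file=/tmp/out.wav"}}]'
    #   '{"image": {"url": "https://some-space.hf.space/file=/tmp/pic.png"}}'
    #   '{"url": "https://example.com/clip.mp4"}'
    #   'Plain text containing https://some-space.hf.space/gradio_api/file=/tmp/a/b/result.png'
    #   'data:image/png;base64,iVBORw0...'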

    def _get_native_system_prompt(self) -> str:
        """Get system prompt for HF Inference without MCP servers"""
        model_info = AppConfig.AVAILABLE_MODELS.get(self.mcp_client.current_model, {})
        context_length = model_info.get("context_length", 128000)

        return f"""You are an AI assistant powered by {self.mcp_client.current_model} via {self.mcp_client.current_provider}. You have native capabilities for:

- **Text Processing**: You can analyze, summarize, translate, and process text directly
- **General Knowledge**: You can answer questions, explain concepts, and have conversations
- **Code Analysis**: You can read, analyze, and explain code
- **Reasoning**: You can perform step-by-step reasoning and problem-solving
- **Context Window**: You have access to {context_length:,} tokens of context

Current time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}

Please provide helpful, accurate, and engaging responses to user queries."""

    def _get_mcp_system_prompt(self, uploaded_file_urls: List[str] = None) -> str:
        """Get enhanced system prompt for HF Inference with MCP servers"""
        model_info = AppConfig.AVAILABLE_MODELS.get(self.mcp_client.current_model, {})
        context_length = model_info.get("context_length", 128000)

        uploaded_files_context = ""
        if uploaded_file_urls:
            uploaded_files_context = "\n\nFILES UPLOADED BY USER (Public URLs accessible to MCP servers):\n"
            for i, file_url in enumerate(uploaded_file_urls, 1):
                file_name = file_url.split('/')[-1] if '/' in file_url else file_url
                if AppConfig.is_image_file(file_url):
                    file_type = "Image"
                elif AppConfig.is_audio_file(file_url):
                    file_type = "Audio"
                elif AppConfig.is_video_file(file_url):
                    file_type = "Video"
                else:
                    file_type = "File"
                uploaded_files_context += f"{i}. {file_type}: {file_name}\n URL: {file_url}\n"

        # Get available tools with correct names from enabled servers only
        enabled_servers = self.mcp_client.get_enabled_servers()
        tools_info = []
        for server_name, config in enabled_servers.items():
            tools_info.append(f"- **{server_name}**: {config.description}")

        return f"""You are an AI assistant powered by {self.mcp_client.current_model} via {self.mcp_client.current_provider}, with access to various MCP tools.

YOUR NATIVE CAPABILITIES:
- **Text Processing**: You can analyze, summarize, translate, and process text directly
- **General Knowledge**: You can answer questions, explain concepts, and have conversations
- **Code Analysis**: You can read, analyze, and explain code
- **Reasoning**: You can perform step-by-step reasoning and problem-solving
- **Context Window**: You have access to {context_length:,} tokens of context

AVAILABLE MCP TOOLS:
You have access to the following MCP servers:
{chr(10).join(tools_info)}

WHEN TO USE MCP TOOLS:
- **Image Generation**: Creating new images from text prompts
- **Image Editing**: Modifying, enhancing, or transforming existing images
- **Audio Processing**: Transcribing audio, generating speech, audio enhancement
- **Video Processing**: Creating or editing videos
- **Text to Speech**: Converting text to audio
- **Specialized Analysis**: Tasks requiring specific models or APIs

TOOL USAGE FORMAT:
When you need to use an MCP tool, respond with JSON in this exact format:
{{"use_tool": true, "server": "exact_server_name", "tool": "exact_tool_name", "arguments": {{"param": "value"}}}}

IMPORTANT: Always describe what you're going to do BEFORE the JSON tool call. For example:
"I'll generate speech for your text using the TTS tool."
{{"use_tool": true, "server": "text to speech", "tool": "Kokoro_TTS_mcp_test_generate_first", "arguments": {{"text": "hello"}}}}

IMPORTANT TOOL NAME MAPPING:
- For TTS server: use tool name "Kokoro_TTS_mcp_test_generate_first"
- For image generation: use tool name "dalle_3_xl_lora_v2_generate"
- For video generation: use tool name "ysharma_ltx_video_distilledtext_to_video"
- For letter counting: use tool name "gradio_app_dummy1_letter_counter"

EXACT SERVER NAMES TO USE:
{', '.join([f'"{name}"' for name in enabled_servers.keys()])}

FILE HANDLING FOR MCP TOOLS:
When using MCP tools with uploaded files, always use the public URLs provided above.
These URLs are accessible to remote MCP servers.
{uploaded_files_context}

MEDIA HANDLING:
When tool results contain media URLs (images, audio, videos), the system will automatically embed them as playable media.

IMPORTANT NOTES:
- Always use the EXACT server names and tool names as specified above
- Use proper JSON format for tool calls
- Include all required parameters in arguments
- For file inputs to MCP tools, use the public URLs provided, not local paths
- ALWAYS provide a descriptive message before the JSON tool call
- After tool execution, you can provide additional context or ask if the user needs anything else

Current time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
Current model: {self.mcp_client.current_model} via {self.mcp_client.current_provider}"""
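

# Minimal usage sketch: wiring ChatHandler into a Gradio MultimodalTextbox callback.
# It assumes UniversalMCPClient can be constructed without arguments; adapt this to
# the actual constructor and setup in mcp_client.py.
if __name__ == "__main__":
    mcp_client = UniversalMCPClient()  # assumption: no-argument constructor
    handler = ChatHandler(mcp_client)

    with gr.Blocks() as demo:
        chatbot = gr.Chatbot(type="messages", label="Universal MCP Client")
        textbox = gr.MultimodalTextbox(
            placeholder="Type a message or upload files...",
            file_count="multiple",
        )
        # process_multimodal_message returns (updated history, reset textbox)
        textbox.submit(
            handler.process_multimodal_message,
            inputs=[textbox, chatbot],
            outputs=[chatbot, textbox],
        )

    demo.launch()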