import json
import logging
import os
import shlex
import shutil
import time
from pathlib import Path
from subprocess import TimeoutExpired
from typing import Any, Dict, List, Optional, Tuple

import openpyxl
import pexpect
import requests
from bs4 import BeautifulSoup
from google import genai  # Assuming genai handles API key internally via env or client init
from litellm import completion
from mcp.server.fastmcp import FastMCP
from requests.exceptions import RequestException

# --- Logging Setup ---
# Configure logging before the module-level warnings below so the format applies.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# --- Configuration ---
# Load API keys from environment variables (recommended).
# Ensure these are set in your deployment environment.
GEMINI_API_KEY = os.environ.get("GEMINI_API_KEY")
GROQ_API_KEY = os.environ.get("GROQ_API_KEY")
OPENROUTER_API_KEY = os.environ.get("OPENROUTER_API_KEY")
RAPIDAPI_KEY = os.environ.get("RAPIDAPI_KEY")  # Used for the RapidAPI tools below

# Check for missing essential keys
if not GEMINI_API_KEY:
    logging.warning("GEMINI_API_KEY environment variable not set.")
# Add checks for other keys if they are strictly required:
# if not GROQ_API_KEY: raise ValueError("GROQ_API_KEY not set")
# if not OPENROUTER_API_KEY: raise ValueError("OPENROUTER_API_KEY not set")
# if not RAPIDAPI_KEY: raise ValueError("RAPIDAPI_KEY not set")

# Set keys for services that require explicit environment variables
# (litellm might read these automatically, but explicit setting is safer).
if GROQ_API_KEY:
    os.environ["GROQ_API_KEY"] = GROQ_API_KEY
if GEMINI_API_KEY:
    # Note: the genai client has its own configuration, but litellm may need this
    os.environ["GEMINI_API_KEY"] = GEMINI_API_KEY
if OPENROUTER_API_KEY:
    os.environ["OPENROUTER_API_KEY"] = OPENROUTER_API_KEY

# --- Constants ---
CODE_DIR = Path("/app/code_interpreter")
TEMP_UPLOAD_DIR = Path("/app/uploads/temp")  # Source for transfer_temp_files
SERVER_BASE_URL = "https://opengpt-4ik5.onrender.com"
FILES_ENDPOINT = "/upload"   # Endpoint that lists files (GET)
UPLOAD_ENDPOINT = "/upload"  # Endpoint that accepts uploads (POST)
SERVER_FILES_URL = f"{SERVER_BASE_URL}{FILES_ENDPOINT}"
SERVER_UPLOAD_URL = f"{SERVER_BASE_URL}{UPLOAD_ENDPOINT}"
SERVER_STATIC_URL_PREFIX = f"{SERVER_BASE_URL}/static/"

# RapidAPI hosts
YOUTUBE_TRANSCRIPT_API = "youtube-transcript3.p.rapidapi.com"
SCRAPE_NINJA_API = "scrapeninja.p.rapidapi.com"

# --- Global State (Use Sparingly) ---
# Track files present in CODE_DIR to identify newly created ones.
# This state persists across tool calls within a single mcp run.
tracked_files_in_codedir: set[Path] = set(CODE_DIR.glob("*"))
# Track files downloaded from the server to avoid re-downloading.
server_downloaded_files: set[str] = set()

# --- Clients ---
try:
    # Initialize the Gemini client. If GEMINI_API_KEY is set, pass it explicitly;
    # otherwise attempt default credentials (e.g., application default credentials).
    if GEMINI_API_KEY:
        client = genai.Client(api_key=GEMINI_API_KEY)
        logging.info("Gemini client initialized using API key.")
    else:
        client = genai.Client()
        logging.info("Gemini client initialized (attempting default credentials).")
except Exception as e:
    logging.error(f"Failed to initialize Gemini client: {e}")
    client = None  # Indicates the client is unavailable

mcp = FastMCP("code_sandbox")
requests_session = requests.Session()  # Use a session for connection pooling

# --- Helper Functions ---

def download_server_files(
    base_url: str,
    files_endpoint: str,
    download_directory: Path,
    already_downloaded: set[str],
) -> set[str]:
    """
    Downloads all files listed on the server's file listing page
    that haven't been downloaded yet in this session.

    Args:
        base_url: The base URL of the server (e.g., "https://example.com").
        files_endpoint: The path to the page listing files (e.g., "/uploads").
        download_directory: The local directory (Path object) to save files.
        already_downloaded: A set of filenames already downloaded.

    Returns:
        The updated set of downloaded filenames.
    """
    download_directory.mkdir(parents=True, exist_ok=True)
    files_url = f"{base_url}{files_endpoint}"
    newly_downloaded_count = 0
    try:
        response = requests_session.get(files_url, timeout=30)
        response.raise_for_status()
        soup = BeautifulSoup(response.content, "html.parser")
        for link in soup.find_all("a"):
            file_href = link.get("href")
            if not file_href:
                continue
            # Construct the full URL if the href is relative
            if not file_href.startswith(("http://", "https://")):
                file_url = f"{base_url}{file_href}"
            else:
                file_url = file_href
            filename = Path(file_url).name
            if not filename:
                logging.warning(f"Could not extract filename from URL: {file_url}")
                continue
            # Skip files already downloaded in this session
            if filename in already_downloaded:
                continue
            file_path = download_directory / filename
            logging.info(f"Downloading: {filename} from {file_url}")
            try:
                file_response = requests_session.get(file_url, stream=True, timeout=60)
                file_response.raise_for_status()
                with open(file_path, "wb") as f:
                    for chunk in file_response.iter_content(chunk_size=8192):
                        if chunk:
                            f.write(chunk)
                logging.info(f"Downloaded: {filename} to {file_path}")
                already_downloaded.add(filename)
                newly_downloaded_count += 1
            except RequestException as e:
                logging.error(f"Error downloading {filename}: {e}")
            except OSError as e:
                logging.error(f"Error saving {filename}: {e}")
            except Exception as e:
                logging.error(f"Unexpected error downloading/saving {filename}: {e}")
    except RequestException as e:
        logging.error(f"Error getting file list from {files_url}: {e}")
    except Exception as e:
        logging.error(f"An unexpected error occurred during file download process: {e}")
    logging.info(f"Downloaded {newly_downloaded_count} new files from server.")
    return already_downloaded
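
# Illustrative assumption (comment only, not executed at import): this helper
# expects the listing endpoint to return an HTML page whose <a href="..."> entries
# point at the uploaded files, e.g.:
#
#   <a href="/static/report.xlsx">report.xlsx</a>
#
# Relative hrefs are resolved against SERVER_BASE_URL.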

def transfer_temp_files(source_dir: Path, destination_dir: Path):
    """Moves files from temp upload subdirectories to the main code directory."""
    destination_dir.mkdir(parents=True, exist_ok=True)
    moved_count = 0
    if not source_dir.exists():
        logging.warning(f"Source directory for transfer does not exist: {source_dir}")
        return
    for item in source_dir.iterdir():
        if item.is_dir():  # e.g., a session-specific temp folder
            for source_file_path in item.iterdir():
                if source_file_path.is_file():
                    destination_file_path = destination_dir / source_file_path.name
                    try:
                        shutil.move(str(source_file_path), str(destination_file_path))
                        logging.info(f"Moved {source_file_path.name} to {destination_dir}")
                        moved_count += 1
                    except OSError as e:
                        logging.error(f"Error moving {source_file_path.name}: {e}")
        elif item.is_file():  # Also handle files placed directly in source_dir
            destination_file_path = destination_dir / item.name
            try:
                shutil.move(str(item), str(destination_file_path))
                logging.info(f"Moved {item.name} directly to {destination_dir}")
                moved_count += 1
            except OSError as e:
                logging.error(f"Error moving {item.name}: {e}")
    if moved_count > 0:
        logging.info(f"Transferred {moved_count} files from {source_dir} area.")

def upload_file_to_server(file_path: Path, upload_url: str) -> Optional[str]:
    """
    Uploads a single file to the specified server endpoint.

    Args:
        file_path: Path object of the file to upload.
        upload_url: The URL to upload the file to.

    Returns:
        The filename returned by the server upon successful upload, or None on failure.
    """
    if not file_path.is_file():
        logging.error(f"File not found or is not a file: {file_path}")
        return None
    try:
        with open(file_path, "rb") as f:
            files = {"file": (file_path.name, f)}
            response = requests_session.post(upload_url, files=files, timeout=60)
        response.raise_for_status()  # Raise HTTPError for bad responses (4xx or 5xx)
        # Assuming the server returns the filename (or identifier) in the body
        server_filename = response.text.strip()
        logging.info(f"File '{file_path.name}' uploaded successfully. Server identifier: {server_filename}")
        return server_filename
    except FileNotFoundError:
        logging.error(f"File not found during upload attempt: {file_path}")
        return None
    except RequestException as e:
        logging.error(f"Upload failed for {file_path.name}. Network/server error: {e}")
        if e.response is not None:
            logging.error(f"Server response: {e.response.status_code} - {e.response.text}")
        return None
    except Exception as e:
        logging.error(f"An unexpected error occurred during upload of {file_path.name}: {e}")
        return None
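
# Illustrative usage (comment only, not executed at import): upload a file and
# build its public download URL from the server's static path convention.
#
#   result_path = CODE_DIR / "results.csv"
#   name = upload_file_to_server(result_path, SERVER_UPLOAD_URL)
#   if name:
#       print(f"Download at: {SERVER_STATIC_URL_PREFIX}{name}")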

def run_command_in_sandbox(
    command: str,
    timeout_sec: int,
    run_forever: bool = False,
    cwd: Path = CODE_DIR,
) -> str:
    """
    Runs a shell command using pexpect in a specific directory.

    Args:
        command: The command string to execute.
        timeout_sec: Timeout in seconds. Ignored if run_forever is True.
        run_forever: If True, does not enforce the timeout (use with caution).
        cwd: The working directory (Path object) for the command.

    Returns:
        The captured stdout/stderr output of the command.
    """
    output = ""
    full_command = f"cd {shlex.quote(str(cwd))} && {command}"
    logging.info(f"Running command: {full_command}")
    try:
        child = pexpect.spawn("bash", timeout=30)  # Base timeout for pexpect interactions
        # Set a unique prompt marker to detect command completion reliably
        prompt_marker = f"COMMAND_DONE_{time.time()}"
        child.sendline(f'export PS1="{prompt_marker}"')
        child.expect_exact(prompt_marker, timeout=10)  # Wait for the prompt change
        child.sendline(full_command)
        if run_forever:
            # Pexpect is not ideal for true daemonizing: we cannot capture
            # continuous output AND return control without threads. A better
            # approach for long-running processes might be subprocess.Popen
            # without waiting. Matching the original intent, we return
            # immediately after sending the command and leave it running.
            logging.info(f"Command '{command}' started in 'run_forever' mode.")
            return f"Command '{command}' started in background (output streaming not captured)."
        # For commands with a timeout:
        start_time = time.time()
        while True:
            if time.time() - start_time > timeout_sec:
                raise TimeoutExpired(command, timeout_sec)
            try:
                # Wait for the prompt marker, EOF, or a read timeout
                remaining = max(1, timeout_sec - (time.time() - start_time))
                index = child.expect([prompt_marker, pexpect.EOF, pexpect.TIMEOUT], timeout=remaining)
                output += child.before.decode(errors='ignore')
                if index == 0:  # Prompt marker found; command finished
                    logging.info(f"Command '{command}' finished.")
                    break
                elif index == 1:  # EOF
                    logging.warning(f"Command '{command}' resulted in EOF.")
                    break
                # index == 2 (TIMEOUT) falls through to the loop's timeout check
            except pexpect.TIMEOUT:
                logging.warning(f"Pexpect read timed out waiting for output or prompt for command: {command}")
                if time.time() - start_time > timeout_sec:
                    raise TimeoutExpired(command, timeout_sec)
                continue  # Keep waiting if the overall time is not exceeded
            except Exception as e:
                logging.error(f"Pexpect error during command '{command}': {e}")
                output += f"\nPexpect Error: {e}"
                break
    except TimeoutExpired:
        output += f"\n--- TimeoutError: Command '{command}' exceeded {timeout_sec} seconds ---"
        logging.error(f"Command '{command}' timed out after {timeout_sec} seconds.")
    except pexpect.ExceptionPexpect as e:
        output += f"\n--- Pexpect Error: {e} ---"
        logging.error(f"Pexpect execution failed for command '{command}': {e}")
    except Exception as e:
        output += f"\n--- Unexpected Error: {e} ---"
        logging.error(f"Unexpected error running command '{command}': {e}")
    finally:
        if 'child' in locals() and child.isalive():
            child.close(force=True)
    logging.info(f"Command '{command}' completed. Output length: {len(output)}")
    return output.strip()
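
# Illustrative usage (comment only, not executed at import):
#
#   out = run_command_in_sandbox("ls -l", timeout_sec=30)
#   print(out)
#
# Note that 'cwd' is shell-quoted via shlex.quote, but 'command' itself is passed
# to bash verbatim, so callers are responsible for quoting their own arguments.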

def _ensure_files_synced(code_dir: Path, temp_dir: Path):
    """Ensures the local code dir has the latest server files and temp uploads."""
    global server_downloaded_files, tracked_files_in_codedir
    logging.info("Ensuring local file system is synchronized...")
    # 1. Transfer files placed in the temp upload area
    transfer_temp_files(temp_dir, code_dir)
    # 2. Download missing files from the server
    server_downloaded_files = download_server_files(
        SERVER_BASE_URL, FILES_ENDPOINT, code_dir, server_downloaded_files
    )
    # 3. Update the set of tracked files *after* syncing
    tracked_files_in_codedir = set(code_dir.glob("*"))

def _upload_new_files(code_dir: Path, known_files_before: set[Path]) -> Tuple[List[str], set[Path]]:
    """Finds new files in code_dir, uploads them, and returns their URLs plus the updated file set."""
    current_files = set(code_dir.glob("*"))
    new_files = current_files - known_files_before
    uploaded_file_urls = []
    if not new_files:
        logging.info("No new files detected for upload.")
        return [], current_files
    logging.info(f"Detected {len(new_files)} new files for upload: {[f.name for f in new_files]}")
    for file_path in new_files:
        if file_path.is_file():
            server_filename = upload_file_to_server(file_path, SERVER_UPLOAD_URL)
            if server_filename:
                # Construct the download URL from the server's static path convention
                download_url = f"{SERVER_STATIC_URL_PREFIX}{server_filename}"
                uploaded_file_urls.append(download_url)
            else:
                logging.error(f"Failed to upload {file_path.name}, skipping URL generation.")
        else:
            logging.warning(f"Skipping upload for non-file item: {file_path}")
    logging.info(f"Uploaded {len(uploaded_file_urls)} new files.")
    return uploaded_file_urls, current_files
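
# The sync/track/upload cycle used by the execution tools below (comment only):
#
#   _ensure_files_synced(CODE_DIR, TEMP_UPLOAD_DIR)      # pull server + temp files in
#   before = set(CODE_DIR.glob("*"))                     # snapshot known files
#   ... run something that may create files ...
#   urls, tracked = _upload_new_files(CODE_DIR, before)  # push new files out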

# --- MCP Tools ---
# Tools are registered with the FastMCP server via the @mcp.tool() decorator so
# they are exposed over the MCP protocol when mcp.run() is called.

@mcp.tool()
def analyse_audio(audiopath: str, query: str) -> Dict[str, str]:
    """
    Ask a Gemini AI model about an audio file.
    The AI model can listen to the audio and answer questions based on it.

    Args:
        audiopath: The path to the audio file within the '/app/code_interpreter' directory
            (e.g., '/app/code_interpreter/meeting.mp3').
        query: The question to ask about the audio content.

    Returns:
        A dictionary containing the AI's response under the key "Output".
        Returns an error message if the client or file processing fails.
    """
    _ensure_files_synced(CODE_DIR, TEMP_UPLOAD_DIR)
    if not client:
        return {"Output": "Error: Gemini client not initialized."}
    audio_file_path = Path(audiopath)
    if not audio_file_path.is_absolute():  # Assume relative to CODE_DIR if not absolute
        audio_file_path = CODE_DIR / audiopath
    if not audio_file_path.exists():
        return {"Output": f"Error: Audio file not found at {audio_file_path}"}
    logging.info(f"Analysing audio: {audio_file_path.name} with query: '{query}'")
    try:
        # Upload the file to the Gemini Files API
        audio_file_ref = client.files.upload(file=str(audio_file_path))
        logging.info(f"Uploaded {audio_file_path.name} to Gemini API. File ref: {audio_file_ref.name}, State: {audio_file_ref.state.name}")
        # Wait for processing (with timeout)
        start_time = time.time()
        timeout_seconds = 120  # Adjust as needed
        while audio_file_ref.state.name == "PROCESSING":
            if time.time() - start_time > timeout_seconds:
                logging.error(f"Gemini file processing timed out for {audio_file_ref.name}")
                return {"Output": f"Error: Gemini file processing timed out for {audio_file_path.name}."}
            print('.', end='', flush=True)  # Progress indicator
            time.sleep(2)
            audio_file_ref = client.files.get(name=audio_file_ref.name)
        print()  # Newline after progress dots
        if audio_file_ref.state.name == "FAILED":
            logging.error(f"Gemini file processing failed for {audio_file_ref.name}. State: {audio_file_ref.state.name}")
            return {"Output": f"Error: Gemini failed to process the audio file {audio_file_path.name}."}
        if audio_file_ref.state.name != "ACTIVE":
            # Proceed anyway, but log a warning
            logging.warning(f"Gemini file {audio_file_ref.name} ended in unexpected state: {audio_file_ref.state.name}")
        # Generate content
        response = client.models.generate_content(
            model='gemini-1.5-flash',  # Use an appropriate model
            contents=[query, audio_file_ref]
        )
        logging.info(f"Gemini analysis complete for {audio_file_path.name}.")
        return {"Output": response.text}
    except Exception as e:
        logging.error(f"Error during Gemini audio analysis for {audio_file_path.name}: {e}", exc_info=True)
        return {"Output": f"An error occurred during audio analysis: {e}"}

# Note: analyse_video and analyse_images follow the same pattern as analyse_audio.

@mcp.tool()
def analyse_video(videopath: str, query: str) -> Dict[str, str]:
    """
    Ask a Gemini AI model about a video file.
    The AI model can watch the video and answer questions based on it.

    Args:
        videopath: Path to the video file within '/app/code_interpreter'
            (e.g., '/app/code_interpreter/presentation.mp4').
        query: The question to ask about the video content.

    Returns:
        A dictionary containing the AI's response under the key "Output".
        Returns an error message if the client or file processing fails.
    """
    _ensure_files_synced(CODE_DIR, TEMP_UPLOAD_DIR)
    if not client:
        return {"Output": "Error: Gemini client not initialized."}
    video_file_path = Path(videopath)
    if not video_file_path.is_absolute():
        video_file_path = CODE_DIR / videopath
    if not video_file_path.exists():
        return {"Output": f"Error: Video file not found at {video_file_path}"}
    logging.info(f"Analysing video: {video_file_path.name} with query: '{query}'")
    try:
        video_file_ref = client.files.upload(file=str(video_file_path))
        logging.info(f"Uploaded {video_file_path.name} to Gemini API. File ref: {video_file_ref.name}, State: {video_file_ref.state.name}")
        start_time = time.time()
        timeout_seconds = 300  # Videos can take longer to process
        while video_file_ref.state.name == "PROCESSING":
            if time.time() - start_time > timeout_seconds:
                logging.error(f"Gemini file processing timed out for {video_file_ref.name}")
                return {"Output": f"Error: Gemini file processing timed out for {video_file_path.name}."}
            print('.', end='', flush=True)
            time.sleep(5)  # Longer sleep for video
            video_file_ref = client.files.get(name=video_file_ref.name)
        print()
        if video_file_ref.state.name == "FAILED":
            logging.error(f"Gemini file processing failed for {video_file_ref.name}")
            return {"Output": f"Error: Gemini failed to process the video file {video_file_path.name}."}
        if video_file_ref.state.name != "ACTIVE":
            logging.warning(f"Gemini file {video_file_ref.name} ended in unexpected state: {video_file_ref.state.name}")
        response = client.models.generate_content(
            model='gemini-1.5-flash',
            contents=[query, video_file_ref]
        )
        logging.info(f"Gemini analysis complete for {video_file_path.name}.")
        return {"Output": response.text}
    except Exception as e:
        logging.error(f"Error during Gemini video analysis for {video_file_path.name}: {e}", exc_info=True)
        return {"Output": f"An error occurred during video analysis: {e}"}

@mcp.tool()
def analyse_images(imagepath: str, query: str) -> Dict[str, str]:
    """
    Ask a Gemini AI model about an image file.
    The AI model can see the image and answer questions based on it.

    Args:
        imagepath: Path to the image file within '/app/code_interpreter'
            (e.g., '/app/code_interpreter/diagram.png').
        query: The question to ask about the image content.

    Returns:
        A dictionary containing the AI's response under the key "Output".
        Returns an error message if the client or file processing fails.
    """
    _ensure_files_synced(CODE_DIR, TEMP_UPLOAD_DIR)
    if not client:
        return {"Output": "Error: Gemini client not initialized."}
    image_file_path = Path(imagepath)
    if not image_file_path.is_absolute():
        image_file_path = CODE_DIR / imagepath
    if not image_file_path.exists():
        return {"Output": f"Error: Image file not found at {image_file_path}"}
    logging.info(f"Analysing image: {image_file_path.name} with query: '{query}'")
    try:
        # Note: for Gemini vision models, passing image data directly may be preferred
        # over the File API. Check the Gemini API docs; the File API is used here
        # for consistency with the audio/video tools.
        image_file_ref = client.files.upload(file=str(image_file_path))
        logging.info(f"Uploaded {image_file_path.name} to Gemini API. File ref: {image_file_ref.name}, State: {image_file_ref.state.name}")
        start_time = time.time()
        timeout_seconds = 60
        while image_file_ref.state.name == "PROCESSING":
            if time.time() - start_time > timeout_seconds:
                logging.error(f"Gemini file processing timed out for {image_file_ref.name}")
                return {"Output": f"Error: Gemini file processing timed out for {image_file_path.name}."}
            print('.', end='', flush=True)
            time.sleep(1)
            image_file_ref = client.files.get(name=image_file_ref.name)
        print()
        if image_file_ref.state.name == "FAILED":
            logging.error(f"Gemini file processing failed for {image_file_ref.name}")
            return {"Output": f"Error: Gemini failed to process the image file {image_file_path.name}."}
        if image_file_ref.state.name != "ACTIVE":
            logging.warning(f"Gemini file {image_file_ref.name} ended in unexpected state: {image_file_ref.state.name}")
        response = client.models.generate_content(
            model='gemini-1.5-flash',  # Or a vision-specific model
            contents=[query, image_file_ref]
        )
        logging.info(f"Gemini analysis complete for {image_file_path.name}.")
        return {"Output": response.text}
    except Exception as e:
        logging.error(f"Error during Gemini image analysis for {image_file_path.name}: {e}", exc_info=True)
        return {"Output": f"An error occurred during image analysis: {e}"}
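
# Illustrative calls for the three media tools (comment only, not executed at
# import). Relative paths are resolved against CODE_DIR:
#
#   analyse_audio("meeting.mp3", "Summarize the key decisions.")
#   analyse_video("presentation.mp4", "What product features are shown?")
#   analyse_images("diagram.png", "Explain this architecture diagram.")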

@mcp.tool()
def create_code_file(filename: str, code: str) -> Dict[str, str]:
    """
    Creates a file with the specified content in the code execution directory.
    Overwrites the file if it already exists.

    Args:
        filename: The name of the file to create (e.g., 'script.py', 'data.txt').
            The file is created in '/app/code_interpreter/'.
        code: The string content to write into the file.

    Returns:
        A dictionary indicating the task outcome.
    """
    global tracked_files_in_codedir
    _ensure_files_synced(CODE_DIR, TEMP_UPLOAD_DIR)  # Ensure the dir exists and base files are synced
    if not filename:
        return {"info": "Error: Filename cannot be empty."}
    # Basic sanitization (prevent escaping the directory)
    filename = Path(filename).name
    file_path = CODE_DIR / filename
    logging.info(f"Creating/overwriting file: {file_path}")
    try:
        with open(file_path, "w", encoding='utf-8') as f:
            f.write(code)
        # Update tracked files immediately after creation
        tracked_files_in_codedir.add(file_path)
        logging.info(f"Successfully wrote {len(code)} characters to {file_path}")
        return {"info": f"File '{filename}' created/updated successfully in {CODE_DIR}."}
    except OSError as e:
        logging.error(f"Failed to write file {file_path}: {e}")
        return {"info": f"Error: Could not write file '{filename}'. Reason: {e}"}
    except Exception as e:
        logging.error(f"Unexpected error writing file {file_path}: {e}", exc_info=True)
        return {"info": f"Error: An unexpected error occurred while writing '{filename}'. Reason: {e}"}

@mcp.tool()
def install_python_packages(python_packages: str) -> Dict[str, str]:
    """
    Installs the specified Python packages using pip in the sandbox environment.

    Args:
        python_packages: A space-separated string of package names (e.g., "numpy pandas matplotlib").

    Returns:
        A dictionary containing the stdout/stderr of the pip command and an info message.
    """
    package_names = python_packages.strip()
    if not package_names:
        return {"output": "", "info": "No packages specified for installation."}
    # Basic character allowlist (the sandbox should be the real safety boundary)
    if not all(ch.isalnum() or ch in "-_.=><" for ch in package_names.replace(" ", "")):
        logging.warning(f"Potentially unsafe package string detected: {package_names}")
        # Decide whether to reject or proceed cautiously:
        # return {"output": "Error: Invalid characters in package names.", "info": "Installation aborted."}
    # Use --break-system-packages for modern pip behavior in managed environments
    command = f"pip install --break-system-packages {package_names}"
    logging.info(f"Attempting to install packages: {package_names}")
    # Use a longer timeout for package installation
    output = run_command_in_sandbox(command, timeout_sec=600, run_forever=False, cwd=CODE_DIR)
    if "Successfully installed" in output or "Requirement already satisfied" in output:
        logging.info(f"Pip install command finished for: {package_names}")
        info_msg = f"Package installation command executed for: {package_names}."
    else:
        logging.warning(f"Pip install command for '{package_names}' may have encountered issues.")
        info_msg = f"Package installation command executed for: {package_names}. Check output for details."
    return {"output": output, "info": info_msg}
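
# Illustrative usage (comment only, not executed at import):
#
#   install_python_packages("numpy pandas")
#   # -> {"output": "<pip output>",
#   #     "info": "Package installation command executed for: numpy pandas."}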

@mcp.tool()
def run_code(
    filename: str,
    code: str,
    start_cmd: str,
    python_packages: str = "",
    timeout_seconds: int = 300,
    run_forever: bool = False,
) -> Dict[str, Any]:
    """
    Creates a code file, optionally installs Python packages, executes the code
    using the provided start command, and uploads any newly created files.

    Args:
        filename: Name of the file to create (e.g., "app.py"). Stored in /app/code_interpreter/.
        code: Full source code to write to the file.
        start_cmd: Command to execute the file (e.g., "python /app/code_interpreter/app.py").
            Paths within the command should be absolute or relative to /app/code_interpreter.
        python_packages: Space-separated list of Python packages to install via pip.
            Leave empty if none are needed. Pre-installed: gradio, XlsxWriter, openpyxl.
        timeout_seconds: Maximum execution time in seconds (default 300). Ignored if run_forever is True.
        run_forever: If True, the command attempts to run indefinitely (e.g., for servers).
            Output capture may be limited, and the timeout is ignored.

    Returns:
        A dictionary containing:
        - "output": The stdout/stderr from the execution.
        - "info": Status messages about file creation/package installation.
        - "files_download_links": A list of URLs for any new files created during execution.
    """
    global tracked_files_in_codedir
    _ensure_files_synced(CODE_DIR, TEMP_UPLOAD_DIR)  # Sync before starting
    info_messages = []
    # 1. Install packages if specified
    if python_packages:
        install_result = install_python_packages(python_packages)
        info_messages.append(install_result.get("info", "Package install attempted."))
        logging.debug(f"Package install output:\n{install_result.get('output', '')}")
    # 2. Create the code file
    create_result = create_code_file(filename, code)
    info_messages.append(create_result.get("info", "File creation attempted."))
    if "Error:" in create_result.get("info", ""):
        return {
            "output": "Aborted due to file creation error.",
            "info": "\n".join(info_messages),
            "files_download_links": []
        }
    # Refresh known files *after* creating the target file
    known_files_before_run = set(CODE_DIR.glob("*"))
    tracked_files_in_codedir = known_files_before_run  # Update global state
    # 3. Execute the command
    logging.info(f"Executing start command: {start_cmd}")
    exec_output = run_command_in_sandbox(start_cmd, timeout_sec=timeout_seconds, run_forever=run_forever, cwd=CODE_DIR)
    # 4. Upload any new files created by the execution
    new_file_urls, tracked_files_in_codedir = _upload_new_files(CODE_DIR, known_files_before_run)
    return {
        "output": exec_output,
        "info": "\n".join(info_messages),
        "files_download_links": new_file_urls
    }
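
# Illustrative usage (comment only, not executed at import): write and run a
# small script, then collect links for any files it produced.
#
#   result = run_code(
#       filename="task.py",
#       code="with open('out.txt', 'w') as f: f.write('done')",
#       start_cmd="python /app/code_interpreter/task.py",
#   )
#   # result["output"] -> captured stdout/stderr
#   # result["files_download_links"] -> e.g. [".../static/out.txt"]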

@mcp.tool()
def run_existing_code(
    start_cmd: str,
    timeout_seconds: int = 300,
    run_forever: bool = False,
) -> Dict[str, Any]:
    """
    Executes a command assuming the necessary code files already exist
    in the '/app/code_interpreter/' directory. Uploads any newly created files.

    Args:
        start_cmd: Command to execute (e.g., "python /app/code_interpreter/main.py").
            Paths within the command should be absolute or relative to /app/code_interpreter.
        timeout_seconds: Maximum execution time in seconds (default 300). Ignored if run_forever is True.
        run_forever: If True, the command attempts to run indefinitely. Output capture may be limited.

    Returns:
        A dictionary containing:
        - "output": The stdout/stderr from the execution.
        - "files_download_links": A list of URLs for any new files created during execution.
    """
    global tracked_files_in_codedir
    _ensure_files_synced(CODE_DIR, TEMP_UPLOAD_DIR)  # Ensure files are present
    known_files_before_run = tracked_files_in_codedir  # Use the current tracked state
    # Execute the command
    logging.info(f"Executing command on existing files: {start_cmd}")
    exec_output = run_command_in_sandbox(start_cmd, timeout_sec=timeout_seconds, run_forever=run_forever, cwd=CODE_DIR)
    # Upload any new files created by the execution
    new_file_urls, tracked_files_in_codedir = _upload_new_files(CODE_DIR, known_files_before_run)
    return {
        "output": exec_output,
        "files_download_links": new_file_urls
    }

@mcp.tool()
def run_shell_command(
    cmd: str,
    timeout_seconds: int = 300,
    run_forever: bool = False,
) -> Dict[str, Any]:
    """
    Runs an arbitrary shell command in the '/app/code_interpreter/' directory.
    Useful for file manipulation, setup, or simple tasks. Executes on Alpine Linux.
    Avoid commands requiring sudo. Uploads any newly created files.

    Args:
        cmd: The shell command to execute (e.g., "mkdir output_data", "ls -l").
        timeout_seconds: Maximum execution time in seconds (default 300). Ignored if run_forever is True.
        run_forever: If True, the command attempts to run indefinitely. Output capture may be limited.

    Returns:
        A dictionary containing:
        - "output": The stdout/stderr from the command execution.
        - "files_download_links": A list of URLs for any new files created by the command.
    """
    global tracked_files_in_codedir
    # Syncing matters if the command interacts with downloaded/transferred files
    _ensure_files_synced(CODE_DIR, TEMP_UPLOAD_DIR)
    known_files_before_run = tracked_files_in_codedir
    # Execute the command
    logging.info(f"Executing shell command: {cmd}")
    exec_output = run_command_in_sandbox(cmd, timeout_sec=timeout_seconds, run_forever=run_forever, cwd=CODE_DIR)
    # Upload any new files created by the command (e.g., if cmd was `tar czf archive.tar.gz data/`)
    new_file_urls, tracked_files_in_codedir = _upload_new_files(CODE_DIR, known_files_before_run)
    return {
        "output": exec_output,
        "files_download_links": new_file_urls
    }

@mcp.tool()
def get_youtube_transcript(video_id: str) -> Dict[str, Any]:
    """
    Fetches the transcript of a YouTube video by its video ID via RapidAPI.

    Args:
        video_id: The unique ID of the YouTube video (e.g., "ZacjOVVgoLY").

    Returns:
        A dictionary containing the transcript data or an error message.
    """
    if not RAPIDAPI_KEY:
        return {"error": "RapidAPI key is not configured."}
    url = f"https://{YOUTUBE_TRANSCRIPT_API}/api/transcript"
    params = {"videoId": video_id}
    headers = {
        'x-rapidapi-key': RAPIDAPI_KEY,
        'x-rapidapi-host': YOUTUBE_TRANSCRIPT_API
    }
    logging.info(f"Fetching YouTube transcript for video ID: {video_id}")
    try:
        response = requests_session.get(url, headers=headers, params=params, timeout=30)
        response.raise_for_status()
        data = response.json()
        logging.info(f"Successfully fetched transcript for {video_id}.")
        return data
    except RequestException as e:
        logging.error(f"Error fetching YouTube transcript for {video_id}: {e}")
        error_msg = f"Failed to fetch transcript: {e}"
        if e.response is not None:
            error_msg += f" (Status: {e.response.status_code}, Body: {e.response.text[:200]})"  # Include a snippet of the response
        return {"error": error_msg}
    except json.JSONDecodeError as e:
        logging.error(f"Error decoding JSON response for YouTube transcript {video_id}: {e}")
        return {"error": f"Failed to parse transcript response: {e}"}
    except Exception as e:
        logging.error(f"Unexpected error fetching YouTube transcript {video_id}: {e}", exc_info=True)
        return {"error": f"An unexpected error occurred: {e}"}
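
# Illustrative usage (comment only). The exact response shape depends on the
# RapidAPI provider; the tool returns the parsed JSON as-is:
#
#   data = get_youtube_transcript("ZacjOVVgoLY")
#   # -> provider-specific transcript JSON, or {"error": "..."} on failure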

@mcp.tool()
def read_excel_file(filename: str) -> Dict[str, Any]:
    """
    Reads data from an Excel file (.xlsx) located in '/app/code_interpreter/'.

    Args:
        filename: The name of the Excel file (e.g., 'report.xlsx').

    Returns:
        A dictionary mapping sheet-qualified cell coordinates (e.g., 'Sheet1!A1')
        to the corresponding cell values (complex types converted to string).
        Returns an error message if the file cannot be read.
    """
    _ensure_files_synced(CODE_DIR, TEMP_UPLOAD_DIR)  # Make sure the file is present
    file_path = CODE_DIR / Path(filename).name  # Sanitize the name
    if not file_path.exists():
        logging.error(f"Excel file not found: {file_path}")
        return {"error": f"File not found: {filename}"}
    logging.info(f"Reading Excel file: {file_path}")
    excel_data_dict = {}
    try:
        workbook = openpyxl.load_workbook(file_path, data_only=True)  # Read values, not formulas
        for sheet_name in workbook.sheetnames:
            sheet = workbook[sheet_name]
            for row in sheet.iter_rows():
                for cell in row:
                    if cell.value is None:
                        continue
                    # Include the sheet name in the key for clarity with multiple sheets
                    cell_coordinate = f"{sheet_name}!{cell.coordinate}"
                    # Keep simple types as-is; convert complex types to string
                    cell_value = cell.value
                    if not isinstance(cell_value, (str, int, float, bool)):
                        cell_value = str(cell_value)
                    excel_data_dict[cell_coordinate] = cell_value
        logging.info(f"Successfully read {len(excel_data_dict)} cells from {filename}.")
        return excel_data_dict
    except Exception as e:
        logging.error(f"Failed to read Excel file {file_path}: {e}", exc_info=True)
        return {"error": f"Could not read Excel file '{filename}'. Reason: {e}"}
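
# Illustrative usage (comment only, not executed at import):
#
#   cells = read_excel_file("report.xlsx")
#   # -> {"Sheet1!A1": "Revenue", "Sheet1!B1": 1234.5, ...}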

@mcp.tool()
def scrape_website_content(url: str, query: Optional[str] = None) -> Dict[str, str]:
    """
    Scrapes the textual content of a single website URL using ScrapeNinja via RapidAPI
    and optionally asks a question about the content using an AI model.

    Args:
        url: The URL of the website to scrape.
        query: An optional question to ask the AI about the scraped content.

    Returns:
        A dictionary containing the scraped content ("content") and,
        if a query was provided, the AI's answer ("ai_answer").
        Returns an error message on failure.
    """
    if not RAPIDAPI_KEY:
        return {"error": "RapidAPI key is not configured."}
    scrape_url = f"https://{SCRAPE_NINJA_API}/scrape"
    headers = {
        'x-rapidapi-key': RAPIDAPI_KEY,
        'x-rapidapi-host': SCRAPE_NINJA_API,
        'Content-Type': "application/json"
    }
    payload = json.dumps({"url": url})
    logging.info(f"Scraping website: {url}")
    result = {}
    try:
        response = requests_session.post(scrape_url, headers=headers, data=payload, timeout=60)
        response.raise_for_status()
        # Assuming ScrapeNinja returns JSON with the page content; adjust to the actual API response
        scraped_data = response.json()
        # Extract the main textual content; this may need adjustment for ScrapeNinja's output format
        content = scraped_data.get("body", "")  # Or another relevant key like 'text'
        if not content:
            content = str(scraped_data)  # Fall back to the string representation if 'body' is empty
        # Basic cleaning (optional; enhance as needed)
        soup = BeautifulSoup(content, "html.parser")
        cleaned_content = soup.get_text(separator=' ', strip=True)
        result["content"] = cleaned_content
        logging.info(f"Successfully scraped content from {url} (length: {len(cleaned_content)}).")
        if query:
            logging.info(f"Asking AI query about scraped content: '{query}'")
            try:
                ai_response = completion(
                    model="gemini/gemini-1.5-flash",  # Use a suitable model
                    messages=[
                        {"role": "system", "content": "You are an AI assistant analyzing website content."},
                        {"role": "user", "content": f"Based on the following website content, please answer this question: {query}\n\nWebsite Content:\n{cleaned_content[:15000]}"}  # Limit context size
                    ],
                    max_tokens=500,
                    temperature=0.5,
                )
                result["ai_answer"] = ai_response.choices[0].message.content
                logging.info(f"Received AI answer for query on {url}.")
            except Exception as e:
                logging.error(f"AI query failed for {url}: {e}", exc_info=True)
                result["ai_answer"] = f"Error during AI analysis: {e}"
        return result
    except RequestException as e:
        logging.error(f"Error scraping {url}: {e}")
        error_msg = f"Failed to scrape {url}: {e}"
        if e.response is not None:
            error_msg += f" (Status: {e.response.status_code}, Body: {e.response.text[:200]})"
        return {"error": error_msg}
    except json.JSONDecodeError as e:
        logging.error(f"Error decoding JSON response for scrape {url}: {e}")
        return {"error": f"Failed to parse scrape response: {e}"}
    except Exception as e:
        logging.error(f"Unexpected error scraping {url}: {e}", exc_info=True)
        return {"error": f"An unexpected error occurred during scraping: {e}"}
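
# Illustrative usage (comment only, not executed at import):
#
#   result = scrape_website_content("https://example.com", query="What is this page about?")
#   # result["content"]   -> cleaned page text
#   # result["ai_answer"] -> model answer (present only when a query is given)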

# Consolidated deep-thinking tool

@mcp.tool()
def ask_advanced_ai(model_provider: str, query: str, context_info: str) -> Dict[str, str]:
    """
    Leverages a powerful external AI model for complex reasoning or generation tasks.

    Args:
        model_provider: The provider/model to use. Supported: 'groq', 'openrouter', 'gemini'.
        query: The main question or task for the AI.
        context_info: Additional context, background information, or previous findings
            relevant to the query.

    Returns:
        A dictionary containing the AI's response under the key "response".
        Returns an error message on failure.
    """
    logging.info(f"Sending query to advanced AI ({model_provider}): '{query[:100]}...'")
    model_map = {
        # Specific model names known to litellm (examples; swap in others as needed)
        'groq': "groq/llama3-70b-8192",
        'openrouter': "openrouter/meta-llama/llama-3-70b-instruct",
        'gemini': "gemini/gemini-1.5-pro-latest",
    }
    provider = model_provider.lower()  # Normalize once, then use consistently
    model_name = model_map.get(provider)
    if not model_name:
        logging.error(f"Unsupported model provider specified: {model_provider}")
        return {"response": f"Error: Unsupported model provider '{model_provider}'. Use 'groq', 'openrouter', or 'gemini'."}
    # Check that the API key for the selected provider is configured
    provider_keys = {
        'groq': GROQ_API_KEY,
        'openrouter': OPENROUTER_API_KEY,
        'gemini': GEMINI_API_KEY,  # litellm may need the env var
    }
    if not provider_keys.get(provider):
        logging.error(f"API key for {model_provider} is not configured.")
        return {"response": f"Error: API key for provider '{model_provider}' is missing."}
    messages = [
        {"role": "system", "content": "You are a highly capable AI assistant performing advanced reasoning or generation."},
        {"role": "user", "content": f"Based on the following information, please address the query.\n\nContext/Information Provided:\n{context_info}\n\nQuery:\n{query}"}
    ]
    try:
        response = completion(
            model=model_name,
            messages=messages,
            # Add parameters like temperature or max_tokens if needed
        )
        ai_response = response.choices[0].message.content
        logging.info(f"Received response from {model_provider} AI.")
        return {"response": ai_response}
    except Exception as e:
        logging.error(f"Error calling {model_provider} AI ({model_name}): {e}", exc_info=True)
        return {"response": f"Error interacting with {model_provider} AI: {e}"}
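
# Illustrative usage (comment only, not executed at import):
#
#   ask_advanced_ai(
#       model_provider="groq",
#       query="Propose three optimizations for this pipeline.",
#       context_info="The pipeline downloads files, runs user code, and re-uploads outputs.",
#   )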

# --- Main Execution ---
if __name__ == "__main__":
    logging.info("Starting FastMCP server...")
    # Ensure the code directory exists on startup
    CODE_DIR.mkdir(parents=True, exist_ok=True)
    # Initial scan of files in the code directory
    tracked_files_in_codedir = set(CODE_DIR.glob("*"))
    logging.info(f"Initial tracked files in {CODE_DIR}: {[f.name for f in tracked_files_in_codedir]}")
    # Run the server using standard I/O transport
    mcp.run(transport='stdio')