"""Gradio front end and MCP-style resource/tool server for Audiveris.

The module exposes:

* ``MusicRecognitionMCPServer`` - lists/reads resources and converts PDF
  scores to MusicXML by invoking the Audiveris command-line tool.
* ``mcp_server`` - the module-level server instance used by the UI.
* ``recognize_music_gradio`` / ``gradio_interface`` - the Gradio wrapper
  and interface built on top of ``mcp_server``.
"""

import base64
import os
import re
import subprocess
import tempfile
import traceback
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional

import gradio as gr

# Extracts the file ID from a "musicxml://<id>" URI embedded in free text.
# IDs are generated as "music_<n>_<timestamp>", i.e. word characters only,
# so the match stops at the newline/backtick that follows the URI.
_MUSICXML_URI_RE = re.compile(r"musicxml://([A-Za-z0-9_]+)")


class MusicRecognitionMCPServer:
    """Serve PDF scores and converted MusicXML files from allowed directories."""

    # Location of the Audiveris launcher used for both the self-test and
    # the actual conversion.
    AUDIVERIS_BIN = "/opt/audiveris/bin/Audiveris"

    MUSICXML_MIME = "application/vnd.recordare.musicxml+xml"
    PDF_MIME = "application/pdf"

    def __init__(self, allowed_directories: Optional[List[str]] = None):
        """Initialize MCP Server with configurable file access.

        Args:
            allowed_directories: Directories that files may be read from and
                written to. Relative entries are made absolute. Defaults to
                ``["/tmp", "uploads", "output"]``.
        """
        self.allowed_directories = allowed_directories or ["/tmp", "uploads", "output"]
        self.processed_files: Dict[str, Dict[str, Any]] = {}  # Cache of processed results

        # Convert relative paths to absolute and check access
        abs_directories = []
        for directory in self.allowed_directories:
            if not os.path.isabs(directory):
                directory = os.path.abspath(directory)
            abs_directories.append(directory)
            print(f"Checking directory: {directory}")

            if os.path.exists(directory):
                print(f"✅ Directory exists: {directory}")
                continue

            # Only try to create if it doesn't exist and we have permission
            try:
                os.makedirs(directory, exist_ok=True)
                os.chmod(directory, 0o755)
                print(f"✅ Directory created: {directory}")
            except PermissionError:
                print(f"⚠️ Directory doesn't exist but can't create: {directory}")
                print("   This is normal for system directories like /tmp")
            except Exception as e:
                print(f"⚠️ Warning creating {directory}: {e}")

        self.allowed_directories = abs_directories
        print(f"Final allowed directories: {self.allowed_directories}")

        # Test Audiveris installation
        self._test_audiveris()

    def list_resources(self) -> List[Dict[str, Any]]:
        """List available resources following MCP patterns.

        Returns:
            One entry per cached conversion result (``musicxml://`` URIs)
            followed by one entry per accessible ``*.pdf`` found recursively
            in the allowed directories (``file://`` URIs).
        """
        # Processed files first
        resources = [
            {
                "uri": f"musicxml://{file_id}",
                "name": file_info["original_name"],
                "description": f"MusicXML file converted from {file_info['original_name']} on {file_info['processed_at']}",
                "mimeType": self.MUSICXML_MIME,
            }
            for file_id, file_info in self.processed_files.items()
        ]

        # Then the PDF files available in allowed directories
        for directory in self.allowed_directories:
            if not os.path.exists(directory):
                continue
            for file_path in Path(directory).rglob("*.pdf"):
                if self._is_file_accessible(str(file_path)):
                    resources.append({
                        "uri": f"file://{file_path}",
                        "name": file_path.name,
                        "description": f"PDF music score available for processing: {file_path.name}",
                        "mimeType": self.PDF_MIME,
                    })

        return resources

    @staticmethod
    def _resource_payload(uri: str, path: str, mime_type: str) -> Dict[str, Any]:
        """Read *path* and wrap its base64-encoded bytes in a resource response."""
        with open(path, "rb") as f:
            content = base64.b64encode(f.read()).decode()
        return {
            "contents": [{
                "type": "resource",
                "resource": {
                    "uri": uri,
                    "text": content,
                    "mimeType": mime_type,
                },
            }]
        }

    def read_resource(self, uri: str) -> Dict[str, Any]:
        """Read resource content following MCP patterns.

        Args:
            uri: Either ``musicxml://<file_id>`` for a cached conversion
                result or ``file://<path>`` for a PDF in an allowed directory.

        Returns:
            A dict whose single ``contents`` entry carries the file bytes,
            base64-encoded, in the ``text`` field.

        Raises:
            Exception: If the URI scheme is unsupported, the resource is
                unknown, access is denied, or the file does not exist.
        """
        if uri.startswith("musicxml://"):
            # Strip only the leading scheme, not every occurrence.
            file_id = uri[len("musicxml://"):]
            if file_id not in self.processed_files:
                raise Exception(f"Resource not found: {uri}")
            output_path = self.processed_files[file_id]["output_path"]
            try:
                return self._resource_payload(uri, output_path, self.MUSICXML_MIME)
            except FileNotFoundError as e:
                raise Exception(f"MusicXML file not found: {output_path}") from e

        if uri.startswith("file://"):
            file_path = uri[len("file://"):]
            if not self._is_file_accessible(file_path):
                raise Exception(f"File access denied: {file_path}")
            try:
                return self._resource_payload(uri, file_path, self.PDF_MIME)
            except FileNotFoundError as e:
                raise Exception(f"File not found: {file_path}") from e

        raise Exception(f"Unsupported URI scheme: {uri}")

    def _is_file_accessible(self, file_path: str) -> bool:
        """Check if file is within allowed directories.

        Paths are resolved (including symlinks) and compared component-wise,
        so ``/tmpfoo/x`` is not treated as being inside ``/tmp`` and a
        symlink cannot be used to reach outside an allowed directory.
        """
        target = os.path.realpath(file_path)
        for directory in self.allowed_directories:
            root = os.path.realpath(directory)
            try:
                if os.path.commonpath([target, root]) == root:
                    return True
            except ValueError:
                # commonpath raises for paths on different drives (Windows);
                # such a path is simply not inside this directory.
                continue
        return False

    def recognize_music_tool(self, pdf_uri: str, output_dir: Optional[str] = None) -> Dict[str, Any]:
        """Tool for music recognition following MCP patterns.

        Args:
            pdf_uri: ``file://`` URI, ``data:`` URI (base64 PDF) or a plain
                file path.
            output_dir: Directory for the generated file; the core routine
                falls back to ``/tmp`` when omitted.

        Returns:
            A response dict with a single text ``content`` entry and an
            ``isError`` flag. Conversion failures are reported through
            ``isError=True`` rather than raised.

        Raises:
            Exception: If the PDF is outside the allowed directories or does
                not exist, or if a data URI cannot be processed.
        """
        # Handle different URI formats
        if pdf_uri.startswith("data:"):
            # Data URIs (base64 encoded) are written to disk and re-dispatched
            return self._process_data_uri(pdf_uri, output_dir)
        if pdf_uri.startswith("file://"):
            pdf_path = pdf_uri[len("file://"):]
        else:
            # Assume it's a direct file path
            pdf_path = pdf_uri

        if not self._is_file_accessible(pdf_path):
            raise Exception(f"File access denied: {pdf_path}")

        if not os.path.exists(pdf_path):
            raise Exception(f"PDF file not found: {pdf_path}")

        pdf_name = os.path.basename(pdf_path)
        try:
            output_file = self._recognize_music_core(pdf_path, output_dir)

            # Store in processed files cache
            file_id = f"music_{len(self.processed_files) + 1}_{int(datetime.now().timestamp())}"
            self.processed_files[file_id] = {
                "original_name": pdf_name,
                "original_path": pdf_path,
                "output_path": output_file,
                "processed_at": datetime.now().isoformat(),
                "file_id": file_id,
            }

            # Return MCP-compliant response
            return {
                "content": [{
                    "type": "text",
                    "text": f"✅ Successfully converted '{pdf_name}' to MusicXML.\n\n"
                            f"📁 Output file: {output_file}\n"
                            f"🔗 Resource URI: musicxml://{file_id}\n"
                            f"📊 File size: {os.path.getsize(output_file)} bytes\n\n"
                            f"You can now access this MusicXML file as a resource using the URI: `musicxml://{file_id}`",
                }],
                "isError": False,
            }
        except Exception as e:
            return {
                "content": [{
                    "type": "text",
                    "text": f"❌ Music recognition failed: {str(e)}",
                }],
                "isError": True,
            }

    def _process_data_uri(self, data_uri: str, output_dir: Optional[str] = None) -> Dict[str, Any]:
        """Process base64 encoded data URI.

        The payload is decoded into a uniquely named temporary PDF inside
        ``output_dir`` (or ``/tmp``), converted via ``recognize_music_tool``
        and the temporary PDF is removed afterwards.

        Raises:
            Exception: Wrapping any failure (malformed URI, wrong MIME type,
                I/O error, access denied) with a "Failed to process data URI"
                message.
        """
        temp_pdf = None
        try:
            # Parse data URI: data:application/pdf;base64,<payload>
            header, data = data_uri.split(",", 1)
            mime_type = header.split(";")[0].replace("data:", "", 1)

            if mime_type != "application/pdf":
                raise Exception(f"Unsupported MIME type: {mime_type}")

            pdf_data = base64.b64decode(self._fix_base64_padding(data))

            # mkstemp guarantees a unique name; a timestamp-based name can
            # collide when two requests arrive within the same second.
            temp_dir = output_dir or "/tmp"
            fd, temp_pdf = tempfile.mkstemp(prefix="temp_", suffix=".pdf", dir=temp_dir)
            with os.fdopen(fd, "wb") as f:
                f.write(pdf_data)

            # Process the file
            return self.recognize_music_tool(f"file://{temp_pdf}", output_dir)

        except Exception as e:
            raise Exception(f"Failed to process data URI: {str(e)}") from e
        finally:
            if temp_pdf is not None:
                try:
                    os.remove(temp_pdf)
                except OSError:
                    # Best-effort cleanup; a leftover temp file must not
                    # mask the real result or error.
                    pass

    def _fix_base64_padding(self, data: str) -> str:
        """Fix base64 padding to make it valid.

        All whitespace is removed and ``=`` is appended until the length is
        a multiple of four.
        """
        data = "".join(data.split())
        remainder = len(data) % 4
        if remainder:
            data += "=" * (4 - remainder)
        return data

    def _recognize_music_core(self, pdf_file_path: str, output_dir: Optional[str] = None) -> str:
        """Core music recognition function.

        Runs Audiveris in batch/export mode on ``pdf_file_path`` and returns
        the path of the produced ``.mxl``/``.xml``/``.musicxml`` file.

        Raises:
            Exception: If the output directory is not allowed, the input is
                missing, or Audiveris produced none of the expected files.
        """
        if output_dir is None:
            output_dir = "/tmp"

        # Check access BEFORE touching the filesystem so that a denied
        # request cannot create directories outside the allowed roots.
        if not self._is_file_accessible(output_dir):
            raise Exception(f"Output directory access denied: {output_dir}")

        # Only set permissions on a directory we create ourselves; chmod-ing
        # an existing shared directory such as /tmp to 0o755 would strip its
        # sticky and world-writable bits.
        newly_created = not os.path.isdir(output_dir)
        os.makedirs(output_dir, exist_ok=True)
        if newly_created:
            try:
                os.chmod(output_dir, 0o755)
            except OSError as e:
                print(f"Warning: Could not set permissions for {output_dir}: {e}")

        # Verify input file exists
        if not os.path.exists(pdf_file_path):
            raise Exception(f"Input PDF file not found: {pdf_file_path}")

        pdf_name_without_ext = os.path.splitext(os.path.basename(pdf_file_path))[0]

        # Audiveris may emit any of these extensions
        possible_extensions = [".mxl", ".xml", ".musicxml"]
        output_files = [
            os.path.join(output_dir, f"{pdf_name_without_ext}{ext}")
            for ext in possible_extensions
        ]

        cmd = [
            self.AUDIVERIS_BIN, "-batch", "-export",
            "-output", output_dir,
            pdf_file_path,
        ]

        print(f"Running Audiveris command: {' '.join(cmd)}")
        result = subprocess.run(cmd, capture_output=True, text=True)

        print(f"Audiveris stdout: {result.stdout}")
        print(f"Audiveris stderr: {result.stderr}")
        print(f"Audiveris return code: {result.returncode}")

        # List files in output directory for debugging
        if os.path.exists(output_dir):
            print(f"Files in output directory: {os.listdir(output_dir)}")

        # Return the first expected output file that exists
        existing_output = next((p for p in output_files if os.path.exists(p)), None)
        if existing_output:
            print(f"Found output file: {existing_output}")
            return existing_output

        # If no output file found, provide detailed error
        dir_listing = (
            os.listdir(output_dir)
            if os.path.exists(output_dir)
            else "Directory does not exist"
        )
        error_msg = "".join([
            "Audiveris processing failed.\n",
            f"Return code: {result.returncode}\n",
            f"Stdout: {result.stdout}\n",
            f"Stderr: {result.stderr}\n",
            f"Expected files: {output_files}\n",
            f"Files in output dir: {dir_listing}\n",
        ])
        raise Exception(error_msg)

    def _test_audiveris(self) -> bool:
        """Test if Audiveris is properly installed.

        Returns:
            True when the launcher exists and its ``-help`` output mentions
            "Audiveris"; False otherwise. Never raises.
        """
        audiveris = self.AUDIVERIS_BIN

        if not os.path.exists(audiveris):
            print(f"⚠️ Warning: Audiveris not found at {audiveris}")
            return False

        try:
            # Test Audiveris with help command
            result = subprocess.run(
                [audiveris, "-help"], capture_output=True, text=True, timeout=10
            )
        except Exception as e:
            print(f"⚠️ Warning: Could not test Audiveris: {e}")
            return False

        if "Audiveris" in result.stdout or "Audiveris" in result.stderr:
            print("✅ Audiveris installation verified")
            return True

        print("⚠️ Warning: Audiveris may not be working properly")
        print(f"Output: {result.stdout}")
        print(f"Error: {result.stderr}")
        return False


# Initialize MCP Server with proper absolute paths
mcp_server = MusicRecognitionMCPServer(["/tmp", "/app/uploads", "/app/output"])


def recognize_music_gradio(pdf_file):
    """Gradio wrapper for music recognition.

    Args:
        pdf_file: The uploaded file as delivered by ``gr.File`` - either an
            object with a ``name`` attribute holding the path, or the path
            string itself. ``None`` when nothing was uploaded.

    Returns:
        Path of the generated MusicXML file, or ``None`` on any failure
        (details are printed to stdout).
    """
    try:
        if pdf_file is None:
            print("No PDF file was provided")
            return None

        # Accept both file-like objects (.name) and plain path strings.
        pdf_path = getattr(pdf_file, "name", pdf_file)
        print(f"Processing file: {pdf_path}")
        result = mcp_server.recognize_music_tool(f"file://{pdf_path}")

        if result.get("isError"):
            error_msg = result["content"][0]["text"]
            print(f"Error in music recognition: {error_msg}")
            return None

        # Extract file ID from the response
        response_text = result["content"][0]["text"]
        print(f"Response text: {response_text}")

        # The URI appears twice in the text (once followed by a newline,
        # once by a backtick); the regex stops at the end of the ID itself.
        match = _MUSICXML_URI_RE.search(response_text)
        if match:
            file_id = match.group(1)
            print(f"Extracted file ID: {file_id}")

            if file_id in mcp_server.processed_files:
                output_path = mcp_server.processed_files[file_id]["output_path"]
                print(f"Output path from cache: {output_path}")

                if os.path.exists(output_path):
                    print(f"✅ File exists: {output_path}")
                    return output_path
                print(f"❌ File not found: {output_path}")

        # If the above doesn't work, try to find the file directly
        pdf_basename = os.path.splitext(os.path.basename(pdf_path))[0]
        possible_files = [
            f"/tmp/{pdf_basename}.mxl",
            f"/tmp/{pdf_basename}.xml",
            f"/tmp/{pdf_basename}.musicxml",
        ]

        for file_path in possible_files:
            print(f"Checking: {file_path}")
            if os.path.exists(file_path):
                print(f"✅ Found file: {file_path}")
                return file_path

        print("❌ No output file found in any expected location")
        print(f"Files in /tmp: {os.listdir('/tmp') if os.path.exists('/tmp') else 'Directory not found'}")
        return None

    except Exception as e:
        print(f"Exception in Gradio wrapper: {str(e)}")
        traceback.print_exc()
        return None


# Create Gradio interface
gradio_interface = gr.Interface(
    fn=recognize_music_gradio,
    inputs=gr.File(file_types=[".pdf"], label="Upload PDF music score"),
    outputs=gr.File(label="Download MusicXML file"),
    title="Music Score Recognition",
    description="Upload a PDF music score and create a MusicXML file from it.",
)


if __name__ == "__main__":
    print("===== Application Startup at {} =====".format(datetime.now().strftime("%Y-%m-%d %H:%M:%S")))
    print()
    print("🎵 MCP-Compliant Music Recognition Service Starting...")
    print("📱 Gradio UI: http://localhost:7860")

    # Run Gradio directly in the main thread to keep the application alive
    try:
        gradio_interface.launch(
            server_name="0.0.0.0",
            server_port=7860,
            mcp_server=True,
            share=True,  # Set to False for container deployment
            prevent_thread_lock=False,  # Allow blocking to keep main thread alive
        )
    except Exception as e:
        print(f"❌ Failed to start Gradio interface: {e}")
        traceback.print_exc()