audiveris / app.py
judith.ibanez
add some modifications
3723edd
raw
history blame
16.5 kB
import gradio as gr
import subprocess
import os
import base64
from typing import List, Dict, Any
from pathlib import Path
from datetime import datetime
class MusicRecognitionMCPServer:
def __init__(self, allowed_directories: List[str] = None):
"""Initialize MCP Server with configurable file access"""
self.allowed_directories = allowed_directories or ["/tmp", "uploads", "output"]
self.processed_files = {} # Cache of processed results
# Convert relative paths to absolute and check access
abs_directories = []
for directory in self.allowed_directories:
# Convert relative paths to absolute
if not os.path.isabs(directory):
directory = os.path.abspath(directory)
abs_directories.append(directory)
print(f"Checking directory: {directory}")
# Only try to create if it doesn't exist and we have permission
if not os.path.exists(directory):
try:
os.makedirs(directory, exist_ok=True)
os.chmod(directory, 0o755)
print(f"✅ Directory created: {directory}")
except PermissionError:
print(f"⚠️ Directory doesn't exist but can't create: {directory}")
print(f" This is normal for system directories like /tmp")
except Exception as e:
print(f"⚠️ Warning creating {directory}: {e}")
else:
print(f"✅ Directory exists: {directory}")
self.allowed_directories = abs_directories
print(f"Final allowed directories: {self.allowed_directories}")
# Test Audiveris installation
self._test_audiveris()
def list_resources(self) -> List[Dict[str, Any]]:
"""List available resources following MCP patterns"""
resources = []
# Add processed files as resources
for file_id, file_info in self.processed_files.items():
resources.append({
"uri": f"musicxml://{file_id}",
"name": file_info["original_name"],
"description": f"MusicXML file converted from {file_info['original_name']} on {file_info['processed_at']}",
"mimeType": "application/vnd.recordare.musicxml+xml"
})
# Add available PDF files in allowed directories
for directory in self.allowed_directories:
if os.path.exists(directory):
for file_path in Path(directory).rglob("*.pdf"):
if self._is_file_accessible(str(file_path)):
resources.append({
"uri": f"file://{file_path}",
"name": file_path.name,
"description": f"PDF music score available for processing: {file_path.name}",
"mimeType": "application/pdf"
})
return resources
def read_resource(self, uri: str) -> Dict[str, Any]:
"""Read resource content following MCP patterns"""
if uri.startswith("musicxml://"):
# Return processed MusicXML file
file_id = uri.replace("musicxml://", "")
if file_id in self.processed_files:
file_info = self.processed_files[file_id]
try:
with open(file_info["output_path"], "rb") as f:
content = base64.b64encode(f.read()).decode()
return {
"contents": [{
"type": "resource",
"resource": {
"uri": uri,
"text": content,
"mimeType": "application/vnd.recordare.musicxml+xml"
}
}]
}
except FileNotFoundError:
raise Exception(f"MusicXML file not found: {file_info['output_path']}")
else:
raise Exception(f"Resource not found: {uri}")
elif uri.startswith("file://"):
# Return PDF file content
file_path = uri.replace("file://", "")
if not self._is_file_accessible(file_path):
raise Exception(f"File access denied: {file_path}")
try:
with open(file_path, "rb") as f:
content = base64.b64encode(f.read()).decode()
return {
"contents": [{
"type": "resource",
"resource": {
"uri": uri,
"text": content,
"mimeType": "application/pdf"
}
}]
}
except FileNotFoundError:
raise Exception(f"File not found: {file_path}")
else:
raise Exception(f"Unsupported URI scheme: {uri}")
def _is_file_accessible(self, file_path: str) -> bool:
"""Check if file is within allowed directories"""
abs_path = os.path.abspath(file_path)
return any(abs_path.startswith(os.path.abspath(d)) for d in self.allowed_directories)
def recognize_music_tool(self, pdf_uri: str, output_dir: str = None) -> Dict[str, Any]:
"""Tool for music recognition following MCP patterns"""
# Handle different URI formats
if pdf_uri.startswith("file://"):
pdf_path = pdf_uri.replace("file://", "")
elif pdf_uri.startswith("data:"):
# Handle data URIs (base64 encoded)
return self._process_data_uri(pdf_uri, output_dir)
else:
# Assume it's a direct file path
pdf_path = pdf_uri
if not self._is_file_accessible(pdf_path):
raise Exception(f"File access denied: {pdf_path}")
if not os.path.exists(pdf_path):
raise Exception(f"PDF file not found: {pdf_path}")
try:
output_file = self._recognize_music_core(pdf_path, output_dir)
# Store in processed files cache
file_id = f"music_{len(self.processed_files) + 1}_{int(datetime.now().timestamp())}"
self.processed_files[file_id] = {
"original_name": os.path.basename(pdf_path),
"original_path": pdf_path,
"output_path": output_file,
"processed_at": datetime.now().isoformat(),
"file_id": file_id
}
# Return MCP-compliant response
return {
"content": [{
"type": "text",
"text": f"✅ Successfully converted '{os.path.basename(pdf_path)}' to MusicXML.\n\n"
f"📁 Output file: {output_file}\n"
f"🔗 Resource URI: musicxml://{file_id}\n"
f"📊 File size: {os.path.getsize(output_file)} bytes\n\n"
f"You can now access this MusicXML file as a resource using the URI: `musicxml://{file_id}`"
}],
"isError": False
}
except Exception as e:
return {
"content": [{
"type": "text",
"text": f"❌ Music recognition failed: {str(e)}"
}],
"isError": True
}
def _process_data_uri(self, data_uri: str, output_dir: str = None) -> Dict[str, Any]:
"""Process base64 encoded data URI"""
try:
# Parse data URI: data:application/pdf;base64,<data>
header, data = data_uri.split(',', 1)
mime_type = header.split(';')[0].replace('data:', '')
if mime_type != 'application/pdf':
raise Exception(f"Unsupported MIME type: {mime_type}")
# Fix base64 padding if needed
data = self._fix_base64_padding(data)
# Decode base64 data
pdf_data = base64.b64decode(data)
# Save to temporary file
temp_dir = output_dir or "/tmp"
temp_pdf = os.path.join(temp_dir, f"temp_{int(datetime.now().timestamp())}.pdf")
with open(temp_pdf, 'wb') as f:
f.write(pdf_data)
# Process the file
return self.recognize_music_tool(f"file://{temp_pdf}", output_dir)
except Exception as e:
raise Exception(f"Failed to process data URI: {str(e)}")
def _fix_base64_padding(self, data: str) -> str:
"""Fix base64 padding to make it valid"""
# Remove any whitespace
data = data.strip().replace('\n', '').replace('\r', '').replace(' ', '')
# Add padding if needed
missing_padding = len(data) % 4
if missing_padding:
data += '=' * (4 - missing_padding)
return data
def _recognize_music_core(self, pdf_file_path: str, output_dir: str = None) -> str:
"""Core music recognition function"""
audiveris = "/opt/audiveris/bin/Audiveris"
if output_dir is None:
output_dir = "/tmp"
# Ensure output directory exists with proper permissions
os.makedirs(output_dir, exist_ok=True)
try:
os.chmod(output_dir, 0o755)
except Exception as e:
print(f"Warning: Could not set permissions for {output_dir}: {e}")
if not self._is_file_accessible(output_dir):
raise Exception(f"Output directory access denied: {output_dir}")
# Verify input file exists
if not os.path.exists(pdf_file_path):
raise Exception(f"Input PDF file not found: {pdf_file_path}")
pdf_file_name = os.path.basename(pdf_file_path)
pdf_name_without_ext = os.path.splitext(pdf_file_name)[0]
# Try both possible extensions
possible_extensions = [".mxl", ".xml", ".musicxml"]
output_files = [os.path.join(output_dir, f"{pdf_name_without_ext}{ext}") for ext in possible_extensions]
cmd = [
audiveris, "-batch", "-export", "-output", output_dir, pdf_file_path
]
print(f"Running Audiveris command: {' '.join(cmd)}")
result = subprocess.run(cmd, capture_output=True, text=True)
print(f"Audiveris stdout: {result.stdout}")
print(f"Audiveris stderr: {result.stderr}")
print(f"Audiveris return code: {result.returncode}")
# List files in output directory for debugging
if os.path.exists(output_dir):
files_in_output = os.listdir(output_dir)
print(f"Files in output directory: {files_in_output}")
# Check if any of the possible output files exist
existing_output = None
for output_file in output_files:
if os.path.exists(output_file):
existing_output = output_file
break
if existing_output:
print(f"Found output file: {existing_output}")
return existing_output
# If no output file found, provide detailed error
error_msg = f"Audiveris processing failed.\n"
error_msg += f"Return code: {result.returncode}\n"
error_msg += f"Stdout: {result.stdout}\n"
error_msg += f"Stderr: {result.stderr}\n"
error_msg += f"Expected files: {output_files}\n"
error_msg += f"Files in output dir: {os.listdir(output_dir) if os.path.exists(output_dir) else 'Directory does not exist'}\n"
raise Exception(error_msg)
def _test_audiveris(self):
"""Test if Audiveris is properly installed"""
audiveris = "/opt/audiveris/bin/Audiveris"
if not os.path.exists(audiveris):
print(f"⚠️ Warning: Audiveris not found at {audiveris}")
return False
try:
# Test Audiveris with help command
result = subprocess.run([audiveris, "-help"], capture_output=True, text=True, timeout=10)
if "Audiveris" in result.stdout or "Audiveris" in result.stderr:
print("✅ Audiveris installation verified")
return True
else:
print(f"⚠️ Warning: Audiveris may not be working properly")
print(f"Output: {result.stdout}")
print(f"Error: {result.stderr}")
return False
except Exception as e:
print(f"⚠️ Warning: Could not test Audiveris: {e}")
return False
# Initialize MCP Server with proper absolute paths
mcp_server = MusicRecognitionMCPServer(["/tmp", "/app/uploads", "/app/output"])
def recognize_music_gradio(pdf_file):
"""Gradio wrapper for music recognition"""
try:
print(f"Processing file: {pdf_file.name}")
result = mcp_server.recognize_music_tool(f"file://{pdf_file.name}")
if result.get("isError"):
error_msg = result["content"][0]["text"]
print(f"Error in music recognition: {error_msg}")
return None
# Extract file ID from the response
response_text = result["content"][0]["text"]
print(f"Response text: {response_text}")
if "musicxml://" in response_text:
file_id = response_text.split("musicxml://")[1].split("`")[0]
print(f"Extracted file ID: {file_id}")
if file_id in mcp_server.processed_files:
file_info = mcp_server.processed_files[file_id]
output_path = file_info["output_path"]
print(f"Output path from cache: {output_path}")
if os.path.exists(output_path):
print(f"✅ File exists: {output_path}")
return output_path
else:
print(f"❌ File not found: {output_path}")
# If the above doesn't work, try to find the file directly
pdf_basename = os.path.splitext(os.path.basename(pdf_file.name))[0]
possible_files = [
f"/tmp/{pdf_basename}.mxl",
f"/tmp/{pdf_basename}.xml",
f"/tmp/{pdf_basename}.musicxml"
]
for file_path in possible_files:
print(f"Checking: {file_path}")
if os.path.exists(file_path):
print(f"✅ Found file: {file_path}")
return file_path
print("❌ No output file found in any expected location")
print(f"Files in /tmp: {os.listdir('/tmp') if os.path.exists('/tmp') else 'Directory not found'}")
return None
except Exception as e:
print(f"Exception in Gradio wrapper: {str(e)}")
import traceback
traceback.print_exc()
return None
# Create Gradio interface
gradio_interface = gr.Interface(
fn=recognize_music_gradio,
inputs=gr.File(file_types=[".pdf"], label="Upload PDF music score"),
outputs=gr.File(label="Download MusicXML file"),
title="Music Score Recognition",
description="Upload a PDF music score and create a MusicXML file from it.",
)
# Removed run_gradio function - now running directly in main thread
if __name__ == "__main__":
print("===== Application Startup at {} =====".format(datetime.now().strftime("%Y-%m-%d %H:%M:%S")))
print()
print("🎵 MCP-Compliant Music Recognition Service Starting...")
print("📱 Gradio UI: http://localhost:7860")
# Run Gradio directly in the main thread to keep the application alive
try:
gradio_interface.launch(
server_name="0.0.0.0",
server_port=7860,
share=False, # Set to False for container deployment
prevent_thread_lock=False # Allow blocking to keep main thread alive
)
except Exception as e:
print(f"❌ Failed to start Gradio interface: {e}")
import traceback
traceback.print_exc()