audiveris / app.py
judith.ibanez
add some modifications
1c5ed00
raw
history blame
27.4 kB
import gradio as gr
import subprocess
import os
import base64
from typing import List, Dict, Any
from pathlib import Path
from datetime import datetime
class MusicRecognitionMCPServer:
def __init__(self, allowed_directories: List[str] = None):
"""Initialize MCP Server with configurable file access"""
self.allowed_directories = allowed_directories or ["/tmp", "uploads", "output"]
self.processed_files = {} # Cache of processed results
# Convert relative paths to absolute and check access
abs_directories = []
for directory in self.allowed_directories:
# Convert relative paths to absolute
if not os.path.isabs(directory):
directory = os.path.abspath(directory)
abs_directories.append(directory)
print(f"Checking directory: {directory}")
# Only try to create if it doesn't exist and we have permission
if not os.path.exists(directory):
try:
os.makedirs(directory, exist_ok=True)
os.chmod(directory, 0o755)
print(f"βœ… Directory created: {directory}")
except PermissionError:
print(f"⚠️ Directory doesn't exist but can't create: {directory}")
print(f" This is normal for system directories like /tmp")
except Exception as e:
print(f"⚠️ Warning creating {directory}: {e}")
else:
print(f"βœ… Directory exists: {directory}")
self.allowed_directories = abs_directories
print(f"Final allowed directories: {self.allowed_directories}")
# Test Audiveris installation
self._test_audiveris()
def list_resources(self) -> List[Dict[str, Any]]:
"""List available resources following MCP patterns"""
resources = []
# Add processed files as resources
for file_id, file_info in self.processed_files.items():
resources.append({
"uri": f"musicxml://{file_id}",
"name": file_info["original_name"],
"description": f"MusicXML file converted from {file_info['original_name']} on {file_info['processed_at']}",
"mimeType": "application/vnd.recordare.musicxml+xml"
})
# Add available PDF files in allowed directories
for directory in self.allowed_directories:
if os.path.exists(directory):
for file_path in Path(directory).rglob("*.pdf"):
if self._is_file_accessible(str(file_path)):
resources.append({
"uri": f"file://{file_path}",
"name": file_path.name,
"description": f"PDF music score available for processing: {file_path.name}",
"mimeType": "application/pdf"
})
return resources
def read_resource(self, uri: str) -> Dict[str, Any]:
"""Read resource content following MCP patterns"""
if uri.startswith("musicxml://"):
# Return processed MusicXML file
file_id = uri.replace("musicxml://", "")
if file_id in self.processed_files:
file_info = self.processed_files[file_id]
try:
with open(file_info["output_path"], "rb") as f:
content = base64.b64encode(f.read()).decode()
return {
"contents": [{
"type": "resource",
"resource": {
"uri": uri,
"text": content,
"mimeType": "application/vnd.recordare.musicxml+xml"
}
}]
}
except FileNotFoundError:
raise Exception(f"MusicXML file not found: {file_info['output_path']}")
else:
raise Exception(f"Resource not found: {uri}")
elif uri.startswith("file://"):
# Return PDF file content
file_path = uri.replace("file://", "")
if not self._is_file_accessible(file_path):
raise Exception(f"File access denied: {file_path}")
try:
with open(file_path, "rb") as f:
content = base64.b64encode(f.read()).decode()
return {
"contents": [{
"type": "resource",
"resource": {
"uri": uri,
"text": content,
"mimeType": "application/pdf"
}
}]
}
except FileNotFoundError:
raise Exception(f"File not found: {file_path}")
else:
raise Exception(f"Unsupported URI scheme: {uri}")
def _is_file_accessible(self, file_path: str) -> bool:
"""Check if file is within allowed directories"""
abs_path = os.path.abspath(file_path)
return any(abs_path.startswith(os.path.abspath(d)) for d in self.allowed_directories)
def recognize_music_tool(self, pdf_uri: str, output_dir: str = None) -> Dict[str, Any]:
"""Tool for music recognition following MCP patterns"""
# Handle different URI formats
if pdf_uri.startswith("file://"):
pdf_path = pdf_uri.replace("file://", "")
elif pdf_uri.startswith("data:"):
# Handle data URIs (base64 encoded)
return self._process_data_uri(pdf_uri, output_dir)
else:
# Assume it's a direct file path
pdf_path = pdf_uri
if not self._is_file_accessible(pdf_path):
raise Exception(f"File access denied: {pdf_path}")
if not os.path.exists(pdf_path):
raise Exception(f"PDF file not found: {pdf_path}")
try:
output_file = self._recognize_music_core(pdf_path, output_dir)
# Store in processed files cache
file_id = f"music_{len(self.processed_files) + 1}_{int(datetime.now().timestamp())}"
self.processed_files[file_id] = {
"original_name": os.path.basename(pdf_path),
"original_path": pdf_path,
"output_path": output_file,
"processed_at": datetime.now().isoformat(),
"file_id": file_id
}
# Return MCP-compliant response
return {
"content": [{
"type": "text",
"text": f"βœ… Successfully converted '{os.path.basename(pdf_path)}' to MusicXML.\n\n"
f"πŸ“ Output file: {output_file}\n"
f"πŸ”— Resource URI: musicxml://{file_id}\n"
f"πŸ“Š File size: {os.path.getsize(output_file)} bytes\n\n"
f"You can now access this MusicXML file as a resource using the URI: `musicxml://{file_id}`"
}],
"isError": False
}
except Exception as e:
return {
"content": [{
"type": "text",
"text": f"❌ Music recognition failed: {str(e)}"
}],
"isError": True
}
def _process_data_uri(self, data_uri: str, output_dir: str = None) -> Dict[str, Any]:
"""Process base64 encoded data URI"""
try:
# Parse data URI: data:application/pdf;base64,<data>
header, data = data_uri.split(',', 1)
mime_type = header.split(';')[0].replace('data:', '')
if mime_type != 'application/pdf':
raise Exception(f"Unsupported MIME type: {mime_type}")
# Fix base64 padding if needed
data = self._fix_base64_padding(data)
# Decode base64 data
pdf_data = base64.b64decode(data)
# Save to temporary file
temp_dir = output_dir or "/tmp"
temp_pdf = os.path.join(temp_dir, f"temp_{int(datetime.now().timestamp())}.pdf")
with open(temp_pdf, 'wb') as f:
f.write(pdf_data)
# Process the file
return self.recognize_music_tool(f"file://{temp_pdf}", output_dir)
except Exception as e:
raise Exception(f"Failed to process data URI: {str(e)}")
def _fix_base64_padding(self, data: str) -> str:
"""Fix base64 padding to make it valid"""
# Remove any whitespace
data = data.strip().replace('\n', '').replace('\r', '').replace(' ', '')
# Add padding if needed
missing_padding = len(data) % 4
if missing_padding:
data += '=' * (4 - missing_padding)
return data
def _recognize_music_core(self, pdf_file_path: str, output_dir: str = None) -> str:
"""Core music recognition function"""
audiveris = "/opt/audiveris/bin/Audiveris"
if output_dir is None:
output_dir = "/tmp"
# Ensure output directory exists with proper permissions
os.makedirs(output_dir, exist_ok=True)
try:
os.chmod(output_dir, 0o755)
except Exception as e:
print(f"Warning: Could not set permissions for {output_dir}: {e}")
if not self._is_file_accessible(output_dir):
raise Exception(f"Output directory access denied: {output_dir}")
# Verify input file exists
if not os.path.exists(pdf_file_path):
raise Exception(f"Input PDF file not found: {pdf_file_path}")
pdf_file_name = os.path.basename(pdf_file_path)
pdf_name_without_ext = os.path.splitext(pdf_file_name)[0]
# Try both possible extensions
possible_extensions = [".mxl", ".xml", ".musicxml"]
output_files = [os.path.join(output_dir, f"{pdf_name_without_ext}{ext}") for ext in possible_extensions]
cmd = [
audiveris, "-batch", "-export", "-output", output_dir, pdf_file_path
]
print(f"Running Audiveris command: {' '.join(cmd)}")
result = subprocess.run(cmd, capture_output=True, text=True)
print(f"Audiveris stdout: {result.stdout}")
print(f"Audiveris stderr: {result.stderr}")
print(f"Audiveris return code: {result.returncode}")
# List files in output directory for debugging
if os.path.exists(output_dir):
files_in_output = os.listdir(output_dir)
print(f"Files in output directory: {files_in_output}")
# Check if any of the possible output files exist
existing_output = None
for output_file in output_files:
if os.path.exists(output_file):
existing_output = output_file
break
if existing_output:
print(f"Found output file: {existing_output}")
return existing_output
# If no output file found, provide detailed error
error_msg = f"Audiveris processing failed.\n"
error_msg += f"Return code: {result.returncode}\n"
error_msg += f"Stdout: {result.stdout}\n"
error_msg += f"Stderr: {result.stderr}\n"
error_msg += f"Expected files: {output_files}\n"
error_msg += f"Files in output dir: {os.listdir(output_dir) if os.path.exists(output_dir) else 'Directory does not exist'}\n"
raise Exception(error_msg)
def _test_audiveris(self):
"""Test if Audiveris is properly installed"""
audiveris = "/opt/audiveris/bin/Audiveris"
if not os.path.exists(audiveris):
print(f"⚠️ Warning: Audiveris not found at {audiveris}")
return False
try:
# Test Audiveris with help command
result = subprocess.run([audiveris, "-help"], capture_output=True, text=True, timeout=10)
if "Audiveris" in result.stdout or "Audiveris" in result.stderr:
print("βœ… Audiveris installation verified")
return True
else:
print(f"⚠️ Warning: Audiveris may not be working properly")
print(f"Output: {result.stdout}")
print(f"Error: {result.stderr}")
return False
except Exception as e:
print(f"⚠️ Warning: Could not test Audiveris: {e}")
return False
# Initialize MCP Server with proper absolute paths
mcp_server = MusicRecognitionMCPServer(["/tmp", "/app/uploads", "/app/output"])
def recognize_music_gradio(pdf_file):
"""Gradio wrapper for music recognition"""
try:
print(f"Processing file: {pdf_file.name}")
result = mcp_server.recognize_music_tool(f"file://{pdf_file.name}")
if result.get("isError"):
error_msg = result["content"][0]["text"]
print(f"Error in music recognition: {error_msg}")
return None
# Extract file ID from the response
response_text = result["content"][0]["text"]
print(f"Response text: {response_text}")
if "musicxml://" in response_text:
file_id = response_text.split("musicxml://")[1].split("`")[0]
print(f"Extracted file ID: {file_id}")
if file_id in mcp_server.processed_files:
file_info = mcp_server.processed_files[file_id]
output_path = file_info["output_path"]
print(f"Output path from cache: {output_path}")
if os.path.exists(output_path):
print(f"βœ… File exists: {output_path}")
return output_path
else:
print(f"❌ File not found: {output_path}")
# If the above doesn't work, try to find the file directly
pdf_basename = os.path.splitext(os.path.basename(pdf_file.name))[0]
possible_files = [
f"/tmp/{pdf_basename}.mxl",
f"/tmp/{pdf_basename}.xml",
f"/tmp/{pdf_basename}.musicxml"
]
for file_path in possible_files:
print(f"Checking: {file_path}")
if os.path.exists(file_path):
print(f"βœ… Found file: {file_path}")
return file_path
print("❌ No output file found in any expected location")
print(f"Files in /tmp: {os.listdir('/tmp') if os.path.exists('/tmp') else 'Directory not found'}")
return None
except Exception as e:
print(f"Exception in Gradio wrapper: {str(e)}")
import traceback
traceback.print_exc()
return None
# Create enhanced Gradio interface with custom CSS and better UX
custom_css = """
.gradio-container {
max-width: 1200px !important;
margin: auto !important;
}
.main-header {
text-align: center;
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
color: white;
padding: 2rem;
border-radius: 15px;
margin-bottom: 2rem;
box-shadow: 0 8px 32px rgba(0,0,0,0.1);
}
.feature-card {
background: white;
border-radius: 12px;
padding: 1.5rem;
margin: 1rem 0;
box-shadow: 0 4px 16px rgba(0,0,0,0.1);
border-left: 4px solid #667eea;
}
.upload-area {
border: 2px dashed #667eea;
border-radius: 12px;
padding: 2rem;
text-align: center;
background: linear-gradient(135deg, #f5f7fa 0%, #c3cfe2 100%);
transition: all 0.3s ease;
}
.upload-area:hover {
border-color: #764ba2;
background: linear-gradient(135deg, #e3f2fd 0%, #bbdefb 100%);
}
.status-success {
background: linear-gradient(135deg, #4caf50 0%, #45a049 100%);
color: white;
padding: 1rem;
border-radius: 8px;
margin: 1rem 0;
}
.status-error {
background: linear-gradient(135deg, #f44336 0%, #d32f2f 100%);
color: white;
padding: 1rem;
border-radius: 8px;
margin: 1rem 0;
}
.info-box {
background: linear-gradient(135deg, #e3f2fd 0%, #bbdefb 100%);
border-radius: 8px;
padding: 1rem;
margin: 1rem 0;
border-left: 4px solid #2196f3;
}
"""
def create_gradio_interface():
with gr.Blocks(css=custom_css, title="🎼 Audiveris Music Score Recognition", theme=gr.themes.Soft()) as interface:
# Header
gr.HTML("""
<div class="main-header">
<h1>🎼 Audiveris Music Score Recognition</h1>
<p style="font-size: 1.2em; margin-top: 1rem; opacity: 0.9;">
Transform your PDF music scores into editable MusicXML files using advanced AI recognition
</p>
</div>
""")
# Main content area
with gr.Row():
with gr.Column(scale=1):
gr.HTML("""
<div class="feature-card">
<h3>✨ Features</h3>
<ul style="line-height: 1.8;">
<li>🎡 High-accuracy music notation recognition</li>
<li>πŸ“„ PDF to MusicXML conversion</li>
<li>🎹 Supports complex musical scores</li>
<li>⚑ Fast processing with Audiveris engine</li>
<li>πŸ’Ύ Downloadable results</li>
</ul>
</div>
""")
gr.HTML("""
<div class="info-box">
<h4>πŸ“‹ How to use:</h4>
<ol style="line-height: 1.6;">
<li>Upload your PDF music score</li>
<li>Click "🎡 Convert to MusicXML"</li>
<li>Wait for processing to complete</li>
<li>Download your MusicXML file</li>
</ol>
</div>
""")
with gr.Column(scale=2):
# File upload section
gr.HTML("<h3 style='text-align: center; color: #667eea;'>πŸ“ Upload Your Music Score</h3>")
pdf_input = gr.File(
file_types=[".pdf"],
label="Select PDF File",
file_count="single",
height=200,
elem_classes=["upload-area"]
)
# Processing button
convert_btn = gr.Button(
"🎡 Convert to MusicXML",
variant="primary",
size="lg",
scale=1
)
# Status and progress
status_display = gr.HTML(visible=False)
progress_bar = gr.Progress()
# Output section
gr.HTML("<h3 style='text-align: center; color: #667eea; margin-top: 2rem;'>πŸ“₯ Download Results</h3>")
output_file = gr.File(
label="MusicXML Output",
visible=False,
height=100
)
# Processing info
processing_info = gr.Textbox(
label="Processing Details",
lines=8,
visible=False,
interactive=False
)
# Footer
gr.HTML("""
<div style="text-align: center; margin-top: 3rem; padding: 2rem; background: #f8f9fa; border-radius: 12px;">
<p style="color: #666; margin: 0;">
Powered by <strong>Audiveris</strong> β€’ Built with ❀️ using Gradio
</p>
<p style="color: #888; font-size: 0.9em; margin-top: 0.5rem;">
For best results, use high-quality PDF scans with clear musical notation
</p>
</div>
""")
# Enhanced processing function with better feedback
def process_with_feedback(pdf_file, progress=gr.Progress()):
if pdf_file is None:
return (
gr.HTML("<div class='status-error'>❌ Please upload a PDF file first!</div>", visible=True),
None,
gr.Textbox(visible=False),
gr.File(visible=False)
)
try:
# Show processing status
progress(0.1, desc="πŸ“„ Analyzing PDF file...")
status_html = """
<div class='status-success'>
<h4>πŸ”„ Processing your music score...</h4>
<p>File: <strong>{}</strong></p>
<p>Size: <strong>{:.2f} MB</strong></p>
<p>Please wait while Audiveris analyzes your score...</p>
</div>
""".format(
pdf_file.name.split('/')[-1],
os.path.getsize(pdf_file.name) / (1024*1024)
)
progress(0.3, desc="🎡 Running Audiveris recognition...")
# Process the file
result_file = recognize_music_gradio(pdf_file)
progress(0.9, desc="βœ… Finalizing results...")
if result_file and os.path.exists(result_file):
# Success
success_html = """
<div class='status-success'>
<h4>βœ… Conversion completed successfully!</h4>
<p>πŸ“ Output: <strong>{}</strong></p>
<p>πŸ“Š Size: <strong>{:.2f} KB</strong></p>
<p>πŸŽ‰ Your MusicXML file is ready for download!</p>
</div>
""".format(
os.path.basename(result_file),
os.path.getsize(result_file) / 1024
)
# Processing details
details = f"""βœ… CONVERSION SUCCESSFUL
πŸ“„ Input File: {pdf_file.name.split('/')[-1]}
πŸ“Š Input Size: {os.path.getsize(pdf_file.name) / (1024*1024):.2f} MB
🎡 Output File: {os.path.basename(result_file)}
πŸ“Š Output Size: {os.path.getsize(result_file) / 1024:.2f} KB
⏱️ Processing completed at: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}
🎼 Your PDF music score has been successfully converted to MusicXML format!
You can now download the file and use it in music notation software like MuseScore, Finale, or Sibelius."""
progress(1.0, desc="πŸŽ‰ Complete!")
return (
gr.HTML(success_html, visible=True),
gr.File(result_file, visible=True),
gr.Textbox(details, visible=True),
gr.File(visible=True)
)
else:
# Failure
error_html = """
<div class='status-error'>
<h4>❌ Conversion failed</h4>
<p>The music recognition process encountered an error.</p>
<p>Please check that your PDF contains clear musical notation and try again.</p>
</div>
"""
error_details = f"""❌ CONVERSION FAILED
πŸ“„ Input File: {pdf_file.name.split('/')[-1]}
πŸ“Š Input Size: {os.path.getsize(pdf_file.name) / (1024*1024):.2f} MB
⚠️ Error: No output file was generated by Audiveris
⏱️ Failed at: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}
πŸ’‘ Troubleshooting tips:
β€’ Ensure your PDF contains clear, high-quality musical notation
β€’ Check that the PDF is not password-protected
β€’ Try with a different PDF file
β€’ Make sure the musical notation is not handwritten"""
return (
gr.HTML(error_html, visible=True),
None,
gr.Textbox(error_details, visible=True),
gr.File(visible=False)
)
except Exception as e:
# Exception handling
error_html = f"""
<div class='status-error'>
<h4>❌ Processing Error</h4>
<p>An unexpected error occurred: <code>{str(e)}</code></p>
<p>Please try again or contact support if the problem persists.</p>
</div>
"""
error_details = f"""❌ PROCESSING ERROR
πŸ“„ Input File: {pdf_file.name.split('/')[-1] if pdf_file else 'Unknown'}
⚠️ Error: {str(e)}
⏱️ Failed at: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}
πŸ”§ Technical Details:
{str(e)}
Please try again with a different file or contact support."""
return (
gr.HTML(error_html, visible=True),
None,
gr.Textbox(error_details, visible=True),
gr.File(visible=False)
)
# Connect the button to the processing function
convert_btn.click(
fn=process_with_feedback,
inputs=[pdf_input],
outputs=[status_display, output_file, processing_info, output_file],
show_progress=True
)
# Auto-hide status when new file is uploaded
pdf_input.change(
fn=lambda: (gr.HTML(visible=False), gr.Textbox(visible=False), gr.File(visible=False)),
outputs=[status_display, processing_info, output_file]
)
return interface
# Create the enhanced interface
gradio_interface = create_gradio_interface()
# Removed run_gradio function - now running directly in main thread
if __name__ == "__main__":
print("===== Application Startup at {} =====".format(datetime.now().strftime("%Y-%m-%d %H:%M:%S")))
print()
print("🎡 MCP-Compliant Music Recognition Service Starting...")
print("πŸ“± Gradio UI: http://localhost:7860")
# Run Gradio directly in the main thread to keep the application alive
try:
gradio_interface.launch(
server_name="0.0.0.0",
server_port=7860,
mcp_server=True,
share=True, # Set to False for container deployment
prevent_thread_lock=False # Allow blocking to keep main thread alive
)
except Exception as e:
print(f"❌ Failed to start Gradio interface: {e}")
import traceback
traceback.print_exc()