|
import base64
import html
import os
import subprocess
import tempfile
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List

import gradio as gr
|
|
|
class MusicRecognitionMCPServer:
    """Convert PDF music scores to MusicXML with Audiveris, MCP style.

    The server keeps an in-memory registry of converted scores in
    ``processed_files`` and checks file paths against a configurable list of
    allowed directories before reading inputs or writing outputs.
    """

    # Audiveris launcher used for recognition and for the start-up probe.
    _AUDIVERIS_BIN = "/opt/audiveris/bin/Audiveris"

    # NOTE(review): a bare "π" in the messages of this class is residue of an
    # emoji whose bytes were lost to a wrong decode. The original glyph cannot
    # be recovered from this file, so those characters are left untouched.

    def __init__(self, allowed_directories: List[str] = None):
        """Prepare the allowed directories and probe the Audiveris install.

        Args:
            allowed_directories: Directories the server may use. Relative
                entries are resolved against the current working directory.
                Defaults to ``/tmp``, ``uploads`` and ``output``.
        """
        self.allowed_directories = allowed_directories or ["/tmp", "uploads", "output"]
        # Maps file_id -> metadata dict for every successful conversion.
        self.processed_files = {}

        abs_directories = []
        for directory in self.allowed_directories:
            if not os.path.isabs(directory):
                directory = os.path.abspath(directory)
            abs_directories.append(directory)
            print(f"Checking directory: {directory}")

            if os.path.exists(directory):
                print(f"✅ Directory exists: {directory}")
                continue

            # Creation is best effort: a directory that cannot be created is
            # reported but does not stop the server from starting.
            try:
                os.makedirs(directory, exist_ok=True)
                os.chmod(directory, 0o755)
                print(f"✅ Directory created: {directory}")
            except PermissionError:
                print(f"⚠️ Directory doesn't exist but can't create: {directory}")
                print(" This is normal for system directories like /tmp")
            except Exception as e:
                print(f"⚠️ Warning creating {directory}: {e}")

        self.allowed_directories = abs_directories
        print(f"Final allowed directories: {self.allowed_directories}")

        self._test_audiveris()

    def list_resources(self) -> List[Dict[str, Any]]:
        """List converted MusicXML files and the PDFs found in allowed directories.

        Returns:
            A list of resource descriptors with ``uri``, ``name``,
            ``description`` and ``mimeType`` keys. Converted scores use the
            ``musicxml://`` scheme, discovered PDFs use ``file://``.
        """
        resources = []

        for file_id, file_info in self.processed_files.items():
            resources.append({
                "uri": f"musicxml://{file_id}",
                "name": file_info["original_name"],
                "description": f"MusicXML file converted from {file_info['original_name']} on {file_info['processed_at']}",
                "mimeType": "application/vnd.recordare.musicxml+xml",
            })

        for directory in self.allowed_directories:
            if not os.path.exists(directory):
                continue
            for file_path in Path(directory).rglob("*.pdf"):
                if self._is_file_accessible(str(file_path)):
                    resources.append({
                        "uri": f"file://{file_path}",
                        "name": file_path.name,
                        "description": f"PDF music score available for processing: {file_path.name}",
                        "mimeType": "application/pdf",
                    })

        return resources

    def read_resource(self, uri: str) -> Dict[str, Any]:
        """Return the base64-encoded content of a ``musicxml://`` or ``file://`` resource.

        Args:
            uri: Resource URI as produced by ``list_resources``.

        Returns:
            A dict with a single-entry ``contents`` list; the entry's
            ``resource`` holds ``uri``, ``text`` (base64) and ``mimeType``.

        Raises:
            Exception: If the resource is unknown, the file is missing, access
                is denied, or the URI scheme is not supported.
        """
        musicxml_prefix = "musicxml://"
        file_prefix = "file://"

        if uri.startswith(musicxml_prefix):
            # Strip only the leading scheme; str.replace would also remove
            # any later occurrence inside the identifier.
            file_id = uri[len(musicxml_prefix):]
            if file_id not in self.processed_files:
                raise Exception(f"Resource not found: {uri}")
            output_path = self.processed_files[file_id]["output_path"]
            try:
                return self._resource_payload(
                    uri, output_path, "application/vnd.recordare.musicxml+xml"
                )
            except FileNotFoundError as err:
                raise Exception(f"MusicXML file not found: {output_path}") from err

        if uri.startswith(file_prefix):
            file_path = uri[len(file_prefix):]
            if not self._is_file_accessible(file_path):
                raise Exception(f"File access denied: {file_path}")
            try:
                return self._resource_payload(uri, file_path, "application/pdf")
            except FileNotFoundError as err:
                raise Exception(f"File not found: {file_path}") from err

        raise Exception(f"Unsupported URI scheme: {uri}")

    def _resource_payload(self, uri: str, path: str, mime_type: str) -> Dict[str, Any]:
        """Read ``path`` and wrap its base64 text in the resource envelope."""
        with open(path, "rb") as f:
            content = base64.b64encode(f.read()).decode()
        return {
            "contents": [{
                "type": "resource",
                "resource": {
                    "uri": uri,
                    "text": content,
                    "mimeType": mime_type,
                },
            }]
        }

    def _is_file_accessible(self, file_path: str) -> bool:
        """Return True when ``file_path`` lies inside an allowed directory.

        Both sides are resolved with ``os.path.realpath`` and compared by
        path component. A plain string-prefix test would accept a sibling
        such as ``/tmp_evil`` for ``/tmp`` and would not see through symlinks.
        """
        target = os.path.realpath(file_path)
        for directory in self.allowed_directories:
            root = os.path.realpath(directory)
            try:
                if os.path.commonpath([target, root]) == root:
                    return True
            except ValueError:
                # Raised when the paths cannot share a prefix (e.g. different
                # drives on Windows); that directory simply does not match.
                continue
        return False

    def recognize_music_tool(self, pdf_uri: str, output_dir: str = None) -> Dict[str, Any]:
        """Run music recognition on a PDF and register the MusicXML result.

        Args:
            pdf_uri: A ``file://`` URI, a ``data:`` URI carrying a base64
                PDF, or a plain filesystem path.
            output_dir: Directory for the generated file; the core routine
                falls back to ``/tmp`` when omitted.

        Returns:
            A tool result dict with a ``content`` list holding one text entry
            and an ``isError`` flag. Recognition failures are reported through
            ``isError`` rather than raised.

        Raises:
            Exception: If the input path is outside the allowed directories,
                the PDF does not exist, or a data URI cannot be processed.
        """
        file_prefix = "file://"
        if pdf_uri.startswith(file_prefix):
            pdf_path = pdf_uri[len(file_prefix):]
        elif pdf_uri.startswith("data:"):
            return self._process_data_uri(pdf_uri, output_dir)
        else:
            pdf_path = pdf_uri

        if not self._is_file_accessible(pdf_path):
            raise Exception(f"File access denied: {pdf_path}")

        if not os.path.exists(pdf_path):
            raise Exception(f"PDF file not found: {pdf_path}")

        try:
            output_file = self._recognize_music_core(pdf_path, output_dir)

            file_id = f"music_{len(self.processed_files) + 1}_{int(datetime.now().timestamp())}"
            self.processed_files[file_id] = {
                "original_name": os.path.basename(pdf_path),
                "original_path": pdf_path,
                "output_path": output_file,
                "processed_at": datetime.now().isoformat(),
                "file_id": file_id,
            }

            # The final backtick-quoted URI is what recognize_music_gradio
            # parses to recover the file id, so keep that format stable.
            summary = (
                f"✅ Successfully converted '{os.path.basename(pdf_path)}' to MusicXML.\n\n"
                f"π Output file: {output_file}\n"
                f"π Resource URI: musicxml://{file_id}\n"
                f"π File size: {os.path.getsize(output_file)} bytes\n\n"
                f"You can now access this MusicXML file as a resource using the URI: `musicxml://{file_id}`"
            )
            return {
                "content": [{"type": "text", "text": summary}],
                "isError": False,
            }

        except Exception as e:
            return {
                "content": [{
                    "type": "text",
                    "text": f"❌ Music recognition failed: {str(e)}",
                }],
                "isError": True,
            }

    def _process_data_uri(self, data_uri: str, output_dir: str = None) -> Dict[str, Any]:
        """Decode a base64 ``data:application/pdf`` URI and run recognition on it.

        The PDF is written to a uniquely named temporary file inside
        ``output_dir`` (``/tmp`` when omitted) and removed again once
        recognition has finished.

        Raises:
            Exception: Wrapping any failure, including an unsupported MIME
                type, invalid base64, or a target directory outside the
                allowed directories.
        """
        temp_pdf = None
        try:
            header, data = data_uri.split(",", 1)
            mime_type = header.split(";")[0]
            if mime_type.startswith("data:"):
                mime_type = mime_type[len("data:"):]

            if mime_type != "application/pdf":
                raise Exception(f"Unsupported MIME type: {mime_type}")

            pdf_data = base64.b64decode(self._fix_base64_padding(data))

            # Validate the target directory BEFORE writing anything, so a
            # caller-supplied output_dir cannot be used to drop files in an
            # arbitrary location.
            temp_dir = output_dir or "/tmp"
            if not self._is_file_accessible(temp_dir):
                raise Exception(f"Output directory access denied: {temp_dir}")
            os.makedirs(temp_dir, exist_ok=True)

            # mkstemp gives a collision-free name; a timestamp with
            # one-second resolution does not.
            fd, temp_pdf = tempfile.mkstemp(prefix="temp_", suffix=".pdf", dir=temp_dir)
            with os.fdopen(fd, "wb") as f:
                f.write(pdf_data)

            return self.recognize_music_tool(f"file://{temp_pdf}", output_dir)

        except Exception as e:
            raise Exception(f"Failed to process data URI: {str(e)}") from e
        finally:
            if temp_pdf is not None:
                try:
                    os.remove(temp_pdf)
                except OSError:
                    # Cleanup is best effort; the conversion result (or the
                    # error already being raised) matters more.
                    pass

    def _fix_base64_padding(self, data: str) -> str:
        """Strip whitespace from ``data`` and pad it with ``=`` to a multiple of 4."""
        data = "".join(data.split())
        # -len % 4 is the number of '=' still needed (0 when already aligned).
        return data + "=" * (-len(data) % 4)

    def _recognize_music_core(self, pdf_file_path: str, output_dir: str = None) -> str:
        """Run Audiveris in batch mode and return the path of the exported file.

        Args:
            pdf_file_path: Path of the PDF to convert.
            output_dir: Export directory; defaults to ``/tmp``.

        Returns:
            Path of the first existing ``<pdf name>.mxl``, ``.xml`` or
            ``.musicxml`` file in ``output_dir``.

        Raises:
            Exception: If the output directory is not allowed, the PDF is
                missing, or none of the expected output files exists after
                the run.
        """
        audiveris = self._AUDIVERIS_BIN

        if output_dir is None:
            output_dir = "/tmp"

        # Check access first so nothing is created outside the allowed roots.
        if not self._is_file_accessible(output_dir):
            raise Exception(f"Output directory access denied: {output_dir}")

        if not os.path.isdir(output_dir):
            os.makedirs(output_dir, exist_ok=True)
            # Only adjust permissions on a directory created here; running
            # chmod on an existing shared directory such as /tmp would
            # overwrite its mode.
            try:
                os.chmod(output_dir, 0o755)
            except OSError as e:
                print(f"Warning: Could not set permissions for {output_dir}: {e}")

        if not os.path.exists(pdf_file_path):
            raise Exception(f"Input PDF file not found: {pdf_file_path}")

        pdf_name_without_ext = os.path.splitext(os.path.basename(pdf_file_path))[0]

        possible_extensions = [".mxl", ".xml", ".musicxml"]
        output_files = [
            os.path.join(output_dir, f"{pdf_name_without_ext}{ext}")
            for ext in possible_extensions
        ]

        cmd = [audiveris, "-batch", "-export", "-output", output_dir, pdf_file_path]

        print(f"Running Audiveris command: {' '.join(cmd)}")
        result = subprocess.run(cmd, capture_output=True, text=True)

        print(f"Audiveris stdout: {result.stdout}")
        print(f"Audiveris stderr: {result.stderr}")
        print(f"Audiveris return code: {result.returncode}")

        if os.path.exists(output_dir):
            print(f"Files in output directory: {os.listdir(output_dir)}")

        for candidate in output_files:
            if os.path.exists(candidate):
                print(f"Found output file: {candidate}")
                return candidate

        if os.path.exists(output_dir):
            dir_listing = os.listdir(output_dir)
        else:
            dir_listing = 'Directory does not exist'

        error_msg = (
            "Audiveris processing failed.\n"
            f"Return code: {result.returncode}\n"
            f"Stdout: {result.stdout}\n"
            f"Stderr: {result.stderr}\n"
            f"Expected files: {output_files}\n"
            f"Files in output dir: {dir_listing}\n"
        )
        raise Exception(error_msg)

    def _test_audiveris(self):
        """Report whether the Audiveris launcher exists and responds to ``-help``.

        Returns:
            True when the launcher's output mentions "Audiveris", otherwise
            False. Problems are printed as warnings, never raised.
        """
        audiveris = self._AUDIVERIS_BIN

        if not os.path.exists(audiveris):
            print(f"⚠️ Warning: Audiveris not found at {audiveris}")
            return False

        try:
            result = subprocess.run(
                [audiveris, "-help"], capture_output=True, text=True, timeout=10
            )
        except Exception as e:
            print(f"⚠️ Warning: Could not test Audiveris: {e}")
            return False

        if "Audiveris" in result.stdout or "Audiveris" in result.stderr:
            print("✅ Audiveris installation verified")
            return True

        print("⚠️ Warning: Audiveris may not be working properly")
        print(f"Output: {result.stdout}")
        print(f"Error: {result.stderr}")
        return False
|
|
|
|
|
# Module-level server instance used by recognize_music_gradio.
mcp_server = MusicRecognitionMCPServer(
    ["/tmp", "/app/uploads", "/app/output"],
)
|
|
|
def recognize_music_gradio(pdf_file):
    """Convert an uploaded PDF through ``mcp_server`` and return the output path.

    Args:
        pdf_file: Upload object exposing the file path as ``.name``, or
            ``None`` when nothing was uploaded.

    Returns:
        Path of the generated MusicXML file, or ``None`` when no file was
        supplied, recognition failed, or no output file could be located.
        Errors are printed, never raised.
    """
    if pdf_file is None:
        print("No PDF file supplied")
        return None

    try:
        print(f"Processing file: {pdf_file.name}")
        result = mcp_server.recognize_music_tool(f"file://{pdf_file.name}")

        if result.get("isError"):
            error_msg = result["content"][0]["text"]
            print(f"Error in music recognition: {error_msg}")
            return None

        response_text = result["content"][0]["text"]
        print(f"Response text: {response_text}")

        if "musicxml://" in response_text:
            # The URI appears twice in the response; only the LAST occurrence
            # is backtick-terminated. Splitting on the first one yields the
            # id plus trailing message text, which never matches a key in
            # processed_files.
            file_id = response_text.rsplit("musicxml://", 1)[-1].split("`", 1)[0].strip()
            print(f"Extracted file ID: {file_id}")

            if file_id in mcp_server.processed_files:
                output_path = mcp_server.processed_files[file_id]["output_path"]
                print(f"Output path from cache: {output_path}")

                if os.path.exists(output_path):
                    print(f"✅ File exists: {output_path}")
                    return output_path
                print(f"❌ File not found: {output_path}")

        # Fallback: look for the conventional output names in /tmp.
        pdf_basename = os.path.splitext(os.path.basename(pdf_file.name))[0]
        possible_files = [
            f"/tmp/{pdf_basename}.mxl",
            f"/tmp/{pdf_basename}.xml",
            f"/tmp/{pdf_basename}.musicxml",
        ]

        for file_path in possible_files:
            print(f"Checking: {file_path}")
            if os.path.exists(file_path):
                print(f"✅ Found file: {file_path}")
                return file_path

        print("❌ No output file found in any expected location")
        print(f"Files in /tmp: {os.listdir('/tmp') if os.path.exists('/tmp') else 'Directory not found'}")
        return None

    except Exception as e:
        # UI boundary: report the failure and let the caller show an error
        # state instead of propagating into Gradio.
        print(f"Exception in Gradio wrapper: {str(e)}")
        import traceback
        traceback.print_exc()
        return None
|
|
|
|
|
# Stylesheet passed to gr.Blocks(css=...). The classes are referenced from
# the HTML snippets and elem_classes in create_gradio_interface.
custom_css = """
.gradio-container {
    max-width: 1200px !important;
    margin: auto !important;
}

.main-header {
    text-align: center;
    background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
    color: white;
    padding: 2rem;
    border-radius: 15px;
    margin-bottom: 2rem;
    box-shadow: 0 8px 32px rgba(0,0,0,0.1);
}

.feature-card {
    background: white;
    border-radius: 12px;
    padding: 1.5rem;
    margin: 1rem 0;
    box-shadow: 0 4px 16px rgba(0,0,0,0.1);
    border-left: 4px solid #667eea;
}

.upload-area {
    border: 2px dashed #667eea;
    border-radius: 12px;
    padding: 2rem;
    text-align: center;
    background: linear-gradient(135deg, #f5f7fa 0%, #c3cfe2 100%);
    transition: all 0.3s ease;
}

.upload-area:hover {
    border-color: #764ba2;
    background: linear-gradient(135deg, #e3f2fd 0%, #bbdefb 100%);
}

.status-success {
    background: linear-gradient(135deg, #4caf50 0%, #45a049 100%);
    color: white;
    padding: 1rem;
    border-radius: 8px;
    margin: 1rem 0;
}

.status-error {
    background: linear-gradient(135deg, #f44336 0%, #d32f2f 100%);
    color: white;
    padding: 1rem;
    border-radius: 8px;
    margin: 1rem 0;
}

.info-box {
    background: linear-gradient(135deg, #e3f2fd 0%, #bbdefb 100%);
    border-radius: 8px;
    padding: 1rem;
    margin: 1rem 0;
    border-left: 4px solid #2196f3;
}
"""
|
|
|
def create_gradio_interface():
    """Build the Gradio Blocks UI for uploading a PDF and downloading MusicXML.

    Returns:
        The configured ``gr.Blocks`` interface (not yet launched).
    """
    # NOTE(review): a bare "π" in the strings below is residue of an emoji
    # whose bytes were lost to a wrong decode. The original glyph cannot be
    # recovered from this file, so those characters are left untouched.
    with gr.Blocks(css=custom_css, title="🎼 Audiveris Music Score Recognition", theme=gr.themes.Soft()) as interface:

        gr.HTML("""
        <div class="main-header">
            <h1>🎼 Audiveris Music Score Recognition</h1>
            <p style="font-size: 1.2em; margin-top: 1rem; opacity: 0.9;">
                Transform your PDF music scores into editable MusicXML files using advanced AI recognition
            </p>
        </div>
        """)

        with gr.Row():
            with gr.Column(scale=1):
                gr.HTML("""
                <div class="feature-card">
                    <h3>✨ Features</h3>
                    <ul style="line-height: 1.8;">
                        <li>🎵 High-accuracy music notation recognition</li>
                        <li>π PDF to MusicXML conversion</li>
                        <li>🎹 Supports complex musical scores</li>
                        <li>⚡ Fast processing with Audiveris engine</li>
                        <li>💾 Downloadable results</li>
                    </ul>
                </div>
                """)

                gr.HTML("""
                <div class="info-box">
                    <h4>π How to use:</h4>
                    <ol style="line-height: 1.6;">
                        <li>Upload your PDF music score</li>
                        <li>Click "🎵 Convert to MusicXML"</li>
                        <li>Wait for processing to complete</li>
                        <li>Download your MusicXML file</li>
                    </ol>
                </div>
                """)

            with gr.Column(scale=2):
                gr.HTML("<h3 style='text-align: center; color: #667eea;'>π Upload Your Music Score</h3>")

                pdf_input = gr.File(
                    file_types=[".pdf"],
                    label="Select PDF File",
                    file_count="single",
                    height=200,
                    elem_classes=["upload-area"],
                )

                convert_btn = gr.Button(
                    "🎵 Convert to MusicXML",
                    variant="primary",
                    size="lg",
                    scale=1,
                )

                status_display = gr.HTML(visible=False)

                gr.HTML("<h3 style='text-align: center; color: #667eea; margin-top: 2rem;'>📥 Download Results</h3>")

                output_file = gr.File(
                    label="MusicXML Output",
                    visible=False,
                    height=100,
                )

                processing_info = gr.Textbox(
                    label="Processing Details",
                    lines=8,
                    visible=False,
                    interactive=False,
                )

        gr.HTML("""
        <div style="text-align: center; margin-top: 3rem; padding: 2rem; background: #f8f9fa; border-radius: 12px;">
            <p style="color: #666; margin: 0;">
                Powered by <strong>Audiveris</strong> • Built with ❤️ using Gradio
            </p>
            <p style="color: #888; font-size: 0.9em; margin-top: 0.5rem;">
                For best results, use high-quality PDF scans with clear musical notation
            </p>
        </div>
        """)

        def build_outputs(status_markup, details=None, result_path=None):
            """Return updates for (status_display, output_file, processing_info)."""
            return (
                gr.HTML(status_markup, visible=True),
                gr.File(value=result_path, visible=result_path is not None),
                gr.Textbox(value=details, visible=details is not None),
            )

        def process_with_feedback(pdf_file, progress=gr.Progress()):
            """Run the conversion and describe the outcome in the three output widgets."""
            if pdf_file is None:
                return build_outputs(
                    "<div class='status-error'>❌ Please upload a PDF file first!</div>"
                )

            try:
                progress(0.1, desc="π Analyzing PDF file...")

                input_name = os.path.basename(pdf_file.name)
                input_mb = os.path.getsize(pdf_file.name) / (1024 * 1024)

                progress(0.3, desc="🎵 Running Audiveris recognition...")

                result_file = recognize_music_gradio(pdf_file)

                progress(0.9, desc="✅ Finalizing results...")

                timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

                if result_file and os.path.exists(result_file):
                    output_name = os.path.basename(result_file)
                    output_kb = os.path.getsize(result_file) / 1024

                    # File names come from the upload, so escape them before
                    # embedding them in HTML.
                    success_html = """
                    <div class='status-success'>
                        <h4>✅ Conversion completed successfully!</h4>
                        <p>π Output: <strong>{}</strong></p>
                        <p>π Size: <strong>{:.2f} KB</strong></p>
                        <p>π Your MusicXML file is ready for download!</p>
                    </div>
                    """.format(html.escape(output_name), output_kb)

                    details = "\n".join([
                        "✅ CONVERSION SUCCESSFUL",
                        "",
                        f"π Input File: {input_name}",
                        f"π Input Size: {input_mb:.2f} MB",
                        "",
                        f"🎵 Output File: {output_name}",
                        f"π Output Size: {output_kb:.2f} KB",
                        "",
                        f"⏱️ Processing completed at: {timestamp}",
                        "",
                        "🎼 Your PDF music score has been successfully converted to MusicXML format!",
                        "You can now download the file and use it in music notation software like MuseScore, Finale, or Sibelius.",
                    ])

                    progress(1.0, desc="π Complete!")

                    return build_outputs(success_html, details, result_file)

                error_html = """
                <div class='status-error'>
                    <h4>❌ Conversion failed</h4>
                    <p>The music recognition process encountered an error.</p>
                    <p>Please check that your PDF contains clear musical notation and try again.</p>
                </div>
                """

                error_details = "\n".join([
                    "❌ CONVERSION FAILED",
                    "",
                    f"π Input File: {input_name}",
                    f"π Input Size: {input_mb:.2f} MB",
                    "",
                    "⚠️ Error: No output file was generated by Audiveris",
                    f"⏱️ Failed at: {timestamp}",
                    "",
                    "💡 Troubleshooting tips:",
                    "• Ensure your PDF contains clear, high-quality musical notation",
                    "• Check that the PDF is not password-protected",
                    "• Try with a different PDF file",
                    "• Make sure the musical notation is not handwritten",
                ])

                return build_outputs(error_html, error_details)

            except Exception as e:
                # UI boundary: turn any unexpected failure into an error panel.
                # The exception text is escaped because it may contain
                # user-controlled file names.
                error_html = f"""
                <div class='status-error'>
                    <h4>❌ Processing Error</h4>
                    <p>An unexpected error occurred: <code>{html.escape(str(e))}</code></p>
                    <p>Please try again or contact support if the problem persists.</p>
                </div>
                """

                failed_name = os.path.basename(getattr(pdf_file, "name", "") or "") or "Unknown"
                error_details = "\n".join([
                    "❌ PROCESSING ERROR",
                    "",
                    f"π Input File: {failed_name}",
                    f"⚠️ Error: {str(e)}",
                    f"⏱️ Failed at: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
                    "",
                    "🔧 Technical Details:",
                    f"{str(e)}",
                    "",
                    "Please try again with a different file or contact support.",
                ])

                return build_outputs(error_html, error_details)

        # Each output component is listed exactly once; value and visibility
        # of the download widget travel together in a single update.
        convert_btn.click(
            fn=process_with_feedback,
            inputs=[pdf_input],
            outputs=[status_display, output_file, processing_info],
            show_progress=True,
        )

        # Selecting a different file hides the results of the previous run.
        pdf_input.change(
            fn=lambda: (gr.HTML(visible=False), gr.Textbox(visible=False), gr.File(visible=False)),
            outputs=[status_display, processing_info, output_file],
        )

    return interface
|
|
|
|
|
# Built at import time, so importing this module constructs the UI.
gradio_interface = create_gradio_interface()


if __name__ == "__main__":
    print(f"===== Application Startup at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} =====")
    print()

    print("🎵 MCP-Compliant Music Recognition Service Starting...")
    print("📱 Gradio UI: http://localhost:7860")

    try:
        gradio_interface.launch(
            server_name="0.0.0.0",
            server_port=7860,
            mcp_server=True,
            share=True,
            prevent_thread_lock=False,
        )
    except Exception as e:
        print(f"❌ Failed to start Gradio interface: {e}")
        import traceback
        traceback.print_exc()
        # Exit non-zero so a supervisor or container runtime sees the failed
        # start instead of a clean exit.
        raise SystemExit(1)
|
|