|
import base64
import os
import subprocess
import tempfile
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional

import gradio as gr
|
|
|
class MusicRecognitionMCPServer:
    """Convert PDF music scores to MusicXML with Audiveris, MCP-style.

    The server exposes resources (``list_resources`` / ``read_resource``) and
    one tool (``recognize_music_tool``). Every file path it touches must lie
    inside one of ``allowed_directories``.
    """

    # Audiveris launcher used for both the installation check and conversions.
    AUDIVERIS_BIN = "/opt/audiveris/bin/Audiveris"

    def __init__(self, allowed_directories: Optional[List[str]] = None):
        """Initialize MCP Server with configurable file access.

        Args:
            allowed_directories: Directories the server may read from and
                write to. Relative entries are resolved against the current
                working directory. Defaults to ``["/tmp", "uploads", "output"]``.

        Missing directories are created on a best-effort basis; failures are
        only reported on stdout. Finally the Audiveris installation is probed.
        """
        self.allowed_directories = allowed_directories or ["/tmp", "uploads", "output"]
        # file_id -> metadata dict for every successful conversion.
        self.processed_files = {}

        abs_directories = []
        for directory in self.allowed_directories:
            if not os.path.isabs(directory):
                directory = os.path.abspath(directory)

            abs_directories.append(directory)
            print(f"Checking directory: {directory}")

            if os.path.exists(directory):
                print(f"✅ Directory exists: {directory}")
                continue

            try:
                os.makedirs(directory, exist_ok=True)
                os.chmod(directory, 0o755)
                print(f"✅ Directory created: {directory}")
            except PermissionError:
                print(f"⚠️ Directory doesn't exist but can't create: {directory}")
                print(f" This is normal for system directories like /tmp")
            except Exception as e:
                print(f"⚠️ Warning creating {directory}: {e}")

        self.allowed_directories = abs_directories
        print(f"Final allowed directories: {self.allowed_directories}")

        self._test_audiveris()

    def list_resources(self) -> List[Dict[str, Any]]:
        """List available resources following MCP patterns.

        Returns:
            One ``musicxml://<file_id>`` entry per converted file, followed by
            one ``file://<path>`` entry per PDF found (recursively) in the
            allowed directories.
        """
        resources = []

        for file_id, file_info in self.processed_files.items():
            resources.append({
                "uri": f"musicxml://{file_id}",
                "name": file_info["original_name"],
                "description": f"MusicXML file converted from {file_info['original_name']} on {file_info['processed_at']}",
                "mimeType": "application/vnd.recordare.musicxml+xml"
            })

        for directory in self.allowed_directories:
            if not os.path.exists(directory):
                continue
            for file_path in Path(directory).rglob("*.pdf"):
                # Re-check each hit so symlinked entries that resolve outside
                # the allow-list are not advertised.
                if self._is_file_accessible(str(file_path)):
                    resources.append({
                        "uri": f"file://{file_path}",
                        "name": file_path.name,
                        "description": f"PDF music score available for processing: {file_path.name}",
                        "mimeType": "application/pdf"
                    })

        return resources

    def read_resource(self, uri: str) -> Dict[str, Any]:
        """Read resource content following MCP patterns.

        Args:
            uri: Either ``musicxml://<file_id>`` (a previously converted file)
                or ``file://<path>`` (a file inside the allowed directories).

        Returns:
            A dict with a single-item ``contents`` list; the file bytes are
            base64-encoded and stored under the ``text`` key.

        Raises:
            Exception: Unknown file id, access denied, missing file, or an
                unsupported URI scheme.
        """
        # NOTE(review): MCP resource contents usually carry base64 payloads
        # under "blob"; the "text" key is kept for backward compatibility —
        # confirm against the consumers of this method.
        musicxml_prefix = "musicxml://"
        file_prefix = "file://"

        if uri.startswith(musicxml_prefix):
            # Slice instead of str.replace so only the leading scheme is removed.
            file_id = uri[len(musicxml_prefix):]
            if file_id not in self.processed_files:
                raise Exception(f"Resource not found: {uri}")

            file_info = self.processed_files[file_id]
            try:
                with open(file_info["output_path"], "rb") as f:
                    content = base64.b64encode(f.read()).decode()
            except FileNotFoundError:
                raise Exception(f"MusicXML file not found: {file_info['output_path']}")

            return {
                "contents": [{
                    "type": "resource",
                    "resource": {
                        "uri": uri,
                        "text": content,
                        "mimeType": "application/vnd.recordare.musicxml+xml"
                    }
                }]
            }

        if uri.startswith(file_prefix):
            file_path = uri[len(file_prefix):]
            if not self._is_file_accessible(file_path):
                raise Exception(f"File access denied: {file_path}")

            try:
                with open(file_path, "rb") as f:
                    content = base64.b64encode(f.read()).decode()
            except FileNotFoundError:
                raise Exception(f"File not found: {file_path}")

            return {
                "contents": [{
                    "type": "resource",
                    "resource": {
                        "uri": uri,
                        "text": content,
                        "mimeType": "application/pdf"
                    }
                }]
            }

        raise Exception(f"Unsupported URI scheme: {uri}")

    def _is_file_accessible(self, file_path: str) -> bool:
        """Check if file is within allowed directories.

        Both the candidate and the allowed roots are resolved with
        ``os.path.realpath`` and compared component-wise, so ``..`` segments,
        symlinks and sibling directories that merely share a name prefix
        (``/tmp`` vs ``/tmpfoo``) are rejected.
        """
        target = os.path.realpath(file_path)
        for allowed in self.allowed_directories:
            root = os.path.realpath(allowed)
            try:
                if os.path.commonpath([target, root]) == root:
                    return True
            except ValueError:
                # Raised e.g. for paths on different drives: not contained.
                continue
        return False

    def recognize_music_tool(self, pdf_uri: str, output_dir: str = None) -> Dict[str, Any]:
        """Tool for music recognition following MCP patterns.

        Args:
            pdf_uri: ``file://<path>``, a ``data:application/pdf;base64,...``
                URI, or a bare filesystem path.
            output_dir: Directory for the MusicXML output; the core routine
                falls back to ``/tmp`` when omitted.

        Returns:
            An MCP tool result: ``content`` holds one text item and
            ``isError`` tells whether the conversion failed.

        Raises:
            Exception: The PDF is outside the allowed directories or does not
                exist (conversion failures are reported via ``isError``).
        """
        if pdf_uri.startswith("data:"):
            return self._process_data_uri(pdf_uri, output_dir)

        file_prefix = "file://"
        pdf_path = pdf_uri[len(file_prefix):] if pdf_uri.startswith(file_prefix) else pdf_uri

        if not self._is_file_accessible(pdf_path):
            raise Exception(f"File access denied: {pdf_path}")

        if not os.path.exists(pdf_path):
            raise Exception(f"PDF file not found: {pdf_path}")

        try:
            output_file = self._recognize_music_core(pdf_path, output_dir)

            # Counter + timestamp keeps ids unique within this process.
            file_id = f"music_{len(self.processed_files) + 1}_{int(datetime.now().timestamp())}"
            self.processed_files[file_id] = {
                "original_name": os.path.basename(pdf_path),
                "original_path": pdf_path,
                "output_path": output_file,
                "processed_at": datetime.now().isoformat(),
                "file_id": file_id
            }

            return {
                "content": [{
                    "type": "text",
                    "text": f"✅ Successfully converted '{os.path.basename(pdf_path)}' to MusicXML.\n\n"
                            f"📁 Output file: {output_file}\n"
                            f"🔗 Resource URI: musicxml://{file_id}\n"
                            f"📊 File size: {os.path.getsize(output_file)} bytes\n\n"
                            f"You can now access this MusicXML file as a resource using the URI: `musicxml://{file_id}`"
                }],
                "isError": False
            }

        except Exception as e:
            return {
                "content": [{
                    "type": "text",
                    "text": f"❌ Music recognition failed: {str(e)}"
                }],
                "isError": True
            }

    def _process_data_uri(self, data_uri: str, output_dir: str = None) -> Dict[str, Any]:
        """Process base64 encoded data URI.

        Decodes the PDF payload, stores it as a uniquely named ``temp_*.pdf``
        in ``output_dir`` (``/tmp`` when omitted) and runs the regular tool on
        it. The target directory is validated *before* anything is written.
        The temporary PDF is not removed afterwards.

        Raises:
            Exception: Any failure, wrapped as ``Failed to process data URI``.
        """
        try:
            header, data = data_uri.split(',', 1)
            mime_type = header.split(';')[0].replace('data:', '')

            if mime_type != 'application/pdf':
                raise Exception(f"Unsupported MIME type: {mime_type}")

            pdf_data = base64.b64decode(self._fix_base64_padding(data))

            temp_dir = output_dir or "/tmp"
            # Refuse to write outside the allow-list.
            if not self._is_file_accessible(temp_dir):
                raise Exception(f"Output directory access denied: {temp_dir}")

            # mkstemp picks a collision-free name, unlike a per-second timestamp.
            fd, temp_pdf = tempfile.mkstemp(prefix="temp_", suffix=".pdf", dir=temp_dir)
            with os.fdopen(fd, 'wb') as f:
                f.write(pdf_data)

            return self.recognize_music_tool(f"file://{temp_pdf}", output_dir)

        except Exception as e:
            raise Exception(f"Failed to process data URI: {str(e)}") from e

    def _fix_base64_padding(self, data: str) -> str:
        """Fix base64 padding to make it valid.

        Removes newlines, carriage returns and spaces, then appends ``=``
        until the length is a multiple of four.
        """
        data = data.strip().replace('\n', '').replace('\r', '').replace(' ', '')

        missing_padding = len(data) % 4
        if missing_padding:
            data += '=' * (4 - missing_padding)

        return data

    def _recognize_music_core(self, pdf_file_path: str, output_dir: str = None) -> str:
        """Core music recognition function.

        Runs ``Audiveris -batch -export -output <output_dir> <pdf>`` and looks
        for ``<pdf stem>.mxl``, ``.xml`` or ``.musicxml`` in ``output_dir``.

        Args:
            pdf_file_path: Path of the PDF to convert.
            output_dir: Destination directory; defaults to ``/tmp``.

        Returns:
            Path of the first expected output file that exists.

        Raises:
            Exception: Output directory not allowed, input PDF missing, or no
                expected output file present after the run.
        """
        if output_dir is None:
            output_dir = "/tmp"

        # Validate first so nothing outside the allow-list is created or
        # has its permissions changed.
        if not self._is_file_accessible(output_dir):
            raise Exception(f"Output directory access denied: {output_dir}")

        already_existed = os.path.isdir(output_dir)
        os.makedirs(output_dir, exist_ok=True)
        if not already_existed:
            # Only touch permissions of directories created here; re-chmodding
            # an existing one (notably /tmp) would drop its sticky bit.
            try:
                os.chmod(output_dir, 0o755)
            except OSError as e:
                print(f"Warning: Could not set permissions for {output_dir}: {e}")

        if not os.path.exists(pdf_file_path):
            raise Exception(f"Input PDF file not found: {pdf_file_path}")

        pdf_file_name = os.path.basename(pdf_file_path)
        pdf_name_without_ext = os.path.splitext(pdf_file_name)[0]

        possible_extensions = [".mxl", ".xml", ".musicxml"]
        output_files = [
            os.path.join(output_dir, f"{pdf_name_without_ext}{ext}")
            for ext in possible_extensions
        ]

        cmd = [
            self.AUDIVERIS_BIN, "-batch", "-export", "-output", output_dir, pdf_file_path
        ]

        print(f"Running Audiveris command: {' '.join(cmd)}")
        result = subprocess.run(cmd, capture_output=True, text=True)

        print(f"Audiveris stdout: {result.stdout}")
        print(f"Audiveris stderr: {result.stderr}")
        print(f"Audiveris return code: {result.returncode}")

        if os.path.exists(output_dir):
            files_in_output = os.listdir(output_dir)
            print(f"Files in output directory: {files_in_output}")

        # NOTE(review): success is judged by file presence only, so an output
        # left over from an earlier run of a same-named PDF is also accepted.
        existing_output = next(
            (candidate for candidate in output_files if os.path.exists(candidate)),
            None,
        )
        if existing_output:
            print(f"Found output file: {existing_output}")
            return existing_output

        error_msg = "Audiveris processing failed.\n"
        error_msg += f"Return code: {result.returncode}\n"
        error_msg += f"Stdout: {result.stdout}\n"
        error_msg += f"Stderr: {result.stderr}\n"
        error_msg += f"Expected files: {output_files}\n"
        error_msg += f"Files in output dir: {os.listdir(output_dir) if os.path.exists(output_dir) else 'Directory does not exist'}\n"

        raise Exception(error_msg)

    def _test_audiveris(self):
        """Test if Audiveris is properly installed.

        Returns:
            ``True`` when the launcher exists and its ``-help`` output
            mentions "Audiveris"; ``False`` otherwise. Problems are only
            reported on stdout, never raised.
        """
        audiveris = self.AUDIVERIS_BIN

        if not os.path.exists(audiveris):
            print(f"⚠️ Warning: Audiveris not found at {audiveris}")
            return False

        try:
            result = subprocess.run([audiveris, "-help"], capture_output=True, text=True, timeout=10)
        except Exception as e:
            print(f"⚠️ Warning: Could not test Audiveris: {e}")
            return False

        if "Audiveris" in result.stdout or "Audiveris" in result.stderr:
            print("✅ Audiveris installation verified")
            return True

        print(f"⚠️ Warning: Audiveris may not be working properly")
        print(f"Output: {result.stdout}")
        print(f"Error: {result.stderr}")
        return False
|
|
|
|
|
# Module-wide server instance used by the Gradio handler; file access is
# limited to the directories listed here.
mcp_server = MusicRecognitionMCPServer(
    allowed_directories=["/tmp", "/app/uploads", "/app/output"],
)
|
|
|
def recognize_music_gradio(pdf_file):
    """Gradio wrapper for music recognition.

    Args:
        pdf_file: The uploaded PDF, either as an object exposing its path via
            ``.name`` or as a plain path string. ``None`` (nothing uploaded)
            is tolerated.

    Returns:
        Path of the generated MusicXML file, or ``None`` when the conversion
        failed or no output file could be located. Errors are printed, never
        raised.
    """
    try:
        if pdf_file is None:
            print("No PDF file provided")
            return None

        # Accept both file-like uploads (.name) and bare path strings.
        pdf_path = getattr(pdf_file, "name", pdf_file)
        print(f"Processing file: {pdf_path}")
        result = mcp_server.recognize_music_tool(f"file://{pdf_path}")

        if result.get("isError"):
            error_msg = result["content"][0]["text"]
            print(f"Error in music recognition: {error_msg}")
            return None

        response_text = result["content"][0]["text"]
        print(f"Response text: {response_text}")

        uri_marker = "musicxml://"
        if uri_marker in response_text:
            # The id ends at the first whitespace after the marker (or at a
            # closing backtick). Splitting on the backtick alone would swallow
            # the text between the two URI mentions in the message.
            tokens = response_text.split(uri_marker, 1)[1].split()
            file_id = tokens[0].strip("`") if tokens else ""
            print(f"Extracted file ID: {file_id}")

            file_info = mcp_server.processed_files.get(file_id)
            if file_info is not None:
                output_path = file_info["output_path"]
                print(f"Output path from cache: {output_path}")

                if os.path.exists(output_path):
                    print(f"✅ File exists: {output_path}")
                    return output_path
                print(f"❌ File not found: {output_path}")

        # Fallback: guess the output name from the PDF name in /tmp.
        pdf_basename = os.path.splitext(os.path.basename(pdf_path))[0]
        possible_files = [
            f"/tmp/{pdf_basename}.mxl",
            f"/tmp/{pdf_basename}.xml",
            f"/tmp/{pdf_basename}.musicxml"
        ]

        for file_path in possible_files:
            print(f"Checking: {file_path}")
            if os.path.exists(file_path):
                print(f"✅ Found file: {file_path}")
                return file_path

        print("❌ No output file found in any expected location")
        print(f"Files in /tmp: {os.listdir('/tmp') if os.path.exists('/tmp') else 'Directory not found'}")
        return None

    except Exception as e:
        print(f"Exception in Gradio wrapper: {str(e)}")
        import traceback
        traceback.print_exc()
        return None
|
|
|
|
|
# Web UI: one PDF upload in, one MusicXML download out, both handled by
# recognize_music_gradio.
_pdf_upload = gr.File(file_types=[".pdf"], label="Upload PDF music score")
_musicxml_download = gr.File(label="Download MusicXML file")

gradio_interface = gr.Interface(
    fn=recognize_music_gradio,
    inputs=_pdf_upload,
    outputs=_musicxml_download,
    title="Music Score Recognition",
    description="Upload a PDF music score and create a MusicXML file from it.",
)
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Startup banner with the current local time.
    startup_stamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    print(f"===== Application Startup at {startup_stamp} =====")
    print()

    print("🎵 MCP-Compliant Music Recognition Service Starting...")
    print("📱 Gradio UI: http://localhost:7860")

    # Listen on all interfaces on port 7860, with Gradio's MCP server and a
    # share link enabled; launch() blocks the main thread.
    launch_options = dict(
        server_name="0.0.0.0",
        server_port=7860,
        mcp_server=True,
        share=True,
        prevent_thread_lock=False,
    )

    try:
        gradio_interface.launch(**launch_options)
    except Exception as e:
        # Report startup failures instead of letting them propagate.
        print(f"❌ Failed to start Gradio interface: {e}")
        import traceback

        traceback.print_exc()
|
|