File size: 16,518 Bytes
43f7000
 
 
11a8c1f
 
 
 
43f7000
11a8c1f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3723edd
11a8c1f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
43f7000
11a8c1f
 
 
43f7000
11a8c1f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3723edd
 
43f7000
11a8c1f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3723edd
 
 
11a8c1f
 
 
 
 
 
 
 
 
3723edd
11a8c1f
 
 
 
 
 
 
43f7000
11a8c1f
 
 
43f7000
 
11a8c1f
 
43f7000
 
3723edd
11a8c1f
 
3723edd
 
11a8c1f
 
 
3723edd
 
 
 
 
 
4dd19d4
15e4ee8
3723edd
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
import gradio as gr
import subprocess
import os
import base64
from typing import List, Dict, Any
from pathlib import Path
from datetime import datetime

class MusicRecognitionMCPServer:
    def __init__(self, allowed_directories: List[str] = None):
        """Initialize MCP Server with configurable file access"""
        self.allowed_directories = allowed_directories or ["/tmp", "uploads", "output"]
        self.processed_files = {}  # Cache of processed results
        
        # Convert relative paths to absolute and check access
        abs_directories = []
        for directory in self.allowed_directories:
            # Convert relative paths to absolute
            if not os.path.isabs(directory):
                directory = os.path.abspath(directory)
            
            abs_directories.append(directory)
            print(f"Checking directory: {directory}")
            
            # Only try to create if it doesn't exist and we have permission
            if not os.path.exists(directory):
                try:
                    os.makedirs(directory, exist_ok=True)
                    os.chmod(directory, 0o755)
                    print(f"βœ… Directory created: {directory}")
                except PermissionError:
                    print(f"⚠️ Directory doesn't exist but can't create: {directory}")
                    print(f"   This is normal for system directories like /tmp")
                except Exception as e:
                    print(f"⚠️ Warning creating {directory}: {e}")
            else:
                print(f"βœ… Directory exists: {directory}")
        
        self.allowed_directories = abs_directories
        print(f"Final allowed directories: {self.allowed_directories}")
        
        # Test Audiveris installation
        self._test_audiveris()
    
    def list_resources(self) -> List[Dict[str, Any]]:
        """List available resources following MCP patterns"""
        resources = []
        
        # Add processed files as resources
        for file_id, file_info in self.processed_files.items():
            resources.append({
                "uri": f"musicxml://{file_id}",
                "name": file_info["original_name"],
                "description": f"MusicXML file converted from {file_info['original_name']} on {file_info['processed_at']}",
                "mimeType": "application/vnd.recordare.musicxml+xml"
            })
        
        # Add available PDF files in allowed directories
        for directory in self.allowed_directories:
            if os.path.exists(directory):
                for file_path in Path(directory).rglob("*.pdf"):
                    if self._is_file_accessible(str(file_path)):
                        resources.append({
                            "uri": f"file://{file_path}",
                            "name": file_path.name,
                            "description": f"PDF music score available for processing: {file_path.name}",
                            "mimeType": "application/pdf"
                        })
        
        return resources
    
    def read_resource(self, uri: str) -> Dict[str, Any]:
        """Read resource content following MCP patterns"""
        if uri.startswith("musicxml://"):
            # Return processed MusicXML file
            file_id = uri.replace("musicxml://", "")
            if file_id in self.processed_files:
                file_info = self.processed_files[file_id]
                try:
                    with open(file_info["output_path"], "rb") as f:
                        content = base64.b64encode(f.read()).decode()
                    
                    return {
                        "contents": [{
                            "type": "resource",
                            "resource": {
                                "uri": uri,
                                "text": content,
                                "mimeType": "application/vnd.recordare.musicxml+xml"
                            }
                        }]
                    }
                except FileNotFoundError:
                    raise Exception(f"MusicXML file not found: {file_info['output_path']}")
            else:
                raise Exception(f"Resource not found: {uri}")
        
        elif uri.startswith("file://"):
            # Return PDF file content
            file_path = uri.replace("file://", "")
            if not self._is_file_accessible(file_path):
                raise Exception(f"File access denied: {file_path}")
            
            try:
                with open(file_path, "rb") as f:
                    content = base64.b64encode(f.read()).decode()
                
                return {
                    "contents": [{
                        "type": "resource", 
                        "resource": {
                            "uri": uri,
                            "text": content,
                            "mimeType": "application/pdf"
                        }
                    }]
                }
            except FileNotFoundError:
                raise Exception(f"File not found: {file_path}")
        
        else:
            raise Exception(f"Unsupported URI scheme: {uri}")
    
    def _is_file_accessible(self, file_path: str) -> bool:
        """Check if file is within allowed directories"""
        abs_path = os.path.abspath(file_path)
        return any(abs_path.startswith(os.path.abspath(d)) for d in self.allowed_directories)
    
    def recognize_music_tool(self, pdf_uri: str, output_dir: str = None) -> Dict[str, Any]:
        """Tool for music recognition following MCP patterns"""
        # Handle different URI formats
        if pdf_uri.startswith("file://"):
            pdf_path = pdf_uri.replace("file://", "")
        elif pdf_uri.startswith("data:"):
            # Handle data URIs (base64 encoded)
            return self._process_data_uri(pdf_uri, output_dir)
        else:
            # Assume it's a direct file path
            pdf_path = pdf_uri
        
        if not self._is_file_accessible(pdf_path):
            raise Exception(f"File access denied: {pdf_path}")
        
        if not os.path.exists(pdf_path):
            raise Exception(f"PDF file not found: {pdf_path}")
        
        try:
            output_file = self._recognize_music_core(pdf_path, output_dir)
            
            # Store in processed files cache
            file_id = f"music_{len(self.processed_files) + 1}_{int(datetime.now().timestamp())}"
            self.processed_files[file_id] = {
                "original_name": os.path.basename(pdf_path),
                "original_path": pdf_path,
                "output_path": output_file,
                "processed_at": datetime.now().isoformat(),
                "file_id": file_id
            }
            
            # Return MCP-compliant response
            return {
                "content": [{
                    "type": "text",
                    "text": f"βœ… Successfully converted '{os.path.basename(pdf_path)}' to MusicXML.\n\n"
                           f"πŸ“ Output file: {output_file}\n"
                           f"πŸ”— Resource URI: musicxml://{file_id}\n"
                           f"πŸ“Š File size: {os.path.getsize(output_file)} bytes\n\n"
                           f"You can now access this MusicXML file as a resource using the URI: `musicxml://{file_id}`"
                }],
                "isError": False
            }
            
        except Exception as e:
            return {
                "content": [{
                    "type": "text", 
                    "text": f"❌ Music recognition failed: {str(e)}"
                }],
                "isError": True
            }
    
    def _process_data_uri(self, data_uri: str, output_dir: str = None) -> Dict[str, Any]:
        """Process base64 encoded data URI"""
        try:
            # Parse data URI: data:application/pdf;base64,<data>
            header, data = data_uri.split(',', 1)
            mime_type = header.split(';')[0].replace('data:', '')
            
            if mime_type != 'application/pdf':
                raise Exception(f"Unsupported MIME type: {mime_type}")
            
            # Fix base64 padding if needed
            data = self._fix_base64_padding(data)
            
            # Decode base64 data
            pdf_data = base64.b64decode(data)
            
            # Save to temporary file
            temp_dir = output_dir or "/tmp"
            temp_pdf = os.path.join(temp_dir, f"temp_{int(datetime.now().timestamp())}.pdf")
            
            with open(temp_pdf, 'wb') as f:
                f.write(pdf_data)
            
            # Process the file
            return self.recognize_music_tool(f"file://{temp_pdf}", output_dir)
            
        except Exception as e:
            raise Exception(f"Failed to process data URI: {str(e)}")
    
    def _fix_base64_padding(self, data: str) -> str:
        """Fix base64 padding to make it valid"""
        # Remove any whitespace
        data = data.strip().replace('\n', '').replace('\r', '').replace(' ', '')
        
        # Add padding if needed
        missing_padding = len(data) % 4
        if missing_padding:
            data += '=' * (4 - missing_padding)
        
        return data
    
    def _recognize_music_core(self, pdf_file_path: str, output_dir: str = None) -> str:
        """Core music recognition function"""
        audiveris = "/opt/audiveris/bin/Audiveris"
        
        if output_dir is None:
            output_dir = "/tmp"
        
        # Ensure output directory exists with proper permissions
        os.makedirs(output_dir, exist_ok=True)
        try:
            os.chmod(output_dir, 0o755)
        except Exception as e:
            print(f"Warning: Could not set permissions for {output_dir}: {e}")
            
        if not self._is_file_accessible(output_dir):
            raise Exception(f"Output directory access denied: {output_dir}")
        
        # Verify input file exists
        if not os.path.exists(pdf_file_path):
            raise Exception(f"Input PDF file not found: {pdf_file_path}")
        
        pdf_file_name = os.path.basename(pdf_file_path)
        pdf_name_without_ext = os.path.splitext(pdf_file_name)[0]
        
        # Try both possible extensions
        possible_extensions = [".mxl", ".xml", ".musicxml"]
        output_files = [os.path.join(output_dir, f"{pdf_name_without_ext}{ext}") for ext in possible_extensions]

        cmd = [
            audiveris, "-batch", "-export", "-output", output_dir, pdf_file_path
        ]

        print(f"Running Audiveris command: {' '.join(cmd)}")
        result = subprocess.run(cmd, capture_output=True, text=True)
        
        print(f"Audiveris stdout: {result.stdout}")
        print(f"Audiveris stderr: {result.stderr}")
        print(f"Audiveris return code: {result.returncode}")
        
        # List files in output directory for debugging
        if os.path.exists(output_dir):
            files_in_output = os.listdir(output_dir)
            print(f"Files in output directory: {files_in_output}")
        
        # Check if any of the possible output files exist
        existing_output = None
        for output_file in output_files:
            if os.path.exists(output_file):
                existing_output = output_file
                break
        
        if existing_output:
            print(f"Found output file: {existing_output}")
            return existing_output
        
        # If no output file found, provide detailed error
        error_msg = f"Audiveris processing failed.\n"
        error_msg += f"Return code: {result.returncode}\n"
        error_msg += f"Stdout: {result.stdout}\n"
        error_msg += f"Stderr: {result.stderr}\n"
        error_msg += f"Expected files: {output_files}\n"
        error_msg += f"Files in output dir: {os.listdir(output_dir) if os.path.exists(output_dir) else 'Directory does not exist'}\n"
        
        raise Exception(error_msg)
    
    def _test_audiveris(self):
        """Test if Audiveris is properly installed"""
        audiveris = "/opt/audiveris/bin/Audiveris"
        
        if not os.path.exists(audiveris):
            print(f"⚠️ Warning: Audiveris not found at {audiveris}")
            return False
        
        try:
            # Test Audiveris with help command
            result = subprocess.run([audiveris, "-help"], capture_output=True, text=True, timeout=10)
            if "Audiveris" in result.stdout or "Audiveris" in result.stderr:
                print("βœ… Audiveris installation verified")
                return True
            else:
                print(f"⚠️ Warning: Audiveris may not be working properly")
                print(f"Output: {result.stdout}")
                print(f"Error: {result.stderr}")
                return False
        except Exception as e:
            print(f"⚠️ Warning: Could not test Audiveris: {e}")
            return False
        
# Initialize MCP Server with proper absolute paths
mcp_server = MusicRecognitionMCPServer(["/tmp", "/app/uploads", "/app/output"])

def recognize_music_gradio(pdf_file):
    """Gradio wrapper for music recognition"""
    try:
        print(f"Processing file: {pdf_file.name}")
        result = mcp_server.recognize_music_tool(f"file://{pdf_file.name}")
        
        if result.get("isError"):
            error_msg = result["content"][0]["text"]
            print(f"Error in music recognition: {error_msg}")
            return None
        
        # Extract file ID from the response
        response_text = result["content"][0]["text"]
        print(f"Response text: {response_text}")
        
        if "musicxml://" in response_text:
            file_id = response_text.split("musicxml://")[1].split("`")[0]
            print(f"Extracted file ID: {file_id}")
            
            if file_id in mcp_server.processed_files:
                file_info = mcp_server.processed_files[file_id]
                output_path = file_info["output_path"]
                print(f"Output path from cache: {output_path}")
                
                if os.path.exists(output_path):
                    print(f"βœ… File exists: {output_path}")
                    return output_path
                else:
                    print(f"❌ File not found: {output_path}")
        
        # If the above doesn't work, try to find the file directly
        pdf_basename = os.path.splitext(os.path.basename(pdf_file.name))[0]
        possible_files = [
            f"/tmp/{pdf_basename}.mxl",
            f"/tmp/{pdf_basename}.xml", 
            f"/tmp/{pdf_basename}.musicxml"
        ]
        
        for file_path in possible_files:
            print(f"Checking: {file_path}")
            if os.path.exists(file_path):
                print(f"βœ… Found file: {file_path}")
                return file_path
        
        print("❌ No output file found in any expected location")
        print(f"Files in /tmp: {os.listdir('/tmp') if os.path.exists('/tmp') else 'Directory not found'}")
        return None
        
    except Exception as e:
        print(f"Exception in Gradio wrapper: {str(e)}")
        import traceback
        traceback.print_exc()
        return None

# Create Gradio interface
gradio_interface = gr.Interface(
    fn=recognize_music_gradio,
    inputs=gr.File(file_types=[".pdf"], label="Upload PDF music score"),
    outputs=gr.File(label="Download MusicXML file"),
    title="Music Score Recognition",
    description="Upload a PDF music score and create a MusicXML file from it.",
)

# Removed run_gradio function - now running directly in main thread

if __name__ == "__main__":
    print("===== Application Startup at {} =====".format(datetime.now().strftime("%Y-%m-%d %H:%M:%S")))
    print()
    
    print("🎡 MCP-Compliant Music Recognition Service Starting...")
    print("πŸ“± Gradio UI: http://localhost:7860")
    
    # Run Gradio directly in the main thread to keep the application alive
    try:
        gradio_interface.launch(
            server_name="0.0.0.0", 
            server_port=7860, 
            mcp_server=True,
            share=True,  # Set to False for container deployment
            prevent_thread_lock=False  # Allow blocking to keep main thread alive
        )
    except Exception as e:
        print(f"❌ Failed to start Gradio interface: {e}")
        import traceback
        traceback.print_exc()