File size: 9,391 Bytes
a963d65
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
#!/usr/bin/env python3
"""
Modal Functions for FhirFlame - L4 GPU Only + MCP Integration
Aligned with Modal documentation and integrated with FhirFlame MCP Server
"""
import modal
import json
import time
import os
import sys
from typing import Dict, Any, Optional

# Add src to path for monitoring
sys.path.append('/app/src')
try:
    from monitoring import monitor
except ImportError:
    # Fallback for Modal environment
    class DummyMonitor:
        def log_modal_function_call(self, *args, **kwargs): pass
        def log_modal_scaling_event(self, *args, **kwargs): pass
        def log_error_event(self, *args, **kwargs): pass
        def log_medical_entity_extraction(self, *args, **kwargs): pass
        def log_medical_processing(self, *args, **kwargs): pass
    monitor = DummyMonitor()

def calculate_real_modal_cost(processing_time: float, gpu_type: str = "L4") -> float:
    """Calculate real Modal cost for L4 GPU processing"""
    # L4 GPU pricing from environment
    l4_hourly_rate = float(os.getenv("MODAL_L4_HOURLY_RATE", "0.73"))
    platform_fee = float(os.getenv("MODAL_PLATFORM_FEE", "15")) / 100
    
    hours_used = processing_time / 3600
    total_cost = l4_hourly_rate * hours_used * (1 + platform_fee)
    
    return round(total_cost, 6)

# Create Modal App following official documentation
app = modal.App("fhirflame-medical-ai-fresh")

# Define optimized image for medical AI processing with optional cache busting
cache_bust_commands = []
if os.getenv("MODAL_NO_CACHE", "false").lower() == "true":
    # Add cache busting command with timestamp
    import time
    cache_bust_commands.append(f"echo 'Cache bust: {int(time.time())}'")

image = (
    modal.Image.debian_slim(python_version="3.11")
    .run_commands([
        "pip install --upgrade pip",
        "echo 'Fresh build with fixed Langfuse tracking'",
    ] + cache_bust_commands)
    .pip_install([
        "transformers==4.35.0",
        "torch==2.1.0",
        "fhir-resources==7.1.0",  # Compatible with pydantic 2.x
        "pydantic>=2.7.2",
        "httpx>=0.25.0",
        "regex>=2023.10.3"
    ])
    .run_commands([
        "pip cache purge || echo 'Cache purge not available, continuing...'"
    ])
)

# L4 GPU Function - Main processor for MCP Server integration
@app.function(
    image=image,
    gpu="L4",  # RTX 4090 equivalent - only GPU we use
    timeout=300,
    scaledown_window=60,  # Updated parameter name for Modal 1.0
    min_containers=0,
    max_containers=15,
    memory=8192,
    cpu=4.0,
    secrets=[modal.Secret.from_name("fhirflame-env")]
)
def process_medical_document(
    document_content: str,
    document_type: str = "clinical_note",
    processing_mode: str = "comprehensive",
    include_validation: bool = True
) -> Dict[str, Any]:
    """
    Process medical documents using L4 GPU
    Returns structured medical data with cost tracking
    """
    start_time = time.time()
    
    try:
        monitor.log_modal_function_call(
            function_name="process_medical_document",
            gpu_type="L4",
            document_type=document_type,
            processing_mode=processing_mode
        )
        
        # Initialize transformers pipeline
        from transformers import pipeline
        import torch
        
        # Check GPU availability
        device = 0 if torch.cuda.is_available() else -1
        monitor.log_modal_scaling_event("GPU_DETECTED", {"cuda_available": torch.cuda.is_available()})
        
        # Medical NER pipeline
        ner_pipeline = pipeline(
            "ner",
            model="d4data/biomedical-ner-all",
            aggregation_strategy="simple",
            device=device
        )
        
        # Extract medical entities
        entities = ner_pipeline(document_content)
        
        # Process entities into structured format
        processed_entities = {}
        for entity in entities:
            entity_type = entity['entity_group']
            if entity_type not in processed_entities:
                processed_entities[entity_type] = []
            
            processed_entities[entity_type].append({
                'text': entity['word'],
                'confidence': float(entity['score']),
                'start': int(entity['start']),
                'end': int(entity['end'])
            })
        
        # Calculate processing metrics
        processing_time = time.time() - start_time
        cost = calculate_real_modal_cost(processing_time, "L4")
        
        monitor.log_medical_entity_extraction(
            entities_found=len(entities),
            processing_time=processing_time,
            cost=cost
        )
        
        # Basic medical document structure (without FHIR for now)
        result = {
            "document_type": document_type,
            "processing_mode": processing_mode,
            "entities": processed_entities,
            "processing_metadata": {
                "processing_time_seconds": processing_time,
                "estimated_cost_usd": cost,
                "gpu_type": "L4",
                "entities_extracted": len(entities),
                "timestamp": time.time()
            },
            "medical_insights": {
                "entity_types_found": list(processed_entities.keys()),
                "total_entities": len(entities),
                "confidence_avg": sum(e['score'] for e in entities) / len(entities) if entities else 0
            }
        }
        
        monitor.log_medical_processing(
            success=True,
            processing_time=processing_time,
            cost=cost,
            entities_count=len(entities)
        )
        
        return result
        
    except Exception as e:
        processing_time = time.time() - start_time
        cost = calculate_real_modal_cost(processing_time, "L4")
        
        monitor.log_error_event(
            error_type=type(e).__name__,
            error_message=str(e),
            processing_time=processing_time,
            cost=cost
        )
        
        return {
            "error": True,
            "error_type": type(e).__name__,
            "error_message": str(e),
            "processing_metadata": {
                "processing_time_seconds": processing_time,
                "estimated_cost_usd": cost,
                "gpu_type": "L4",
                "timestamp": time.time()
            }
        }

# MCP Integration Endpoint
@app.function(
    image=image,
    gpu="L4",
    timeout=300,
    scaledown_window=60,
    min_containers=0,
    max_containers=10,
    memory=8192,
    cpu=4.0,
    secrets=[modal.Secret.from_name("fhirflame-env")]
)
def mcp_medical_processing_endpoint(
    request_data: Dict[str, Any]
) -> Dict[str, Any]:
    """
    MCP-compatible endpoint for medical document processing
    Used by FhirFlame MCP Server
    """
    start_time = time.time()
    
    try:
        # Extract request parameters
        document_content = request_data.get("document_content", "")
        document_type = request_data.get("document_type", "clinical_note")
        processing_mode = request_data.get("processing_mode", "comprehensive")
        
        if not document_content:
            return {
                "success": False,
                "error": "No document content provided",
                "mcp_response": {
                    "status": "error",
                    "message": "Document content is required"
                }
            }
        
        # Process document
        result = process_medical_document.local(
            document_content=document_content,
            document_type=document_type,
            processing_mode=processing_mode
        )
        
        # Format for MCP response
        mcp_response = {
            "success": not result.get("error", False),
            "data": result,
            "mcp_metadata": {
                "endpoint": "mcp-medical-processing",
                "version": "1.0",
                "timestamp": time.time()
            }
        }
        
        return mcp_response
        
    except Exception as e:
        processing_time = time.time() - start_time
        cost = calculate_real_modal_cost(processing_time, "L4")
        
        return {
            "success": False,
            "error": str(e),
            "mcp_response": {
                "status": "error",
                "message": f"Processing failed: {str(e)}",
                "cost": cost,
                "processing_time": processing_time
            }
        }

# Health check endpoint
@app.function(
    image=image,
    timeout=30,
    scaledown_window=30,
    min_containers=1,  # Keep one warm for health checks
    max_containers=3,
    memory=1024,
    cpu=1.0
)
def health_check() -> Dict[str, Any]:
    """Health check endpoint for Modal functions"""
    return {
        "status": "healthy",
        "timestamp": time.time(),
        "app": "fhirflame-medical-ai-fresh",
        "functions": ["process_medical_document", "mcp_medical_processing_endpoint"],
        "gpu_support": "L4"
    }

if __name__ == "__main__":
    print("FhirFlame Modal Functions - L4 GPU Medical Processing")
    print("Available functions:")
    print("- process_medical_document: Main medical document processor")
    print("- mcp_medical_processing_endpoint: MCP-compatible endpoint")
    print("- health_check: System health monitoring")