|
|
|
""" |
|
Modal Functions for FhirFlame - L4 GPU Only + MCP Integration |
|
Aligned with Modal documentation and integrated with FhirFlame MCP Server |
|
""" |
|
import modal |
|
import json |
|
import time |
|
import os |
|
import sys |
|
from typing import Dict, Any, Optional |
|
|
|
|
|
sys.path.append('/app/src') |
|
# Prefer the real monitoring hooks; fall back to inert stubs when the
# monitoring module is unavailable (e.g. when running outside the container).
try:
    from monitoring import monitor
except ImportError:

    class DummyMonitor:
        """No-op stand-in exposing every monitoring hook used by this module."""

        def log_modal_function_call(self, *args, **kwargs):
            return None

        def log_modal_scaling_event(self, *args, **kwargs):
            return None

        def log_error_event(self, *args, **kwargs):
            return None

        def log_medical_entity_extraction(self, *args, **kwargs):
            return None

        def log_medical_processing(self, *args, **kwargs):
            return None

    monitor = DummyMonitor()
|
|
|
def calculate_real_modal_cost(processing_time: float, gpu_type: str = "L4") -> float:
    """Estimate the Modal cost (USD) of a GPU function invocation.

    Args:
        processing_time: Wall-clock runtime in seconds.
        gpu_type: GPU tier name. The hourly rate is read from the
            ``MODAL_<GPU_TYPE>_HOURLY_RATE`` environment variable
            (previously the L4 rate was used regardless of this argument).

    Returns:
        Cost in USD, rounded to 6 decimal places.
    """
    # Rate env var is keyed by GPU type so callers can price other tiers;
    # the $0.73/h fallback matches the default L4 rate used before.
    hourly_rate = float(os.getenv(f"MODAL_{gpu_type.upper()}_HOURLY_RATE", "0.73"))
    # Platform fee is expressed as a whole percentage, e.g. "15" -> 0.15.
    platform_fee = float(os.getenv("MODAL_PLATFORM_FEE", "15")) / 100

    hours_used = processing_time / 3600
    total_cost = hourly_rate * hours_used * (1 + platform_fee)

    return round(total_cost, 6)
|
|
|
|
|
# Modal application handle; the functions below register against this app.
app = modal.App("fhirflame-medical-ai-fresh")

# Optional cache-busting: setting MODAL_NO_CACHE=true injects a shell command
# whose output changes each run, forcing Modal to rebuild the image layers.
cache_bust_commands = []
if os.getenv("MODAL_NO_CACHE", "false").lower() == "true":
    # `time` is already imported at module level; the previous redundant
    # function-scope re-import has been removed.
    cache_bust_commands.append(f"echo 'Cache bust: {int(time.time())}'")

# Container image: slim Debian base with pinned ML / FHIR dependencies.
image = (
    modal.Image.debian_slim(python_version="3.11")
    .run_commands([
        "pip install --upgrade pip",
        "echo 'Fresh build with fixed Langfuse tracking'",
    ] + cache_bust_commands)
    .pip_install([
        "transformers==4.35.0",
        "torch==2.1.0",
        "fhir-resources==7.1.0",
        "pydantic>=2.7.2",
        "httpx>=0.25.0",
        "regex>=2023.10.3",
    ])
    .run_commands([
        # Best-effort purge to shrink the image; older pip lacks `cache purge`.
        "pip cache purge || echo 'Cache purge not available, continuing...'"
    ])
)
|
|
|
|
|
@app.function(
    image=image,
    gpu="L4",
    timeout=300,
    scaledown_window=60,
    min_containers=0,
    max_containers=15,
    memory=8192,
    cpu=4.0,
    secrets=[modal.Secret.from_name("fhirflame-env")]
)
def process_medical_document(
    document_content: str,
    document_type: str = "clinical_note",
    processing_mode: str = "comprehensive",
    include_validation: bool = True
) -> Dict[str, Any]:
    """Run biomedical NER over a medical document on an L4 GPU.

    Args:
        document_content: Raw text of the document to analyze.
        document_type: Caller-supplied label, echoed back in the result.
        processing_mode: Caller-supplied label, echoed back in the result.
        include_validation: Accepted for API compatibility; not used here.

    Returns:
        On success, a dict with entities grouped by type, processing
        metadata (time / estimated cost / GPU) and aggregate insights.
        On failure, a dict with ``error: True`` and the same metadata shape.
    """
    start_time = time.time()

    try:
        monitor.log_modal_function_call(
            function_name="process_medical_document",
            gpu_type="L4",
            document_type=document_type,
            processing_mode=processing_mode
        )

        # Heavy imports live inside the function so they only load in the
        # Modal container, where the image provides them.
        from transformers import pipeline
        import torch

        # device 0 = first CUDA GPU, -1 = CPU fallback for the HF pipeline.
        cuda_ok = torch.cuda.is_available()
        device = 0 if cuda_ok else -1
        monitor.log_modal_scaling_event("GPU_DETECTED", {"cuda_available": cuda_ok})

        ner_pipeline = pipeline(
            "ner",
            model="d4data/biomedical-ner-all",
            aggregation_strategy="simple",
            device=device
        )

        raw_entities = ner_pipeline(document_content)

        # Bucket detections by entity group, converting numpy scalars to
        # plain Python types so the result stays JSON-serializable.
        grouped: Dict[str, Any] = {}
        for hit in raw_entities:
            grouped.setdefault(hit['entity_group'], []).append({
                'text': hit['word'],
                'confidence': float(hit['score']),
                'start': int(hit['start']),
                'end': int(hit['end'])
            })

        elapsed = time.time() - start_time
        cost = calculate_real_modal_cost(elapsed, "L4")

        monitor.log_medical_entity_extraction(
            entities_found=len(raw_entities),
            processing_time=elapsed,
            cost=cost
        )

        if raw_entities:
            avg_confidence = sum(e['score'] for e in raw_entities) / len(raw_entities)
        else:
            avg_confidence = 0

        result = {
            "document_type": document_type,
            "processing_mode": processing_mode,
            "entities": grouped,
            "processing_metadata": {
                "processing_time_seconds": elapsed,
                "estimated_cost_usd": cost,
                "gpu_type": "L4",
                "entities_extracted": len(raw_entities),
                "timestamp": time.time()
            },
            "medical_insights": {
                "entity_types_found": list(grouped.keys()),
                "total_entities": len(raw_entities),
                "confidence_avg": avg_confidence
            }
        }

        monitor.log_medical_processing(
            success=True,
            processing_time=elapsed,
            cost=cost,
            entities_count=len(raw_entities)
        )

        return result

    except Exception as e:
        elapsed = time.time() - start_time
        cost = calculate_real_modal_cost(elapsed, "L4")

        monitor.log_error_event(
            error_type=type(e).__name__,
            error_message=str(e),
            processing_time=elapsed,
            cost=cost
        )

        return {
            "error": True,
            "error_type": type(e).__name__,
            "error_message": str(e),
            "processing_metadata": {
                "processing_time_seconds": elapsed,
                "estimated_cost_usd": cost,
                "gpu_type": "L4",
                "timestamp": time.time()
            }
        }
|
|
|
|
|
@app.function(
    image=image,
    gpu="L4",
    timeout=300,
    scaledown_window=60,
    min_containers=0,
    max_containers=10,
    memory=8192,
    cpu=4.0,
    secrets=[modal.Secret.from_name("fhirflame-env")]
)
def mcp_medical_processing_endpoint(
    request_data: Dict[str, Any]
) -> Dict[str, Any]:
    """MCP-compatible wrapper around ``process_medical_document``.

    Args:
        request_data: Mapping with ``document_content`` (required) and
            optional ``document_type`` / ``processing_mode`` overrides.

    Returns:
        An MCP envelope: a ``success`` flag plus either the processing
        result under ``data`` or an error payload.
    """
    start_time = time.time()

    try:
        content = request_data.get("document_content", "")
        doc_type = request_data.get("document_type", "clinical_note")
        mode = request_data.get("processing_mode", "comprehensive")

        # Reject empty payloads up front rather than spinning up the model.
        if not content:
            return {
                "success": False,
                "error": "No document content provided",
                "mcp_response": {
                    "status": "error",
                    "message": "Document content is required"
                }
            }

        # .local() runs the processor inside this already-running container
        # instead of dispatching a second remote Modal call.
        outcome = process_medical_document.local(
            document_content=content,
            document_type=doc_type,
            processing_mode=mode
        )

        return {
            "success": not outcome.get("error", False),
            "data": outcome,
            "mcp_metadata": {
                "endpoint": "mcp-medical-processing",
                "version": "1.0",
                "timestamp": time.time()
            }
        }

    except Exception as e:
        elapsed = time.time() - start_time
        cost = calculate_real_modal_cost(elapsed, "L4")

        return {
            "success": False,
            "error": str(e),
            "mcp_response": {
                "status": "error",
                "message": f"Processing failed: {str(e)}",
                "cost": cost,
                "processing_time": elapsed
            }
        }
|
|
|
|
|
@app.function(
    image=image,
    timeout=30,
    scaledown_window=30,
    min_containers=1,
    max_containers=3,
    memory=1024,
    cpu=1.0
)
def health_check() -> Dict[str, Any]:
    """Lightweight CPU-only liveness probe for the Modal app.

    Returns:
        Static status payload listing the deployed functions and GPU tier.
    """
    status_report = {
        "status": "healthy",
        "timestamp": time.time(),
        "app": "fhirflame-medical-ai-fresh",
        "functions": ["process_medical_document", "mcp_medical_processing_endpoint"],
        "gpu_support": "L4",
    }
    return status_report
|
|
|
if __name__ == "__main__":
    # Informational banner when the module is executed directly
    # (actual deployment happens via `modal deploy`).
    banner = [
        "FhirFlame Modal Functions - L4 GPU Medical Processing",
        "Available functions:",
        "- process_medical_document: Main medical document processor",
        "- mcp_medical_processing_endpoint: MCP-compatible endpoint",
        "- health_check: System health monitoring",
    ]
    print("\n".join(banner))