|
|
|
""" |
|
FhirFlame: Medical AI Technology Demonstration |
|
MVP/Prototype Platform - Development & Testing Only |
|
|
|
⚠️ IMPORTANT: This is a technology demonstration and MVP prototype for development, |
|
testing, and educational purposes only. NOT approved for clinical use, patient data, |
|
or production healthcare environments. Requires proper regulatory evaluation, |
|
compliance review, and legal assessment before any real-world deployment. |
|
|
|
Technology Stack Demonstration: |
|
- Real-time medical text processing with CodeLlama 13B-Instruct |
|
- FHIR R4/R5 compliance workflow prototypes |
|
- Multi-provider AI routing architecture (Ollama, HuggingFace, Modal) |
|
- Healthcare document processing with OCR capabilities |
|
- DICOM medical imaging analysis demos |
|
- Enterprise-grade security patterns (demonstration) |
|
|
|
Architecture: Microservices with horizontal auto-scaling patterns |
|
Security: Healthcare-grade infrastructure patterns (demo implementation) |
|
Performance: Optimized for demonstration and development workflows |
|
""" |
|
|
|
import os |
|
import asyncio |
|
import json |
|
import time |
|
import uuid
import datetime
|
from typing import Dict, Any, Optional |
|
from pathlib import Path |
|
|
|
|
|
from src.workflow_orchestrator import WorkflowOrchestrator |
|
from src.enhanced_codellama_processor import EnhancedCodeLlamaProcessor |
|
from src.fhir_validator import FhirValidator |
|
from src.dicom_processor import dicom_processor |
|
from src.monitoring import monitor |
|
|
|
|
|
from database import db_manager |
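
# --- Lazily-initialized processing components and shared task state ---
# Created on first use by the get_*() helpers below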
|
|
|
|
|
|
|
|
|
codellama = None |
|
enhanced_codellama = None |
|
fhir_validator = None |
|
workflow_orchestrator = None |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
service_status = { |
|
"ollama_initialized": False, |
|
"enhanced_codellama_initialized": False, |
|
"ollama_connection_url": None, |
|
"last_ollama_check": None |
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
cancellation_flags = { |
|
"text_task": False, |
|
"file_task": False, |
|
"dicom_task": False |
|
} |
|
|
|
|
|
|
|
running_tasks = { |
|
"text_task": None, |
|
"file_task": None, |
|
"dicom_task": None |
|
} |
|
|
|
|
|
|
|
task_queues = { |
|
"text_task": [], |
|
"file_task": [], |
|
"dicom_task": [] |
|
} |
|
|
|
|
|
|
|
active_jobs = { |
|
"text_task": None, |
|
"file_task": None, |
|
"dicom_task": None |
|
} |
|
|
|
|
|
|
class UnifiedJobManager: |
|
"""Centralized job and metrics management for all FhirFlame processing with PostgreSQL persistence""" |
|
|
|
def __init__(self): |
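        # Legacy in-memory job store; PostgreSQL (via db_manager) is the source of truth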
|
|
|
self.jobs_database = { |
|
"processing_jobs": [], |
|
"batch_jobs": [], |
|
"container_metrics": [], |
|
"performance_metrics": [], |
|
"queue_statistics": { |
|
"active_tasks": 0, |
|
"completed_tasks": 0, |
|
"failed_tasks": 0 |
|
}, |
|
"system_monitoring": [] |
|
} |
|
|
|
|
|
self.dashboard_state = { |
|
"active_tasks": 0, |
|
"files_processed": [], |
|
"total_files": 0, |
|
"successful_files": 0, |
|
"failed_files": 0, |
|
"failed_tasks": 0, |
|
"processing_queue": {"active_tasks": 0, "completed_files": 0, "failed_files": 0}, |
|
"last_update": None |
|
} |
|
|
|
|
|
self._sync_dashboard_from_db() |
|
|
|
def _sync_dashboard_from_db(self): |
|
"""Sync dashboard state from PostgreSQL database""" |
|
try: |
|
metrics = db_manager.get_dashboard_metrics() |
|
self.dashboard_state.update({ |
|
"active_tasks": metrics.get('active_jobs', 0), |
|
"total_files": metrics.get('completed_jobs', 0), |
|
"successful_files": metrics.get('successful_jobs', 0), |
|
"failed_files": metrics.get('failed_jobs', 0), |
|
"failed_tasks": metrics.get('failed_jobs', 0) |
|
}) |
|
print(f"✅ Dashboard synced from PostgreSQL: {metrics}") |
|
except Exception as e: |
|
print(f"⚠️ Failed to sync dashboard from PostgreSQL: {e}") |
|
|
|
    def add_processing_job(self, job_type: str, name: str, details: Optional[dict] = None) -> str:
|
"""Record start of any type of processing job in PostgreSQL""" |
|
job_id = str(uuid.uuid4()) |
|
timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") |
|
|
|
job_record = { |
|
"id": job_id, |
|
"job_type": job_type, |
|
"name": name[:100], |
|
"status": "processing", |
|
"success": None, |
|
"processing_time": None, |
|
"error_message": None, |
|
"entities_found": 0, |
|
"result_data": details or {}, |
|
"text_input": details.get("text_input") if details else None, |
|
"file_path": details.get("file_path") if details else None, |
|
"workflow_type": details.get("workflow_type") if details else None |
|
} |
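
        # Persist to PostgreSQL first; mirror into the legacy in-memory store only on success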
|
|
|
|
|
db_success = db_manager.add_job(job_record) |
|
|
|
if db_success: |
|
|
|
legacy_job = { |
|
"job_id": job_id, |
|
"job_type": job_type, |
|
"name": name[:100], |
|
"status": "started", |
|
"success": None, |
|
"start_time": timestamp, |
|
"completion_time": None, |
|
"processing_time": None, |
|
"error": None, |
|
"entities_found": 0, |
|
"details": details or {} |
|
} |
|
self.jobs_database["processing_jobs"].append(legacy_job) |
|
|
|
|
|
self.dashboard_state["active_tasks"] += 1 |
|
self.jobs_database["queue_statistics"]["active_tasks"] += 1 |
|
self.dashboard_state["last_update"] = timestamp |
|
|
|
print(f"✅ Job {job_id[:8]} added to PostgreSQL: {name[:30]}...") |
|
else: |
|
print(f"❌ Failed to add job {job_id[:8]} to PostgreSQL") |
|
|
|
return job_id |
|
|
|
    def update_job_completion(self, job_id: str, success: bool, metrics: Optional[dict] = None):
|
"""Update job completion with metrics in PostgreSQL""" |
|
completion_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") |
|
|
|
|
|
updates = { |
|
"status": "completed", |
|
"success": success, |
|
"completed_at": completion_time |
|
} |
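
        # Merge optional runtime metrics into the update payload before persisting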
|
|
|
if metrics: |
|
updates["processing_time"] = metrics.get("processing_time", "N/A") |
|
updates["entities_found"] = metrics.get("entities_found", 0) |
|
updates["error_message"] = metrics.get("error", None) |
|
updates["result_data"] = metrics.get("details", {}) |
|
|
|
|
|
if metrics.get("cancelled", False): |
|
updates["status"] = "cancelled" |
|
updates["error_message"] = "Cancelled by user" |
|
|
|
|
|
db_success = db_manager.update_job(job_id, updates) |
|
|
|
if db_success: |
|
|
|
for job in self.jobs_database["processing_jobs"]: |
|
if job["job_id"] == job_id: |
|
job["status"] = updates["status"] |
|
job["success"] = success |
|
job["completion_time"] = completion_time |
|
|
|
if metrics: |
|
job["processing_time"] = metrics.get("processing_time", "N/A") |
|
job["entities_found"] = metrics.get("entities_found", 0) |
|
job["error"] = metrics.get("error", None) |
|
job["details"].update(metrics.get("details", {})) |
|
|
|
|
|
if metrics.get("cancelled", False): |
|
job["status"] = "cancelled" |
|
job["error"] = "Cancelled by user" |
|
|
|
break |
|
|
|
|
|
self.dashboard_state["active_tasks"] = max(0, self.dashboard_state["active_tasks"] - 1) |
|
self.dashboard_state["total_files"] += 1 |
|
|
|
if success: |
|
self.dashboard_state["successful_files"] += 1 |
|
self.jobs_database["queue_statistics"]["completed_tasks"] += 1 |
|
else: |
|
self.dashboard_state["failed_files"] += 1 |
|
self.dashboard_state["failed_tasks"] += 1 |
|
self.jobs_database["queue_statistics"]["failed_tasks"] += 1 |
|
|
|
self.jobs_database["queue_statistics"]["active_tasks"] = max(0, |
|
self.jobs_database["queue_statistics"]["active_tasks"] - 1) |
|
|
|
|
|
job_name = "Unknown" |
|
job_type = "Processing" |
|
for job in self.jobs_database["processing_jobs"]: |
|
if job["job_id"] == job_id: |
|
job_name = job["name"] |
|
job_type = job["job_type"].title() + " Processing" |
|
break |
|
|
|
file_info = { |
|
"filename": job_name, |
|
"file_type": job_type, |
|
"success": success, |
|
"processing_time": updates.get("processing_time", "N/A"), |
|
"timestamp": completion_time, |
|
"error": updates.get("error_message"), |
|
"entities_found": updates.get("entities_found", 0) |
|
} |
|
self.dashboard_state["files_processed"].append(file_info) |
|
self.dashboard_state["last_update"] = completion_time |
|
|
|
|
|
            was_cancelled = bool(metrics and metrics.get("cancelled", False))
            status_icon = "✅" if success else ("⏹️" if was_cancelled else "❌")
|
print(f"{status_icon} Job {job_id[:8]} completed in PostgreSQL: {job_name[:30]}... - Success: {success}") |
|
else: |
|
print(f"❌ Failed to update job {job_id[:8]} in PostgreSQL") |
|
|
|
def add_batch_job(self, batch_type: str, batch_size: int, workflow_type: str) -> str: |
|
"""Record start of batch processing job""" |
|
job_id = str(uuid.uuid4()) |
|
timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") |
|
|
|
batch_record = { |
|
"job_id": job_id, |
|
"job_type": "batch", |
|
"batch_type": batch_type, |
|
"batch_size": batch_size, |
|
"workflow_type": workflow_type, |
|
"status": "started", |
|
"start_time": timestamp, |
|
"completion_time": None, |
|
"processed_count": 0, |
|
"success_count": 0, |
|
"failed_count": 0, |
|
"documents": [] |
|
} |
|
|
|
self.jobs_database["batch_jobs"].append(batch_record) |
|
self.dashboard_state["active_tasks"] += 1 |
|
self.dashboard_state["last_update"] = f"Batch processing started: {batch_size} {workflow_type} documents" |
|
|
|
return job_id |
|
|
|
def update_batch_progress(self, job_id: str, processed_count: int, success_count: int, failed_count: int): |
|
"""Update batch processing progress""" |
|
for batch in self.jobs_database["batch_jobs"]: |
|
if batch["job_id"] == job_id: |
|
batch["processed_count"] = processed_count |
|
batch["success_count"] = success_count |
|
batch["failed_count"] = failed_count |
|
|
|
|
self.dashboard_state["last_update"] = f"Batch processing: {processed_count}/{batch['batch_size']} documents" |
|
break |
|
|
|
def get_dashboard_status(self) -> str: |
|
"""Get current dashboard status string""" |
|
if self.dashboard_state["total_files"] == 0: |
|
return "📊 No files processed yet" |
|
|
|
return f"📊 Files: {self.dashboard_state['total_files']} | Success: {self.dashboard_state['successful_files']} | Failed: {self.dashboard_state['failed_files']} | Active: {self.dashboard_state['active_tasks']}" |
|
|
|
def get_dashboard_metrics(self) -> list: |
|
"""Get file processing metrics for DataFrame display from PostgreSQL""" |
|
|
|
metrics = db_manager.get_dashboard_metrics() |
|
|
|
total_jobs = metrics.get('total_jobs', 0) |
|
completed_jobs = metrics.get('completed_jobs', 0) |
|
success_jobs = metrics.get('successful_jobs', 0) |
|
failed_jobs = metrics.get('failed_jobs', 0) |
|
active_jobs = metrics.get('active_jobs', 0) |
|
|
|
|
|
self.dashboard_state["total_files"] = completed_jobs |
|
self.dashboard_state["successful_files"] = success_jobs |
|
self.dashboard_state["failed_files"] = failed_jobs |
|
self.dashboard_state["active_tasks"] = active_jobs |
|
|
|
        success_rate = (success_jobs / completed_jobs * 100) if completed_jobs else 0.0
|
last_update = self.dashboard_state["last_update"] or "Never" |
|
|
|
print(f"🔍 DEBUG get_dashboard_metrics from PostgreSQL: Total={total_jobs}, Completed={completed_jobs}, Success={success_jobs}, Failed={failed_jobs}, Active={active_jobs}") |
|
|
|
return [ |
|
["Total Files", completed_jobs], |
|
["Success Rate", f"{success_rate:.1f}%"], |
|
["Failed Files", failed_jobs], |
|
["Completed Files", success_jobs], |
|
["Active Tasks", active_jobs], |
|
["Last Update", last_update] |
|
] |
|
|
|
def get_processing_queue(self) -> list: |
|
"""Get processing queue for DataFrame display""" |
|
return [ |
|
["Active Tasks", self.dashboard_state["active_tasks"]], |
|
["Completed Files", self.dashboard_state["successful_files"]], |
|
["Failed Files", self.dashboard_state["failed_files"]] |
|
] |
|
|
|
def get_jobs_history(self) -> list: |
|
"""Get comprehensive jobs history for DataFrame display from PostgreSQL""" |
|
jobs_data = [] |
|
|
|
|
|
recent_jobs = db_manager.get_jobs_history(limit=20) |
|
|
|
print(f"🔍 DEBUG get_jobs_history from PostgreSQL: Retrieved {len(recent_jobs)} jobs") |
|
|
|
if recent_jobs: |
|
print(f"🔍 DEBUG: Sample jobs from PostgreSQL:") |
|
for i, job in enumerate(recent_jobs[:3]): |
|
status = job.get('status', 'unknown') |
|
success = job.get('success', None) |
|
print(f" Job {i}: {job.get('name', 'Unknown')[:20]} | Status: {status} | Success: {success} | Type: {job.get('job_type', 'Unknown')}") |
|
|
|
|
|
for job in recent_jobs: |
|
job_type = job.get("job_type", "Unknown") |
|
job_name = job.get("name", "Unknown") |
|
|
|
|
|
if job_type == "batch": |
|
category = "🔄 Batch Job" |
|
elif job_type == "text": |
|
category = "📝 Text Processing" |
|
elif job_type == "dicom": |
|
category = "🏥 DICOM Analysis" |
|
elif job_type == "file": |
|
category = "📄 Document Processing" |
|
else: |
|
category = "⚙️ Processing" |
|
|
|
|
|
if job.get("status") == "cancelled": |
|
status = "⏹️ Cancelled" |
|
elif job.get("success") is True: |
|
status = "✅ Success" |
|
elif job.get("success") is False: |
|
status = "❌ Failed" |
|
elif job.get("status") == "processing": |
|
status = "🔄 Processing" |
|
else: |
|
status = "⏳ Pending" |
|
|
|
job_row = [ |
|
job_name, |
|
category, |
|
status, |
|
job.get("processing_time", "N/A") |
|
] |
|
jobs_data.append(job_row) |
|
print(f"🔍 DEBUG: Added PostgreSQL job row: {job_row}") |
|
|
|
print(f"🔍 DEBUG: Final jobs_data length from PostgreSQL: {len(jobs_data)}") |
|
return jobs_data |
|
|
|
|
|
job_manager = UnifiedJobManager() |
|
|
|
dashboard_state = job_manager.dashboard_state |
|
|
|
def get_codellama(): |
|
"""Lazy load CodeLlama processor with proper Ollama initialization checks""" |
|
global codellama, service_status |
|
if codellama is None: |
|
print("🔄 Initializing CodeLlama processor with Ollama connection check...") |
|
|
|
|
|
ollama_ready = _check_ollama_service() |
|
service_status["ollama_initialized"] = ollama_ready |
|
service_status["last_ollama_check"] = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") |
|
|
|
if not ollama_ready: |
|
print("⚠️ Ollama service not ready - CodeLlama will have limited functionality") |
|
|
|
from src.codellama_processor import CodeLlamaProcessor |
|
codellama = CodeLlamaProcessor() |
|
print(f"✅ CodeLlama processor initialized (Ollama: {'Ready' if ollama_ready else 'Not Ready'})") |
|
return codellama |
|
|
|
def get_enhanced_codellama(): |
|
"""Lazy load Enhanced CodeLlama processor with provider initialization checks""" |
|
global enhanced_codellama, service_status |
|
if enhanced_codellama is None: |
|
print("🔄 Initializing Enhanced CodeLlama processor with provider checks...") |
|
|
|
|
|
enhanced_codellama = EnhancedCodeLlamaProcessor() |
|
service_status["enhanced_codellama_initialized"] = True |
|
|
|
|
|
router = enhanced_codellama.router |
|
print(f"✅ Enhanced CodeLlama processor ready:") |
|
print(f" Ollama: {'✅ Ready' if router.ollama_available else '❌ Not Ready'}") |
|
print(f" HuggingFace: {'✅ Ready' if router.hf_available else '❌ Not Ready'}") |
|
print(f" Modal: {'✅ Ready' if router.modal_available else '❌ Not Ready'}") |
|
|
|
return enhanced_codellama |
|
|
|
def _check_ollama_service(): |
|
"""Check if Ollama service is properly initialized and accessible with model status""" |
|
import requests |
|
|
|
|
ollama_url = os.getenv("OLLAMA_BASE_URL", "http://ollama:11434") |
|
use_real_ollama = os.getenv("USE_REAL_OLLAMA", "true").lower() == "true" |
|
model_name = os.getenv("OLLAMA_MODEL", "codellama:13b-instruct") |
|
|
|
if not use_real_ollama: |
|
print("📝 Ollama disabled by configuration") |
|
return False |
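
    # Try the configured URL first, then the Docker-service/localhost alternate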
|
|
|
|
|
urls_to_try = [ollama_url] |
|
if "ollama:11434" in ollama_url: |
|
urls_to_try.append("http://localhost:11434") |
|
elif "localhost:11434" in ollama_url: |
|
urls_to_try.append("http://ollama:11434") |
|
|
|
for attempt in range(3): |
|
for url in urls_to_try: |
|
try: |
|
response = requests.get(f"{url}/api/version", timeout=5) |
|
if response.status_code == 200: |
|
print(f"✅ Ollama service ready at {url}") |
|
service_status["ollama_connection_url"] = url |
|
|
|
|
|
model_status = _check_ollama_model_status(url, model_name) |
|
service_status["model_status"] = model_status |
|
service_status["model_name"] = model_name |
|
|
|
if model_status == "available": |
|
print(f"✅ Model {model_name} is ready") |
|
return True |
|
elif model_status == "downloading": |
|
print(f"🔄 Model {model_name} is downloading (7.4GB)...") |
|
return False |
|
else: |
|
print(f"❌ Model {model_name} not found") |
|
return False |
|
except Exception as e: |
|
print(f"⚠️ Ollama check failed for {url}: {e}") |
|
continue |
|
        time.sleep(2)
|
|
|
print("❌ All Ollama connection attempts failed") |
|
return False |
|
|
|
def _check_ollama_model_status(url: str, model_name: str) -> str: |
|
"""Check if specific model is available in Ollama""" |
|
import requests |
|
try: |
|
|
|
response = requests.get(f"{url}/api/tags", timeout=10) |
|
if response.status_code == 200: |
|
models_data = response.json() |
|
models = models_data.get("models", []) |
|
|
|
|
|
for model in models: |
|
if model.get("name", "").startswith(model_name.split(":")[0]): |
|
return "available" |
|
|
|
|
|
return "downloading" |
|
else: |
|
return "unknown" |
|
|
|
except Exception as e: |
|
print(f"⚠️ Model status check failed: {e}") |
|
return "unknown" |
|
|
|
def get_ollama_status() -> dict: |
|
"""Get current Ollama and model status for UI display""" |
|
model_name = os.getenv("OLLAMA_MODEL", "codellama:13b-instruct") |
|
model_status = service_status.get("model_status", "unknown") |
|
|
|
status_messages = { |
|
"available": f"✅ {model_name} ready for processing", |
|
"downloading": f"🔄 {model_name} downloading (7.4GB). Please wait...", |
|
"unknown": f"⚠️ {model_name} status unknown" |
|
} |
|
|
|
return { |
|
"service_available": service_status.get("ollama_initialized", False), |
|
"model_status": model_status, |
|
"model_name": model_name, |
|
"message": status_messages.get(model_status, f"⚠️ Unknown status: {model_status}") |
|
} |
|
|
|
def get_fhir_validator(): |
|
"""Lazy load FHIR validator""" |
|
global fhir_validator |
|
if fhir_validator is None: |
|
print("🔄 Initializing FHIR validator...") |
|
fhir_validator = FhirValidator() |
|
print("✅ FHIR validator ready") |
|
return fhir_validator |
|
|
|
def get_workflow_orchestrator(): |
|
"""Lazy load workflow orchestrator""" |
|
global workflow_orchestrator |
|
if workflow_orchestrator is None: |
|
print("🔄 Initializing workflow orchestrator...") |
|
workflow_orchestrator = WorkflowOrchestrator() |
|
print("✅ Workflow orchestrator ready") |
|
return workflow_orchestrator |
|
|
|
def get_current_model_display(): |
|
"""Get current model name from environment variables for display""" |
|
|
|
|
|
|
ollama_model = os.getenv("OLLAMA_MODEL", "") |
|
if ollama_model: |
|
|
|
model_parts = ollama_model.split(":") |
|
if len(model_parts) >= 2: |
|
model_name = model_parts[0].title() |
|
model_size = model_parts[1].upper().replace("B-", "B ").replace("-", " ").title() |
|
return f"{model_name} {model_size}" |
|
else: |
|
return ollama_model.title() |
|
|
|
|
|
if os.getenv("MISTRAL_API_KEY"): |
|
return "Mistral Large" |
|
elif os.getenv("HF_TOKEN"): |
|
return "HuggingFace Transformers" |
|
elif os.getenv("MODAL_TOKEN_ID"): |
|
return "Modal Labs GPU" |
|
else: |
|
return "CodeLlama 13B-Instruct" |
|
|
|
def get_simple_agent_status(): |
|
"""Get comprehensive system status including APIs and configurations""" |
|
global codellama, enhanced_codellama, fhir_validator, workflow_orchestrator |
|
|
|
|
|
codellama_status = "✅ Ready" if codellama is not None else "⏳ On-demand loading" |
|
enhanced_status = "✅ Ready" if enhanced_codellama is not None else "⏳ On-demand loading" |
|
fhir_status = "✅ Ready" if fhir_validator is not None else "⏳ On-demand loading" |
|
workflow_status = "✅ Ready" if workflow_orchestrator is not None else "⏳ On-demand loading" |
|
dicom_status = "✅ Available" if dicom_processor else "❌ Not available" |
|
|
|
|
|
mistral_api_key = os.getenv("MISTRAL_API_KEY", "") |
|
mistral_status = "✅ Configured" if mistral_api_key else "❌ Missing API key" |
|
|
|
|
|
ollama_status = "❌ Not available locally" |
|
try: |
|
|
|
ollama_url = os.getenv("OLLAMA_BASE_URL", "http://ollama:11434") |
|
use_real_ollama = os.getenv("USE_REAL_OLLAMA", "true").lower() == "true" |
|
|
|
if use_real_ollama: |
|
import requests |
|
|
|
urls_to_try = [ollama_url] |
|
if "ollama:11434" in ollama_url: |
|
urls_to_try.append("http://localhost:11434") |
|
elif "localhost:11434" in ollama_url: |
|
urls_to_try.append("http://ollama:11434") |
|
|
|
for url in urls_to_try: |
|
try: |
|
response = requests.get(f"{url}/api/version", timeout=2) |
|
if response.status_code == 200: |
|
ollama_status = "✅ Available" |
|
break |
|
                except Exception:
|
continue |
|
|
|
|
|
if ollama_status == "❌ Not available locally" and use_real_ollama: |
|
ollama_status = "⚠️ Configured (starting up)" |
|
    except Exception:
|
pass |
|
|
|
|
|
try: |
|
import pydicom |
|
dicom_lib_status = "✅ pydicom available" |
|
except ImportError: |
|
dicom_lib_status = "⚠️ pydicom not installed (fallback mode)" |
|
|
|
|
|
modal_token = os.getenv("MODAL_TOKEN_ID", "") |
|
modal_status = "✅ Configured" if modal_token else "❌ Not configured" |
|
|
|
|
|
hf_token = os.getenv("HF_TOKEN", "") |
|
if not hf_token: |
|
hf_status = "❌ No token (set HF_TOKEN)" |
|
elif not hf_token.startswith("hf_"): |
|
hf_status = "❌ Invalid token format" |
|
else: |
|
try: |
|
|
|
from huggingface_hub import HfApi |
|
api = HfApi(token=hf_token) |
|
user_info = api.whoami() |
|
if user_info and 'name' in user_info: |
|
hf_status = f"✅ Authenticated as {user_info['name']}" |
|
else: |
|
hf_status = "❌ Authentication failed" |
|
except ImportError: |
|
hf_status = "❌ huggingface_hub not installed" |
|
except Exception as e: |
|
hf_status = f"❌ Error: {str(e)[:30]}..." |
|
|
|
status_html = f""" |
|
<div class="system-status-container" style="padding: 20px; border-radius: 8px; border: 1px solid var(--border-color-primary, #e5e7eb); background: var(--background-fill-primary, #ffffff); color: var(--body-text-color, #374151);"> |
|
<h3 style="color: var(--body-text-color, #374151); margin-bottom: 20px;">🔧 System Components Status</h3> |
|
|
|
<div style="margin-bottom: 15px;"> |
|
<h4 style="color: var(--body-text-color-subdued, #6b7280); margin-bottom: 8px;">Core Processing Components</h4> |
|
<p><strong>CodeLlama Processor:</strong> <span style="color: #059669;">{codellama_status}</span></p> |
|
<p><strong>Enhanced Processor:</strong> <span style="color: #059669;">{enhanced_status}</span></p> |
|
<p><strong>FHIR Validator:</strong> <span style="color: #059669;">{fhir_status}</span></p> |
|
<p><strong>Workflow Orchestrator:</strong> <span style="color: #059669;">{workflow_status}</span></p> |
|
<p><strong>DICOM Processor:</strong> <span style="color: #059669;">{dicom_status}</span></p> |
|
</div> |
|
|
|
<div style="margin-bottom: 15px;"> |
|
<h4 style="color: var(--body-text-color-subdued, #6b7280); margin-bottom: 8px;">AI Provider APIs</h4> |
|
<p><strong>Mistral API:</strong> <span style="color: {'#059669' if mistral_api_key else '#dc2626'};">{mistral_status}</span></p> |
|
<p><strong>Ollama Local:</strong> <span style="color: {'#059669' if '✅' in ollama_status else '#dc2626'};">{ollama_status}</span></p> |
|
<p><strong>Modal Labs GPU:</strong> <span style="color: {'#059669' if modal_token else '#dc2626'};">{modal_status}</span></p> |
|
<p><strong>HuggingFace API:</strong> <span style="color: {'#059669' if hf_token else '#dc2626'};">{hf_status}</span></p> |
|
</div> |
|
|
|
<div style="margin-bottom: 15px;"> |
|
<h4 style="color: var(--body-text-color-subdued, #6b7280); margin-bottom: 8px;">Medical Processing</h4> |
|
<p><strong>DICOM Library:</strong> <span style="color: {'#059669' if '✅' in dicom_lib_status else '#B71C1C'};">{dicom_lib_status}</span></p> |
|
<p><strong>FHIR R4 Compliance:</strong> <span style="color: #059669;">✅ Active</span></p> |
|
<p><strong>FHIR R5 Compliance:</strong> <span style="color: #059669;">✅ Active</span></p> |
|
<p><strong>Medical Entity Extraction:</strong> <span style="color: #059669;">✅ Ready</span></p> |
|
<p><strong>OCR Processing:</strong> <span style="color: #059669;">✅ Integrated</span></p> |
|
</div> |
|
|
|
<div> |
|
<h4 style="color: var(--body-text-color-subdued, #6b7280); margin-bottom: 8px;">System Status</h4> |
|
<p><strong>Overall Status:</strong> <span style="color: #16a34a;">🟢 Operational</span></p> |
|
<p><strong>Current Model:</strong> <span style="color: #2563eb;">{get_current_model_display()}</span></p> |
|
<p><strong>Processing Mode:</strong> <span style="color: #2563eb;">Multi-Provider Dynamic Scaling</span></p> |
|
<p><strong>Architecture:</strong> <span style="color: #2563eb;">Lazy Loading + Frontend/Backend Separation</span></p> |
|
</div> |
|
</div> |
|
""" |
|
return status_html |
|
|
|
|
|
async def _process_text_async(text, enable_fhir): |
|
"""Async text processing that can be cancelled""" |
|
global cancellation_flags, running_tasks |
|
|
|
|
|
if cancellation_flags["text_task"]: |
|
raise asyncio.CancelledError("Text processing cancelled") |
|
|
|
|
|
try: |
|
processor = get_enhanced_codellama() |
|
method_name = "Enhanced CodeLlama (Multi-Provider)" |
|
|
|
result = await processor.process_document( |
|
medical_text=text, |
|
document_type="clinical_note", |
|
extract_entities=True, |
|
generate_fhir=enable_fhir |
|
) |
|
|
|
|
|
if cancellation_flags["text_task"]: |
|
raise asyncio.CancelledError("Text processing cancelled") |
|
|
|
|
|
actual_provider = result.get("provider_metadata", {}).get("provider_used", "Enhanced Processor") |
|
method_name = f"Enhanced CodeLlama ({actual_provider.title()})" |
|
|
|
return result, method_name |
|
|
|
except Exception as e: |
|
print(f"⚠️ Enhanced CodeLlama processing failed: {e}") |
|
|
|
|
|
try: |
|
processor = get_codellama() |
|
method_name = "CodeLlama (Basic Fallback)" |
|
|
|
result = await processor.process_document( |
|
medical_text=text, |
|
document_type="clinical_note", |
|
extract_entities=True, |
|
generate_fhir=enable_fhir |
|
) |
|
|
|
|
|
if cancellation_flags["text_task"]: |
|
raise asyncio.CancelledError("Text processing cancelled") |
|
|
|
return result, method_name |
|
|
|
except Exception as fallback_error: |
|
print(f"❌ HuggingFace fallback also failed: {fallback_error}") |
|
|
|
return { |
|
"extracted_data": {"error": "Processing failed", "patient": "Unknown Patient", "conditions": [], "medications": []}, |
|
"metadata": {"model_used": "error_fallback", "processing_time": 0} |
|
}, "Error (Both Failed)" |
|
|
|
def process_text_only(text, enable_fhir=True): |
|
"""Process text with CodeLlama processor""" |
|
global cancellation_flags, running_tasks |
|
|
|
print(f"🔥 DEBUG: process_text_only called with text length: {len(text) if text else 0}") |
|
|
|
if not text.strip(): |
|
return "❌ Please enter some medical text", {}, {} |
|
|
|
|
|
job_id = None |
|
try: |
|
job_id = job_manager.add_processing_job("text", text[:50], {"enable_fhir": enable_fhir}) |
|
active_jobs["text_task"] = job_id |
|
print(f"✅ DEBUG: Job {job_id[:8]} recorded successfully") |
|
except Exception as job_error: |
|
print(f"❌ DEBUG: Failed to record job: {job_error}") |
|
|
|
job_id = "fallback-" + str(uuid.uuid4())[:8] |
|
|
|
try: |
|
|
|
cancellation_flags["text_task"] = False |
|
start_time = time.time() |
|
monitor.log_event("text_processing_start", {"text_length": len(text)}) |
|
|
|
|
|
if cancellation_flags["text_task"]: |
|
job_manager.update_job_completion(job_id, False, {"error": "Cancelled by user"}) |
|
return "⏹️ Processing cancelled", {}, {} |
|
|
|
|
|
async def run_with_cancellation(): |
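            # Wrap the coroutine in a Task so cancel_current_task() can cancel it mid-flight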
|
task = asyncio.create_task(_process_text_async(text, enable_fhir)) |
|
running_tasks["text_task"] = task |
|
try: |
|
return await task |
|
            finally:
                running_tasks["text_task"] = None
|
|
|
result, method_name = asyncio.run(run_with_cancellation()) |
|
|
|
|
|
processing_time = time.time() - start_time |
|
|
|
|
|
|
|
extracted_data_raw = result.get("extracted_data", {}) |
|
if isinstance(extracted_data_raw, str): |
|
try: |
|
entities = json.loads(extracted_data_raw) |
|
except json.JSONDecodeError: |
|
entities = {} |
|
else: |
|
entities = extracted_data_raw |
|
|
|
|
|
        processing_failed = (
            (isinstance(entities, dict) and entities.get("error") == "Processing failed")
            or result.get("metadata", {}).get("error") == "All providers failed"
            or method_name == "Error (Both Failed)"
            or result.get("failover_metadata", {}).get("complete_failure", False)
        )
|
|
|
if processing_failed: |
|
|
|
providers_tried = entities.get("providers_tried", ["ollama", "huggingface"]) if isinstance(entities, dict) else ["unknown"] |
|
error_msg = entities.get("error", "Processing failed") if isinstance(entities, dict) else "Processing failed" |
|
|
|
status = f"❌ **Processing Failed**\n\n📝 **Text:** {len(text)} characters\n⚠️ **Error:** {error_msg}\n🔄 **Providers Tried:** {', '.join(providers_tried)}\n💡 **Note:** All available AI providers are currently unavailable" |
|
|
|
|
|
try: |
|
if job_id: |
|
job_manager.update_job_completion(job_id, False, { |
|
"processing_time": f"{processing_time:.2f}s", |
|
"error": error_msg, |
|
"providers_tried": providers_tried |
|
}) |
|
print(f"✅ DEBUG: Failed job {job_id[:8]} recorded successfully") |
|
else: |
|
print("❌ DEBUG: No job_id to record failure") |
|
except Exception as completion_error: |
|
print(f"❌ DEBUG: Failed to record job completion: {completion_error}") |
|
|
|
monitor.log_event("text_processing_failed", {"error": error_msg, "providers_tried": providers_tried}) |
|
|
|
return status, entities, {} |
|
else: |
|
|
|
status = f"✅ **Processing Complete!**\n\nProcessed {len(text)} characters using **{method_name}**" |
|
|
|
fhir_resources = result.get("fhir_bundle", {}) if enable_fhir else {} |
|
|
|
|
|
try: |
|
if job_id: |
|
job_manager.update_job_completion(job_id, True, { |
|
"processing_time": f"{processing_time:.2f}s", |
|
"entities_found": len(entities) if isinstance(entities, dict) else 0, |
|
"method": method_name |
|
}) |
|
print(f"✅ DEBUG: Success job {job_id[:8]} recorded successfully") |
|
else: |
|
print("❌ DEBUG: No job_id to record success") |
|
except Exception as completion_error: |
|
print(f"❌ DEBUG: Failed to record job completion: {completion_error}") |
|
|
|
|
|
active_jobs["text_task"] = None |
|
|
|
monitor.log_event("text_processing_success", {"entities_found": len(entities), "method": method_name}) |
|
|
|
return status, entities, fhir_resources |
|
|
|
except asyncio.CancelledError: |
|
job_manager.update_job_completion(job_id, False, {"error": "Processing cancelled"}) |
|
active_jobs["text_task"] = None |
|
monitor.log_event("text_processing_cancelled", {}) |
|
return "⏹️ Processing cancelled", {}, {} |
|
|
|
except Exception as e: |
|
job_manager.update_job_completion(job_id, False, {"error": str(e)}) |
|
active_jobs["text_task"] = None |
|
monitor.log_event("text_processing_error", {"error": str(e)}) |
|
return f"❌ Processing failed: {str(e)}", {}, {} |
|
|
|
async def _process_file_async(file, enable_mistral_ocr, enable_fhir): |
|
"""Async file processing that can be cancelled""" |
|
global cancellation_flags, running_tasks |
|
|
|
|
|
from src.file_processor import local_processor |
|
|
|
with open(file.name, 'rb') as f: |
|
document_bytes = f.read() |
|
|
|
|
|
actual_ocr_method = None |
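
    # Choose the OCR path: Mistral API when enabled, local OCR otherwise (and as fallback)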
|
|
|
|
|
if enable_mistral_ocr: |
|
|
|
try: |
|
extracted_text = await local_processor._extract_with_mistral(document_bytes) |
|
actual_ocr_method = "mistral_api" |
|
except Exception as e: |
|
print(f"⚠️ Mistral OCR failed, falling back to local OCR: {e}") |
|
|
|
ocr_result = await local_processor.process_document(document_bytes, "user", file.name) |
|
extracted_text = ocr_result.get('extracted_text', '') |
|
actual_ocr_method = "local_processor" |
|
else: |
|
|
|
ocr_result = await local_processor.process_document(document_bytes, "user", file.name) |
|
extracted_text = ocr_result.get('extracted_text', '') |
|
actual_ocr_method = "local_processor" |
|
|
|
|
|
if cancellation_flags["file_task"]: |
|
raise asyncio.CancelledError("File processing cancelled") |
|
|
|
|
|
|
|
if cancellation_flags["file_task"]: |
|
raise asyncio.CancelledError("File processing cancelled") |
|
|
|
|
|
try: |
|
processor = get_codellama() |
|
method_name = "CodeLlama (Ollama)" |
|
|
|
result = await processor.process_document( |
|
medical_text=extracted_text, |
|
document_type="clinical_note", |
|
extract_entities=True, |
|
generate_fhir=enable_fhir, |
|
source_metadata={"extraction_method": actual_ocr_method} |
|
) |
|
    except Exception as e:
        print(f"⚠️ CodeLlama processing failed: {e}, falling back to Enhanced CodeLlama")

        try:
            processor = get_enhanced_codellama()
            method_name = "Enhanced CodeLlama (Fallback)"
|
|
|
result = await processor.process_document( |
|
medical_text=extracted_text, |
|
document_type="clinical_note", |
|
extract_entities=True, |
|
generate_fhir=enable_fhir, |
|
source_metadata={"extraction_method": actual_ocr_method} |
|
) |
|
except Exception as fallback_error: |
|
print(f"❌ HuggingFace fallback also failed: {fallback_error}") |
|
|
|
result = { |
|
"extracted_data": {"error": "Processing failed", "patient": "Unknown Patient", "conditions": [], "medications": []}, |
|
"metadata": {"model_used": "error_fallback", "processing_time": 0} |
|
} |
|
method_name = "Error (Both Failed)" |
|
|
|
|
|
if cancellation_flags["file_task"]: |
|
raise asyncio.CancelledError("File processing cancelled") |
|
|
|
return result, method_name, extracted_text, actual_ocr_method |
|
|
|
def process_file_only(file, enable_mistral_ocr=True, enable_fhir=True): |
|
"""Process uploaded file with CodeLlama processor and optional Mistral OCR""" |
|
global cancellation_flags |
|
|
|
if not file: |
|
return "❌ Please upload a file", {}, {} |
|
|
|
|
|
job_id = job_manager.add_processing_job("file", file.name, { |
|
"enable_mistral_ocr": enable_mistral_ocr, |
|
"enable_fhir": enable_fhir |
|
}) |
|
active_jobs["file_task"] = job_id |
|
|
|
try: |
|
|
|
cancellation_flags["file_task"] = False |
|
monitor.log_event("file_processing_start", {"filename": file.name}) |
|
|
|
|
|
if cancellation_flags["file_task"]: |
|
job_manager.update_job_completion(job_id, False, {"error": "Cancelled by user"}) |
|
return "⏹️ File processing cancelled", {}, {} |
|
|
|
import time |
|
start_time = time.time() |
|
|
|
|
|
try: |
|
|
|
async def run_with_cancellation(): |
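                # Wrap the coroutine in a Task so cancel_current_task() can cancel it mid-flight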
|
task = asyncio.create_task(_process_file_async(file, enable_mistral_ocr, enable_fhir)) |
|
running_tasks["file_task"] = task |
|
try: |
|
return await task |
|
                finally:
                    running_tasks["file_task"] = None
|
|
|
result, method_name, extracted_text, actual_ocr_method = asyncio.run(run_with_cancellation()) |
|
except asyncio.CancelledError: |
|
job_manager.update_job_completion(job_id, False, {"error": "Processing cancelled"}) |
|
active_jobs["file_task"] = None |
|
return "⏹️ File processing cancelled", {}, {} |
|
|
|
processing_time = time.time() - start_time |
|
|
|
|
|
ocr_method_display = "Mistral OCR (Advanced)" if actual_ocr_method == "mistral_api" else "Local OCR (Standard)" |
|
status = f"✅ **File Processing Complete!**\n\n📁 **File:** {file.name}\n🔍 **OCR Method:** {ocr_method_display}\n🤖 **AI Processor:** {method_name}\n⏱️ **Processing Time:** {processing_time:.2f}s" |
|
|
|
|
|
extracted_data_raw = result.get("extracted_data", {}) |
|
if isinstance(extracted_data_raw, str): |
|
try: |
|
entities = json.loads(extracted_data_raw) |
|
except json.JSONDecodeError: |
|
entities = {} |
|
else: |
|
entities = extracted_data_raw |
|
|
|
fhir_resources = result.get("fhir_bundle", {}) if enable_fhir else {} |
|
|
|
|
|
job_manager.update_job_completion(job_id, True, { |
|
"processing_time": f"{processing_time:.2f}s", |
|
"entities_found": len(entities) if isinstance(entities, dict) else 0, |
|
"method": method_name |
|
}) |
|
|
|
|
|
active_jobs["file_task"] = None |
|
|
|
monitor.log_event("file_processing_success", {"filename": file.name, "method": method_name}) |
|
|
|
return status, entities, fhir_resources |
|
|
|
except Exception as e: |
|
job_manager.update_job_completion(job_id, False, {"error": str(e)}) |
|
active_jobs["file_task"] = None |
|
monitor.log_event("file_processing_error", {"error": str(e)}) |
|
return f"❌ File processing failed: {str(e)}", {}, {} |
|
|
|
def process_dicom_only(dicom_file): |
|
"""Process DICOM files using the real DICOM processor""" |
|
global cancellation_flags |
|
|
|
if not dicom_file: |
|
return "❌ Please upload a DICOM file", {}, {} |
|
|
|
|
|
job_id = job_manager.add_processing_job("dicom", dicom_file.name) |
|
active_jobs["dicom_task"] = job_id |
|
|
|
try: |
|
|
|
cancellation_flags["dicom_task"] = False |
|
|
|
|
|
if cancellation_flags["dicom_task"]: |
|
job_manager.update_job_completion(job_id, False, {"error": "Cancelled by user"}) |
|
return "⏹️ DICOM processing cancelled", {}, {} |
|
monitor.log_event("dicom_processing_start", {"filename": dicom_file.name}) |
|
|
|
import time |
|
start_time = time.time() |
|
|
|
|
|
async def run_dicom_with_cancellation(): |
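            # Wrap the coroutine in a Task so cancel_current_task() can cancel it mid-flight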
|
task = asyncio.create_task(dicom_processor.process_dicom_file(dicom_file.name)) |
|
running_tasks["dicom_task"] = task |
|
try: |
|
return await task |
|
            finally:
                running_tasks["dicom_task"] = None
|
|
|
try: |
|
result = asyncio.run(run_dicom_with_cancellation()) |
|
except asyncio.CancelledError: |
|
job_manager.update_job_completion(job_id, False, {"error": "Processing cancelled"}) |
|
active_jobs["dicom_task"] = None |
|
return "⏹️ DICOM processing cancelled", {}, {} |
|
|
|
processing_time = time.time() - start_time |
|
|
|
|
|
if result.get("status") == "success": |
|
|
|
fhir_bundle = result.get("fhir_bundle", {}) |
|
patient_name = result.get("patient_name", "Unknown") |
|
study_description = result.get("study_description", "Unknown") |
|
modality = result.get("modality", "Unknown") |
|
file_size = result.get("file_size", 0) |
|
|
|
status = f"""✅ **DICOM Processing Complete!** |
|
|
|
📁 **File:** {os.path.basename(dicom_file.name)} |
|
📊 **Size:** {file_size} bytes |
|
⏱️ **Processing Time:** {processing_time:.2f}s |
|
🏥 **Modality:** {modality} |
|
👤 **Patient:** {patient_name} |
|
📋 **Study:** {study_description} |
|
📊 **FHIR Resources:** {len(fhir_bundle.get('entry', []))} generated""" |
|
|
|
|
|
analysis = { |
|
"file_info": { |
|
"filename": os.path.basename(dicom_file.name), |
|
"file_size_bytes": file_size, |
|
"processing_time": result.get('processing_time', 0) |
|
}, |
|
"patient_info": { |
|
"name": patient_name |
|
}, |
|
"study_info": { |
|
"description": study_description, |
|
"modality": modality |
|
}, |
|
"processing_status": "✅ Successfully processed", |
|
"processor_used": "DICOM Processor with pydicom", |
|
"pydicom_available": True |
|
} |
|
|
|
|
|
fhir_imaging = fhir_bundle |
|
|
|
|
|
job_manager.update_job_completion(job_id, True, { |
|
"processing_time": f"{processing_time:.2f}s", |
|
"patient_name": patient_name, |
|
"modality": modality |
|
}) |
|
|
|
|
|
active_jobs["dicom_task"] = None |
|
|
|
else: |
|
|
|
error_msg = result.get("error", "Unknown error") |
|
fallback_used = result.get("fallback_used", False) |
|
processor_info = "DICOM Fallback Processor" if fallback_used else "DICOM Processor" |
|
|
|
status = f"""❌ **DICOM Processing Failed** |
|
|
|
📁 **File:** {os.path.basename(dicom_file.name)} |
|
🚫 **Error:** {error_msg} |
|
🔧 **Processor:** {processor_info} |
|
💡 **Note:** pydicom library may not be available or file format issue""" |
|
|
|
analysis = { |
|
"error": error_msg, |
|
"file_info": {"filename": os.path.basename(dicom_file.name)}, |
|
"processing_status": "❌ Failed", |
|
"processor_used": processor_info, |
|
"fallback_used": fallback_used, |
|
"pydicom_available": not fallback_used |
|
} |
|
|
|
fhir_imaging = {} |
|
|
|
|
|
job_manager.update_job_completion(job_id, False, {"error": error_msg}) |
|
|
|
|
|
active_jobs["dicom_task"] = None |
|
|
|
monitor.log_event("dicom_processing_success", {"filename": dicom_file.name}) |
|
|
|
return status, analysis, fhir_imaging |
|
|
|
except Exception as e: |
|
job_manager.update_job_completion(job_id, False, {"error": str(e)}) |
|
active_jobs["dicom_task"] = None |
|
monitor.log_event("dicom_processing_error", {"error": str(e)}) |
|
error_analysis = { |
|
"error": str(e), |
|
"file_info": {"filename": os.path.basename(dicom_file.name) if dicom_file else "Unknown"}, |
|
"processing_status": "❌ Exception occurred" |
|
} |
|
return f"❌ DICOM processing failed: {str(e)}", error_analysis, {} |
|
|
|
def cancel_current_task(task_type): |
|
"""Cancel current processing task""" |
|
global cancellation_flags, running_tasks, task_queues, active_jobs |
|
|
|
|
|
monitor.log_event("cancel_state_before", { |
|
"task_type": task_type, |
|
"cancellation_flags": cancellation_flags.copy(), |
|
"active_jobs": active_jobs.copy(), |
|
"task_queues": {k: len(v) for k, v in task_queues.items()} |
|
}) |
|
|
|
|
|
cancellation_flags[task_type] = True |
|
|
|
|
|
if running_tasks[task_type] is not None: |
|
try: |
|
running_tasks[task_type].cancel() |
|
running_tasks[task_type] = None |
|
except Exception as e: |
|
print(f"Error cancelling task {task_type}: {e}") |
|
|
|
|
|
if task_queues.get(task_type): |
|
task_queues[task_type].clear() |
|
|
|
|
|
active_jobs[task_type] = None |
|
|
|
|
|
if dashboard_state["active_tasks"] > 0: |
|
dashboard_state["active_tasks"] -= 1 |
|
|
|
monitor.log_event("task_cancelled", {"task_type": task_type}) |
|
|
|
|
|
monitor.log_event("cancel_state_after", { |
|
"task_type": task_type, |
|
"cancellation_flags": cancellation_flags.copy(), |
|
"active_jobs": active_jobs.copy(), |
|
"task_queues": {k: len(v) for k, v in task_queues.items()} |
|
}) |
|
|
|
return f"⏹️ Cancelled {task_type}" |
|
|
|
|
|
monitor.log_event("cancel_state_before", { |
|
"task_type": task_type, |
|
"cancellation_flags": cancellation_flags.copy(), |
|
"active_jobs": active_jobs.copy(), |
|
"task_queues": {k: len(v) for k, v in task_queues.items()} |
|
}) |
|
|
|
|
|
cancellation_flags[task_type] = True |
|
|
|
|
|
if running_tasks[task_type] is not None: |
|
try: |
|
running_tasks[task_type].cancel() |
|
running_tasks[task_type] = None |
|
except Exception as e: |
|
print(f"Error cancelling task {task_type}: {e}") |
|
|
|
|
|
if dashboard_state["active_tasks"] > 0: |
|
dashboard_state["active_tasks"] -= 1 |
|
|
|
monitor.log_event("task_cancelled", {"task_type": task_type}) |
|
|
|
|
|
monitor.log_event("cancel_state_after", { |
|
"task_type": task_type, |
|
"cancellation_flags": cancellation_flags.copy(), |
|
"active_jobs": active_jobs.copy(), |
|
"task_queues": {k: len(v) for k, v in task_queues.items()} |
|
}) |
|
return f"⏹️ Cancelled {task_type}" |
|
|
|
def get_dashboard_status(): |
|
"""Get current file processing dashboard status""" |
|
return job_manager.get_dashboard_status() |
|
|
|
def get_dashboard_metrics(): |
|
"""Get file processing metrics for DataFrame display""" |
|
return job_manager.get_dashboard_metrics() |
|
|
|
def get_processing_queue(): |
|
"""Get processing queue for DataFrame display""" |
|
return job_manager.get_processing_queue() |
|
|
|
def get_jobs_history(): |
|
"""Get processing jobs history for DataFrame display""" |
|
return job_manager.get_jobs_history() |
|
|
|
|
|
def get_files_history(): |
|
"""Legacy function - redirects to get_jobs_history()""" |
|
return get_jobs_history() |
|
def get_old_files_history(): |
|
"""Get list of recently processed files for dashboard (legacy function)""" |
|
|
|
recent_files = dashboard_state["files_processed"][-10:] if dashboard_state["files_processed"] else [] |
|
return recent_files |
|
|
|
def add_file_to_dashboard(filename, file_type, success, processing_time=None, error=None, entities_found=None): |
|
"""Add a processed file to the dashboard statistics""" |
|
import datetime |
|
|
|
file_info = { |
|
"filename": filename, |
|
"file_type": file_type, |
|
"success": success, |
|
"processing_time": processing_time, |
|
"timestamp": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), |
|
"error": error if not success else None, |
|
"entities_found": entities_found or 0 |
|
} |
|
|
|
dashboard_state["files_processed"].append(file_info) |
|
dashboard_state["total_files"] += 1 |
|
|
|
if success: |
|
dashboard_state["successful_files"] += 1 |
|
else: |
|
dashboard_state["failed_files"] += 1 |
|
|
|
dashboard_state["last_update"] = file_info["timestamp"] |
|
|
|
|
|
if __name__ == "__main__": |
|
print("🔥 Starting FhirFlame Medical AI Platform...") |
|
|
|
|
|
from frontend_ui import create_medical_ui |
|
|
|
|
|
demo = create_medical_ui( |
|
process_text_only=process_text_only, |
|
process_file_only=process_file_only, |
|
process_dicom_only=process_dicom_only, |
|
cancel_current_task=cancel_current_task, |
|
get_dashboard_status=get_dashboard_status, |
|
dashboard_state=dashboard_state, |
|
get_dashboard_metrics=get_dashboard_metrics, |
|
get_simple_agent_status=get_simple_agent_status, |
|
get_enhanced_codellama=get_enhanced_codellama, |
|
add_file_to_dashboard=add_file_to_dashboard |
|
) |
|
|
|
|
|
demo.launch( |
|
server_name="0.0.0.0", |
|
server_port=7860, |
|
share=False, |
|
inbrowser=False, |
|
favicon_path="static/favicon.ico" |
|
) |
|
|