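# Spaces: Sleeping / Sleeping
# (The line above appears to be page-status text captured along with the source; it is not code.)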
import streamlit as st | |
import requests | |
import json | |
from PIL import Image | |
import base64 | |
from io import BytesIO | |
import pandas as pd | |
from datetime import datetime | |
import time | |
import logging | |
import os | |
from typing import Dict, Any, Optional | |
import re | |
from reportlab.lib import colors | |
from reportlab.lib.pagesizes import letter | |
from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, Table, TableStyle | |
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle | |
import io | |
import pytesseract # Tesseract OCR | |
from dotenv import load_dotenv # For .env file | |
# Pull variables from a local .env file into the process environment first,
# so that Config can pick up GEMINI_API_KEY via os.getenv.
load_dotenv()

# Process-wide logging: INFO and above, each record stamped with time,
# logger name and level.
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger(__name__)
# Configuration and Constants
class Config:
    """Central place for the app's tunable constants.

    All members are class attributes evaluated once, when the module is
    imported.
    """

    # REST endpoint for generateContent on the gemini-2.0-flash-exp model.
    GEMINI_URL: str = (
        "https://generativelanguage.googleapis.com/v1beta/models/"
        "gemini-2.0-flash-exp:generateContent"
    )
    # Taken from the environment (populated from .env); None when unset.
    GEMINI_API_KEY: Optional[str] = os.getenv("GEMINI_API_KEY")

    # Request tuning. NOTE(review): TIMEOUT is presumably in seconds and
    # MAX_RETRIES a per-request attempt limit — the consumers are not
    # visible in this part of the file, confirm there.
    MAX_RETRIES: int = 3
    TIMEOUT: int = 30

    # Upload constraints. NOTE(review): MAX_IMAGE_SIZE is presumably
    # (width, height) in pixels — confirm against the image-processing code.
    MAX_IMAGE_SIZE: tuple = (1600, 1600)
    ALLOWED_MIME_TYPES: list = ["image/jpeg", "image/png"]
    MAX_FILE_SIZE: int = 5 * 1024 * 1024  # 5MB, expressed in bytes
# Custom Exceptions
class APIError(Exception):
    """App-specific exception type.

    Presumably raised when a call to the Gemini API fails; the raise sites
    are not visible in this part of the file.
    """
class ImageProcessingError(Exception):
    """App-specific exception type.

    Presumably raised when an uploaded image cannot be validated or
    prepared; the raise sites are not visible in this part of the file.
    """
# Initialize session state
def init_session_state():
    """Seed the Streamlit session state with the keys the app relies on.

    Keys that already exist are left untouched, so values survive script
    reruns. Seeded keys:
      - processing_history: list, starts empty
      - current_document:   starts as None
      - pdf_history:        list, starts empty
    """
    # Factories rather than values so every session gets its own fresh list.
    default_factories = (
        ('processing_history', list),
        ('current_document', lambda: None),
        ('pdf_history', list),
    )
    for state_key, make_default in default_factories:
        if state_key not in st.session_state:
            st.session_state[state_key] = make_default()
# Page setup and styling
def setup_page():
    """Apply the page configuration and inject the app's custom CSS.

    Sets the browser title/icon, a wide layout and an expanded sidebar, then
    writes a <style> block through st.markdown with raw HTML enabled.
    """
    page_options = {
        "page_title": "Medical Document Processor",
        "page_icon": "🏥",
        "layout": "wide",
        "initial_sidebar_state": "expanded",
    }
    st.set_page_config(**page_options)

    # Stylesheet covering the main container, cards, header, buttons,
    # PDF-history rows and metric cards.
    custom_css = """
<style>
.main {padding: 2rem; max-width: 1200px; margin: 0 auto;}
.stCard {
background-color: white;
padding: 2rem;
border-radius: 10px;
box-shadow: 0 4px 6px rgba(0,0,0,0.1);
margin: 1rem 0;
}
.header-container {
background-color: #f8f9fa;
padding: 2rem;
border-radius: 10px;
margin-bottom: 2rem;
}
.stButton>button {
background-color: #007bff;
color: white;
border: none;
padding: 0.5rem 1rem;
border-radius: 5px;
transition: all 0.3s ease;
}
.stButton>button:hover {
background-color: #0056b3;
transform: translateY(-2px);
}
.element-container {opacity: 1 !important;}
.pdf-history-item {
background-color: #f8f9fa;
padding: 1rem;
border-radius: 8px;
margin: 0.5rem 0;
border: 1px solid #dee2e6;
}
.metric-card {
background-color: #f8f9fa;
padding: 1rem;
border-radius: 8px;
border: 1px solid #dee2e6;
margin: 0.5rem 0;
}
</style>
"""
    st.markdown(custom_css, unsafe_allow_html=True)
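# NOTE: The rest of the code is unchanged from the previous version and has been omitted here.
# main() below uses ImageProcessor, DocumentProcessor, PDFGenerator and EHRViewer, none of which
# are defined in the visible part of this file; include all classes and functions from the
# previous version at this point before running the app.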
def _render_sidebar_history():
    """Draw the sidebar list of previously generated PDF reports."""
    with st.sidebar:
        st.header("📋 Processing History")
        if not st.session_state.pdf_history:
            st.info("No documents processed yet")
            return
        for idx, pdf_record in enumerate(st.session_state.pdf_history):
            with st.expander(f"Document {idx + 1}: {pdf_record['timestamp']}"):
                st.download_button(
                    "📄 Download PDF",
                    pdf_record['data'],
                    file_name=pdf_record['filename'],
                    mime="application/pdf",
                    # The timestamp only has one-second resolution, so the
                    # index is included to keep widget keys unique.
                    key=f"sidebar_{idx}_{pdf_record['timestamp']}",
                )


def _process_document(image, source_id):
    """Process *image*, build its PDF report and record both in session state.

    Args:
        image: PIL image opened from the upload.
        source_id: Value identifying the upload the results belong to.

    Writes ``current_document`` and appends to ``processing_history`` and
    ``pdf_history``. Exceptions from the processor or PDF generator
    propagate to the caller.
    """
    processor = DocumentProcessor()
    results = processor.process_document(image)

    pdf_bytes = PDFGenerator.create_pdf(results['structured_data'])
    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    pdf_filename = f"medical_report_{timestamp}.pdf"

    st.session_state.current_document = {
        'timestamp': timestamp,
        'results': results,
        # Additional keys so the results view can be rebuilt on later reruns
        # without reprocessing.
        'source_id': source_id,
        'pdf_filename': pdf_filename,
        'pdf_data': pdf_bytes,
    }
    st.session_state.processing_history.append(
        st.session_state.current_document
    )
    st.session_state.pdf_history.append({
        'timestamp': timestamp,
        'filename': pdf_filename,
        'data': pdf_bytes,
    })


def _render_results(document):
    """Show the extracted data and download buttons for one processed document."""
    results = document['results']
    timestamp = document['timestamp']

    st.markdown(f"**Document Type:** {results['document_type']}")
    with st.expander("View Extracted Text"):
        st.text_area(
            "Raw Text",
            results['extracted_text'],
            height=200
        )

    # The EHR view is only drawn when structured data was produced.
    if results['structured_data']:
        EHRViewer.display_ehr(results['structured_data'])

    st.markdown("### 📥 Download Options")
    json_col, pdf_col = st.columns(2)
    with json_col:
        json_str = json.dumps(results['structured_data'], indent=2)
        st.download_button(
            "⬇️ Download JSON",
            json_str,
            file_name=f"medical_data_{timestamp}.json",
            mime="application/json"
        )
    with pdf_col:
        st.download_button(
            "📄 Download PDF Report",
            document['pdf_data'],
            file_name=document['pdf_filename'],
            mime="application/pdf"
        )


def _render_pdf_history():
    """List every generated PDF report with a download button per row."""
    st.markdown("### 📚 PDF History")
    if not st.session_state.pdf_history:
        st.info("No PDF history available")
        return
    for idx, pdf_record in enumerate(st.session_state.pdf_history):
        label_col, button_col = st.columns([3, 1])
        with label_col:
            st.write(f"Report from {pdf_record['timestamp']}")
        with button_col:
            st.download_button(
                "📄 View PDF",
                pdf_record['data'],
                file_name=pdf_record['filename'],
                mime="application/pdf",
                # Index included for the same uniqueness reason as in the sidebar.
                key=f"history_{idx}_{pdf_record['timestamp']}",
            )


def main():
    """Run one pass of the Streamlit app.

    Flow: initialise session state and page, draw the sidebar history, accept
    an image upload, validate and preview it, process it on demand, and show
    the results with download options.

    Results are rendered from ``st.session_state.current_document`` rather
    than only inside the button branch: ``st.button`` is True for a single
    rerun, and clicking any download button triggers another rerun, which
    previously made the results disappear.

    Any exception raised while handling the upload is logged with traceback
    and reported to the user via ``st.error``.
    """
    init_session_state()
    setup_page()

    st.title("🏥 Advanced Medical Document Processor")
    st.markdown("Upload medical documents for automated processing and analysis.")

    # Sidebar
    _render_sidebar_history()

    # Main content
    uploaded_file = st.file_uploader(
        "Choose a medical document",
        type=['png', 'jpg', 'jpeg'],
        help="Upload a clear image of a medical document (max 5MB)"
    )
    if not uploaded_file:
        return

    try:
        # Validate image
        is_valid, message = ImageProcessor.validate_image(uploaded_file)
        if not is_valid:
            st.error(message)
            return

        # validate_image is not visible here and may have read from the
        # stream; rewind so PIL always starts at the first byte.
        uploaded_file.seek(0)
        image = Image.open(uploaded_file)

        image_col, results_col = st.columns([1, 2])
        with image_col:
            # NOTE(review): use_column_width is deprecated in newer Streamlit
            # releases in favour of use_container_width — confirm the pinned
            # version before switching.
            st.image(image, caption="Uploaded Document", use_column_width=True)

        # Ties stored results to this particular upload so results from a
        # previous file are not shown next to a new one.
        source_id = (uploaded_file.name, uploaded_file.size)

        just_processed = False
        if st.button("🔍 Process Document"):
            with st.spinner("Processing document..."):
                _process_document(image, source_id)
            just_processed = True

        document = st.session_state.current_document
        if document and document.get('source_id') == source_id:
            with results_col:
                if just_processed:
                    st.success("Document processed successfully!")
                _render_results(document)
                _render_pdf_history()
    except Exception as e:
        st.error(f"An error occurred: {str(e)}")
        logger.exception("Error in main processing loop")
if __name__ == "__main__":
    try:
        main()
    except Exception:
        # Last-resort boundary for anything main() did not handle itself.
        # Log first so the traceback is kept even if rendering the message
        # fails; the user only sees a generic notice.
        logger.exception("Unhandled exception in main application")
        st.error("An unexpected error occurred. Please try again later.")