Spaces:
Sleeping
Sleeping
Update app.py
Browse files
app.py
CHANGED
@@ -17,6 +17,7 @@ from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, Table, Tabl
|
|
17 |
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
|
18 |
import io
|
19 |
from dotenv import load_dotenv
|
|
|
20 |
|
21 |
# Load environment variables
|
22 |
load_dotenv()
|
@@ -30,12 +31,12 @@ logger = logging.getLogger(__name__)
|
|
30 |
|
31 |
# Configuration and Constants
|
32 |
class Config:
|
33 |
-
GEMINI_URL = "https://generativelanguage.googleapis.com/v1beta/models/gemini-
|
34 |
-
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")
|
35 |
MAX_RETRIES = 3
|
36 |
TIMEOUT = 30
|
37 |
MAX_IMAGE_SIZE = (1600, 1600)
|
38 |
-
ALLOWED_MIME_TYPES = ["image/jpeg", "image/png"]
|
39 |
MAX_FILE_SIZE = 5 * 1024 * 1024 # 5MB
|
40 |
|
41 |
# Custom Exceptions
|
@@ -45,6 +46,9 @@ class APIError(Exception):
|
|
45 |
class ImageProcessingError(Exception):
|
46 |
pass
|
47 |
|
|
|
|
|
|
|
48 |
# Initialize session state
|
49 |
def init_session_state():
|
50 |
if 'processing_history' not in st.session_state:
|
@@ -179,22 +183,20 @@ class PDFGenerator:
|
|
179 |
doc.build(elements)
|
180 |
return buffer.getvalue()
|
181 |
|
182 |
-
|
183 |
class ImageProcessor:
|
184 |
@staticmethod
|
185 |
-
def
|
186 |
try:
|
187 |
if uploaded_file.size > Config.MAX_FILE_SIZE:
|
188 |
return False, f"File size exceeds {Config.MAX_FILE_SIZE // (1024*1024)}MB limit"
|
189 |
|
190 |
-
|
191 |
-
|
192 |
-
return False, "Unsupported image format. Please upload JPEG or PNG"
|
193 |
|
194 |
-
return True, "
|
195 |
except Exception as e:
|
196 |
-
logger.error(f"
|
197 |
-
return False, f"
|
198 |
|
199 |
@staticmethod
|
200 |
def preprocess_image(image: Image.Image) -> Image.Image:
|
@@ -214,14 +216,23 @@ class DocumentProcessor:
|
|
214 |
def __init__(self):
|
215 |
self.image_processor = ImageProcessor()
|
216 |
|
217 |
-
def process_document(self,
|
218 |
try:
|
219 |
-
|
220 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
221 |
|
222 |
results = {
|
223 |
-
"document_type": self.classify_document(
|
224 |
-
"extracted_text":
|
225 |
"structured_data": None
|
226 |
}
|
227 |
|
@@ -241,8 +252,22 @@ class DocumentProcessor:
|
|
241 |
image.save(buffered, format="JPEG", quality=95)
|
242 |
return base64.b64encode(buffered.getvalue()).decode('utf-8')
|
243 |
|
244 |
-
|
245 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
246 |
Analyze this medical document and classify it into one of the following categories:
|
247 |
- Lab Report
|
248 |
- Patient Chart
|
@@ -251,8 +276,11 @@ class DocumentProcessor:
|
|
251 |
- Medical Certificate
|
252 |
- Other (specify)
|
253 |
Provide only the category name.
|
|
|
|
|
|
|
254 |
"""
|
255 |
-
response = GeminiAPI.call_api(prompt
|
256 |
return response["candidates"][0]["content"]["parts"][0]["text"].strip()
|
257 |
|
258 |
def extract_text(self, image_base64: str) -> str:
|
@@ -318,6 +346,9 @@ class DocumentProcessor:
|
|
318 |
self.correct_medicine_name(med) for med in structured_data.get('medications', [])
|
319 |
]
|
320 |
|
|
|
|
|
|
|
321 |
return structured_data
|
322 |
|
323 |
@staticmethod
|
@@ -341,6 +372,17 @@ class DocumentProcessor:
|
|
341 |
medication['name'] = response["candidates"][0]["content"]["parts"][0]["text"].strip()
|
342 |
return medication
|
343 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
344 |
@staticmethod
|
345 |
def parse_json_response(response: Dict[str, Any]) -> Dict[str, Any]:
|
346 |
try:
|
@@ -420,7 +462,7 @@ def main():
|
|
420 |
setup_page()
|
421 |
|
422 |
st.title("🏥 Advanced Medical Document Processor")
|
423 |
-
st.markdown("Upload medical documents for automated processing and analysis.")
|
424 |
|
425 |
# Sidebar
|
426 |
with st.sidebar:
|
@@ -441,30 +483,32 @@ def main():
|
|
441 |
# Main content
|
442 |
uploaded_file = st.file_uploader(
|
443 |
"Choose a medical document",
|
444 |
-
type=['png', 'jpg', 'jpeg'],
|
445 |
-
help="Upload a clear image of a medical document (max 5MB)"
|
446 |
)
|
447 |
|
448 |
if uploaded_file:
|
449 |
try:
|
450 |
-
# Validate
|
451 |
-
is_valid, message = ImageProcessor.
|
452 |
if not is_valid:
|
453 |
st.error(message)
|
454 |
return
|
455 |
|
456 |
-
# Display
|
457 |
-
|
458 |
-
|
459 |
-
|
460 |
-
|
461 |
-
|
|
|
|
|
462 |
|
463 |
# Process document
|
464 |
if st.button("🔍 Process Document"):
|
465 |
with st.spinner("Processing document..."):
|
466 |
processor = DocumentProcessor()
|
467 |
-
results = processor.process_document(
|
468 |
|
469 |
# Generate PDF
|
470 |
pdf_bytes = PDFGenerator.create_pdf(results['structured_data'])
|
@@ -486,7 +530,7 @@ def main():
|
|
486 |
})
|
487 |
|
488 |
# Display results
|
489 |
-
with col2:
|
490 |
st.success("Document processed successfully!")
|
491 |
st.markdown(f"**Document Type:** {results['document_type']}")
|
492 |
|
|
|
17 |
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
|
18 |
import io
|
19 |
from dotenv import load_dotenv
|
20 |
+
import fitz # PyMuPDF for PDF processing
|
21 |
|
22 |
# Load environment variables
|
23 |
load_dotenv()
|
|
|
31 |
|
32 |
# Configuration and Constants
|
33 |
class Config:
|
34 |
+
GEMINI_URL = "https://generativelanguage.googleapis.com/v1beta/models/gemini-2.0-flash-exp:generateContent"
|
35 |
+
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY") # Load from .env
|
36 |
MAX_RETRIES = 3
|
37 |
TIMEOUT = 30
|
38 |
MAX_IMAGE_SIZE = (1600, 1600)
|
39 |
+
ALLOWED_MIME_TYPES = ["image/jpeg", "image/png", "application/pdf"]
|
40 |
MAX_FILE_SIZE = 5 * 1024 * 1024 # 5MB
|
41 |
|
42 |
# Custom Exceptions
|
|
|
46 |
class ImageProcessingError(Exception):
|
47 |
pass
|
48 |
|
49 |
+
class PDFProcessingError(Exception):
|
50 |
+
pass
|
51 |
+
|
52 |
# Initialize session state
|
53 |
def init_session_state():
|
54 |
if 'processing_history' not in st.session_state:
|
|
|
183 |
doc.build(elements)
|
184 |
return buffer.getvalue()
|
185 |
|
|
|
186 |
class ImageProcessor:
|
187 |
@staticmethod
|
188 |
+
def validate_file(uploaded_file) -> tuple[bool, str]:
|
189 |
try:
|
190 |
if uploaded_file.size > Config.MAX_FILE_SIZE:
|
191 |
return False, f"File size exceeds {Config.MAX_FILE_SIZE // (1024*1024)}MB limit"
|
192 |
|
193 |
+
if uploaded_file.type not in Config.ALLOWED_MIME_TYPES:
|
194 |
+
return False, "Unsupported file type. Please upload JPEG, PNG, or PDF."
|
|
|
195 |
|
196 |
+
return True, "File validation successful"
|
197 |
except Exception as e:
|
198 |
+
logger.error(f"File validation error: {str(e)}")
|
199 |
+
return False, f"File validation failed: {str(e)}"
|
200 |
|
201 |
@staticmethod
|
202 |
def preprocess_image(image: Image.Image) -> Image.Image:
|
|
|
216 |
def __init__(self):
|
217 |
self.image_processor = ImageProcessor()
|
218 |
|
219 |
+
def process_document(self, uploaded_file) -> Dict[str, Any]:
|
220 |
try:
|
221 |
+
if uploaded_file.type.startswith("image/"):
|
222 |
+
# Process image
|
223 |
+
image = Image.open(uploaded_file)
|
224 |
+
processed_image = self.image_processor.preprocess_image(image)
|
225 |
+
image_base64 = self.encode_image(processed_image)
|
226 |
+
extracted_text = self.extract_text(image_base64)
|
227 |
+
elif uploaded_file.type == "application/pdf":
|
228 |
+
# Process PDF
|
229 |
+
extracted_text = self.extract_text_from_pdf(uploaded_file)
|
230 |
+
else:
|
231 |
+
raise ValueError("Unsupported file type.")
|
232 |
|
233 |
results = {
|
234 |
+
"document_type": self.classify_document(extracted_text),
|
235 |
+
"extracted_text": extracted_text,
|
236 |
"structured_data": None
|
237 |
}
|
238 |
|
|
|
252 |
image.save(buffered, format="JPEG", quality=95)
|
253 |
return base64.b64encode(buffered.getvalue()).decode('utf-8')
|
254 |
|
255 |
+
@staticmethod
|
256 |
+
def extract_text_from_pdf(uploaded_file) -> str:
|
257 |
+
try:
|
258 |
+
pdf_bytes = uploaded_file.read()
|
259 |
+
pdf_document = fitz.open(stream=pdf_bytes, filetype="pdf")
|
260 |
+
text = ""
|
261 |
+
for page_num in range(len(pdf_document)):
|
262 |
+
page = pdf_document.load_page(page_num)
|
263 |
+
text += page.get_text()
|
264 |
+
return text
|
265 |
+
except Exception as e:
|
266 |
+
logger.error(f"PDF processing error: {str(e)}")
|
267 |
+
raise PDFProcessingError(f"Failed to process PDF: {str(e)}")
|
268 |
+
|
269 |
+
def classify_document(self, text: str) -> str:
|
270 |
+
prompt = f"""
|
271 |
Analyze this medical document and classify it into one of the following categories:
|
272 |
- Lab Report
|
273 |
- Patient Chart
|
|
|
276 |
- Medical Certificate
|
277 |
- Other (specify)
|
278 |
Provide only the category name.
|
279 |
+
|
280 |
+
Document Text:
|
281 |
+
{text}
|
282 |
"""
|
283 |
+
response = GeminiAPI.call_api(prompt)
|
284 |
return response["candidates"][0]["content"]["parts"][0]["text"].strip()
|
285 |
|
286 |
def extract_text(self, image_base64: str) -> str:
|
|
|
346 |
self.correct_medicine_name(med) for med in structured_data.get('medications', [])
|
347 |
]
|
348 |
|
349 |
+
# Improve symptoms extraction
|
350 |
+
structured_data['symptoms'] = self.extract_symptoms(text)
|
351 |
+
|
352 |
return structured_data
|
353 |
|
354 |
@staticmethod
|
|
|
372 |
medication['name'] = response["candidates"][0]["content"]["parts"][0]["text"].strip()
|
373 |
return medication
|
374 |
|
375 |
+
@staticmethod
|
376 |
+
def extract_symptoms(text: str) -> list[str]:
|
377 |
+
"""Extract symptoms from the text."""
|
378 |
+
prompt = f"""
|
379 |
+
Extract all symptoms mentioned in the following medical text. Return only a list of symptoms:
|
380 |
+
{text}
|
381 |
+
"""
|
382 |
+
response = GeminiAPI.call_api(prompt)
|
383 |
+
symptoms = response["candidates"][0]["content"]["parts"][0]["text"].strip().split("\n")
|
384 |
+
return [symptom.strip() for symptom in symptoms if symptom.strip()]
|
385 |
+
|
386 |
@staticmethod
|
387 |
def parse_json_response(response: Dict[str, Any]) -> Dict[str, Any]:
|
388 |
try:
|
|
|
462 |
setup_page()
|
463 |
|
464 |
st.title("🏥 Advanced Medical Document Processor")
|
465 |
+
st.markdown("Upload medical documents (images or PDFs) for automated processing and analysis.")
|
466 |
|
467 |
# Sidebar
|
468 |
with st.sidebar:
|
|
|
483 |
# Main content
|
484 |
uploaded_file = st.file_uploader(
|
485 |
"Choose a medical document",
|
486 |
+
type=['png', 'jpg', 'jpeg', 'pdf'],
|
487 |
+
help="Upload a clear image or PDF of a medical document (max 5MB)"
|
488 |
)
|
489 |
|
490 |
if uploaded_file:
|
491 |
try:
|
492 |
+
# Validate file
|
493 |
+
is_valid, message = ImageProcessor.validate_file(uploaded_file)
|
494 |
if not is_valid:
|
495 |
st.error(message)
|
496 |
return
|
497 |
|
498 |
+
# Display file
|
499 |
+
if uploaded_file.type.startswith("image/"):
|
500 |
+
image = Image.open(uploaded_file)
|
501 |
+
col1, col2 = st.columns([1, 2])
|
502 |
+
with col1:
|
503 |
+
st.image(image, caption="Uploaded Document", use_column_width=True)
|
504 |
+
elif uploaded_file.type == "application/pdf":
|
505 |
+
st.info("PDF file uploaded. Processing...")
|
506 |
|
507 |
# Process document
|
508 |
if st.button("🔍 Process Document"):
|
509 |
with st.spinner("Processing document..."):
|
510 |
processor = DocumentProcessor()
|
511 |
+
results = processor.process_document(uploaded_file)
|
512 |
|
513 |
# Generate PDF
|
514 |
pdf_bytes = PDFGenerator.create_pdf(results['structured_data'])
|
|
|
530 |
})
|
531 |
|
532 |
# Display results
|
533 |
+
with col2 if uploaded_file.type.startswith("image/") else st:
|
534 |
st.success("Document processed successfully!")
|
535 |
st.markdown(f"**Document Type:** {results['document_type']}")
|
536 |
|