File size: 13,286 Bytes
a963d65
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
#!/usr/bin/env python3
"""
Shared Medical Extraction Utilities
Centralized medical entity extraction logic to ensure consistency across all processors
"""

import re
from typing import Dict, Any, List
import json

class MedicalExtractor:
    """Centralized medical entity extraction with consistent patterns"""
    
    def __init__(self):
        # Comprehensive medical conditions database
        self.conditions_patterns = [
            "hypertension", "diabetes", "diabetes mellitus", "type 2 diabetes", "type 1 diabetes",
            "pneumonia", "asthma", "copd", "chronic obstructive pulmonary disease",
            "depression", "anxiety", "arthritis", "rheumatoid arthritis", "osteoarthritis",
            "cancer", "stroke", "heart disease", "coronary artery disease", "myocardial infarction",
            "kidney disease", "chronic kidney disease", "liver disease", "hepatitis",
            "chest pain", "acute coronary syndrome", "angina", "atrial fibrillation",
            "congestive heart failure", "heart failure", "cardiomyopathy",
            "hyperlipidemia", "high cholesterol", "obesity", "metabolic syndrome"
        ]
        
        # Common medication patterns
        self.medication_patterns = [
            r"([a-zA-Z]+(?:pril|sartan|olol|pine|statin|formin|cillin))\s+(\d+(?:\.\d+)?)\s*(mg|g|ml|units?)\s+(daily|twice daily|bid|tid|qid|once daily)",
            r"(aspirin|lisinopril|atorvastatin|metformin|insulin|warfarin|prednisone|omeprazole)\s+(\d+(?:\.\d+)?)\s*(mg|g|ml|units?)",
            r"([a-zA-Z]+)\s+(\d+(?:\.\d+)?)\s*(mg|g|ml|units?)\s+(daily|twice daily|bid|tid|qid)"
        ]
        
        # Vital signs patterns
        self.vital_patterns = [
            (r"bp:?\s*(\d{2,3}/\d{2,3})", "Blood Pressure"),
            (r"blood pressure:?\s*(\d{2,3}/\d{2,3})", "Blood Pressure"),
            (r"hr:?\s*(\d{2,3})", "Heart Rate"),
            (r"heart rate:?\s*(\d{2,3})", "Heart Rate"),
            (r"temp:?\s*(\d{2,3}(?:\.\d)?)", "Temperature"),
            (r"temperature:?\s*(\d{2,3}(?:\.\d)?)", "Temperature"),
            (r"o2 sat:?\s*(\d{2,3}%)", "O2 Saturation"),
            (r"oxygen saturation:?\s*(\d{2,3}%)", "O2 Saturation")
        ]
        
        # Procedures keywords
        self.procedures_keywords = [
            "ecg", "ekg", "electrocardiogram", "x-ray", "ct scan", "mri", "ultrasound",
            "blood test", "lab work", "biopsy", "endoscopy", "colonoscopy",
            "surgery", "operation", "procedure", "catheterization", "angiography"
        ]
    
    def extract_all_entities(self, text: str, processing_mode: str = "standard") -> Dict[str, Any]:
        """
        Extract all medical entities from text using consistent patterns
        
        Args:
            text: Medical text to analyze
            processing_mode: Processing mode for confidence scoring
        
        Returns:
            Dictionary with all extracted entities
        """
        return {
            "patient_info": self.extract_patient_info(text),
            "date_of_birth": self.extract_date_of_birth(text),
            "conditions": self.extract_conditions(text),
            "medications": self.extract_medications(text),
            "vitals": self.extract_vitals(text),
            "procedures": self.extract_procedures(text),
            "confidence_score": self.calculate_confidence_score(text, processing_mode),
            "extraction_quality": self.assess_extraction_quality(text),
            "processing_mode": processing_mode
        }
    
    def extract_patient_info(self, text: str) -> str:
        """Extract patient information with consistent patterns"""
        text_lower = text.lower()
        
        # Enhanced patient name patterns
        patterns = [
            r"patient:\s*([^\n\r,]+)",
            r"name:\s*([^\n\r,]+)", 
            r"pt\.?\s*([^\n\r,]+)",
            r"mr\.?\s*([^\n\r,]+)",
            r"patient name:\s*([^\n\r,]+)"
        ]
        
        for pattern in patterns:
            match = re.search(pattern, text_lower)
            if match:
                name = match.group(1).strip().title()
                # Validate name quality
                if (len(name) > 2 and 
                    not any(word in name.lower() for word in ['unknown', 'patient', 'test', 'sample']) and
                    re.match(r'^[a-zA-Z\s]+$', name)):
                    return name
        
        return "Unknown Patient"
    
    def extract_date_of_birth(self, text: str) -> str:
        """Extract date of birth with multiple formats"""
        text_lower = text.lower()
        
        # DOB patterns
        dob_patterns = [
            r"dob:?\s*([^\n\r]+)",
            r"date of birth:?\s*([^\n\r]+)",
            r"born:?\s*([^\n\r]+)",
            r"birth date:?\s*([^\n\r]+)"
        ]
        
        for pattern in dob_patterns:
            match = re.search(pattern, text_lower)
            if match:
                dob = match.group(1).strip()
                # Basic date validation
                if re.match(r'\d{1,2}[/-]\d{1,2}[/-]\d{4}|\d{4}[/-]\d{1,2}[/-]\d{1,2}|[a-zA-Z]+ \d{1,2}, \d{4}', dob):
                    return dob
        
        return "Not specified"
    
    def extract_conditions(self, text: str) -> List[str]:
        """Extract medical conditions with context"""
        text_lower = text.lower()
        found_conditions = []
        
        for condition in self.conditions_patterns:
            if condition in text_lower:
                # Get context around the condition
                condition_pattern = rf"([^\n\r]*{re.escape(condition)}[^\n\r]*)"
                context_match = re.search(condition_pattern, text_lower)
                if context_match:
                    context = context_match.group(1).strip().title()
                    if context not in found_conditions and len(context) > len(condition):
                        found_conditions.append(context)
                elif condition.title() not in found_conditions:
                    found_conditions.append(condition.title())
        
        return found_conditions[:5]  # Limit to top 5 for clarity
    
    def extract_medications(self, text: str) -> List[str]:
        """Extract medications with dosages using consistent patterns"""
        medications = []
        
        for pattern in self.medication_patterns:
            matches = re.finditer(pattern, text, re.IGNORECASE)
            for match in matches:
                if len(match.groups()) >= 3:
                    med_name = match.group(1).title()
                    dose = match.group(2)
                    unit = match.group(3).lower()
                    frequency = match.group(4) if len(match.groups()) >= 4 else ""
                    
                    full_med = f"{med_name} {dose}{unit} {frequency}".strip()
                    if full_med not in medications:
                        medications.append(full_med)
        
        return medications[:5]  # Limit to top 5
    
    def extract_vitals(self, text: str) -> List[str]:
        """Extract vital signs with consistent formatting"""
        vitals = []
        
        for pattern, vital_type in self.vital_patterns:
            matches = re.finditer(pattern, text, re.IGNORECASE)
            for match in matches:
                vital_value = match.group(1)
                
                if vital_type == "Blood Pressure":
                    vitals.append(f"Blood Pressure: {vital_value}")
                elif vital_type == "Heart Rate":
                    vitals.append(f"Heart Rate: {vital_value} bpm")
                elif vital_type == "Temperature":
                    vitals.append(f"Temperature: {vital_value}°F")
                elif vital_type == "O2 Saturation":
                    vitals.append(f"O2 Saturation: {vital_value}")
        
        return vitals[:4]  # Limit to top 4
    
    def extract_procedures(self, text: str) -> List[str]:
        """Extract procedures with consistent naming"""
        procedures = []
        text_lower = text.lower()
        
        for procedure in self.procedures_keywords:
            if procedure in text_lower:
                procedures.append(procedure.title())
        
        return procedures[:3]  # Limit to top 3
    
    def calculate_confidence_score(self, text: str, processing_mode: str) -> float:
        """Calculate confidence score based on text quality and processing mode"""
        base_confidence = {
            "rule_based": 0.75,
            "ollama": 0.85,
            "modal": 0.94,
            "huggingface": 0.88,
            "standard": 0.80
        }
        
        confidence = base_confidence.get(processing_mode, 0.80)
        
        # Adjust based on text quality
        if len(text) > 500:
            confidence += 0.05
        if len(text) > 1000:
            confidence += 0.05
        
        # Check for medical keywords
        medical_keywords = ["patient", "diagnosis", "medication", "treatment", "clinical"]
        keyword_count = sum(1 for keyword in medical_keywords if keyword.lower() in text.lower())
        confidence += keyword_count * 0.02
        
        return min(0.98, confidence)
    
    def assess_extraction_quality(self, text: str) -> Dict[str, Any]:
        """Assess the quality of extraction based on text content"""
        # Extract basic entities for quality assessment
        patient = self.extract_patient_info(text)
        dob = self.extract_date_of_birth(text)
        conditions = self.extract_conditions(text)
        medications = self.extract_medications(text)
        vitals = self.extract_vitals(text)
        procedures = self.extract_procedures(text)
        
        return {
            "patient_identified": patient != "Unknown Patient",
            "dob_found": dob != "Not specified",
            "conditions_count": len(conditions),
            "medications_count": len(medications),
            "vitals_count": len(vitals),
            "procedures_count": len(procedures),
            "total_entities": len(conditions) + len(medications) + len(vitals) + len(procedures),
            "detailed_medications": sum(1 for med in medications if any(unit in med.lower() for unit in ['mg', 'g', 'ml'])),
            "has_vital_signs": len(vitals) > 0,
            "comprehensive_analysis": len(conditions) > 0 and len(medications) > 0
        }
    
    def count_entities(self, extracted_data: Dict[str, Any]) -> int:
        """Count total entities consistently across the system"""
        return (len(extracted_data.get("conditions", [])) + 
                len(extracted_data.get("medications", [])) + 
                len(extracted_data.get("vitals", [])) + 
                len(extracted_data.get("procedures", [])))
    
    def format_for_pydantic(self, extracted_data: Dict[str, Any]) -> Dict[str, Any]:
        """Format extracted data for Pydantic model compatibility"""
        return {
            "patient": extracted_data.get("patient_info", "Unknown Patient"),
            "date_of_birth": extracted_data.get("date_of_birth", "Not specified"),
            "conditions": extracted_data.get("conditions", []),
            "medications": extracted_data.get("medications", []),
            "vitals": extracted_data.get("vitals", []),
            "procedures": extracted_data.get("procedures", []),
            "confidence_score": extracted_data.get("confidence_score", 0.80),
            "extraction_quality": extracted_data.get("extraction_quality", {}),
            "_processing_metadata": {
                "mode": extracted_data.get("processing_mode", "standard"),
                "total_entities": self.count_entities(extracted_data),
                "extraction_timestamp": "2025-06-06T12:00:00Z"
            }
        }

# Global instance for consistent usage across the system
medical_extractor = MedicalExtractor()

# Convenience functions for backward compatibility
def extract_medical_entities(text: str, processing_mode: str = "standard") -> Dict[str, Any]:
    """Extract medical entities using the shared extractor"""
    return medical_extractor.extract_all_entities(text, processing_mode)

def count_entities(extracted_data: Dict[str, Any]) -> int:
    """Count entities using the shared method"""
    return medical_extractor.count_entities(extracted_data)

def format_for_pydantic(extracted_data: Dict[str, Any]) -> Dict[str, Any]:
    """Format for Pydantic using the shared method"""
    return medical_extractor.format_for_pydantic(extracted_data)

def calculate_quality_score(extracted_data: Dict[str, Any]) -> float:
    """Calculate quality score based on entity richness"""
    entity_count = count_entities(extracted_data)
    patient_found = bool(extracted_data.get("patient_info") and 
                        extracted_data.get("patient_info") != "Unknown Patient")
    
    base_score = 0.7
    entity_bonus = min(0.25, entity_count * 0.04)  # Up to 0.25 bonus for entities
    patient_bonus = 0.05 if patient_found else 0
    
    return min(0.98, base_score + entity_bonus + patient_bonus)

# Export main components
__all__ = [
    "MedicalExtractor", 
    "medical_extractor", 
    "extract_medical_entities", 
    "count_entities", 
    "format_for_pydantic",
    "calculate_quality_score"
]