# tonic-discharge-guard/utils/responseparser.py
# Last commit by Tonic: "big changes to the application flow - get_language"
# (26ac540, unverified); file size 22.7 kB.
import json
import lxml.etree as etree
from datetime import datetime
from typing import List, Dict, Optional, Union
import base64
import logging
# Logging setup: configure the root logger when this module is imported
# (INFO threshold, timestamped records) and create the module-level logger.
_LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
logging.basicConfig(level=logging.INFO, format=_LOG_FORMAT)
logger = logging.getLogger(__name__)
class PatientDataExtractor:
    """Extract patient fields from a FHIR Bundle (JSON) or a C-CDA document (XML).

    The extractor keeps a list of patients (``self.patients``) and a cursor
    (``self.current_patient_idx``). The single-value ``get_*`` accessors read
    from the patient the cursor points at and return ``""`` when the field is
    absent.

    NOTE(review): the list accessors (medications, encounters, conditions,
    immunizations, diagnostic reports) scan the whole document and are not
    filtered by the selected patient, so in a multi-patient bundle every
    patient reports the same lists -- confirm whether that is intended.
    """

    # XPath prefixes shared by the C-CDA field accessors.
    _PATIENT_ROLE = "//hl7:recordTarget/hl7:patientRole"
    _PATIENT = _PATIENT_ROLE + "/hl7:patient"
    _ADDRESS = _PATIENT_ROLE + "/hl7:addr"

    # administrativeGenderCode values that are translated to a display label.
    _GENDER_LABELS = {"M": "Male", "F": "Female"}

    _RACE_URL = "http://hl7.org/fhir/us/core/StructureDefinition/us-core-race"
    _ETHNICITY_URL = "http://hl7.org/fhir/us/core/StructureDefinition/us-core-ethnicity"

    def __init__(self, patient_data: str, format_type: str = None):
        """Parse *patient_data* and select the first patient.

        Args:
            patient_data: Raw XML/JSON text (``str`` or ``bytes``), or an
                already-parsed object (a ``dict`` for JSON, an element exposing
                ``xpath`` for XML), which is stored as-is.
            format_type: ``"xml"`` or ``"json"`` (case-insensitive). When
                omitted the format is sniffed from *patient_data*.

        Raises:
            ValueError: If the format is unsupported or cannot be determined,
                or the JSON payload is not a FHIR Bundle with entries.
        """
        self.format = format_type.lower() if format_type else self._detect_format(patient_data)
        if self.format == "xml":
            if isinstance(patient_data, str):
                patient_data = patient_data.encode('utf-8')
            # Raw bytes are parsed; anything else is assumed to be parsed already.
            self.data = etree.fromstring(patient_data) if isinstance(patient_data, bytes) else patient_data
            self.ns = {'hl7': 'urn:hl7-org:v3'}
        elif self.format == "json":
            self.data = json.loads(patient_data) if isinstance(patient_data, (str, bytes)) else patient_data
        else:
            raise ValueError("Unsupported format. Use 'xml' or 'json'")
        self.patients = self._extract_patients()
        self.current_patient_idx = 0

    def _detect_format(self, data: str) -> str:
        """Return ``"xml"`` or ``"json"`` for *data*, or raise ``ValueError``.

        Text is classified by its first non-blank character; parsed input is
        classified by type (dict/list -> JSON, object with ``xpath`` -> XML).
        """
        if isinstance(data, (dict, list)):
            return 'json'
        if isinstance(data, bytes):
            data = data.decode('utf-8', errors='replace')
        if isinstance(data, str):
            text = data.strip()
            if text.startswith('<'):
                return 'xml'
            if text.startswith(('{', '[')):
                return 'json'
        elif hasattr(data, "xpath"):
            return 'xml'
        raise ValueError("Cannot determine data format")

    def _extract_patients(self) -> List:
        """Return all patient records found in the parsed document."""
        if self.format == "xml":
            return [self.data]  # C-CDA has one patient per document
        if self.format == "json":
            bundle = self.data
            # A top-level JSON array (or any non-dict) is not a Bundle.
            if not isinstance(bundle, dict) or bundle.get("resourceType") != "Bundle" or "entry" not in bundle:
                raise ValueError("Invalid FHIR Bundle format")
            return self._resources_of_type("Patient")
        return []

    def set_patient_by_index(self, index: int) -> bool:
        """Select the patient at *index*; return ``False`` if out of range."""
        if 0 <= index < len(self.patients):
            self.current_patient_idx = index
            return True
        return False

    def _get_current_patient(self):
        """Return the currently selected patient record."""
        return self.patients[self.current_patient_idx]

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _first(items, default=None):
        """Return ``items[0]``, or *default* when *items* is empty or missing."""
        return items[0] if items else default

    def _xpath_first(self, node, expression: str) -> str:
        """Return the first XPath match of *expression* under *node*, or ``""``."""
        matches = node.xpath(expression, namespaces=self.ns)
        return matches[0] if matches else ""

    def _xml_section(self, section_code: str):
        """Return the first ``section`` whose ``code/@code`` equals *section_code*, or ``None``."""
        sections = self.data.xpath(
            "//hl7:section[hl7:code/@code='%s']" % section_code, namespaces=self.ns
        )
        return sections[0] if sections else None

    def _resources_of_type(self, resource_type: str) -> List[Dict]:
        """Return every bundle resource whose ``resourceType`` is *resource_type*.

        Entries without a ``resource`` object are skipped instead of raising.
        """
        found = []
        for entry in self.data.get("entry") or []:
            resource = entry.get("resource") if isinstance(entry, dict) else None
            if isinstance(resource, dict) and resource.get("resourceType") == resource_type:
                found.append(resource)
        return found

    @classmethod
    def _concept_code(cls, concept) -> str:
        """Return the ``code`` of the first ``coding`` in *concept*, or ``""``."""
        return cls._first((concept or {}).get("coding"), {}).get("code", "")

    def _official_name_part(self, part: str):
        """Return *part* of the first ``official`` name carrying it, else ``None``."""
        for name in self._get_current_patient().get("name", []):
            if name.get("use") == "official" and part in name:
                return name[part]
        return None

    def _first_address(self) -> Dict:
        """Return the current patient's first JSON address, or an empty dict."""
        return self._first(self._get_current_patient().get("address"), {})

    def _extension_text(self, url: str) -> str:
        """Return the ``text`` sub-extension's ``valueString`` of the extension at *url*."""
        for ext in self._get_current_patient().get("extension", []):
            if ext.get("url") != url:
                continue
            for sub_ext in ext.get("extension", []):
                if sub_ext.get("url") == "text":
                    return sub_ext.get("valueString", "")
        return ""

    @staticmethod
    def _decode_base64_text(payload) -> Optional[str]:
        """Decode base64 *payload* to text; return ``None`` if it is malformed."""
        try:
            # Undecodable bytes are replaced rather than aborting the extraction.
            return base64.b64decode(payload).decode('utf-8', errors='replace')
        except (ValueError, TypeError) as exc:  # binascii.Error is a ValueError
            logging.getLogger(__name__).warning("Skipping undecodable presentedForm data: %s", exc)
            return None

    # ------------------------------------------------------------------
    # Basic Identification Fields
    # ------------------------------------------------------------------
    def get_id(self) -> str:
        """Return the patient identifier (XML: ``patientRole/id/@extension``; JSON: ``id``)."""
        patient = self._get_current_patient()
        if self.format == "xml":
            return self._xpath_first(patient, self._PATIENT_ROLE + "/hl7:id/@extension")
        return patient.get("id", "")

    def get_resource_type(self) -> str:
        """Return ``"ClinicalDocument"`` for XML, or the JSON ``resourceType``."""
        patient = self._get_current_patient()
        if self.format == "xml":
            return "ClinicalDocument"
        return patient.get("resourceType", "")

    def get_meta_last_updated(self) -> str:
        """Return the first ``effectiveTime/@value`` (XML) or ``meta.lastUpdated`` (JSON)."""
        patient = self._get_current_patient()
        if self.format == "xml":
            return self._xpath_first(patient, "//hl7:effectiveTime/@value")
        return (patient.get("meta") or {}).get("lastUpdated", "")

    # ------------------------------------------------------------------
    # Name Fields
    # ------------------------------------------------------------------
    def get_first_name(self) -> str:
        """Return the first given name (JSON: from the ``official`` name only)."""
        if self.format == "xml":
            return self._xpath_first(self._get_current_patient(), self._PATIENT + "/hl7:name/hl7:given/text()")
        given = self._official_name_part("given")
        return given[0] if given else ""

    def get_last_name(self) -> str:
        """Return the family name (JSON: from the ``official`` name only)."""
        if self.format == "xml":
            return self._xpath_first(self._get_current_patient(), self._PATIENT + "/hl7:name/hl7:family/text()")
        family = self._official_name_part("family")
        return family if family is not None else ""

    def get_name_prefix(self) -> str:
        """Return the first name prefix (JSON: from the ``official`` name only)."""
        if self.format == "xml":
            return self._xpath_first(self._get_current_patient(), self._PATIENT + "/hl7:name/hl7:prefix/text()")
        prefix = self._official_name_part("prefix")
        return prefix[0] if prefix else ""

    # ------------------------------------------------------------------
    # Demographic Fields
    # ------------------------------------------------------------------
    def get_dob(self) -> str:
        """Return the birth date as stored (XML ``birthTime/@value``; JSON ``birthDate``)."""
        patient = self._get_current_patient()
        if self.format == "xml":
            return self._xpath_first(patient, self._PATIENT + "/hl7:birthTime/@value")
        return patient.get("birthDate", "")

    def get_age(self) -> str:
        """Return the age in whole years as a string, or ``""`` if it cannot be computed.

        Accepts both the compact ``YYYYMMDD...`` form and the dashed
        ``YYYY-MM-DD`` form returned by :meth:`get_dob`.
        """
        dob = self.get_dob()
        if not dob:
            return ""
        # Dropping dashes maps "1980-05-12" onto the compact "19800512" layout.
        compact = dob.replace("-", "")[:8]
        # Require a full date: strptime would otherwise accept e.g. "198011"
        # by reading it as month 1, day 1.
        if len(compact) != 8 or not compact.isdigit():
            return ""
        try:
            birth_date = datetime.strptime(compact, "%Y%m%d")
        except ValueError:
            return ""
        today = datetime.now()
        birthday_pending = (today.month, today.day) < (birth_date.month, birth_date.day)
        return str(today.year - birth_date.year - (1 if birthday_pending else 0))

    def get_gender(self) -> str:
        """Return the gender label (XML: ``M``/``F`` mapped to Male/Female; JSON: capitalized)."""
        patient = self._get_current_patient()
        if self.format == "xml":
            code = self._xpath_first(patient, self._PATIENT + "/hl7:administrativeGenderCode/@code")
            return self._GENDER_LABELS.get(code, "")
        return (patient.get("gender") or "").capitalize()

    # ------------------------------------------------------------------
    # Address Fields
    # ------------------------------------------------------------------
    def get_address_line(self) -> str:
        """Return the first street line of the first address."""
        if self.format == "xml":
            return self._xpath_first(self._get_current_patient(), self._ADDRESS + "/hl7:streetAddressLine/text()")
        lines = self._first_address().get("line")
        return lines[0] if lines else ""

    def get_city(self) -> str:
        """Return the city of the first address."""
        if self.format == "xml":
            return self._xpath_first(self._get_current_patient(), self._ADDRESS + "/hl7:city/text()")
        return self._first_address().get("city", "")

    def get_state(self) -> str:
        """Return the state of the first address."""
        if self.format == "xml":
            return self._xpath_first(self._get_current_patient(), self._ADDRESS + "/hl7:state/text()")
        return self._first_address().get("state", "")

    def get_zip_code(self) -> str:
        """Return the postal code of the first address."""
        if self.format == "xml":
            return self._xpath_first(self._get_current_patient(), self._ADDRESS + "/hl7:postalCode/text()")
        return self._first_address().get("postalCode", "")

    # ------------------------------------------------------------------
    # Contact Fields
    # ------------------------------------------------------------------
    def get_phone(self) -> str:
        """Return the patient's phone number.

        XML: the first ``telecom`` value containing ``tel:``, with that marker
        removed. JSON: the first telecom with system ``phone`` and use ``home``.
        """
        patient = self._get_current_patient()
        if self.format == "xml":
            values = patient.xpath(self._PATIENT_ROLE + "/hl7:telecom/@value", namespaces=self.ns)
            # Scan all values: the first telecom may be an e-mail address.
            for value in values:
                if "tel:" in value:
                    return value.replace("tel:", "")
            return ""
        for telecom in patient.get("telecom", []):
            if telecom.get("system") == "phone" and telecom.get("use") == "home":
                return telecom.get("value", "")
        return ""

    # ------------------------------------------------------------------
    # Extensions and Additional Fields
    # ------------------------------------------------------------------
    def get_race(self) -> str:
        """Return the race display text (XML ``raceCode``; JSON us-core-race extension)."""
        if self.format == "xml":
            return self._xpath_first(self._get_current_patient(), self._PATIENT + "/hl7:raceCode/@displayName")
        return self._extension_text(self._RACE_URL)

    def get_ethnicity(self) -> str:
        """Return the ethnicity display text (XML ``ethnicGroupCode``; JSON us-core-ethnicity extension)."""
        if self.format == "xml":
            return self._xpath_first(self._get_current_patient(), self._PATIENT + "/hl7:ethnicGroupCode/@displayName")
        return self._extension_text(self._ETHNICITY_URL)

    def get_language(self) -> str:
        """Return the patient's language.

        XML: the first ``languageCode/@code``. JSON: from the first
        ``communication`` entry, ``language.text`` if present, otherwise the
        first coding's ``display`` (falling back to its ``code``).
        """
        patient = self._get_current_patient()
        if self.format == "xml":
            return self._xpath_first(
                patient, self._PATIENT + "/hl7:languageCommunication/hl7:languageCode/@code"
            )
        first_comm = self._first(patient.get("communication"), {})
        if "language" in first_comm:
            lang = first_comm["language"]
            # Try 'text' first, then fall back to 'coding' if available
            if "text" in lang:
                return lang["text"]
            if lang.get("coding"):
                coding = lang["coding"][0]
                return coding.get("display", coding.get("code", ""))
        return ""

    # ------------------------------------------------------------------
    # Medications
    # ------------------------------------------------------------------
    def get_medications(self) -> List[Dict[str, str]]:
        """Return medications as dicts with ``start``, ``stop``, ``description``, ``code``.

        XML: ``substanceAdministration`` elements of section ``10160-0``.
        JSON: every ``MedicationRequest`` resource in the bundle.
        """
        if self.format == "xml":
            section = self._xml_section("10160-0")
            if section is None:
                return []
            return [
                {
                    "start": self._xpath_first(med, ".//hl7:effectiveTime/hl7:low/@value"),
                    "stop": self._xpath_first(med, ".//hl7:effectiveTime/hl7:high/@value"),
                    "description": self._xpath_first(med, ".//hl7:manufacturedMaterial/hl7:code/@displayName"),
                    "code": self._xpath_first(med, ".//hl7:manufacturedMaterial/hl7:code/@code"),
                }
                for med in section.xpath(".//hl7:substanceAdministration", namespaces=self.ns)
            ]
        result = []
        for med in self._resources_of_type("MedicationRequest"):
            concept = med.get("medicationCodeableConcept") or {}
            validity = (med.get("dispenseRequest") or {}).get("validityPeriod") or {}
            result.append({
                "start": med.get("authoredOn", ""),
                "stop": validity.get("end", ""),
                "description": concept.get("text", ""),
                "code": self._concept_code(concept),
            })
        return result

    # ------------------------------------------------------------------
    # Encounters
    # ------------------------------------------------------------------
    def get_encounters(self) -> List[Dict[str, str]]:
        """Return encounters as dicts with ``start``, ``end``, ``description``, ``code``.

        XML: a single entry built from the first ``documentationOf/serviceEvent``
        (description fixed to ``"Patient Care"``). JSON: every ``Encounter``
        resource in the bundle.
        """
        if self.format == "xml":
            service = self.data.xpath("//hl7:documentationOf/hl7:serviceEvent", namespaces=self.ns)
            if not service:
                return []
            return [{
                "start": self._xpath_first(service[0], ".//hl7:effectiveTime/hl7:low/@value"),
                "end": self._xpath_first(service[0], ".//hl7:effectiveTime/hl7:high/@value"),
                "description": "Patient Care",
                "code": "",
            }]
        result = []
        for enc in self._resources_of_type("Encounter"):
            period = enc.get("period") or {}
            enc_type = self._first(enc.get("type"), {})
            result.append({
                "start": period.get("start", ""),
                "end": period.get("end", ""),
                "description": enc_type.get("text", ""),
                "code": self._concept_code(enc_type),
            })
        return result

    # ------------------------------------------------------------------
    # Conditions/Diagnoses
    # ------------------------------------------------------------------
    def get_conditions(self) -> List[Dict[str, str]]:
        """Return conditions as dicts with ``onset``, ``description``, ``code``.

        XML: observations nested under section ``11450-4``. JSON: every
        ``Condition`` resource in the bundle.
        """
        if self.format == "xml":
            section = self._xml_section("11450-4")
            if section is None:
                return []
            observations = section.xpath(
                ".//hl7:entry/hl7:act/hl7:entryRelationship/hl7:observation", namespaces=self.ns
            )
            return [
                {
                    "onset": self._xpath_first(obs, ".//hl7:effectiveTime/hl7:low/@value"),
                    "description": self._xpath_first(obs, ".//hl7:value/@displayName"),
                    "code": self._xpath_first(obs, ".//hl7:value/@code"),
                }
                for obs in observations
            ]
        result = []
        for cond in self._resources_of_type("Condition"):
            concept = cond.get("code") or {}
            result.append({
                "onset": cond.get("onsetDateTime", ""),
                "description": concept.get("text", ""),
                "code": self._concept_code(concept),
            })
        return result

    # ------------------------------------------------------------------
    # Immunizations
    # ------------------------------------------------------------------
    def get_immunizations(self) -> List[Dict[str, str]]:
        """Return immunizations as dicts with ``date``, ``description``, ``code``.

        XML: ``substanceAdministration`` elements of section ``11369-6``.
        JSON: every ``Immunization`` resource in the bundle.
        """
        if self.format == "xml":
            section = self._xml_section("11369-6")
            if section is None:
                return []
            material = ".//hl7:consumable/hl7:manufacturedProduct/hl7:manufacturedMaterial/hl7:code"
            return [
                {
                    "date": self._xpath_first(imm, ".//hl7:effectiveTime/@value"),
                    "description": self._xpath_first(imm, material + "/@displayName"),
                    "code": self._xpath_first(imm, material + "/@code"),
                }
                for imm in section.xpath(".//hl7:substanceAdministration", namespaces=self.ns)
            ]
        result = []
        for imm in self._resources_of_type("Immunization"):
            vaccine = imm.get("vaccineCode") or {}
            result.append({
                "date": imm.get("occurrenceDateTime", ""),
                "description": vaccine.get("text", ""),
                "code": self._concept_code(vaccine),
            })
        return result

    # ------------------------------------------------------------------
    # Diagnostic Reports
    # ------------------------------------------------------------------
    def get_diagnostic_reports(self) -> List[Dict[str, str]]:
        """Return diagnostic reports as dicts with ``start``, ``description``, ``code``.

        XML: ``organizer`` elements of section ``30954-2``. JSON: every
        ``DiagnosticReport`` resource; when the first ``presentedForm`` carries
        base64 ``data`` that decodes, the text is added under ``content``.
        """
        if self.format == "xml":
            section = self._xml_section("30954-2")
            if section is None:
                return []
            return [
                {
                    "start": self._xpath_first(report, ".//hl7:effectiveTime/hl7:low/@value"),
                    "description": self._xpath_first(report, ".//hl7:code/@displayName"),
                    "code": self._xpath_first(report, ".//hl7:code/@code"),
                }
                for report in section.xpath(".//hl7:organizer", namespaces=self.ns)
            ]
        result = []
        for report in self._resources_of_type("DiagnosticReport"):
            concept = report.get("code") or {}
            item = {
                "start": report.get("effectiveDateTime", ""),
                "description": concept.get("text", ""),
                "code": self._concept_code(concept),
            }
            payload = self._first(report.get("presentedForm"), {}).get("data", "")
            if payload:
                content = self._decode_base64_text(payload)
                # A malformed attachment drops only the content, not the report.
                if content is not None:
                    item["content"] = content
            result.append(item)
        return result

    # ------------------------------------------------------------------
    # Comprehensive Extraction
    # ------------------------------------------------------------------
    def get_all_patient_data(self) -> Dict[str, Union[str, List, Dict]]:
        """Extract all available data for the current patient."""
        return {
            "id": self.get_id(),
            "resource_type": self.get_resource_type(),
            "meta_last_updated": self.get_meta_last_updated(),
            "first_name": self.get_first_name(),
            "last_name": self.get_last_name(),
            "name_prefix": self.get_name_prefix(),
            "dob": self.get_dob(),
            "age": self.get_age(),
            "gender": self.get_gender(),
            "address_line": self.get_address_line(),
            "city": self.get_city(),
            "state": self.get_state(),
            "zip_code": self.get_zip_code(),
            "phone": self.get_phone(),
            "race": self.get_race(),
            "ethnicity": self.get_ethnicity(),
            "language": self.get_language(),
            "medications": self.get_medications(),
            "encounters": self.get_encounters(),
            "conditions": self.get_conditions(),
            "immunizations": self.get_immunizations(),
            "diagnostic_reports": self.get_diagnostic_reports(),
        }

    def get_patient_dict(self) -> Dict[str, str]:
        """Return the current patient's data mapped to discharge form fields.

        Admission/discharge dates come from the last encounter in document
        order and the diagnosis from the last condition; fields with no source
        in the extracted data are returned as empty strings.
        """
        data = self.get_all_patient_data()
        latest_encounter = data["encounters"][-1] if data["encounters"] else {}
        latest_condition = data["conditions"][-1] if data["conditions"] else {}
        # Skip blank descriptions so the joined string has no empty segments.
        medications_str = "; ".join(
            med["description"] for med in data["medications"] if med["description"]
        )
        return {
            "first_name": data["first_name"],
            "last_name": data["last_name"],
            "middle_initial": "",
            "dob": data["dob"],
            "age": data["age"],
            "sex": data["gender"],
            "address": data["address_line"],
            "city": data["city"],
            "state": data["state"],
            "zip_code": data["zip_code"],
            "doctor_first_name": "",
            "doctor_last_name": "",
            "doctor_middle_initial": "",
            "hospital_name": "",
            "doctor_address": "",
            "doctor_city": "",
            "doctor_state": "",
            "doctor_zip": "",
            "admission_date": latest_encounter.get("start", ""),
            "referral_source": "",
            "admission_method": "",
            "discharge_date": latest_encounter.get("end", ""),
            "discharge_reason": "",
            "date_of_death": "",
            "diagnosis": latest_condition.get("description", ""),
            "procedures": "",
            "medications": medications_str,
            "preparer_name": "",
            "preparer_job_title": "",
        }

    def get_all_patients(self) -> List[Dict[str, str]]:
        """Return the discharge-form dictionary of every patient.

        The current selection is restored afterwards, even if extraction fails.
        """
        original_idx = self.current_patient_idx
        all_patients = []
        try:
            for idx in range(len(self.patients)):
                self.current_patient_idx = idx
                all_patients.append(self.get_patient_dict())
        finally:
            self.current_patient_idx = original_idx
        return all_patients

    def get_patient_ids(self) -> List[str]:
        """Return the ID of every patient, in ``self.patients`` order.

        The current selection is restored afterwards.
        """
        original_idx = self.current_patient_idx
        ids = []
        try:
            # get_id() reads the selected patient, so move the cursor per patient.
            for idx in range(len(self.patients)):
                self.current_patient_idx = idx
                ids.append(self.get_id())
        finally:
            self.current_patient_idx = original_idx
        return ids