import json | |
from datetime import datetime | |
from typing import List, Dict, Optional | |
class PatientDataExtractor: | |
"""Class to extract patient data from a FHIR Bundle response and map it to discharge form fields.""" | |
def __init__(self, patient_data: str): | |
"""Initialize with patient data in JSON string format.""" | |
self.data = json.loads(patient_data) if isinstance(patient_data, str) else patient_data | |
self.patients = self._extract_patients() | |
self.current_patient_idx = 0 # Default to first patient | |
def _extract_patients(self) -> List[Dict]: | |
"""Extract all patient entries from the Bundle.""" | |
if self.data.get("resourceType") != "Bundle" or "entry" not in self.data: | |
raise ValueError("Invalid FHIR Bundle format") | |
return [entry["resource"] for entry in self.data["entry"] if entry["resource"]["resourceType"] == "Patient"] | |
def set_patient_by_index(self, index: int) -> bool: | |
"""Set the current patient by index. Returns True if successful.""" | |
if 0 <= index < len(self.patients): | |
self.current_patient_idx = index | |
return True | |
return False | |
def set_patient_by_id(self, patient_id: str) -> bool: | |
"""Set the current patient by FHIR Patient ID. Returns True if successful.""" | |
for i, patient in enumerate(self.patients): | |
if patient["id"] == patient_id: | |
self.current_patient_idx = i | |
return True | |
return False | |
def _get_current_patient(self) -> Dict: | |
"""Get the currently selected patient resource.""" | |
return self.patients[self.current_patient_idx] | |
def get_first_name(self) -> str: | |
"""Extract patient's first name.""" | |
patient = self._get_current_patient() | |
names = patient.get("name", []) | |
for name in names: | |
if name.get("use") == "official" and "given" in name: | |
return name["given"][0] # First given name | |
return "" | |
def get_last_name(self) -> str: | |
"""Extract patient's last name.""" | |
patient = self._get_current_patient() | |
names = patient.get("name", []) | |
for name in names: | |
if name.get("use") == "official" and "family" in name: | |
return name["family"] | |
return "" | |
def get_middle_initial(self) -> str: | |
"""Extract patient's middle initial (second given name initial if present).""" | |
patient = self._get_current_patient() | |
names = patient.get("name", []) | |
for name in names: | |
if name.get("use") == "official" and "given" in name and len(name["given"]) > 1: | |
return name["given"][1][0] # First letter of second given name | |
return "" | |
def get_dob(self) -> str: | |
"""Extract patient's date of birth.""" | |
patient = self._get_current_patient() | |
return patient.get("birthDate", "") | |
def get_age(self) -> str: | |
"""Calculate patient's age based on birth date.""" | |
dob = self.get_dob() | |
if not dob: | |
return "" | |
birth_date = datetime.strptime(dob, "%Y-%m-%d") | |
today = datetime.now() | |
age = today.year - birth_date.year - ((today.month, today.day) < (birth_date.month, birth_date.day)) | |
return str(age) | |
def get_sex(self) -> str: | |
"""Extract patient's sex (gender).""" | |
patient = self._get_current_patient() | |
return patient.get("gender", "").capitalize() | |
def get_address(self) -> str: | |
"""Extract patient's street address.""" | |
patient = self._get_current_patient() | |
addresses = patient.get("address", []) | |
return addresses[0]["line"][0] if addresses and "line" in addresses[0] else "" | |
def get_city(self) -> str: | |
"""Extract patient's city.""" | |
patient = self._get_current_patient() | |
addresses = patient.get("address", []) | |
return addresses[0]["city"] if addresses and "city" in addresses[0] else "" | |
def get_state(self) -> str: | |
"""Extract patient's state.""" | |
patient = self._get_current_patient() | |
addresses = patient.get("address", []) | |
return addresses[0]["state"] if addresses and "state" in addresses[0] else "" | |
def get_zip_code(self) -> str: | |
"""Extract patient's postal code.""" | |
patient = self._get_current_patient() | |
addresses = patient.get("address", []) | |
return addresses[0]["postalCode"] if addresses and "postalCode" in addresses[0] else "" | |
def get_patient_dict(self) -> Dict[str, str]: | |
"""Return a dictionary of patient data mapped to discharge form fields.""" | |
return { | |
"first_name": self.get_first_name(), | |
"last_name": self.get_last_name(), | |
"middle_initial": self.get_middle_initial(), | |
"dob": self.get_dob(), | |
"age": self.get_age(), | |
"sex": self.get_sex(), | |
"address": self.get_address(), | |
"city": self.get_city(), | |
"state": self.get_state(), | |
"zip_code": self.get_zip_code(), | |
# Fields not directly available in Patient resource can be left blank or populated separately | |
"doctor_first_name": "", | |
"doctor_last_name": "", | |
"doctor_middle_initial": "", | |
"hospital_name": "", | |
"doctor_address": "", | |
"doctor_city": "", | |
"doctor_state": "", | |
"doctor_zip": "", | |
"admission_date": "", | |
"referral_source": "", | |
"admission_method": "", | |
"discharge_date": "", | |
"discharge_reason": "", | |
"date_of_death": "", | |
"diagnosis": "", | |
"procedures": "", | |
"medications": "", | |
"preparer_name": "", | |
"preparer_job_title": "" | |
} | |
def get_all_patients(self) -> List[Dict[str, str]]: | |
"""Return a list of dictionaries for all patients.""" | |
original_idx = self.current_patient_idx | |
all_patients = [] | |
for i in range(len(self.patients)): | |
self.set_patient_by_index(i) | |
all_patients.append(self.get_patient_dict()) | |
self.set_patient_by_index(original_idx) # Restore original selection | |
return all_patients | |
def get_patient_ids(self) -> List[str]: | |
"""Return a list of all patient IDs in the Bundle.""" | |
return [patient["id"] for patient in self.patients] | |
# # Example usage with integration into app.py | |
# def integrate_with_app(patient_data: str): | |
# """Integrate PatientDataExtractor with the Gradio app.""" | |
# extractor = PatientDataExtractor(patient_data) | |
# # Function to populate form with selected patient's data | |
# def populate_form(patient_id: str): | |
# if extractor.set_patient_by_id(patient_id): | |
# patient_dict = extractor.get_patient_dict() | |
# return list(patient_dict.values()) # Return values in order expected by display_form | |
# return [""] * 28 # Return empty values if patient not found | |
# # Modify the Gradio app to include patient selection | |
# with gr.Blocks() as demo: | |
# gr.Markdown("# Patient Discharge Form with MeldRx Integration") | |
# with gr.Tab("Authenticate with MeldRx"): | |
# gr.Markdown("## SMART on FHIR Authentication") | |
# auth_url_output = gr.Textbox(label="Authorization URL", value=CALLBACK_MANAGER.get_auth_url(), interactive=False) | |
# gr.Markdown("Copy the URL above, open it in a browser, log in, and paste the 'code' from the redirect URL below.") | |
# auth_code_input = gr.Textbox(label="Authorization Code") | |
# auth_submit = gr.Button("Submit Code") | |
# auth_result = gr.Textbox(label="Authentication Result") | |
# patient_data_button = gr.Button("Fetch Patient Data") | |
# patient_data_output = gr.Textbox(label="Patient Data") | |
# auth_submit.click(fn=CALLBACK_MANAGER.set_auth_code, inputs=auth_code_input, outputs=auth_result) | |
# patient_data_button.click(fn=CALLBACK_MANAGER.get_patient_data, inputs=None, outputs=patient_data_output) | |
# with gr.Tab("Discharge Form"): | |
# gr.Markdown("## Select Patient") | |
# patient_dropdown = gr.Dropdown(choices=extractor.get_patient_ids(), label="Select Patient ID") | |
# populate_button = gr.Button("Populate Form with Patient Data") | |
# gr.Markdown("## Patient Details") | |
# with gr.Row(): | |
# first_name = gr.Textbox(label="First Name") | |
# last_name = gr.Textbox(label="Last Name") | |
# middle_initial = gr.Textbox(label="Middle Initial") | |
# with gr.Row(): | |
# dob = gr.Textbox(label="Date of Birth") | |
# age = gr.Textbox(label="Age") | |
# sex = gr.Textbox(label="Sex") | |
# address = gr.Textbox(label="Address") | |
# with gr.Row(): | |
# city = gr.Textbox(label="City") | |
# state = gr.Textbox(label="State") | |
# zip_code = gr.Textbox(label="Zip Code") | |
# gr.Markdown("## Primary Healthcare Professional Details") | |
# with gr.Row(): | |
# doctor_first_name = gr.Textbox(label="Doctor's First Name") | |
# doctor_last_name = gr.Textbox(label="Doctor's Last Name") | |
# doctor_middle_initial = gr.Textbox(label="Middle Initial") | |
# hospital_name = gr.Textbox(label="Hospital/Clinic Name") | |
# doctor_address = gr.Textbox(label="Address") | |
# with gr.Row(): | |
# doctor_city = gr.Textbox(label="City") | |
# doctor_state = gr.Textbox(label="State") | |
# doctor_zip = gr.Textbox(label="Zip Code") | |
# gr.Markdown("## Admission and Discharge Details") | |
# with gr.Row(): | |
# admission_date = gr.Textbox(label="Date of Admission") | |
# referral_source = gr.Textbox(label="Source of Referral") | |
# admission_method = gr.Textbox(label="Method of Admission") | |
# with gr.Row(): | |
# discharge_date = gr.Textbox(label="Date of Discharge") | |
# discharge_reason = gr.Radio(["Treated", "Transferred", "Discharge Against Advice", "Patient Died"], label="Discharge Reason") | |
# date_of_death = gr.Textbox(label="Date of Death (if applicable)") | |
# gr.Markdown("## Diagnosis & Procedures") | |
# diagnosis = gr.Textbox(label="Diagnosis") | |
# procedures = gr.Textbox(label="Operation & Procedures") | |
# gr.Markdown("## Medication Details") | |
# medications = gr.Textbox(label="Medication on Discharge") | |
# gr.Markdown("## Prepared By") | |
# with gr.Row(): | |
# preparer_name = gr.Textbox(label="Name") | |
# preparer_job_title = gr.Textbox(label="Job Title") | |
# submit = gr.Button("Generate Form") | |
# output = gr.Markdown() | |
# # Inputs list for populate_form and display_form | |
# inputs_list = [ | |
# first_name, last_name, middle_initial, dob, age, sex, address, city, state, zip_code, | |
# doctor_first_name, doctor_last_name, doctor_middle_initial, hospital_name, doctor_address, | |
# doctor_city, doctor_state, doctor_zip, | |
# admission_date, referral_source, admission_method, discharge_date, discharge_reason, date_of_death, | |
# diagnosis, procedures, medications, preparer_name, preparer_job_title | |
# ] | |
# # Populate form with patient data when button is clicked | |
# populate_button.click( | |
# fn=populate_form, | |
# inputs=patient_dropdown, | |
# outputs=inputs_list | |
# ) | |
# # Generate the form output | |
# submit.click( | |
# display_form, | |
# inputs=inputs_list, | |
# outputs=output | |
# ) | |
# return demo | |
# # Assuming patient_data is the JSON string from your example | |
# # patient_data = <your JSON string here> | |
# # demo = integrate_with_app(patient_data) | |
# # demo.launch() |