File size: 21,396 Bytes
03b34b8
 
bf761ef
03b34b8
 
bf761ef
03b34b8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
bf761ef
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
03b34b8
 
 
 
 
 
bf761ef
03b34b8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
bf761ef
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
03b34b8
 
bf761ef
03b34b8
 
bf761ef
03b34b8
 
 
 
 
 
 
 
 
 
 
bf761ef
 
 
 
 
 
03b34b8
bf761ef
 
 
 
 
 
 
 
03b34b8
bf761ef
 
03b34b8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
bf761ef
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
03b34b8
bf761ef
 
 
 
 
03b34b8
 
 
bf761ef
 
03b34b8
 
bf761ef
 
 
 
03b34b8
 
 
bf761ef
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
03b34b8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
bf761ef
03b34b8
 
 
 
 
bf761ef
03b34b8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
import json
from datetime import datetime
from typing import List, Dict, Optional, Union

class PatientDataExtractor:
    """Class to extract all fields from a FHIR Patient resource in a Bundle response."""
    
    def __init__(self, patient_data: str):
        """Initialize with patient data in JSON string format."""
        self.data = json.loads(patient_data) if isinstance(patient_data, str) else patient_data
        self.patients = self._extract_patients()
        self.current_patient_idx = 0  # Default to first patient
        
    def _extract_patients(self) -> List[Dict]:
        """Extract all patient entries from the Bundle."""
        if self.data.get("resourceType") != "Bundle" or "entry" not in self.data:
            raise ValueError("Invalid FHIR Bundle format")
        return [entry["resource"] for entry in self.data["entry"] if entry["resource"]["resourceType"] == "Patient"]

    def set_patient_by_index(self, index: int) -> bool:
        """Set the current patient by index. Returns True if successful."""
        if 0 <= index < len(self.patients):
            self.current_patient_idx = index
            return True
        return False

    def set_patient_by_id(self, patient_id: str) -> bool:
        """Set the current patient by FHIR Patient ID. Returns True if successful."""
        for i, patient in enumerate(self.patients):
            if patient["id"] == patient_id:
                self.current_patient_idx = i
                return True
        return False

    def _get_current_patient(self) -> Dict:
        """Get the currently selected patient resource."""
        return self.patients[self.current_patient_idx]

    # Basic Identification Fields
    def get_id(self) -> str:
        """Extract FHIR Patient ID."""
        return self._get_current_patient().get("id", "")

    def get_resource_type(self) -> str:
        """Extract resource type (should always be 'Patient')."""
        return self._get_current_patient().get("resourceType", "")

    def get_meta_last_updated(self) -> str:
        """Extract last updated timestamp from meta."""
        return self._get_current_patient().get("meta", {}).get("lastUpdated", "")

    def get_meta_profile(self) -> List[str]:
        """Extract profile URIs from meta."""
        return self._get_current_patient().get("meta", {}).get("profile", [])

    def get_text_div(self) -> str:
        """Extract generated text narrative (div content)."""
        return self._get_current_patient().get("text", {}).get("div", "")

    # Name Fields
    def get_first_name(self) -> str:
        """Extract patient's first name."""
        patient = self._get_current_patient()
        names = patient.get("name", [])
        for name in names:
            if name.get("use") == "official" and "given" in name:
                return name["given"][0]
        return ""

    def get_last_name(self) -> str:
        """Extract patient's last name."""
        patient = self._get_current_patient()
        names = patient.get("name", [])
        for name in names:
            if name.get("use") == "official" and "family" in name:
                return name["family"]
        return ""

    def get_middle_initial(self) -> str:
        """Extract patient's middle initial (second given name initial if present)."""
        patient = self._get_current_patient()
        names = patient.get("name", [])
        for name in names:
            if name.get("use") == "official" and "given" in name and len(name["given"]) > 1:
                return name["given"][1][0]
        return ""

    def get_name_prefix(self) -> str:
        """Extract patient's name prefix (e.g., Mr., Mrs.)."""
        patient = self._get_current_patient()
        names = patient.get("name", [])
        for name in names:
            if name.get("use") == "official" and "prefix" in name:
                return name["prefix"][0]
        return ""

    def get_maiden_name(self) -> str:
        """Extract patient's maiden name if available."""
        patient = self._get_current_patient()
        names = patient.get("name", [])
        for name in names:
            if name.get("use") == "maiden" and "family" in name:
                return name["family"]
        return ""

    # Demographic Fields
    def get_dob(self) -> str:
        """Extract patient's date of birth."""
        return self._get_current_patient().get("birthDate", "")

    def get_age(self) -> str:
        """Calculate patient's age based on birth date."""
        dob = self.get_dob()
        if not dob:
            return ""
        birth_date = datetime.strptime(dob, "%Y-%m-%d")
        today = datetime.now()
        age = today.year - birth_date.year - ((today.month, today.day) < (birth_date.month, birth_date.day))
        return str(age)

    def get_gender(self) -> str:
        """Extract patient's gender."""
        return self._get_current_patient().get("gender", "").capitalize()

    def get_birth_sex(self) -> str:
        """Extract patient's birth sex from extensions."""
        patient = self._get_current_patient()
        for ext in patient.get("extension", []):
            if ext.get("url") == "http://hl7.org/fhir/us/core/StructureDefinition/us-core-birthsex":
                return ext.get("valueCode", "")
        return ""

    def get_multiple_birth(self) -> Union[bool, None]:
        """Extract multiple birth status."""
        return self._get_current_patient().get("multipleBirthBoolean", None)

    # Address Fields
    def get_address_line(self) -> str:
        """Extract patient's street address."""
        patient = self._get_current_patient()
        addresses = patient.get("address", [])
        return addresses[0]["line"][0] if addresses and "line" in addresses[0] else ""

    def get_city(self) -> str:
        """Extract patient's city."""
        patient = self._get_current_patient()
        addresses = patient.get("address", [])
        return addresses[0]["city"] if addresses and "city" in addresses[0] else ""

    def get_state(self) -> str:
        """Extract patient's state."""
        patient = self._get_current_patient()
        addresses = patient.get("address", [])
        return addresses[0]["state"] if addresses and "state" in addresses[0] else ""

    def get_zip_code(self) -> str:
        """Extract patient's postal code."""
        patient = self._get_current_patient()
        addresses = patient.get("address", [])
        return addresses[0]["postalCode"] if addresses and "postalCode" in addresses[0] else ""

    def get_country(self) -> str:
        """Extract patient's country."""
        patient = self._get_current_patient()
        addresses = patient.get("address", [])
        return addresses[0]["country"] if addresses and "country" in addresses[0] else ""

    def get_geolocation(self) -> Dict[str, float]:
        """Extract geolocation (latitude and longitude) from address extension."""
        patient = self._get_current_patient()
        addresses = patient.get("address", [])
        if not addresses:
            return {"latitude": None, "longitude": None}
        for ext in addresses[0].get("extension", []):
            if ext.get("url") == "http://hl7.org/fhir/StructureDefinition/geolocation":
                geo = {}
                for sub_ext in ext.get("extension", []):
                    if sub_ext.get("url") == "latitude":
                        geo["latitude"] = sub_ext.get("valueDecimal")
                    elif sub_ext.get("url") == "longitude":
                        geo["longitude"] = sub_ext.get("valueDecimal")
                return geo
        return {"latitude": None, "longitude": None}

    # Contact Fields
    def get_phone(self) -> str:
        """Extract patient's phone number."""
        patient = self._get_current_patient()
        telecoms = patient.get("telecom", [])
        for telecom in telecoms:
            if telecom.get("system") == "phone" and telecom.get("use") == "home":
                return telecom.get("value", "")
        return ""

    # Identifiers
    def get_identifiers(self) -> Dict[str, str]:
        """Extract all identifiers (e.g., SSN, MRN, Driver's License)."""
        patient = self._get_current_patient()
        identifiers = patient.get("identifier", [])
        id_dict = {}
        for id_entry in identifiers:
            id_type = id_entry.get("type", {}).get("text", "Unknown")
            id_dict[id_type] = id_entry.get("value", "")
        return id_dict

    # Extensions
    def get_race(self) -> str:
        """Extract patient's race from extensions."""
        patient = self._get_current_patient()
        for ext in patient.get("extension", []):
            if ext.get("url") == "http://hl7.org/fhir/us/core/StructureDefinition/us-core-race":
                for sub_ext in ext.get("extension", []):
                    if sub_ext.get("url") == "text":
                        return sub_ext.get("valueString", "")
        return ""

    def get_ethnicity(self) -> str:
        """Extract patient's ethnicity from extensions."""
        patient = self._get_current_patient()
        for ext in patient.get("extension", []):
            if ext.get("url") == "http://hl7.org/fhir/us/core/StructureDefinition/us-core-ethnicity":
                for sub_ext in ext.get("extension", []):
                    if sub_ext.get("url") == "text":
                        return sub_ext.get("valueString", "")
        return ""

    def get_mothers_maiden_name(self) -> str:
        """Extract patient's mother's maiden name from extensions."""
        patient = self._get_current_patient()
        for ext in patient.get("extension", []):
            if ext.get("url") == "http://hl7.org/fhir/StructureDefinition/patient-mothersMaidenName":
                return ext.get("valueString", "")
        return ""

    def get_birth_place(self) -> Dict[str, str]:
        """Extract patient's birth place from extensions."""
        patient = self._get_current_patient()
        for ext in patient.get("extension", []):
            if ext.get("url") == "http://hl7.org/fhir/StructureDefinition/patient-birthPlace":
                addr = ext.get("valueAddress", {})
                return {
                    "city": addr.get("city", ""),
                    "state": addr.get("state", ""),
                    "country": addr.get("country", "")
                }
        return {"city": "", "state": "", "country": ""}

    def get_disability_adjusted_life_years(self) -> Optional[float]:
        """Extract disability-adjusted life years from extensions."""
        patient = self._get_current_patient()
        for ext in patient.get("extension", []):
            if ext.get("url") == "http://synthetichealth.github.io/synthea/disability-adjusted-life-years":
                return ext.get("valueDecimal")
        return None

    def get_quality_adjusted_life_years(self) -> Optional[float]:
        """Extract quality-adjusted life years from extensions."""
        patient = self._get_current_patient()
        for ext in patient.get("extension", []):
            if ext.get("url") == "http://synthetichealth.github.io/synthea/quality-adjusted-life-years":
                return ext.get("valueDecimal")
        return None

    # Marital Status
    def get_marital_status(self) -> str:
        """Extract patient's marital status."""
        patient = self._get_current_patient()
        status = patient.get("maritalStatus", {}).get("text", "")
        return status if status else patient.get("maritalStatus", {}).get("coding", [{}])[0].get("display", "")

    # Communication
    def get_language(self) -> str:
        """Extract patient's preferred language."""
        patient = self._get_current_patient()
        comms = patient.get("communication", [])
        return comms[0]["language"]["text"] if comms and "language" in comms[0] else ""

    # Comprehensive Extraction
    def get_all_patient_data(self) -> Dict[str, Union[str, Dict, List, float, bool, None]]:
        """Extract all available data for the current patient."""
        return {
            "id": self.get_id(),
            "resource_type": self.get_resource_type(),
            "meta_last_updated": self.get_meta_last_updated(),
            "meta_profile": self.get_meta_profile(),
            "text_div": self.get_text_div(),
            "first_name": self.get_first_name(),
            "last_name": self.get_last_name(),
            "middle_initial": self.get_middle_initial(),
            "name_prefix": self.get_name_prefix(),
            "maiden_name": self.get_maiden_name(),
            "dob": self.get_dob(),
            "age": self.get_age(),
            "gender": self.get_gender(),
            "birth_sex": self.get_birth_sex(),
            "multiple_birth": self.get_multiple_birth(),
            "address_line": self.get_address_line(),
            "city": self.get_city(),
            "state": self.get_state(),
            "zip_code": self.get_zip_code(),
            "country": self.get_country(),
            "geolocation": self.get_geolocation(),
            "phone": self.get_phone(),
            "identifiers": self.get_identifiers(),
            "race": self.get_race(),
            "ethnicity": self.get_ethnicity(),
            "mothers_maiden_name": self.get_mothers_maiden_name(),
            "birth_place": self.get_birth_place(),
            "disability_adjusted_life_years": self.get_disability_adjusted_life_years(),
            "quality_adjusted_life_years": self.get_quality_adjusted_life_years(),
            "marital_status": self.get_marital_status(),
            "language": self.get_language()
        }

    def get_patient_dict(self) -> Dict[str, str]:
        """Return a dictionary of patient data mapped to discharge form fields (for app.py compatibility)."""
        patient_data = self.get_all_patient_data()
        return {
            "first_name": patient_data["first_name"],
            "last_name": patient_data["last_name"],
            "middle_initial": patient_data["middle_initial"],
            "dob": patient_data["dob"],
            "age": patient_data["age"],
            "sex": patient_data["gender"],
            "address": patient_data["address_line"],
            "city": patient_data["city"],
            "state": patient_data["state"],
            "zip_code": patient_data["zip_code"],
            "doctor_first_name": "",
            "doctor_last_name": "",
            "doctor_middle_initial": "",
            "hospital_name": "",
            "doctor_address": "",
            "doctor_city": "",
            "doctor_state": "",
            "doctor_zip": "",
            "admission_date": "",
            "referral_source": "",
            "admission_method": "",
            "discharge_date": "",
            "discharge_reason": "",
            "date_of_death": "",
            "diagnosis": "",
            "procedures": "",
            "medications": "",
            "preparer_name": "",
            "preparer_job_title": ""
        }

    def get_all_patients(self) -> List[Dict[str, str]]:
        """Return a list of dictionaries for all patients (for app.py)."""
        original_idx = self.current_patient_idx
        all_patients = []
        for i in range(len(self.patients)):
            self.set_patient_by_index(i)
            all_patients.append(self.get_patient_dict())
        self.set_patient_by_index(original_idx)
        return all_patients

    def get_patient_ids(self) -> List[str]:
        """Return a list of all patient IDs in the Bundle."""
        return [patient["id"] for patient in self.patients]

# # Example usage with integration into app.py
# def integrate_with_app(patient_data: str):
#     """Integrate PatientDataExtractor with the Gradio app."""
#     extractor = PatientDataExtractor(patient_data)
    
#     # Function to populate form with selected patient's data
#     def populate_form(patient_id: str):
#         if extractor.set_patient_by_id(patient_id):
#             patient_dict = extractor.get_patient_dict()
#             return list(patient_dict.values())  # Return values in order expected by display_form
#         return [""] * 28  # Return empty values if patient not found

#     # Modify the Gradio app to include patient selection
#     with gr.Blocks() as demo:
#         gr.Markdown("# Patient Discharge Form with MeldRx Integration")
#         with gr.Tab("Authenticate with MeldRx"):
#             gr.Markdown("## SMART on FHIR Authentication")
#             auth_url_output = gr.Textbox(label="Authorization URL", value=CALLBACK_MANAGER.get_auth_url(), interactive=False)
#             gr.Markdown("Copy the URL above, open it in a browser, log in, and paste the 'code' from the redirect URL below.")
#             auth_code_input = gr.Textbox(label="Authorization Code")
#             auth_submit = gr.Button("Submit Code")
#             auth_result = gr.Textbox(label="Authentication Result")
#             patient_data_button = gr.Button("Fetch Patient Data")
#             patient_data_output = gr.Textbox(label="Patient Data")
#             auth_submit.click(fn=CALLBACK_MANAGER.set_auth_code, inputs=auth_code_input, outputs=auth_result)
#             patient_data_button.click(fn=CALLBACK_MANAGER.get_patient_data, inputs=None, outputs=patient_data_output)
        
#         with gr.Tab("Discharge Form"):
#             gr.Markdown("## Select Patient")
#             patient_dropdown = gr.Dropdown(choices=extractor.get_patient_ids(), label="Select Patient ID")
#             populate_button = gr.Button("Populate Form with Patient Data")
            
#             gr.Markdown("## Patient Details")
#             with gr.Row():
#                 first_name = gr.Textbox(label="First Name")
#                 last_name = gr.Textbox(label="Last Name")
#                 middle_initial = gr.Textbox(label="Middle Initial")
#             with gr.Row():
#                 dob = gr.Textbox(label="Date of Birth")
#                 age = gr.Textbox(label="Age")
#                 sex = gr.Textbox(label="Sex")
#             address = gr.Textbox(label="Address")
#             with gr.Row():
#                 city = gr.Textbox(label="City")
#                 state = gr.Textbox(label="State")
#                 zip_code = gr.Textbox(label="Zip Code")
#             gr.Markdown("## Primary Healthcare Professional Details")
#             with gr.Row():
#                 doctor_first_name = gr.Textbox(label="Doctor's First Name")
#                 doctor_last_name = gr.Textbox(label="Doctor's Last Name")
#                 doctor_middle_initial = gr.Textbox(label="Middle Initial")
#             hospital_name = gr.Textbox(label="Hospital/Clinic Name")
#             doctor_address = gr.Textbox(label="Address")
#             with gr.Row():
#                 doctor_city = gr.Textbox(label="City")
#                 doctor_state = gr.Textbox(label="State")
#                 doctor_zip = gr.Textbox(label="Zip Code")
#             gr.Markdown("## Admission and Discharge Details")
#             with gr.Row():
#                 admission_date = gr.Textbox(label="Date of Admission")
#                 referral_source = gr.Textbox(label="Source of Referral")
#             admission_method = gr.Textbox(label="Method of Admission")
#             with gr.Row():
#                 discharge_date = gr.Textbox(label="Date of Discharge")
#                 discharge_reason = gr.Radio(["Treated", "Transferred", "Discharge Against Advice", "Patient Died"], label="Discharge Reason")
#             date_of_death = gr.Textbox(label="Date of Death (if applicable)")
#             gr.Markdown("## Diagnosis & Procedures")
#             diagnosis = gr.Textbox(label="Diagnosis")
#             procedures = gr.Textbox(label="Operation & Procedures")
#             gr.Markdown("## Medication Details")
#             medications = gr.Textbox(label="Medication on Discharge")
#             gr.Markdown("## Prepared By")
#             with gr.Row():
#                 preparer_name = gr.Textbox(label="Name")
#                 preparer_job_title = gr.Textbox(label="Job Title")
#             submit = gr.Button("Generate Form")
#             output = gr.Markdown()

#             # Inputs list for populate_form and display_form
#             inputs_list = [
#                 first_name, last_name, middle_initial, dob, age, sex, address, city, state, zip_code,
#                 doctor_first_name, doctor_last_name, doctor_middle_initial, hospital_name, doctor_address,
#                 doctor_city, doctor_state, doctor_zip,
#                 admission_date, referral_source, admission_method, discharge_date, discharge_reason, date_of_death,
#                 diagnosis, procedures, medications, preparer_name, preparer_job_title
#             ]

#             # Populate form with patient data when button is clicked
#             populate_button.click(
#                 fn=populate_form,
#                 inputs=patient_dropdown,
#                 outputs=inputs_list
#             )

#             # Generate the form output
#             submit.click(
#                 display_form,
#                 inputs=inputs_list,
#                 outputs=output
#             )

#     return demo

# # Assuming patient_data is the JSON string from your example
# # patient_data = <your JSON string here>
# # demo = integrate_with_app(patient_data)
# # demo.launch()