mgbam committed on
Commit
4e9aea4
·
verified ·
1 Parent(s): f4a15ae

Update modules/orchestrator.py

Browse files
Files changed (1) hide show
  1. modules/orchestrator.py +30 -69
modules/orchestrator.py CHANGED
@@ -4,7 +4,7 @@ The Central Nervous System of Project Asclepius.
4
  This module is the master conductor, orchestrating high-performance, asynchronous
5
  workflows for each of the application's features. It intelligently sequences
6
  calls to API clients and the Gemini handler to transform user queries into
7
- comprehensive, synthesized reports. (v1.2)
8
  """
9
 
10
  import asyncio
@@ -22,30 +22,21 @@ from api_clients import (
22
  )
23
 
24
  # --- Internal Helper for Data Formatting ---
 
25
  def _format_api_data_for_prompt(api_results: dict) -> dict[str, str]:
26
- """
27
- Takes the raw dictionary of API results and formats each entry into a
28
- clean, readable string suitable for injection into a Gemini prompt.
29
- """
30
  formatted_strings = {}
31
-
32
- # Format PubMed data
33
  pubmed_data = api_results.get('pubmed', [])
34
  if isinstance(pubmed_data, list) and pubmed_data:
35
  lines = [f"- Title: {a.get('title', 'N/A')} (Journal: {a.get('journal', 'N/A')}, URL: {a.get('url')})" for a in pubmed_data]
36
  formatted_strings['pubmed'] = "\n".join(lines)
37
  else:
38
  formatted_strings['pubmed'] = "No relevant review articles were found on PubMed for this query."
39
-
40
- # Format Clinical Trials data
41
  trials_data = api_results.get('trials', [])
42
  if isinstance(trials_data, list) and trials_data:
43
  lines = [f"- Title: {t.get('title', 'N/A')} (Status: {t.get('status', 'N/A')}, URL: {t.get('url')})" for t in trials_data]
44
  formatted_strings['trials'] = "\n".join(lines)
45
  else:
46
  formatted_strings['trials'] = "No actively recruiting clinical trials were found matching this query."
47
-
48
- # Format OpenFDA Adverse Events data
49
  fda_data = api_results.get('openfda', [])
50
  if isinstance(fda_data, list):
51
  all_events = list(chain.from_iterable(filter(None, fda_data)))
@@ -56,8 +47,6 @@ def _format_api_data_for_prompt(api_results: dict) -> dict[str, str]:
56
  formatted_strings['openfda'] = "No specific adverse event data was found for this query."
57
  else:
58
  formatted_strings['openfda'] = "No specific adverse event data was found for this query."
59
-
60
- # Format Vision analysis
61
  vision_data = api_results.get('vision', "")
62
  if isinstance(vision_data, str) and vision_data:
63
  formatted_strings['vision'] = vision_data
@@ -65,97 +54,69 @@ def _format_api_data_for_prompt(api_results: dict) -> dict[str, str]:
65
  formatted_strings['vision'] = f"An error occurred during image analysis: {vision_data}"
66
  else:
67
  formatted_strings['vision'] = ""
68
-
69
  return formatted_strings
70
 
71
 
72
- # ==============================================================================
73
- # THIS IS THE FUNCTION THAT WAS REPORTED AS MISSING. PLEASE ENSURE IT EXISTS.
74
- # --- FEATURE 1: Symptom Synthesizer Pipeline (v1.2) ---
75
- # ==============================================================================
76
  async def run_symptom_synthesis(user_query: str, image_input: Image.Image | None) -> str:
77
  """The complete, asynchronous pipeline for the Symptom Synthesizer tab."""
 
78
  if not user_query:
79
  return "Please enter a symptom description or a medical question to begin."
80
-
81
- # STEP 1: AI-Powered Query Correction
82
  correction_prompt = prompts.get_query_correction_prompt(user_query)
83
  corrected_query = await gemini_handler.generate_text_response(correction_prompt)
84
  if not corrected_query:
85
  corrected_query = user_query
86
-
87
- # STEP 2: AI-Powered Concept Extraction
88
  term_prompt = prompts.get_term_extraction_prompt(corrected_query)
89
  concepts_str = await gemini_handler.generate_text_response(term_prompt)
90
  concepts = utils.safe_literal_eval(concepts_str)
91
  if not isinstance(concepts, list) or not concepts:
92
  concepts = [corrected_query]
93
-
94
  search_query = " OR ".join(f'"{c}"' for c in concepts)
95
-
96
- # STEP 3: Massively Parallel Evidence Gathering
97
  async with aiohttp.ClientSession() as session:
98
- tasks = {
99
- "pubmed": pubmed_client.search_pubmed(session, search_query, max_results=3),
100
- "trials": clinicaltrials_client.find_trials(session, search_query, max_results=3),
101
- "openfda": asyncio.gather(*(openfda_client.get_adverse_events(session, c, top_n=3) for c in concepts)),
102
- }
103
  if image_input:
104
- tasks["vision"] = gemini_handler.analyze_image_with_text(
105
- "In the context of the user query, analyze this image objectively. Describe visual features. Do not diagnose.", image_input
106
- )
107
  raw_results = await asyncio.gather(*tasks.values(), return_exceptions=True)
108
  api_data = dict(zip(tasks.keys(), raw_results))
109
-
110
- # STEP 4: Data Formatting
111
  formatted_data = _format_api_data_for_prompt(api_data)
112
-
113
- # STEP 5: The Grand Synthesis
114
- synthesis_prompt = prompts.get_synthesis_prompt(
115
- user_query=user_query,
116
- concepts=concepts,
117
- pubmed_data=formatted_data['pubmed'],
118
- trials_data=formatted_data['trials'],
119
- fda_data=formatted_data['openfda'],
120
- vision_analysis=formatted_data['vision']
121
- )
122
  final_report = await gemini_handler.generate_text_response(synthesis_prompt)
123
 
124
- # STEP 6: Final Delivery
125
- return f"{prompts.DISCLAIMER}\n\n{final_report}"
 
 
 
 
126
 
 
 
127
 
128
- # ==============================================================================
129
- # THIS FUNCTION IS ALSO REQUIRED BY APP.PY. PLEASE ENSURE IT EXISTS.
130
- # --- FEATURE 2: Drug Interaction & Safety Analyzer Pipeline ---
131
- # ==============================================================================
132
  async def run_drug_interaction_analysis(drug_list_str: str) -> str:
133
  """The complete, asynchronous pipeline for the Drug Interaction Analyzer tab."""
134
- if not drug_list_str:
135
- return "Please enter a comma-separated list of medications."
136
  drug_names = [name.strip() for name in drug_list_str.split(',') if name.strip()]
137
- if len(drug_names) < 2:
138
- return "Please enter at least two medications to check for interactions."
139
  async with aiohttp.ClientSession() as session:
140
- tasks = {
141
- "interactions": rxnorm_client.run_interaction_check(drug_names),
142
- "safety_profiles": asyncio.gather(*(openfda_client.get_safety_profile(session, name) for name in drug_names))
143
- }
144
  raw_results = await asyncio.gather(*tasks.values(), return_exceptions=True)
145
  api_data = dict(zip(tasks.keys(), raw_results))
146
  interaction_data = api_data.get('interactions', [])
147
- if isinstance(interaction_data, Exception):
148
- interaction_data = [{"error": str(interaction_data)}]
149
  safety_profiles = api_data.get('safety_profiles', [])
150
- if isinstance(safety_profiles, Exception):
151
- safety_profiles = [{"error": str(safety_profiles)}]
152
  safety_data_dict = dict(zip(drug_names, safety_profiles))
153
  interaction_formatted = utils.format_list_as_markdown([str(i) for i in interaction_data]) if interaction_data else "No interactions found."
154
  safety_formatted = "\n".join([f"Profile for {drug}: {profile}" for drug, profile in safety_data_dict.items()])
155
- synthesis_prompt = prompts.get_drug_interaction_synthesis_prompt(
156
- drug_names=drug_names,
157
- interaction_data=interaction_formatted,
158
- safety_data=safety_formatted
159
- )
160
  final_report = await gemini_handler.generate_text_response(synthesis_prompt)
161
- return f"{prompts.DISCLAIMER}\n\n{final_report}"
 
 
 
 
 
 
4
  This module is the master conductor, orchestrating high-performance, asynchronous
5
  workflows for each of the application's features. It intelligently sequences
6
  calls to API clients and the Gemini handler to transform user queries into
7
+ comprehensive, synthesized reports. (v1.3 - Final Polish)
8
  """
9
 
10
  import asyncio
 
22
  )
23
 
24
  # --- Internal Helper for Data Formatting ---
25
+ # (This helper function remains unchanged)
26
  def _format_api_data_for_prompt(api_results: dict) -> dict[str, str]:
 
 
 
 
27
  formatted_strings = {}
 
 
28
  pubmed_data = api_results.get('pubmed', [])
29
  if isinstance(pubmed_data, list) and pubmed_data:
30
  lines = [f"- Title: {a.get('title', 'N/A')} (Journal: {a.get('journal', 'N/A')}, URL: {a.get('url')})" for a in pubmed_data]
31
  formatted_strings['pubmed'] = "\n".join(lines)
32
  else:
33
  formatted_strings['pubmed'] = "No relevant review articles were found on PubMed for this query."
 
 
34
  trials_data = api_results.get('trials', [])
35
  if isinstance(trials_data, list) and trials_data:
36
  lines = [f"- Title: {t.get('title', 'N/A')} (Status: {t.get('status', 'N/A')}, URL: {t.get('url')})" for t in trials_data]
37
  formatted_strings['trials'] = "\n".join(lines)
38
  else:
39
  formatted_strings['trials'] = "No actively recruiting clinical trials were found matching this query."
 
 
40
  fda_data = api_results.get('openfda', [])
41
  if isinstance(fda_data, list):
42
  all_events = list(chain.from_iterable(filter(None, fda_data)))
 
47
  formatted_strings['openfda'] = "No specific adverse event data was found for this query."
48
  else:
49
  formatted_strings['openfda'] = "No specific adverse event data was found for this query."
 
 
50
  vision_data = api_results.get('vision', "")
51
  if isinstance(vision_data, str) and vision_data:
52
  formatted_strings['vision'] = vision_data
 
54
  formatted_strings['vision'] = f"An error occurred during image analysis: {vision_data}"
55
  else:
56
  formatted_strings['vision'] = ""
 
57
  return formatted_strings
58
 
59
 
60
+ # --- FEATURE 1: Symptom Synthesizer Pipeline (v1.3) ---
 
 
 
61
  async def run_symptom_synthesis(user_query: str, image_input: Image.Image | None) -> str:
62
  """The complete, asynchronous pipeline for the Symptom Synthesizer tab."""
63
+ # (Steps 1-5 remain the same)
64
  if not user_query:
65
  return "Please enter a symptom description or a medical question to begin."
 
 
66
  correction_prompt = prompts.get_query_correction_prompt(user_query)
67
  corrected_query = await gemini_handler.generate_text_response(correction_prompt)
68
  if not corrected_query:
69
  corrected_query = user_query
 
 
70
  term_prompt = prompts.get_term_extraction_prompt(corrected_query)
71
  concepts_str = await gemini_handler.generate_text_response(term_prompt)
72
  concepts = utils.safe_literal_eval(concepts_str)
73
  if not isinstance(concepts, list) or not concepts:
74
  concepts = [corrected_query]
 
75
  search_query = " OR ".join(f'"{c}"' for c in concepts)
 
 
76
  async with aiohttp.ClientSession() as session:
77
+ tasks = { "pubmed": pubmed_client.search_pubmed(session, search_query, max_results=3), "trials": clinicaltrials_client.find_trials(session, search_query, max_results=3), "openfda": asyncio.gather(*(openfda_client.get_adverse_events(session, c, top_n=3) for c in concepts)), }
 
 
 
 
78
  if image_input:
79
+ tasks["vision"] = gemini_handler.analyze_image_with_text("In the context of the user query, analyze this image objectively. Describe visual features. Do not diagnose.", image_input)
 
 
80
  raw_results = await asyncio.gather(*tasks.values(), return_exceptions=True)
81
  api_data = dict(zip(tasks.keys(), raw_results))
 
 
82
  formatted_data = _format_api_data_for_prompt(api_data)
83
+ synthesis_prompt = prompts.get_synthesis_prompt(user_query=user_query, concepts=concepts, pubmed_data=formatted_data['pubmed'], trials_data=formatted_data['trials'], fda_data=formatted_data['openfda'], vision_analysis=formatted_data['vision'])
 
 
 
 
 
 
 
 
 
84
  final_report = await gemini_handler.generate_text_response(synthesis_prompt)
85
 
86
+ # ==============================================================================
87
+ # STEP 6 (V1.3 UPGRADE): Deterministic Post-Processing
88
+ # We will manually remove the AI's redundant disclaimer to ensure a clean output.
89
+ # ==============================================================================
90
+ ghost_disclaimer = "⚠️ IMPORTANT DISCLAIMER: This report is for informational purposes only and should not be considered medical advice. Always consult with a qualified healthcare professional for diagnosis and treatment of any medical condition."
91
+ cleaned_report = final_report.replace(ghost_disclaimer, "").strip()
92
 
93
+ # STEP 7: Final Delivery
94
+ return f"{prompts.DISCLAIMER}\n\n{cleaned_report}"
95
 
96
+
97
+ # --- FEATURE 2: Drug Interaction & Safety Analyzer Pipeline (v1.3) ---
 
 
98
  async def run_drug_interaction_analysis(drug_list_str: str) -> str:
99
  """The complete, asynchronous pipeline for the Drug Interaction Analyzer tab."""
100
+ # (Steps remain the same)
101
+ if not drug_list_str: return "Please enter a comma-separated list of medications."
102
  drug_names = [name.strip() for name in drug_list_str.split(',') if name.strip()]
103
+ if len(drug_names) < 2: return "Please enter at least two medications to check for interactions."
 
104
  async with aiohttp.ClientSession() as session:
105
+ tasks = { "interactions": rxnorm_client.run_interaction_check(drug_names), "safety_profiles": asyncio.gather(*(openfda_client.get_safety_profile(session, name) for name in drug_names)) }
 
 
 
106
  raw_results = await asyncio.gather(*tasks.values(), return_exceptions=True)
107
  api_data = dict(zip(tasks.keys(), raw_results))
108
  interaction_data = api_data.get('interactions', [])
109
+ if isinstance(interaction_data, Exception): interaction_data = [{"error": str(interaction_data)}]
 
110
  safety_profiles = api_data.get('safety_profiles', [])
111
+ if isinstance(safety_profiles, Exception): safety_profiles = [{"error": str(safety_profiles)}]
 
112
  safety_data_dict = dict(zip(drug_names, safety_profiles))
113
  interaction_formatted = utils.format_list_as_markdown([str(i) for i in interaction_data]) if interaction_data else "No interactions found."
114
  safety_formatted = "\n".join([f"Profile for {drug}: {profile}" for drug, profile in safety_data_dict.items()])
115
+ synthesis_prompt = prompts.get_drug_interaction_synthesis_prompt(drug_names=drug_names, interaction_data=interaction_formatted, safety_data=safety_formatted)
 
 
 
 
116
  final_report = await gemini_handler.generate_text_response(synthesis_prompt)
117
+
118
+ # Deterministic Post-Processing for the drug report
119
+ ghost_disclaimer_drug = "DISCLAIMER: This report is for informational purposes only and should not be considered medical advice. Always consult with a healthcare professional before making any decisions related to your health or treatment. This information is based on the provided data and may not be exhaustive."
120
+ cleaned_report = final_report.replace(ghost_disclaimer_drug, "").strip()
121
+
122
+ return f"{prompts.DISCLAIMER}\n\n{cleaned_report}"