Spaces:
Sleeping
Sleeping
File size: 8,244 Bytes
dc83e12 140d9d4 495a355 dc83e12 140d9d4 dc83e12 89db8f5 140d9d4 398c674 89db8f5 140d9d4 495a355 140d9d4 495a355 140d9d4 89db8f5 dc83e12 89db8f5 dc83e12 495a355 140d9d4 495a355 dc83e12 140d9d4 dc83e12 495a355 dc83e12 89db8f5 140d9d4 dc83e12 89db8f5 140d9d4 89db8f5 140d9d4 495a355 140d9d4 dc83e12 495a355 dc83e12 495a355 89db8f5 140d9d4 dc83e12 89db8f5 495a355 dc83e12 89db8f5 140d9d4 495a355 89db8f5 140d9d4 89db8f5 140d9d4 89db8f5 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 |
# modules/orchestrator.py
"""
The Central Nervous System of Project Asclepius.
This module is the master conductor, orchestrating high-performance, asynchronous
workflows for each of the application's features. It intelligently sequences
calls to API clients and the Gemini handler to transform user queries into
comprehensive, synthesized reports. (v1.1)
"""
import asyncio
import aiohttp
from itertools import chain
from PIL import Image
# Import all our specialized tools
from . import gemini_handler, prompts, utils
from api_clients import (
pubmed_client,
clinicaltrials_client,
openfda_client,
rxnorm_client
)
# --- Internal Helper for Data Formatting ---
# (This helper function remains unchanged)
def _format_api_data_for_prompt(api_results: dict) -> dict[str, str]:
formatted_strings = {}
pubmed_data = api_results.get('pubmed', [])
if isinstance(pubmed_data, list) and pubmed_data:
lines = [f"- Title: {a.get('title', 'N/A')} (Journal: {a.get('journal', 'N/A')}, URL: {a.get('url')})" for a in pubmed_data]
formatted_strings['pubmed'] = "\n".join(lines)
else:
formatted_strings['pubmed'] = "No relevant review articles were found on PubMed for this query."
trials_data = api_results.get('trials', [])
if isinstance(trials_data, list) and trials_data:
lines = [f"- Title: {t.get('title', 'N/A')} (Status: {t.get('status', 'N/A')}, URL: {t.get('url')})" for t in trials_data]
formatted_strings['trials'] = "\n".join(lines)
else:
formatted_strings['trials'] = "No actively recruiting clinical trials were found matching this query."
fda_data = api_results.get('openfda', [])
if isinstance(fda_data, list):
all_events = list(chain.from_iterable(filter(None, fda_data)))
if all_events:
lines = [f"- {evt['term']} (Reported {evt['count']} times)" for evt in all_events]
formatted_strings['openfda'] = "\n".join(lines)
else:
formatted_strings['openfda'] = "No specific adverse event data was found for this query."
else:
formatted_strings['openfda'] = "No specific adverse event data was found for this query."
vision_data = api_results.get('vision', "")
if isinstance(vision_data, str) and vision_data:
formatted_strings['vision'] = vision_data
elif isinstance(vision_data, Exception):
formatted_strings['vision'] = f"An error occurred during image analysis: {vision_data}"
else:
formatted_strings['vision'] = ""
return formatted_strings
# --- FEATURE 1: Symptom Synthesizer Pipeline (v1.1) ---
async def run_symptom_synthesis(user_query: str, image_input: Image.Image | None) -> str:
    """Run the asynchronous pipeline behind the Symptom Synthesizer tab.

    Steps, in order:
      1. Ask Gemini to correct the raw query (falls back to the original
         text when the response is empty).
      2. Ask Gemini to extract search concepts from the corrected query
         (falls back to the corrected query itself when nothing usable is
         parsed).
      3. Query PubMed, ClinicalTrials and openFDA concurrently, plus an
         image description when an image was supplied.
      4. Format the gathered data for the prompt.
      5. Ask Gemini for the final synthesis.

    Args:
        user_query: Free-text symptom description or medical question.
        image_input: Optional image to describe alongside the query.

    Returns:
        The disclaimer followed by the synthesized report, or a short
        instruction message when ``user_query`` is empty or whitespace-only.
    """
    # Whitespace-only input carries no question; treat it like empty input
    # rather than spending LLM and API calls on it.
    if not user_query or not user_query.strip():
        return "Please enter a symptom description or a medical question to begin."

    # STEP 1: AI-powered query correction (the "Medical Translator").
    correction_prompt = prompts.get_query_correction_prompt(user_query)
    corrected_query = await gemini_handler.generate_text_response(correction_prompt)
    if not corrected_query:
        corrected_query = user_query  # Fall back to the original query.

    # STEP 2: AI-powered concept extraction on the corrected query.
    term_prompt = prompts.get_term_extraction_prompt(corrected_query)
    concepts_str = await gemini_handler.generate_text_response(term_prompt)
    concepts = utils.safe_literal_eval(concepts_str)
    if isinstance(concepts, list):
        # The list comes from model output: keep only non-blank entries and
        # normalise them to stripped strings so the search query and the
        # per-concept openFDA lookups never receive "" or non-text values.
        concepts = [
            str(concept).strip()
            for concept in concepts
            if concept is not None and str(concept).strip()
        ]
    if not isinstance(concepts, list) or not concepts:
        concepts = [corrected_query]  # Fall back if parsing yields nothing.

    # "OR" gives a broader, more inclusive search across the APIs.
    search_query = " OR ".join(f'"{c}"' for c in concepts)

    # STEP 3: parallel evidence gathering. The session must stay open until
    # the gather completes, so the await lives inside the context manager.
    async with aiohttp.ClientSession() as session:
        tasks = {
            "pubmed": pubmed_client.search_pubmed(session, search_query, max_results=3),
            "trials": clinicaltrials_client.find_trials(session, search_query, max_results=3),
            "openfda": asyncio.gather(
                *(openfda_client.get_adverse_events(session, c, top_n=3) for c in concepts)
            ),
        }
        # Explicit None check: only the absence of an image should skip the
        # vision step, not the truthiness of whatever object was passed.
        if image_input is not None:
            tasks["vision"] = gemini_handler.analyze_image_with_text(
                "In the context of the user query, analyze this image objectively. "
                "Describe visual features like color, shape, texture, and patterns. "
                "Do not diagnose or offer medical advice.",
                image_input,
            )
        # return_exceptions=True keeps one failing source from aborting the
        # others; failures arrive as Exception objects in the result list.
        raw_results = await asyncio.gather(*tasks.values(), return_exceptions=True)
    api_data = dict(zip(tasks.keys(), raw_results))

    # STEP 4: data formatting.
    formatted_data = _format_api_data_for_prompt(api_data)

    # STEP 5: the grand synthesis.
    synthesis_prompt = prompts.get_synthesis_prompt(
        user_query=user_query,  # Pass the original query for context.
        concepts=concepts,
        pubmed_data=formatted_data['pubmed'],
        trials_data=formatted_data['trials'],
        fda_data=formatted_data['openfda'],
        vision_analysis=formatted_data['vision'],
    )
    final_report = await gemini_handler.generate_text_response(synthesis_prompt)

    # STEP 6: final delivery.
    return f"{prompts.DISCLAIMER}\n\n{final_report}"
# --- FEATURE 2: Drug Interaction & Safety Analyzer Pipeline ---
# (This function remains unchanged)
async def run_drug_interaction_analysis(drug_list_str: str) -> str:
    """Run the asynchronous pipeline behind the Drug Interaction Analyzer tab.

    The comma-separated input is split into distinct drug names, the RxNorm
    interaction check and one openFDA safety-profile lookup per drug are run
    concurrently, and the results are handed to Gemini for synthesis.

    Args:
        drug_list_str: Comma-separated medication names.

    Returns:
        The disclaimer followed by the synthesized report, or a short
        instruction message when fewer than two distinct medications were
        supplied.
    """
    if not drug_list_str:
        return "Please enter a comma-separated list of medications."

    # Trim, drop blanks and de-duplicate case-insensitively while keeping the
    # first spelling and the input order. Without this, "aspirin, Aspirin"
    # would count as two medications and be checked against itself.
    drug_names: list[str] = []
    seen: set[str] = set()
    for raw_name in drug_list_str.split(','):
        name = raw_name.strip()
        key = name.casefold()
        if name and key not in seen:
            seen.add(key)
            drug_names.append(name)

    if len(drug_names) < 2:
        return "Please enter at least two medications to check for interactions."

    async with aiohttp.ClientSession() as session:
        tasks = {
            # NOTE(review): called without the shared session, unlike the
            # openFDA client -- confirm it manages its own HTTP access.
            "interactions": rxnorm_client.run_interaction_check(drug_names),
            "safety_profiles": asyncio.gather(
                *(openfda_client.get_safety_profile(session, name) for name in drug_names)
            ),
        }
        # Failures come back as Exception objects instead of propagating.
        raw_results = await asyncio.gather(*tasks.values(), return_exceptions=True)
    api_data = dict(zip(tasks.keys(), raw_results))

    interaction_data = api_data.get('interactions', [])
    if isinstance(interaction_data, Exception):
        interaction_data = [{"error": str(interaction_data)}]

    safety_profiles = api_data.get('safety_profiles', [])
    if isinstance(safety_profiles, Exception):
        # One error entry per drug: zipping a single-element list against
        # drug_names would silently drop every drug after the first.
        error_text = str(safety_profiles)
        safety_profiles = [{"error": error_text} for _ in drug_names]
    safety_data_dict = dict(zip(drug_names, safety_profiles))

    if interaction_data:
        interaction_formatted = utils.format_list_as_markdown([str(i) for i in interaction_data])
    else:
        interaction_formatted = "No interactions found."
    safety_formatted = "\n".join(
        f"Profile for {drug}: {profile}" for drug, profile in safety_data_dict.items()
    )

    synthesis_prompt = prompts.get_drug_interaction_synthesis_prompt(
        drug_names=drug_names,
        interaction_data=interaction_formatted,
        safety_data=safety_formatted,
    )
    final_report = await gemini_handler.generate_text_response(synthesis_prompt)
    return f"{prompts.DISCLAIMER}\n\n{final_report}"