# modules/orchestrator.py """ The Central Nervous System of Project Asclepius. (v2.0 - The "Clinical Insight Engine" Upgrade) This version uses a smarter post-processing function to guarantee clean output. """ import asyncio import aiohttp from itertools import chain from PIL import Image from . import gemini_handler, prompts, utils from api_clients import ( pubmed_client, clinicaltrials_client, openfda_client, rxnorm_client ) # --- Internal Helper for Data Formatting (Unchanged) --- def _format_api_data_for_prompt(api_results: dict) -> dict[str, str]: # This function is unchanged. formatted_strings = {} pubmed_data = api_results.get('pubmed', []) if isinstance(pubmed_data, list) and pubmed_data: lines = [f"- Title: {a.get('title', 'N/A')} (Journal: {a.get('journal', 'N/A')}, URL: {a.get('url')})" for a in pubmed_data]; formatted_strings['pubmed'] = "\n".join(lines) else: formatted_strings['pubmed'] = "No relevant review articles were found on PubMed for this query." trials_data = api_results.get('trials', []) if isinstance(trials_data, list) and trials_data: lines = [f"- Title: {t.get('title', 'N/A')} (Status: {t.get('status', 'N/A')}, URL: {t.get('url')})" for t in trials_data]; formatted_strings['trials'] = "\n".join(lines) else: formatted_strings['trials'] = "No actively recruiting clinical trials were found matching this query." fda_data = api_results.get('openfda', []) if isinstance(fda_data, list): all_events = list(chain.from_iterable(filter(None, fda_data))) if all_events: lines = [f"- {evt['term']} (Reported {evt['count']} times)" for evt in all_events]; formatted_strings['openfda'] = "\n".join(lines) else: formatted_strings['openfda'] = "No specific adverse event data was found for this query." else: formatted_strings['openfda'] = "No specific adverse event data was found for this query." vision_data = api_results.get('vision', "") if isinstance(vision_data, str) and vision_data: formatted_strings['vision'] = vision_data elif isinstance(vision_data, Exception): formatted_strings['vision'] = f"An error occurred during image analysis: {vision_data}" else: formatted_strings['vision'] = "" return formatted_strings # ============================================================================== # V2.0 UPGRADE: A robust function to remove any AI-generated preamble/disclaimer. # ============================================================================== def _clean_ai_preamble(report_text: str) -> str: """Intelligently removes redundant disclaimers or preambles added by the AI.""" lines = report_text.strip().split('\n') # AI disclaimers are often short, in the first few lines, and contain specific keywords. # We find the first line that looks like real content (starts with '##' for our format). start_index = 0 for i, line in enumerate(lines): if line.strip().startswith('##'): start_index = i break # Failsafe for the first 5 lines if no '##' is found if i > 5: break return '\n'.join(lines[start_index:]) # --- FEATURE 1: Symptom Synthesizer Pipeline (v2.0) --- async def run_symptom_synthesis(user_query: str, image_input: Image.Image | None) -> str: # (Steps 1-4 remain the same) if not user_query: return "Please enter a symptom description or a medical question to begin." correction_prompt = prompts.get_query_correction_prompt(user_query) corrected_query = await gemini_handler.generate_text_response(correction_prompt) if not corrected_query: corrected_query = user_query term_prompt = prompts.get_term_extraction_prompt(corrected_query) concepts_str = await gemini_handler.generate_text_response(term_prompt) concepts = utils.safe_literal_eval(concepts_str) if not isinstance(concepts, list) or not concepts: concepts = [corrected_query] search_query = " OR ".join(f'"{c}"' for c in concepts) async with aiohttp.ClientSession() as session: tasks = { "pubmed": pubmed_client.search_pubmed(session, search_query, max_results=3), "trials": clinicaltrials_client.find_trials(session, search_query, max_results=3), "openfda": asyncio.gather(*(openfda_client.get_adverse_events(session, c, top_n=3) for c in concepts)), } if image_input: tasks["vision"] = gemini_handler.analyze_image_with_text("In the context of the user query, analyze this image objectively...", image_input) raw_results = await asyncio.gather(*tasks.values(), return_exceptions=True) api_data = dict(zip(tasks.keys(), raw_results)) formatted_data = _format_api_data_for_prompt(api_data) # STEP 5: The Grand Synthesis (using new v2.0 prompt) synthesis_prompt = prompts.get_synthesis_prompt(user_query=user_query, concepts=concepts, pubmed_data=formatted_data['pubmed'], trials_data=formatted_data['trials'], fda_data=formatted_data['openfda'], vision_analysis=formatted_data['vision']) final_report = await gemini_handler.generate_text_response(synthesis_prompt) # STEP 6: Intelligent Post-Processing cleaned_report = _clean_ai_preamble(final_report) # STEP 7: Final Delivery return f"{prompts.DISCLAIMER}\n\n{cleaned_report}" # --- FEATURE 2: Drug Interaction & Safety Analyzer Pipeline (v2.0) --- async def run_drug_interaction_analysis(drug_list_str: str) -> str: # (Steps remain the same) if not drug_list_str: return "Please enter a comma-separated list of medications." drug_names = [name.strip() for name in drug_list_str.split(',') if name.strip()] if len(drug_names) < 2: return "Please enter at least two medications to check for interactions." async with aiohttp.ClientSession() as session: tasks = { "interactions": rxnorm_client.run_interaction_check(drug_names), "safety_profiles": asyncio.gather(*(openfda_client.get_safety_profile(session, name) for name in drug_names)) } raw_results = await asyncio.gather(*tasks.values(), return_exceptions=True) api_data = dict(zip(tasks.keys(), raw_results)) interaction_data = api_data.get('interactions', []) if isinstance(interaction_data, Exception): interaction_data = [{"error": str(interaction_data)}] safety_profiles = api_data.get('safety_profiles', []) if isinstance(safety_profiles, Exception): safety_profiles = [{"error": str(safety_profiles)}] safety_data_dict = dict(zip(drug_names, safety_profiles)) interaction_formatted = utils.format_list_as_markdown([str(i) for i in interaction_data]) if interaction_data else "No interactions found." safety_formatted = "\n".join([f"Profile for {drug}: {profile}" for drug, profile in safety_data_dict.items()]) # Synthesis (using new v2.0 prompt) synthesis_prompt = prompts.get_drug_interaction_synthesis_prompt(drug_names=drug_names, interaction_data=interaction_formatted, safety_data=safety_formatted) final_report = await gemini_handler.generate_text_response(synthesis_prompt) # Intelligent Post-Processing cleaned_report = _clean_ai_preamble(final_report) return f"{prompts.DISCLAIMER}\n\n{cleaned_report}"