Spaces:
Running
Running
File size: 7,178 Bytes
4428435 dc83e12 4428435 666355f dc83e12 140d9d4 4428435 666355f 89db8f5 140d9d4 666355f 4428435 666355f 4428435 666355f 4428435 666355f 4428435 666355f 4428435 666355f 4428435 dc83e12 666355f dc83e12 666355f 4428435 666355f 4428435 666355f 4428435 666355f 4428435 4e9aea4 666355f 4428435 666355f 4e9aea4 4428435 f869df3 666355f f869df3 4e9aea4 f869df3 4e9aea4 666355f 4428435 4e9aea4 4428435 4e9aea4 4428435 4e9aea4 4428435 4e9aea4 4428435 4e9aea4 4428435 666355f 4e9aea4 4428435 4e9aea4 666355f 4e9aea4 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 |
# modules/orchestrator.py
"""
The Central Nervous System of Project Asclepius.
(v2.0 - The "Clinical Insight Engine" Upgrade)
This version uses a smarter post-processing function to guarantee clean output.
"""
import asyncio
import aiohttp
from itertools import chain
from PIL import Image
from . import gemini_handler, prompts, utils
from api_clients import (
pubmed_client, clinicaltrials_client, openfda_client, rxnorm_client
)
# --- Internal Helper for Data Formatting (Unchanged) ---
def _format_api_data_for_prompt(api_results: dict) -> dict[str, str]:
# This function is unchanged.
formatted_strings = {}
pubmed_data = api_results.get('pubmed', [])
if isinstance(pubmed_data, list) and pubmed_data: lines = [f"- Title: {a.get('title', 'N/A')} (Journal: {a.get('journal', 'N/A')}, URL: {a.get('url')})" for a in pubmed_data]; formatted_strings['pubmed'] = "\n".join(lines)
else: formatted_strings['pubmed'] = "No relevant review articles were found on PubMed for this query."
trials_data = api_results.get('trials', [])
if isinstance(trials_data, list) and trials_data: lines = [f"- Title: {t.get('title', 'N/A')} (Status: {t.get('status', 'N/A')}, URL: {t.get('url')})" for t in trials_data]; formatted_strings['trials'] = "\n".join(lines)
else: formatted_strings['trials'] = "No actively recruiting clinical trials were found matching this query."
fda_data = api_results.get('openfda', [])
if isinstance(fda_data, list):
all_events = list(chain.from_iterable(filter(None, fda_data)))
if all_events: lines = [f"- {evt['term']} (Reported {evt['count']} times)" for evt in all_events]; formatted_strings['openfda'] = "\n".join(lines)
else: formatted_strings['openfda'] = "No specific adverse event data was found for this query."
else: formatted_strings['openfda'] = "No specific adverse event data was found for this query."
vision_data = api_results.get('vision', "")
if isinstance(vision_data, str) and vision_data: formatted_strings['vision'] = vision_data
elif isinstance(vision_data, Exception): formatted_strings['vision'] = f"An error occurred during image analysis: {vision_data}"
else: formatted_strings['vision'] = ""
return formatted_strings
# ==============================================================================
# V2.0 UPGRADE: A robust function to remove any AI-generated preamble/disclaimer.
# ==============================================================================
def _clean_ai_preamble(report_text: str) -> str:
"""Intelligently removes redundant disclaimers or preambles added by the AI."""
lines = report_text.strip().split('\n')
# AI disclaimers are often short, in the first few lines, and contain specific keywords.
# We find the first line that looks like real content (starts with '##' for our format).
start_index = 0
for i, line in enumerate(lines):
if line.strip().startswith('##'):
start_index = i
break
# Failsafe for the first 5 lines if no '##' is found
if i > 5:
break
return '\n'.join(lines[start_index:])
# --- FEATURE 1: Symptom Synthesizer Pipeline (v2.0) ---
async def run_symptom_synthesis(user_query: str, image_input: Image.Image | None) -> str:
# (Steps 1-4 remain the same)
if not user_query: return "Please enter a symptom description or a medical question to begin."
correction_prompt = prompts.get_query_correction_prompt(user_query)
corrected_query = await gemini_handler.generate_text_response(correction_prompt)
if not corrected_query: corrected_query = user_query
term_prompt = prompts.get_term_extraction_prompt(corrected_query)
concepts_str = await gemini_handler.generate_text_response(term_prompt)
concepts = utils.safe_literal_eval(concepts_str)
if not isinstance(concepts, list) or not concepts: concepts = [corrected_query]
search_query = " OR ".join(f'"{c}"' for c in concepts)
async with aiohttp.ClientSession() as session:
tasks = { "pubmed": pubmed_client.search_pubmed(session, search_query, max_results=3), "trials": clinicaltrials_client.find_trials(session, search_query, max_results=3), "openfda": asyncio.gather(*(openfda_client.get_adverse_events(session, c, top_n=3) for c in concepts)), }
if image_input: tasks["vision"] = gemini_handler.analyze_image_with_text("In the context of the user query, analyze this image objectively...", image_input)
raw_results = await asyncio.gather(*tasks.values(), return_exceptions=True)
api_data = dict(zip(tasks.keys(), raw_results))
formatted_data = _format_api_data_for_prompt(api_data)
# STEP 5: The Grand Synthesis (using new v2.0 prompt)
synthesis_prompt = prompts.get_synthesis_prompt(user_query=user_query, concepts=concepts, pubmed_data=formatted_data['pubmed'], trials_data=formatted_data['trials'], fda_data=formatted_data['openfda'], vision_analysis=formatted_data['vision'])
final_report = await gemini_handler.generate_text_response(synthesis_prompt)
# STEP 6: Intelligent Post-Processing
cleaned_report = _clean_ai_preamble(final_report)
# STEP 7: Final Delivery
return f"{prompts.DISCLAIMER}\n\n{cleaned_report}"
# --- FEATURE 2: Drug Interaction & Safety Analyzer Pipeline (v2.0) ---
async def run_drug_interaction_analysis(drug_list_str: str) -> str:
# (Steps remain the same)
if not drug_list_str: return "Please enter a comma-separated list of medications."
drug_names = [name.strip() for name in drug_list_str.split(',') if name.strip()]
if len(drug_names) < 2: return "Please enter at least two medications to check for interactions."
async with aiohttp.ClientSession() as session:
tasks = { "interactions": rxnorm_client.run_interaction_check(drug_names), "safety_profiles": asyncio.gather(*(openfda_client.get_safety_profile(session, name) for name in drug_names)) }
raw_results = await asyncio.gather(*tasks.values(), return_exceptions=True)
api_data = dict(zip(tasks.keys(), raw_results))
interaction_data = api_data.get('interactions', [])
if isinstance(interaction_data, Exception): interaction_data = [{"error": str(interaction_data)}]
safety_profiles = api_data.get('safety_profiles', [])
if isinstance(safety_profiles, Exception): safety_profiles = [{"error": str(safety_profiles)}]
safety_data_dict = dict(zip(drug_names, safety_profiles))
interaction_formatted = utils.format_list_as_markdown([str(i) for i in interaction_data]) if interaction_data else "No interactions found."
safety_formatted = "\n".join([f"Profile for {drug}: {profile}" for drug, profile in safety_data_dict.items()])
# Synthesis (using new v2.0 prompt)
synthesis_prompt = prompts.get_drug_interaction_synthesis_prompt(drug_names=drug_names, interaction_data=interaction_formatted, safety_data=safety_formatted)
final_report = await gemini_handler.generate_text_response(synthesis_prompt)
# Intelligent Post-Processing
cleaned_report = _clean_ai_preamble(final_report)
return f"{prompts.DISCLAIMER}\n\n{cleaned_report}" |