Spaces:
Sleeping
Sleeping
# modules/orchestrator.py
"""
The main conductor. This module sequences the calls to APIs and the AI model.
It's the heart of the application's logic.
"""
import asyncio | |
import aiohttp | |
import ast | |
from . import gemini_handler, prompts | |
from .api_clients import umls_client, pubmed_client | |
def _parse_concepts(concepts_str, user_query):
    """Turn the model's term-extraction reply into a non-empty list of strings.

    The reply is expected to be the textual form of a Python list. Anything
    else (non-string reply, unparsable text, a non-list literal, or a list
    with no usable items) falls back to ``[user_query]``.
    """
    fallback = [user_query]
    if not isinstance(concepts_str, str):
        return fallback
    try:
        # literal_eval only accepts Python literals, so it is safe on model output.
        parsed = ast.literal_eval(concepts_str.strip())
    except (ValueError, TypeError, SyntaxError, MemoryError, RecursionError):
        return fallback
    if not isinstance(parsed, list):
        return fallback
    # Coerce items to text so that " AND ".join() cannot fail on numbers etc.,
    # and drop blanks so the search query has no empty operands.
    cleaned = [str(item).strip() for item in parsed if item is not None]
    cleaned = [item for item in cleaned if item]
    return cleaned or fallback


def _format_pubmed_articles(articles):
    """Render PubMed results as one "- Title: ..., PMID: ..." line per article.

    ``articles`` may be ``None`` (the fetch failed) or empty; both yield "".
    """
    return "\n".join(
        f"- Title: {article.get('title', 'N/A')}, PMID: {article.get('uid', 'N/A')}"
        for article in (articles or [])
    )


async def run_symptom_synthesis(user_query: str, image_input=None):
    """
    The complete pipeline for the Symptom Synthesizer tab.

    Steps: extract key concepts with Gemini, fetch evidence concurrently,
    format it, ask Gemini for the final synthesis, and prepend the disclaimer.

    Args:
        user_query: Free-text symptoms or question entered by the user.
        image_input: Accepted for interface compatibility; not used by this
            pipeline at present.

    Returns:
        The report text prefixed with ``prompts.DISCLAIMER``, or a short
        prompt message when ``user_query`` is empty or only whitespace.
    """
    if not user_query or not user_query.strip():
        return "Please enter your symptoms or query."

    # --- Step 1: Extract Key Concepts with Gemini ---
    term_extraction_prompt = prompts.get_term_extraction_prompt(user_query)
    concepts_str = await gemini_handler.generate_gemini_response(term_extraction_prompt)
    concepts = _parse_concepts(concepts_str, user_query)  # always non-empty
    search_query = " AND ".join(concepts)

    # --- Step 2: Gather Evidence Asynchronously ---
    async with aiohttp.ClientSession() as session:
        # Create a UMLS client instance for this session
        umls = umls_client.UMLSClient(session)
        # Define all async tasks
        tasks = {
            "pubmed": pubmed_client.search_pubmed(session, search_query, max_results=3),
            # NOTE(review): the CUI result is fetched but not yet passed to the
            # synthesis prompt below.
            "umls_cui": umls.get_cui_for_term(concepts[0]),
            # Add other clients here as they are built e.g.,
            # "trials": clinicaltrials_client.find_trials(session, search_query),
            # "fda": openfda_client.get_adverse_events(session, concepts)
        }
        # Run all tasks concurrently; failures come back as exception objects
        # instead of aborting the whole batch.
        results = await asyncio.gather(*tasks.values(), return_exceptions=True)

    # Map results back to their keys, replacing failed fetches with None.
    api_data = {}
    for key, value in zip(tasks, results):
        if isinstance(value, Exception):
            print(f"Error fetching data from {key}: {value}")
            value = None
        api_data[key] = value

    # --- Step 3: Format Data for the Synthesis Prompt ---
    # A failed PubMed fetch is stored as None, so the helper must tolerate it.
    pubmed_formatted = _format_pubmed_articles(api_data.get("pubmed"))
    # In a real implementation, you'd format trials and fda data here too
    trials_formatted = "Trial data fetching is not yet fully implemented in this demo."
    fda_formatted = "FDA data fetching is not yet fully implemented in this demo."

    # --- Step 4: The Grand Synthesis with Gemini ---
    synthesis_prompt = prompts.get_synthesis_prompt(
        user_query,
        concepts,
        pubmed_data=pubmed_formatted,
        trials_data=trials_formatted,
        fda_data=fda_formatted,
    )
    final_report = await gemini_handler.generate_gemini_response(synthesis_prompt)

    # --- Step 5: Prepend Disclaimer and Return ---
    return f"{prompts.DISCLAIMER}\n\n{final_report}"
# You would create similar orchestrator functions for other tabs,
# e.g., async def run_drug_interaction_analysis(drug_list): ...