Spaces:
Sleeping
Sleeping
File size: 3,351 Bytes
dc83e12 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 |
# modules/orchestrator.py
"""
The main conductor. This module sequences the calls to APIs and the AI model.
It's the heart of the application's logic.
"""
import asyncio
import aiohttp
import ast
from . import gemini_handler, prompts
from .api_clients import umls_client, pubmed_client
def _parse_concepts(raw_response, fallback: str) -> list:
    """Turn the term-extraction reply into a non-empty list of strings.

    The reply is expected to be the text of a Python list literal. Anything
    else (unparseable text, a non-list literal, a list with no usable
    items) yields ``[fallback]`` so later steps always have a term to use.
    """
    if isinstance(raw_response, str):
        raw_response = raw_response.strip()
    try:
        # literal_eval only evaluates literals, so it is safe on model output.
        parsed = ast.literal_eval(raw_response)
    except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError):
        return [fallback]
    if not isinstance(parsed, list):
        return [fallback]
    # Coerce to str so " AND ".join() cannot fail on numbers or nested
    # literals, and drop blank entries that would yield "x AND " queries.
    cleaned = [text for text in (str(item).strip() for item in parsed) if text]
    return cleaned or [fallback]


def _format_pubmed_articles(articles) -> str:
    """Render PubMed hits as one bullet line per article ('' when none)."""
    # `articles` is None when the fetch failed; treat that like "no results".
    return "\n".join(
        f"- Title: {article.get('title', 'N/A')}, PMID: {article.get('uid', 'N/A')}"
        for article in articles or []
    )


async def run_symptom_synthesis(user_query: str, image_input=None) -> str:
    """
    The complete pipeline for the Symptom Synthesizer tab.

    Steps:
      1. Ask Gemini to extract key concepts from the query.
      2. Query PubMed and UMLS concurrently over one shared HTTP session.
      3. Format the gathered evidence as plain text.
      4. Ask Gemini to synthesize a report from the query and evidence.
      5. Prepend the disclaimer and return the report.

    Args:
        user_query: Free-text symptoms or question from the user.
        image_input: Accepted for interface compatibility; this function
            does not currently read it.

    Returns:
        The disclaimer followed by the generated report, or a short prompt
        asking for input when ``user_query`` is empty or only whitespace.

    Note:
        A failed API call is logged with ``print`` and treated as "no data";
        it does not abort the pipeline. The UMLS CUI lookup result is
        gathered but is not passed to the synthesis prompt.
    """
    if not user_query or not user_query.strip():
        return "Please enter your symptoms or query."

    # --- Step 1: Extract Key Concepts with Gemini ---
    term_extraction_prompt = prompts.get_term_extraction_prompt(user_query)
    concepts_str = await gemini_handler.generate_gemini_response(term_extraction_prompt)
    # Falls back to the raw query if Gemini doesn't return a usable list.
    concepts = _parse_concepts(concepts_str, fallback=user_query)
    search_query = " AND ".join(concepts)

    # --- Step 2: Gather Evidence Asynchronously ---
    async with aiohttp.ClientSession() as session:
        # Create a UMLS client instance for this session
        umls = umls_client.UMLSClient(session)

        # Define all async tasks
        tasks = {
            "pubmed": pubmed_client.search_pubmed(session, search_query, max_results=3),
            # `concepts` is guaranteed non-empty by _parse_concepts.
            "umls_cui": umls.get_cui_for_term(concepts[0]),
            # Add other clients here as they are built e.g.,
            # "trials": clinicaltrials_client.find_trials(session, search_query),
            # "fda": openfda_client.get_adverse_events(session, concepts)
        }

        # Run all tasks concurrently; failures come back as result values.
        results = await asyncio.gather(*tasks.values(), return_exceptions=True)

    # Map results back to their keys, handling potential errors.
    api_data = {}
    for key, value in zip(tasks, results):
        # BaseException rather than Exception: gather can also hand back
        # asyncio.CancelledError, which is not an Exception subclass.
        if isinstance(value, BaseException):
            print(f"Error fetching data from {key}: {value}")
            value = None  # Nullify data if fetch failed
        api_data[key] = value

    # --- Step 3: Format Data for the Synthesis Prompt ---
    # Convert raw JSON/list data into clean, readable strings for the AI
    pubmed_formatted = _format_pubmed_articles(api_data.get("pubmed"))
    # In a real implementation, you'd format trials and fda data here too
    trials_formatted = "Trial data fetching is not yet fully implemented in this demo."
    fda_formatted = "FDA data fetching is not yet fully implemented in this demo."

    # --- Step 4: The Grand Synthesis with Gemini ---
    synthesis_prompt = prompts.get_synthesis_prompt(
        user_query,
        concepts,
        pubmed_data=pubmed_formatted,
        trials_data=trials_formatted,
        fda_data=fda_formatted,
    )
    final_report = await gemini_handler.generate_gemini_response(synthesis_prompt)

    # --- Step 5: Prepend Disclaimer and Return ---
    return f"{prompts.DISCLAIMER}\n\n{final_report}"
# You would create similar orchestrator functions for other tabs,
# e.g., async def run_drug_interaction_analysis(drug_list): ...