|
|
|
"""Dual-LLM orchestrator coordinating literature → annotation → trials.

Adds gene/variant enrichment with MyGene.info → Ensembl → OpenTargets → cBio.
"""
|
import asyncio
import logging
from typing import Dict, Any, List

from mcp.arxiv import fetch_arxiv
from mcp.pubmed import fetch_pubmed
from mcp.nlp import extract_keywords
from mcp.umls import lookup_umls
from mcp.openfda import fetch_drug_safety
from mcp.clinicaltrials import search_trials
from mcp.gene_hub import resolve_gene
from mcp.cbio import fetch_cbio_variants

from mcp.openai_utils import ai_summarize, ai_qa
from mcp.gemini import gemini_summarize, gemini_qa
|
|
|
# Default LLM backend name, used when callers do not pass ``llm``.
_DEF = "openai"
|
|
|
|
|
|
|
def _llm_router(llm: str):
    """Select the summarise / Q&A callables for the requested LLM backend.

    Args:
        llm: Backend name. Matching is case-insensitive and ignores
            surrounding whitespace. ``None``, an empty string, or any
            unrecognised name selects the OpenAI backend.

    Returns:
        A ``(summarise_fn, qa_fn, engine_name)`` tuple, where ``engine_name``
        is either ``"gemini"`` or ``"openai"``.
    """
    # Tolerate a missing value instead of raising AttributeError on
    # ``None.lower()``; stray whitespace should not change the backend.
    choice = (llm or "").strip().lower()
    if choice == "gemini":
        return gemini_summarize, gemini_qa, "gemini"
    return ai_summarize, ai_qa, "openai"
|
|
|
|
|
|
|
|
|
async def _enrich_gene_block(keywords: List[str]) -> Dict[str, Any]:
    """Resolve keywords to gene records and fetch cBio variants for each hit.

    Keywords are processed concurrently. Both lookups are best-effort: a
    keyword whose ``resolve_gene`` call raises or returns a falsy value is
    skipped, and a failing ``fetch_cbio_variants`` call yields an empty
    variant list for that keyword. Failures are logged rather than raised.

    Args:
        keywords: Candidate terms to resolve.

    Returns:
        ``{"genes": [...], "variants": {...}}``. ``genes`` holds the truthy
        ``resolve_gene`` results in keyword order; ``variants`` maps each
        resolved keyword to its ``fetch_cbio_variants`` result (``[]`` when
        that lookup failed).
    """
    log = logging.getLogger(__name__)

    async def _enrich_one(kw):
        # Returns ``(gene, variant_list)`` for a resolved keyword, else None.
        try:
            gene = await resolve_gene(kw)
        except Exception:
            # One bad keyword must not abort enrichment of the others.
            log.warning("Gene resolution failed for %r", kw, exc_info=True)
            return None
        if not gene:
            return None
        try:
            hits = await fetch_cbio_variants(kw)
        except Exception:
            log.warning("cBio variant lookup failed for %r", kw, exc_info=True)
            hits = []
        return gene, hits

    # gather() preserves argument order, so results line up with keywords.
    results = await asyncio.gather(*(_enrich_one(kw) for kw in keywords))

    genes: List[Dict] = []
    variants: Dict[str, List[Dict]] = {}
    for kw, result in zip(keywords, results):
        if result is None:
            continue
        gene, hits = result
        genes.append(gene)
        variants[kw] = hits
    return {"genes": genes, "variants": variants}
|
|
|
|
|
|
|
|
|
async def orchestrate_search(query: str, llm: str = _DEF) -> Dict[str, Any]:
    """Run search, summarise and join annotations for the UI.

    Pipeline:
      1. Fetch arXiv and PubMed results for ``query`` concurrently.
      2. Join the paper summaries (capped at 60,000 characters) and extract
         up to 12 keywords from the text.
      3. Concurrently look up UMLS and openFDA data per keyword, enrich
         genes/variants, and search clinical trials.
      4. Summarise the joined text with the selected LLM.

    Args:
        query: Search string sent to the literature and trial sources.
        llm: LLM backend name passed to ``_llm_router``.

    Returns:
        A dict with the keys ``papers``, ``umls``, ``drug_safety``, ``genes``,
        ``variants``, ``clinical_trials``, ``ai_summary`` and ``llm_used``.
        ``umls`` and ``drug_safety`` are gathered with
        ``return_exceptions=True``, so a failed per-keyword lookup appears as
        an exception instance in the corresponding list.

    Raises:
        Exception: Failures in the literature fetches, the gene block, or the
            trial search are not caught here and propagate to the caller.
    """
    # --- 1. literature ---------------------------------------------------
    arxiv_task = asyncio.create_task(fetch_arxiv(query, max_results=20))
    pubmed_task = asyncio.create_task(fetch_pubmed(query, max_results=20))
    batches = await asyncio.gather(arxiv_task, pubmed_task)
    papers = [paper for batch in batches for paper in batch]

    # --- 2. keywords -----------------------------------------------------
    # ``or ""`` guards against records whose ``summary`` is present but None,
    # which would otherwise make ``str.join`` raise TypeError.
    blob = " ".join(p.get("summary") or "" for p in papers)[:60_000]
    keywords = extract_keywords(blob)[:12]

    # --- 3. annotations --------------------------------------------------
    umls_f = [lookup_umls(k) for k in keywords]
    fda_f = [fetch_drug_safety(k) for k in keywords]
    gene_block = asyncio.create_task(_enrich_gene_block(keywords))
    trials_task = asyncio.create_task(search_trials(query, max_studies=20))

    umls, fda, gene_data, trials = await asyncio.gather(
        asyncio.gather(*umls_f, return_exceptions=True),
        asyncio.gather(*fda_f, return_exceptions=True),
        gene_block,
        trials_task,
    )

    # --- 4. LLM summary --------------------------------------------------
    summarise, _, engine_name = _llm_router(llm)
    try:
        summary = await summarise(blob)
    except Exception:
        # Keep the search usable without the LLM, but record why it failed.
        logging.getLogger(__name__).exception(
            "LLM summarisation failed (engine=%s)", engine_name
        )
        summary = "LLM summarisation unavailable."

    return {
        "papers": papers,
        "umls": umls,
        "drug_safety": fda,
        "genes": gene_data["genes"],
        "variants": gene_data["variants"],
        "clinical_trials": trials,
        "ai_summary": summary,
        "llm_used": engine_name,
    }
|
|
|
|
|
async def answer_ai_question(question: str, *, context: str, llm: str = _DEF) -> Dict[str, str]:
    """Follow-up Q&A via the selected LLM.

    Args:
        question: The user's question.
        context: Text passed to the LLM alongside the question.
        llm: LLM backend name passed to ``_llm_router``.

    Returns:
        ``{"answer": <text>}``. If the LLM call raises, the failure is logged
        and the answer is a fixed fallback message instead.
    """
    _, qa_fn, engine_name = _llm_router(llm)
    try:
        answer = await qa_fn(question, context)
    except Exception:
        # Degrade gracefully for the UI, but leave a trace of the failure.
        logging.getLogger(__name__).exception(
            "LLM Q&A failed (engine=%s)", engine_name
        )
        answer = "LLM unavailable or quota exceeded."
    return {"answer": answer}