|
|
|
""" |
|
MedGenesis – dual-LLM asynchronous orchestrator
===============================================
• Accepts `llm` argument ("openai" | "gemini"), defaults to "openai".
• Harvests literature (PubMed + arXiv) → extracts keywords.
• Fans out to open APIs for genes, trials, safety, ontology:
    – **MyGene.info** for live gene annotations
    – **ClinicalTrials.gov v2** for recruiting & completed studies
    – UMLS / openFDA / DisGeNET / MeSH (existing helpers)
    – Optional Open Targets & DrugCentral via `multi_enrich` if needed.
• Returns a single JSON-serialisable dict consumed by the Streamlit UI.
|
""" |
|
|
|
from __future__ import annotations |
|
|
|
import asyncio |
|
from typing import Any, Dict, List |
|
|
|
|
|
from mcp.arxiv import fetch_arxiv |
|
from mcp.pubmed import fetch_pubmed |
|
|
|
|
|
from mcp.nlp import extract_keywords |
|
from mcp.umls import lookup_umls |
|
from mcp.openfda import fetch_drug_safety |
|
from mcp.ncbi import search_gene, get_mesh_definition |
|
from mcp.disgenet import disease_to_genes |
|
|
|
|
|
from mcp.mygene import fetch_gene_info |
|
from mcp.ctgov import search_trials_v2 |
|
|
|
|
|
|
|
from mcp.openai_utils import ai_summarize, ai_qa |
|
from mcp.gemini import gemini_summarize, gemini_qa |
|
|
|
|
|
|
|
|
|
|
|
def _get_llm(llm: str):
    """Pick the (summarize_fn, qa_fn) pair for the requested engine.

    Only the name "gemini" (compared case-insensitively) selects the Gemini
    helpers; every other value, including an empty or missing one, selects
    the OpenAI helpers.
    """
    wants_gemini = bool(llm) and llm.lower() == "gemini"
    if not wants_gemini:
        return ai_summarize, ai_qa
    return gemini_summarize, gemini_qa
|
|
|
|
|
|
|
|
|
async def _enrich_ncbi_mesh_disg(keys: List[str]) -> Dict[str, Any]:
    """Run the gene, MeSH and DisGeNET look-ups for every key concurrently.

    For each entry of *keys* three coroutines are started (``search_gene``,
    ``get_mesh_definition`` and ``disease_to_genes``) and all of them are
    awaited together. A look-up that fails is skipped, so one bad key never
    aborts the others.

    Args:
        keys: Search terms; each one is passed unchanged to all three helpers.

    Returns:
        A dict with three lists:
        ``"genes"`` - concatenation of every successful ``search_gene`` result,
        ``"meshes"`` - one entry per successful ``get_mesh_definition`` call,
        ``"disgenet"`` - concatenation of every successful
        ``disease_to_genes`` result.
    """
    n = len(keys)
    jobs = (
        [search_gene(k) for k in keys]
        + [get_mesh_definition(k) for k in keys]
        + [disease_to_genes(k) for k in keys]
    )

    # return_exceptions=True hands failures back as ordinary values, in the
    # same order as `jobs`, instead of raising on the first one.
    results = await asyncio.gather(*jobs, return_exceptions=True)

    def _succeeded(res: Any) -> bool:
        # BaseException rather than Exception: a cancelled child shows up as
        # an asyncio.CancelledError instance, which has not been an Exception
        # subclass since Python 3.8.
        return not isinstance(res, BaseException)

    # `jobs` was built as three consecutive runs of len(keys) coroutines, so
    # the results split back into the same three slices.
    gene_hits = results[:n]
    mesh_hits = results[n:2 * n]
    disg_hits = results[2 * n:]

    # `hit or []` lets a helper that returned None count as "no results"
    # instead of blowing up the whole enrichment step.
    genes = [g for hit in gene_hits if _succeeded(hit) for g in (hit or [])]
    mesh_defs = [hit for hit in mesh_hits if _succeeded(hit)]
    disg_links = [d for hit in disg_hits if _succeeded(hit) for d in (hit or [])]

    return {"genes": genes, "meshes": mesh_defs, "disgenet": disg_links}
|
|
|
|
|
|
|
|
|
def _drop_failed(results: List[Any]) -> List[Any]:
    """Return *results* without the exception objects left by ``gather(return_exceptions=True)``."""
    return [item for item in results if not isinstance(item, BaseException)]


async def orchestrate_search(query: str, *, llm: str = "openai") -> Dict[str, Any]:
    """Master async pipeline - returns the dict consumed by the UI.

    Steps, in order:
      1. Fetch arXiv and PubMed results for *query* concurrently and merge them.
      2. Join the papers' ``"summary"`` fields into one corpus and keep the
         first eight extracted keywords.
      3. Concurrently run the per-keyword UMLS and openFDA look-ups, the
         NCBI / MeSH / DisGeNET helper, and one MyGene plus one
         ClinicalTrials.gov query on the raw *query*.
      4. Summarise the corpus with the selected LLM.

    Args:
        query: Free-text search string.
        llm: Engine name; resolved by ``_get_llm`` (anything other than
            "gemini" falls back to the OpenAI helpers).

    Returns:
        A dict with the keys ``papers``, ``umls``, ``drug_safety``,
        ``ai_summary``, ``llm_used``, ``genes``, ``mesh_defs``,
        ``gene_disease`` and ``clinical_trials``.  ``llm_used`` names the
        engine that actually produced ``ai_summary``.

    Raises:
        Whatever the literature fetchers, keyword extraction, the MyGene and
        trials queries or the summariser raise.  Failures of individual
        per-keyword UMLS / openFDA look-ups are dropped, not raised.
    """
    # 1) Literature: both sources are queried at the same time.
    batches = await asyncio.gather(fetch_arxiv(query), fetch_pubmed(query))
    papers = [paper for batch in batches for paper in batch]

    # 2) Keywords come from the concatenated paper summaries.
    corpus = " ".join(p["summary"] for p in papers)
    keywords = extract_keywords(corpus)[:8]

    # 3) Enrichment fan-out.  Only the per-keyword look-ups are best-effort
    #    (return_exceptions=True); the three single calls still propagate.
    umls_raw, fda_raw, ncbi_data, mygene, trials = await asyncio.gather(
        asyncio.gather(*(lookup_umls(k) for k in keywords), return_exceptions=True),
        asyncio.gather(*(fetch_drug_safety(k) for k in keywords), return_exceptions=True),
        _enrich_ncbi_mesh_disg(keywords),
        fetch_gene_info(query),
        search_trials_v2(query, max_n=20),
    )

    # Exception instances are not JSON-serialisable and the keywords are not
    # part of the payload, so failed look-ups are removed rather than kept
    # as positional placeholders.
    umls = _drop_failed(umls_raw)
    fda = _drop_failed(fda_raw)

    # 4) LLM summary.
    summarize_fn, _ = _get_llm(llm)
    ai_summary = await summarize_fn(corpus)

    # Derive the label from the function that was really used: this cannot
    # crash on llm=None and cannot report an engine that was silently
    # replaced by the OpenAI fallback.
    llm_used = "gemini" if summarize_fn is gemini_summarize else "openai"

    return {
        "papers": papers,
        "umls": umls,
        "drug_safety": fda,
        "ai_summary": ai_summary,
        "llm_used": llm_used,
        # NCBI hits first, then the single MyGene record when there is one.
        "genes": (ncbi_data["genes"] or []) + ([mygene] if mygene else []),
        "mesh_defs": ncbi_data["meshes"],
        "gene_disease": ncbi_data["disgenet"],
        "clinical_trials": trials,
    }
|
|
|
|
|
async def answer_ai_question(question: str, *, context: str, llm: str = "openai") -> Dict[str, str]:
    """Answer a single follow-up question with the selected engine.

    The QA function is the second element of the pair returned by
    ``_get_llm(llm)``; it is awaited with ``(question, context)`` and its
    result is wrapped as ``{"answer": ...}``.
    """
    qa_fn = _get_llm(llm)[1]
    reply = await qa_fn(question, context)
    return {"answer": reply}
|
|