mgbam committed on
Commit
874a822
·
verified ·
1 Parent(s): 64eb751

Update mcp/orchestrator.py

Browse files
Files changed (1) hide show
  1. mcp/orchestrator.py +99 -108
mcp/orchestrator.py CHANGED
@@ -1,122 +1,113 @@
1
- #!/usr/bin/env python3
2
- """MedGenesis – orchestrator (v4.1, context‑safe)
3
-
4
- Runs an async pipeline that fetches literature, enriches with biomedical
5
- APIs, and summarises via either OpenAI or Gemini. Fully resilient:
6
- • HTTPS arXiv
7
- • 403‑proof ClinicalTrials.gov helper
8
- • Filters out failed enrichment calls so UI never crashes
9
- • Follow‑up QA passes `context=` kwarg (fixes TypeError)
10
  """
11
- from __future__ import annotations
12
-
13
- import asyncio
14
- from typing import Any, Dict, List
15
-
16
- # ── async fetchers ──────────────────────────────────────────────────
17
- from mcp.arxiv import fetch_arxiv
18
- from mcp.pubmed import fetch_pubmed
19
- from mcp.nlp import extract_keywords
20
- from mcp.umls import lookup_umls
21
- from mcp.openfda import fetch_drug_safety
22
- from mcp.ncbi import search_gene, get_mesh_definition
23
- from mcp.disgenet import disease_to_genes
24
- from mcp.mygene import fetch_gene_info
25
- from mcp.ctgov import search_trials # v2→v1 helper
26
-
27
- # ── LLM helpers ────────────────────────────────────────────────────
28
- from mcp.openai_utils import ai_summarize, ai_qa
29
- from mcp.gemini import gemini_summarize, gemini_qa
30
-
31
- # ------------------------------------------------------------------
32
- # LLM router
33
- # ------------------------------------------------------------------
34
- _DEF_LLM = "openai"
35
 
36
def _llm_router(name: str | None):
    """Pick the summarise/QA helper pair for the requested LLM engine.

    Args:
        name: Engine name, compared case-insensitively. Only ``"gemini"``
            selects the Gemini helpers; ``None``, an empty string, or any
            other value selects the OpenAI helpers.

    Returns:
        A ``(summarize_fn, qa_fn, engine_name)`` tuple.
    """
    wants_gemini = bool(name) and name.lower() == "gemini"
    if not wants_gemini:
        return ai_summarize, ai_qa, "openai"
    return gemini_summarize, gemini_qa, "gemini"
40
 
41
- # ------------------------------------------------------------------
42
- # Keyword enrichment bundle (NCBI / MeSH / DisGeNET)
43
- # ------------------------------------------------------------------
44
async def _enrich_keywords(keys: List[str]) -> Dict[str, Any]:
    """Run the gene, MeSH and DisGeNET lookups for every keyword concurrently.

    Three calls are issued per keyword, in the fixed order ``search_gene``,
    ``get_mesh_definition``, ``disease_to_genes``. Calls that raise are
    dropped from the output.

    Args:
        keys: Keywords to enrich.

    Returns:
        A dict with ``"genes"`` (flattened ``search_gene`` results),
        ``"meshes"`` (one ``get_mesh_definition`` result per successful call)
        and ``"disgenet"`` (flattened ``disease_to_genes`` results).
    """
    pending = [
        job
        for key in keys
        for job in (search_gene(key), get_mesh_definition(key), disease_to_genes(key))
    ]
    outcomes = await asyncio.gather(*pending, return_exceptions=True)

    def _succeeded(items):
        # Failed calls come back as exception instances; leave them out.
        return [item for item in items if not isinstance(item, Exception)]

    # Jobs were queued in groups of three, so a stride of 3 selects one source.
    gene_hits = _succeeded(outcomes[0::3])
    mesh_hits = _succeeded(outcomes[1::3])
    disgenet_hits = _succeeded(outcomes[2::3])

    return {
        "genes": [gene for hit in gene_hits for gene in hit],
        "meshes": mesh_hits,
        "disgenet": [assoc for hit in disgenet_hits for assoc in hit],
    }
63
-
64
- # ------------------------------------------------------------------
65
- # Orchestrator main
66
- # ------------------------------------------------------------------
67
async def orchestrate_search(query: str, *, llm: str = _DEF_LLM) -> Dict[str, Any]:
    """Run the literature -> keywords -> enrichment -> summary pipeline.

    Args:
        query: Search string sent to the literature, gene and trial helpers.
        llm: Engine name handed to ``_llm_router``.

    Returns:
        A dict with the keys ``papers``, ``umls``, ``drug_safety``,
        ``ai_summary``, ``llm_used``, ``genes``, ``mesh_defs``,
        ``gene_disease`` and ``clinical_trials``.
    """
    # 1) Literature: a source that fails is skipped instead of aborting.
    literature_jobs = (
        asyncio.create_task(fetch_arxiv(query, max_results=10)),
        asyncio.create_task(fetch_pubmed(query, max_results=10)),
    )
    batches = await asyncio.gather(*literature_jobs, return_exceptions=True)
    papers: List[Dict] = [
        paper
        for batch in batches
        if not isinstance(batch, Exception)
        for paper in batch
    ]

    # 2) Keyword extraction over the concatenated summaries.
    corpus = " ".join(paper.get("summary", "") for paper in papers)
    keywords = extract_keywords(corpus)[:8]

    # 3) Enrichment fan-out.
    umls_jobs = [lookup_umls(kw) for kw in keywords]
    fda_jobs = [fetch_drug_safety(kw) for kw in keywords]
    ncbi_task = asyncio.create_task(_enrich_keywords(keywords))
    gene_task = asyncio.create_task(fetch_gene_info(query))
    trials_task = asyncio.create_task(search_trials(query, max_studies=20))

    umls_raw, fda_raw, ncbi, mygene, trials = await asyncio.gather(
        asyncio.gather(*umls_jobs, return_exceptions=True),
        asyncio.gather(*fda_jobs, return_exceptions=True),
        ncbi_task,
        gene_task,
        trials_task,
    )

    # Keep only well-formed results; failed calls are exception instances.
    umls = [hit for hit in umls_raw if isinstance(hit, dict)]
    fda = [hit for hit in fda_raw if isinstance(hit, (dict, list))]

    # 4) LLM summary, skipped when there is no text to summarise.
    summarize_fn, _unused_qa, engine = _llm_router(llm)
    ai_summary = ""
    if corpus:
        ai_summary = await summarize_fn(corpus)

    # 5) Assemble payload.
    ncbi_genes = ncbi["genes"] or []
    extra_gene = [mygene] if mygene else []
    return {
        "papers": papers,
        "umls": umls,
        "drug_safety": fda,
        "ai_summary": ai_summary,
        "llm_used": engine,
        "genes": ncbi_genes + extra_gene,
        "mesh_defs": ncbi["meshes"],
        "gene_disease": ncbi["disgenet"],
        "clinical_trials": trials,
    }
117
 
118
- # ------------------------------------------------------------------
119
async def answer_ai_question(question: str, *, context: str, llm: str = _DEF_LLM) -> Dict[str, str]:
    """Answer a follow-up question with the selected LLM.

    Args:
        question: The user's question.
        context: Background text, forwarded to the QA helper as ``context=``.
        llm: Engine name handed to ``_llm_router``.

    Returns:
        ``{"answer": <reply from the QA helper>}``.
    """
    _summarize_fn, ask, _engine = _llm_router(llm)
    reply = await ask(question, context=context)
    return {"answer": reply}
 
 
 
 
 
 
 
 
 
 
1
  """
2
+ MedGenesis – parallel multi-API orchestrator
3
+ --------------------------------------------
4
+ Now pulls PubMed, arXiv, UMLS, OpenFDA, DisGeNET, ClinicalTrials.gov,
5
+ PLUS: MyGene.info, Ensembl, OpenTargets, Expression Atlas, cBioPortal,
6
+ DrugCentral & PubChem.
7
+
8
+ Call : orchestrate_search(query, llm="openai" | "gemini")
9
+ Returns : dict ready for Streamlit UI
10
+ Follow-up QA : answer_ai_question(...)
11
+ """
 
 
 
 
 
 
 
 
 
 
 
 
 
 
12
 
13
+ import asyncio, httpx
14
+ from typing import Dict, Any, List
15
+
16
+ from mcp.arxiv import fetch_arxiv
17
+ from mcp.pubmed import fetch_pubmed
18
+ from mcp.nlp import extract_keywords
19
+ from mcp.umls import lookup_umls
20
+ from mcp.openfda import fetch_drug_safety
21
+ from mcp.ncbi import search_gene, get_mesh_definition # legacy
22
+ from mcp.ncbi_turbo import pubmed_ids
23
+ from mcp.disgenet import disease_to_genes
24
+ from mcp.clinicaltrials import search_trials
25
+ from mcp.openai_utils import ai_summarize, ai_qa
26
+ from mcp.gemini import gemini_summarize, gemini_qa
27
+ from mcp.mygene import fetch_gene_info
28
+ from mcp.ensembl import fetch_ensembl
29
+ from mcp.opentargets import fetch_ot
30
+ from mcp.atlas import fetch_expression
31
+ from mcp.drugcentral_ext import fetch_drugcentral
32
+ from mcp.pubchem_ext import fetch_compound
33
+ from mcp.cbio import fetch_cbio
34
+
35
+ _DEF = "openai"
36
+
37
+ # ---------- LLM router ----------
38
def _llm_router(llm: "str | None"):
    """Select the summarise/QA helper pair for the requested LLM engine.

    Args:
        llm: Engine name, matched case-insensitively and ignoring surrounding
            whitespace. ``"gemini"`` selects the Gemini helpers; anything
            else, including ``None`` or an empty string, selects the OpenAI
            helpers.

    Returns:
        A ``(summarize_fn, qa_fn, engine_name)`` tuple.
    """
    # Check the type first: calling .lower() on None raised AttributeError.
    if isinstance(llm, str) and llm.strip().lower() == "gemini":
        return gemini_summarize, gemini_qa, "gemini"
    return ai_summarize, ai_qa, "openai"
42
 
43
+ # ---------- gene resolver ----------
44
async def _resolve_gene(sym: str) -> dict:
    """Resolve a gene symbol using the first source that returns data.

    Sources are tried in order: ``fetch_gene_info``, ``fetch_ensembl``,
    ``fetch_ot``. A source that raises or returns a falsy result is skipped.

    Args:
        sym: Symbol (or keyword) to look up.

    Returns:
        The first truthy result, or ``{}`` when every source fails or
        returns nothing.
    """
    import logging  # local import keeps this block self-contained

    log = logging.getLogger(__name__)
    for fn in (fetch_gene_info, fetch_ensembl, fetch_ot):
        try:
            data = await fn(sym)
        except Exception as exc:
            # Still best-effort, but record the failure instead of hiding it
            # so a broken provider can be diagnosed; then try the next one.
            log.debug(
                "gene lookup via %s failed for %r: %s",
                getattr(fn, "__name__", fn), sym, exc,
            )
            continue
        if data:
            return data
    return {}
53
+
54
+ # ---------- orchestrator ----------
55
async def _gather_successful(coros) -> list:
    """Await *coros* concurrently and return only the results that did not raise."""
    results = await asyncio.gather(*coros, return_exceptions=True)
    # BaseException also covers CancelledError, which gather hands back as a
    # result when return_exceptions=True.
    return [r for r in results if not isinstance(r, BaseException)]


async def orchestrate_search(query: str,
                             llm: str = _DEF) -> Dict[str, Any]:
    """Fetch literature, enrich its keywords across the APIs, and summarise.

    Args:
        query: Search string sent to the literature and trial helpers.
        llm: Engine name handed to ``_llm_router``.

    Returns:
        A dict with the keys ``papers``, ``ai_summary``, ``llm_used``,
        ``umls``, ``drug_safety``, ``genes_rich``, ``expr_atlas``,
        ``drug_meta``, ``chem_info``, ``clinical_trials`` and
        ``cbio_variants``. Failed enrichment calls are left out of the lists.

    Raises:
        Exception: Whatever ``search_trials``, ``extract_keywords`` or the
            LLM summariser raises; those are not suppressed.
    """
    # 1 Literature: one failing source must not abort the whole search.
    papers: List[dict] = []
    for batch in await asyncio.gather(
        fetch_arxiv(query), fetch_pubmed(query), return_exceptions=True
    ):
        if isinstance(batch, BaseException):
            continue
        papers.extend(batch or [])

    # 2 Keyword extraction; tolerate papers with a missing or null summary.
    blob = " ".join(p.get("summary") or "" for p in papers)
    keywords = extract_keywords(blob)[:10]

    # 3 Enrichment fan-out: every group and the trials lookup run in a single
    #   gather so they overlap instead of being awaited one after another.
    umls, fda, genes, exprs, dcz, chems, cbio, trials = await asyncio.gather(
        _gather_successful(lookup_umls(k) for k in keywords),
        _gather_successful(fetch_drug_safety(k) for k in keywords),
        _gather_successful(_resolve_gene(k) for k in keywords),
        _gather_successful(fetch_expression(k) for k in keywords),
        _gather_successful(fetch_drugcentral(k) for k in keywords),
        _gather_successful(fetch_compound(k) for k in keywords),
        _gather_successful(fetch_cbio(k) for k in keywords[:3]),  # limit API
        search_trials(query, max_studies=20),
    )

    # 4 AI summary: skip the LLM call when there is nothing to summarise.
    summarise, _, used_llm = _llm_router(llm)
    summary = await summarise(blob) if blob.strip() else ""

    return {
        "papers": papers,
        "ai_summary": summary,
        "llm_used": used_llm,
        "umls": umls,
        "drug_safety": fda,
        "genes_rich": [g for g in genes if g],
        "expr_atlas": [e for e in exprs if e],
        "drug_meta": [d for d in dcz if d],
        "chem_info": [c for c in chems if c],
        "clinical_trials": trials,
        "cbio_variants": [v for v in cbio if v],
    }
107
 
108
+ # ---------- follow-up QA ----------
109
async def answer_ai_question(question: str, *,
                             context: str,
                             llm: str = _DEF) -> Dict[str, str]:
    """Answer a follow-up question with the selected LLM.

    Args:
        question: The user's question.
        context: Background text the answer should be grounded in.
        llm: Engine name handed to ``_llm_router``.

    Returns:
        ``{"answer": <reply from the QA helper>}``.
    """
    _, qa_fn, _ = _llm_router(llm)
    # Pass context by keyword: the previous revision of this file notes that
    # the `context=` kwarg was the fix for a TypeError in this call.
    return {"answer": await qa_fn(question, context=context)}