mgbam committed on
Commit
146b143
·
verified ·
1 Parent(s): e33dfeb

Update mcp/orchestrator.py

Browse files
Files changed (1) hide show
  1. mcp/orchestrator.py +46 -46
mcp/orchestrator.py CHANGED
@@ -1,11 +1,12 @@
1
  #!/usr/bin/env python3
2
- """MedGenesis – orchestrator (v4, resilient).
3
-
4
- * Pulls PubMed + arXiv (async)
5
- * spaCy keyword extraction → UMLS / openFDA / DisGeNET / MeSH fan‑out
6
- * Adds MyGene.info, ClinicalTrials.gov (v2 → v1 fallback)
7
- * Filters out failed enrichment calls (exceptions) so UI never crashes
8
- * Summarises with OpenAI *or* Gemini; router returns `llm_used`
 
9
  """
10
  from __future__ import annotations
11
 
@@ -13,15 +14,15 @@ import asyncio
13
  from typing import Any, Dict, List
14
 
15
  # ── async fetchers ──────────────────────────────────────────────────
16
- from mcp.arxiv import fetch_arxiv
17
- from mcp.pubmed import fetch_pubmed
18
- from mcp.nlp import extract_keywords
19
- from mcp.umls import lookup_umls
20
- from mcp.openfda import fetch_drug_safety
21
- from mcp.ncbi import search_gene, get_mesh_definition
22
  from mcp.disgenet import disease_to_genes
23
- from mcp.mygene import fetch_gene_info
24
- from mcp.ctgov import search_trials
25
 
26
  # ── LLM helpers ────────────────────────────────────────────────────
27
  from mcp.openai_utils import ai_summarize, ai_qa
@@ -30,7 +31,7 @@ from mcp.gemini import gemini_summarize, gemini_qa
30
  # ------------------------------------------------------------------
31
  # LLM router
32
  # ------------------------------------------------------------------
33
- _DEF = "openai"
34
 
35
  def _llm_router(name: str | None):
36
  if name and name.lower() == "gemini":
@@ -38,16 +39,16 @@ def _llm_router(name: str | None):
38
  return ai_summarize, ai_qa, "openai"
39
 
40
  # ------------------------------------------------------------------
41
- # Keyword enrichment bundle
42
  # ------------------------------------------------------------------
43
  async def _enrich_keywords(keys: List[str]) -> Dict[str, Any]:
44
- tasks: List[asyncio.Future] = []
45
  for k in keys:
46
- tasks += [search_gene(k), get_mesh_definition(k), disease_to_genes(k)]
47
 
48
- res = await asyncio.gather(*tasks, return_exceptions=True)
49
 
50
- genes, mesh_defs, disg = [], [], []
51
  for idx, r in enumerate(res):
52
  if isinstance(r, Exception):
53
  continue
@@ -55,50 +56,51 @@ async def _enrich_keywords(keys: List[str]) -> Dict[str, Any]:
55
  if bucket == 0:
56
  genes.extend(r)
57
  elif bucket == 1:
58
- mesh_defs.append(r)
59
  else:
60
  disg.extend(r)
61
- return {"genes": genes, "meshes": mesh_defs, "disgenet": disg}
62
 
63
  # ------------------------------------------------------------------
64
- # Main orchestrator
65
  # ------------------------------------------------------------------
66
- async def orchestrate_search(query: str, *, llm: str = _DEF) -> Dict[str, Any]:
67
- """Run entire async pipeline; never raises."""
68
  # 1) Literature --------------------------------------------------
69
- arxiv_f = asyncio.create_task(fetch_arxiv(query, max_results=10))
70
- pubmed_f = asyncio.create_task(fetch_pubmed(query, max_results=10))
71
- papers = []
72
- for fut in await asyncio.gather(arxiv_f, pubmed_f, return_exceptions=True):
73
- if not isinstance(fut, Exception):
74
- papers.extend(fut)
75
-
76
- # 2) Keywords ----------------------------------------------------
 
77
  corpus = " ".join(p.get("summary", "") for p in papers)
78
  keywords = extract_keywords(corpus)[:8]
79
 
80
- # 3) Fan‑out enrichment -----------------------------------------
81
  umls_f = [lookup_umls(k) for k in keywords]
82
  fda_f = [fetch_drug_safety(k) for k in keywords]
83
  ncbi_f = asyncio.create_task(_enrich_keywords(keywords))
84
- mygene_f = asyncio.create_task(fetch_gene_info(query))
85
  trials_f = asyncio.create_task(search_trials(query, max_studies=20))
86
 
87
  umls, fda, ncbi, mygene, trials = await asyncio.gather(
88
  asyncio.gather(*umls_f, return_exceptions=True),
89
  asyncio.gather(*fda_f, return_exceptions=True),
90
  ncbi_f,
91
- mygene_f,
92
  trials_f,
93
  )
94
 
95
- # ── filter exception objects -----------------------------------
96
  umls = [u for u in umls if isinstance(u, dict)]
97
- fda = [d for d in fda if isinstance(d, (dict, list))]
98
 
99
  # 4) LLM summary -------------------------------------------------
100
- summarize, _, engine = _llm_router(llm)
101
- ai_summary = await summarize(corpus) if corpus else ""
102
 
103
  # 5) Assemble payload -------------------------------------------
104
  return {
@@ -107,16 +109,14 @@ async def orchestrate_search(query: str, *, llm: str = _DEF) -> Dict[str, Any]:
107
  "drug_safety" : fda,
108
  "ai_summary" : ai_summary,
109
  "llm_used" : engine,
110
- # gene context
111
  "genes" : (ncbi["genes"] or []) + ([mygene] if mygene else []),
112
  "mesh_defs" : ncbi["meshes"],
113
  "gene_disease" : ncbi["disgenet"],
114
- # trials
115
  "clinical_trials": trials,
116
  }
117
 
118
  # ------------------------------------------------------------------
119
- async def answer_ai_question(question: str, *, context: str, llm: str = _DEF) -> Dict[str, str]:
120
- """Follow‑up QA using chosen LLM."""
121
  _, qa_fn, _ = _llm_router(llm)
122
- return {"answer": await qa_fn(question, context)}
 
1
  #!/usr/bin/env python3
2
+ """MedGenesis – orchestrator (v4.1, context‑safe)
3
+
4
+ Runs an async pipeline that fetches literature, enriches with biomedical
5
+ APIs, and summarises via either OpenAI or Gemini. Fully resilient:
6
+ • HTTPS arXiv
7
+ • 403‑proof ClinicalTrials.gov helper
8
+ • Filters out failed enrichment calls so UI never crashes
9
+ • Follow‑up QA passes `context=` kwarg (fixes TypeError)
10
  """
11
  from __future__ import annotations
12
 
 
14
  from typing import Any, Dict, List
15
 
16
  # ── async fetchers ──────────────────────────────────────────────────
17
+ from mcp.arxiv import fetch_arxiv
18
+ from mcp.pubmed import fetch_pubmed
19
+ from mcp.nlp import extract_keywords
20
+ from mcp.umls import lookup_umls
21
+ from mcp.openfda import fetch_drug_safety
22
+ from mcp.ncbi import search_gene, get_mesh_definition
23
  from mcp.disgenet import disease_to_genes
24
+ from mcp.mygene import fetch_gene_info
25
+ from mcp.ctgov import search_trials # v2→v1 helper
26
 
27
  # ── LLM helpers ────────────────────────────────────────────────────
28
  from mcp.openai_utils import ai_summarize, ai_qa
 
31
  # ------------------------------------------------------------------
32
  # LLM router
33
  # ------------------------------------------------------------------
34
+ _DEF_LLM = "openai"
35
 
36
  def _llm_router(name: str | None):
37
  if name and name.lower() == "gemini":
 
39
  return ai_summarize, ai_qa, "openai"
40
 
41
  # ------------------------------------------------------------------
42
+ # Keyword enrichment bundle (NCBI / MeSH / DisGeNET)
43
  # ------------------------------------------------------------------
44
  async def _enrich_keywords(keys: List[str]) -> Dict[str, Any]:
45
+ jobs: List[asyncio.Future] = []
46
  for k in keys:
47
+ jobs += [search_gene(k), get_mesh_definition(k), disease_to_genes(k)]
48
 
49
+ res = await asyncio.gather(*jobs, return_exceptions=True)
50
 
51
+ genes, meshes, disg = [], [], []
52
  for idx, r in enumerate(res):
53
  if isinstance(r, Exception):
54
  continue
 
56
  if bucket == 0:
57
  genes.extend(r)
58
  elif bucket == 1:
59
+ meshes.append(r)
60
  else:
61
  disg.extend(r)
62
+ return {"genes": genes, "meshes": meshes, "disgenet": disg}
63
 
64
  # ------------------------------------------------------------------
65
+ # Orchestrator main
66
  # ------------------------------------------------------------------
67
+ async def orchestrate_search(query: str, *, llm: str = _DEF_LLM) -> Dict[str, Any]:
68
+ """Fetch + enrich + summarise; returns dict for Streamlit UI."""
69
  # 1) Literature --------------------------------------------------
70
+ arxiv_task = asyncio.create_task(fetch_arxiv(query, max_results=10))
71
+ pubmed_task = asyncio.create_task(fetch_pubmed(query, max_results=10))
72
+
73
+ papers: List[Dict] = []
74
+ for res in await asyncio.gather(arxiv_task, pubmed_task, return_exceptions=True):
75
+ if not isinstance(res, Exception):
76
+ papers.extend(res)
77
+
78
+ # 2) Keyword extraction -----------------------------------------
79
  corpus = " ".join(p.get("summary", "") for p in papers)
80
  keywords = extract_keywords(corpus)[:8]
81
 
82
+ # 3) Enrichment fan‑out -----------------------------------------
83
  umls_f = [lookup_umls(k) for k in keywords]
84
  fda_f = [fetch_drug_safety(k) for k in keywords]
85
  ncbi_f = asyncio.create_task(_enrich_keywords(keywords))
86
+ gene_f = asyncio.create_task(fetch_gene_info(query))
87
  trials_f = asyncio.create_task(search_trials(query, max_studies=20))
88
 
89
  umls, fda, ncbi, mygene, trials = await asyncio.gather(
90
  asyncio.gather(*umls_f, return_exceptions=True),
91
  asyncio.gather(*fda_f, return_exceptions=True),
92
  ncbi_f,
93
+ gene_f,
94
  trials_f,
95
  )
96
 
97
+ # filter out failed calls --------------------------------------
98
  umls = [u for u in umls if isinstance(u, dict)]
99
+ fda = [d for d in fda if isinstance(d, (dict, list))]
100
 
101
  # 4) LLM summary -------------------------------------------------
102
+ summarize_fn, _, engine = _llm_router(llm)
103
+ ai_summary = await summarize_fn(corpus) if corpus else ""
104
 
105
  # 5) Assemble payload -------------------------------------------
106
  return {
 
109
  "drug_safety" : fda,
110
  "ai_summary" : ai_summary,
111
  "llm_used" : engine,
 
112
  "genes" : (ncbi["genes"] or []) + ([mygene] if mygene else []),
113
  "mesh_defs" : ncbi["meshes"],
114
  "gene_disease" : ncbi["disgenet"],
 
115
  "clinical_trials": trials,
116
  }
117
 
118
  # ------------------------------------------------------------------
119
+ async def answer_ai_question(question: str, *, context: str, llm: str = _DEF_LLM) -> Dict[str, str]:
120
+ """Follow‑up QA using selected LLM (context kwarg fixed)."""
121
  _, qa_fn, _ = _llm_router(llm)
122
+ return {"answer": await qa_fn(question, context=context)}