mgbam committed · verified
Commit c72a1cd · 1 Parent(s): 5035006

Update mcp/orchestrator.py

Files changed (1):
  mcp/orchestrator.py  +66 -80
mcp/orchestrator.py CHANGED
@@ -1,131 +1,117 @@
  #!/usr/bin/env python3
  """
- MedGenesis – dual‑LLM asynchronous orchestrator
- ==============================================
- • Accepts `llm` argument ("openai" | "gemini"), defaults to "openai".
- • Harvests literature (PubMed + arXiv) → extracts keywords.
- • Fans‑out to open APIs for genes, trials, safety, ontology:
-     – **MyGene.info** for live gene annotations
-     – **ClinicalTrials.gov v2** for recruiting & completed studies
-     – UMLS / openFDA / DisGeNET / MeSH (existing helpers)
-     – Optional Open Targets & DrugCentral via `multi_enrich` if needed.
- • Returns a single JSON‑serialisable dict consumed by the Streamlit UI.
- """
-
  from __future__ import annotations

  import asyncio
  from typing import Any, Dict, List

- # ── Literature fetchers ─────────────────────────────────────────────
- from mcp.arxiv import fetch_arxiv
- from mcp.pubmed import fetch_pubmed
-
- # ── NLP & legacy enrichers ─────────────────────────────────────────
- from mcp.nlp import extract_keywords
- from mcp.umls import lookup_umls
- from mcp.openfda import fetch_drug_safety
- from mcp.ncbi import search_gene, get_mesh_definition
- from mcp.disgenet import disease_to_genes
-
- # ── Modern high‑throughput APIs ────────────────────────────────────
- from mcp.mygene import fetch_gene_info           # MyGene.info
- from mcp.ctgov import search_trials_v2           # ClinicalTrials.gov v2
- # from mcp.targets import fetch_ot_associations  # (optional future use)
-
- # ── LLM utilities ─────────────────────────────────────────────────
- from mcp.openai_utils import ai_summarize, ai_qa
- from mcp.gemini import gemini_summarize, gemini_qa
-
- # ------------------------------------------------------------------
  # LLM router
- # ------------------------------------------------------------------

- def _get_llm(llm: str):
-     """Return (summarize_fn, qa_fn) based on requested engine."""
      if llm and llm.lower() == "gemini":
-         return gemini_summarize, gemini_qa
-     return ai_summarize, ai_qa          # default → OpenAI

- # ------------------------------------------------------------------
- # Helper: batch NCBI / MeSH / DisGeNET enrichment for keyword list
- # ------------------------------------------------------------------
- async def _enrich_ncbi_mesh_disg(keys: List[str]) -> Dict[str, Any]:
-     jobs = [search_gene(k) for k in keys] + \
-            [get_mesh_definition(k) for k in keys] + \
-            [disease_to_genes(k) for k in keys]

-     results = await asyncio.gather(*jobs, return_exceptions=True)

-     genes, mesh_defs, disg_links = [], [], []
-     n = len(keys)
      for idx, res in enumerate(results):
          if isinstance(res, Exception):
              continue
-         bucket = idx // n          # 0 = gene, 1 = mesh, 2 = disg
          if bucket == 0:
              genes.extend(res)
          elif bucket == 1:
              mesh_defs.append(res)
          else:
-             disg_links.extend(res)
-
-     return {"genes": genes, "meshes": mesh_defs, "disgenet": disg_links}
-
- # ------------------------------------------------------------------
- # Main orchestrator
- # ------------------------------------------------------------------
- async def orchestrate_search(query: str, *, llm: str = "openai") -> Dict[str, Any]:
-     """Master async pipeline – returns dict consumed by UI."""
-
-     # 1) Literature --------------------------------------------------
-     arxiv_task  = asyncio.create_task(fetch_arxiv(query))
-     pubmed_task = asyncio.create_task(fetch_pubmed(query))
      papers = sum(await asyncio.gather(arxiv_task, pubmed_task), [])

-     # 2) Keyword extraction -----------------------------------------
      corpus   = " ".join(p["summary"] for p in papers)
      keywords = extract_keywords(corpus)[:8]

-     # 3) Fan‑out enrichment -----------------------------------------
-     umls_tasks = [lookup_umls(k) for k in keywords]
-     fda_tasks  = [fetch_drug_safety(k) for k in keywords]

-     ncbi_task   = asyncio.create_task(_enrich_ncbi_mesh_disg(keywords))
-     mygene_task = asyncio.create_task(fetch_gene_info(query))   # top gene hit
-     trials_task = asyncio.create_task(search_trials_v2(query, max_n=20))

      umls, fda, ncbi_data, mygene, trials = await asyncio.gather(
          asyncio.gather(*umls_tasks, return_exceptions=True),
          asyncio.gather(*fda_tasks, return_exceptions=True),
          ncbi_task,
-         mygene_task,
          trials_task,
      )

-     # 4) LLM summary -------------------------------------------------
-     summarize_fn, _ = _get_llm(llm)
      ai_summary = await summarize_fn(corpus)

-     # 5) Assemble payload -------------------------------------------
      return {
          "papers"         : papers,
          "umls"           : umls,
          "drug_safety"    : fda,
          "ai_summary"     : ai_summary,
-         "llm_used"       : llm.lower(),

-         # Gene & variant context
          "genes"          : (ncbi_data["genes"] or []) + ([mygene] if mygene else []),
          "mesh_defs"      : ncbi_data["meshes"],
          "gene_disease"   : ncbi_data["disgenet"],

-         # Clinical trials
          "clinical_trials": trials,
      }

- # ------------------------------------------------------------------
- async def answer_ai_question(question: str, *, context: str, llm: str = "openai") -> Dict[str, str]:
-     """One‑shot follow‑up QA using selected engine."""
-     _, qa_fn = _get_llm(llm)
      return {"answer": await qa_fn(question, context)}

  #!/usr/bin/env python3
+ """MedGenesis – asynchronous orchestrator (v3)
+
+ • Pulls literature (PubMed + arXiv)
+ • Extracts keywords via spaCy → fans‑out to:
+     – MyGene.info (gene annotations)
+     – ClinicalTrials.gov v2 (trials) with v1 fallback
+     – UMLS / openFDA / DisGeNET / MeSH
+ • Summarises with OpenAI or Gemini (user‑selectable)
+ • Returns a single dict ready for Streamlit UI & pydantic schemas.
  """

  from __future__ import annotations

  import asyncio
  from typing import Any, Dict, List

+ # ── Internal modules ────────────────────────────────────────────────
+ from mcp.arxiv import fetch_arxiv
+ from mcp.pubmed import fetch_pubmed
+ from mcp.nlp import extract_keywords
+ from mcp.umls import lookup_umls
+ from mcp.openfda import fetch_drug_safety
+ from mcp.ncbi import search_gene, get_mesh_definition
+ from mcp.disgenet import disease_to_genes
+ from mcp.mygene import fetch_gene_info
+ from mcp.ctgov import search_trials          # v2 helper (v1 fallback inside)
+ from mcp.openai_utils import ai_summarize, ai_qa
+ from mcp.gemini import gemini_summarize, gemini_qa
+
+ # -------------------------------------------------------------------
  # LLM router
+ # -------------------------------------------------------------------
+ _DEF_MODEL = "openai"

+ def _get_llm(llm: str | None):
      if llm and llm.lower() == "gemini":
+         return gemini_summarize, gemini_qa, "gemini"
+     return ai_summarize, ai_qa, "openai"

+ # -------------------------------------------------------------------
+ # Helper: NCBI / MeSH / DisGeNET enrichment
+ # -------------------------------------------------------------------
+ async def _enrich_keywords(keys: List[str]) -> Dict[str, Any]:
+     tasks = []
+     for k in keys:
+         tasks += [search_gene(k), get_mesh_definition(k), disease_to_genes(k)]

+     results = await asyncio.gather(*tasks, return_exceptions=True)

+     genes, mesh_defs, disg = [], [], []
      for idx, res in enumerate(results):
          if isinstance(res, Exception):
              continue
+         bucket = idx % 3
          if bucket == 0:
              genes.extend(res)
          elif bucket == 1:
              mesh_defs.append(res)
          else:
+             disg.extend(res)
+     return {"genes": genes, "meshes": mesh_defs, "disgenet": disg}
+
+ # -------------------------------------------------------------------
+ # Public API
+ # -------------------------------------------------------------------
+ async def orchestrate_search(query: str, *, llm: str = _DEF_MODEL) -> Dict[str, Any]:
+     """Run full async pipeline and return merged result dict."""
+     # 1) Literature --------------------------------------------------
+     arxiv_task  = asyncio.create_task(fetch_arxiv(query, max_results=10))
+     pubmed_task = asyncio.create_task(fetch_pubmed(query, max_results=10))
      papers = sum(await asyncio.gather(arxiv_task, pubmed_task), [])

+     # 2) Keyword extraction -----------------------------------------
      corpus   = " ".join(p["summary"] for p in papers)
      keywords = extract_keywords(corpus)[:8]

+     # 3) Enrichment fan‑out -----------------------------------------
+     umls_tasks = [lookup_umls(k) for k in keywords]
+     fda_tasks  = [fetch_drug_safety(k) for k in keywords]

+     ncbi_task   = asyncio.create_task(_enrich_keywords(keywords))
+     gene_task   = asyncio.create_task(fetch_gene_info(query))
+     trials_task = asyncio.create_task(search_trials(query, max_studies=20))

      umls, fda, ncbi_data, mygene, trials = await asyncio.gather(
          asyncio.gather(*umls_tasks, return_exceptions=True),
          asyncio.gather(*fda_tasks, return_exceptions=True),
          ncbi_task,
+         gene_task,
          trials_task,
      )

+     # 4) LLM summary -------------------------------------------------
+     summarize_fn, _, engine = _get_llm(llm)
      ai_summary = await summarize_fn(corpus)

+     # 5) Assemble result --------------------------------------------
      return {
          "papers"         : papers,
          "umls"           : umls,
          "drug_safety"    : fda,
          "ai_summary"     : ai_summary,
+         "llm_used"       : engine,

+         # genes & ontologies
          "genes"          : (ncbi_data["genes"] or []) + ([mygene] if mygene else []),
          "mesh_defs"      : ncbi_data["meshes"],
          "gene_disease"   : ncbi_data["disgenet"],

+         # trials
          "clinical_trials": trials,
      }

+
+ async def answer_ai_question(question: str, *, context: str, llm: str = _DEF_MODEL) -> Dict[str, str]:
+     summarize, qa_fn, engine = _get_llm(llm)
      return {"answer": await qa_fn(question, context)}