mgbam committed on
Commit
86771dc
·
verified ·
1 Parent(s): ee520ba

Update mcp/orchestrator.py

Browse files
Files changed (1) hide show
  1. mcp/orchestrator.py +113 -74
mcp/orchestrator.py CHANGED
@@ -1,92 +1,131 @@
 
1
  """
2
- MedGenesis – dual-LLM orchestrator
3
- ----------------------------------
4
- • Accepts `llm` arg ("openai" | "gemini")
5
- • Defaults to "openai" if arg omitted
 
 
 
 
 
 
6
  """
7
 
8
- import asyncio, httpx
9
- from typing import Dict, Any, List
10
-
11
- from mcp.arxiv import fetch_arxiv
12
- from mcp.pubmed import fetch_pubmed
13
- from mcp.nlp import extract_keywords
14
- from mcp.umls import lookup_umls
15
- from mcp.openfda import fetch_drug_safety
16
- from mcp.ncbi import search_gene, get_mesh_definition
17
- from mcp.disgenet import disease_to_genes
18
- from mcp.clinicaltrials import search_trials
19
- from mcp.openai_utils import ai_summarize, ai_qa
20
- from mcp.gemini import gemini_summarize, gemini_qa # make sure gemini.py exists
21
-
22
- # ---------------- LLM router ----------------
 
 
 
 
 
 
 
 
 
 
 
 
 
 
23
def _get_llm(llm: str):
    """Select the LLM helper pair for the requested engine.

    Args:
        llm: Engine name, compared case-insensitively. Only "gemini" selects
            the Gemini helpers; any other value, including ``None`` or an
            empty string, falls back to the OpenAI helpers.

    Returns:
        A ``(summarize_fn, qa_fn)`` tuple.
    """
    # Guard against a falsy ``llm`` so a missing engine name falls through to
    # the default instead of raising AttributeError on ``None.lower()``.
    if llm and llm.lower() == "gemini":
        return gemini_summarize, gemini_qa
    return ai_summarize, ai_qa  # default → OpenAI
27
 
 
 
 
 
 
 
 
28
 
29
async def _enrich_genes_mesh_disg(keys: List[str]) -> Dict[str, Any]:
    """Run the gene, MeSH and DisGeNET lookups for every keyword concurrently.

    Args:
        keys: Keywords to look up; each one is sent to all three helpers.

    Returns:
        A dict with three lists: ``"genes"`` (gene-search results, flattened),
        ``"meshes"`` (one MeSH result per successful lookup) and
        ``"disgenet"`` (DisGeNET results, flattened).
    """
    lookups = (search_gene, get_mesh_definition, disease_to_genes)
    # For each keyword the three lookups are queued back to back, so the
    # result at position ``p`` belongs to ``lookups[p % 3]``.
    pending = [lookup(key) for key in keys for lookup in lookups]
    outcomes = await asyncio.gather(*pending, return_exceptions=True)

    genes, meshes, disg = [], [], []
    collectors = (genes.extend, meshes.append, disg.extend)
    for position, outcome in enumerate(outcomes):
        if isinstance(outcome, Exception):
            # Failed lookups are dropped without being reported.
            continue
        collectors[position % len(lookups)](outcome)

    return {"genes": genes, "meshes": meshes, "disgenet": disg}
 
 
 
43
 
 
44
 
45
  # ------------------------------------------------------------------
46
async def orchestrate_search(query: str, llm: str = "openai") -> Dict[str, Any]:
    """Run the full search pipeline for *query* and return the dict for the UI.

    The pipeline fetches papers from arXiv and PubMed, extracts up to eight
    keywords from their concatenated summaries, fans out to the enrichment
    helpers, and finally summarises the concatenated summaries with the
    selected LLM.

    Args:
        query: Search string, passed unchanged to the literature and
            clinical-trial helpers.
        llm: Engine name, compared case-insensitively. "gemini" selects
            Gemini; any other value (including ``None``) selects OpenAI.

    Returns:
        A dict with the keys ``papers``, ``umls``, ``drug_safety``,
        ``ai_summary``, ``llm_used``, ``genes``, ``mesh_defs``,
        ``gene_disease`` and ``clinical_trials``.

    Raises:
        Whatever the literature fetchers, keyword extractor, trial search or
        summariser raise. Only per-keyword UMLS / openFDA failures are
        suppressed here.
    """
    # Resolve the engine once so that a missing or unknown name cannot crash
    # on ``.lower()`` and ``llm_used`` reports the engine that really ran.
    engine = "gemini" if (llm or "").lower() == "gemini" else "openai"

    # 1) Literature ---------------------------------------------------
    batches = await asyncio.gather(fetch_arxiv(query), fetch_pubmed(query))
    papers = [paper for batch in batches for paper in batch]

    # 2) Keywords -----------------------------------------------------
    # Every paper is expected to carry a "summary" entry.
    blob = " ".join(p["summary"] for p in papers)
    keys = extract_keywords(blob)[:8]

    # 3) Enrichment ---------------------------------------------------
    umls_raw, fda_raw, genes, trials = await asyncio.gather(
        asyncio.gather(*(lookup_umls(k) for k in keys), return_exceptions=True),
        asyncio.gather(*(fetch_drug_safety(k) for k in keys), return_exceptions=True),
        _enrich_genes_mesh_disg(keys),
        search_trials(query, max_studies=10),
    )
    # ``return_exceptions=True`` hands failures back as exception objects;
    # drop them so the UI payload only contains real results.
    umls = [r for r in umls_raw if not isinstance(r, Exception)]
    fda = [r for r in fda_raw if not isinstance(r, Exception)]

    # 4) AI summary ---------------------------------------------------
    summarize, _ = _get_llm(engine)
    summary = await summarize(blob)

    return {
        "papers": papers,
        "umls": umls,
        "drug_safety": fda,
        "ai_summary": summary,
        "llm_used": engine,
        "genes": genes["genes"],
        "mesh_defs": genes["meshes"],
        "gene_disease": genes["disgenet"],
        "clinical_trials": trials,
    }
87
 
 
 
 
 
88
 
89
async def answer_ai_question(question: str, context: str, llm: str = "openai") -> Dict[str, str]:
    """Answer a single follow-up question with the chosen engine.

    Args:
        question: The question to ask.
        context: Context text handed to the QA helper with the question.
        llm: Engine name forwarded to ``_get_llm``.

    Returns:
        A dict with the single key ``"answer"``.
    """
    qa_engine = _get_llm(llm)[1]  # second element of the pair is the QA helper
    reply = await qa_engine(question, context)
    return {"answer": reply}
 
 
 
 
 
 
1
+ #!/usr/bin/env python3
2
  """
3
+ MedGenesis – dual‑LLM asynchronous orchestrator
4
+ ==============================================
5
+ • Accepts `llm` argument ("openai" | "gemini"), defaults to "openai".
6
+ • Harvests literature (PubMed + arXiv) → extracts keywords.
7
+ • Fans out to open APIs for genes, trials, safety, ontology:
8
+ – **MyGene.info** for live gene annotations
9
+ – **ClinicalTrials.gov v2** for recruiting & completed studies
10
+ – UMLS / openFDA / DisGeNET / MeSH (existing helpers)
11
+ – Optional Open Targets & DrugCentral via `multi_enrich` if needed.
12
+ • Returns a single JSON-serialisable dict consumed by the Streamlit UI.
13
  """
14
 
15
+ from __future__ import annotations
16
+
17
+ import asyncio
18
+ from typing import Any, Dict, List
19
+
20
+ # ── Literature fetchers ─────────────────────────────────────────────
21
+ from mcp.arxiv import fetch_arxiv
22
+ from mcp.pubmed import fetch_pubmed
23
+
24
+ # ── NLP & legacy enrichers ─────────────────────────────────────────
25
+ from mcp.nlp import extract_keywords
26
+ from mcp.umls import lookup_umls
27
+ from mcp.openfda import fetch_drug_safety
28
+ from mcp.ncbi import search_gene, get_mesh_definition
29
+ from mcp.disgenet import disease_to_genes
30
+
31
+ # ── Modern high‑throughput APIs ────────────────────────────────────
32
+ from mcp.mygene import fetch_gene_info # MyGene.info
33
+ from mcp.ctgov import search_trials_v2 # ClinicalTrials.gov v2
34
+ # from mcp.targets import fetch_ot_associations # (optional future use)
35
+
36
+ # ── LLM utilities ─────────────────────────────────────────────────
37
+ from mcp.openai_utils import ai_summarize, ai_qa
38
+ from mcp.gemini import gemini_summarize, gemini_qa
39
+
40
+ # ------------------------------------------------------------------
41
+ # LLM router
42
+ # ------------------------------------------------------------------
43
+
44
def _get_llm(llm: str):
    """Pick the ``(summarize_fn, qa_fn)`` pair for the requested engine.

    Only a case-insensitive "gemini" selects the Gemini helpers; every other
    value, including a falsy one, yields the OpenAI helpers.
    """
    wants_gemini = bool(llm) and llm.lower() == "gemini"
    if not wants_gemini:
        return ai_summarize, ai_qa  # default → OpenAI
    return gemini_summarize, gemini_qa
49
 
50
+ # ------------------------------------------------------------------
51
+ # Helper: batch NCBI / MeSH / DisGeNET enrichment for keyword list
52
+ # ------------------------------------------------------------------
53
async def _enrich_ncbi_mesh_disg(keys: List[str]) -> Dict[str, Any]:
    """Look up genes, MeSH definitions and DisGeNET links for each keyword.

    All lookups run in one concurrent batch; a lookup that fails is left out
    of the result without being reported.

    Args:
        keys: Keywords to look up; each one is sent to all three helpers.

    Returns:
        A dict with three lists: ``"genes"`` (gene-search results, flattened),
        ``"meshes"`` (one MeSH result per successful lookup) and
        ``"disgenet"`` (DisGeNET results, flattened).
    """
    gene_jobs = [search_gene(term) for term in keys]
    mesh_jobs = [get_mesh_definition(term) for term in keys]
    disg_jobs = [disease_to_genes(term) for term in keys]

    outcomes = await asyncio.gather(
        *gene_jobs, *mesh_jobs, *disg_jobs, return_exceptions=True
    )

    def _succeeded(chunk):
        # Keep only the lookups that did not come back as an exception.
        return [item for item in chunk if not isinstance(item, Exception)]

    # The jobs were queued as three equally sized runs (gene, mesh, disg),
    # so the results can be cut back apart at multiples of ``width``.
    width = len(keys)
    gene_hits = _succeeded(outcomes[:width])
    mesh_defs = _succeeded(outcomes[width:2 * width])
    disg_hits = _succeeded(outcomes[2 * width:])

    genes = [gene for hits in gene_hits for gene in hits]
    disg_links = [link for hits in disg_hits for link in hits]

    return {"genes": genes, "meshes": mesh_defs, "disgenet": disg_links}
74
 
75
  # ------------------------------------------------------------------
76
+ # Main orchestrator
77
+ # ------------------------------------------------------------------
78
async def orchestrate_search(query: str, *, llm: str = "openai") -> Dict[str, Any]:
    """Run the full async search pipeline for *query* and return the UI payload.

    The pipeline fetches papers from arXiv and PubMed, extracts up to eight
    keywords from their concatenated summaries, fans out to the enrichment
    helpers, and finally summarises the concatenated summaries with the
    selected LLM.

    Args:
        query: Search string, passed unchanged to the literature, MyGene and
            clinical-trial helpers.
        llm: Engine name (keyword-only), compared case-insensitively.
            "gemini" selects Gemini; any other value (including ``None``)
            selects OpenAI.

    Returns:
        A dict with the keys ``papers``, ``umls``, ``drug_safety``,
        ``ai_summary``, ``llm_used``, ``genes``, ``mesh_defs``,
        ``gene_disease`` and ``clinical_trials``.

    Raises:
        Whatever the literature fetchers, keyword extractor, MyGene lookup,
        trial search or summariser raise. Only per-keyword UMLS / openFDA
        failures are suppressed here.
    """
    # Resolve the engine once so that a missing or unknown name cannot crash
    # on ``.lower()`` and ``llm_used`` reports the engine that really ran.
    engine = "gemini" if (llm or "").lower() == "gemini" else "openai"

    # 1) Literature --------------------------------------------------
    batches = await asyncio.gather(fetch_arxiv(query), fetch_pubmed(query))
    papers = [paper for batch in batches for paper in batch]

    # 2) Keyword extraction -----------------------------------------
    # Every paper is expected to carry a "summary" entry.
    corpus = " ".join(p["summary"] for p in papers)
    keywords = extract_keywords(corpus)[:8]

    # 3) Fan-out enrichment -----------------------------------------
    umls_raw, fda_raw, ncbi_data, mygene, trials = await asyncio.gather(
        asyncio.gather(*(lookup_umls(k) for k in keywords), return_exceptions=True),
        asyncio.gather(*(fetch_drug_safety(k) for k in keywords), return_exceptions=True),
        _enrich_ncbi_mesh_disg(keywords),
        fetch_gene_info(query),  # top gene hit
        search_trials_v2(query, max_n=20),
    )
    # ``return_exceptions=True`` hands failures back as exception objects;
    # drop them so the payload only contains real results.
    umls = [r for r in umls_raw if not isinstance(r, Exception)]
    fda = [r for r in fda_raw if not isinstance(r, Exception)]

    # 4) LLM summary -------------------------------------------------
    summarize_fn, _ = _get_llm(engine)
    ai_summary = await summarize_fn(corpus)

    # 5) Assemble payload -------------------------------------------
    # The MyGene hit, when truthy, is appended after the NCBI gene results.
    genes = (ncbi_data["genes"] or []) + ([mygene] if mygene else [])
    return {
        "papers": papers,
        "umls": umls,
        "drug_safety": fda,
        "ai_summary": ai_summary,
        "llm_used": engine,
        # Gene & variant context
        "genes": genes,
        "mesh_defs": ncbi_data["meshes"],
        "gene_disease": ncbi_data["disgenet"],
        # Clinical trials
        "clinical_trials": trials,
    }
126
+
127
+ # ------------------------------------------------------------------
128
async def answer_ai_question(question: str, *, context: str, llm: str = "openai") -> Dict[str, str]:
    """Answer one follow-up question with the selected engine.

    Args:
        question: The question to ask.
        context: Context text (keyword-only) handed to the QA helper.
        llm: Engine name (keyword-only) forwarded to ``_get_llm``.

    Returns:
        A dict with the single key ``"answer"``.
    """
    qa_engine = _get_llm(llm)[1]  # second element of the pair is the QA helper
    reply = await qa_engine(question, context)
    return {"answer": reply}