mgbam committed · verified
Commit f400521 · 1 Parent(s): 20e7231

Update mcp/orchestrator.py

Files changed (1)
  1. mcp/orchestrator.py +129 -80
mcp/orchestrator.py CHANGED
@@ -1,113 +1,162 @@
  """
- MedGenesis – parallel multi-API orchestrator
- --------------------------------------------
- Now pulls PubMed, arXiv, UMLS, OpenFDA, DisGeNET, ClinicalTrials.gov,
- PLUS: MyGene.info, Ensembl, OpenTargets, Expression Atlas, cBioPortal,
- DrugCentral & PubChem.
-
- Call         : orchestrate_search(query, llm="openai" | "gemini")
- Returns      : dict ready for Streamlit UI
- Follow-up QA : answer_ai_question(...)
  """

- import asyncio, httpx
  from typing import Dict, Any, List

  from mcp.arxiv import fetch_arxiv
  from mcp.pubmed import fetch_pubmed
  from mcp.nlp import extract_keywords
  from mcp.umls import lookup_umls
  from mcp.openfda import fetch_drug_safety
- from mcp.ncbi import search_gene, get_mesh_definition   # legacy
- from mcp.ncbi_turbo import pubmed_ids
  from mcp.disgenet import disease_to_genes
  from mcp.clinicaltrials import search_trials
- from mcp.openai_utils import ai_summarize, ai_qa
- from mcp.gemini import gemini_summarize, gemini_qa
- from mcp.mygene import fetch_gene_info
- from mcp.ensembl import fetch_ensembl
- from mcp.opentargets import fetch_ot
  from mcp.atlas import fetch_expression
  from mcp.drugcentral_ext import fetch_drugcentral
  from mcp.pubchem_ext import fetch_compound
- from mcp.cbio import fetch_cbio

- _DEF = "openai"

- # ---------- LLM router ----------
- def _llm_router(llm: str):
-     if llm.lower() == "gemini":
          return gemini_summarize, gemini_qa, "gemini"
      return ai_summarize, ai_qa, "openai"

- # ---------- gene resolver ----------
- async def _resolve_gene(sym: str) -> dict:
-     for fn in (fetch_gene_info, fetch_ensembl, fetch_ot):
-         try:
-             data = await fn(sym)
-             if data:
-                 return data
-         except Exception:
-             pass
-     return {}
-
- # ---------- orchestrator ----------
- async def orchestrate_search(query: str,
-                              llm: str = _DEF) -> Dict[str, Any]:
-     # 1 Literature
-     arxiv_f  = asyncio.create_task(fetch_arxiv(query))
-     pubmed_f = asyncio.create_task(fetch_pubmed(query))
-     papers   = sum(await asyncio.gather(arxiv_f, pubmed_f), [])
-
-     # 2 Keyword extraction
-     blob     = " ".join(p["summary"] for p in papers)
-     keywords = extract_keywords(blob)[:10]
-
-     # 3 Enrichment fan-out
-     umls_tasks = [lookup_umls(k)       for k in keywords]
-     fda_tasks  = [fetch_drug_safety(k) for k in keywords]
-     gene_tasks = [_resolve_gene(k)     for k in keywords]
-     expr_tasks = [fetch_expression(k)  for k in keywords]
-     dcz_tasks  = [fetch_drugcentral(k) for k in keywords]
-     chem_tasks = [fetch_compound(k)    for k in keywords]
-     cbio_tasks = [fetch_cbio(k)        for k in keywords[:3]]   # limit API
-
-     genes, exprs, dcz, chems, cbio = await asyncio.gather(
-         asyncio.gather(*gene_tasks, return_exceptions=True),
-         asyncio.gather(*expr_tasks, return_exceptions=True),
-         asyncio.gather(*dcz_tasks,  return_exceptions=True),
-         asyncio.gather(*chem_tasks, return_exceptions=True),
-         asyncio.gather(*cbio_tasks, return_exceptions=True),
-     )

-     umls, fda = await asyncio.gather(
-         asyncio.gather(*umls_tasks, return_exceptions=True),
-         asyncio.gather(*fda_tasks,  return_exceptions=True),
      )

-     trials = await search_trials(query, max_studies=20)

-     # 4 AI summary
-     summarise, _, used_llm = _llm_router(llm)
-     summary = await summarise(blob)

      return {
          "papers"          : papers,
-         "ai_summary"      : summary,
-         "llm_used"        : used_llm,
-         "umls"            : umls,
-         "drug_safety"     : fda,
-         "genes_rich"      : [g for g in genes if g],
-         "expr_atlas"      : [e for e in exprs if e],
-         "drug_meta"       : [d for d in dcz if d],
-         "chem_info"       : [c for c in chems if c],
          "clinical_trials" : trials,
-         "cbio_variants"   : [v for v in cbio if v],
      }

- # ---------- follow-up QA ----------
  async def answer_ai_question(question: str, *,
                               context: str,
-                              llm: str = _DEF) -> Dict[str, str]:
      _, qa_fn, _ = _llm_router(llm)
-     return {"answer": await qa_fn(question, context)}
 
+ #!/usr/bin/env python3
  """
+ mcp/orchestrator.py · MedGenesis v5
+ ─────────────────────────────────────
+ Fan-out, async orchestrator that drives the Streamlit UI.
+
+ Data sources pulled in parallel
+ ───────────────────────────────
+ • Literature       → PubMed (E-utils) + arXiv RSS
+ • NLP keywords     → spaCy (en_core_web_sm)
+ • UMLS concepts    → UMLS REST (optional API key)
+ • Safety signals   → openFDA Drug Event API
+ • Gene annotation  → MyGene.info ➜ Ensembl REST ➜ Open Targets GraphQL
+ • Expression       → EMBL-EBI Expression Atlas
+ • Gene↔Disease     → DisGeNET
+ • Trial registry   → ClinicalTrials.gov (β OAS v2 ➜ prod v2 ➜ legacy v1 ➜ WHO ICTRP)
+ • Cancer variants  → cBioPortal REST v4
+ • Drug metadata    → DrugCentral SMART API
+ • Chemistry        → PubChem PUG-REST
+ • Fast PubMed IDs  → NCBI E-utils (personal key doubles quota)
+
+ LLM engines
+ ───────────
+ OpenAI GPT-4o (default) or Gemini 1.5-Flash/Pro via router.
+
+ Return payload keys (used by Streamlit UI)
+ ──────────────────────────────────────────
+ papers, ai_summary, llm_used, umls, drug_safety,
+ genes_rich, expr_atlas, drug_meta, chem_info,
+ gene_disease, clinical_trials, cbio_variants
  """

+ from __future__ import annotations
+ import asyncio
  from typing import Dict, Any, List

+ # ── Literature ──────────────────────────────────────────────────────
  from mcp.arxiv import fetch_arxiv
  from mcp.pubmed import fetch_pubmed
+ from mcp.ncbi_turbo import pubmed_ids          # fast IDs
+
+ # ── NLP + biomedical enrichment ────────────────────────────────────
  from mcp.nlp import extract_keywords
  from mcp.umls import lookup_umls
  from mcp.openfda import fetch_drug_safety
  from mcp.disgenet import disease_to_genes
  from mcp.clinicaltrials import search_trials
+
+ # Gene / expression helpers
+ from mcp.gene_hub import resolve_gene          # MyGene → Ensembl → OT
  from mcp.atlas import fetch_expression
+ from mcp.cbio import fetch_cbio
+
+ # Drug metadata
  from mcp.drugcentral_ext import fetch_drugcentral
  from mcp.pubchem_ext import fetch_compound

+ # ── LLM helpers ────────────────────────────────────────────────────
+ from mcp.openai_utils import ai_summarize, ai_qa
+ from mcp.gemini import gemini_summarize, gemini_qa
+
+ _LLM_DEFAULT = "openai"

+ # ────────────────────────────────────────────────────────────────────
+ # Internal helpers
+ # ────────────────────────────────────────────────────────────────────
+ def _llm_router(name: str):
+     """Return (summarise_fn, qa_fn, engine_name)."""
+     if name.lower() == "gemini":
          return gemini_summarize, gemini_qa, "gemini"
      return ai_summarize, ai_qa, "openai"


+ async def _fanout_keywords(keys: List[str]) -> Dict[str, Any]:
+     """Run UMLS, safety, gene, expression, drug meta in parallel."""
+     umls_f = [lookup_umls(k)       for k in keys]
+     fda_f  = [fetch_drug_safety(k) for k in keys]
+     gene_f = [resolve_gene(k)      for k in keys]
+     expr_f = [fetch_expression(k)  for k in keys]
+     drug_f = [fetch_drugcentral(k) for k in keys]
+     chem_f = [fetch_compound(k)    for k in keys]
+
+     umls, fda, genes, exprs, dmeta, chem = await asyncio.gather(
+         asyncio.gather(*umls_f, return_exceptions=True),
+         asyncio.gather(*fda_f,  return_exceptions=True),
+         asyncio.gather(*gene_f, return_exceptions=True),
+         asyncio.gather(*expr_f, return_exceptions=True),
+         asyncio.gather(*drug_f, return_exceptions=True),
+         asyncio.gather(*chem_f, return_exceptions=True),
      )

+     return {
+         "umls"      : [u for u in umls if isinstance(u, dict)],
+         "fda"       : [d for d in fda if d],
+         "genes"     : [g for g in genes if g],
+         "expr"      : [e for e in exprs if e],
+         "drug_meta" : [d for d in dmeta if d],
+         "chem_info" : [c for c in chem if c],
+     }
+
+
+ # ────────────────────────────────────────────────────────────────────
+ # Public API
+ # ────────────────────────────────────────────────────────────────────
+ async def orchestrate_search(query: str,
+                              llm: str = _LLM_DEFAULT) -> Dict[str, Any]:
+     """Run full async pipeline; never raises uncaught exceptions."""
+     # 1) Literature ---------------------------------------------------
+     arxiv_task  = asyncio.create_task(fetch_arxiv(query, max_results=10))
+     pubmed_task = asyncio.create_task(fetch_pubmed(query, max_results=10))

+     papers: List[Dict] = []
+     for p in await asyncio.gather(arxiv_task, pubmed_task, return_exceptions=True):
+         if not isinstance(p, Exception):
+             papers.extend(p)

+     # 2) NLP keywords -------------------------------------------------
+     corpus   = " ".join(p.get("summary", "") for p in papers)
+     keywords = extract_keywords(corpus)[:10]
+
+     # 3) Keyword fan-out ---------------------------------------------
+     enrich = await _fanout_keywords(keywords)
+
+     # 4) DisGeNET + trials (single calls) -----------------------------
+     disg_f   = asyncio.create_task(disease_to_genes(query))
+     trials_f = asyncio.create_task(search_trials(query, max_studies=20))
+     gene_dis, trials = await asyncio.gather(disg_f, trials_f)
+
+     # 5) Cancer variants (limit first 3 genes for quota) -------------
+     cbio_tasks = [fetch_cbio(g["symbol"]) for g in enrich["genes"][:3]]
+     cbio_vars  = []
+     if cbio_tasks:
+         cbio_vars = await asyncio.gather(*cbio_tasks, return_exceptions=True)
+         cbio_vars = [v for v in cbio_vars if v]
+
+     # 6) AI summary ---------------------------------------------------
+     summarise, _, engine = _llm_router(llm)
+     ai_summary = await summarise(corpus) if corpus else ""
+
+     # 7) Return payload ----------------------------------------------
      return {
          "papers"          : papers,
+         "ai_summary"      : ai_summary,
+         "llm_used"        : engine,
+         "umls"            : enrich["umls"],
+         "drug_safety"     : enrich["fda"],
+         "genes_rich"      : enrich["genes"],
+         "expr_atlas"      : enrich["expr"],
+         "drug_meta"       : enrich["drug_meta"],
+         "chem_info"       : enrich["chem_info"],
+         "gene_disease"    : gene_dis,
          "clinical_trials" : trials,
+         "cbio_variants"   : cbio_vars,
      }

+
  async def answer_ai_question(question: str, *,
                               context: str,
+                              llm: str = _LLM_DEFAULT) -> Dict[str, str]:
+     """Follow-up QA using selected engine; returns {'answer': str}."""
      _, qa_fn, _ = _llm_router(llm)
+     return {"answer": await qa_fn(question, context=context)}