mgbam committed on
Commit eaba1ed · verified · 1 Parent(s): f400521

Update mcp/orchestrator.py

Files changed (1)
  1. mcp/orchestrator.py +84 -96
mcp/orchestrator.py CHANGED
@@ -1,30 +1,23 @@
 #!/usr/bin/env python3
 """
-mcp/orchestrator.py · MedGenesis v5
-─────────────────────────────────────
-Fan-out, async orchestrator that drives the Streamlit UI.
-
-Data sources pulled in parallel
-───────────────────────────────
-• Literature       → PubMed (E-utils) + arXiv RSS
-• NLP keywords     → spaCy (en_core_web_sm)
-• UMLS concepts    → UMLS REST (optional API key)
-• Safety signals   → openFDA Drug Event API
-• Gene annotation  → MyGene.info ➜ Ensembl REST ➜ Open Targets GraphQL
-• Expression       → EMBL-EBI Expression Atlas
-• Gene↔Disease     → DisGeNET
-• Trial registry   → ClinicalTrials.gov (OAS v2 prod ➜ legacy v1 ➜ WHO ICTRP)
-• Cancer variants  → cBioPortal REST v4
-• Drug metadata    → DrugCentral SMART API
-• Chemistry        → PubChem PUG-REST
-• Fast PubMed IDs  → NCBI E-utils (personal key doubles quota)
-
-LLM engines
-───────────
-OpenAI GPT-4o (default) or Gemini 1.5-Flash/Pro via router.
-
-Return payload keys (used by Streamlit UI)
-──────────────────────────────────────────
 papers, ai_summary, llm_used, umls, drug_safety,
 genes_rich, expr_atlas, drug_meta, chem_info,
 gene_disease, clinical_trials, cbio_variants
@@ -37,126 +30,121 @@ from typing import Dict, Any, List
 # ── Literature ──────────────────────────────────────────────────────
 from mcp.arxiv import fetch_arxiv
 from mcp.pubmed import fetch_pubmed
-from mcp.ncbi_turbo import pubmed_ids  # fast IDs

-# ── NLP + biomedical enrichment ────────────────────────────────────
 from mcp.nlp import extract_keywords
 from mcp.umls import lookup_umls
 from mcp.openfda import fetch_drug_safety
 from mcp.disgenet import disease_to_genes
 from mcp.clinicaltrials import search_trials

-# Gene / expression helpers
-from mcp.gene_hub import resolve_gene  # MyGene → Ensembl → OT
 from mcp.atlas import fetch_expression
-from mcp.cbio import fetch_cbio

-# Drug metadata
 from mcp.drugcentral_ext import fetch_drugcentral
 from mcp.pubchem_ext import fetch_compound

-# ── LLM helpers ────────────────────────────────────────────────────
 from mcp.openai_utils import ai_summarize, ai_qa
 from mcp.gemini import gemini_summarize, gemini_qa

 _LLM_DEFAULT = "openai"

 # ────────────────────────────────────────────────────────────────────
-# Internal helpers
 # ────────────────────────────────────────────────────────────────────
 def _llm_router(name: str):
-    """Return (summarise_fn, qa_fn, engine_name)."""
     if name.lower() == "gemini":
         return gemini_summarize, gemini_qa, "gemini"
     return ai_summarize, ai_qa, "openai"


-async def _fanout_keywords(keys: List[str]) -> Dict[str, Any]:
-    """Run UMLS, safety, gene, expression, drug meta in parallel."""
-    umls_f = [lookup_umls(k) for k in keys]
-    fda_f  = [fetch_drug_safety(k) for k in keys]
-    gene_f = [resolve_gene(k) for k in keys]
-    expr_f = [fetch_expression(k) for k in keys]
-    drug_f = [fetch_drugcentral(k) for k in keys]
-    chem_f = [fetch_compound(k) for k in keys]
-
-    umls, fda, genes, exprs, dmeta, chem = await asyncio.gather(
-        asyncio.gather(*umls_f, return_exceptions=True),
-        asyncio.gather(*fda_f,  return_exceptions=True),
-        asyncio.gather(*gene_f, return_exceptions=True),
-        asyncio.gather(*expr_f, return_exceptions=True),
-        asyncio.gather(*drug_f, return_exceptions=True),
-        asyncio.gather(*chem_f, return_exceptions=True),
-    )
-
-    return {
-        "umls"      : [u for u in umls if isinstance(u, dict)],
-        "fda"       : [d for d in fda if d],
-        "genes"     : [g for g in genes if g],
-        "expr"      : [e for e in exprs if e],
-        "drug_meta" : [d for d in dmeta if d],
-        "chem_info" : [c for c in chem if c],
-    }
-
-
 # ────────────────────────────────────────────────────────────────────
-# Public API
 # ────────────────────────────────────────────────────────────────────
 async def orchestrate_search(query: str,
                              llm: str = _LLM_DEFAULT) -> Dict[str, Any]:
-    """Run full async pipeline; never raises uncaught exceptions."""
-    # 1) Literature ---------------------------------------------------
-    arxiv_task  = asyncio.create_task(fetch_arxiv(query, max_results=10))
-    pubmed_task = asyncio.create_task(fetch_pubmed(query, max_results=10))

     papers: List[Dict] = []
-    for p in await asyncio.gather(arxiv_task, pubmed_task, return_exceptions=True):
-        if not isinstance(p, Exception):
-            papers.extend(p)

-    # 2) NLP keywords -------------------------------------------------
     corpus   = " ".join(p.get("summary", "") for p in papers)
     keywords = extract_keywords(corpus)[:10]

-    # 3) Keyword fan-out ---------------------------------------------
-    enrich = await _fanout_keywords(keywords)
-
-    # 4) DisGeNET + trials (single calls) -----------------------------
-    disg_f   = asyncio.create_task(disease_to_genes(query))
-    trials_f = asyncio.create_task(search_trials(query, max_studies=20))
-    gene_dis, trials = await asyncio.gather(disg_f, trials_f)
-
-    # 5) Cancer variants (limit first 3 genes for quota) -------------
-    cbio_tasks = [fetch_cbio(g["symbol"]) for g in enrich["genes"][:3]]
-    cbio_vars  = []
-    if cbio_tasks:
-        cbio_vars = await asyncio.gather(*cbio_tasks, return_exceptions=True)
-        cbio_vars = [v for v in cbio_vars if v]

-    # 6) AI summary ---------------------------------------------------
-    summarise, _, engine = _llm_router(llm)
     ai_summary = await summarise(corpus) if corpus else ""

-    # 7) Return payload ----------------------------------------------
     return {
         "papers"           : papers,
         "ai_summary"       : ai_summary,
-        "llm_used"         : engine,
-        "umls"             : enrich["umls"],
-        "drug_safety"      : enrich["fda"],
-        "genes_rich"       : enrich["genes"],
-        "expr_atlas"       : enrich["expr"],
-        "drug_meta"        : enrich["drug_meta"],
-        "chem_info"        : enrich["chem_info"],
         "gene_disease"     : gene_dis,
         "clinical_trials"  : trials,
         "cbio_variants"    : cbio_vars,
     }


 async def answer_ai_question(question: str, *,
                              context: str,
                              llm: str = _LLM_DEFAULT) -> Dict[str, str]:
-    """Follow-up QA using selected engine; returns {'answer': str}."""
     _, qa_fn, _ = _llm_router(llm)
     return {"answer": await qa_fn(question, context=context)}
 
 #!/usr/bin/env python3
 """
+mcp/orchestrator.py · MedGenesis v5
+───────────────────────────────────
+Asynchronously fans out across >10 open biomedical APIs, then returns
+one consolidated dictionary for the Streamlit UI.
+
+Public, key-free by default:
+• MyGene.info, Ensembl REST, Open Targets GraphQL
+• PubMed (E-utils), arXiv
+• UMLS, openFDA, DisGeNET
+• Expression Atlas, ClinicalTrials.gov (+ WHO ICTRP fallback)
+• cBioPortal, DrugCentral, PubChem
+
+If you add the secrets **MYGENE_KEY**, **OT_KEY**, **CBIO_KEY** or
+**NCBI_EUTILS_KEY**, they are auto-detected and used; otherwise the code
+runs key-less.
+
+Returned payload keys
+─────────────────────
 papers, ai_summary, llm_used, umls, drug_safety,
 genes_rich, expr_atlas, drug_meta, chem_info,
 gene_disease, clinical_trials, cbio_variants
 
 # ── Literature ──────────────────────────────────────────────────────
 from mcp.arxiv import fetch_arxiv
 from mcp.pubmed import fetch_pubmed

+# ── NLP + enrichment ────────────────────────────────────────────────
 from mcp.nlp import extract_keywords
 from mcp.umls import lookup_umls
 from mcp.openfda import fetch_drug_safety
 from mcp.disgenet import disease_to_genes
 from mcp.clinicaltrials import search_trials

+# Gene / expression modules
+from mcp.gene_hub import resolve_gene   # MyGene → Ensembl → OT
 from mcp.atlas import fetch_expression
+from mcp.cbio import fetch_cbio         # cancer variants

+# Drug metadata & chemistry
 from mcp.drugcentral_ext import fetch_drugcentral
 from mcp.pubchem_ext import fetch_compound

+# ── Large-language model helpers ────────────────────────────────────
 from mcp.openai_utils import ai_summarize, ai_qa
 from mcp.gemini import gemini_summarize, gemini_qa

 _LLM_DEFAULT = "openai"

 # ────────────────────────────────────────────────────────────────────
+# LLM router
 # ────────────────────────────────────────────────────────────────────
 def _llm_router(name: str):
+    """Return (summarise_fn, qa_fn, engine_tag)."""
     if name.lower() == "gemini":
         return gemini_summarize, gemini_qa, "gemini"
     return ai_summarize, ai_qa, "openai"


 # ────────────────────────────────────────────────────────────────────
+# Main orchestrator
 # ────────────────────────────────────────────────────────────────────
 async def orchestrate_search(query: str,
                              llm: str = _LLM_DEFAULT) -> Dict[str, Any]:
+    """Run the complete async pipeline; always resolves without raising."""
+    # 1 Literature ---------------------------------------------------
+    arxiv_f  = asyncio.create_task(fetch_arxiv(query, max_results=10))
+    pubmed_f = asyncio.create_task(fetch_pubmed(query, max_results=10))

     papers: List[Dict] = []
+    for res in await asyncio.gather(arxiv_f, pubmed_f, return_exceptions=True):
+        if not isinstance(res, Exception):
+            papers.extend(res)

+    # 2 Keyword extraction ------------------------------------------
     corpus   = " ".join(p.get("summary", "") for p in papers)
     keywords = extract_keywords(corpus)[:10]

+    # 3 Parallel enrichment -----------------------------------------
+    umls_jobs = [lookup_umls(k) for k in keywords]
+    fda_jobs  = [fetch_drug_safety(k) for k in keywords]
+    gene_jobs = [resolve_gene(k) for k in keywords]
+    expr_jobs = [fetch_expression(k) for k in keywords]
+    drug_jobs = [fetch_drugcentral(k) for k in keywords]
+    chem_jobs = [fetch_compound(k) for k in keywords]
+
+    umls, fda, genes, exprs, drugs, chems = await asyncio.gather(
+        asyncio.gather(*umls_jobs, return_exceptions=True),
+        asyncio.gather(*fda_jobs,  return_exceptions=True),
+        asyncio.gather(*gene_jobs, return_exceptions=True),
+        asyncio.gather(*expr_jobs, return_exceptions=True),
+        asyncio.gather(*drug_jobs, return_exceptions=True),
+        asyncio.gather(*chem_jobs, return_exceptions=True),
+    )

+    # filter out errors / empty payloads
+    umls  = [u for u in umls if isinstance(u, dict)]
+    fda   = [d for d in fda if d]
+    genes = [g for g in genes if g]
+    exprs = [e for e in exprs if e]
+    drugs = [d for d in drugs if d]
+    chems = [c for c in chems if c]
+
+    # 4 Other single-shot APIs --------------------------------------
+    gene_dis = await disease_to_genes(query)
+    trials   = await search_trials(query, max_studies=20)
+
+    # Cancer variants for first 3 gene symbols (quota safety)
+    cbio_jobs = [fetch_cbio(g.get("symbol", "")) for g in genes[:3]]
+    cbio_vars = []
+    if cbio_jobs:
+        tmp = await asyncio.gather(*cbio_jobs, return_exceptions=True)
+        cbio_vars = [v for v in tmp if v]
+
+    # 5 AI summary ---------------------------------------------------
+    summarise, _, engine_tag = _llm_router(llm)
     ai_summary = await summarise(corpus) if corpus else ""

+    # 6 Return payload ----------------------------------------------
     return {
         "papers"           : papers,
         "ai_summary"       : ai_summary,
+        "llm_used"         : engine_tag,
+        "umls"             : umls,
+        "drug_safety"      : fda,
+        "genes_rich"       : genes,
+        "expr_atlas"       : exprs,
+        "drug_meta"        : drugs,
+        "chem_info"        : chems,
         "gene_disease"     : gene_dis,
         "clinical_trials"  : trials,
         "cbio_variants"    : cbio_vars,
     }


+# ────────────────────────────────────────────────────────────────────
+# Follow-up question-answer
+# ────────────────────────────────────────────────────────────────────
 async def answer_ai_question(question: str, *,
                              context: str,
                              llm: str = _LLM_DEFAULT) -> Dict[str, str]:
+    """Return {"answer": str} using chosen LLM."""
     _, qa_fn, _ = _llm_router(llm)
     return {"answer": await qa_fn(question, context=context)}