mgbam commited on
Commit
08c0325
·
verified ·
1 Parent(s): 2a448c1

Update mcp/orchestrator.py

Browse files
Files changed (1) hide show
  1. mcp/orchestrator.py +84 -135
mcp/orchestrator.py CHANGED
@@ -1,150 +1,99 @@
1
- #!/usr/bin/env python3
 
 
2
  """
3
- mcp/orchestrator.py β€” MedGenesis v5
4
- ───────────────────────────────────
5
- Asynchronously fan-outs across >10 open biomedical APIs, then returns
6
- one consolidated dictionary for the Streamlit UI.
7
-
8
- Public-key–free by default:
9
- β€’ MyGene.info, Ensembl REST, Open Targets GraphQL
10
- β€’ PubMed (E-utils), arXiv
11
- β€’ UMLS, openFDA, DisGeNET
12
- β€’ Expression Atlas, ClinicalTrials.gov (+ WHO ICTRP fallback)
13
- β€’ cBioPortal, DrugCentral, PubChem
14
-
15
- If you add secrets **MYGENE_KEY**, **OT_KEY**, **CBIO_KEY** or
16
- **NCBI_EUTILS_KEY**, they are auto-detected and used β€” otherwise the code
17
- runs key-less.
18
-
19
- Returned payload keys
20
- ─────────────────────
21
- papers, ai_summary, llm_used, umls, drug_safety,
22
- genes_rich, expr_atlas, drug_meta, chem_info,
23
- gene_disease, clinical_trials, cbio_variants
24
- """
25
-
26
- from __future__ import annotations
27
  import asyncio
28
  from typing import Dict, Any, List
29
 
30
- # ── Literature ──────────────────────────────────────────────────────
31
- from mcp.arxiv import fetch_arxiv
32
- from mcp.pubmed import fetch_pubmed
33
-
34
- # ── NLP + enrichment ────────────────────────────────────────────────
35
- from mcp.nlp import extract_keywords
36
- from mcp.umls import lookup_umls
37
- from mcp.openfda import fetch_drug_safety
38
- from mcp.disgenet import disease_to_genes
39
- from mcp.clinicaltrials import search_trials
40
-
41
- # Gene / expression modules
42
- from mcp.gene_hub import resolve_gene # MyGene β†’ Ensembl β†’ OT
43
- from mcp.atlas import fetch_expression
44
- from mcp.cbio import fetch_cbio # cancer variants
45
-
46
- # Drug metadata & chemistry
47
- from mcp.drugcentral_ext import fetch_drugcentral
48
- from mcp.pubchem_ext import fetch_compound
49
-
50
- # ── Large-language model helpers ────────────────────────────────────
51
- from mcp.openai_utils import ai_summarize, ai_qa
52
- from mcp.gemini import gemini_summarize, gemini_qa
53
-
54
- _LLM_DEFAULT = "openai"
55
-
56
- # ────────────────────────────────────────────────────────────────────
57
- # LLM router
58
- # ────────────────────────────────────────────────────────────────────
59
- def _llm_router(name: str):
60
- """Return (summarise_fn, qa_fn, engine_tag)."""
61
- if name.lower() == "gemini":
62
  return gemini_summarize, gemini_qa, "gemini"
63
  return ai_summarize, ai_qa, "openai"
64
 
65
 
66
- # ────────────────────────────────────────────────────────────────────
67
- # Main orchestrator
68
- # ────────────────────────────────────────────────────────────────────
69
- async def orchestrate_search(query: str,
70
- llm: str = _LLM_DEFAULT) -> Dict[str, Any]:
71
- """Run the complete async pipeline; always resolves without raising."""
72
- # 1 Literature ---------------------------------------------------
73
- arxiv_f = asyncio.create_task(fetch_arxiv(query, max_results=10))
74
- pubmed_f = asyncio.create_task(fetch_pubmed(query, max_results=10))
75
-
76
- papers: List[Dict] = []
77
- for res in await asyncio.gather(arxiv_f, pubmed_f, return_exceptions=True):
78
- if not isinstance(res, Exception):
79
- papers.extend(res)
80
-
81
- # 2 Keyword extraction ------------------------------------------
82
- corpus = " ".join(p.get("summary", "") for p in papers)
83
- keywords = extract_keywords(corpus)[:10]
84
-
85
- # 3 Parallel enrichment -----------------------------------------
86
- umls_jobs = [lookup_umls(k) for k in keywords]
87
- fda_jobs = [fetch_drug_safety(k) for k in keywords]
88
- gene_jobs = [resolve_gene(k) for k in keywords]
89
- expr_jobs = [fetch_expression(k) for k in keywords]
90
- drug_jobs = [fetch_drugcentral(k) for k in keywords]
91
- chem_jobs = [fetch_compound(k) for k in keywords]
92
-
93
- umls, fda, genes, exprs, drugs, chems = await asyncio.gather(
94
- asyncio.gather(*umls_jobs, return_exceptions=True),
95
- asyncio.gather(*fda_jobs, return_exceptions=True),
96
- asyncio.gather(*gene_jobs, return_exceptions=True),
97
- asyncio.gather(*expr_jobs, return_exceptions=True),
98
- asyncio.gather(*drug_jobs, return_exceptions=True),
99
- asyncio.gather(*chem_jobs, return_exceptions=True),
 
 
 
 
 
 
 
100
  )
101
 
102
- # filter out errors / empty payloads
103
- umls = [u for u in umls if isinstance(u, dict)]
104
- fda = [d for d in fda if d]
105
- genes = [g for g in genes if g]
106
- exprs = [e for e in exprs if e]
107
- drugs = [d for d in drugs if d]
108
- chems = [c for c in chems if c]
109
-
110
- # 4 Other single-shot APIs --------------------------------------
111
- gene_dis = await disease_to_genes(query)
112
- trials = await search_trials(query, max_studies=20)
113
-
114
- # Cancer variants for first 3 gene symbols (quota safety)
115
- cbio_jobs = [fetch_cbio(g.get("symbol", "")) for g in genes[:3]]
116
- cbio_vars = []
117
- if cbio_jobs:
118
- tmp = await asyncio.gather(*cbio_jobs, return_exceptions=True)
119
- cbio_vars = [v for v in tmp if v]
120
-
121
- # 5 AI summary ---------------------------------------------------
122
- summarise, _, engine_tag = _llm_router(llm)
123
- ai_summary = await summarise(corpus) if corpus else ""
124
-
125
- # 6 Return payload ----------------------------------------------
126
  return {
127
- "papers" : papers,
128
- "ai_summary" : ai_summary,
129
- "llm_used" : engine_tag,
130
- "umls" : umls,
131
- "drug_safety" : fda,
132
- "genes_rich" : genes,
133
- "expr_atlas" : exprs,
134
- "drug_meta" : drugs,
135
- "chem_info" : chems,
136
- "gene_disease" : gene_dis,
137
- "clinical_trials" : trials,
138
- "cbio_variants" : cbio_vars,
139
  }
140
 
141
 
142
- # ────────────────────────────────────────────────────────────────────
143
- # Follow-up question-answer
144
- # ────────────────────────────────────────────────────────────────────
145
- async def answer_ai_question(question: str, *,
146
- context: str,
147
- llm: str = _LLM_DEFAULT) -> Dict[str, str]:
148
- """Return {"answer": str} using chosen LLM."""
149
  _, qa_fn, _ = _llm_router(llm)
150
- return {"answer": await qa_fn(question, context=context)}
 
 
 
 
 
1
+ # ──────────────────────── mcp/orchestrator.py ─────────────────────────
2
+ """Dual‑LLM orchestrator coordinating literature ↔ annotation ↔ trials.
3
+ Adds gene/variant enrichment with MyGene.info β†’ Ensembl β†’ OpenTargets β†’ cBio.
4
  """
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
5
  import asyncio
6
  from typing import Dict, Any, List
7
 
8
+ from mcp.arxiv import fetch_arxiv
9
+ from mcp.pubmed import fetch_pubmed
10
+ from mcp.nlp import extract_keywords
11
+ from mcp.umls import lookup_umls
12
+ from mcp.openfda import fetch_drug_safety
13
+ from mcp.clinicaltrials import search_trials
14
+ from mcp.gene_hub import resolve_gene # MyGene→Ensembl→OT
15
+ from mcp.cbio import fetch_cbio_variants
16
+
17
+ from mcp.openai_utils import ai_summarize, ai_qa
18
+ from mcp.gemini import gemini_summarize, gemini_qa
19
+
20
+ _DEF = "openai"
21
+
22
+ # ------------ light LLM router ------------
23
+
24
+ def _llm_router(llm: str):
25
+ if llm.lower() == "gemini":
 
 
 
 
 
 
 
 
 
 
 
 
 
 
26
  return gemini_summarize, gemini_qa, "gemini"
27
  return ai_summarize, ai_qa, "openai"
28
 
29
 
30
+ # ---------------- gene / variant enrichment --------------------------
31
+
32
+ async def _enrich_gene_block(keywords: List[str]) -> Dict[str, Any]:
33
+ out: List[Dict] = []
34
+ variants: Dict[str, List[Dict]] = {}
35
+ for kw in keywords:
36
+ g = await resolve_gene(kw)
37
+ if g:
38
+ out.append(g)
39
+ # fetch tumour variants – fire & forget (errors ignored)
40
+ try:
41
+ variants[kw] = await fetch_cbio_variants(kw)
42
+ except Exception:
43
+ variants[kw] = []
44
+ return {"genes": out, "variants": variants}
45
+
46
+
47
+ # ---------------- orchestrator entry‑points --------------------------
48
+
49
+ async def orchestrate_search(query: str, llm: str = _DEF) -> Dict[str, Any]:
50
+ """Run search, summarise and join annotations for the UI."""
51
+ # literature ------------------------------------------------------
52
+ arxiv_task = asyncio.create_task(fetch_arxiv(query, max_results=20))
53
+ pubmed_task = asyncio.create_task(fetch_pubmed(query, max_results=20))
54
+ papers = sum(await asyncio.gather(arxiv_task, pubmed_task), [])
55
+
56
+ # NLP keyword extraction -----------------------------------------
57
+ blob = " ".join(p.get("summary", "") for p in papers)[:60_000]
58
+ keywords = extract_keywords(blob)[:12]
59
+
60
+ # enrichment (in parallel) ---------------------------------------
61
+ umls_f = [lookup_umls(k) for k in keywords]
62
+ fda_f = [fetch_drug_safety(k) for k in keywords]
63
+ gene_block = asyncio.create_task(_enrich_gene_block(keywords))
64
+ trials_task = asyncio.create_task(search_trials(query, max_studies=20))
65
+
66
+ umls, fda, gene_data, trials = await asyncio.gather(
67
+ asyncio.gather(*umls_f, return_exceptions=True),
68
+ asyncio.gather(*fda_f, return_exceptions=True),
69
+ gene_block,
70
+ trials_task,
71
  )
72
 
73
+ # summarise via LLM ----------------------------------------------
74
+ summarise, _, engine_name = _llm_router(llm)
75
+ try:
76
+ summary = await summarise(blob)
77
+ except Exception:
78
+ summary = "LLM summarisation unavailable." # graceful fallback
79
+
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
80
  return {
81
+ "papers": papers,
82
+ "umls": umls,
83
+ "drug_safety": fda,
84
+ "genes": gene_data["genes"],
85
+ "variants": gene_data["variants"],
86
+ "clinical_trials": trials,
87
+ "ai_summary": summary,
88
+ "llm_used": engine_name,
 
 
 
 
89
  }
90
 
91
 
92
+ async def answer_ai_question(question: str, *, context: str, llm: str = _DEF) -> Dict[str, str]:
93
+ """Follow‑up Q&A via selected LLM."""
 
 
 
 
 
94
  _, qa_fn, _ = _llm_router(llm)
95
+ try:
96
+ answer = await qa_fn(question, context)
97
+ except Exception:
98
+ answer = "LLM unavailable or quota exceeded."
99
+ return {"answer": answer}