mgbam commited on
Commit
96208dc
Β·
verified Β·
1 Parent(s): eb1f007

Update mcp/orchestrator.py

Browse files
Files changed (1) hide show
  1. mcp/orchestrator.py +119 -80
mcp/orchestrator.py CHANGED
@@ -1,99 +1,138 @@
1
- # ──────────────────────── mcp/orchestrator.py ─────────────────────────
2
- """Dual‑LLM orchestrator coordinating literature ↔ annotation ↔ trials.
3
- Adds gene/variant enrichment with MyGene.info β†’ Ensembl β†’ OpenTargets β†’ cBio.
4
  """
5
- import asyncio
6
- from typing import Dict, Any, List
7
-
8
- from mcp.arxiv import fetch_arxiv
9
- from mcp.pubmed import fetch_pubmed
10
- from mcp.nlp import extract_keywords
11
- from mcp.umls import lookup_umls
12
- from mcp.openfda import fetch_drug_safety
13
- from mcp.clinicaltrials import search_trials
14
- from mcp.gene_hub import resolve_gene # MyGene→Ensembl→OT
15
- from mcp.cbio import fetch_cbio_variants
16
-
17
- from mcp.openai_utils import ai_summarize, ai_qa
18
- from mcp.gemini import gemini_summarize, gemini_qa
19
-
20
- _DEF = "openai"
21
-
22
- # ------------ light LLM router ------------
23
 
24
- def _llm_router(llm: str):
25
- if llm.lower() == "gemini":
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
26
  return gemini_summarize, gemini_qa, "gemini"
27
  return ai_summarize, ai_qa, "openai"
28
 
29
-
30
- # ---------------- gene / variant enrichment --------------------------
31
-
32
- async def _enrich_gene_block(keywords: List[str]) -> Dict[str, Any]:
33
- out: List[Dict] = []
34
- variants: Dict[str, List[Dict]] = {}
35
- for kw in keywords:
36
- g = await resolve_gene(kw)
37
- if g:
38
- out.append(g)
39
- # fetch tumour variants – fire & forget (errors ignored)
40
- try:
41
- variants[kw] = await fetch_cbio_variants(kw)
42
- except Exception:
43
- variants[kw] = []
44
- return {"genes": out, "variants": variants}
45
-
46
-
47
- # ---------------- orchestrator entry‑points --------------------------
48
-
49
- async def orchestrate_search(query: str, llm: str = _DEF) -> Dict[str, Any]:
50
- """Run search, summarise and join annotations for the UI."""
51
- # literature ------------------------------------------------------
52
- arxiv_task = asyncio.create_task(fetch_arxiv(query, max_results=20))
53
- pubmed_task = asyncio.create_task(fetch_pubmed(query, max_results=20))
54
- papers = sum(await asyncio.gather(arxiv_task, pubmed_task), [])
55
-
56
- # NLP keyword extraction -----------------------------------------
57
- blob = " ".join(p.get("summary", "") for p in papers)[:60_000]
58
- keywords = extract_keywords(blob)[:12]
59
-
60
- # enrichment (in parallel) ---------------------------------------
61
- umls_f = [lookup_umls(k) for k in keywords]
62
- fda_f = [fetch_drug_safety(k) for k in keywords]
63
- gene_block = asyncio.create_task(_enrich_gene_block(keywords))
64
- trials_task = asyncio.create_task(search_trials(query, max_studies=20))
65
-
66
- umls, fda, gene_data, trials = await asyncio.gather(
67
- asyncio.gather(*umls_f, return_exceptions=True),
68
- asyncio.gather(*fda_f, return_exceptions=True),
69
- gene_block,
70
- trials_task,
71
  )
 
 
 
 
 
 
72
 
73
- # summarise via LLM ----------------------------------------------
74
- summarise, _, engine_name = _llm_router(llm)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
75
  try:
76
- summary = await summarise(blob)
77
  except Exception:
78
- summary = "LLM summarisation unavailable." # graceful fallback
79
 
80
  return {
81
- "papers": papers,
82
- "umls": umls,
83
- "drug_safety": fda,
84
- "genes": gene_data["genes"],
85
- "variants": gene_data["variants"],
86
- "clinical_trials": trials,
87
- "ai_summary": summary,
88
- "llm_used": engine_name,
 
89
  }
90
 
91
 
92
- async def answer_ai_question(question: str, *, context: str, llm: str = _DEF) -> Dict[str, str]:
93
- """Follow‑up Q&A via selected LLM."""
 
 
 
94
  _, qa_fn, _ = _llm_router(llm)
95
  try:
96
  answer = await qa_fn(question, context)
97
  except Exception:
98
  answer = "LLM unavailable or quota exceeded."
99
- return {"answer": answer}
 
 
 
 
1
  """
2
+ MedGenesis – multi-API orchestrator
3
+ ──────────────────────────────────
4
+ β€’ Supports OpenAI or Gemini (pass llm="openai" | "gemini")
5
+ β€’ Falls back between redundant data sources whenever possible
6
+ β€’ All network I/O is async & individually time-bounded
7
+ """
 
 
 
 
 
 
 
 
 
 
 
 
8
 
9
+ from __future__ import annotations
10
+ import asyncio, textwrap
11
+ from typing import Any, Dict, List, Tuple
12
+
13
+ # ── 1. Literature helpers ────────────────────────────────────────────
14
+ from mcp.arxiv import fetch_arxiv
15
+ from mcp.pubmed import fetch_pubmed
16
+
17
+ # ── 2. Gene / disease / expression helpers ───────────────────────────
18
+ from mcp.gene_hub import resolve_gene # smart dispatcher
19
+ from mcp.mygene import fetch_gene_info
20
+ from mcp.ensembl import fetch_ensembl
21
+ from mcp.opentargets import fetch_ot # tractability, constraint
22
+ from mcp.cbio import fetch_cbio
23
+
24
+ # ── 3. Safety, trials, concepts ──────────────────────────────────────
25
+ from mcp.openfda import fetch_drug_safety
26
+ from mcp.clinicaltrials import search_trials
27
+ from mcp.umls import lookup_umls
28
+ from mcp.disgenet import disease_to_genes
29
+
30
+ # ── 4. Chem & drug metadata ──────────────────────────────────────────
31
+ from mcp.drugcentral_ext import fetch_drugcentral
32
+ from mcp.pubchem_ext import fetch_compound
33
+
34
+ # ── 5. LLM utils (OpenAI & Gemini) ───────────────────────────────────
35
+ from mcp.openai_utils import ai_summarize, ai_qa
36
+ from mcp.gemini import gemini_summarize, gemini_qa
37
+
38
+ ###############################################################################
39
+ # Internal routing helpers
40
+ ###############################################################################
41
+ _DEFAULT_LLM = "openai"
42
+
43
+ def _llm_router(choice: str) -> Tuple:
44
+ """
45
+ Return (summary_fn, qa_fn, tag) for the requested engine.
46
+ """
47
+ if str(choice).lower() == "gemini":
48
  return gemini_summarize, gemini_qa, "gemini"
49
  return ai_summarize, ai_qa, "openai"
50
 
51
+ ###############################################################################
52
+ # High-level enrichment helpers
53
+ ###############################################################################
54
+ async def _keyword_enrichment(keywords: List[str]) -> Dict[str, Any]:
55
+ """
56
+ Fan-out to UMLS, Drug Safety, and probes gene/Disease APIs in parallel.
57
+ """
58
+ umls_tasks = [lookup_umls(k) for k in keywords]
59
+ fda_tasks = [fetch_drug_safety(k) for k in keywords]
60
+ gene_tasks = [resolve_gene(k) for k in keywords]
61
+
62
+ # gather protects against individual failures
63
+ umls, fda, genes = await asyncio.gather(
64
+ asyncio.gather(*umls_tasks, return_exceptions=True),
65
+ asyncio.gather(*fda_tasks, return_exceptions=True),
66
+ asyncio.gather(*gene_tasks, return_exceptions=True),
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
67
  )
68
+ # flatten & sanitise
69
+ return {
70
+ "umls" : [u for u in umls if not isinstance(u, Exception)],
71
+ "fda" : [d for d in fda if not isinstance(d, Exception)],
72
+ "genes": [g for g in genes if not isinstance(g, Exception)],
73
+ }
74
 
75
+ ###############################################################################
76
+ # Public orchestration entry-points
77
+ ###############################################################################
78
+ async def orchestrate_search(query: str, *, llm: str=_DEFAULT_LLM,
79
+ max_papers: int = 25,
80
+ max_trials: int = 20) -> Dict[str, Any]:
81
+ """
82
+ Full pipeline:
83
+ 1. Fetch literature (arXiv + PubMed)
84
+ 2. Derive keywords (simple TF filtering)
85
+ 3. Multi-API enrich (UMLS, safety, gene, trials, chem)
86
+ 4. Summarise with LLM
87
+ """
88
+
89
+ # ── 1 literature (parallel) ───────────────────────────────────────
90
+ arxiv_task = asyncio.create_task(fetch_arxiv(query, max_results=max_papers//2))
91
+ pubmed_task = asyncio.create_task(fetch_pubmed(query, max_results=max_papers//2))
92
+ papers = sum(await asyncio.gather(arxiv_task, pubmed_task, return_exceptions=False), [])
93
+
94
+ # ── 2 keywords (top-8 by naive word-freq) ─────────────────────────
95
+ joined = " ".join(p["summary"] for p in papers)
96
+ tokens = [w for w in joined.split() if len(w) > 4]
97
+ freq = {}
98
+ for t in tokens: freq[t] = freq.get(t, 0) + 1
99
+ keywords = sorted(freq, key=freq.get, reverse=True)[:8]
100
+
101
+ # ── 3 enrichment ──────────────────────────────────────────────────
102
+ enrich_task = asyncio.create_task(_keyword_enrichment(keywords))
103
+ trials_task = asyncio.create_task(search_trials(query, max_studies=max_trials))
104
+ gene_dis_gen = asyncio.create_task(disease_to_genes(query)) # coarse disease string
105
+
106
+ enrich, trials, gene_dis = await asyncio.gather(enrich_task, trials_task, gene_dis_gen)
107
+
108
+ # ── 4 LLM summary & return ────────────────────────────────────────
109
+ summarise_fn, _, engine_tag = _llm_router(llm)
110
  try:
111
+ ai_summary = await summarise_fn(joined[:15000])
112
  except Exception:
113
+ ai_summary = "LLM unavailable or quota exceeded."
114
 
115
  return {
116
+ "papers" : papers,
117
+ "keywords" : keywords,
118
+ "umls" : enrich["umls"],
119
+ "drug_safety" : enrich["fda"],
120
+ "genes" : enrich["genes"],
121
+ "gene_disease" : gene_dis,
122
+ "clinical_trials" : trials,
123
+ "ai_summary" : ai_summary,
124
+ "llm_used" : engine_tag,
125
  }
126
 
127
 
128
+ async def answer_ai_question(question: str, *, context: str,
129
+ llm: str=_DEFAULT_LLM) -> Dict[str, str]:
130
+ """
131
+ Follow-up Q-A on demand.
132
+ """
133
  _, qa_fn, _ = _llm_router(llm)
134
  try:
135
  answer = await qa_fn(question, context)
136
  except Exception:
137
  answer = "LLM unavailable or quota exceeded."
138
+ return {"answer": answer}