mgbam commited on
Commit
edc2450
·
verified ·
1 Parent(s): afa570e

Update mcp/orchestrator.py

Browse files
Files changed (1) hide show
  1. mcp/orchestrator.py +29 -9
mcp/orchestrator.py CHANGED
@@ -4,7 +4,7 @@ from typing import Any, Dict, List, Literal, Union
4
 
5
  from mcp.pubmed import fetch_pubmed
6
  from mcp.arxiv import fetch_arxiv
7
- from mcp.umls import extract_umls_concepts
8
  from mcp.openfda import fetch_drug_safety
9
  from mcp.ncbi import search_gene, get_mesh_definition
10
  from mcp.mygene import fetch_gene_info
@@ -44,7 +44,7 @@ async def _gather_tasks(tasks: List[asyncio.Task]) -> List[Any]:
44
  def _flatten_unique(items: List[Union[List[Any], Any]]) -> List[Any]:
45
  """
46
  Flatten a list of items where elements may be lists or single values,
47
- then deduplicate preserving order.
48
  """
49
  flat: List[Any] = []
50
  for elem in items:
@@ -66,21 +66,36 @@ async def orchestrate_search(
66
  ) -> Dict[str, Any]:
67
  """
68
  Perform a comprehensive biomedical search pipeline with fault tolerance:
 
69
  - Literature (PubMed + arXiv)
70
- - Entity extraction (UMLS)
71
  - Drug safety, gene & variant info, disease-gene mapping
72
  - Clinical trials, cBioPortal data
73
  - AI-driven summary
74
 
75
- Individual fetch functions that fail with an HTTP error will return an empty default,
76
- ensuring the pipeline always completes.
77
  """
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
78
  tasks = {
79
  'pubmed': asyncio.create_task(fetch_pubmed(query, max_results=max_papers)),
80
  'arxiv': asyncio.create_task(fetch_arxiv(query, max_results=max_papers)),
81
- 'umls': asyncio.create_task(
82
- asyncio.to_thread(extract_umls_concepts, query)
83
- ),
84
  'drug_safety': asyncio.create_task(_safe_call(fetch_drug_safety, query, default=[])),
85
  'ncbi_gene': asyncio.create_task(_safe_call(search_gene, query, default=[])),
86
  'mygene': asyncio.create_task(_safe_call(fetch_gene_info, query, default=[])),
@@ -92,14 +107,19 @@ async def orchestrate_search(
92
  'disgenet': asyncio.create_task(_safe_call(disease_to_genes, query, default=[])),
93
  }
94
 
 
95
  results = await _gather_tasks(list(tasks.values()))
96
  data = dict(zip(tasks.keys(), results))
 
97
 
 
98
  gene_sources = [data['ncbi_gene'], data['mygene'], data['ensembl'], data['opentargets']]
99
  genes = _flatten_unique(gene_sources)
100
 
 
101
  papers = (data['pubmed'] or []) + (data['arxiv'] or [])
102
 
 
103
  summaries = " ".join(p.get('summary', '') for p in papers)
104
  if llm == 'gemini':
105
  ai_summary = await gemini_summarize(summaries)
@@ -111,7 +131,7 @@ async def orchestrate_search(
111
  return {
112
  'papers': papers,
113
  'genes': genes,
114
- 'umls': data['umls'] or [],
115
  'gene_disease': data['disgenet'] or [],
116
  'mesh_defs': [data['mesh']] if data['mesh'] else [],
117
  'drug_safety': data['drug_safety'] or [],
 
4
 
5
  from mcp.pubmed import fetch_pubmed
6
  from mcp.arxiv import fetch_arxiv
7
+ from mcp.umls import extract_umls_concepts, lookup_umls
8
  from mcp.openfda import fetch_drug_safety
9
  from mcp.ncbi import search_gene, get_mesh_definition
10
  from mcp.mygene import fetch_gene_info
 
44
  def _flatten_unique(items: List[Union[List[Any], Any]]) -> List[Any]:
45
  """
46
  Flatten a list of items where elements may be lists or single values,
47
+ then deduplicate preserving insertion order.
48
  """
49
  flat: List[Any] = []
50
  for elem in items:
 
66
  ) -> Dict[str, Any]:
67
  """
68
  Perform a comprehensive biomedical search pipeline with fault tolerance:
69
+ - Extract UMLS concepts and fetch definitions
70
  - Literature (PubMed + arXiv)
 
71
  - Drug safety, gene & variant info, disease-gene mapping
72
  - Clinical trials, cBioPortal data
73
  - AI-driven summary
74
 
75
+ Returns a dict with structured results ready for UI/graph building.
 
76
  """
77
+ # 1) Extract concepts and perform UMLS lookups
78
+ raw_concepts = await asyncio.to_thread(extract_umls_concepts, query)
79
+ umls_tasks = [
80
+ asyncio.create_task(
81
+ _safe_call(
82
+ lookup_umls,
83
+ term,
84
+ default={
85
+ 'term': term,
86
+ 'cui': None,
87
+ 'name': None,
88
+ 'definition': None,
89
+ },
90
+ )
91
+ )
92
+ for term in raw_concepts
93
+ ]
94
+
95
+ # 2) Launch parallel data-fetch tasks (excluding UMLS)
96
  tasks = {
97
  'pubmed': asyncio.create_task(fetch_pubmed(query, max_results=max_papers)),
98
  'arxiv': asyncio.create_task(fetch_arxiv(query, max_results=max_papers)),
 
 
 
99
  'drug_safety': asyncio.create_task(_safe_call(fetch_drug_safety, query, default=[])),
100
  'ncbi_gene': asyncio.create_task(_safe_call(search_gene, query, default=[])),
101
  'mygene': asyncio.create_task(_safe_call(fetch_gene_info, query, default=[])),
 
107
  'disgenet': asyncio.create_task(_safe_call(disease_to_genes, query, default=[])),
108
  }
109
 
110
+ # 3) Await all tasks
111
  results = await _gather_tasks(list(tasks.values()))
112
  data = dict(zip(tasks.keys(), results))
113
+ umls_results = await asyncio.gather(*umls_tasks)
114
 
115
+ # 4) Consolidate gene sources
116
  gene_sources = [data['ncbi_gene'], data['mygene'], data['ensembl'], data['opentargets']]
117
  genes = _flatten_unique(gene_sources)
118
 
119
+ # 5) Merge literature
120
  papers = (data['pubmed'] or []) + (data['arxiv'] or [])
121
 
122
+ # 6) AI-driven summary
123
  summaries = " ".join(p.get('summary', '') for p in papers)
124
  if llm == 'gemini':
125
  ai_summary = await gemini_summarize(summaries)
 
131
  return {
132
  'papers': papers,
133
  'genes': genes,
134
+ 'umls': umls_results,
135
  'gene_disease': data['disgenet'] or [],
136
  'mesh_defs': [data['mesh']] if data['mesh'] else [],
137
  'drug_safety': data['drug_safety'] or [],