File size: 6,079 Bytes
2689723
e22ad8c
2689723
e22ad8c
2689723
 
 
 
 
 
 
e22ad8c
2689723
 
 
 
 
 
 
 
 
 
 
 
 
 
 
e22ad8c
2689723
 
 
 
 
 
e22ad8c
2689723
 
 
 
 
 
 
 
 
 
 
 
 
 
e22ad8c
2689723
e22ad8c
 
 
2689723
 
e22ad8c
2689723
e22ad8c
2689723
 
e22ad8c
2689723
 
 
 
 
e22ad8c
2689723
e22ad8c
2689723
e22ad8c
2689723
e22ad8c
2689723
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
e22ad8c
2689723
 
 
 
 
 
 
 
 
e22ad8c
 
2689723
 
 
 
 
 
e22ad8c
2689723
e22ad8c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2689723
e22ad8c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
from __future__ import annotations
import os, json
import httpx
from typing import Any, Dict, List

class ToolBase:
    """Minimal interface shared by the tools defined in this module.

    Subclasses override ``name``, ``description`` and :meth:`call`.
    """

    # Short identifier for the tool; subclasses replace this default.
    name: str = "tool"
    # Human-readable summary of the tool; empty on the base class.
    description: str = ""

    async def call(self, *args, **kwargs) -> Dict[str, Any]:
        """Run the tool.

        The base class provides no behaviour, so this always raises.

        Raises:
            NotImplementedError: Always, until a subclass overrides it.
        """
        raise NotImplementedError()

# — Ontology normalization (BioPortal)
class OntologyTool(ToolBase):
    """Look up a biomedical term through the BioPortal search endpoint.

    The API key is read from the ``BIOPORTAL_API_KEY`` environment variable
    when the tool is constructed. Without a key, no request is sent.
    """

    name = "ontology_normalize"
    description = "Normalize biomedical terms via BioPortal; returns concept info (no protocols)."

    def __init__(self, timeout: float = 20.0):
        """Create the HTTP client and read the API key from the environment.

        Args:
            timeout: Timeout value handed to ``httpx.AsyncClient``.
        """
        self.http = httpx.AsyncClient(timeout=timeout)
        self.bioportal_key = os.getenv("BIOPORTAL_API_KEY")

    async def call(self, term: str) -> dict:
        """Search BioPortal for ``term``.

        Args:
            term: Free-text term sent as the ``q`` search parameter.

        Returns:
            A dict with ``term`` and ``bioportal``. ``bioportal`` holds the
            decoded JSON response, or ``None`` when no API key is configured
            or the request failed. On failure, ``bioportal_error`` carries
            the exception text. This method does not raise for request
            errors.
        """
        out: Dict[str, Any] = {"term": term, "bioportal": None}
        if not self.bioportal_key:
            # No credentials: skip the request and return the "no data" shape.
            return out
        try:
            r = await self.http.get(
                "https://data.bioontology.org/search",
                params={"q": term, "pagesize": 5},
                headers={"Authorization": f"apikey token={self.bioportal_key}"},
            )
            # Report 4xx/5xx as an error instead of storing the error body
            # under "bioportal" as if it were a search result.
            r.raise_for_status()
            out["bioportal"] = r.json()
        except Exception as e:
            # Best-effort boundary: callers get the error text, not a crash.
            out["bioportal_error"] = str(e)
        return out

# — PubMed search (NCBI E-utilities)
class PubMedTool(ToolBase):
    """Search PubMed through the NCBI E-utilities (esearch, then esummary).

    ``NCBI_API_KEY`` and ``NCBI_EMAIL`` are read from the environment at
    construction time; both are optional.
    """

    name = "pubmed_search"
    description = "Search PubMed via NCBI; return metadata with citations."

    def __init__(self, timeout: float = 20.0):
        """Create the HTTP client and read optional credentials.

        Args:
            timeout: Timeout value handed to ``httpx.AsyncClient``.
        """
        self.http = httpx.AsyncClient(timeout=timeout)
        self.key = os.getenv("NCBI_API_KEY")
        self.email = os.getenv("NCBI_EMAIL")

    def _params(self, **extra: Any) -> Dict[str, Any]:
        """Build the query parameters shared by both E-utilities requests.

        ``api_key`` and ``email`` are added only when configured, so an
        unset environment variable is not sent to NCBI as a ``None`` value.
        """
        params: Dict[str, Any] = {"db": "pubmed", "retmode": "json", **extra}
        if self.key:
            params["api_key"] = self.key
        if self.email:
            params["email"] = self.email
        return params

    @staticmethod
    def _to_item(pmid: str, record: Dict[str, Any]) -> Dict[str, Any]:
        """Reduce one esummary record to the fields this tool reports."""
        return {
            "pmid": pmid,
            "title": record.get("title"),
            "journal": record.get("fulljournalname"),
            # The first four characters of "pubdate" are used as the year.
            "year": (record.get("pubdate") or "")[:4],
            "authors": [a.get("name") for a in record.get("authors", [])],
        }

    async def call(self, query: str) -> dict:
        """Search PubMed and return summary metadata for up to 20 hits.

        Args:
            query: Search expression sent as the esearch ``term``.

        Returns:
            ``{"query": query, "results": [...]}`` on success; ``results`` is
            empty when esearch returns no ids. On any failure, including an
            HTTP error status, returns ``{"query": query, "error": <text>}``.
        """
        base = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/"
        try:
            es = await self.http.get(
                base + "esearch.fcgi",
                params=self._params(term=query, retmax=20),
            )
            # Fail loudly on 4xx/5xx rather than parsing an error page.
            es.raise_for_status()
            ids = es.json().get("esearchresult", {}).get("idlist", [])
            if not ids:
                return {"query": query, "results": []}
            su = await self.http.get(
                base + "esummary.fcgi",
                params=self._params(id=",".join(ids)),
            )
            su.raise_for_status()
            recs = su.json().get("result", {})
            # Keep esearch's id order; a missing record yields empty fields.
            items = [self._to_item(pmid, recs.get(pmid) or {}) for pmid in ids]
            return {"query": query, "results": items}
        except Exception as e:
            # Best-effort boundary: report the failure in the result dict.
            return {"query": query, "error": str(e)}

# — RCSB structure metadata
class StructureTool(ToolBase):
    """Fetch the core entry record for a PDB id from the RCSB data API."""

    name = "structure_info"
    description = "Query RCSB structure metadata (no lab steps)."

    def __init__(self, timeout: float = 20.0):
        """Create the HTTP client.

        Args:
            timeout: Timeout value handed to ``httpx.AsyncClient``.
        """
        self.http = httpx.AsyncClient(timeout=timeout)

    async def call(self, pdb_id: str) -> dict:
        """Retrieve the RCSB core entry for ``pdb_id``.

        Args:
            pdb_id: Identifier appended to the core-entry URL.

        Returns:
            A dict that always contains ``pdb_id``. On success it also holds
            ``rcsb_core`` (the decoded JSON body). On any failure, including
            an HTTP error status, it holds ``error`` with the exception text.
        """
        result = {"pdb_id": pdb_id}
        try:
            url = "https://data.rcsb.org/rest/v1/core/entry/{}".format(pdb_id)
            response = await self.http.get(url)
            response.raise_for_status()
            payload = response.json()
        except Exception as exc:
            result["error"] = str(exc)
        else:
            result["rcsb_core"] = payload
        return result

# — Crossref DOIs
class CrossrefTool(ToolBase):
    """Search Crossref works and report title, DOI, year and authors."""

    name = "crossref_search"
    description = "Crossref search for DOIs; titles, years, authors."

    def __init__(self, timeout: float = 20.0):
        """Create the HTTP client.

        Args:
            timeout: Timeout value handed to ``httpx.AsyncClient``.
        """
        self.http = httpx.AsyncClient(timeout=timeout)

    @staticmethod
    def _year(item: Dict[str, Any]) -> Any:
        """Return the first element of ``issued.date-parts``, or ``None``.

        Tolerates a missing ``issued``, a missing or empty ``date-parts``
        and an empty first part, so one sparse record cannot raise and
        discard the whole result set.
        """
        parts = (item.get("issued") or {}).get("date-parts") or []
        first = parts[0] if parts else None
        return first[0] if first else None

    @staticmethod
    def _author_name(author: Dict[str, Any]) -> str:
        """Format one author as "given family", falling back to ``name``.

        The fallback covers author entries that carry only a ``name`` field;
        without it they would be reported as empty strings.
        """
        full = f"{author.get('given','')} {author.get('family','')}".strip()
        return full or (author.get("name") or "")

    async def call(self, query: str) -> dict:
        """Run a Crossref works query and return up to 10 records.

        Args:
            query: Free-text query sent as the ``query`` parameter.

        Returns:
            ``{"query": query, "results": [...]}`` where each result has
            ``title``, ``doi``, ``year`` and ``authors``. On any failure,
            including an HTTP error status, returns
            ``{"query": query, "error": <text>}``.
        """
        try:
            r = await self.http.get(
                "https://api.crossref.org/works",
                params={"query": query, "rows": 10},
            )
            # Treat 4xx/5xx as a failure instead of parsing an error body.
            r.raise_for_status()
            items = r.json().get("message", {}).get("items", [])
            papers = [
                {
                    # "title" is read as a list; only the first entry is kept.
                    "title": (it.get("title") or [None])[0],
                    "doi": it.get("DOI"),
                    "year": self._year(it),
                    "authors": [self._author_name(a) for a in it.get("author", [])],
                }
                for it in items
            ]
            return {"query": query, "results": papers}
        except Exception as e:
            # Best-effort boundary: report the failure in the result dict.
            return {"query": query, "error": str(e)}

# — HF Inference API Reranker (optional)
class HFRerankTool(ToolBase):
    """Rerank documents against a query using a hosted Hugging Face model.

    The access token is read from the ``HF_TOKEN`` environment variable at
    construction time; without it :meth:`call` returns an error dict.
    """

    name = "hf_rerank"
    description = "Rerank documents using a Hugging Face reranker model (API)."

    def __init__(self, model_id: str):
        """Store the model id, read the token and create the HTTP client.

        Args:
            model_id: Model identifier appended to the inference API URL.
        """
        self.model = model_id
        self.hf_token = os.getenv("HF_TOKEN")
        self.http = httpx.AsyncClient(timeout=30.0)

    @staticmethod
    def _extract_scores(data: Any, n: int) -> List[Any]:
        """Turn a model response into ``n`` scores aligned with the documents.

        Recognised shapes:
          * ``{"scores": [...]}``: taken as-is.
          * ``[{"index": i, "score": s}, ...]``: each score is placed at its
            ``index``, so a response sorted by score (as some reranker
            endpoints may return) still maps to the right document.
          * ``[{"score": s}, ...]``: taken positionally.

        Anything else, or a score list whose length is not ``n``, yields
        equal scores of ``1.0`` so the original document order is kept.
        """
        fallback = [1.0] * n
        if isinstance(data, dict) and "scores" in data:
            scores = data["scores"]
        elif (
            isinstance(data, list)
            and data
            and all(isinstance(x, dict) for x in data)
            and "score" in data[0]
        ):
            by_index = {
                x["index"]: x.get("score", 0.0)
                for x in data
                if isinstance(x.get("index"), int)
            }
            # Trust "index" only when it names every document exactly once.
            if len(data) == n and set(by_index) == set(range(n)):
                return [by_index[i] for i in range(n)]
            scores = [x.get("score", 0.0) for x in data]
        else:
            return fallback
        if not isinstance(scores, list) or len(scores) != n:
            # Scores cannot be aligned with the documents; do not guess.
            return fallback
        return scores

    async def call(self, query: str, documents: List[str]) -> dict:
        """Score ``documents`` against ``query`` and return a ranking.

        Args:
            query: Text the documents are compared against.
            documents: Candidate texts to rank.

        Returns:
            ``{"order": [...], "scores": [...], "raw": <response JSON>}``
            where ``order`` lists document indices by descending score and
            ``scores`` is aligned with ``documents``. Returns
            ``{"error": <text>}`` when ``HF_TOKEN`` is unset or the request
            or parsing fails.
        """
        if not self.hf_token:
            return {"error": "HF_TOKEN not set"}
        try:
            # Generic payload; different models may expect different schemas, so keep robust.
            payload = {"inputs": {"query": query, "texts": documents}}
            r = await self.http.post(
                f"https://api-inference.huggingface.co/models/{self.model}",
                headers={"Authorization": f"Bearer {self.hf_token}"},
                json=payload,
            )
            data = r.json()
            scores = self._extract_scores(data, len(documents))
            # sorted() is stable, so ties keep the original document order.
            order = sorted(range(len(documents)), key=scores.__getitem__, reverse=True)
            return {"order": order, "scores": scores, "raw": data}
        except Exception as e:
            # Best-effort boundary: report the failure in the result dict.
            return {"error": str(e)}