|
import os, asyncio, httpx |
|
from functools import lru_cache |
|
|
|
_KEY = os.getenv("NCBI_EUTILS_KEY") |
|
_BASE = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/" |
|
|
|
async def _call(ep: str, params: dict, *, t=8): |
|
if _KEY: params["api_key"] = _KEY |
|
async with httpx.AsyncClient(timeout=t) as c: |
|
r = await c.get(f"{_BASE}{ep}", params=params); r.raise_for_status() |
|
return r.json() if r.headers["content-type"].startswith("application/json") else r.text |
|
|
|
@lru_cache(maxsize=512) |
|
async def pubmed_ids(term: str, n: int = 20): |
|
data = await _call("esearch.fcgi", {"db":"pubmed","term":term,"retmode":"json","retmax":n}) |
|
return data["esearchresult"]["idlist"] |
|
|
|
@lru_cache(maxsize=512) |
|
async def gene_summary(gene_id: str): |
|
data = await _call("esummary.fcgi", {"db":"gene","id":gene_id,"retmode":"json"}) |
|
return list(data["result"].values())[1] |
|
|