import asyncio
import time
from typing import Dict, List
from urllib.parse import quote
import aiohttp
from config.global_storage import get_model_config
from dto.bio_document import PubMedDocument
from service.pubmed_xml_parse import PubmedXmlParse
from utils.bio_logger import bio_logger as logger
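
# NCBI E-utilities accounts. Including an api_key raises the per-key rate
# limit (10 requests/second with a key vs. 3 without), so requests rotate
# across these accounts.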
PUBMED_ACCOUNT = [
{"email": "[email protected]", "api_key": "60eb67add17f39aa588a43e30bb7fce98809"},
{"email": "[email protected]", "api_key": "fd9bb5b827c95086b9c2d579df20beca2708"},
{"email": "[email protected]", "api_key": "026586b79437a2b21d1e27d8c3f339230208"},
{"email": "[email protected]", "api_key": "bca0489d8fe314bfdbb1f7bfe63fb5d76e09"},
]
class PubMedAsyncApi:
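    """Async client for PubMed via the NCBI E-utilities (ESearch + EFetch)."""
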
def __init__(self):
self.pubmed_xml_parse = PubmedXmlParse()
self.model_config = get_model_config()
async def pubmed_search_function(
self, query: str, top_k: int, search_type: str
) -> List[PubMedDocument]:
try:
start_time = time.time()
logger.info(
f'Trying to search PubMed for "{query}", top_k={top_k}, search_type={search_type}'
)
id_list = await self.search_database(
query, db="pubmed", retmax=top_k, search_type=search_type
)
articles = await self.fetch_details(
id_list, db="pubmed", rettype="abstract"
)
end_search_pubmed_time = time.time()
logger.info(
f'Finished searching PubMed for "{query}", took {end_search_pubmed_time - start_time:.2f} seconds, found {len(articles)} results'
)
return [
PubMedDocument(
title=result["title"],
abstract=result["abstract"],
authors=self.process_authors(result["authors"]),
doi=result["doi"],
source="pubmed",
source_id=result["pmid"],
pub_date=result["pub_date"],
journal=result["journal"],
)
for result in articles
]
except Exception as e:
logger.error(f"Error searching PubMed query: {query} error: {e}")
raise e
    def process_authors(self, author_list: List[Dict]) -> str:
        # Some PubMed records (e.g., collective author names) lack name parts,
        # so fall back to empty strings rather than raising a KeyError.
        return ", ".join(
            f"{author.get('forename', '')} {author.get('lastname', '')}".strip()
            for author in author_list
        )
    # Search the database (ESearch)
    async def search_database(
        self, query: str, db: str, retmax: int, search_type: str = "keyword"
    ) -> List[str]:
if search_type not in ["keyword", "advanced"]:
raise ValueError("search_type must be one of 'keyword' or 'advanced'")
if search_type == "keyword":
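            # Publication types excluded from keyword searches: mostly
            # non-research items (editorials, letters, news, conference
            # material, and the like).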
art_type_list = [
"Address",
"Bibliography",
"Biography",
"Books and Documents",
"Clinical Conference",
"Clinical Study",
"Collected Works",
"Comment",
"Congress",
"Consensus Development Conference",
"Consensus Development Conference, NIH",
"Dictionary",
"Directory",
"Duplicate Publication",
"Editorial",
"Festschrift",
"Government Document",
"Guideline",
"Interactive Tutorial",
"Interview",
"Lecture",
"Legal Case",
"Legislation",
"Letter",
"News",
"Newspaper Article",
"Patient Education Handout",
"Periodical Index",
"Personal Narrative",
"Practice Guideline",
"Published Erratum",
"Technical Report",
"Video-Audio Media",
"Webcast",
]
            art_type = "(" + " OR ".join(f'"{j}"[Filter]' for j in art_type_list) + ")"
            # Wrap the user query, require the fha filter, and exclude the
            # publication types listed above.
            query = "(" + query + ")"
            query += " AND (fha[Filter]) NOT " + art_type
        return await self.esearch(query=query, retmax=retmax, db=db)
    async def esearch(
        self, query: str, retmax: int = 10, db: str = "pubmed"
    ) -> List[str]:
        start_time = time.time()
        server = "esearch"
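        # Not truly random: picks an account from the current millisecond,
        # which spreads requests across the available API keys.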
random_index = int((time.time() * 1000) % len(PUBMED_ACCOUNT))
random_pubmed_account = PUBMED_ACCOUNT[random_index]
api_key = random_pubmed_account["api_key"]
url = f"https://eutils.ncbi.nlm.nih.gov/entrez/eutils/{server}.fcgi?db={db}&term={query}&retmode=json&api_key={api_key}&sort=relevance&retmax={retmax}"
response = await self.async_http_get(url=url)
id_list = response["esearchresult"]["idlist"]
        logger.info(
            f"pubmed esearch returned {len(id_list)} ids in {time.time() - start_time:.2f}s"
        )
return id_list
async def async_http_get(self, url: str):
async with aiohttp.ClientSession() as session:
try_time = 1
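            # Up to 3 attempts; log non-200 responses and back off briefly.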
while try_time < 4:
async with session.get(url) as response:
if response.status == 200:
return await response.json()
else:
                        logger.error(
                            f"GET {url} failed (attempt {try_time}): HTTP {response.status}"
                        )
try_time += 1
                # Sleep 0.5 s before retrying.
await asyncio.sleep(0.5)
raise Exception(f"Failed to fetch data from {url} after 3 attempts")
async def async_http_get_text(self, url: str, params=None):
async with aiohttp.ClientSession() as session:
try_time = 1
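            # Same retry policy as async_http_get, but returns the raw body text.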
while try_time < 4:
async with session.get(url, params=params) as response:
if response.status == 200:
return await response.text()
else:
                        logger.error(
                            f"GET {url} failed (attempt {try_time}): HTTP {response.status}"
                        )
try_time += 1
                # Sleep 0.5 s before retrying.
await asyncio.sleep(0.5)
raise Exception(f"Failed to fetch data from {url} after 3 attempts")
    # Fetch detailed records (EFetch)
    async def fetch_details(
        self, id_list: List[str], db: str = "pubmed", rettype: str = "abstract"
    ) -> List[Dict]:
        start_time = time.time()
        # EFetch with an empty id parameter is an error, so bail out early.
        if not id_list:
            return []
        try:
            ids = ",".join(id_list)
server = "efetch"
random_index = int((time.time() * 1000) % len(PUBMED_ACCOUNT))
random_pubmed_account = PUBMED_ACCOUNT[random_index]
api_key = random_pubmed_account["api_key"]
url = f"https://eutils.ncbi.nlm.nih.gov/entrez/eutils/{server}.fcgi?db={db}&id={ids}&retmode=xml&api_key={api_key}&rettype={rettype}"
response = await self.async_http_get_text(url=url)
articles = self.pubmed_xml_parse.parse_pubmed_xml(response)
            logger.info(
                f"pubmed efetch parsed {len(articles)} articles in {time.time() - start_time:.2f}s"
            )
return articles
except Exception as e:
logger.error(f"Error fetching details for id_list: {id_list}, error: {e}")
            # pmid exact match
return []
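
# Minimal usage sketch, assuming the surrounding config/service modules
# resolve in this environment; the query string below is illustrative only.
if __name__ == "__main__":

    async def _demo() -> None:
        api = PubMedAsyncApi()
        docs = await api.pubmed_search_function(
            query="CRISPR base editing", top_k=5, search_type="keyword"
        )
        for doc in docs:
            print(doc.source_id, doc.title)

    asyncio.run(_demo())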