import asyncio
import time
from typing import Dict, List
import aiohttp

from config.global_storage import get_model_config
from dto.bio_document import PubMedDocument
from service.pubmed_xml_parse import PubmedXmlParse
from utils.bio_logger import bio_logger as logger
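
# NOTE: each request picks one of the accounts below via a time-based index,
# spreading calls across multiple NCBI API keys (see esearch / fetch_details).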

PUBMED_ACCOUNT = [
    {"email": "[email protected]", "api_key": "60eb67add17f39aa588a43e30bb7fce98809"},
    {"email": "[email protected]", "api_key": "fd9bb5b827c95086b9c2d579df20beca2708"},
    {"email": "[email protected]", "api_key": "026586b79437a2b21d1e27d8c3f339230208"},
    {"email": "[email protected]", "api_key": "bca0489d8fe314bfdbb1f7bfe63fb5d76e09"},
]


class PubMedAsyncApi:
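    """Async client for the NCBI E-utilities PubMed endpoints (ESearch + EFetch)."""
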
    def __init__(self):
        self.pubmed_xml_parse = PubmedXmlParse()
        self.model_config = get_model_config()

    async def pubmed_search_function(
        self, query: str, top_k: int, search_type: str
    ) -> List[PubMedDocument]:
        """Search PubMed for the query and return up to top_k results as PubMedDocument objects."""
        try:
            start_time = time.time()
            logger.info(
                f'Trying to search PubMed for "{query}", top_k={top_k}, search_type={search_type}'
            )
            id_list = await self.search_database(
                query, db="pubmed", retmax=top_k, search_type=search_type
            )
            articles = await self.fetch_details(
                id_list, db="pubmed", rettype="abstract"
            )

            end_search_pubmed_time = time.time()
            logger.info(
                f'Finished searching PubMed for "{query}", took {end_search_pubmed_time - start_time:.2f} seconds, found {len(articles)} results'
            )

            return [
                PubMedDocument(
                    title=result["title"],
                    abstract=result["abstract"],
                    authors=self.process_authors(result["authors"]),
                    doi=result["doi"],
                    source="pubmed",
                    source_id=result["pmid"],
                    pub_date=result["pub_date"],
                    journal=result["journal"],
                )
                for result in articles
            ]
        except Exception as e:
            logger.error(f"Error searching PubMed query: {query} error: {e}")
            raise

    def process_authors(self, author_list: List[Dict]) -> str:
        """Join author entries into a single "Forename Lastname, ..." string."""
        return ", ".join(
            f"{author['forename']} {author['lastname']}" for author in author_list
        )

    # Search the database (ESearch)
    async def search_database(
        self, query: str, db: str, retmax: int, search_type: str = "keyword"
    ) -> List[str]:
        """Run an ESearch query against the given database and return the matching PMIDs."""
        if search_type not in ["keyword", "advanced"]:
            raise ValueError("search_type must be one of 'keyword' or 'advanced'")

        if search_type == "keyword":
            art_type_list = [
                "Address",
                "Bibliography",
                "Biography",
                "Books and Documents",
                "Clinical Conference",
                "Clinical Study",
                "Collected Works",
                "Comment",
                "Congress",
                "Consensus Development Conference",
                "Consensus Development Conference, NIH",
                "Dictionary",
                "Directory",
                "Duplicate Publication",
                "Editorial",
                "Festschrift",
                "Government Document",
                "Guideline",
                "Interactive Tutorial",
                "Interview",
                "Lecture",
                "Legal Case",
                "Legislation",
                "Letter",
                "News",
                "Newspaper Article",
                "Patient Education Handout",
                "Periodical Index",
                "Personal Narrative",
                "Practice Guideline",
                "Published Erratum",
                "Technical Report",
                "Video-Audio Media",
                "Webcast",
            ]
            art_type = "(" + " OR ".join(f'"{j}"[Filter]' for j in art_type_list) + ")"
            query = "( " + query + ")"
            query += " AND (fha[Filter]) NOT " + art_type

        id_list = await self.esearch(query=query, retmax=retmax)

        return id_list

    async def esearch(self, query=None, retmax=10):
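        """Call ESearch and return the relevance-sorted list of PMIDs (up to retmax)."""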
        start_time = time.time()
        db = "pubmed"
        server = "esearch"
        # Pick an account via a time-based index (pseudo-random rotation across API keys)
        random_index = int((time.time() * 1000) % len(PUBMED_ACCOUNT))
        random_pubmed_account = PUBMED_ACCOUNT[random_index]

        api_key = random_pubmed_account["api_key"]
        url = f"https://eutils.ncbi.nlm.nih.gov/entrez/eutils/{server}.fcgi?db={db}&term={query}&retmode=json&api_key={api_key}&sort=relevance&retmax={retmax}"
        response = await self.async_http_get(url=url)

        id_list = response["esearchresult"]["idlist"]
        logger.info(
            f"pubmed_async_http got id_list, search time taken: {time.time() - start_time:.2f}s"
        )

        return id_list

    async def async_http_get(self, url: str):
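        """GET a URL expecting a JSON body, retrying up to 3 times on non-200 responses."""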
        async with aiohttp.ClientSession() as session:
            try_time = 1
            while try_time < 4:
                async with session.get(url) as response:
                    if response.status == 200:
                        return await response.json()
                    else:
                        logger.error(
                            f"{url},try_time:{try_time},Error: {response.status}"
                        )
                        try_time += 1
                        # Sleep for 0.5 seconds before retrying
                        await asyncio.sleep(0.5)
        raise Exception(f"Failed to fetch data from {url} after 3 attempts")

    async def async_http_get_text(self, url: str, params=None):
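        """GET a URL expecting a text (XML) body, retrying up to 3 times on non-200 responses."""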
        async with aiohttp.ClientSession() as session:
            try_time = 1
            while try_time < 4:
                async with session.get(url, params=params) as response:
                    if response.status == 200:
                        return await response.text()
                    else:
                        logger.error(
                            f"{url},try_time:{try_time},Error: {response.status}"
                        )
                        try_time += 1
                        # Sleep for 0.5 seconds before retrying
                        await asyncio.sleep(0.5)
        raise Exception(f"Failed to fetch data from {url} after 3 attempts")

    # Fetch article details (EFetch)
    async def fetch_details(self, id_list, db="pubmed", rettype="abstract"):
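        """Call EFetch for the given PMIDs and parse the XML response into article dicts."""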
        start_time = time.time()
        try:
            ids = ",".join(id_list)
            server = "efetch"

            random_index = int((time.time() * 1000) % len(PUBMED_ACCOUNT))
            random_pubmed_account = PUBMED_ACCOUNT[random_index]
            api_key = random_pubmed_account["api_key"]
            url = f"https://eutils.ncbi.nlm.nih.gov/entrez/eutils/{server}.fcgi?db={db}&id={ids}&retmode=xml&api_key={api_key}&rettype={rettype}"
            response = await self.async_http_get_text(url=url)
            articles = self.pubmed_xml_parse.parse_pubmed_xml(response)
            logger.info(
                f"pubmed_async_http fetched details, time taken: {time.time() - start_time:.2f}s"
            )
            return articles
        except Exception as e:
            logger.error(f"Error fetching details for id_list: {id_list}, error: {e}")
            # TODO: pmid exact match

        return []
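

# A minimal usage sketch (illustrative only; the query string and top_k below are
# assumptions, not part of the original module, and PubMedDocument is assumed to
# expose title/doi as attributes). Requires the repo's config/dto/service/utils
# packages to be importable and the model config to be loaded.
if __name__ == "__main__":

    async def _demo():
        api = PubMedAsyncApi()
        docs = await api.pubmed_search_function(
            query="glioblastoma immunotherapy", top_k=5, search_type="keyword"
        )
        for doc in docs:
            print(doc.title, doc.doi)

    asyncio.run(_demo())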