File size: 5,916 Bytes
eb07418
 
 
 
 
 
 
 
 
15bfd9b
 
 
eb07418
 
 
 
0ba64e8
eb07418
 
0ba64e8
 
eb07418
 
 
0ba64e8
eb07418
0ba64e8
eb07418
 
 
 
 
7224322
0ba64e8
eb07418
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9a51627
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
import gradio as gr
import csv
import os
from Bio import Entrez
import xml.etree.ElementTree as ET
import time
import pandas as pd
from datetime import datetime

# NCBI etiquette: Entrez requires a contact e-mail, and an API key raises the
# request-rate limit. Both are read from environment variables here (each will
# be None if the corresponding variable is unset).
Entrez.email = os.environ.get("EMAIL")
Entrez.api_key = os.environ.get("NCBI_API_KEY")

def search_pubmed(query, max_results=100):
    """Search PubMed for *query* via Entrez.esearch.

    Returns the raw esearch record — a dict containing "IdList" plus the
    history-server keys "WebEnv" and "QueryKey" that fetch_details needs —
    or an error string on failure so the UI can display it.
    """
    try:
        # usehistory="y" stores the result set server-side; the returned
        # WebEnv/QueryKey let efetch pull details without resending PMIDs.
        handle = Entrez.esearch(db="pubmed", term=query,
                                retmax=max_results, usehistory="y")
        record = Entrez.read(handle)
        handle.close()
        return record
    except Exception as e:
        # Broad catch on purpose: network/NCBI failures of any kind become
        # a user-visible message instead of crashing the app.
        return f"Error during search: {str(e)}"

def _parse_pubmed_articles(xml_text):
    """Parse PubMed efetch XML into a list of article dicts.

    Each dict has the keys PMID/Title/Authors/Abstract/Year/Journal, with
    "N/A" standing in for any element missing from a record.
    """
    root = ET.fromstring(xml_text)
    articles = []
    for article in root.findall(".//PubmedArticle"):
        # Resolve the PMID before anything else so the error log below can
        # always name the offending record. (The original looked it up last,
        # so a failure in an earlier field made the except clause reference
        # a possibly-unbound variable.)
        pmid_node = article.find(".//PMID")
        pmid = pmid_node.text if pmid_node is not None else "N/A"
        try:
            title_node = article.find(".//ArticleTitle")
            title = title_node.text if title_node is not None else "N/A"

            author_list = []
            for author in article.findall(".//Author"):
                last_name = author.find("LastName")
                initials = author.find("Initials")
                full_name = "{} {}".format(
                    last_name.text if last_name is not None else "",
                    initials.text if initials is not None else "",
                ).strip()
                author_list.append(full_name)
            authors_str = "; ".join(author_list) if author_list else "N/A"

            abstract = article.find(".//AbstractText")
            abstract_text = abstract.text if abstract is not None else "N/A"

            year_node = article.find(".//PubDate/Year")
            pub_year = year_node.text if year_node is not None else "N/A"

            journal_node = article.find(".//Journal/Title")
            journal = journal_node.text if journal_node is not None else "N/A"

            articles.append({
                "PMID": pmid,
                "Title": title,
                "Authors": authors_str,
                "Abstract": abstract_text,
                "Year": pub_year,
                "Journal": journal,
            })
        except Exception as e:
            # Skip a malformed record but keep processing the rest.
            print(f"Error processing article with PMID {pmid}: {e}")
            continue
    return articles


def fetch_details(search_res):
    """Fetch and parse details for the PMIDs in an esearch record.

    *search_res* is the dict returned by search_pubmed. Returns a list of
    article dicts (possibly empty), or an error string on fetch failure.
    """
    pmids = search_res['IdList']
    # No hits — or an upstream error string — means nothing to fetch.
    if not pmids or isinstance(pmids, str):
        return []

    try:
        # One batched request via the NCBI history server (WebEnv/QueryKey)
        # instead of one request per PMID.
        handle = Entrez.efetch(db="pubmed", rettype="medline", retmode="xml",
                               id=",".join(pmids),
                               webenv=search_res['WebEnv'],
                               query_key=search_res['QueryKey'])
        records = handle.read()
        handle.close()
        return _parse_pubmed_articles(records)
    except Exception as e:
        return f"Error fetching details: {str(e)}"

def save_to_csv(articles, filename="pubmed_results.csv"):
    """Write the article dicts to *filename* as CSV and return that path.

    Returns None when there is nothing to save — an empty result list, or
    an error string handed down from an upstream step.
    """
    if isinstance(articles, str) or not articles:
        return None

    fieldnames = ["PMID", "Title", "Authors", "Abstract", "Year", "Journal"]
    with open(filename, "w", newline="", encoding="utf-8") as out:
        writer = csv.DictWriter(out, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(articles)
    return filename

def search_and_display(query, max_results):
    """Run a PubMed search; return a (table, csv_path, status) tuple.

    The tuple order matches the Gradio wiring outputs=[output_table,
    output_file, output_text], so error messages are returned in the THIRD
    slot (the Status textbox). The original returned them in the first slot,
    which the UI feeds to the DataFrame component, leaving Status empty.
    """
    if not query:
        return None, None, "Please enter a search query."

    try:
        max_results = int(max_results)
        if max_results <= 0:
            return None, None, "Max results must be a positive number."
    except (ValueError, TypeError):
        # TypeError covers None coming from an empty gr.Number field.
        return None, None, "Max results must be a valid number."

    # Run the search; a plain string signals an error from search_pubmed.
    search_res = search_pubmed(query, max_results)
    if isinstance(search_res, str):
        return None, None, search_res

    # The esearch record is a non-empty dict even for zero hits, so test the
    # IdList inside it (the original tested the dict itself — a dead branch).
    if not search_res.get("IdList"):
        return None, None, "No results found."

    # Fetch details; again a string return signals an error.
    articles = fetch_details(search_res)
    if isinstance(articles, str):
        return None, None, articles

    if not articles:
        return None, None, "No valid articles retrieved."

    # DataFrame for on-screen display.
    df = pd.DataFrame(articles)

    # Timestamped filename so repeated searches don't overwrite each other.
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    csv_path = save_to_csv(articles, f"pubmed_results_{timestamp}.csv")

    return df, csv_path, f"Found {len(articles)} articles."

# Gradio UI. Component creation order inside the Blocks context defines the
# on-screen layout, so the statement order below is significant.
with gr.Blocks() as demo:
    gr.Markdown("# PubMed Search App")
    gr.Markdown("Enter a PubMed search query and the maximum number of results to retrieve. Results will be displayed in a table and available for download as a CSV file.")
    
    # Inputs: free-text query plus a bounded result count (1..100).
    with gr.Row():
        query_input = gr.Textbox(label="Search Query", placeholder="e.g., breast cancer AND 2020[PDAT]")
        max_results_input = gr.Number(label="Max Results", value=10, minimum=1, maximum=100)
    
    search_button = gr.Button("Search")
    
    # Outputs: status line, results table, and a downloadable CSV file.
    output_text = gr.Textbox(label="Status")
    output_table = gr.DataFrame(label="Search Results")
    output_file = gr.File(label="Download CSV")
    
    # Wire the button to the handler; the outputs list order must match the
    # order of search_and_display's return tuple.
    search_button.click(
        fn=search_and_display,
        inputs=[query_input, max_results_input],
        outputs=[output_table, output_file, output_text]
    )

# Launch the Gradio app.
# NOTE(review): mcp_server=True presumably enables Gradio's MCP server mode —
# confirm against the installed Gradio version's launch() documentation.
if __name__ == "__main__":
    demo.launch(mcp_server=True)