File size: 5,578 Bytes
5c20520
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
from Bio import ExPASy
from Bio import SeqIO
import json
from Bio.Blast import NCBIXML

def get_protein_sequence_biopython(uniprot_id):
    """
    Fetch a protein sequence by UniProt ID using BioPython.

    The raw Swiss-Prot record is requested through ``ExPASy.get_sprot_raw``
    and parsed with ``SeqIO.read`` in "swiss" format.

    Args:
        uniprot_id (str): UniProt ID (e.g. P12345).

    Returns:
        str: The protein sequence on success. On any failure the exception is
        not raised; instead a string of the form "Error: <message>" is
        returned, so callers must check for that prefix themselves.
    """
    try:
        with ExPASy.get_sprot_raw(uniprot_id) as handle:
            record = SeqIO.read(handle, "swiss")
        sequence = str(record.seq)
    except Exception as exc:
        # Best-effort contract: report the failure as text rather than raising.
        return f"Error: {exc!s}"
    return sequence


def extract_interproscan_metrics(file_path, librarys="PFAM"):
    """
    Extract per-protein domain and GO annotations from an InterProScan JSON result.

    Args:
        file_path (str): Path to the InterProScan JSON result file.
        librarys (str | iterable of str): Signature library name(s) whose
            matches should be collected, e.g. "PFAM" or ["PFAM", "GENE3D"].
            A single string is treated as one library name. Defaults to "PFAM".

    Returns:
        dict: Maps each protein sequence to a dict that contains
            - one key per requested library, holding a list of
              ``{signature accession: entry accession or None}`` dicts
              (``None`` when the match has no entry), and
            - "GO", holding the GO ids found in the entries of all matches,
              regardless of library; duplicates are kept.
    """
    # A bare string would otherwise be iterated character by character
    # (creating keys "P", "F", "A", "M") and membership would become a
    # substring test, so the first PFAM match raised KeyError('PFAM').
    if isinstance(librarys, str):
        librarys = [librarys]
    else:
        librarys = list(librarys)

    with open(file_path, 'r', encoding='utf-8') as file:
        data = json.load(file)

    protein_info = {}
    for protein in data["results"]:
        domain_info = {library: [] for library in librarys}
        domain_info["GO"] = []

        for match in protein["matches"]:
            signature = match["signature"]
            library = signature["signatureLibraryRelease"]["library"]
            # "entry" is null/absent for signatures that are not integrated.
            entry = signature.get("entry")

            if library in librarys:
                entry_accession = entry["accession"] if entry else None
                domain_info[library].append(
                    {signature["accession"]: entry_accession}
                )

            # Collect GO cross-references from every match that has an entry.
            if entry:
                for go_xref in entry.get("goXRefs") or []:
                    if go_xref["databaseName"] == "GO":
                        domain_info["GO"].append(go_xref["id"])

        protein_info[protein["sequence"]] = domain_info

    return protein_info


def get_seqnid(file_path):
    """
    Parse a FASTA file into a mapping of sequence ID to sequence.

    The ID of a record is the first whitespace-delimited token after ">" on
    its header line. All following lines, stripped of surrounding whitespace,
    are concatenated to form the sequence. Lines that appear before the first
    header are ignored. When an ID occurs more than once, the last record wins.

    Parameters:
    - file_path: str, path to the FASTA file to read

    Returns:
    - dict mapping each sequence ID (str) to its sequence (str)

    Raises:
    - IndexError if a header line has no text after ">"
    """
    pieces_by_id = {}
    active_id = None

    with open(file_path, 'r') as fasta:
        for raw_line in fasta:
            text = raw_line.strip()
            if text.startswith(">"):
                # Keep only the token before the first whitespace as the ID.
                active_id = text[1:].split()[0]
                pieces_by_id[active_id] = []
            elif active_id is not None:
                pieces_by_id[active_id].append(text)

    return {seq_id: "".join(pieces) for seq_id, pieces in pieces_by_id.items()}


def tofasta(fasta_path, uids, seqs):
    """
    Save identifier/sequence pairs to a file as FASTA records.

    Each record is written as a ">" header line carrying the identifier,
    followed by the whole sequence on a single line (no line wrapping).
    An existing file at ``fasta_path`` is overwritten.

    Parameters:
    - fasta_path: str, path to the output FASTA file
    - uids: list of str, sequence identifiers (headers)
    - seqs: list of str, corresponding sequences

    Raises:
    - ValueError if ``uids`` and ``seqs`` differ in length
    """
    if len(uids) != len(seqs):
        raise ValueError("Length of uids and seqs must be equal")

    with open(fasta_path, 'w') as out:
        out.writelines(
            f">{header}\n{residues}\n" for header, residues in zip(uids, seqs)
        )


def extract_blast_metrics(xml_file):
    """
    Extract per-HSP metrics from a BLAST XML result file.

    For every query record, one dict is produced per HSP of every alignment:
    - ID: the second "|"-separated field of the hit id
      (e.g. "A0A0H2ZM56" from "sp|A0A0H2ZM56|ADHE_STRP2"), or the whole hit
      id when it contains no "|"
    - Identity%: identities / alignment length * 100, rounded to 2 decimals
    - Coverage%: alignment length / query length * 100, rounded to 2 decimals
    - E-value: a string in scientific notation when below 0.001, otherwise
      a float rounded to 4 decimals
    - Bit Score: rounded to 1 decimal
    - Positive%: positives / alignment length * 100, rounded to 2 decimals

    Parameters:
    - xml_file: str, path to the BLAST XML output

    Returns:
    - dict mapping each record's query name to its list of metric dicts;
      a later record with the same query name replaces the earlier one
    """
    metrics_by_query = {}

    with open(xml_file) as handle:
        for record in NCBIXML.parse(handle):
            query_len = record.query_length
            hits = []

            for aln in record.alignments:
                for hsp in aln.hsps:
                    raw_id = aln.hit_id
                    if "|" in raw_id:
                        accession = raw_id.split("|")[1]
                    else:
                        accession = raw_id

                    aligned = hsp.align_length
                    pct_identity = (hsp.identities / aligned) * 100
                    pct_coverage = (aligned / query_len) * 100
                    pct_positive = (hsp.positives / aligned) * 100

                    # Tiny E-values are kept readable as scientific notation.
                    if hsp.expect < 0.001:
                        e_value = f"{hsp.expect:.1e}"
                    else:
                        e_value = round(hsp.expect, 4)

                    hits.append({
                        "ID": accession,
                        "Identity%": round(pct_identity, 2),
                        "Coverage%": round(pct_coverage, 2),
                        "E-value": e_value,
                        "Bit Score": round(hsp.bits, 1),
                        "Positive%": round(pct_positive, 2),
                    })

            metrics_by_query[record.query] = hits

    return metrics_by_query


def rename_interproscan_keys(interproscan_results):
    """
    Return a copy of an InterProScan result dict with normalised key names.

    "PFAM" becomes "pfam_id", "GO" becomes "go_id", and every other key is
    lower-cased. Values are carried over unchanged (not copied).

    Parameters:
    - interproscan_results: dict keyed by library name (plus "GO")

    Returns:
    - dict, a new dict with the renamed keys
    """
    special_names = {"PFAM": "pfam_id", "GO": "go_id"}
    return {
        (special_names[name] if name in special_names else name.lower()): payload
        for name, payload in interproscan_results.items()
    }