Spaces:
Runtime error
Runtime error
File size: 5,578 Bytes
5c20520 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 |
from Bio import ExPASy
from Bio import SeqIO
import json
from Bio.Blast import NCBIXML
def get_protein_sequence_biopython(uniprot_id):
    """
    Fetch a protein sequence by UniProt ID using Biopython.

    The raw record is requested through ``ExPASy.get_sprot_raw`` and parsed
    with ``SeqIO.read`` in "swiss" format.

    Parameters:
        uniprot_id (str): UniProt ID (e.g. P12345).

    Returns:
        str: The protein sequence, or a message of the form "Error: ..." if
        anything goes wrong while fetching or parsing the record.
    """
    try:
        with ExPASy.get_sprot_raw(uniprot_id) as raw_record:
            parsed = SeqIO.read(raw_record, "swiss")
            sequence = str(parsed.seq)
        return sequence
    except Exception as exc:
        # Failures are reported to the caller as a string, not raised.
        return "Error: " + str(exc)
def extract_interproscan_metrics(file_path, librarys="PFAM"):
    """
    Extract per-protein domain and GO annotations from an InterProScan JSON result.

    Parameters:
        file_path (str): Path to the InterProScan JSON result file.
        librarys (str or list of str): Signature library name(s) to collect,
            e.g. "PFAM" or ["PFAM", "GENE3D"]. A bare string is treated as a
            single library name. Defaults to "PFAM".

    Returns:
        dict: Maps each protein sequence to a dict holding one list per
        requested library, whose items are ``{signature accession: entry
        accession or None}``, plus a "GO" list of GO term ids.
    """
    # A bare string would be iterated character by character (creating keys
    # "P", "F", "A", "M") and matched by substring, which made the default
    # argument fail with KeyError on the first PFAM match. Normalise it.
    if isinstance(librarys, str):
        librarys = [librarys]
    else:
        librarys = list(librarys)

    with open(file_path, 'r', encoding='utf-8') as file:
        data = json.load(file)

    protein_info = {}
    for protein in data["results"]:
        domain_info = {library: [] for library in librarys}
        domain_info["GO"] = []

        for match in protein["matches"]:
            signature = match["signature"]
            library = signature["signatureLibraryRelease"]["library"]
            entry = signature.get("entry")

            if library in librarys:
                # Signatures without an integrated entry are recorded as None.
                entry_accession = entry["accession"] if entry else None
                domain_info[library].append({signature["accession"]: entry_accession})

            # Collect GO terms from every match that carries an entry.
            # NOTE(review): the original indentation was lost; GO collection
            # is assumed to apply to all matches, not only the selected
            # libraries -- confirm against the intended behaviour.
            if entry:
                for go_xref in entry.get("goXRefs") or []:
                    if go_xref["databaseName"] == "GO":
                        domain_info["GO"].append(go_xref["id"])

        protein_info[protein["sequence"]] = domain_info

    return protein_info
def get_seqnid(file_path):
    """
    Read a FASTA file into a dict mapping sequence id to sequence.

    The id is the first whitespace-delimited token after ">" on a header
    line. Sequence lines are stripped and concatenated. Lines appearing
    before the first header are discarded, and a repeated id overwrites the
    earlier record.

    Parameters:
        file_path (str): Path to the FASTA file.

    Returns:
        dict: {sequence id: sequence string}, in file order.
    """
    records = {}
    active_id = None
    pieces = []

    with open(file_path, 'r') as handle:
        for raw_line in handle:
            text = raw_line.strip()

            if not text.startswith(">"):
                pieces.append(text)
                continue

            # New header: store the record collected so far, then start over.
            if active_id is not None:
                records[active_id] = "".join(pieces)
            active_id = text[1:].split()[0]
            pieces = []

    # The last record has no following header to trigger its storage.
    if active_id is not None:
        records[active_id] = "".join(pieces)

    return records
def tofasta(fasta_path, uids, seqs):
    """
    Write identifier/sequence pairs to a file in FASTA format.

    Each record is written as a ">uid" header line followed by the whole
    sequence on a single line (long sequences are not wrapped).

    Parameters:
    - fasta_path: str, path to the output FASTA file (overwritten)
    - uids: list of str, sequence identifiers (headers)
    - seqs: list of str, corresponding sequences

    Raises:
    - ValueError: if uids and seqs differ in length
    """
    if len(seqs) != len(uids):
        raise ValueError("Length of uids and seqs must be equal")

    with open(fasta_path, 'w') as out:
        out.writelines(
            f">{identifier}\n{sequence}\n"
            for identifier, sequence in zip(uids, seqs)
        )
def extract_blast_metrics(xml_file):
    """
    Extract per-HSP metrics from a BLAST XML result file.

    For every query record, each HSP of each alignment yields a dict with:
    - "ID": hit id (second "|"-separated field when present, e.g. the
      accession in sp|A0A0H2ZM56|ADHE_STRP2; otherwise the raw hit id)
    - "Identity%": identities / alignment length, as a percentage
    - "Coverage%": alignment length / query length, as a percentage
    - "E-value": scientific-notation string when below 0.001, otherwise a
      float rounded to 4 decimals
    - "Bit Score": bit score rounded to 1 decimal
    - "Positive%": positives / alignment length, as a percentage

    Parameters:
        xml_file (str): Path to the BLAST XML file.

    Returns:
        dict: {query name: list of metric dicts}.
    """
    metrics_by_query = {}

    with open(xml_file) as handle:
        for record in NCBIXML.parse(handle):
            query_len = record.query_length
            rows = []

            for aln in record.alignments:
                # The hit id is the same for every HSP of this alignment.
                raw_id = aln.hit_id
                id_fields = raw_id.split("|")
                short_id = id_fields[1] if len(id_fields) > 1 else raw_id

                for hsp in aln.hsps:
                    aligned = hsp.align_length
                    evalue = hsp.expect

                    # Very small E-values are kept readable as strings.
                    if evalue < 0.001:
                        evalue_out = f"{evalue:.1e}"
                    else:
                        evalue_out = round(evalue, 4)

                    rows.append({
                        "ID": short_id,
                        "Identity%": round((hsp.identities / aligned) * 100, 2),
                        "Coverage%": round((aligned / query_len) * 100, 2),
                        "E-value": evalue_out,
                        "Bit Score": round(hsp.bits, 1),
                        "Positive%": round((hsp.positives / aligned) * 100, 2),
                    })

            metrics_by_query[record.query] = rows

    return metrics_by_query
def rename_interproscan_keys(interproscan_results):
    """
    Return a copy of the mapping with its keys renamed.

    "PFAM" becomes "pfam_id", "GO" becomes "go_id", and every other key is
    lower-cased. Values are carried over unchanged.

    Parameters:
        interproscan_results (dict): Mapping keyed by library name.

    Returns:
        dict: New mapping with the renamed keys.
    """
    fixed_names = {"PFAM": "pfam_id", "GO": "go_id"}
    return {
        fixed_names.get(name, name.lower()): payload
        for name, payload in interproscan_results.items()
    }