Spaces:
Runtime error
Runtime error
from Bio import ExPASy | |
from Bio import SeqIO | |
import json | |
from Bio.Blast import NCBIXML | |
def get_protein_sequence_biopython(uniprot_id):
    """
    Fetch a protein sequence by UniProt ID using BioPython.

    Args:
        uniprot_id (str): UniProt ID (e.g. P12345).

    Returns:
        str: The protein sequence, or a message of the form
        "Error: <details>" if any exception was raised while fetching or
        parsing the record.
    """
    try:
        # The raw Swiss-Prot text record is read as exactly one entry.
        with ExPASy.get_sprot_raw(uniprot_id) as swiss_handle:
            record = SeqIO.read(swiss_handle, "swiss")
        sequence = str(record.seq)
    except Exception as exc:
        # Failures are reported through the return value instead of raising.
        return "Error: " + str(exc)
    return sequence
def extract_interproscan_metrics(file_path, librarys="PFAM"):
    """
    Extract per-protein domain and GO annotations from an InterProScan JSON result.

    Args:
        file_path (str): Path to the InterProScan JSON result file.
        librarys (str | iterable of str): Signature library name(s) to
            collect, e.g. "PFAM" or ["PFAM", "SMART"]. A bare string is
            treated as a single library name. Defaults to "PFAM".

    Returns:
        dict: Maps each protein sequence to a dict holding one key per
        requested library (a list of ``{signature accession: entry accession
        or None}`` dicts) plus a "GO" key with the list of collected GO ids.
    """
    # A bare string must be wrapped in a list. Iterating it directly would
    # create one key per character ("P", "F", ...) and turn the membership
    # test below into a substring test, so the first PFAM match would raise
    # KeyError on domain_info["PFAM"].
    if isinstance(librarys, str):
        libraries = [librarys]
    else:
        libraries = list(librarys)

    with open(file_path, 'r', encoding='utf-8') as file:
        data = json.load(file)

    protein_info = {}
    for protein in data["results"]:
        domain_info = {library: [] for library in libraries}
        domain_info["GO"] = []

        for match in protein["matches"]:
            signature = match["signature"]
            library = signature["signatureLibraryRelease"]["library"]
            entry = signature["entry"]

            if library in libraries:
                # Signatures without an integrated entry are kept, mapped to None.
                entry_accession = entry["accession"] if entry else None
                domain_info[library].append({signature["accession"]: entry_accession})

            # GO terms.
            # NOTE(review): the source indentation was lost; GO ids are
            # collected here from every match, not only from matches of the
            # requested libraries -- confirm against the intended behaviour.
            if entry:
                for go_xref in entry.get("goXRefs") or []:
                    if go_xref["databaseName"] == "GO":
                        domain_info["GO"].append(go_xref["id"])

        protein_info[protein["sequence"]] = domain_info

    return protein_info
def get_seqnid(file_path):
    """
    Parse a FASTA file into a mapping of sequence id to sequence.

    The id of a record is the first whitespace-delimited token after ">".
    A header with nothing after ">" yields the empty string as id. Sequence
    lines of a record are stripped and concatenated. Lines that appear
    before the first header are discarded, and a repeated id overwrites the
    earlier record.

    Args:
        file_path (str): Path to the FASTA file.

    Returns:
        dict: Maps sequence id (str) to sequence (str).
    """
    seq_dict = {}
    current_header = None
    current_seq = []
    with open(file_path, 'r') as f:
        for line in f:
            line = line.strip()
            if line.startswith(">"):
                # Flush the record that was being accumulated, if any.
                if current_header is not None:
                    seq_dict[current_header] = "".join(current_seq)
                # Take only the first part before whitespace; a bare ">"
                # has no tokens, so fall back to an empty id instead of
                # raising IndexError.
                header_fields = line[1:].split()
                current_header = header_fields[0] if header_fields else ""
                current_seq = []
            else:
                current_seq.append(line)
    # Flush the final record.
    if current_header is not None:
        seq_dict[current_header] = "".join(current_seq)
    return seq_dict
def tofasta(fasta_path, uids, seqs):
    """
    Write sequences to a file in FASTA format.

    Each record is written as a ">uid" header line followed by the whole
    sequence on a single line (long sequences are not wrapped).

    Parameters:
    - fasta_path: str, path to the output FASTA file
    - uids: list of str, sequence identifiers (headers)
    - seqs: list of str, corresponding sequences

    Raises:
    - ValueError: if uids and seqs differ in length; the check runs before
      the output file is opened.
    """
    if len(uids) != len(seqs):
        raise ValueError("Length of uids and seqs must be equal")

    with open(fasta_path, 'w') as out:
        # One two-line record per (uid, seq) pair.
        out.writelines(f">{uid}\n{seq}\n" for uid, seq in zip(uids, seqs))
def _blast_hsp_metrics(raw_hit_id, hsp, query_length):
    """Build the metrics dict for a single HSP of a BLAST hit."""
    # Hit ids such as sp|A0A0H2ZM56|ADHE_STRP2 carry the accession in the
    # second "|"-separated field; ids without "|" are used unchanged.
    if "|" in raw_hit_id:
        hit_id = raw_hit_id.split("|")[1]
    else:
        hit_id = raw_hit_id

    aligned = hsp.align_length
    identity_pct = (hsp.identities / aligned) * 100
    # Coverage is the alignment length relative to the query length.
    coverage_pct = (aligned / query_length) * 100
    positive_pct = (hsp.positives / aligned) * 100

    # Small E-values are rendered as scientific-notation strings, larger
    # ones stay numeric and are rounded.
    if hsp.expect < 0.001:
        e_value = f"{hsp.expect:.1e}"
    else:
        e_value = round(hsp.expect, 4)

    return {
        "ID": hit_id,
        "Identity%": round(identity_pct, 2),
        "Coverage%": round(coverage_pct, 2),
        "E-value": e_value,
        "Bit Score": round(hsp.bits, 1),
        "Positive%": round(positive_pct, 2),
    }


def extract_blast_metrics(xml_file):
    """
    Extract per-HSP metrics from a BLAST XML result file.

    For every HSP of every alignment the following are reported:
    - ID (second "|"-separated field of the hit id, else the hit id itself)
    - Identity% (identities / alignment length)
    - Coverage% (alignment length / query length)
    - E-value
    - Bit Score
    - Positive% (positives / alignment length)

    Args:
        xml_file (str): Path to the BLAST XML output.

    Returns:
        dict: Maps each record's query to a list of metric dicts, one per HSP.
    """
    results = {}
    with open(xml_file) as f:
        # Records are consumed while the file is still open.
        for record in NCBIXML.parse(f):
            query_length = record.query_length
            results[record.query] = [
                _blast_hsp_metrics(alignment.hit_id, hsp, query_length)
                for alignment in record.alignments
                for hsp in alignment.hsps
            ]
    return results
def rename_interproscan_keys(interproscan_results):
    """
    Return a new dict with InterProScan result keys renamed.

    "PFAM" becomes "pfam_id", "GO" becomes "go_id", and every other key is
    lower-cased. Values are carried over as-is.
    """
    special_names = {"PFAM": "pfam_id", "GO": "go_id"}
    return {
        (special_names[key] if key in special_names else key.lower()): value
        for key, value in interproscan_results.items()
    }