# Source: protein_rag / interproscan.py
# Hugging Face file-page residue (kept for provenance):
#   ericzhang1122's picture
#   Upload folder using huggingface_hub
#   5c20520 verified
import datetime
import os
import shlex
import subprocess
class InterproScan():
    """Run the InterProScan command-line launcher on a FASTA file.

    Attributes:
        bash_path: Command used to start InterProScan. It is split with
            shell-like rules, so both ``"path/to/interproscan.sh"`` and
            ``"bash path/to/interproscan.sh"`` are accepted.
        out_dir: Optional base directory. When set, the path reported by
            :meth:`run` is made relative to it; when ``None`` the
            ``save_dir`` passed to :meth:`run` is reported unchanged.
    """

    def __init__(self, bash_path, out_dir=None):
        """Store the launcher command and the optional reporting base dir.

        Args:
            bash_path: Command (usually a path to ``interproscan.sh``).
            out_dir: Optional base directory used only to shorten the
                output path reported by :meth:`run`.
        """
        self.bash_path = bash_path
        self.out_dir = out_dir

    def run(self, fasta_file, goterms, pathways, save_dir) -> dict:
        """Execute InterProScan and report where the output was written.

        Args:
            fasta_file: Path of the FASTA file passed with ``-i``.
            goterms: When truthy, ``-goterms`` is added to the command.
            pathways: When truthy, ``-pa`` is added to the command.
            save_dir: Output location passed with ``-o``.
                NOTE(review): in InterProScan 5 ``-o`` appears to name an
                explicit output *file* (``-d`` is the output directory), so
                both a file and a directory are accepted as valid output
                here -- confirm which one callers expect.

        Returns:
            ``{"output_dir": <path>, "duration": <seconds>}`` on success, or
            ``{"error": <message>}`` when the launcher cannot be started,
            exits with a non-zero status, or leaves no output behind.

        Raises:
            OSError: If the FASTA file cannot be read or the temporary
                directory cannot be created (these happen before the
                launcher is started and are not converted to an error
                dict).
        """
        start_time = datetime.datetime.now()

        # os.path.join keeps the temp dir relative when save_dir has no
        # directory component (plain string formatting produced "/temp").
        temp_dir = os.path.join(os.path.dirname(save_dir), "temp")
        os.makedirs(temp_dir, exist_ok=True)

        sequences = self.read_fasta_to_list(fasta_file)
        is_protein = self.is_protein_sequence(sequences)

        try:
            # An argument list (no shell) keeps paths containing spaces or
            # shell metacharacters from being re-interpreted.
            command = shlex.split(str(self.bash_path))
            command += ["-i", fasta_file, "-o", save_dir, "-f", "JSON"]
            command += ["-T", temp_dir]
            if goterms:
                command.append("-goterms")
            if pathways:
                command.append("-pa")
            command += ["-t", "p" if is_protein else "n"]
            print(" ".join(shlex.quote(part) for part in command))

            completed = subprocess.run(command)
            spend_time = (datetime.datetime.now() - start_time).total_seconds()

            if completed.returncode != 0:
                raise RuntimeError(
                    f"InterproScan exited with status {completed.returncode}. "
                    "Please check your inputs and options."
                )
            if not self._has_output(save_dir):
                raise RuntimeError(
                    "InterproScan encountered an error. "
                    "Please check your inputs and options."
                )
        except (OSError, RuntimeError, ValueError) as exc:
            # OSError: launcher missing / not executable;
            # ValueError: malformed launcher string or argument.
            return {"error": str(exc)}

        reported = self._reported_path(save_dir)
        print(f"InterproScan successfully completed. Output saved to {reported}.")
        return {"output_dir": reported, "duration": spend_time}

    def _reported_path(self, save_dir):
        """Return save_dir relative to ``out_dir`` when one was configured."""
        base = getattr(self, "out_dir", None)
        if base:
            return os.path.relpath(save_dir, base)
        return save_dir

    @staticmethod
    def _has_output(path):
        """Return True if path is a non-empty directory or a non-empty file."""
        if os.path.isdir(path):
            return bool(os.listdir(path))
        if os.path.isfile(path):
            return os.path.getsize(path) > 0
        return False

    def is_protein_sequence(self, sequences):
        """Guess whether the sequences are protein rather than nucleotide.

        All sequences are concatenated and upper-cased; more than six
        distinct characters is taken to mean protein. DNA/RNA input drawn
        from A/T/C/G/U stays at or below that limit (the threshold leaves
        room for one extra symbol, presumably an ambiguity code such as N).

        Args:
            sequences: Iterable of sequence strings.

        Returns:
            True for protein-like input, False otherwise (including empty
            input).
        """
        alphabet = set("".join(sequences).upper())
        return len(alphabet) > 6

    def read_fasta_to_list(self, file_path):
        """Read a FASTA file and return its sequences in file order.

        Header lines (starting with ``>``) delimit records; the lines of a
        record are stripped and concatenated. Headers themselves are not
        returned, and any text before the first header is ignored.

        Args:
            file_path: Path of the FASTA file.

        Returns:
            A list with one sequence string per header found.
        """
        sequences = []
        chunks = None  # stays None until the first header is seen
        with open(file_path, 'r') as handle:
            for raw_line in handle:
                line = raw_line.strip()
                if line.startswith(">"):
                    if chunks is not None:
                        sequences.append("".join(chunks))
                    chunks = []
                elif chunks is not None:
                    chunks.append(line)
        if chunks is not None:
            sequences.append("".join(chunks))
        return sequences
if __name__ == '__main__':
    # Manual smoke test: build a FASTA file from a pickled list of records
    # and run InterProScan on it with GO terms and pathways enabled.
    interproscan = InterproScan("interproscan/interproscan-5.75-106.0/interproscan.sh")
    from utils.utils import get_protein_sequence_biopython, tofasta
    import pickle

    # NOTE(review): pickle.load executes arbitrary code from the file; only
    # use this with a trusted, locally produced pickle.
    with open("/zhangjiawei/interproscan/example/difference_20241122_ec_dict_list.pkl", "rb") as handle:
        records = pickle.load(handle)

    # Single pass over the records, reading the id before the sequence.
    pairs = [(record["uniprot_id"], record["sequence"]) for record in records]
    uids = [uid for uid, _ in pairs]
    seqs = [seq for _, seq in pairs]

    # Alternative source for the sequences (currently disabled): fetch each
    # one with get_protein_sequence_biopython(uid) instead of the pickle.
    fasta_file = "example/protein_go_clean.fasta"
    tofasta(fasta_file, uids, seqs)

    interproscan.run(
        fasta_file=fasta_file,
        goterms=True,
        pathways=True,
        save_dir="output/interproscan",
    )