import datetime
import os
import shlex
import subprocess


class InterproScan:
    """Wrapper that runs an InterProScan launcher script on a FASTA file.

    Attributes:
        bash_path: Command used to launch InterProScan. It is split with
            shell-like rules, so both ``"path/interproscan.sh"`` and
            ``"bash path/interproscan.sh"`` work.
        out_dir: Optional base directory. When set and ``save_dir`` lies
            beneath it, the path reported by :meth:`run` is made relative to
            it; otherwise ``save_dir`` is reported unchanged.
    """

    def __init__(self, bash_path, out_dir=None):
        self.bash_path = bash_path
        # ``run`` reports the output location relative to this directory.
        self.out_dir = out_dir

    def run(self, fasta_file, goterms: bool, pathways: bool, save_dir) -> dict:
        """Run InterProScan on ``fasta_file`` and report where the output is.

        The launcher is called with ``-i fasta_file -o save_dir -f JSON`` and
        ``-T <dirname(save_dir)>/temp``, plus ``-goterms`` / ``-pa`` when the
        corresponding flags are truthy and ``-t p`` or ``-t n`` depending on
        :meth:`is_protein_sequence`.

        Args:
            fasta_file: Path of the input FASTA file.
            goterms: Add the ``-goterms`` option when truthy.
            pathways: Add the ``-pa`` option when truthy.
            save_dir: Path passed to the launcher via ``-o``.

        Returns:
            ``{"output_dir": <path>, "duration": <seconds>}`` on success, or
            ``{"error": <message>}`` when the launcher cannot be started,
            exits with a non-zero status, or leaves no output behind.

        Raises:
            OSError: If the FASTA file cannot be read or the temporary
                directory cannot be created (both happen before the launcher
                is started, as in the original control flow).
        """
        start_time = datetime.datetime.now()

        # os.path.join keeps the temp directory relative when save_dir has no
        # directory component (plain string formatting would yield "/temp").
        temp_dir = os.path.join(os.path.dirname(save_dir), "temp")
        os.makedirs(temp_dir, exist_ok=True)

        seqs = self.read_fasta_to_list(fasta_file)
        seq_type = "p" if self.is_protein_sequence(seqs) else "n"

        try:
            # An argument list (no shell) keeps paths containing spaces or
            # shell metacharacters intact.
            cmd = shlex.split(str(self.bash_path)) + [
                "-i", str(fasta_file),
                "-o", str(save_dir),
                "-f", "JSON",
                "-T", temp_dir,
            ]
            if goterms:
                cmd.append("-goterms")
            if pathways:
                cmd.append("-pa")
            cmd += ["-t", seq_type]
            print(" ".join(shlex.quote(part) for part in cmd))

            completed = subprocess.run(cmd, check=False)
            spend_time = (datetime.datetime.now() - start_time).total_seconds()

            if completed.returncode != 0 or not self._has_output(save_dir):
                raise RuntimeError(
                    "InterproScan encountered an error. "
                    "Please check your inputs and options."
                )
        except (OSError, ValueError, RuntimeError,
                subprocess.SubprocessError) as e:
            return {"error": str(e)}

        reported = self._relative_to_out_dir(str(save_dir))
        print(f"InterproScan successfully completed. Output saved to {reported}.")
        return {"output_dir": reported, "duration": spend_time}

    @staticmethod
    def _has_output(path) -> bool:
        """Return True if ``path`` is a non-empty directory or non-empty file."""
        # NOTE(review): InterProScan's ``-o`` appears to name an output *file*
        # rather than a directory - confirm. Both layouts are accepted here.
        if os.path.isdir(path):
            return bool(os.listdir(path))
        return os.path.isfile(path) and os.path.getsize(path) > 0

    def _relative_to_out_dir(self, path: str) -> str:
        """Return ``path`` relative to ``self.out_dir`` when it lies beneath it."""
        if not self.out_dir:
            return path
        rel = os.path.relpath(path, self.out_dir)
        if rel == os.pardir or rel.startswith(os.pardir + os.sep):
            # path is outside out_dir; report it unchanged.
            return path
        return rel

    def is_protein_sequence(self, sequences) -> bool:
        """Guess whether ``sequences`` are protein rather than nucleotide.

        Heuristic: all sequences are concatenated and upper-cased; more than
        six distinct characters is treated as protein (DNA/RNA alphabets such
        as ATCG / AUCG use far fewer symbols). Empty input returns False.
        """
        alphabet = set("".join(sequences).upper())
        return len(alphabet) > 6

    def read_fasta_to_list(self, file_path) -> list:
        """Read a FASTA file and return its sequences, one string per record.

        Header text (after ``>``) is discarded; the lines of each record are
        stripped and concatenated. Lines that appear before the first header
        are ignored.
        """
        sequences = []
        in_record = False
        chunks = []
        with open(file_path, 'r') as f:
            for raw in f:
                line = raw.strip()
                if line.startswith(">"):
                    if in_record:
                        sequences.append("".join(chunks))
                    in_record = True
                    chunks = []
                else:
                    chunks.append(line)
        if in_record:
            sequences.append("".join(chunks))
        return sequences


if __name__ == '__main__':
    # Manual smoke test: build a FASTA file from a pickled example data set
    # and run InterProScan on it.
    import pickle

    from utils.utils import get_protein_sequence_biopython, tofasta

    interproscan = InterproScan("interproscan/interproscan-5.75-106.0/interproscan.sh")

    # NOTE: pickle.load executes arbitrary code from the file; only use this
    # with a trusted, locally produced pickle.
    with open("/zhangjiawei/interproscan/example/difference_20241122_ec_dict_list.pkl", "rb") as f:
        datas = pickle.load(f)

    uids = [data["uniprot_id"] for data in datas]
    seqs = [data["sequence"] for data in datas]

    fasta_file = "example/protein_go_clean.fasta"
    # Alternative source for the sequences:
    # seqs = [get_protein_sequence_biopython(uid) for uid in uids]
    tofasta(fasta_file, uids, seqs)

    input_args = {
        "fasta_file": fasta_file,
        "goterms": True,
        "pathways": True,
        "save_dir": "output/interproscan",
    }
    interproscan.run(**input_args)