File size: 3,214 Bytes
5c20520
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
import datetime
import os
import shlex
import subprocess

class InterproScan():
    """Run the InterProScan command-line tool on a FASTA file.

    The launcher command is given once at construction time; each call to
    ``run`` builds the argument list, executes the tool and reports where the
    results were written.
    """

    # Optional base directory. When set, ``run`` reports the output location
    # relative to it. The class-level default keeps instances created without
    # it (or subclasses that skip ``__init__``) working.
    out_dir = None

    def __init__(self, bash_path, out_dir=None):
        """Store the launcher command and the optional base output directory.

        Args:
            bash_path: Command used to start InterProScan, typically the path
                to ``interproscan.sh``. It is split with shell-like rules, so
                a value such as ``"bash /opt/ips/interproscan.sh"`` also works.
            out_dir: Optional directory that output paths are reported
                relative to. ``None`` means paths are reported as given.
        """
        self.bash_path = bash_path
        if out_dir is not None:
            self.out_dir = out_dir

    def run(self, fasta_file, goterms, pathways, save_dir) -> dict:
        """Run InterProScan on *fasta_file* and report where the results are.

        Args:
            fasta_file: Path of the FASTA file to analyse.
            goterms: If truthy, pass ``-goterms`` to the tool.
            pathways: If truthy, pass ``-pa`` to the tool.
            save_dir: Value passed to ``-o``. After the run it is listed as a
                directory and must contain at least one entry.

        Returns:
            ``{"output_dir": <path>, "duration": <seconds>}`` on success, or
            ``{"error": <message>}`` if the tool cannot be started or
            *save_dir* is missing or empty afterwards.

        Raises:
            OSError: If the scratch directory cannot be created or the FASTA
                file cannot be read (these happen before the guarded section).
        """
        start_time = datetime.datetime.now()
        # os.path.join keeps the scratch directory relative when save_dir has
        # no parent component; concatenating "/temp" pointed at the file
        # system root in that case.
        temp_dir = os.path.join(os.path.dirname(save_dir), "temp")
        os.makedirs(temp_dir, exist_ok=True)

        seqs = self.read_fasta_to_list(fasta_file)
        seqtype = "p" if self.is_protein_sequence(seqs) else "n"

        # Build an argument list instead of a shell string so that paths with
        # spaces or shell metacharacters are passed through verbatim.
        # NOTE(review): InterProScan documents -o as an explicit output *file*
        # and -d as the output directory, while save_dir is treated as a
        # directory below -- confirm which flag is intended.
        cmd = [
            *shlex.split(self.bash_path),
            "-i", str(fasta_file),
            "-o", str(save_dir),
            "-f", "JSON",
            "-T", temp_dir,
        ]
        if goterms:
            cmd.append("-goterms")
        if pathways:
            cmd.append("-pa")
        cmd += ["-t", seqtype]
        print(" ".join(shlex.quote(part) for part in cmd))

        try:
            # The exit status is deliberately not enforced: success is judged
            # by whether the output directory received any entries.
            subprocess.run(cmd, check=False)
            end_time = datetime.datetime.now()
            spend_time = (end_time - start_time).total_seconds()
            if not os.listdir(save_dir):
                raise RuntimeError("InterproScan encountered an error. Please check your inputs and options.")

            shown_dir = self._relative_to_out_dir(str(save_dir))
            print(f"InterproScan successfully completed. Output saved to {shown_dir}.")
            return {"output_dir": shown_dir, "duration": spend_time}
        except Exception as e:
            # Callers receive failures as a dict rather than an exception.
            return {"error": str(e)}

    def _relative_to_out_dir(self, path):
        """Return *path* relative to ``self.out_dir`` when it lies inside it.

        Falls back to *path* unchanged when no base directory is configured
        or *path* is not located under it.
        """
        base = self.out_dir
        if not base:
            return path
        base_abs = os.path.abspath(base)
        path_abs = os.path.abspath(path)
        try:
            inside = os.path.commonpath([base_abs, path_abs]) == base_abs
        except ValueError:
            # Raised e.g. for paths on different drives; treat as "outside".
            inside = False
        return os.path.relpath(path_abs, base_abs) if inside else path

    def is_protein_sequence(self, sequences):
        """Guess whether *sequences* are protein rather than nucleotide.

        The sequences are concatenated and upper-cased; more than six
        distinct characters is taken to mean protein. The threshold leaves
        room for the nucleotide alphabets (ATCG / AUCG) plus a couple of
        extra symbols.

        Returns:
            True for protein, False otherwise (including empty input).
        """
        alphabet = set("".join(sequences).upper())
        return len(alphabet) > 6

    def read_fasta_to_list(self, file_path):
        """Read a FASTA file and return its sequences in file order.

        Header text is not kept. Lines that appear before the first ``>``
        header are discarded, and a header with no sequence lines yields an
        empty string.

        Args:
            file_path: Path of the FASTA file.

        Returns:
            A list with one concatenated sequence string per header.
        """
        sequences = []
        current_seq = []
        seen_header = False

        with open(file_path, 'r') as f:
            for raw_line in f:
                line = raw_line.strip()
                if line.startswith(">"):
                    # A new header closes the previous record, if any.
                    if seen_header:
                        sequences.append("".join(current_seq))
                    seen_header = True
                    current_seq = []
                else:
                    current_seq.append(line)

        # Flush the final record.
        if seen_header:
            sequences.append("".join(current_seq))

        return sequences


if __name__ == '__main__':
    # Manual smoke test: dump pickled records to a FASTA file and run
    # InterProScan on it with GO terms and pathway annotation enabled.
    interproscan = InterproScan("interproscan/interproscan-5.75-106.0/interproscan.sh")
    from utils.utils import get_protein_sequence_biopython, tofasta
    import pickle

    # NOTE(review): pickle executes arbitrary code on load; only use with a
    # trusted fixture file.
    with open("/zhangjiawei/interproscan/example/difference_20241122_ec_dict_list.pkl", "rb") as handle:
        records = pickle.load(handle)

    # Each record is indexed by "uniprot_id" and "sequence".
    accessions = [record["uniprot_id"] for record in records]
    residues = [record["sequence"] for record in records]

    fasta_path = "example/protein_go_clean.fasta"

    # The sequences come from the pickle; get_protein_sequence_biopython is
    # imported above but not called here.
    tofasta(fasta_path, accessions, residues)

    interproscan.run(
        fasta_file=fasta_path,
        goterms=True,
        pathways=True,
        save_dir="output/interproscan",
    )