import argparse import requests import os from tqdm import tqdm from concurrent.futures import ThreadPoolExecutor, as_completed def download_fasta(uniprot_id, outdir, merge_output=False): url = f"https://www.uniprot.org/uniprot/{uniprot_id}.fasta" response = requests.get(url) if not merge_output: out_path = os.path.join(outdir, f"{uniprot_id}.fasta") if os.path.exists(out_path): return uniprot_id, f"{uniprot_id}.fasta already exists, skipping", None if response.status_code != 200: return uniprot_id, f"{uniprot_id}.fasta failed, {response.status_code}", None if merge_output: return uniprot_id, f"{uniprot_id}.fasta successfully downloaded", response.text else: output_file = os.path.join(outdir, f"{uniprot_id}.fasta") with open(output_file, 'w') as file: file.write(response.text) return uniprot_id, f"{uniprot_id}.fasta successfully downloaded", None if __name__ == '__main__': parser = argparse.ArgumentParser(description='Download FASTA files from UniProt.') parser.add_argument('-i', '--uniprot_id', help='Single UniProt ID to download') parser.add_argument('-f', '--file', help='Input file containing UniProt IDs') parser.add_argument('-o', '--out_dir', help='Directory to save FASTA files') parser.add_argument('-n', '--num_workers', type=int, default=12, help='Number of workers to use for downloading') parser.add_argument('-m', '--merge', action='store_true', help='Merge all sequences into a single FASTA file') parser.add_argument('-e', '--error_file', help='File to save failed downloads. If not provided, errors will be printed to console') args = parser.parse_args() if not args.uniprot_id and not args.file: print("Error: Must provide either uniprot_id or file") exit(1) os.makedirs(args.out_dir, exist_ok=True) error_proteins = [] error_messages = [] all_sequences = [] if args.uniprot_id: uid, message, sequence = download_fasta(args.uniprot_id, args.out_dir, args.merge) print(message) if "failed" in message: error_proteins.append(uid) error_messages.append(message) elif args.merge and sequence: all_sequences.append(sequence) elif args.file: uids = open(args.file, 'r').read().splitlines() with ThreadPoolExecutor(max_workers=args.num_workers) as executor: future_to_fasta = {executor.submit(download_fasta, uid, args.out_dir, args.merge): uid for uid in uids} with tqdm(total=len(uids), desc="Downloading Files") as bar: for future in as_completed(future_to_fasta): uid, message, sequence = future.result() bar.set_description(message) if "failed" in message: error_proteins.append(uid) error_messages.append(message) elif args.merge and sequence: all_sequences.append(sequence) bar.update(1) if args.merge and all_sequences: merged_file = os.path.join(args.out_dir, "merged.fasta") with open(merged_file, 'w') as f: f.write(''.join(all_sequences)) if error_proteins and args.error_file: with open(args.error_file, 'w') as f: for protein, message in zip(error_proteins, error_messages): f.write(f"{protein} - {message}\n") elif error_proteins: print("Failed downloads:") for protein, message in zip(error_proteins, error_messages): print(f"{protein} - {message}")