Spaces:
Sleeping
Sleeping
| import os | |
| os.system("pip install streamlit pandas xlsxwriter openpyxl pymongo") | |
| import streamlit as st | |
| import pandas as pd | |
| import xlsxwriter | |
| from io import BytesIO | |
| from collections import defaultdict | |
| from pymongo import MongoClient | |
| import hashlib | |
# MongoDB setup
# The connection string is read from the MONGODB_URI environment variable when
# it is set, so deployments can supply credentials without editing this file.
# SECURITY: the literal fallback below keeps the previous behaviour but embeds
# a username/password in source control; move it to a secret and rotate it.
# NOTE(review): the "[email protected]" part looks like e-mail obfuscation
# residue from a web scrape rather than a real host -- confirm the real URI.
MONGODB_URI = os.environ.get(
    "MONGODB_URI",
    "mongodb+srv://dhruvmangroliya:[email protected]/BTP_DB?retryWrites=true&w=majority",
)
client = MongoClient(MONGODB_URI)
db = client['BTP_DB']
# Collection used as the per-sequence result cache.
results_collection = db['protein_results']
| # Utility | |
def is_homo_repeat(s):
    """Return True when every character of ``s`` is the same.

    The empty string and single-character strings count as homo-repeats.
    """
    # A run of one repeated character has at most one distinct character.
    return len(set(s)) <= 1
def hash_sequence(sequence):
    """Return the hexadecimal MD5 digest of ``sequence``.

    The string is encoded with ``str.encode()``'s default codec before being
    hashed. The digest is used as a cache key, not for security.
    """
    digest = hashlib.md5()
    digest.update(sequence.encode())
    return digest.hexdigest()
def fragment_protein_sequence(sequence, max_length=1000):
    """Cut ``sequence`` into consecutive, non-overlapping pieces.

    Every piece holds ``max_length`` characters except possibly the last one,
    which holds whatever remains. An empty sequence yields an empty list.
    """
    fragments = []
    for start in range(0, len(sequence), max_length):
        stop = start + max_length
        fragments.append(sequence[start:stop])
    return fragments
def find_homorepeats(protein):
    """Count maximal runs of one repeated character in ``protein``.

    Returns a ``defaultdict(int)`` mapping each run of length two or more
    (for example ``"AAA"``) to the number of times a run of exactly that
    text occurs. Single characters are not recorded.
    """
    counts = defaultdict(int)
    total = len(protein)
    start = 0
    while start < total:
        # Move ``end`` to the first position holding a different character.
        end = start + 1
        while end < total and protein[end] == protein[start]:
            end += 1
        if end - start > 1:
            counts[protein[start:end]] += 1
        # The next run begins where this one stopped.
        start = end
    return counts
def find_hetero_amino_acid_repeats(sequence):
    """Count every substring of length >= 2 that occurs more than once.

    Occurrences may overlap, so ``"AAA"`` contains ``"AA"`` twice.

    Args:
        sequence: The text to scan.

    Returns:
        A plain ``dict`` mapping each repeated substring to its number of
        occurrences, ordered by substring length and then by first
        occurrence. Substrings seen only once are omitted.
    """
    repeated = {}
    total = len(sequence)
    for length in range(2, total + 1):
        # Count one window length at a time so that only the substrings of
        # the current length are held in memory.
        window_counts = defaultdict(int)
        for start in range(total - length + 1):
            window_counts[sequence[start:start + length]] += 1
        found = {sub: n for sub, n in window_counts.items() if n > 1}
        if not found:
            # Any repeated substring of length k + 1 has a repeated prefix
            # of length k, so once a length yields nothing no longer
            # length can yield anything either.
            break
        repeated.update(found)
    return repeated
def check_boundary_repeats(fragments, final_repeats, overlap=50):
    """Add repeats found around each fragment boundary to ``final_repeats``.

    For every pair of adjacent fragments, the last ``overlap`` characters of
    the left fragment are joined to the first ``overlap`` characters of the
    right fragment and scanned with ``find_hetero_amino_acid_repeats``. A
    repeat is added when at least one of its characters occurs in the left
    part and at least one occurs in the right part.

    NOTE(review): that test is per character, not per position, so it does
    not prove that a repeat actually straddles the boundary -- confirm this
    is the intended rule.

    Args:
        fragments: Sequence of consecutive fragments of one protein.
        final_repeats: Mapping of substring -> count; updated in place. A
            plain ``dict`` is accepted as well as a ``defaultdict``.
        overlap: Number of characters taken from each side of a boundary.
            Values <= 0 mean "no boundary region" and leave the mapping
            unchanged.

    Returns:
        The same ``final_repeats`` object that was passed in.
    """
    if overlap <= 0:
        # ``fragment[-0:]`` is the whole fragment, not an empty string, so a
        # zero overlap must be handled before slicing.
        return final_repeats
    for left_fragment, right_fragment in zip(fragments, fragments[1:]):
        left_overlap = left_fragment[-overlap:]
        right_overlap = right_fragment[:overlap]
        boundary_repeats = find_hetero_amino_acid_repeats(left_overlap + right_overlap)
        # Sets make the per-substring membership checks cheap.
        left_residues = set(left_overlap)
        right_residues = set(right_overlap)
        for substring, count in boundary_repeats.items():
            touches_left = not left_residues.isdisjoint(substring)
            touches_right = not right_residues.isdisjoint(substring)
            if touches_left and touches_right:
                final_repeats[substring] = final_repeats.get(substring, 0) + count
    return final_repeats
def find_new_boundary_repeats(fragments, final_repeats, overlap=50):
    """Collect boundary repeats that are not yet keys of ``final_repeats``.

    For every pair of adjacent fragments, the last ``overlap`` characters of
    the left fragment are joined to the first ``overlap`` characters of the
    right fragment and scanned with ``find_hetero_amino_acid_repeats``. A
    repeat qualifies when at least one of its characters occurs in the left
    part and at least one occurs in the right part, and it is not already
    present in ``final_repeats``.

    Args:
        fragments: Sequence of consecutive fragments of one protein.
        final_repeats: Mapping (or other container) of known repeats; only
            read, never modified.
        overlap: Number of characters taken from each side of a boundary.
            Values <= 0 mean "no boundary region" and yield no repeats.

    Returns:
        A ``defaultdict(int)`` mapping each new repeat to its summed count.
    """
    new_repeats = defaultdict(int)
    if overlap <= 0:
        # ``fragment[-0:]`` is the whole fragment, not an empty string, so a
        # zero overlap must be handled before slicing.
        return new_repeats
    for left_fragment, right_fragment in zip(fragments, fragments[1:]):
        left_overlap = left_fragment[-overlap:]
        right_overlap = right_fragment[:overlap]
        boundary_repeats = find_hetero_amino_acid_repeats(left_overlap + right_overlap)
        left_residues = set(left_overlap)
        right_residues = set(right_overlap)
        for substring, count in boundary_repeats.items():
            if substring in final_repeats:
                continue
            if left_residues.isdisjoint(substring) or right_residues.isdisjoint(substring):
                continue
            new_repeats[substring] += count
    return new_repeats
def _collect_hetero_repeats(fragments, overlap):
    """Count hetero repeats per fragment, add boundary repeats, drop homo runs."""
    totals = defaultdict(int)
    for fragment in fragments:
        for substring, count in find_hetero_amino_acid_repeats(fragment).items():
            totals[substring] += count
    totals = check_boundary_repeats(fragments, totals, overlap)
    for substring, count in find_new_boundary_repeats(fragments, totals, overlap).items():
        totals[substring] += count
    return {k: v for k, v in totals.items() if not is_homo_repeat(k)}


def get_or_process_sequence(sequence, analysis_type, overlap=50):
    """Return the repeat counts for ``sequence``, using MongoDB as a cache.

    Args:
        sequence: Protein sequence text.
        analysis_type: ``"Homo"``, ``"Hetero"`` or ``"Both"``.
        overlap: Boundary width forwarded to the boundary-repeat helpers
            (only relevant for ``"Hetero"`` and ``"Both"``).

    Returns:
        A plain ``dict`` mapping each repeat to its count, either read from
        ``results_collection`` or freshly computed and then stored there.

    Raises:
        ValueError: If ``analysis_type`` is not one of the three known
            values (previously an empty result was computed and cached).
    """
    if analysis_type not in ("Homo", "Hetero", "Both"):
        raise ValueError(f"Unknown analysis type: {analysis_type!r}")

    sequence_hash = hash_sequence(sequence)
    # ``_id`` is unique per collection, so it has to identify the whole
    # request. Keyed by the sequence hash alone, a second analysis type for
    # the same sequence could never be stored (duplicate ``_id``).
    cache_id = f"{sequence_hash}:{analysis_type}:{overlap}"
    cached = results_collection.find_one({"_id": cache_id})
    if cached is None and overlap == 50:
        # Entries written under the old scheme used the bare hash as ``_id``.
        # NOTE(review): assumes those were computed with the default overlap,
        # the only value the visible caller passes -- confirm.
        cached = results_collection.find_one(
            {"_id": sequence_hash, "analysis_type": analysis_type}
        )
    if cached:
        return cached["repeats"]

    if analysis_type == "Homo":
        final_repeats = dict(find_homorepeats(sequence))
    else:
        fragments = fragment_protein_sequence(sequence)
        hetero_repeats = _collect_hetero_repeats(fragments, overlap)
        if analysis_type == "Both":
            # Homo runs first, then hetero repeats, as before. The two key
            # sets are disjoint because homo runs were filtered out above.
            final_repeats = dict(find_homorepeats(sequence))
            for substring, count in hetero_repeats.items():
                final_repeats[substring] = final_repeats.get(substring, 0) + count
        else:
            final_repeats = hetero_repeats

    # Upsert rather than insert so that a concurrent writer or a re-run
    # cannot fail on an already existing ``_id``.
    results_collection.replace_one(
        {"_id": cache_id},
        {
            "_id": cache_id,
            "sequence_hash": sequence_hash,
            "analysis_type": analysis_type,
            "overlap": overlap,
            "repeats": final_repeats,
        },
        upsert=True,
    )
    return final_repeats
def process_excel(excel_data, analysis_type):
    """Analyse every sequence in every sheet of an uploaded workbook.

    The first three columns of each sheet are read by position as entry ID,
    protein name and sequence. Double quotes and spaces are removed from the
    sequence; rows whose sequence cell is blank or empty are skipped.

    Args:
        excel_data: An object exposing ``sheet_names`` and ``parse(name)``
            (the caller passes a ``pandas.ExcelFile``).
        analysis_type: Forwarded to ``get_or_process_sequence``.

    Returns:
        ``(repeats, sequence_data)`` where ``repeats`` is the set of all
        repeat strings found and ``sequence_data`` is a list of
        ``(entry_id, protein_name, freq)`` tuples. ``(None, None)`` when a
        sheet has fewer than three columns (an error is shown in the UI).
    """
    repeats = set()
    sequence_data = []
    count = 0
    for sheet_name in excel_data.sheet_names:
        df = excel_data.parse(sheet_name)
        if len(df.columns) < 3:
            st.error(f"Error: The sheet '{sheet_name}' must have at least three columns: ID, Protein Name, Sequence")
            return None, None
        # Plain tuples give true positional access; ``row[0]`` on a Series
        # with labelled columns is a deprecated label-or-position lookup.
        for row in df.itertuples(index=False, name=None):
            raw_sequence = row[2]
            if pd.isna(raw_sequence):
                # A blank cell would otherwise become the text "nan" and be
                # analysed as if it were a sequence.
                continue
            sequence = str(raw_sequence).replace('"', '').replace(' ', '').strip()
            if not sequence:  # Skip empty sequence
                continue
            entry_id = str(row[0])
            protein_name = str(row[1])
            count += 1
            freq = get_or_process_sequence(sequence, analysis_type)
            sequence_data.append((entry_id, protein_name, freq))
            repeats.update(freq)
    # One summary notification per workbook.
    st.toast(f"{count} sequences processed.")
    return repeats, sequence_data
def _safe_sheet_name(filename, taken):
    """Derive a valid worksheet name from ``filename`` that is not in ``taken``."""
    # Excel rejects these characters in sheet names, limits names to 31
    # characters, forbids a leading/trailing apostrophe and compares names
    # case-insensitively.
    cleaned = "".join("_" if ch in "[]:*?/\\" else ch for ch in filename)
    base = cleaned[:31].strip("'") or "Sheet"
    candidate = base
    suffix = 1
    while candidate.lower() in taken:
        suffix += 1
        tag = f"_{suffix}"
        candidate = base[:31 - len(tag)] + tag
    taken.add(candidate.lower())
    return candidate


def create_excel(sequences_data, repeats, filenames):
    """Build an in-memory .xlsx workbook with one worksheet per input file.

    Each worksheet has the header ``Entry``, ``Protein Name`` followed by one
    column per repeat in sorted order, and one row per analysed sequence
    holding that sequence's count for every repeat (0 when absent).

    Args:
        sequences_data: One list of ``(entry_id, protein_name, freq)``
            tuples per file, where ``freq`` maps repeat -> count.
        repeats: Iterable of all repeat strings to use as columns.
        filenames: File names parallel to ``sequences_data``; each becomes a
            worksheet name after being cut to 31 characters, stripped of
            characters Excel forbids and made unique.

    Returns:
        A ``BytesIO`` positioned at the start of the finished workbook.
    """
    output = BytesIO()
    workbook = xlsxwriter.Workbook(output, {'in_memory': True})
    # The column order is the same for every row, so sort once.
    ordered_repeats = sorted(repeats)
    taken_names = set()
    for filename, file_data in zip(filenames, sequences_data):
        worksheet = workbook.add_worksheet(_safe_sheet_name(filename, taken_names))
        worksheet.write(0, 0, "Entry")
        worksheet.write(0, 1, "Protein Name")
        for col, repeat in enumerate(ordered_repeats, start=2):
            worksheet.write(0, col, repeat)
        for row, (entry_id, protein_name, freq) in enumerate(file_data, start=1):
            worksheet.write(row, 0, entry_id)
            worksheet.write(row, 1, protein_name)
            for col, repeat in enumerate(ordered_repeats, start=2):
                worksheet.write(row, col, freq.get(repeat, 0))
    workbook.close()
    output.seek(0)
    return output
# Streamlit UI
# Flow: choose an analysis type, upload one or more workbooks, analyse each
# one, then offer the combined result as a download and an optional table.
st.title("Protein Repeat Analysis with Caching")
analysis_type = st.radio("Select analysis type:", ["Homo", "Hetero", "Both"], index=2)
uploaded_files = st.file_uploader("Upload Excel files", accept_multiple_files=True, type=["xlsx"])
if uploaded_files:
    all_repeats = set()
    all_sequences_data = []
    filenames = []
    for file in uploaded_files:
        excel_data = pd.ExcelFile(file)
        repeats, sequence_data = process_excel(excel_data, analysis_type)
        # process_excel returns (None, None) for a workbook it rejected.
        if repeats is not None:
            all_repeats.update(repeats)
            all_sequences_data.append(sequence_data)
            filenames.append(file.name)
    if all_sequences_data:
        # Report the files that were actually processed, not merely uploaded.
        st.toast(f"Processed {len(all_sequences_data)} file(s) successfully.")
        excel_file = create_excel(all_sequences_data, all_repeats, filenames)
        st.download_button(
            label="Download Excel file",
            data=excel_file,
            file_name="protein_repeat_results.xlsx",
            mime="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
        )
        if st.checkbox("Show Results Table"):
            # The column order is the same for every row, so sort once.
            ordered_repeats = sorted(all_repeats)
            rows = []
            for filename, file_data in zip(filenames, all_sequences_data):
                for entry_id, protein_name, freq in file_data:
                    row = {"Filename": filename, "Entry": entry_id, "Protein Name": protein_name}
                    row.update({repeat: freq.get(repeat, 0) for repeat in ordered_repeats})
                    rows.append(row)
            result_df = pd.DataFrame(rows)
            st.dataframe(result_df)