"""Streamlit app: protein repeat finder and repeat-frequency comparator.

The script offers two tools, selected with a radio button:

* Protein Repeat Finder - reads Excel sheets whose first three columns are
  ID, name and sequence, counts homo/hetero amino-acid repeats per sequence
  and offers the result as an Excel download.
* Protein Comparator - compares the repeat-frequency columns of two Excel
  files and reports the cells that differ.
"""
import hashlib
import importlib.util
import os
import subprocess
import sys
from collections import defaultdict
from io import BytesIO

# Import name -> pip distribution name for the packages this script needs.
_REQUIRED_PACKAGES = {
    "streamlit": "streamlit",
    "pandas": "pandas",
    "xlsxwriter": "xlsxwriter",
    "openpyxl": "openpyxl",
    "pymongo": "pymongo",
}


def _install_missing_packages():
    """Install only the required packages that cannot be imported yet.

    Streamlit re-executes the whole script on every widget interaction, so an
    unconditional ``pip install`` would run on each rerun.
    """
    missing = [
        pip_name
        for module_name, pip_name in _REQUIRED_PACKAGES.items()
        if importlib.util.find_spec(module_name) is None
    ]
    if missing:
        # List form (no shell) and best-effort: a failed install surfaces as
        # an ImportError on the imports below.
        subprocess.run(
            [sys.executable, "-m", "pip", "install", *missing], check=False
        )
        importlib.invalidate_caches()


_install_missing_packages()

import streamlit as st
import pandas as pd
import xlsxwriter

# SECURITY(review): this connection string embeds a username and password in
# source control. It is kept only as a fallback so existing deployments keep
# working; rotate the credential and supply it through the MONGODB_URI
# environment variable instead.
_DEFAULT_MONGODB_URI = "mongodb+srv://dhruvmangroliya:Eussmh5MbCBIkLJ6@cluster0.rrnbxfw.mongodb.net/BTP_DB?retryWrites=true&w=majority"

# Optional result cache for the repeat finder. Any failure here (pymongo not
# installed, bad URI, DNS failure, ...) simply disables caching.
try:
    from pymongo import MongoClient
    from pymongo.errors import PyMongoError

    client = MongoClient(os.environ.get("MONGODB_URI", _DEFAULT_MONGODB_URI))
    db = client['BTP_DB']
    results_collection = db['protein_results']
except Exception:
    PyMongoError = Exception
    results_collection = None


# ------------------- REPEATS FUNCTIONALITY -------------------

def is_homo_repeat(s):
    """Return True if every character of ``s`` equals its first character.

    An empty string yields True (vacuous truth of ``all``).
    """
    return all(c == s[0] for c in s)


def hash_sequence(sequence):
    """Return the hex MD5 digest of ``sequence``.

    The digest is used as the ``_id`` of cached results, not for security.
    """
    return hashlib.md5(sequence.encode()).hexdigest()


@st.cache_data(show_spinner=False)
def fragment_protein_sequence(sequence, max_length=1000):
    """Split ``sequence`` into consecutive chunks of at most ``max_length``."""
    return [
        sequence[i:i + max_length]
        for i in range(0, len(sequence), max_length)
    ]


def find_homorepeats(protein):
    """Count maximal runs (length >= 2) of a single repeated character.

    Returns a ``defaultdict(int)`` mapping each run string (e.g. ``"AAA"``)
    to the number of times a maximal run with exactly that text occurs.
    """
    freq = defaultdict(int)
    n = len(protein)
    start = 0
    while start < n:
        end = start + 1
        while end < n and protein[end] == protein[start]:
            end += 1
        if end - start > 1:
            freq[protein[start:end]] += 1
        start = end
    return freq


def find_hetero_amino_acid_repeats(sequence):
    """Return every substring of length >= 2 that occurs more than once.

    Occurrences may overlap. The result maps substring -> occurrence count.
    All substrings are enumerated, so the work grows quickly with
    ``len(sequence)``; callers pass bounded fragments.
    """
    repeat_counts = defaultdict(int)
    for length in range(2, len(sequence) + 1):
        for i in range(len(sequence) - length + 1):
            repeat_counts[sequence[i:i + length]] += 1
    return {k: v for k, v in repeat_counts.items() if v > 1}


def _iter_boundary_repeats(fragments, overlap):
    """Yield ``(substring, count)`` for repeats found around fragment joins.

    For each pair of adjacent fragments, the last ``overlap`` characters of
    the left one and the first ``overlap`` characters of the right one are
    concatenated and searched with ``find_hetero_amino_acid_repeats``.

    NOTE(review): a repeat is kept when at least one of its residues occurs
    somewhere in the left window and at least one occurs somewhere in the
    right window. This is a residue-membership test, not a check that the
    occurrence actually straddles the join; it is preserved as-is because
    changing it would alter reported (and cached) frequencies.
    """
    overlap = max(overlap, 0)
    for left_fragment, right_fragment in zip(fragments, fragments[1:]):
        # Slice from an explicit start: ``s[-0:]`` would be the whole string.
        left_overlap = left_fragment[max(len(left_fragment) - overlap, 0):]
        right_overlap = right_fragment[:overlap]
        left_residues = set(left_overlap)
        right_residues = set(right_overlap)
        boundary_repeats = find_hetero_amino_acid_repeats(
            left_overlap + right_overlap
        )
        for substring, count in boundary_repeats.items():
            if (not left_residues.isdisjoint(substring)
                    and not right_residues.isdisjoint(substring)):
                yield substring, count


def check_boundary_repeats(fragments, final_repeats, overlap=50):
    """Add boundary-window repeat counts to ``final_repeats`` in place.

    Returns the same mapping that was passed in. Works with plain dicts as
    well as ``defaultdict``.
    """
    for substring, count in _iter_boundary_repeats(fragments, overlap):
        final_repeats[substring] = final_repeats.get(substring, 0) + count
    return final_repeats


def find_new_boundary_repeats(fragments, final_repeats, overlap=50):
    """Return boundary-window repeats that are not keys of ``final_repeats``.

    ``final_repeats`` is only read. The result is a ``defaultdict(int)``.
    """
    new_repeats = defaultdict(int)
    for substring, count in _iter_boundary_repeats(fragments, overlap):
        if substring not in final_repeats:
            new_repeats[substring] += count
    return new_repeats


def _find_fragmented_hetero_repeats(fragments, overlap):
    """Run the hetero-repeat pipeline over ``fragments``.

    Per-fragment repeats are summed, boundary-window repeats are added, and
    substrings made of a single repeated residue are dropped.
    """
    hetero_repeats = defaultdict(int)
    for fragment in fragments:
        for k, v in find_hetero_amino_acid_repeats(fragment).items():
            hetero_repeats[k] += v
    hetero_repeats = check_boundary_repeats(fragments, hetero_repeats, overlap)
    # NOTE(review): check_boundary_repeats has just inserted every qualifying
    # boundary repeat, so this second pass finds nothing new. Kept to leave
    # the original pipeline intact.
    new_repeats = find_new_boundary_repeats(fragments, hetero_repeats, overlap)
    for k, v in new_repeats.items():
        hetero_repeats[k] += v
    return {k: v for k, v in hetero_repeats.items() if not is_homo_repeat(k)}


def _compute_repeats(sequence, analysis_type, overlap):
    """Compute the repeat table for ``sequence`` without touching the cache.

    ``analysis_type`` is ``"Homo"``, ``"Hetero"`` or ``"Both"``; any other
    value yields an empty table.
    """
    if analysis_type == "Homo":
        return dict(find_homorepeats(sequence))
    if analysis_type == "Hetero":
        return _find_fragmented_hetero_repeats(
            fragment_protein_sequence(sequence), overlap
        )
    if analysis_type == "Both":
        hetero_repeats = _find_fragmented_hetero_repeats(
            fragment_protein_sequence(sequence), overlap
        )
        combined = defaultdict(int, find_homorepeats(sequence))
        for k, v in hetero_repeats.items():
            combined[k] += v
        return dict(combined)
    return {}


def get_or_process_sequence(sequence, analysis_type, overlap=50):
    """Return the repeat table for ``sequence``, using the cache if possible.

    The cache key is the MD5 of ``"{sequence}_{analysis_type}"`` (``overlap``
    is not part of the key). When the MongoDB collection is unavailable the
    table is still computed; caching is strictly best-effort.

    Returns a dict mapping repeat string -> count.
    """
    global results_collection

    sequence_hash = hash_sequence(f"{sequence}_{analysis_type}")

    if results_collection is not None:
        try:
            cached = results_collection.find_one({"_id": sequence_hash})
        except PyMongoError:
            # The server is unreachable or misbehaving: stop using the cache
            # for the rest of this run rather than failing on every sequence.
            results_collection = None
            cached = None
        if cached:
            return cached["repeats"]

    final_repeats = _compute_repeats(sequence, analysis_type, overlap)

    if results_collection is not None:
        try:
            results_collection.insert_one({
                "_id": sequence_hash,
                "sequence": sequence,
                "analysis_type": analysis_type,
                "repeats": dict(final_repeats),
            })
        except PyMongoError:
            # Failing to store a result must not discard the computed table.
            pass

    return final_repeats


def process_excel(excel_data, analysis_type):
    """Analyse every sequence in every sheet of ``excel_data``.

    Each sheet needs at least three columns, read positionally as ID, name
    and sequence. Rows whose sequence cell is empty are skipped.

    Returns ``(repeats, sequence_data)`` where ``repeats`` is the set of all
    repeat strings seen and ``sequence_data`` is a list of
    ``(entry_id, protein_name, freq)`` tuples, or ``(None, None)`` if a sheet
    has fewer than three columns (an error is shown in that case).
    """
    repeats = set()
    sequence_data = []
    count = 0
    for sheet_name in excel_data.sheet_names:
        df = excel_data.parse(sheet_name)
        if len(df.columns) < 3:
            st.error(f"Error: Sheet '{sheet_name}' must have at least 3 columns: ID, Name, Sequence.")
            return None, None
        for _, row in df.iterrows():
            raw_sequence = row.iloc[2]
            # An empty cell is NaN; str(NaN) would be analysed as "nan".
            if pd.isna(raw_sequence):
                continue
            sequence = str(raw_sequence).replace('"', '').replace(' ', '').strip()
            if not sequence:
                continue
            entry_id = str(row.iloc[0])
            protein_name = str(row.iloc[1])
            count += 1
            freq = get_or_process_sequence(sequence, analysis_type)
            sequence_data.append((entry_id, protein_name, freq))
            repeats.update(freq.keys())
    st.toast(f"{count} sequences processed.")
    return repeats, sequence_data


# Characters Excel does not allow in worksheet names, mapped to "_".
_INVALID_SHEET_CHARS = str.maketrans({ch: "_" for ch in "[]:*?/\\"})
_MAX_SHEET_NAME_LENGTH = 31


def _unique_sheet_name(filename, used_names):
    """Derive a valid worksheet name from ``filename`` not yet in ``used_names``.

    ``used_names`` holds lower-cased names already taken (worksheet names are
    compared case-insensitively) and is updated in place.
    """
    base = filename.translate(_INVALID_SHEET_CHARS)
    base = base[:_MAX_SHEET_NAME_LENGTH].strip("'") or "Sheet"
    candidate = base
    suffix = 2
    while candidate.lower() in used_names:
        tag = f"_{suffix}"
        candidate = base[:_MAX_SHEET_NAME_LENGTH - len(tag)] + tag
        suffix += 1
    used_names.add(candidate.lower())
    return candidate


def create_excel(sequences_data, repeats, filenames):
    """Build an in-memory workbook with one worksheet per processed file.

    Each sheet has the columns ``Entry``, ``Protein Name`` and one column per
    repeat (sorted), holding that repeat's count for the row's protein.

    Returns a ``BytesIO`` positioned at the start of the workbook bytes.
    """
    output = BytesIO()
    workbook = xlsxwriter.Workbook(output, {'in_memory': True})
    sorted_repeats = sorted(repeats)
    used_sheet_names = set()

    for file_data, filename in zip(sequences_data, filenames):
        worksheet = workbook.add_worksheet(
            _unique_sheet_name(filename, used_sheet_names)
        )
        worksheet.write(0, 0, "Entry")
        worksheet.write(0, 1, "Protein Name")
        for col, repeat in enumerate(sorted_repeats, start=2):
            worksheet.write(0, col, repeat)

        for row, (entry_id, protein_name, freq) in enumerate(file_data, start=1):
            worksheet.write(row, 0, entry_id)
            worksheet.write(row, 1, protein_name)
            for col, repeat in enumerate(sorted_repeats, start=2):
                worksheet.write(row, col, freq.get(repeat, 0))

    workbook.close()
    output.seek(0)
    return output


# ------------------- COMPARATOR FUNCTIONALITY -------------------

def compare_repeat_frequencies(df1, df2):
    """Find repeat-frequency cells that differ between two tables.

    The first two columns of ``df1`` identify a protein (ID and name); rows
    are matched to the first row of ``df2`` with the same values in the
    columns of the same names. Every remaining ``df1`` column that also
    exists in ``df2`` is compared; cells that are NaN on either side are
    ignored.

    Returns ``(differences, skipped_columns)``: a list of dicts (one per
    differing cell) and the list of ``df1`` repeat columns absent from
    ``df2``.

    Raises:
        ValueError: if ``df2`` lacks the ID or name column of ``df1``.
    """
    id_col = df1.columns[0]
    name_col = df1.columns[1]
    missing_keys = [c for c in (id_col, name_col) if c not in df2.columns]
    if missing_keys:
        raise ValueError(
            f"Second file is missing column(s): {', '.join(missing_keys)}"
        )

    repeat_columns = [c for c in df1.columns[2:] if c in df2.columns]
    skipped_columns = [c for c in df1.columns[2:] if c not in df2.columns]

    # (id, name) -> position of the first matching row in df2, so each df1
    # row is matched with one lookup instead of a scan over df2.
    first_position = {}
    for position, key in enumerate(zip(df2[id_col], df2[name_col])):
        # NaN never compares equal, so such rows can never be matched.
        if pd.isna(key[0]) or pd.isna(key[1]):
            continue
        first_position.setdefault(key, position)

    differences = []
    for _, row1 in df1.iterrows():
        entry_id = row1[id_col]
        protein_name = row1[name_col]
        if pd.isna(entry_id) or pd.isna(protein_name):
            continue
        position = first_position.get((entry_id, protein_name))
        if position is None:
            continue
        row2 = df2.iloc[position]

        for repeat_col in repeat_columns:
            freq1 = row1[repeat_col]
            freq2 = row2[repeat_col]
            if pd.isna(freq1) or pd.isna(freq2):
                continue
            if freq1 == freq2:
                continue

            if freq1 == 0:
                # Relative change from zero is undefined; reported as text.
                pct_change = "Infinity"
            else:
                pct_change = round(((freq2 - freq1) / freq1) * 100, 2)

            differences.append({
                id_col: entry_id,
                name_col: protein_name,
                "Repeat": repeat_col,
                "Frequency File 1": freq1,
                "Frequency File 2": freq2,
                "Difference": abs(freq1 - freq2),
                "%age Change": pct_change,
            })
    return differences, skipped_columns


def color_pct(val):
    """Return the CSS colour for a percentage-change cell.

    Green for ``"Infinity"`` and positive numbers, red for negative numbers,
    no styling otherwise.
    """
    if isinstance(val, str) and val == "Infinity":
        return 'color: green'
    if isinstance(val, (int, float)):
        if val > 0:
            return 'color: green'
        if val < 0:
            return 'color: red'
    return ''


# ------------------- PAGE -------------------

st.set_page_config(page_title="Protein Tool", layout="wide")
st.title("🧬 Protein Analysis Toolkit")

app_choice = st.radio("Choose an option", ["🔁 Protein Repeat Finder", "📊 Protein Comparator"])

if app_choice == "🔁 Protein Repeat Finder":
    analysis_type = st.radio("Select analysis type:", ["Homo", "Hetero", "Both"], index=2)
    uploaded_files = st.file_uploader("Upload Excel files", accept_multiple_files=True, type=["xlsx"])

    if 'all_sequences_data' not in st.session_state:
        st.session_state.all_sequences_data = []
        st.session_state.all_repeats = set()
        st.session_state.filenames = []
        st.session_state.excel_file = None

    if uploaded_files and st.button("Process Files"):
        st.session_state.all_repeats = set()
        st.session_state.all_sequences_data = []
        st.session_state.filenames = []
        # Drop the previous workbook so a failed run cannot offer stale data.
        st.session_state.excel_file = None

        for file in uploaded_files:
            excel_data = pd.ExcelFile(file)
            repeats, sequence_data = process_excel(excel_data, analysis_type)
            if repeats is not None:
                st.session_state.all_repeats.update(repeats)
                st.session_state.all_sequences_data.append(sequence_data)
                st.session_state.filenames.append(file.name)

        if st.session_state.all_sequences_data:
            # Count only the files that were actually processed.
            st.toast(f"Processed {len(st.session_state.filenames)} file(s) successfully.")
            st.session_state.excel_file = create_excel(
                st.session_state.all_sequences_data,
                st.session_state.all_repeats,
                st.session_state.filenames
            )

    if st.session_state.excel_file:
        st.download_button(
            label="Download Excel file",
            data=st.session_state.excel_file,
            file_name="protein_repeat_results.xlsx",
            mime="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
        )

        if st.checkbox("Show Results Table"):
            sorted_repeats = sorted(st.session_state.all_repeats)
            rows = []
            for filename, file_data in zip(
                st.session_state.filenames,
                st.session_state.all_sequences_data,
            ):
                for entry_id, protein_name, freq in file_data:
                    row = {"Filename": filename, "Entry": entry_id, "Protein Name": protein_name}
                    row.update({repeat: freq.get(repeat, 0) for repeat in sorted_repeats})
                    rows.append(row)

            result_df = pd.DataFrame(rows)
            st.dataframe(result_df)

elif app_choice == "📊 Protein Comparator":
    # st.set_page_config is not repeated here: it has already been called at
    # the top of the script, and older Streamlit versions raise on a second
    # call, which made this whole branch unusable.
    st.title("🧬 Protein Repeat Comparator")
    st.write("Upload two Excel files with protein data. Frequency values should start from the first row (header).")

    uploaded_file1 = st.file_uploader("Upload First Excel File", type=["xlsx"], key="comp1")
    uploaded_file2 = st.file_uploader("Upload Second Excel File", type=["xlsx"], key="comp2")

    if uploaded_file1 and uploaded_file2:
        try:
            df1 = pd.read_excel(uploaded_file1, header=0)
            df2 = pd.read_excel(uploaded_file2, header=0)

            df1.columns = df1.columns.astype(str)
            df2.columns = df2.columns.astype(str)

            differences, skipped_columns = compare_repeat_frequencies(df1, df2)

            if skipped_columns:
                st.warning(
                    f"{len(skipped_columns)} repeat column(s) from the first "
                    "file are not present in the second file and were not "
                    "compared."
                )

            if differences:
                result_df = pd.DataFrame(differences)
                result_df = result_df.sort_values(by="Difference", ascending=False)

                # Show DataFrame in Streamlit app
                st.subheader("🔍 View Changed Repeats")
                st.dataframe(result_df, use_container_width=True)

                # Apply styling. Styler.applymap is deprecated in favour of
                # Styler.map; use whichever this pandas version provides.
                styler = result_df.style
                apply_cellwise = styler.map if hasattr(styler, "map") else styler.applymap
                styled_df = apply_cellwise(color_pct, subset=["%age Change"])

                # Save styled output
                output = BytesIO()
                with pd.ExcelWriter(output, engine='openpyxl') as writer:
                    styled_df.to_excel(writer, index=False, sheet_name="Changed Repeats")
                output.seek(0)

                st.download_button(
                    label="📥 Download Excel File",
                    data=output,
                    file_name="changed_repeats_with_percentage.xlsx",
                    mime="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
                )
            else:
                st.info("No changes in repeat frequencies were found.")

        except Exception as e:
            # Top-level UI boundary: report the problem instead of crashing.
            st.error(f"⚠ Error: {e}")