Spaces:
Sleeping
Sleeping
| import os | |
| os.system("pip install streamlit pandas xlsxwriter openpyxl") | |
| import streamlit as st | |
| import pandas as pd | |
| import xlsxwriter | |
| from io import BytesIO | |
| from collections import defaultdict | |
| import hashlib | |
| import sqlite3 | |
| import base64 | |
| # Initialize DB | |
def init_db():
    """Create the SQLite table that caches analysis results, if missing.

    The database lives in ``file_cache.db`` in the current working directory.
    Each row stores the base64 text of a result under the pair
    ``(file_hash, analysis_type)``.

    The primary key covers both columns because lookups filter on both: with
    ``file_hash`` alone as the key, an ``INSERT OR REPLACE`` for one analysis
    type would evict the cached result of another analysis type for the same
    file.

    NOTE: ``CREATE TABLE IF NOT EXISTS`` does not alter a table that already
    exists, so a database created with the older single-column key keeps its
    old behaviour until the file is deleted.
    """
    conn = sqlite3.connect("file_cache.db")
    try:
        conn.execute(
            '''
            CREATE TABLE IF NOT EXISTS file_cache (
                file_hash TEXT,
                file_name TEXT,
                analysis_type TEXT,
                result BLOB,
                PRIMARY KEY (file_hash, analysis_type)
            )
            '''
        )
        conn.commit()
    finally:
        # Release the connection even if table creation fails.
        conn.close()


# Make sure the cache table exists before any lookup or insert runs.
init_db()
| # Hashing function | |
def get_file_hash(file):
    """Return the SHA-256 hex digest of everything ``file.read()`` yields.

    The stream is read from its current position and is left wherever
    ``read()`` leaves it; the caller must rewind before reading again.
    """
    hasher = hashlib.sha256()
    hasher.update(file.read())
    return hasher.hexdigest()
| # Check if file hash exists in DB | |
def check_cache(file_hash, analysis_type):
    """Look up a cached result for a file hash and analysis type.

    Args:
        file_hash: Hex digest identifying the uploaded file.
        analysis_type: Analysis mode the result was computed for.

    Returns:
        A ``BytesIO`` holding the base64-decoded ``result`` column of the
        matching row, or ``None`` when no row matches.
    """
    conn = sqlite3.connect("file_cache.db")
    try:
        row = conn.execute(
            "SELECT result FROM file_cache WHERE file_hash = ? AND analysis_type = ?",
            (file_hash, analysis_type),
        ).fetchone()
    finally:
        # Close the connection even when the query raises.
        conn.close()
    if row is None:
        return None
    return BytesIO(base64.b64decode(row[0]))
| # Store result in DB | |
def cache_result(file_hash, file_name, analysis_type, result_bytes):
    """Store a result in the SQLite cache as base64 text.

    Args:
        file_hash: Hex digest identifying the uploaded file.
        file_name: Original name of the uploaded file (stored for reference).
        analysis_type: Analysis mode the result belongs to.
        result_bytes: Binary stream holding the result. A ``BytesIO`` is
            stored in full via ``getvalue()`` without moving its position;
            any other stream is read from its current position.

    An existing row that conflicts on the table's primary key is replaced.
    """
    # getvalue() returns the whole buffer regardless of the stream position
    # and does not consume it, so the caller can keep using the same object.
    if hasattr(result_bytes, "getvalue"):
        payload = result_bytes.getvalue()
    else:
        payload = result_bytes.read()
    encoded = base64.b64encode(payload).decode('utf-8')

    conn = sqlite3.connect("file_cache.db")
    try:
        conn.execute(
            "INSERT OR REPLACE INTO file_cache (file_hash, file_name, analysis_type, result) VALUES (?, ?, ?, ?)",
            (file_hash, file_name, analysis_type, encoded),
        )
        conn.commit()
    finally:
        # Close the connection even when the insert raises.
        conn.close()
| # === Protein Analysis Logic === | |
def is_homo_repeat(s):
    """Return True when every character of ``s`` is the same.

    An empty string also yields True, since it contains no differing
    character.
    """
    return len(set(s)) <= 1
def find_homorepeats(protein):
    """Count maximal runs of one repeated character in ``protein``.

    Each run of two or more identical consecutive characters is recorded
    under the run's full text (e.g. ``"AAA"``); the value is how many times a
    run with exactly that text occurs.

    Returns:
        A ``defaultdict(int)`` mapping run text to occurrence count.
    """
    run_counts = defaultdict(int)
    total = len(protein)
    run_start = 0
    while run_start < total:
        residue = protein[run_start]
        # Advance to the first position that no longer matches the residue.
        run_end = run_start + 1
        while run_end < total and protein[run_end] == residue:
            run_end += 1
        if run_end - run_start > 1:
            run_counts[protein[run_start:run_end]] += 1
        run_start = run_end
    return run_counts
def find_hetero_amino_acid_repeats(sequence):
    """Return every substring of length >= 2 that occurs more than once.

    Occurrences may overlap (``"AAAA"`` contains ``"AA"`` three times).
    Single-residue runs such as ``"AA"`` are included here; no filtering of
    homo-repeats happens in this function.

    Args:
        sequence: The text to scan.

    Returns:
        A dict mapping each repeated substring to its occurrence count,
        ordered by substring length and then by first occurrence.
    """
    repeated = {}
    seq_len = len(sequence)
    for length in range(2, seq_len + 1):
        # Count one length at a time so only that length's substrings are
        # held in memory, instead of every substring of the whole sequence.
        counts = defaultdict(int)
        for start in range(seq_len - length + 1):
            counts[sequence[start:start + length]] += 1
        found = {sub: n for sub, n in counts.items() if n > 1}
        if not found:
            # A repeated substring of length L + 1 implies its length-L
            # prefix repeats too, so no longer repeat can exist.
            break
        repeated.update(found)
    return repeated
def fragment_protein_sequence(sequence, max_length=1000):
    """Split ``sequence`` into consecutive pieces of at most ``max_length``.

    The pieces do not overlap; the last one may be shorter. An empty
    sequence yields an empty list.
    """
    fragment_starts = range(0, len(sequence), max_length)
    return [
        sequence[start:start + max_length]
        for start in fragment_starts
    ]
def check_boundary_repeats(fragments, final_repeats, overlap=50):
    """Add repeats found around fragment boundaries to ``final_repeats``.

    For each pair of neighbouring fragments, the last ``overlap`` characters
    of the left one and the first ``overlap`` characters of the right one are
    joined and scanned with ``find_hetero_amino_acid_repeats``. A repeat from
    that window is added when it contains at least one character present in
    the left tail and at least one character present in the right head.

    NOTE(review): the condition tests character membership, not whether the
    substring physically spans the boundary, and the window counts may
    include occurrences lying entirely inside one fragment -- confirm this
    is the intended counting rule.

    Args:
        fragments: Consecutive pieces of one sequence.
        final_repeats: Mapping of repeat -> count, updated in place with
            ``+=`` (so it must tolerate missing keys, e.g. a defaultdict).
        overlap: Number of characters taken from each side of a boundary.

    Returns:
        The same ``final_repeats`` object that was passed in.
    """
    for left_fragment, right_fragment in zip(fragments, fragments[1:]):
        left_tail = left_fragment[-overlap:]
        right_head = right_fragment[:overlap]
        left_residues = set(left_tail)
        right_residues = set(right_head)
        window_repeats = find_hetero_amino_acid_repeats(left_tail + right_head)
        for substring, count in window_repeats.items():
            residues = set(substring)
            if residues.isdisjoint(left_residues) or residues.isdisjoint(right_residues):
                continue
            final_repeats[substring] += count
    return final_repeats
def find_new_boundary_repeats(fragments, final_repeats, overlap=50):
    """Collect boundary-window repeats that are absent from ``final_repeats``.

    The boundary window is built exactly as in the sibling boundary check:
    the last ``overlap`` characters of a fragment joined with the first
    ``overlap`` characters of the next. A repeat from that window qualifies
    when it contains at least one character present in the left tail and at
    least one present in the right head, and it is not already a key of
    ``final_repeats``.

    Args:
        fragments: Consecutive pieces of one sequence.
        final_repeats: Mapping of already known repeats; only read here.
        overlap: Number of characters taken from each side of a boundary.

    Returns:
        A ``defaultdict(int)`` mapping each newly found repeat to its count,
        summed over all boundaries.
    """
    discovered = defaultdict(int)
    for left_fragment, right_fragment in zip(fragments, fragments[1:]):
        left_tail = left_fragment[-overlap:]
        right_head = right_fragment[:overlap]
        left_residues = set(left_tail)
        right_residues = set(right_head)
        window_repeats = find_hetero_amino_acid_repeats(left_tail + right_head)
        for substring, count in window_repeats.items():
            residues = set(substring)
            if residues.isdisjoint(left_residues) or residues.isdisjoint(right_residues):
                continue
            if substring in final_repeats:
                continue
            discovered[substring] += count
    return discovered
def _collect_hetero_repeats(sequence, overlap):
    """Return the non-homo repeats of ``sequence``, gathered per fragment."""
    fragments = fragment_protein_sequence(sequence)
    totals = defaultdict(int)
    for fragment in fragments:
        for substring, count in find_hetero_amino_acid_repeats(fragment).items():
            totals[substring] += count
    totals = check_boundary_repeats(fragments, totals, overlap)
    # NOTE(review): check_boundary_repeats has just added every qualifying
    # boundary repeat to `totals`, so this second pass looks like it can
    # never report anything new -- kept to preserve the original pipeline.
    for substring, count in find_new_boundary_repeats(fragments, totals, overlap).items():
        totals[substring] += count
    # Substrings made of one repeated residue belong to the homo analysis.
    return {k: v for k, v in totals.items() if not is_homo_repeat(k)}


def process_protein_sequence(sequence, analysis_type, overlap=50):
    """Count repeats in one protein sequence.

    Args:
        sequence: The protein sequence text.
        analysis_type: ``"Homo"`` for runs of one residue, ``"Hetero"`` for
            repeated substrings that are not single-residue runs, or
            ``"Both"`` for the two combined.
        overlap: Number of characters taken from each side of a fragment
            boundary when looking for boundary repeats. It is now honoured
            in ``"Both"`` mode as well; previously that mode silently used
            the helpers' default.

    Returns:
        A mapping of repeat text to count. Any other ``analysis_type``
        yields an empty mapping.
    """
    if analysis_type == "Hetero":
        return _collect_hetero_repeats(sequence, overlap)
    if analysis_type == "Homo":
        return find_homorepeats(sequence)
    if analysis_type == "Both":
        combined = find_homorepeats(sequence).copy()
        for substring, count in _collect_hetero_repeats(sequence, overlap).items():
            combined[substring] += count
        return combined
    return defaultdict(int)
def process_excel(excel_data, analysis_type):
    """Run the repeat analysis on every row of every sheet of a workbook.

    The first three columns of each sheet are read by position as entry ID,
    protein name and sequence. Double quotes and spaces are removed from the
    sequence before analysis.

    Args:
        excel_data: Workbook object exposing ``sheet_names`` and
            ``parse(sheet_name)`` (e.g. ``pandas.ExcelFile``).
        analysis_type: Analysis mode forwarded to
            ``process_protein_sequence``.

    Returns:
        ``(repeats, sequence_data)`` where ``repeats`` is the set of all
        repeat strings seen and ``sequence_data`` is a list of
        ``(entry_id, protein_name, freq)`` tuples. Returns ``(None, None)``
        after reporting an error when a sheet has fewer than three columns.
    """
    repeats = set()
    sequence_data = []
    for sheet_name in excel_data.sheet_names:
        df = excel_data.parse(sheet_name)
        if len(df.columns) < 3:
            st.error(f"Error: The sheet '{sheet_name}' must have at least three columns: ID, Protein Name, Sequence")
            return None, None
        for _, row in df.iterrows():
            # .iloc is explicitly positional. Plain row[0] is a label lookup
            # that only falls back to position for non-integer labels (a
            # deprecated behaviour) and picks the wrong column when the
            # header labels are themselves integers.
            entry_id = str(row.iloc[0])
            protein_name = str(row.iloc[1])
            sequence = str(row.iloc[2]).replace('"', '').replace(' ', '')
            freq = process_protein_sequence(sequence, analysis_type)
            sequence_data.append((entry_id, protein_name, freq))
            repeats.update(freq.keys())
    return repeats, sequence_data
# Characters that may not appear in a worksheet name, each mapped to "_".
_INVALID_SHEET_CHARS = str.maketrans({ch: "_" for ch in "[]:*?/\\"})


def _unique_sheet_name(raw_name, used_names):
    """Return a valid worksheet name derived from ``raw_name``, unused so far."""
    # At most 31 characters, no forbidden characters, and no apostrophe at
    # either end.
    base = str(raw_name).translate(_INVALID_SHEET_CHARS)[:31].strip("'") or "Sheet"
    candidate = base
    suffix = 2
    # Sheet names are compared case-insensitively, hence the lowercase set.
    while candidate.lower() in used_names:
        tag = f"_{suffix}"
        candidate = base[:31 - len(tag)] + tag
        suffix += 1
    used_names.add(candidate.lower())
    return candidate


def create_excel(sequences_data, repeats, filenames):
    """Build an in-memory workbook with one worksheet per input file.

    Each worksheet has the columns "Entry ID" and "Protein Name" followed by
    one column per repeat (sorted), and one row per analysed entry holding
    that entry's count for every repeat (0 when absent).

    Args:
        sequences_data: One list per file of ``(entry_id, protein_name,
            freq)`` tuples, where ``freq`` maps repeat text to count.
        repeats: Iterable of all repeat strings to use as columns.
        filenames: File names parallel to ``sequences_data``; used as sheet
            names after truncation to 31 characters. Forbidden characters
            are replaced and clashing names get a numeric suffix, so
            uploads with similar names no longer make the writer fail.

    Returns:
        A ``BytesIO`` positioned at the start of the finished workbook.
    """
    output = BytesIO()
    # Sort once: the column order is the same for every sheet and row.
    repeat_columns = sorted(repeats)
    used_sheet_names = set()
    # The context manager closes the workbook even if writing fails.
    with xlsxwriter.Workbook(output, {'in_memory': True}) as workbook:
        for file_index, file_data in enumerate(sequences_data):
            sheet_name = _unique_sheet_name(filenames[file_index], used_sheet_names)
            worksheet = workbook.add_worksheet(sheet_name)
            worksheet.write(0, 0, "Entry ID")
            worksheet.write(0, 1, "Protein Name")
            for col, repeat in enumerate(repeat_columns, start=2):
                worksheet.write(0, col, repeat)
            for row, (entry_id, protein_name, freq) in enumerate(file_data, start=1):
                worksheet.write(row, 0, entry_id)
                worksheet.write(row, 1, protein_name)
                for col, repeat in enumerate(repeat_columns, start=2):
                    worksheet.write(row, col, freq.get(repeat, 0))
    output.seek(0)
    return output
| # === Streamlit UI === | |
# Imported here rather than at the top of the file so this section stays
# self-contained; json is only needed to (de)serialise cache entries.
import json


def _encode_analysis(repeats, sequence_data):
    """Serialise one file's analysis as UTF-8 JSON inside a BytesIO."""
    payload = {
        "repeats": sorted(repeats),
        "sequence_data": [
            [entry_id, protein_name, dict(freq)]
            for entry_id, protein_name, freq in sequence_data
        ],
    }
    return BytesIO(json.dumps(payload).encode("utf-8"))


def _decode_analysis(cached):
    """Rebuild ``(repeats, sequence_data)`` from a cache entry, or None."""
    try:
        payload = json.loads(cached.read().decode("utf-8"))
        repeats = set(payload["repeats"])
        sequence_data = [
            (entry_id, protein_name, dict(freq))
            for entry_id, protein_name, freq in payload["sequence_data"]
        ]
    except (ValueError, KeyError, TypeError):
        # Entries in another format (e.g. raw workbook bytes written by an
        # earlier version) are treated as a cache miss.
        return None
    return repeats, sequence_data


st.title("Protein Repeat Analysis with Caching")
analysis_type = st.radio("Select analysis type:", ["Homo", "Hetero", "Both"], index=2)
uploaded_files = st.file_uploader("Upload Excel files", accept_multiple_files=True, type=["xlsx"])

if uploaded_files:
    all_repeats = set()
    all_sequences_data = []
    filenames = []
    for file in uploaded_files:
        # Hashing consumes the stream, so rewind before and after.
        file.seek(0)
        file_hash = get_file_hash(file)
        file.seek(0)

        analysis = None
        cached = check_cache(file_hash, analysis_type)
        if cached is not None:
            analysis = _decode_analysis(cached)

        if analysis is not None:
            st.success(f"Using cached result for {file.name}")
            repeats, sequence_data = analysis
        else:
            st.info(f"Processing {file.name}...")
            excel_data = pd.ExcelFile(file)
            repeats, sequence_data = process_excel(excel_data, analysis_type)
            if repeats is None:
                # process_excel has already reported the problem.
                continue
            # Cache this file's own analysis only. Caching the combined
            # workbook would leak other uploads' sheets into the entry, and
            # appending cached workbooks byte-wise yields a corrupt file.
            cache_result(file_hash, file.name, analysis_type, _encode_analysis(repeats, sequence_data))

        all_repeats.update(repeats)
        all_sequences_data.append(sequence_data)
        filenames.append(file.name)

    if all_sequences_data:
        # Build the workbook once, from cached and fresh results alike.
        final_output = create_excel(all_sequences_data, all_repeats, filenames)
        st.download_button(
            label="Download Excel file",
            data=final_output,
            file_name="protein_repeat_results.xlsx",
            mime="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
        )
    if st.checkbox("Show Results Table"):
        repeat_columns = sorted(all_repeats)
        rows = []
        for file_index, file_data in enumerate(all_sequences_data):
            filename = filenames[file_index]
            for entry_id, protein_name, freq in file_data:
                row = {"Filename": filename, "Entry ID": entry_id, "Protein Name": protein_name}
                row.update({repeat: freq.get(repeat, 0) for repeat in repeat_columns})
                rows.append(row)
        result_df = pd.DataFrame(rows)
        st.dataframe(result_df)