"""Streamlit front end for the Retrieval-Augmented Generation (RAG) demo.

The page has three tabs:

* "Ask Questions"   - pick processed documents and send a question to
  ``ask_question``.
* "Upload Documents" - save an uploaded PDF into ``docs/unprocessed`` and run
  ``ingest``.
* "Manage Files"     - delete files from either folder; deleting a processed
  PDF also calls ``remove_from_chroma`` with the PDF's base name.
"""

# Import required environment settings before any other imports
import os
import sys

# Set environment variable to prevent PyTorch custom class loading issues
os.environ["PYTORCH_DISABLE_CUSTOM_CLASS_LOADING"] = "1"

# Now import streamlit and other dependencies
import streamlit as st
from chroma_operations.ingestion import ingest
from rag import ask_question
from chroma_operations.delete_chroma import remove_from_chroma
import json

UNPROCESSED_DIR = "docs/unprocessed"
PROCESSED_DIR = "docs/processed"

# Session-state slot used to carry a success message across ``st.rerun()``.
_NOTICE_KEY = "pending_notice"


# Get list of processed PDF file names without `.pdf`
def get_processed_file_names(folder_path=PROCESSED_DIR):
    """Return the sorted names of the PDFs in *folder_path*, minus ``.pdf``.

    Never raises: any failure is reported through ``st.error`` and an empty
    list is returned instead.
    """
    try:
        return sorted(f[:-4] for f in os.listdir(folder_path) if f.endswith(".pdf"))
    except Exception as e:
        st.error(f"Error reading folder: {e}")
        return []


def _list_files(folder_path, suffix=""):
    """Return the sorted names of regular files in *folder_path* ending in *suffix*.

    Directories are skipped because every caller either displays the names as
    documents or passes them to ``os.remove``, which fails on a directory.
    Raises ``OSError`` if the folder cannot be read.
    """
    return sorted(
        name
        for name in os.listdir(folder_path)
        if name.endswith(suffix) and os.path.isfile(os.path.join(folder_path, name))
    )


def _queue_notice(message):
    """Remember *message* so it can be shown after the upcoming rerun."""
    st.session_state[_NOTICE_KEY] = message


def _show_queued_notice():
    """Display and clear the message stored by ``_queue_notice``, if any."""
    if _NOTICE_KEY in st.session_state:
        message = st.session_state[_NOTICE_KEY]
        del st.session_state[_NOTICE_KEY]
        st.success(message)


st.set_page_config(page_title="RAG Demo", layout="centered")
st.title("📄 Retrieval-Augmented Generation (RAG) Demo")

# Ensure directories exist before any tab lists them; otherwise the first run
# on a fresh checkout reports a folder-read error in the first tab.
os.makedirs(UNPROCESSED_DIR, exist_ok=True)
os.makedirs(PROCESSED_DIR, exist_ok=True)

# Create tabs for different functionalities
tab1, tab2, tab3 = st.tabs(["Ask Questions", "Upload Documents", "Manage Files"])

with tab1:
    st.markdown("Ask a question based on a specific processed document.")

    # Add a refresh button
    if st.button("🔄 Refresh Document List"):
        st.success("Document list refreshed!")
        # No need to do anything else - Streamlit will rerun and refresh the list

    # Fetch available document names
    doc_names = get_processed_file_names()

    if not doc_names:
        st.warning(
            "No documents available. Please upload and process documents first."
        )
    else:
        # Add "All Documents" checkbox
        use_all_docs = st.checkbox("📚 Use All Documents", value=False)

        # Multi-select for documents (skipped if "All Documents" is checked)
        if use_all_docs:
            selected_files = doc_names
            st.info(f"Using all {len(doc_names)} available documents")
        else:
            selected_files = st.multiselect(
                "📁 Select Documents",
                options=doc_names,
                # doc_names is non-empty in this branch.
                default=[doc_names[0]],
                help="Select one or more documents to search through",
            )
            if not selected_files:
                st.warning("Please select at least one document.")

        # User question
        query_text = st.text_area(
            "🧠 Your Question",
            placeholder="e.g. What are the treatment steps for diabetes?",
        )

        if st.button("Ask"):
            if not query_text or not selected_files:
                st.warning("Please fill in both the question and select a document.")
            else:
                with st.spinner("Processing..."):
                    try:
                        response = ask_question(query_text, selected_files)
                        if response:
                            st.success("✅ Answer:")
                            st.markdown(f"{response['answer']}")
                            with st.expander("📚 Retrieved Chunks"):
                                for i, chunk in enumerate(response["chunks"]):
                                    st.markdown(f"**Chunk {i+1}:** {chunk}")
                        else:
                            st.error("Error in the answer")
                    except Exception as e:
                        st.error(f"Failed to connect to the backend: {e}")

with tab2:
    st.markdown("Upload new documents to be processed for the RAG system.")

    # File uploader
    uploaded_file = st.file_uploader("Upload PDF Document", type=["pdf"])

    if uploaded_file is not None:
        st.info(f"File '{uploaded_file.name}' ready for upload")

        # Create columns for buttons
        col1, col2 = st.columns(2)

        # Upload button
        if col1.button("Upload to System"):
            # The name is supplied by the client: keep only its final path
            # component so the file cannot be written outside the folder.
            safe_name = os.path.basename(uploaded_file.name.replace("\\", "/"))
            if not safe_name:
                st.error("Error saving file: invalid file name")
            else:
                try:
                    # Save the uploaded file to the docs/unprocessed directory
                    with open(os.path.join(UNPROCESSED_DIR, safe_name), "wb") as f:
                        f.write(uploaded_file.getbuffer())
                    st.success(f"File '{safe_name}' saved to docs/unprocessed/")
                except OSError as e:
                    st.error(f"Error saving file: {e}")

        # Ingest button
        if col2.button("Process Document"):
            try:
                with st.spinner("Processing document... This may take a while."):
                    # Call the ingestion script
                    result = ingest()
                    if result:
                        # The document lists are re-read from disk on every
                        # run, so nothing needs to be refreshed by hand here.
                        st.success("Document processed successfully!")
                    else:
                        st.error("Error processing document")
            except Exception as e:
                st.error(f"Error running ingestion process: {e}")

    # Display list of files in unprocessed folder
    st.subheader("Unprocessed Documents")
    try:
        unprocessed_files = _list_files(UNPROCESSED_DIR)
        if unprocessed_files:
            for file in unprocessed_files:
                st.text(f"• {file}")
        else:
            st.info("No unprocessed documents.")
    except OSError as e:
        st.error(f"Error reading unprocessed folder: {e}")

    # Display list of processed files
    st.subheader("Processed Documents")
    try:
        processed_files = _list_files(PROCESSED_DIR, ".pdf")
        if processed_files:
            for file in processed_files:
                st.text(f"• {file}")
        else:
            st.info("No processed documents.")
    except OSError as e:
        st.error(f"Error reading processed folder: {e}")

with tab3:
    st.markdown(
        "Manage your documents by deleting files from processed or unprocessed folders."
    )

    # Show the confirmation left by a deletion that triggered the last rerun;
    # a message rendered right before st.rerun() would never be seen.
    _show_queued_notice()

    col1, col2 = st.columns(2)

    with col1:
        st.subheader("Delete Unprocessed Documents")
        try:
            unprocessed_files = _list_files(UNPROCESSED_DIR)
            if unprocessed_files:
                file_to_delete_unprocessed = st.selectbox(
                    "Select file to delete from unprocessed folder",
                    unprocessed_files,
                    key="unprocessed_select",
                )

                if st.button("Delete Unprocessed File", key="delete_unprocessed"):
                    try:
                        file_path = os.path.join(
                            UNPROCESSED_DIR, file_to_delete_unprocessed
                        )
                        os.remove(file_path)
                        _queue_notice(
                            f"Successfully deleted {file_to_delete_unprocessed}"
                        )
                        # Force refresh the app to show the updated file list
                        st.rerun()
                    except Exception as e:
                        st.error(f"Error deleting file: {e}")
            else:
                st.info("No unprocessed documents to delete.")
        except Exception as e:
            st.error(f"Error accessing unprocessed folder: {e}")

    with col2:
        st.subheader("Delete Processed Documents")
        try:
            processed_files = _list_files(PROCESSED_DIR, ".pdf")
            if processed_files:
                file_to_delete_processed = st.selectbox(
                    "Select file to delete from processed folder",
                    processed_files,
                    key="processed_select",
                )

                if st.button("Delete Processed File", key="delete_processed"):
                    try:
                        # Delete the PDF file
                        pdf_path = os.path.join(PROCESSED_DIR, file_to_delete_processed)
                        os.remove(pdf_path)

                        # Remove .pdf extension
                        base_name = file_to_delete_processed[:-4]

                        # Also delete the corresponding vector store if it exists
                        vector_store_path = os.path.join(
                            PROCESSED_DIR, f"{base_name}.faiss"
                        )
                        if os.path.exists(vector_store_path):
                            os.remove(vector_store_path)

                        # Delete metadata file if it exists
                        metadata_path = os.path.join(
                            PROCESSED_DIR, f"{base_name}_metadata.json"
                        )
                        if os.path.exists(metadata_path):
                            os.remove(metadata_path)

                        # Remove document from Chroma DB
                        with st.spinner("Removing document from vector database..."):
                            remove_from_chroma(base_name)

                        _queue_notice(
                            f"Successfully deleted {file_to_delete_processed} and related files"
                        )
                        # Force refresh the app to show the updated file list
                        st.rerun()
                    except Exception as e:
                        st.error(f"Error deleting file: {e}")
            else:
                st.info("No processed documents to delete.")
        except Exception as e:
            st.error(f"Error accessing processed folder: {e}")

    # Add a separator
    st.markdown("---")

    # Delete all files section
    st.subheader("Bulk Operations")

    col3, col4 = st.columns(2)

    with col3:
        if st.button(
            "Delete ALL Unprocessed Files", type="primary", use_container_width=True
        ):
            try:
                # Regular files only: a sub-directory would make os.remove
                # raise and abort the loop half way through.
                unprocessed_files = _list_files(UNPROCESSED_DIR)
                if unprocessed_files:
                    for file in unprocessed_files:
                        os.remove(os.path.join(UNPROCESSED_DIR, file))
                    _queue_notice(
                        f"Successfully deleted all {len(unprocessed_files)} unprocessed files"
                    )
                    # Force refresh
                    st.rerun()
                else:
                    st.info("No files to delete.")
            except Exception as e:
                st.error(f"Error during bulk deletion: {e}")

    with col4:
        if st.button(
            "Delete ALL Processed Files", type="primary", use_container_width=True
        ):
            try:
                # Every regular file goes (PDFs and any side files next to them).
                processed_files = _list_files(PROCESSED_DIR)
                if processed_files:
                    for file in processed_files:
                        file_path = os.path.join(PROCESSED_DIR, file)
                        os.remove(file_path)

                        # If it's a PDF file, also remove from Chroma
                        if file.endswith(".pdf"):
                            base_name = file[:-4]  # Remove .pdf extension
                            remove_from_chroma(base_name)

                    _queue_notice(
                        f"Successfully deleted all {len(processed_files)} processed files"
                    )
                    # Force refresh
                    st.rerun()
                else:
                    st.info("No files to delete.")
            except Exception as e:
                st.error(f"Error during bulk deletion: {e}")