# Page-status residue captured with this file (not program text): "Spaces: Running Running"
# Standard-library imports first; none of them depend on the flag set below.
import json
import os
import sys

# The flag must be in the environment before streamlit and the other
# dependencies are imported; it is set to prevent PyTorch custom class
# loading issues.
os.environ["PYTORCH_DISABLE_CUSTOM_CLASS_LOADING"] = "1"

# Third-party and project imports, loaded only once the flag is in place.
import streamlit as st

from chroma_operations.ingestion import ingest
from rag import ask_question
from chroma_operations.delete_chroma import remove_from_chroma
def get_processed_file_names(folder_path="docs/processed"):
    """Return the sorted base names of the PDF files in ``folder_path``.

    Only directory entries whose name ends with ``.pdf`` are considered, and
    that suffix is stripped from every returned name.

    Args:
        folder_path: Directory to scan.

    Returns:
        Sorted list of file names without the ``.pdf`` suffix. The list is
        empty when the folder does not exist yet, holds no PDFs, or cannot be
        read (in the last case an error is also shown in the Streamlit UI).
    """
    suffix = ".pdf"
    try:
        entries = os.listdir(folder_path)
        return sorted(
            name[: -len(suffix)] for name in entries if name.endswith(suffix)
        )
    except FileNotFoundError:
        # A folder that has not been created yet simply holds no documents;
        # that is the normal state on a fresh start, not an error to report.
        return []
    except Exception as e:
        st.error(f"Error reading folder: {e}")
        return []
# Page-level configuration and the headline shown above the tabs.
st.set_page_config(layout="centered", page_title="RAG Demo")
st.title("π Retrieval-Augmented Generation (RAG) Demo")

# One tab per workflow: asking questions, uploading documents, managing files.
tab1, tab2, tab3 = st.tabs(
    [
        "Ask Questions",
        "Upload Documents",
        "Manage Files",
    ]
)
# Tab 1: ask a question against one or more processed documents.
with tab1:
    st.markdown("Ask a question based on a specific processed document.")

    # A button click makes Streamlit rerun the script, and the document list
    # below is read again on every run, so the click itself is the refresh.
    if st.button("π Refresh Document List"):
        st.success("Document list refreshed!")

    # Fetch available document names
    doc_names = get_processed_file_names()

    if not doc_names:
        st.warning("No documents available. Please upload and process documents first.")
    else:
        # "All Documents" replaces the multi-select with the full list.
        use_all_docs = st.checkbox("π Use All Documents", value=False)

        if use_all_docs:
            selected_files = doc_names
            st.info(f"Using all {len(doc_names)} available documents")
        else:
            selected_files = st.multiselect(
                "π Select Documents",
                options=doc_names,
                # doc_names is known to be non-empty in this branch.
                default=[doc_names[0]],
                help="Select one or more documents to search through",
            )
            if not selected_files:
                st.warning("Please select at least one document.")

        # User question
        query_text = st.text_area(
            "π§ Your Question",
            placeholder="e.g. What are the treatment steps for diabetes?",
        )

        if st.button("Ask"):
            # Treat a whitespace-only question the same as an empty one.
            question = (query_text or "").strip()
            if not question or not selected_files:
                st.warning("Please fill in both the question and select a document.")
            else:
                with st.spinner("Processing..."):
                    try:
                        response = ask_question(question, selected_files)
                        if response:
                            st.success("β Answer:")
                            st.markdown(f"{response['answer']}")
                            with st.expander("π Retrieved Chunks"):
                                for i, chunk in enumerate(response["chunks"], start=1):
                                    st.markdown(f"**Chunk {i}:** {chunk}")
                        else:
                            st.error("Error in the answer")
                    except Exception as e:
                        # Any failure inside ask_question, or a response that
                        # lacks the expected keys, is reported here.
                        st.error(f"Failed to connect to the backend: {e}")
# Tab 2: upload a PDF, run ingestion, and list the contents of both folders.
with tab2:
    st.markdown("Upload new documents to be processed for the RAG system.")

    # Ensure both working folders exist before anything reads or writes them.
    os.makedirs("docs/unprocessed", exist_ok=True)
    os.makedirs("docs/processed", exist_ok=True)

    # File uploader
    uploaded_file = st.file_uploader("Upload PDF Document", type=["pdf"])

    if uploaded_file is not None:
        # The file name is supplied by the client. Keep only its final path
        # component so the upload cannot be written outside docs/unprocessed.
        upload_name = os.path.basename(uploaded_file.name)
        st.info(f"File '{upload_name}' ready for upload")

        # Create columns for buttons
        col1, col2 = st.columns(2)

        # Upload button: store the raw bytes in the unprocessed folder.
        if col1.button("Upload to System"):
            try:
                with open(os.path.join("docs/unprocessed", upload_name), "wb") as f:
                    f.write(uploaded_file.getbuffer())
                st.success(f"File '{upload_name}' saved to docs/unprocessed/")
            except Exception as e:
                st.error(f"Error saving file: {e}")

        # Ingest button: run the ingestion step imported from
        # chroma_operations.ingestion.
        if col2.button("Process Document"):
            try:
                with st.spinner("Processing document... This may take a while."):
                    result = ingest()
                if result:
                    # The folder listings below are read after this point, so
                    # they are built from the folders as they are now.
                    st.success("Document processed successfully!")
                else:
                    st.error("Error processing document")
            except Exception as e:
                st.error(f"Error running ingestion process: {e}")

    # Display list of files in unprocessed folder
    st.subheader("Unprocessed Documents")
    try:
        unprocessed_files = os.listdir("docs/unprocessed")
        if unprocessed_files:
            for file in unprocessed_files:
                st.text(f"• {file}")
        else:
            st.info("No unprocessed documents.")
    except Exception as e:
        st.error(f"Error reading unprocessed folder: {e}")

    # Display list of processed files (PDFs only)
    st.subheader("Processed Documents")
    try:
        processed_files = [
            f for f in os.listdir("docs/processed") if f.endswith(".pdf")
        ]
        if processed_files:
            for file in processed_files:
                st.text(f"• {file}")
        else:
            st.info("No processed documents.")
    except Exception as e:
        st.error(f"Error reading processed folder: {e}")
# Tab 3: delete single files or whole folders, keeping Chroma in step.
with tab3:
    st.markdown(
        "Manage your documents by deleting files from processed or unprocessed folders."
    )

    col1, col2 = st.columns(2)

    with col1:
        st.subheader("Delete Unprocessed Documents")
        try:
            unprocessed_files = os.listdir("docs/unprocessed")
            if unprocessed_files:
                file_to_delete_unprocessed = st.selectbox(
                    "Select file to delete from unprocessed folder",
                    unprocessed_files,
                    key="unprocessed_select",
                )
                if st.button("Delete Unprocessed File", key="delete_unprocessed"):
                    try:
                        os.remove(
                            os.path.join("docs/unprocessed", file_to_delete_unprocessed)
                        )
                        # NOTE(review): the rerun below starts right away, so
                        # this message may not stay visible - confirm.
                        st.success(f"Successfully deleted {file_to_delete_unprocessed}")
                        # Force refresh the app to show the updated file list
                        st.rerun()
                    except Exception as e:
                        st.error(f"Error deleting file: {e}")
            else:
                st.info("No unprocessed documents to delete.")
        except Exception as e:
            st.error(f"Error accessing unprocessed folder: {e}")

    with col2:
        st.subheader("Delete Processed Documents")
        try:
            processed_files = [
                f for f in os.listdir("docs/processed") if f.endswith(".pdf")
            ]
            if processed_files:
                file_to_delete_processed = st.selectbox(
                    "Select file to delete from processed folder",
                    processed_files,
                    key="processed_select",
                )
                if st.button("Delete Processed File", key="delete_processed"):
                    try:
                        # Delete the PDF file itself.
                        os.remove(
                            os.path.join("docs/processed", file_to_delete_processed)
                        )

                        # Delete the vector store and metadata files that
                        # share the PDF's base name, when they exist.
                        base_name = file_to_delete_processed[:-4]  # drop ".pdf"
                        for sidecar_name in (
                            f"{base_name}.faiss",
                            f"{base_name}_metadata.json",
                        ):
                            sidecar_path = os.path.join("docs/processed", sidecar_name)
                            if os.path.exists(sidecar_path):
                                os.remove(sidecar_path)

                        # Remove document from Chroma DB
                        with st.spinner("Removing document from vector database..."):
                            remove_from_chroma(base_name)

                        st.success(
                            f"Successfully deleted {file_to_delete_processed} and related files"
                        )
                        # Force refresh the app to show the updated file list
                        st.rerun()
                    except Exception as e:
                        st.error(f"Error deleting file: {e}")
            else:
                st.info("No processed documents to delete.")
        except Exception as e:
            st.error(f"Error accessing processed folder: {e}")

    # Add a separator
    st.markdown("---")

    # Delete all files section
    st.subheader("Bulk Operations")
    col3, col4 = st.columns(2)

    with col3:
        if st.button(
            "Delete ALL Unprocessed Files", type="primary", use_container_width=True
        ):
            try:
                # os.remove() raises on directories, which would abort the
                # loop midway; restrict the deletion to regular files.
                unprocessed_files = [
                    name
                    for name in os.listdir("docs/unprocessed")
                    if os.path.isfile(os.path.join("docs/unprocessed", name))
                ]
                if unprocessed_files:
                    for file in unprocessed_files:
                        os.remove(os.path.join("docs/unprocessed", file))
                    st.success(
                        f"Successfully deleted all {len(unprocessed_files)} unprocessed files"
                    )
                    # Force refresh
                    st.rerun()
                else:
                    st.info("No files to delete.")
            except Exception as e:
                st.error(f"Error during bulk deletion: {e}")

    with col4:
        if st.button(
            "Delete ALL Processed Files", type="primary", use_container_width=True
        ):
            try:
                # Same restriction as above: only regular files are removed.
                processed_files = [
                    name
                    for name in os.listdir("docs/processed")
                    if os.path.isfile(os.path.join("docs/processed", name))
                ]
                if processed_files:
                    for file in processed_files:
                        os.remove(os.path.join("docs/processed", file))
                        # Each PDF also has an entry in Chroma, keyed by its
                        # name without the ".pdf" suffix.
                        if file.endswith(".pdf"):
                            remove_from_chroma(file[:-4])
                    st.success(
                        f"Successfully deleted all {len(processed_files)} processed files"
                    )
                    # Force refresh
                    st.rerun()
                else:
                    st.info("No files to delete.")
            except Exception as e:
                st.error(f"Error during bulk deletion: {e}")