# rag-medical / app.py
# baderanas's picture
# Update app.py
# 52ba610 verified
# Import required environment settings before any other imports
import os
import sys
# Set environment variable to prevent PyTorch custom class loading issues
os.environ["PYTORCH_DISABLE_CUSTOM_CLASS_LOADING"] = "1"
# Now import streamlit and other dependencies
import streamlit as st
from chroma_operations.ingestion import ingest
from rag import ask_question
from chroma_operations.delete_chroma import remove_from_chroma
import json
# Get list of processed PDF file names without `.pdf`
def get_processed_file_names(folder_path="docs/processed"):
    """Return the sorted names of the PDFs in *folder_path*, minus ``.pdf``.

    Args:
        folder_path: Directory to scan for processed PDF files.

    Returns:
        A sorted list of file names with the trailing ``.pdf`` removed. An
        empty list is returned when the folder does not exist or cannot be
        read; only the latter is reported to the user via ``st.error``.
    """
    suffix = ".pdf"
    try:
        entries = os.listdir(folder_path)
    except FileNotFoundError:
        # A missing folder simply means nothing has been processed yet, so
        # treat it as an empty library instead of surfacing an error.
        return []
    except OSError as e:
        st.error(f"Error reading folder: {e}")
        return []
    return sorted(name[: -len(suffix)] for name in entries if name.endswith(suffix))
# Page-level configuration and title.
st.set_page_config(page_title="RAG Demo", layout="centered")
# The leading emoji is written as a real Unicode character; the previous
# literal was the mis-decoded byte sequence of the same emoji.
st.title("📄 Retrieval-Augmented Generation (RAG) Demo")
# Create tabs for different functionalities
tab1, tab2, tab3 = st.tabs(["Ask Questions", "Upload Documents", "Manage Files"])
# "Ask Questions" tab: pick one or more processed documents, enter a
# question, and display the answer together with the retrieved chunks.
with tab1:
    st.markdown("Ask a question based on a specific processed document.")

    # The button needs no handler of its own: a click makes Streamlit rerun
    # the script, and the document list is re-read just below.
    if st.button("🔄 Refresh Document List"):
        st.success("Document list refreshed!")

    # Fetch available document names
    doc_names = get_processed_file_names()

    if not doc_names:
        st.warning("No documents available. Please upload and process documents first.")
    else:
        use_all_docs = st.checkbox("📚 Use All Documents", value=False)

        if use_all_docs:
            # The multiselect is skipped entirely when every document is used.
            selected_files = doc_names
            st.info(f"Using all {len(doc_names)} available documents")
        else:
            selected_files = st.multiselect(
                "📁 Select Documents",
                options=doc_names,
                # doc_names is known to be non-empty in this branch.
                default=[doc_names[0]],
                help="Select one or more documents to search through",
            )
            if not selected_files:
                st.warning("Please select at least one document.")

        # User question
        query_text = st.text_area(
            "🧠 Your Question",
            placeholder="e.g. What are the treatment steps for diabetes?",
        )

        if st.button("Ask"):
            # A question made only of whitespace counts as empty.
            has_question = bool((query_text or "").strip())
            if not has_question or not selected_files:
                st.warning("Please fill in both the question and select a document.")
            else:
                with st.spinner("Processing..."):
                    try:
                        response = ask_question(query_text, selected_files)
                        if response:
                            st.success("✅ Answer:")
                            st.markdown(f"{response['answer']}")
                            with st.expander("📚 Retrieved Chunks"):
                                for number, chunk in enumerate(response["chunks"], start=1):
                                    st.markdown(f"**Chunk {number}:** {chunk}")
                        else:
                            st.error("Error in the answer")
                    except Exception as e:
                        # UI boundary: report any failure instead of crashing the page.
                        st.error(f"Failed to connect to the backend: {e}")
# "Upload Documents" tab: save an uploaded PDF into docs/unprocessed, run
# the ingestion step, and list the contents of both document folders.
with tab2:
    st.markdown("Upload new documents to be processed for the RAG system.")

    # Ensure directories exist
    os.makedirs("docs/unprocessed", exist_ok=True)
    os.makedirs("docs/processed", exist_ok=True)

    # File uploader
    uploaded_file = st.file_uploader("Upload PDF Document", type=["pdf"])

    # NOTE(review): the nesting of the two buttons under this guard was
    # reconstructed from a source whose indentation was lost - confirm.
    if uploaded_file is not None:
        # The file name is supplied by the client. Keep only its final path
        # component so it cannot point outside docs/unprocessed.
        safe_name = os.path.basename(uploaded_file.name.replace("\\", "/"))
        st.info(f"File '{safe_name}' ready for upload")

        # Create columns for buttons
        col1, col2 = st.columns(2)

        # Upload button
        if col1.button("Upload to System"):
            try:
                # Save the uploaded file to the docs/unprocessed directory
                with open(os.path.join("docs/unprocessed", safe_name), "wb") as f:
                    f.write(uploaded_file.getbuffer())
                st.success(f"File '{safe_name}' saved to docs/unprocessed/")
            except Exception as e:
                st.error(f"Error saving file: {e}")

        # Ingest button
        if col2.button("Process Document"):
            try:
                with st.spinner("Processing document... This may take a while."):
                    # Call the ingestion script
                    result = ingest()
                if result:
                    # The folder listings below are read from disk after this
                    # point, so they already reflect the ingestion result.
                    st.success("Document processed successfully!")
                else:
                    st.error("Error processing document")
            except Exception as e:
                st.error(f"Error running ingestion process: {e}")

    # Display list of files in unprocessed folder
    st.subheader("Unprocessed Documents")
    try:
        # Sorted so the listing order does not depend on the filesystem.
        unprocessed_files = sorted(os.listdir("docs/unprocessed"))
        if unprocessed_files:
            for file in unprocessed_files:
                st.text(f"• {file}")
        else:
            st.info("No unprocessed documents.")
    except Exception as e:
        st.error(f"Error reading unprocessed folder: {e}")

    # Display list of processed files
    st.subheader("Processed Documents")
    try:
        processed_files = sorted(
            f for f in os.listdir("docs/processed") if f.endswith(".pdf")
        )
        if processed_files:
            for file in processed_files:
                st.text(f"• {file}")
        else:
            st.info("No processed documents.")
    except Exception as e:
        st.error(f"Error reading processed folder: {e}")
def _regular_files(folder, suffix=""):
    """Return the sorted names of regular files in *folder* ending with *suffix*.

    Sub-directories are left out because every caller passes the names to
    ``os.remove``, which cannot delete a directory.
    """
    return sorted(
        name
        for name in os.listdir(folder)
        if name.endswith(suffix) and os.path.isfile(os.path.join(folder, name))
    )


# "Manage Files" tab: delete single files or whole folders' worth of files,
# removing processed documents from the Chroma store as well.
with tab3:
    st.markdown(
        "Manage your documents by deleting files from processed or unprocessed folders."
    )
    col1, col2 = st.columns(2)

    with col1:
        st.subheader("Delete Unprocessed Documents")
        try:
            unprocessed_files = _regular_files("docs/unprocessed")
            if unprocessed_files:
                file_to_delete_unprocessed = st.selectbox(
                    "Select file to delete from unprocessed folder",
                    unprocessed_files,
                    key="unprocessed_select",
                )
                if st.button("Delete Unprocessed File", key="delete_unprocessed"):
                    try:
                        file_path = os.path.join(
                            "docs/unprocessed", file_to_delete_unprocessed
                        )
                        os.remove(file_path)
                        st.success(f"Successfully deleted {file_to_delete_unprocessed}")
                        # Force refresh the app to show the updated file list
                        st.rerun()
                    except Exception as e:
                        st.error(f"Error deleting file: {e}")
            else:
                st.info("No unprocessed documents to delete.")
        except Exception as e:
            st.error(f"Error accessing unprocessed folder: {e}")

    with col2:
        st.subheader("Delete Processed Documents")
        try:
            processed_files = _regular_files("docs/processed", ".pdf")
            if processed_files:
                file_to_delete_processed = st.selectbox(
                    "Select file to delete from processed folder",
                    processed_files,
                    key="processed_select",
                )
                if st.button("Delete Processed File", key="delete_processed"):
                    try:
                        # Delete the PDF file
                        pdf_path = os.path.join(
                            "docs/processed", file_to_delete_processed
                        )
                        os.remove(pdf_path)

                        # Remove .pdf extension
                        base_name = file_to_delete_processed[:-4]

                        # Also delete the corresponding vector store and
                        # metadata file, each only if it exists.
                        for companion in (
                            f"{base_name}.faiss",
                            f"{base_name}_metadata.json",
                        ):
                            companion_path = os.path.join("docs/processed", companion)
                            if os.path.exists(companion_path):
                                os.remove(companion_path)

                        # Remove document from Chroma DB
                        with st.spinner("Removing document from vector database..."):
                            remove_from_chroma(base_name)

                        st.success(
                            f"Successfully deleted {file_to_delete_processed} and related files"
                        )
                        # Force refresh the app to show the updated file list
                        st.rerun()
                    except Exception as e:
                        st.error(f"Error deleting file: {e}")
            else:
                st.info("No processed documents to delete.")
        except Exception as e:
            st.error(f"Error accessing processed folder: {e}")

    # Add a separator
    st.markdown("---")

    # Delete all files section
    st.subheader("Bulk Operations")
    col3, col4 = st.columns(2)

    with col3:
        if st.button(
            "Delete ALL Unprocessed Files", type="primary", use_container_width=True
        ):
            try:
                # Regular files only, so a stray sub-directory cannot abort
                # the loop part-way through.
                unprocessed_files = _regular_files("docs/unprocessed")
                if unprocessed_files:
                    for file in unprocessed_files:
                        os.remove(os.path.join("docs/unprocessed", file))
                    st.success(
                        f"Successfully deleted all {len(unprocessed_files)} unprocessed files"
                    )
                    # Force refresh
                    st.rerun()
                else:
                    st.info("No files to delete.")
            except Exception as e:
                st.error(f"Error during bulk deletion: {e}")

    with col4:
        if st.button(
            "Delete ALL Processed Files", type="primary", use_container_width=True
        ):
            try:
                # Every regular file is removed here, not just the PDFs.
                processed_files = _regular_files("docs/processed")
                if processed_files:
                    for file in processed_files:
                        file_path = os.path.join("docs/processed", file)
                        os.remove(file_path)
                        # If it's a PDF file, also remove from Chroma
                        if file.endswith(".pdf"):
                            base_name = file[:-4]  # Remove .pdf extension
                            remove_from_chroma(base_name)
                    st.success(
                        f"Successfully deleted all {len(processed_files)} processed files"
                    )
                    # Force refresh
                    st.rerun()
                else:
                    st.info("No files to delete.")
            except Exception as e:
                st.error(f"Error during bulk deletion: {e}")