# LibraryRAG / app.py
# Zwounds's picture
# Update app.py
# 185c276 verified
import streamlit as st
import chromadb
import logging
import sys
import json
import os
from dotenv import load_dotenv
from huggingface_hub import InferenceClient, hf_hub_download
import numpy as np
import time
from tqdm import tqdm
from datasets import load_dataset
import pandas as pd
from sentence_transformers import SentenceTransformer
import tempfile # Added for temporary directory
import chromadb.config # Added for Settings
# --- Page Config (MUST BE FIRST Streamlit call) ---
st.set_page_config(layout="wide")
# ---
# --- Configuration ---
# Name of the Chroma collection that is (re)created at startup.
COLLECTION_NAME = "libguides_content"
# NOTE(review): presumably the pre-computed dataset embeddings were produced
# with this same model so query and document vectors are comparable - confirm.
LOCAL_EMBEDDING_MODEL = 'BAAI/bge-m3' # Local model for QUERY embedding
HF_GENERATION_MODEL = "google/gemma-3-27b-it" # HF model for generation
HF_DATASET_ID = "Zwounds/Libguides_Embeddings" # Your HF Dataset ID
PARQUET_FILENAME = "libguides_embeddings.parquet" # Filename within the dataset
ADD_BATCH_SIZE = 500 # Batch size for adding to Chroma
TOP_K = 20 # Max number of unique chunks placed in the final LLM prompt
INITIAL_N_RESULTS = 100 # Results requested from Chroma per query embedding
MAX_NEW_TOKENS = 512 # Passed as max_new_tokens to every text_generation call
# ---
# Setup logging
# All log records go to stderr at INFO level and above.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', stream=sys.stderr)
# --- Cached Resource Loading ---
@st.cache_resource
def initialize_hf_client():
    """Create the Hugging Face ``InferenceClient`` used for text generation.

    ``load_dotenv()`` is called first so a local ``.env`` file can supply the
    token. The token is read from ``HF_TOKEN``, falling back to
    ``HUGGING_FACE_HUB_TOKEN``.

    Returns:
        An ``InferenceClient`` bound to ``HF_GENERATION_MODEL``, or ``None``
        if execution continues after ``st.stop()`` on a failure path.

    When the token is missing or client construction fails, the problem is
    logged, shown in the UI with ``st.error`` and ``st.stop()`` is called.
    """
    try:
        load_dotenv()
        hf_token = os.getenv('HF_TOKEN') or os.getenv('HUGGING_FACE_HUB_TOKEN')
        if not hf_token:
            logging.error("HF_TOKEN or HUGGING_FACE_HUB_TOKEN not found.")
            # "\U0001F534" is the red-circle emoji, written as an escape so the
            # message survives files being re-saved in a non-UTF-8 encoding.
            st.error("\U0001F534 Hugging Face Token not found. Please set it as a Space secret named HF_TOKEN or in the .env file.")
            st.stop()
            return None
        client = InferenceClient(model=HF_GENERATION_MODEL, token=hf_token)
        logging.info(f"Initialized HF Inference Client for generation ({HF_GENERATION_MODEL}).")
        return client
    except Exception as e:
        logging.exception("Error initializing Hugging Face Inference Client for generation.")
        st.error(f"\U0001F534 Error initializing Hugging Face Inference Client: {e}")
        st.stop()
    return None
@st.cache_resource
def load_local_embedding_model():
    """Load the SentenceTransformer that embeds user queries.

    The model named by ``LOCAL_EMBEDDING_MODEL`` is placed on CUDA when torch
    is importable and reports a GPU, otherwise on the CPU.

    Returns:
        The loaded ``SentenceTransformer``; ``None`` only if execution
        continues after ``st.stop()`` on the failure path.
    """
    logging.info(f"Loading local embedding model for queries: {LOCAL_EMBEDDING_MODEL}")

    # Device selection: torch is treated as optional here.
    try:
        import torch
        gpu_available = torch.cuda.is_available()
        device = 'cuda' if gpu_available else 'cpu'
        logging.info(f"Using device: {device}")
    except ImportError:
        device = 'cpu'
        logging.info("Torch not found, using device: cpu")

    try:
        encoder = SentenceTransformer(LOCAL_EMBEDDING_MODEL, device=device, trust_remote_code=True)
    except Exception as e:
        st.error(f"Failed to load local embedding model ({LOCAL_EMBEDDING_MODEL}): {e}")
        logging.exception(f"Failed to load local embedding model: {e}")
        st.stop()
        return None

    logging.info("Local embedding model loaded successfully.")
    return encoder
def _coerce_embedding(raw_vector):
    """Return *raw_vector* as a non-empty ``list[float]``, or ``None`` if unusable."""
    if not isinstance(raw_vector, (np.ndarray, list, tuple)):
        return None
    try:
        vector = [float(component) for component in raw_vector]
    except (TypeError, ValueError):
        # Non-numeric or nested content: mark the row invalid instead of
        # aborting the whole dataset load.
        return None
    # An empty vector cannot be indexed, so treat it as invalid too.
    return vector or None


@st.cache_resource
def load_dataset_from_hf():
    """Download the embeddings Parquet file and load it into a DataFrame.

    The file ``PARQUET_FILENAME`` is fetched from the dataset repo
    ``HF_DATASET_ID``. The frame must contain the columns ``id``,
    ``document``, ``embedding`` and ``metadata``. Every embedding is
    normalised to a ``list[float]``; rows whose embedding cannot be
    normalised are dropped.

    Returns:
        The cleaned ``pandas.DataFrame``, or ``None`` when the download or
        parsing fails, required columns are missing, or no valid rows remain.
        Failures are logged and reported with ``st.error``.
    """
    try:
        logging.info(f"Downloading dataset '{HF_DATASET_ID}' from Hugging Face Hub...")
        parquet_path = hf_hub_download(repo_id=HF_DATASET_ID, filename=PARQUET_FILENAME, repo_type='dataset')
        logging.info(f"Downloaded dataset file to: {parquet_path}")

        logging.info(f"Loading Parquet file '{parquet_path}' into Pandas DataFrame...")
        df = pd.read_parquet(parquet_path)
        logging.info(f"Dataset loaded into DataFrame with shape: {df.shape}")

        required_cols = ['id', 'document', 'embedding', 'metadata']
        if not all(col in df.columns for col in required_cols):
            st.error(f"Dataset Parquet file is missing required columns. Found: {df.columns}. Required: {required_cols}")
            logging.error(f"Dataset Parquet file missing required columns. Found: {df.columns}")
            return None

        logging.info("Ensuring embeddings are in list format...")
        if not df.empty:
            # Normalise every row rather than sniffing only the first one:
            # the first row may be empty or may not represent the others.
            df['embedding'] = df['embedding'].apply(_coerce_embedding)
            logging.info("Converted embeddings to list[float].")

        initial_rows = len(df)
        df.dropna(subset=['embedding'], inplace=True)
        if len(df) < initial_rows:
            logging.warning(f"Dropped {initial_rows - len(df)} rows due to invalid embedding format.")

        if df.empty:
            st.error("No valid data loaded from the dataset after processing embeddings.")
            logging.error("DataFrame empty after embedding processing.")
            return None
        return df
    except ImportError as e:
        st.error(f"ImportError: {e}. Required libraries might be missing (datasets, pandas, pyarrow). Check requirements.txt.")
        logging.error(f"ImportError during dataset loading: {e}")
    except Exception as e:
        st.error(f"Failed to load data from dataset: {e}")
        logging.exception(f"An unexpected error occurred during data load: {e}")
    return None
# --- Initialize Clients and Models ---
# Both loaders are decorated with @st.cache_resource. These module-level names
# are read later by setup_chroma_collection(), query_hf_inference() and the
# query-embedding step.
generation_client = initialize_hf_client()
embedding_model = load_local_embedding_model()
# ---
# --- Setup ChromaDB Collection (using Session State and Temp Dir) ---
def _clean_metadata(raw_metadata):
    """Convert one dataset metadata entry into a flat dict of str/int/float/bool values."""
    if isinstance(raw_metadata, str):
        try:
            raw_metadata = json.loads(raw_metadata)
        except (ValueError, RecursionError):
            # json.JSONDecodeError is a ValueError; unparsable metadata is
            # dropped rather than failing the whole batch.
            return {}
    if not isinstance(raw_metadata, dict):
        return {}

    cleaned = {}
    for key, value in raw_metadata.items():
        if value is None:
            cleaned[key] = ""
        elif isinstance(value, (str, int, float, bool)):
            cleaned[key] = value
        else:
            try:
                cleaned[key] = str(value)
            except Exception:
                # Best effort: skip values that cannot even be stringified.
                logging.debug(f"Skipping metadata key {key!r}: value could not be converted to str.")
    return cleaned


def setup_chroma_collection():
    """Build (or reuse) the session's Chroma collection of LibGuides chunks.

    A collection already stored in ``st.session_state.chroma_collection`` is
    reused when a ``peek(1)`` call on it succeeds. Otherwise the dataset is
    loaded with ``load_dataset_from_hf()``, a Chroma client is created over a
    fresh temporary directory, and the rows are added in batches of
    ``ADD_BATCH_SIZE`` with their pre-computed embeddings.

    Returns:
        The populated collection, or ``None`` when the models/clients are
        unavailable, the dataset cannot be loaded, setup raises, or no
        document ends up in the collection. Failures are reported via
        ``st.error``; the temporary directory is removed on failure.
    """
    import shutil  # local import: only needed to clean up on failure paths

    if 'chroma_collection' in st.session_state and st.session_state.chroma_collection is not None:
        # Basic check: see if the cached collection is still queryable.
        try:
            st.session_state.chroma_collection.peek(1)
            logging.info("Using existing Chroma collection from session state.")
            return st.session_state.chroma_collection
        except Exception as e:
            logging.warning(f"Error accessing existing collection in session state ({e}), re-initializing.")
            st.session_state.chroma_collection = None  # Force re-init

    # Proceed with setup only if essential components are loaded.
    if not embedding_model or not generation_client:
        st.error("Cannot setup ChromaDB: Required models/clients failed to initialize.")
        return None

    with st.spinner("Loading and preparing vector database..."):
        df = load_dataset_from_hf()
        if df is None or df.empty:
            st.error("Failed to load embedding data. Cannot initialize vector database.")
            return None

        # One temporary directory per (re)initialisation of the collection.
        temp_dir = tempfile.mkdtemp()
        logging.info(f"Created temporary directory for ChromaDB: {temp_dir}")

        try:
            logging.info("Initializing ChromaDB client with temporary storage...")
            settings = chromadb.config.Settings(
                persist_directory=temp_dir,
                anonymized_telemetry=False,
                is_persistent=True,  # Explicitly persistent, stored in temp_dir
            )
            chroma_client = chromadb.Client(settings=settings)

            # Drop a same-named collection if one is already present.
            try:
                existing_collections = [col.name for col in chroma_client.list_collections()]
                if COLLECTION_NAME in existing_collections:
                    chroma_client.delete_collection(name=COLLECTION_NAME)
                    logging.info(f"Deleted existing collection: {COLLECTION_NAME}")
            except Exception as delete_e:
                logging.warning(f"Could not check/delete existing collection (might be okay): {delete_e}")

            logging.info(f"Creating collection: {COLLECTION_NAME}")
            collection_instance = chroma_client.create_collection(
                name=COLLECTION_NAME,
                metadata={"hnsw:space": "cosine"},  # Embeddings are supplied explicitly on add
            )

            logging.info(f"Adding {len(df)} documents to ChromaDB in batches of {ADD_BATCH_SIZE}...")
            start_time = time.time()
            error_count = 0
            num_batches = (len(df) + ADD_BATCH_SIZE - 1) // ADD_BATCH_SIZE
            for batch_number, start_idx in enumerate(range(0, len(df), ADD_BATCH_SIZE), start=1):
                batch_df = df.iloc[start_idx:start_idx + ADD_BATCH_SIZE]
                try:
                    collection_instance.add(
                        ids=batch_df['id'].tolist(),
                        embeddings=batch_df['embedding'].tolist(),
                        documents=batch_df['document'].tolist(),
                        metadatas=[_clean_metadata(item) for item in batch_df['metadata'].tolist()],
                    )
                except Exception as e:
                    # A failed batch is counted and skipped so the remaining
                    # batches can still be loaded.
                    logging.error(f"Error adding batch {batch_number}/{num_batches} to Chroma: {e}")
                    error_count += 1

            end_time = time.time()
            logging.info(f"Finished loading data into ChromaDB. Took {end_time - start_time:.2f} seconds.")
            if error_count > 0:
                logging.warning(f"Encountered errors in {error_count} batches during add.")

            final_count = collection_instance.count()
            logging.info(f"Final document count in Chroma collection: {final_count}")
            if final_count > 0:
                st.session_state.chroma_collection = collection_instance
                st.success("Vector database loaded successfully!")
                return collection_instance
            st.error("Failed to load documents into the vector database.")
        except Exception as setup_e:
            st.error(f"Failed to setup ChromaDB: {setup_e}")
            logging.exception(f"Failed to setup ChromaDB: {setup_e}")

        # Only failure paths reach this point: do not leave the unused
        # temporary directory behind.
        shutil.rmtree(temp_dir, ignore_errors=True)
        return None
# --- Initialize collection ---
# setup_chroma_collection() returns None on failure; the UI code checks
# `collection` before showing the question box.
collection = setup_chroma_collection()
# ---
# --- Helper Functions ---
def query_hf_inference(prompt, client_instance=None, model_name=HF_GENERATION_MODEL):
    """Run *prompt* through the HF Inference API and return the stripped text.

    Args:
        prompt: Full prompt text to send.
        client_instance: Client to use; falls back to the module-level
            ``generation_client`` when falsy.
        model_name: Used only in log and error messages here.

    Returns:
        The generated text with surrounding whitespace removed, or a string
        beginning with ``"Error:"`` when no client is available, the response
        is empty, or the call raises.
    """
    client = client_instance or generation_client
    if not client:
        logging.error("HF Inference client not initialized.")
        return "Error: HF Inference client failed to initialize."

    try:
        generated = client.text_generation(prompt, max_new_tokens=MAX_NEW_TOKENS)
        if generated:
            return generated.strip()
        logging.warning(f"Received empty response from HF Inference API ({model_name}).")
        return "Error: Received empty response from generation model."
    except Exception as e:
        logging.exception(f"Error querying HF Inference API ({model_name}): {e}")
        return f"Error: An unexpected error occurred while generating the answer using {model_name}."
def generate_query_variations(query, llm_func, model_name=HF_GENERATION_MODEL, num_variations=3):
    """Ask the LLM for alternative phrasings of *query*.

    Args:
        query: The user's original question.
        llm_func: Callable invoked as ``llm_func(prompt, model_name=...)`` that
            returns the model's text (or a string starting with ``"Error:"``).
        model_name: Forwarded to ``llm_func`` and used in log messages.
        num_variations: Number of variations requested and maximum returned.

    Returns:
        Up to ``num_variations`` non-empty lines of the model output, or an
        empty list when the call reports an error or raises.
    """
    prompt = f"""Given the user query: "{query}"
Generate {num_variations} alternative phrasings or related queries someone might use to find the same information.
Focus on synonyms, different levels of specificity, and related concepts.
Return ONLY the generated queries, each on a new line, without any preamble or numbering.
Example Query: "who is the digital humanities liaison?"
Example Output:
digital scholarship librarian contact
staff directory digital humanities
Steve Zweibel digital humanities role
Example Query: "when are the next graduation dates?"
Example Output:
graduation deadlines academic calendar
dissertation deposit deadline
commencement schedule
User Query: "{query}"
Output:"""
    logging.info(f"Generating query variations for: {query} using {model_name}")
    try:
        raw_output = llm_func(prompt, model_name=model_name)
        if raw_output.startswith("Error:"):
            logging.error(f"Query variation generation failed: {raw_output}")
            return []

        # Keep every non-blank line, trimmed, in the order the model gave them.
        candidates = []
        for raw_line in raw_output.split('\n'):
            trimmed = raw_line.strip()
            if trimmed:
                candidates.append(trimmed)
        logging.info(f"Generated variations: {candidates}")
        return candidates[:num_variations]
    except Exception as e:
        logging.error(f"Failed to generate query variations: {e}")
        return []
def generate_prompt(query, context_chunks, liaison_directory_url="https://libguides.gc.cuny.edu/directory/subject"):
    """Build the standard RAG prompt sent to the generation model.

    Args:
        query: The user's question.
        context_chunks: Iterable of context strings; they are joined with a
            blank line between them. ``None`` is treated as "no context".
        liaison_directory_url: Link the model is told to include when its
            answer names a librarian or subject liaison.

    Returns:
        The complete prompt string, ending with ``"Answer:"``.
    """
    context_str = "\n\n".join(context_chunks or [])
    return f"""You are an AI assistant for the CUNY Graduate Center Library (also known as the Mina Rees Library).
Based *only* on the following context extracted from the GC Library's LibGuides, answer the user's question about GC Library resources, services, or policies.
Do not use any prior knowledge. If the context doesn't contain the answer, state that the information wasn't found in the provided LibGuides context.
If your answer identifies a specific librarian or subject liaison, please also include this link to the main subject liaison directory: {liaison_directory_url}
Context:
---
{context_str}
---
Question: {query}
Answer:"""
# --- Streamlit App UI ---
# "\U0001F4DA" is the books emoji, written as an escape so the title survives
# the file being re-saved in a non-UTF-8 encoding.
st.title("\U0001F4DA Ask the CUNY Graduate Center Library (RAG Demo)")

# User input (only proceed if collection loaded)
if collection:
    query = st.text_area("Enter your question:", height=100)
else:
    st.error("Application initialization failed: Vector database not loaded.")
    st.stop()
# --- Routing Prompt Definition ---
# Few-shot classification prompt. It is filled in with
# ROUTING_PROMPT_TEMPLATE.format(user_query=...), so `{user_query}` is the only
# placeholder and any literal brace added to this text must be doubled.
# The category names listed here are the route names the Ask handler compares
# the model's reply against.
ROUTING_PROMPT_TEMPLATE = """You are a query routing assistant for a library chatbot. Your task is to classify the user's query into one of the following categories based on its intent:
Categories:
- RAG: The user is asking a general question about library services, policies, staff, or resources described in the library guides.
- HOURS: The user is asking about the library's opening or closing times, today's hours, or general operating hours.
- RESEARCH_QUERY: The user is asking for help starting research, finding databases/articles on a topic, or general research assistance.
- CATALOG_SEARCH: The user is asking if the library has a specific known item (book, journal title, article) or where to find it.
- ILL_REQUEST: The user is asking about Interlibrary Loan, requesting items not held by the library, or checking ILL status.
- ACCOUNT_INFO: The user is asking about their library account, fines, renewals, or logging in.
- TECH_SUPPORT: The user is reporting a *problem* like a broken link, login issue, or error message when trying to access resources.
- EVENTS_CALENDAR: The user is asking about upcoming library events, workshops, or the events calendar.
Analyze the user's query below and determine the most appropriate category. Respond with ONLY the category name (RAG, HOURS, RESEARCH_QUERY, CATALOG_SEARCH, ILL_REQUEST, ACCOUNT_INFO, TECH_SUPPORT, or EVENTS_CALENDAR) and nothing else.
Examples:
Query: "who is the comp lit liaison?"
Response: RAG
Query: "how do I get access to Westlaw?"
Response: RAG
Query: "how do I find articles on sociology?"
Response: RESEARCH_QUERY
Query: "when does the library close today?"
Response: HOURS
User Query: "{user_query}"
Response:"""
# --- Research Query Prompt Definition ---
# Answer prompt used when the router picks RESEARCH_QUERY. It is filled in with
# .format(context_str=..., query=...): `{context_str}` receives the retrieved
# chunks joined by blank lines and `{query}` the user's question.
RESEARCH_QUERY_PROMPT_TEMPLATE = """Based on the following context from the library guides, answer the user's research question.
1. Suggest 2-3 relevant databases or resources mentioned in the context that could help with their topic. If no specific databases are mentioned, suggest general multidisciplinary ones if appropriate based on the context.
2. Recommend contacting a subject librarian for further, more in-depth assistance.
3. Provide this link to the subject liaison directory: https://libguides.gc.cuny.edu/directory/subject
If the context doesn't seem relevant to the question, state that you couldn't find specific database recommendations in the guides but still recommend contacting a librarian using the provided directory link.
Context:
---
{context_str}
---
Question: {query}
Answer:"""
# --- End Prompt Definitions ---
# Only show button and process if collection is loaded

# Intents answered with a canned pointer instead of retrieval:
# route name -> (lead-in sentence, destination URL).
_DIRECT_ROUTES = {
    "HOURS": (
        "You can find the current library hours here:",
        "https://gc-cuny.libcal.com/hours",
    ),
    "EVENTS_CALENDAR": (
        "You can find information about upcoming library events and workshops on the calendar here:",
        "https://gc-cuny.libcal.com/calendar?cid=15537&t=d&d=0000-00-00&cal=15537&inc=0",
    ),
    "CATALOG_SEARCH": (
        "To check for specific books, journals, or articles, please search the library catalog directly here:",
        "https://cuny-gc.primo.exlibrisgroup.com/discovery/search?vid=01CUNY_GC:CUNY_GC",
    ),
    "ILL_REQUEST": (
        "For Interlibrary Loan requests or questions, please use the ILL system here:",
        "https://ezproxy.gc.cuny.edu/login?url=https://gc-cuny.illiad.oclc.org/illiad/illiad.dll",
    ),
    "ACCOUNT_INFO": (
        "To manage your library account (renewals, fines, etc.), please log in here:",
        "https://cuny-gc.primo.exlibrisgroup.com/discovery/account?vid=01CUNY_GC:CUNY_GC&section=overview",
    ),
    "TECH_SUPPORT": (
        "To report a problem with accessing e-resources or other technical issues, please use this form:",
        # Assuming this is the correct form
        "https://docs.google.com/forms/d/e/1FAIpQLSdF3a-Au-jIYRDN-mxU3MpZSANQJWFx0VEN2if01iRucIXsZA/viewform",
    ),
}
# Intents answered by retrieval + generation.
_RETRIEVAL_ROUTES = ("RAG", "RESEARCH_QUERY")


def _normalize_route(raw_decision):
    """Map the router's raw reply onto a known route name, defaulting to "RAG"."""
    known_routes = set(_DIRECT_ROUTES) | set(_RETRIEVAL_ROUTES)
    # The model may wrap the category in punctuation or keep generating after
    # it, so take the first word-like token that is a known route.
    separated = "".join(ch if (ch.isalnum() or ch == "_") else " " for ch in raw_decision.upper())
    for token in separated.split():
        if token in known_routes:
            return token
    logging.warning(f"Unrecognized route decision {raw_decision!r}. Defaulting to RAG.")
    return "RAG"


def _search_context(query_embeddings):
    """Query Chroma with every embedding and return (chunks, metadatas) for the prompt."""
    logging.info(f"Performing vector search for {len(query_embeddings)} embeddings...")
    results = collection.query(
        query_embeddings=query_embeddings,
        n_results=INITIAL_N_RESULTS,
        include=['documents', 'metadatas', 'distances'],
    )

    # Merge the per-query result lists, keeping the best (smallest) distance
    # seen for each document id.
    best_distance = {}
    docs_by_id = {}
    metas_by_id = {}
    ids_per_query = (results.get('ids') if results else None) or []
    if any(ids_per_query):
        total_hits = 0
        for i, ids_list in enumerate(ids_per_query):
            if not ids_list:
                continue
            total_hits += len(ids_list)
            distances = results['distances'][i] if results.get('distances') else [float('inf')] * len(ids_list)
            documents = results['documents'][i] if results.get('documents') else [""] * len(ids_list)
            metadatas = results['metadatas'][i] if results.get('metadatas') else [{}] * len(ids_list)
            for doc_id, distance, document, metadata in zip(ids_list, distances, documents, metadatas):
                if doc_id not in best_distance or distance < best_distance[doc_id]:
                    best_distance[doc_id] = distance
                    docs_by_id[doc_id] = document
                    metas_by_id[doc_id] = metadata
        logging.info(f"Vector search retrieved {total_hits} total results, {len(best_distance)} unique IDs.")
    else:
        logging.warning("Vector search returned no results.")

    # Closest first; then keep the first TOP_K chunks with distinct text.
    ranked_ids = sorted(best_distance, key=best_distance.get)
    logging.info(f"Selecting top {TOP_K} unique results from Vector Search list...")
    chunks = []
    metas = []
    log_lines = []
    seen_texts = set()
    for doc_id in ranked_ids:
        text = docs_by_id.get(doc_id)
        if not text:
            logging.warning(f"Document text not found in map for ID {doc_id} during final selection.")
            continue
        if text in seen_texts:
            continue
        seen_texts.add(text)
        chunks.append(text)
        metas.append(metas_by_id.get(doc_id) or {})
        log_lines.append(f"Chunk {len(chunks)} (ID: {doc_id}): '{text[:70]}...'")
        if len(chunks) >= TOP_K:
            break

    logging.info(f"Selected {len(chunks)} unique context chunks for LLM.")
    if log_lines:
        logging.info("--- DIAGNOSTIC: Final Context Chunks Sent to LLM:\n" + "\n".join(log_lines))
    return chunks, metas


def _render_answer(answer, context_chunks, context_metadata_list):
    """Show the generated answer (or the generation error) and the retrieved context."""
    if answer.startswith("Error:"):
        st.error(f"Answer generation failed: {answer}")
    else:
        st.subheader("Answer:")
        st.markdown(answer)
    st.markdown("---")
    with st.expander("Retrieved Context"):
        if context_chunks:
            for i, (chunk, metadata) in enumerate(zip(context_chunks, context_metadata_list)):
                st.markdown(f"**Chunk {i+1}:**")
                st.text(chunk)
                source_url = metadata.get('source_url')
                if source_url:
                    st.markdown(f"Source: [{source_url}]({source_url})")
                st.markdown("---")
        else:
            st.info("No specific context was retrieved from the guides to answer this question.")


def _answer_with_retrieval(user_query, route):
    """Expand, embed and search the query, then generate and display the answer."""
    spinner_text = "Thinking... (Research Query)" if route == "RESEARCH_QUERY" else "Thinking... (RAG)"
    with st.spinner(spinner_text):
        # 1. Generate query variations (using HF API)
        logging.info(f"Proceeding with retrieval for query (Route: {route}): {user_query}")
        query_variations = generate_query_variations(user_query, query_hf_inference, HF_GENERATION_MODEL)
        all_queries = [user_query] + query_variations
        logging.info(f"--- DIAGNOSTIC: All queries for search: {all_queries}")

        # 2. Embed queries locally
        try:
            logging.info(f"Generating query embeddings locally using {LOCAL_EMBEDDING_MODEL}...")
            query_embeddings = embedding_model.encode(all_queries).tolist()
            logging.info(f"Generated {len(query_embeddings)} query embeddings locally.")
        except Exception as e:
            st.error(f"Failed to embed query using local model: {e}")
            logging.exception(f"Failed to embed query using local model: {e}")
            # Return instead of st.stop() so the rest of the page still renders.
            return

        # 3. Vector search; on failure continue with no context at all.
        try:
            context_chunks, context_metadata_list = _search_context(query_embeddings)
        except Exception as e:
            st.error(f"An error occurred during vector search/selection: {e}")
            logging.exception("Vector search/selection failed.")
            context_chunks, context_metadata_list = [], []

        # 4. Generate final prompt based on route
        if route == "RESEARCH_QUERY":
            logging.info("Using RESEARCH_QUERY prompt template.")
            final_prompt = RESEARCH_QUERY_PROMPT_TEMPLATE.format(context_str="\n\n".join(context_chunks), query=user_query)
        else:
            logging.info("Using standard RAG prompt template.")
            final_prompt = generate_prompt(user_query, context_chunks)

        # 5. Query HF Inference API LLM
        logging.info(f"Sending final prompt to HF Inference API model: {HF_GENERATION_MODEL}...")
        answer = query_hf_inference(final_prompt)
        logging.info(f"Received answer from HF Inference API: {answer[:100]}...")

    # 6. Display results
    _render_answer(answer, context_chunks, context_metadata_list)


def _handle_question(user_query):
    """Route one submitted question and render the matching response."""
    st.markdown("---")
    logging.info(f"Routing query: {user_query}")
    routing_prompt = ROUTING_PROMPT_TEMPLATE.format(user_query=user_query)
    try:
        raw_decision = query_hf_inference(routing_prompt).strip()
    except Exception as e:
        logging.error(f"LLM (HF API) routing failed: {e}. Defaulting to RAG.")
        raw_decision = "RAG"  # Default to RAG on routing failure
    logging.info(f"LLM (HF API) route decision: {raw_decision}")

    # query_hf_inference reports failures as text starting with "Error:".
    if raw_decision.upper().startswith("ERROR:"):
        st.error(f"Routing failed: {raw_decision}")
        return

    route = _normalize_route(raw_decision)
    if route in _DIRECT_ROUTES:
        lead_in, url = _DIRECT_ROUTES[route]
        st.info(f"{lead_in} [{url}]({url})")
        return
    _answer_with_retrieval(user_query, route)


if collection and st.button("Ask"):
    if query:
        _handle_question(query)
    else:
        st.warning("Please enter a question.")
# Add instructions or footer
st.sidebar.header("About This Demo")
st.sidebar.info(
    "This is an experimental RAG demo for the CUNY Graduate Center Library (Mina Rees Library).\n\n"
    "1. Loads pre-computed embeddings from a Hugging Face Dataset.\n"
    "2. Embeds user queries locally.\n"
    "3. Uses the Hugging Face Inference API for LLM generation.\n"
    "4. Requires a `HUGGING_FACE_HUB_TOKEN` (set as Space secret `HF_TOKEN` or in `.env`)."
)

# Summary of the settings defined in the configuration section of this file.
st.sidebar.header("Configuration Used")
st.sidebar.markdown(f"**Data Source:** HF Dataset (`{HF_DATASET_ID}`)")
st.sidebar.markdown(f"**Query Embedding:** Local (`{LOCAL_EMBEDDING_MODEL}`)")
st.sidebar.markdown(f"**Generation LLM:** HF API (`{HF_GENERATION_MODEL}`)")
# The Chroma client is created with is_persistent=True over a temp directory,
# so the store is on disk rather than in memory.
st.sidebar.markdown("**Vector Store:** ChromaDB (temporary directory)")
st.sidebar.markdown("**Retrieval Mode:** Vector Search Only")
st.sidebar.markdown(f"**Final Unique Chunks:** `{TOP_K}` (from initial `{INITIAL_N_RESULTS}` vector search)")