# LibraryRAG / app.py
import streamlit as st
import chromadb
import logging
import sys
import json
import os
from dotenv import load_dotenv
from huggingface_hub import InferenceClient, hf_hub_download
import numpy as np
import time
import pandas as pd
from sentence_transformers import SentenceTransformer
# --- Page Config (MUST BE FIRST Streamlit call) ---
st.set_page_config(layout="wide")
# ---
# --- Configuration ---
COLLECTION_NAME = "libguides_content"
LOCAL_EMBEDDING_MODEL = 'BAAI/bge-m3' # Local model for QUERY embedding
HF_GENERATION_MODEL = "google/gemma-3-27b-it" # HF model for generation
HF_DATASET_ID = "Zwounds/Libguides_Embeddings" # Your HF Dataset ID
PARQUET_FILENAME = "libguides_embeddings.parquet" # Filename within the dataset
ADD_BATCH_SIZE = 500 # Batch size for adding to in-memory Chroma
TOP_K = 10
INITIAL_N_RESULTS = 50
MAX_NEW_TOKENS = 512
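# Retrieval funnel: each query variation fetches up to INITIAL_N_RESULTS
# candidates, which are merged, deduplicated, and trimmed to TOP_K chunks.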
# ---
# Setup logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', stream=sys.stderr)
# --- Cached Resource Loading ---
@st.cache_resource
def initialize_hf_client():
"""Initializes and returns the HF Inference Client for generation."""
generation_client_instance = None
try:
load_dotenv()
HF_TOKEN = os.getenv('HF_TOKEN') or os.getenv('HUGGING_FACE_HUB_TOKEN')
if not HF_TOKEN:
logging.error("HF_TOKEN or HUGGING_FACE_HUB_TOKEN not found.")
st.error("πŸ”΄ Hugging Face Token not found. Please set it as a Space secret named HF_TOKEN or in the .env file.")
st.stop()
else:
generation_client_instance = InferenceClient(model=HF_GENERATION_MODEL, token=HF_TOKEN)
logging.info(f"Initialized HF Inference Client for generation ({HF_GENERATION_MODEL}).")
return generation_client_instance
except Exception as e:
logging.exception("Error initializing Hugging Face Inference Client for generation.")
st.error(f"πŸ”΄ Error initializing Hugging Face Inference Client: {e}")
st.stop()
return None
@st.cache_resource
def load_local_embedding_model():
"""Loads and returns the local Sentence Transformer model for query embedding."""
logging.info(f"Loading local embedding model for queries: {LOCAL_EMBEDDING_MODEL}")
try:
import torch
device = 'cuda' if torch.cuda.is_available() else 'cpu'
logging.info(f"Using device: {device}")
except ImportError:
device = 'cpu'
logging.info("Torch not found, using device: cpu")
try:
model = SentenceTransformer(LOCAL_EMBEDDING_MODEL, device=device, trust_remote_code=True)
logging.info("Local embedding model loaded successfully.")
return model
except Exception as e:
st.error(f"Failed to load local embedding model ({LOCAL_EMBEDDING_MODEL}): {e}")
logging.exception(f"Failed to load local embedding model: {e}")
st.stop()
return None
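# Note: queries must be embedded with the same model that produced the
# pre-computed document embeddings in the HF dataset; mixing embedding
# models would make the cosine distances meaningless.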
@st.cache_resource
def load_dataset_from_hf():
"""Downloads the dataset parquet file and loads it into a Pandas DataFrame."""
try:
logging.info(f"Downloading dataset '{HF_DATASET_ID}' from Hugging Face Hub...")
parquet_path = hf_hub_download(repo_id=HF_DATASET_ID, filename=PARQUET_FILENAME, repo_type='dataset')
logging.info(f"Downloaded dataset file to: {parquet_path}")
logging.info(f"Loading Parquet file '{parquet_path}' into Pandas DataFrame...")
df = pd.read_parquet(parquet_path)
logging.info(f"Dataset loaded into DataFrame with shape: {df.shape}")
# Verify required columns
required_cols = ['id', 'document', 'embedding', 'metadata']
if not all(col in df.columns for col in required_cols):
st.error(f"Dataset Parquet file is missing required columns. Found: {df.columns}. Required: {required_cols}")
logging.error(f"Dataset Parquet file missing required columns. Found: {df.columns}")
return None # Return None on error
# Ensure embeddings are lists of floats
logging.info("Ensuring embeddings are in list format...")
if not df.empty and df['embedding'].iloc[0] is not None and (not isinstance(df['embedding'].iloc[0], list) or not isinstance(df['embedding'].iloc[0][0], float)):
df['embedding'] = df['embedding'].apply(lambda x: list(map(float, x)) if isinstance(x, (np.ndarray, list)) else None)
logging.info("Converted embeddings to list[float].")
else:
logging.info("Embeddings already seem to be in list[float] format or DataFrame is empty.")
initial_rows = len(df)
df.dropna(subset=['embedding'], inplace=True)
if len(df) < initial_rows:
logging.warning(f"Dropped {initial_rows - len(df)} rows due to invalid embedding format.")
if df.empty:
st.error("No valid data loaded from the dataset after processing embeddings.")
logging.error("DataFrame empty after embedding processing.")
return None # Return None on error
return df
except ImportError as e:
st.error(f"ImportError: {e}. Required libraries might be missing (datasets, pandas, pyarrow). Check requirements.txt.")
logging.error(f"ImportError during dataset loading: {e}")
except Exception as e:
st.error(f"Failed to load data from dataset: {e}")
logging.exception(f"An unexpected error occurred during data load: {e}")
return None # Return None on any error
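# The Parquet file is expected to hold one row per chunk with columns:
#   id: str                 unique chunk ID
#   document: str           chunk text
#   embedding: list[float]  (or numpy array) dense vector from the embedding model
#   metadata: dict          (or JSON string), e.g. with a 'source_url' key
# A minimal sanity check of that assumed layout:
#   df = pd.read_parquet(PARQUET_FILENAME)
#   assert {'id', 'document', 'embedding', 'metadata'} <= set(df.columns)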
# --- Initialize Clients and Models ---
generation_client = initialize_hf_client()
embedding_model = load_local_embedding_model()
# ---
# --- Setup ChromaDB Collection (using Session State) ---
if 'chroma_collection' not in st.session_state:
    st.session_state.chroma_collection = None
    if embedding_model and generation_client:  # Only proceed if models/clients loaded
        with st.spinner("Loading and preparing vector database..."):
            df = load_dataset_from_hf()
            if df is not None and not df.empty:
                try:
                    logging.info("Initializing Ephemeral ChromaDB client...")
                    chroma_client = chromadb.EphemeralClient()  # Use in-memory (ephemeral) client
                    # Delete the collection if it somehow exists (unlikely for ephemeral)
                    try:
                        chroma_client.delete_collection(name=COLLECTION_NAME)
                        logging.info(f"Deleted existing collection (if any): {COLLECTION_NAME}")
                    except Exception:
                        pass
                    logging.info(f"Creating collection: {COLLECTION_NAME}")
                    collection_instance = chroma_client.create_collection(
                        name=COLLECTION_NAME,
                        metadata={"hnsw:space": "cosine"}
                    )
                    logging.info(f"Adding {len(df)} documents to ChromaDB in batches of {ADD_BATCH_SIZE}...")
                    start_time = time.time()
                    error_count = 0
                    num_batches = (len(df) + ADD_BATCH_SIZE - 1) // ADD_BATCH_SIZE
                    for i in range(num_batches):
                        start_idx = i * ADD_BATCH_SIZE
                        end_idx = start_idx + ADD_BATCH_SIZE
                        batch_df = df.iloc[start_idx:end_idx]
                        try:
                            # Prepare and clean metadata for the batch; Chroma only
                            # accepts str/int/float/bool metadata values.
                            metadatas_list_raw = batch_df['metadata'].tolist()
                            cleaned_metadatas = []
                            for item in metadatas_list_raw:
                                cleaned_dict = {}
                                if isinstance(item, dict):
                                    current_meta = item
                                else:
                                    try:
                                        current_meta = json.loads(item) if isinstance(item, str) else {}
                                    except (json.JSONDecodeError, TypeError):
                                        current_meta = {}
                                if isinstance(current_meta, dict):
                                    for key, value in current_meta.items():
                                        if value is None:
                                            cleaned_dict[key] = ""
                                        elif isinstance(value, (str, int, float, bool)):
                                            cleaned_dict[key] = value
                                        else:
                                            try:
                                                cleaned_dict[key] = str(value)
                                            except Exception:
                                                pass  # Skip unconvertible types
                                cleaned_metadatas.append(cleaned_dict)
                            # Add the batch
                            collection_instance.add(
                                ids=batch_df['id'].tolist(),
                                embeddings=batch_df['embedding'].tolist(),
                                documents=batch_df['document'].tolist(),
                                metadatas=cleaned_metadatas
                            )
                        except Exception as e:
                            logging.error(f"Error adding batch {i+1}/{num_batches} to Chroma: {e}")
                            error_count += 1
                    end_time = time.time()
                    logging.info(f"Finished loading data into ChromaDB. Took {end_time - start_time:.2f} seconds.")
                    if error_count > 0:
                        logging.warning(f"Encountered errors in {error_count} batches during add.")
                    final_count = collection_instance.count()
                    logging.info(f"Final document count in Chroma collection: {final_count}")
                    if final_count > 0:
                        st.session_state.chroma_collection = collection_instance
                        st.success("Vector database loaded successfully!")
                    else:
                        st.error("Failed to load documents into the vector database.")
                except Exception as setup_e:
                    st.error(f"Failed to set up ChromaDB: {setup_e}")
                    logging.exception(f"Failed to set up ChromaDB: {setup_e}")
            else:
                st.error("Failed to load data from the dataset. Cannot initialize database.")
# Assign the collection from session state for use in the app
collection = st.session_state.get('chroma_collection', None)
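# If any setup step failed, collection stays None and the UI below degrades
# to an error message instead of crashing.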
# ---
# --- Helper Functions ---
def query_hf_inference(prompt, client_instance=None, model_name=HF_GENERATION_MODEL):
"""Sends the prompt to the HF Inference API using the initialized client."""
if not client_instance: client_instance = generation_client
if not client_instance:
logging.error("HF Inference client not initialized.")
return "Error: HF Inference client failed to initialize."
try:
response_text = client_instance.text_generation(prompt, max_new_tokens=MAX_NEW_TOKENS)
if not response_text:
logging.warning(f"Received empty response from HF Inference API ({model_name}).")
return "Error: Received empty response from generation model."
return response_text.strip()
except Exception as e:
logging.exception(f"Error querying HF Inference API ({model_name}): {e}")
return f"Error: An unexpected error occurred while generating the answer using {model_name}."
def generate_query_variations(query, llm_func, model_name=HF_GENERATION_MODEL, num_variations=3):
"""Uses LLM (HF Inference API) to generate alternative phrasings."""
# ... (rest of function remains the same) ...
prompt = f"""Given the user query: "{query}"
Generate {num_variations} alternative phrasings or related queries someone might use to find the same information.
Focus on synonyms, different levels of specificity, and related concepts.
Return ONLY the generated queries, each on a new line, without any preamble or numbering.
Example Query: "who is the digital humanities liaison?"
Example Output:
digital scholarship librarian contact
staff directory digital humanities
Steve Zweibel digital humanities role
Example Query: "when are the next graduation dates?"
Example Output:
graduation deadlines academic calendar
dissertation deposit deadline
commencement schedule
User Query: "{query}"
Output:"""
logging.info(f"Generating query variations for: {query} using {model_name}")
try:
response = llm_func(prompt, model_name=model_name)
if response.startswith("Error:"):
logging.error(f"Query variation generation failed: {response}")
return []
variations = [line.strip() for line in response.split('\n') if line.strip()]
logging.info(f"Generated variations: {variations}")
return variations[:num_variations]
except Exception as e:
logging.error(f"Failed to generate query variations: {e}")
return []
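# Caveat: every non-empty line of the response is treated as a query, so any
# preamble the model emits despite the instructions becomes a search query too.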
def generate_prompt(query, context_chunks):
"""Generates a prompt for the LLM."""
# ... (function remains the same) ...
context_str = "\n\n".join(context_chunks)
liaison_directory_url = "https://libguides.gc.cuny.edu/directory/subject"
prompt = f"""Based on the following context from the library guides, answer the user's question.
If the context doesn't contain the answer, state that you couldn't find the information in the guides.
If your answer identifies a specific librarian or subject liaison, please also include this link to the main subject liaison directory: {liaison_directory_url}
Context:
---
{context_str}
---
Question: {query}
Answer:"""
return prompt
# --- Streamlit App UI ---
st.title("📚 Ask the Library Guides (Dataset Embed + HF Gen)")
# User input (only proceed if collection is ready)
if collection:
    query = st.text_area("Enter your question:", height=100)
else:
    st.error("Application initialization failed: Vector database not loaded.")
    st.stop()
# --- Routing Prompt Definition ---
ROUTING_PROMPT_TEMPLATE = """You are a query routing assistant for a library chatbot. Your task is to classify the user's query into one of the following categories based on its intent:
Categories:
- RAG: The user is asking a general question about library services, policies, staff, or resources described in the library guides.
- HOURS: The user is asking about the library's opening or closing times, today's hours, or general operating hours.
- RESEARCH_QUERY: The user is asking for help starting research, finding databases/articles on a topic, or general research assistance.
- CATALOG_SEARCH: The user is asking if the library has a specific known item (book, journal title, article) or where to find it.
- ILL_REQUEST: The user is asking about Interlibrary Loan, requesting items not held by the library, or checking ILL status.
- ACCOUNT_INFO: The user is asking about their library account, fines, renewals, or logging in.
- TECH_SUPPORT: The user is reporting a problem with accessing resources, broken links, or other technical issues.
- EVENTS_CALENDAR: The user is asking about upcoming library events, workshops, or the events calendar.
Analyze the user's query below and determine the most appropriate category. Respond with ONLY the category name (RAG, HOURS, RESEARCH_QUERY, CATALOG_SEARCH, ILL_REQUEST, ACCOUNT_INFO, TECH_SUPPORT, or EVENTS_CALENDAR) and nothing else.
Examples:
Query: "who is the comp lit liaison?"
Response: RAG
Query: "how do I find articles on sociology?"
Response: RESEARCH_QUERY
Query: "when does the library close today?"
Response: HOURS
User Query: "{user_query}"
Response:"""
# --- Research Query Prompt Definition ---
RESEARCH_QUERY_PROMPT_TEMPLATE = """Based on the following context from the library guides, answer the user's research question.
1. Suggest 2-3 relevant databases or resources mentioned in the context that could help with their topic. If no specific databases are mentioned, suggest general multidisciplinary ones if appropriate based on the context.
2. Recommend contacting a subject librarian for further, more in-depth assistance.
3. Provide this link to the subject liaison directory: https://libguides.gc.cuny.edu/directory/subject
If the context doesn't seem relevant to the question, state that you couldn't find specific database recommendations in the guides but still recommend contacting a librarian using the provided directory link.
Context:
---
{context_str}
---
Question: {query}
Answer:"""
# --- End Prompt Definitions ---
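# In the handler below, HOURS and EVENTS_CALENDAR are answered with static
# links; the remaining categories fall through to retrieval, with
# RESEARCH_QUERY selecting its own final prompt template.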
# Only show button and process if collection is loaded
if collection and st.button("Ask"):
    if not query:
        st.warning("Please enter a question.")
    else:
        st.markdown("---")
        with st.spinner("Routing query..."):
            # --- LLM Routing Step ---
            logging.info(f"Routing query: {query}")
            routing_prompt = ROUTING_PROMPT_TEMPLATE.format(user_query=query)
            try:
                route_decision = query_hf_inference(routing_prompt).strip().upper()
                logging.info(f"LLM (HF API) route decision: {route_decision}")
                if route_decision.startswith("ERROR:"):
                    st.error(f"Routing failed: {route_decision}")
                    st.stop()
            except Exception as e:
                logging.error(f"LLM (HF API) routing failed: {e}. Defaulting to RAG.")
                route_decision = "RAG"
        # --- Handle specific routes ---
        if route_decision == "HOURS":
            st.info("You can find the current library hours here: [https://gc-cuny.libcal.com/hours](https://gc-cuny.libcal.com/hours)")
            st.stop()
        # ... (other routes) ...
        elif route_decision == "EVENTS_CALENDAR":
            events_url = "https://gc-cuny.libcal.com/calendar?cid=15537&t=d&d=0000-00-00&cal=15537&inc=0"
            st.info(f"You can find information about upcoming library events and workshops on the calendar here: [{events_url}]({events_url})")
            st.stop()
        # --- End LLM Routing Step ---
        spinner_text = "Thinking... (RAG)" if route_decision != "RESEARCH_QUERY" else "Thinking... (Research Query)"
        with st.spinner(spinner_text):
            # 1. Generate Query Variations (using HF API)
            logging.info(f"Proceeding with retrieval for query (Route: {route_decision}): {query}")
            query_variations = generate_query_variations(query, query_hf_inference, HF_GENERATION_MODEL)
            all_queries = [query] + query_variations
            logging.info(f"--- DIAGNOSTIC: All queries for search: {all_queries}")
            # 2. Embed Queries Locally
            try:
                logging.info(f"Generating query embeddings locally using {LOCAL_EMBEDDING_MODEL}...")
                query_embeddings = embedding_model.encode(all_queries).tolist()
                logging.info(f"Generated {len(query_embeddings)} query embeddings locally.")
            except Exception as e:
                st.error(f"Failed to embed query using local model: {e}")
                logging.exception(f"Failed to embed query using local model: {e}")
                st.stop()
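            # The collection was created with hnsw:space="cosine", so Chroma
            # returns cosine distances: lower values mean closer matches.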
            # 3. Vector Search (using pre-computed query embeddings)
            vector_results_ids = []
            context_chunks = []
            context_metadata_list = []
            try:
                logging.info(f"Performing vector search for {len(query_embeddings)} embeddings...")
                # Query ChromaDB with the locally computed query embeddings
                vector_results = collection.query(
                    query_embeddings=query_embeddings,
                    n_results=INITIAL_N_RESULTS,
                    include=['documents', 'metadatas', 'distances']
                )
                # Process results: combine the hit lists from all query variations,
                # keeping the best (smallest) distance seen for each document ID.
                vector_results_best_rank = {}
                retrieved_docs_map = {}
                retrieved_meta_map = {}
                if vector_results and vector_results.get('ids') and any(vector_results['ids']):
                    total_vector_results = 0
                    for i, ids_list in enumerate(vector_results['ids']):
                        if ids_list:
                            total_vector_results += len(ids_list)
                            distances_list = vector_results['distances'][i] if vector_results.get('distances') else [float('inf')] * len(ids_list)
                            docs_list = vector_results['documents'][i] if vector_results.get('documents') else [""] * len(ids_list)
                            metas_list = vector_results['metadatas'][i] if vector_results.get('metadatas') else [{}] * len(ids_list)
                            for rank, doc_id in enumerate(ids_list):
                                distance = distances_list[rank]
                                if doc_id not in vector_results_best_rank or distance < vector_results_best_rank[doc_id]:
                                    vector_results_best_rank[doc_id] = distance
                                    retrieved_docs_map[doc_id] = docs_list[rank]
                                    retrieved_meta_map[doc_id] = metas_list[rank]
                    logging.info(f"Vector search retrieved {total_vector_results} total results, {len(vector_results_best_rank)} unique IDs.")
                else:
                    logging.warning("Vector search returned no results.")
                # Rank unique results by distance (ascending = most similar first)
                vector_ranked_ids_for_selection = sorted(vector_results_best_rank.items(), key=lambda item: item[1])
                vector_results_ids_list = [doc_id for doc_id, distance in vector_ranked_ids_for_selection]
                # --- Selection ---
                final_context_ids = []
                seen_texts_for_final = set()
                ids_to_use_for_final_selection = vector_results_ids_list
                logging.info(f"Selecting top {TOP_K} unique results from Vector Search list...")
                for doc_id in ids_to_use_for_final_selection:
                    doc_text = retrieved_docs_map.get(doc_id)
                    if doc_text and doc_text not in seen_texts_for_final:
                        seen_texts_for_final.add(doc_text)
                        final_context_ids.append(doc_id)
                        if len(final_context_ids) >= TOP_K:
                            break
                    elif not doc_text:
                        logging.warning(f"Document text not found in map for ID {doc_id} during final selection.")
                logging.info(f"Selected {len(final_context_ids)} final unique IDs after deduplication.")
                # Get final context chunks and metadata
                log_chunks = []
                for i, doc_id in enumerate(final_context_ids):
                    chunk_text = retrieved_docs_map.get(doc_id)
                    chunk_meta = retrieved_meta_map.get(doc_id)
                    if chunk_text:
                        context_chunks.append(chunk_text)
                        context_metadata_list.append(chunk_meta if chunk_meta else {})
                        log_chunks.append(f"Chunk {i+1} (ID: {doc_id}): '{chunk_text[:70]}...'")
                logging.info(f"Selected {len(context_chunks)} unique context chunks for LLM.")
                if log_chunks:
                    logging.info("--- DIAGNOSTIC: Final Context Chunks Sent to LLM:\n" + "\n".join(log_chunks))
            except Exception as e:
                st.error(f"An error occurred during vector search/selection: {e}")
                logging.exception("Vector search/selection failed.")
                context_chunks = []
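            # Even with no retrieved context, generation proceeds; the prompt
            # instructs the model to say it couldn't find the information.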
            # 4. Generate Final Prompt based on Route
            if route_decision == "RESEARCH_QUERY":
                logging.info("Using RESEARCH_QUERY prompt template.")
                final_prompt = RESEARCH_QUERY_PROMPT_TEMPLATE.format(context_str="\n\n".join(context_chunks), query=query)
            else:  # Default to standard RAG
                logging.info("Using standard RAG prompt template.")
                final_prompt = generate_prompt(query, context_chunks)
            # 5. Query the HF Inference API LLM
            logging.info(f"Sending final prompt to HF Inference API model: {HF_GENERATION_MODEL}...")
            answer = query_hf_inference(final_prompt)
            logging.info(f"Received answer from HF Inference API: {answer[:100]}...")
            if answer.startswith("Error:"):
                st.error(f"Answer generation failed: {answer}")
            # 6. Display results
            st.subheader("Answer:")
            st.markdown(answer)
            st.markdown("---")
            with st.expander("Retrieved Context"):
                if context_chunks:
                    for i, (chunk, metadata) in enumerate(zip(context_chunks, context_metadata_list)):
                        st.markdown(f"**Chunk {i+1}:**")
                        st.text(chunk)
                        source_url = metadata.get('source_url')
                        if source_url:
                            st.markdown(f"Source: [{source_url}]({source_url})")
                        st.markdown("---")
                else:
                    st.info("No specific context was retrieved from the guides to answer this question.")
# Add instructions or footer
st.sidebar.header("How to Use")
st.sidebar.info(
"1. Ensure your `HUGGING_FACE_HUB_TOKEN` is correctly set as a Space secret (`HF_TOKEN`) or in the `.env` file.\n"
f"2. The app will load pre-computed embeddings from the HF Dataset (`{HF_DATASET_ID}`).\n"
" (Ensure the dataset was created correctly using `export_chroma_to_parquet.py` and `upload_dataset_to_hf.py`)\n"
"3. Enter your question in the text area.\n"
"4. Click 'Ask'."
)
st.sidebar.header("Configuration")
st.sidebar.markdown(f"**Embedding:** Pre-computed (`{LOCAL_EMBEDDING_MODEL}` loaded from HF Dataset)")
st.sidebar.markdown(f"**LLM (HF API):** `{HF_GENERATION_MODEL}`")
st.sidebar.markdown(f"**ChromaDB Collection:** `{COLLECTION_NAME}` (In-Memory)")
st.sidebar.markdown(f"**Retrieval Mode:** Vector Search Only")
st.sidebar.markdown(f"**Final Unique Chunks:** `{TOP_K}` (from initial `{INITIAL_N_RESULTS}` vector search)")