# LibraryRAG / app.py
# (Hugging Face file-viewer residue, kept for provenance: "Zwounds's picture",
# "Upload app.py", commit 8dadc91 verified, raw / history / blame, 26.9 kB)
import streamlit as st
import chromadb
import logging
import sys
import json
import os
from dotenv import load_dotenv
from huggingface_hub import InferenceClient
import numpy as np
import time # Added for embedding delay/timing
from tqdm import tqdm # Added for embedding progress
# Import ChromaDB's helper for Sentence Transformers
import chromadb.utils.embedding_functions as embedding_functions
# from sentence_transformers import CrossEncoder # Keep if re-ranking might be used
# --- Configuration ---
# Module-level settings shared by the database build, retrieval and generation
# steps below.
# Directory passed to chromadb.PersistentClient; also holds the ".initialized" flag file.
DB_PATH = "./chroma_db"
COLLECTION_NAME = "libguides_content" # Must match the embedding script
LOCAL_EMBEDDING_MODEL = 'BAAI/bge-m3' # Local model for ChromaDB's function
HF_GENERATION_MODEL = "google/gemma-3-27b-it" # HF model for generation
# JSONL file read line by line; each record's 'text' and 'metadata' keys are used.
INPUT_FILE = 'extracted_content.jsonl' # Source data for embedding
EMBEDDING_BATCH_SIZE = 100 # Batch size for adding docs to ChromaDB
# CROSS_ENCODER_MODEL_NAME = 'cross-encoder/ms-marco-MiniLM-L-6-v2' # Model for re-ranking (DISABLED)
TOP_K = 10 # Number of *final* unique chunks to send to LLM
# Requested per query text, so the candidate pool grows with the number of query variations.
INITIAL_N_RESULTS = 50 # Number of candidates from initial vector search
# NOTE(review): API_RETRY_DELAY does not appear to be referenced anywhere in the
# visible part of this file -- confirm before relying on it.
API_RETRY_DELAY = 2 # Delay for generation API if needed
MAX_NEW_TOKENS = 512 # Max tokens for HF text generation
# ---
# Setup logging
# Root logger at INFO, timestamped, written to stderr.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', stream=sys.stderr)
# --- Load API Key and Initialize HF Generation Client ---
# Wrap client initialization in a cached function to avoid re-initializing on every interaction
@st.cache_resource
def initialize_hf_client():
    """Create the Hugging Face ``InferenceClient`` used for text generation.

    ``load_dotenv()`` is called first so a local ``.env`` file can supply the
    token. The token is then read from the ``HF_TOKEN`` environment variable,
    falling back to ``HUGGING_FACE_HUB_TOKEN``. The function is wrapped in
    ``st.cache_resource`` so the client is not re-created on every
    interaction.

    Returns:
        An ``InferenceClient`` bound to ``HF_GENERATION_MODEL``. If the token
        is missing or construction raises, an error is shown in the UI and
        ``st.stop()`` is called; ``None`` is only returned should ``st.stop()``
        hand control back.
    """
    try:
        load_dotenv()
        # Spaces secrets arrive as environment variables; .env is the local fallback.
        hf_token = os.getenv('HF_TOKEN') or os.getenv('HUGGING_FACE_HUB_TOKEN')
        if not hf_token:
            logging.error("HF_TOKEN or HUGGING_FACE_HUB_TOKEN not found in environment variables or .env file.")
            st.error("🔴 Hugging Face Token not found. Please set it as a Space secret named HF_TOKEN or in the .env file as HUGGING_FACE_HUB_TOKEN.")
            st.stop()  # Stop execution if token is missing
            return None  # Should not be reached if st.stop() works

        client = InferenceClient(model=HF_GENERATION_MODEL, token=hf_token)
        logging.info(f"Initialized HF Inference Client for generation ({HF_GENERATION_MODEL}).")
        return client
    except Exception as e:
        logging.exception("Error initializing Hugging Face Inference Client for generation.")
        st.error(f"🔴 Error initializing Hugging Face Inference Client: {e}")
        st.stop()  # Stop execution on error
    return None  # Should not be reached if st.stop() works


# Module-level client shared by every generation call in this script.
generation_client = initialize_hf_client()
# ---
# --- Embedding Function Definition (Needed for DB creation) ---
# This part is similar to embed_and_store_local_chroma_ef.py
# Cache the embedding function definition as well
@st.cache_resource
def get_embedding_function():
    """Build the SentenceTransformer embedding function handed to ChromaDB.

    The device is ``'cuda'`` when torch imports and reports CUDA as available,
    otherwise ``'cpu'`` (including when torch is not installed).

    Returns:
        The ``SentenceTransformerEmbeddingFunction`` for
        ``LOCAL_EMBEDDING_MODEL``, or ``None`` if constructing it raised (the
        failure is shown in the UI and logged).
    """
    logging.info(f"Defining embedding function for model: {LOCAL_EMBEDDING_MODEL}")

    # Pick the compute device; torch is optional.
    try:
        import torch
        if torch.cuda.is_available():
            device = 'cuda'
        else:
            device = 'cpu'
        logging.info(f"Using device: {device}")
    except ImportError:
        device = 'cpu'
        logging.info("Torch not found, using device: cpu")

    try:
        embedder = embedding_functions.SentenceTransformerEmbeddingFunction(
            model_name=LOCAL_EMBEDDING_MODEL,
            device=device,
            trust_remote_code=True,
        )
    except Exception as e:
        st.error(f"Failed to initialize embedding function ({LOCAL_EMBEDDING_MODEL}): {e}")
        logging.exception(f"Failed to initialize embedding function: {e}")
        return None

    logging.info("Embedding function defined.")
    return embedder
# --- Function to Create and Populate DB ---
# This integrates logic from embed_and_store_local_chroma_ef.py
# Use a simple flag file to check if initialization was done in this session/container lifetime
# Marker file written after a successful build; its presence short-circuits
# initialize_database() on later runs.
INIT_FLAG_FILE = os.path.join(DB_PATH, ".initialized")


def _load_source_documents(input_path):
    """Read the JSONL source file into parallel lists for ChromaDB.

    Blank lines are ignored. Each remaining line is parsed as JSON; records
    without a truthy ``text`` value are skipped, and a ``metadata`` value that
    is not a dict is replaced by ``{}``. IDs are ``doc_<i>`` where ``i`` is the
    0-based line index, so they do not shift when lines are skipped.

    Returns:
        A ``(documents, metadatas, ids)`` tuple of equally long lists.
    """
    documents, metadatas, ids = [], [], []
    with open(input_path, 'r', encoding='utf-8') as f:
        lines = f.readlines()

    total = len(lines)
    # Redraw the progress bar roughly 100 times at most instead of once per line.
    update_every = max(1, total // 100)
    progress_bar = st.progress(0, text="Loading data...")
    for i, line in enumerate(lines):
        if line.strip():
            try:
                data = json.loads(line)
                text = data.get('text')
                if text:
                    metadata = data.get('metadata', {})
                    # NOTE(review): some ChromaDB releases may reject empty
                    # metadata dicts -- confirm against the pinned version.
                    if not isinstance(metadata, dict):
                        metadata = {}
                    documents.append(text)
                    metadatas.append(metadata)
                    ids.append(f"doc_{i}")
            except (ValueError, AttributeError) as e:
                # ValueError covers malformed JSON; AttributeError covers a
                # JSON value that is not an object (no .get).
                logging.warning(f"Error processing line {i+1}: {e}")
        if (i + 1) % update_every == 0 or i + 1 == total:
            progress_bar.progress((i + 1) / total, text=f"Loading data... {i+1}/{total}")
    progress_bar.empty()
    return documents, metadatas, ids


def _add_documents_in_batches(collection, documents, metadatas, ids, batch_size=EMBEDDING_BATCH_SIZE):
    """Add documents to ``collection`` in slices of ``batch_size``.

    A batch that raises is logged and counted, and the remaining batches are
    still attempted.

    Returns:
        ``(total_added, error_count)``: documents in successful batches and
        the number of failed batches.
    """
    total_added = 0
    error_count = 0
    num_batches = (len(documents) + batch_size - 1) // batch_size
    progress_bar = st.progress(0, text="Embedding documents (this takes time)...")
    for batch_no, start_idx in enumerate(range(0, len(documents), batch_size), start=1):
        end_idx = start_idx + batch_size
        batch_ids = ids[start_idx:end_idx]
        try:
            collection.add(
                documents=documents[start_idx:end_idx],
                metadatas=metadatas[start_idx:end_idx],
                ids=batch_ids,
            )
            total_added += len(batch_ids)
        except Exception as e:
            logging.error(f"Error adding batch starting at index {start_idx}: {e}")
            error_count += 1
        progress_bar.progress(batch_no / num_batches, text=f"Embedding documents... Batch {batch_no}/{num_batches}")
    progress_bar.empty()
    return total_added, error_count


def initialize_database():
    """Ensure the ChromaDB collection exists, building it from ``INPUT_FILE`` if needed.

    If ``INIT_FLAG_FILE`` exists the database is assumed ready. Otherwise the
    collection ``COLLECTION_NAME`` is dropped (if present), re-created with the
    local embedding function and cosine distance, and filled from the JSONL
    source in batches. The flag file is written only when at least one
    document was stored.

    Returns:
        ``True`` when the database is ready, ``False`` on any failure (the
        failure is shown in the UI and logged).
    """
    # Check if DB exists and is initialized (using flag file for ephemeral systems)
    if os.path.exists(INIT_FLAG_FILE):
        logging.info("Initialization flag file found. Assuming DB is ready.")
        return True

    # A non-empty DB directory without the flag means an earlier build did not
    # finish; it is rebuilt from scratch. isdir() keeps listdir() from raising
    # when DB_PATH is not a directory.
    if os.path.isdir(DB_PATH) and os.listdir(DB_PATH):
        logging.warning("DB path exists but initialization flag not found. Re-initializing.")

    st.warning(f"ChromaDB not found or needs initialization at {DB_PATH}. Initializing and embedding data... This may take a while.")
    logging.info("Database not found or needs initialization. Running embedding process...")
    try:
        ef = get_embedding_function()
        if not ef:
            return False  # Stop if embedding function failed

        # Load Data
        logging.info(f"Loading data from {INPUT_FILE}...")
        if not os.path.exists(INPUT_FILE):
            st.error(f"Source data file '{INPUT_FILE}' not found. Cannot create database.")
            logging.error(f"Source data file '{INPUT_FILE}' not found.")
            return False
        documents, metadatas, ids = _load_source_documents(INPUT_FILE)
        logging.info(f"Loaded {len(documents)} valid documents.")
        if not documents:
            st.error("No valid documents loaded from source file.")
            logging.error("No valid documents loaded.")
            return False

        # Setup Vector DB
        logging.info(f"Initializing ChromaDB client at path: {DB_PATH}")
        chroma_client = chromadb.PersistentClient(path=DB_PATH)
        try:
            chroma_client.delete_collection(name=COLLECTION_NAME)
            logging.info(f"Deleted existing collection (if any): {COLLECTION_NAME}")
        except Exception as e:
            # Best effort: a failure here is expected when the collection does
            # not exist yet, so record it and carry on.
            logging.info(f"No existing collection '{COLLECTION_NAME}' was deleted: {e}")
        logging.info(f"Creating new collection '{COLLECTION_NAME}' with embedding function.")
        collection = chroma_client.create_collection(
            name=COLLECTION_NAME,
            embedding_function=ef,
            metadata={"hnsw:space": "cosine"},
        )
        logging.info(f"Created new collection '{COLLECTION_NAME}'.")

        # Add Documents in Batches
        logging.info("Adding documents to ChromaDB (ChromaDB will embed)...")
        start_time = time.time()
        total_added, error_count = _add_documents_in_batches(collection, documents, metadatas, ids)
        elapsed = time.time() - start_time
        logging.info("Finished adding documents process.")
        logging.info(f"Successfully added {total_added} documents to ChromaDB.")
        if error_count > 0:
            logging.warning(f"Encountered errors in {error_count} batches during add.")
        logging.info(f"Document adding took {elapsed:.2f} seconds.")

        if total_added == 0:
            # Do not write the flag: an empty collection must not be treated
            # as a finished build on the next run.
            st.error("Failed to initialize database: no documents could be added.")
            logging.error("No documents were added to ChromaDB; initialization flag not written.")
            return False
        if error_count > 0:
            st.warning(f"{error_count} batches could not be added; the database is incomplete.")

        # Create flag file on success
        os.makedirs(DB_PATH, exist_ok=True)
        with open(INIT_FLAG_FILE, 'w') as f:
            f.write('initialized')
        st.success(f"Database initialized successfully with {total_added} documents.")
        return True
    except Exception as e:
        st.error(f"Failed to initialize database: {e}")
        logging.exception(f"An unexpected error occurred during database initialization: {e}")
        return False
# --- Caching Functions ---
# Modified to depend on successful DB initialization
@st.cache_resource
def load_chromadb_collection():
    """Return the ChromaDB collection, building the database first if required.

    ``initialize_database()`` is called first; if it reports failure an error
    is shown and ``st.stop()`` is called. The collection is then opened with
    the same embedding function that was used to create it, so that
    ``query_texts`` passed to ``collection.query`` are embedded with
    ``LOCAL_EMBEDDING_MODEL`` rather than a library default.

    Returns:
        The collection named ``COLLECTION_NAME``, or ``None`` if the embedding
        function is unavailable or the collection cannot be opened.
    """
    if not initialize_database():
        st.error("Database initialization failed. Cannot load collection.")
        st.stop()
    logging.info(f"Attempting to load ChromaDB collection: {COLLECTION_NAME}")
    try:
        ef = get_embedding_function()
        if not ef:
            # get_embedding_function() has already reported the cause.
            logging.error("Embedding function unavailable. Cannot load collection.")
            return None
        _client = chromadb.PersistentClient(path=DB_PATH)
        collection = _client.get_collection(name=COLLECTION_NAME, embedding_function=ef)
        logging.info(f"Collection '{COLLECTION_NAME}' loaded successfully.")
        return collection
    except Exception as e:
        st.error(f"Failed to load ChromaDB collection '{COLLECTION_NAME}' after initialization attempt: {e}")
        logging.error(f"Failed to load ChromaDB collection after initialization attempt: {e}")
        return None
# --- Helper Functions ---
def query_hf_inference(prompt, client_instance=None, model_name=HF_GENERATION_MODEL):
    """Run ``prompt`` through the HF Inference API and return the generated text.

    Args:
        prompt: Full prompt text sent to ``text_generation``.
        client_instance: Client to use; when falsy the module-level
            ``generation_client`` is used instead.
        model_name: Only interpolated into log and error messages; it is not
            passed to the API call.

    Returns:
        The stripped response text, or a string beginning with ``"Error:"``
        when no client is available, the response is empty, or the call
        raises. Callers detect failure by that prefix.
    """
    client = client_instance or generation_client
    if not client:
        logging.error("HF Inference client not initialized in query_hf_inference.")
        return "Error: HF Inference client failed to initialize."

    try:
        generated = client.text_generation(prompt, max_new_tokens=MAX_NEW_TOKENS)
        if generated:
            return generated.strip()
        logging.warning(f"Received empty response from HF Inference API ({model_name}) for prompt: {prompt[:100]}...")
        return "Error: Received empty response from generation model."
    except Exception as e:
        logging.exception(f"An unexpected error occurred while querying HF Inference API ({model_name}): {e}")
        return f"Error: An unexpected error occurred while generating the answer using {model_name}."
def generate_query_variations(query, llm_func, model_name=HF_GENERATION_MODEL, num_variations=3):
    """Ask the LLM for alternative phrasings of ``query``.

    Args:
        query: The user's original question.
        llm_func: Callable invoked as ``llm_func(prompt, model_name=...)`` and
            expected to return a string; a result starting with ``"Error:"``
            is treated as failure.
        model_name: Forwarded to ``llm_func`` and used in log output.
        num_variations: Number requested in the prompt and the maximum number
            of lines returned.

    Returns:
        Up to ``num_variations`` non-empty, stripped lines from the response,
        or ``[]`` when the LLM call fails or raises.
    """
    prompt = f"""Given the user query: "{query}"
Generate {num_variations} alternative phrasings or related queries someone might use to find the same information.
Focus on synonyms, different levels of specificity, and related concepts.
Return ONLY the generated queries, each on a new line, without any preamble or numbering.
Example Query: "who is the digital humanities liaison?"
Example Output:
digital scholarship librarian contact
staff directory digital humanities
Steve Zweibel digital humanities role
Example Query: "when are the next graduation dates?"
Example Output:
graduation deadlines academic calendar
dissertation deposit deadline
commencement schedule
User Query: "{query}"
Output:"""
    logging.info(f"Generating query variations for: {query} using {model_name}")
    try:
        reply = llm_func(prompt, model_name=model_name)
        if reply.startswith("Error:"):
            logging.error(f"Query variation generation failed: {reply}")
            return []

        # One variation per non-blank line of the reply.
        variations = []
        for raw_line in reply.split('\n'):
            cleaned = raw_line.strip()
            if cleaned:
                variations.append(cleaned)
        logging.info(f"Generated variations: {variations}")
        return variations[:num_variations]
    except Exception as e:
        logging.error(f"Failed to generate query variations: {e}")
        return []
def generate_prompt(query, context_chunks):
    """Build the standard RAG prompt for ``query``.

    Args:
        query: The user's question, inserted after ``Question:``.
        context_chunks: Iterable of text chunks; they are joined with a blank
            line between them and placed between the ``---`` markers.

    Returns:
        The prompt string, ending in ``Answer:`` so the model continues from
        there.
    """
    liaison_directory_url = "https://libguides.gc.cuny.edu/directory/subject"
    separator = "\n\n"
    context_str = separator.join(context_chunks)
    return f"""Based on the following context from the library guides, answer the user's question.
If the context doesn't contain the answer, state that you couldn't find the information in the guides.
If your answer identifies a specific librarian or subject liaison, please also include this link to the main subject liaison directory: {liaison_directory_url}
Context:
---
{context_str}
---
Question: {query}
Answer:"""
# --- Streamlit App UI ---
st.set_page_config(layout="wide")
st.title("📚 Ask the Library Guides (Local Embed + HF Gen)")

# Load resources (this now includes the initialization check)
collection = load_chromadb_collection()

# Without a collection there is nothing to search, so halt this script run
# before any input widgets are drawn.
if not collection:
    st.error("Application cannot proceed: Failed to load or initialize ChromaDB collection.")
    st.stop()  # Stop if collection failed to load

# User input (only reached when the collection loaded)
query = st.text_area("Enter your question:", height=100)
# --- Routing Prompt Definition ---
# Few-shot classification prompt filled in with str.format(user_query=...).
# The category names listed here are the values the routing step compares the
# model's reply against, so the two must stay in sync.
ROUTING_PROMPT_TEMPLATE = """You are a query routing assistant for a library chatbot. Your task is to classify the user's query into one of the following categories based on its intent:
Categories:
- RAG: The user is asking a general question about library services, policies, staff, or resources described in the library guides.
- HOURS: The user is asking about the library's opening or closing times, today's hours, or general operating hours.
- RESEARCH_QUERY: The user is asking for help starting research, finding databases/articles on a topic, or general research assistance.
- CATALOG_SEARCH: The user is asking if the library has a specific known item (book, journal title, article) or where to find it.
- ILL_REQUEST: The user is asking about Interlibrary Loan, requesting items not held by the library, or checking ILL status.
- ACCOUNT_INFO: The user is asking about their library account, fines, renewals, or logging in.
- TECH_SUPPORT: The user is reporting a problem with accessing resources, broken links, or other technical issues.
- EVENTS_CALENDAR: The user is asking about upcoming library events, workshops, or the events calendar.
Analyze the user's query below and determine the most appropriate category. Respond with ONLY the category name (RAG, HOURS, RESEARCH_QUERY, CATALOG_SEARCH, ILL_REQUEST, ACCOUNT_INFO, TECH_SUPPORT, or EVENTS_CALENDAR) and nothing else.
Examples:
Query: "who is the comp lit liaison?"
Response: RAG
Query: "how do I find articles on sociology?"
Response: RESEARCH_QUERY
Query: "when does the library close today?"
Response: HOURS
User Query: "{user_query}"
Response:"""
# --- Research Query Prompt Definition ---
# Answer prompt used when the route is RESEARCH_QUERY; filled in with
# str.format(context_str=..., query=...). The only brace placeholders are
# {context_str} and {query}.
RESEARCH_QUERY_PROMPT_TEMPLATE = """Based on the following context from the library guides, answer the user's research question.
1. Suggest 2-3 relevant databases or resources mentioned in the context that could help with their topic. If no specific databases are mentioned, suggest general multidisciplinary ones if appropriate based on the context.
2. Recommend contacting a subject librarian for further, more in-depth assistance.
3. Provide this link to the subject liaison directory: https://libguides.gc.cuny.edu/directory/subject
If the context doesn't seem relevant to the question, state that you couldn't find specific database recommendations in the guides but still recommend contacting a librarian using the provided directory link.
Context:
---
{context_str}
---
Question: {query}
Answer:"""
# --- End Prompt Definitions ---
# Only show button and process if collection is loaded
# Routes answered with a fixed pointer to an external service instead of
# retrieval. Maps route name -> (lead-in sentence, URL).
_DIRECT_ROUTES = {
    "HOURS": (
        "You can find the current library hours here:",
        "https://gc-cuny.libcal.com/hours",
    ),
    "CATALOG_SEARCH": (
        "To check for specific books, journals, or articles, please search the library catalog directly here:",
        "https://cuny-gc.primo.exlibrisgroup.com/discovery/search?vid=01CUNY_GC:CUNY_GC",
    ),
    "ILL_REQUEST": (
        "For Interlibrary Loan requests or questions, please use the ILL system here:",
        "https://ezproxy.gc.cuny.edu/login?url=https://gc-cuny.illiad.oclc.org/illiad/illiad.dll",
    ),
    "ACCOUNT_INFO": (
        "To manage your library account (renewals, fines, etc.), please log in here:",
        "https://cuny-gc.primo.exlibrisgroup.com/discovery/account?vid=01CUNY_GC:CUNY_GC&section=overview",
    ),
    "TECH_SUPPORT": (
        "To report a problem with accessing e-resources or other technical issues, please use this form:",
        "https://docs.google.com/forms/d/e/1FAIpQLSdF3a-Au-jIYRDN-mxU3MpZSANQJWFx0VEN2if01iRucIXsZA/viewform",
    ),
    "EVENTS_CALENDAR": (
        "You can find information about upcoming library events and workshops on the calendar here:",
        "https://gc-cuny.libcal.com/calendar?cid=15537&t=d&d=0000-00-00&cal=15537&inc=0",
    ),
}
_KNOWN_ROUTES = ("RAG", "RESEARCH_QUERY") + tuple(_DIRECT_ROUTES)


def _normalize_route(raw_decision):
    """Reduce a raw routing reply to a known route name.

    Only the first line of the reply is inspected. Its whitespace-separated
    tokens are upper-cased and stripped of surrounding punctuation, and the
    first one that names a known route wins. Anything else yields ``"RAG"``.
    """
    lines = raw_decision.strip().splitlines()
    if not lines:
        return "RAG"
    for token in lines[0].upper().split():
        candidate = token.strip(".,:;!\"'`*()[]")
        if candidate in _KNOWN_ROUTES:
            return candidate
    return "RAG"


def _retrieve_context(collection, queries):
    """Search the collection with every query and keep the best unique chunks.

    Results for all queries are merged by document ID, keeping each ID's
    smallest distance. IDs are then ranked by that distance and the first
    ``TOP_K`` with non-empty, not-yet-seen text are selected.

    Returns:
        ``(context_chunks, context_metadata_list)``: parallel lists of chunk
        texts and their metadata dicts (``{}`` when metadata is missing).
    """
    logging.info(f"Performing vector search for {len(queries)} queries (ChromaDB will embed)...")
    # Query ChromaDB using query_texts so the collection embeds them itself.
    vector_results = collection.query(
        query_texts=queries,
        n_results=INITIAL_N_RESULTS,
        include=['documents', 'metadatas', 'distances'],
    )

    # Merge the per-query result lists, keeping the best distance per ID.
    best_distance = {}
    docs_by_id = {}
    meta_by_id = {}
    if vector_results and vector_results.get('ids') and any(vector_results['ids']):
        total_vector_results = 0
        for i, ids_list in enumerate(vector_results['ids']):
            if not ids_list:
                continue
            total_vector_results += len(ids_list)
            distances_list = vector_results['distances'][i] if vector_results.get('distances') else [float('inf')] * len(ids_list)
            docs_list = vector_results['documents'][i] if vector_results.get('documents') else [""] * len(ids_list)
            metas_list = vector_results['metadatas'][i] if vector_results.get('metadatas') else [{}] * len(ids_list)
            for doc_id, distance, doc_text, doc_meta in zip(ids_list, distances_list, docs_list, metas_list):
                if doc_id not in best_distance or distance < best_distance[doc_id]:
                    best_distance[doc_id] = distance
                    docs_by_id[doc_id] = doc_text
                    meta_by_id[doc_id] = doc_meta
        logging.info(f"Vector search retrieved {total_vector_results} total results, {len(best_distance)} unique IDs.")
    else:
        logging.warning("Vector search returned no results.")

    # Rank unique results by distance (smaller is closer).
    ranked_ids = sorted(best_distance, key=best_distance.get)

    # --- Selection ---
    logging.info(f"Selecting top {TOP_K} unique results from Vector Search list...")
    context_chunks = []
    context_metadata_list = []
    log_chunks = []
    seen_texts = set()
    for doc_id in ranked_ids:
        doc_text = docs_by_id.get(doc_id)
        if not doc_text:
            logging.warning(f"Document text not found in map for ID {doc_id} during final selection.")
            continue
        if doc_text in seen_texts:
            # Same text stored under a different ID; keep only the closest copy.
            continue
        seen_texts.add(doc_text)
        context_chunks.append(doc_text)
        context_metadata_list.append(meta_by_id.get(doc_id) or {})
        log_chunks.append(f"Chunk {len(context_chunks)} (ID: {doc_id}): '{doc_text[:70]}...'")
        if len(context_chunks) >= TOP_K:
            break
    logging.info(f"Selected {len(context_chunks)} unique context chunks for LLM.")
    if log_chunks:
        logging.info("--- DIAGNOSTIC: Final Context Chunks Sent to LLM:\n" + "\n".join(log_chunks))
    return context_chunks, context_metadata_list


# Only show button and process if collection is loaded
if collection and st.button("Ask"):
    if not query or not query.strip():
        st.warning("Please enter a question.")
    else:
        st.markdown("---")
        with st.spinner("Routing query..."):
            # --- LLM Routing Step ---
            logging.info(f"Routing query: {query}")
            routing_prompt = ROUTING_PROMPT_TEMPLATE.format(user_query=query)
            try:
                raw_route = query_hf_inference(routing_prompt).strip().upper()
                logging.info(f"LLM (HF API) route decision: {raw_route}")
                if raw_route.startswith("ERROR:"):
                    st.error(f"Routing failed: {raw_route}")
                    st.stop()
                # The model may add punctuation or keep generating after the
                # category, so reduce the reply to a known route name.
                route_decision = _normalize_route(raw_route)
            except Exception as e:
                logging.error(f"LLM (HF API) routing failed: {e}. Defaulting to RAG.")
                route_decision = "RAG"

        # --- Handle specific routes ---
        if route_decision in _DIRECT_ROUTES:
            lead_in, url = _DIRECT_ROUTES[route_decision]
            st.info(f"{lead_in} [{url}]({url})")
            st.stop()
        # --- End LLM Routing Step ---

        is_research = route_decision == "RESEARCH_QUERY"
        spinner_text = "Thinking... (Research Query)" if is_research else "Thinking... (RAG)"
        with st.spinner(spinner_text):
            # 1. Generate Query Variations (using HF API)
            logging.info(f"Proceeding with retrieval for query (Route: {route_decision}): {query}")
            query_variations = generate_query_variations(query, query_hf_inference, HF_GENERATION_MODEL)
            all_queries = [query] + query_variations
            logging.info(f"--- DIAGNOSTIC: All queries for search: {all_queries}")

            # 2. Vector Search (ChromaDB handles query embedding internally)
            try:
                context_chunks, context_metadata_list = _retrieve_context(collection, all_queries)
            except Exception as e:
                st.error(f"An error occurred during vector search/selection: {e}")
                logging.exception("Vector search/selection failed.")
                # Generation still runs, just without retrieved context.
                context_chunks, context_metadata_list = [], []

            # 3. Generate Final Prompt based on Route
            if is_research:
                logging.info("Using RESEARCH_QUERY prompt template.")
                final_prompt = RESEARCH_QUERY_PROMPT_TEMPLATE.format(context_str="\n\n".join(context_chunks), query=query)
            else:  # Default to standard RAG
                logging.info("Using standard RAG prompt template.")
                final_prompt = generate_prompt(query, context_chunks)

            # 4. Query HF Inference API LLM
            logging.info(f"Sending final prompt to HF Inference API model: {HF_GENERATION_MODEL}...")
            answer = query_hf_inference(final_prompt)
            logging.info(f"Received answer from HF Inference API: {answer[:100]}...")

        # 5. Display results
        if answer.startswith("Error:"):
            # Show the failure once, not again as if it were an answer.
            st.error(f"Answer generation failed: {answer}")
        else:
            st.subheader("Answer:")
            st.markdown(answer)
        st.markdown("---")
        with st.expander("Retrieved Context"):
            if context_chunks:
                for i, (chunk, metadata) in enumerate(zip(context_chunks, context_metadata_list)):
                    st.markdown(f"**Chunk {i+1}:**")
                    st.text(chunk)
                    source_url = metadata.get('source_url')
                    if source_url:
                        st.markdown(f"Source: [{source_url}]({source_url})")
                    st.markdown("---")
            else:
                st.info("No specific context was retrieved from the guides to answer this question.")
# Add instructions or footer
# Sidebar: usage instructions followed by a read-out of the active settings.
sidebar = st.sidebar

sidebar.header("How to Use")
usage_steps = [
    "1. Ensure your `HUGGING_FACE_HUB_TOKEN` is correctly set as a Space secret (`HF_TOKEN`) or in the `.env` file.",
    f"2. The app will automatically create/embed the database using `{LOCAL_EMBEDDING_MODEL}` on first run if needed (requires `{INPUT_FILE}` to be present).",
    "3. Enter your question in the text area.",
    "4. Click 'Ask'.",
]
sidebar.info("\n".join(usage_steps))

sidebar.header("Configuration")
config_lines = (
    f"**Embedding:** Local (`{LOCAL_EMBEDDING_MODEL}` via ChromaDB)",
    f"**LLM (HF API):** `{HF_GENERATION_MODEL}`",
    f"**ChromaDB Collection:** `{COLLECTION_NAME}`",
    "**Retrieval Mode:** Vector Search Only",
    f"**Final Unique Chunks:** `{TOP_K}` (from initial `{INITIAL_N_RESULTS}` vector search)",
)
for config_line in config_lines:
    sidebar.markdown(config_line)