"""Streamlit app: ask questions about library guides (local embeddings + HF generation).

Flow per question:
  1. An LLM call classifies the query into a route (see ROUTING_PROMPT_TEMPLATE).
  2. Static routes answer with a fixed link; RAG / RESEARCH_QUERY continue.
  3. The query plus LLM-generated variations are searched in ChromaDB.
  4. The top unique chunks are placed in a prompt and sent to the HF Inference API.
"""

import json
import logging
import os
import re
import sys
import time  # Added for embedding delay/timing

import chromadb
# Import ChromaDB's helper for Sentence Transformers
import chromadb.utils.embedding_functions as embedding_functions
import numpy as np
import streamlit as st
from dotenv import load_dotenv
from huggingface_hub import InferenceClient
from tqdm import tqdm  # Added for embedding progress
# from sentence_transformers import CrossEncoder  # Keep if re-ranking might be used

# --- Configuration ---
DB_PATH = "./chroma_db"
COLLECTION_NAME = "libguides_content"  # Must match the embedding script
LOCAL_EMBEDDING_MODEL = 'BAAI/bge-m3'  # Local model for ChromaDB's function
HF_GENERATION_MODEL = "google/gemma-3-27b-it"  # HF model for generation
INPUT_FILE = 'extracted_content.jsonl'  # Source data for embedding
EMBEDDING_BATCH_SIZE = 100  # Batch size for adding docs to ChromaDB
# CROSS_ENCODER_MODEL_NAME = 'cross-encoder/ms-marco-MiniLM-L-6-v2'  # Model for re-ranking (DISABLED)
TOP_K = 10  # Number of *final* unique chunks to send to LLM
INITIAL_N_RESULTS = 50  # Number of candidates from initial vector search
API_RETRY_DELAY = 2  # Delay for generation API if needed
MAX_NEW_TOKENS = 512  # Max tokens for HF text generation
PROGRESS_UPDATE_EVERY = 200  # Lines between progress-bar refreshes while loading data
# ---

# Setup logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', stream=sys.stderr)

# Page config is issued before any other Streamlit call in this script
# (cached-function calls and error messages below would otherwise precede it).
st.set_page_config(layout="wide")


# --- Load API Key and Initialize HF Generation Client ---
# Wrap client initialization in a cached function to avoid re-initializing on every interaction
@st.cache_resource
def initialize_hf_client():
    """Create the Hugging Face InferenceClient used for text generation.

    The token is read from HF_TOKEN first (Spaces secret), then from
    HUGGING_FACE_HUB_TOKEN (which may come from a .env file via load_dotenv).

    Returns:
        An InferenceClient bound to HF_GENERATION_MODEL. If the token is
        missing or construction fails, an error is shown and the script run
        is halted with st.stop().
    """
    load_dotenv()
    hf_token = os.getenv('HF_TOKEN') or os.getenv('HUGGING_FACE_HUB_TOKEN')
    if not hf_token:
        logging.error("HF_TOKEN or HUGGING_FACE_HUB_TOKEN not found in environment variables or .env file.")
        st.error("🔴 Hugging Face Token not found. Please set it as a Space secret named HF_TOKEN or in the .env file as HUGGING_FACE_HUB_TOKEN.")
        st.stop()  # Stop execution if token is missing

    try:
        client = InferenceClient(model=HF_GENERATION_MODEL, token=hf_token)
    except Exception as e:
        logging.exception("Error initializing Hugging Face Inference Client for generation.")
        st.error(f"🔴 Error initializing Hugging Face Inference Client: {e}")
        st.stop()  # Stop execution on error
        return None  # Should not be reached if st.stop() works

    logging.info(f"Initialized HF Inference Client for generation ({HF_GENERATION_MODEL}).")
    return client


generation_client = initialize_hf_client()
# ---


# --- Embedding Function Definition (Needed for DB creation) ---
# This part is similar to embed_and_store_local_chroma_ef.py
# Cache the embedding function definition as well
@st.cache_resource
def get_embedding_function():
    """Build the SentenceTransformer embedding function for LOCAL_EMBEDDING_MODEL.

    Uses CUDA when torch is importable and reports a GPU, otherwise CPU.

    Returns:
        The embedding function, or None if it could not be created (an error
        is shown and logged in that case).
    """
    logging.info(f"Defining embedding function for model: {LOCAL_EMBEDDING_MODEL}")
    try:
        import torch
        device = 'cuda' if torch.cuda.is_available() else 'cpu'
        logging.info(f"Using device: {device}")
    except ImportError:
        device = 'cpu'
        logging.info("Torch not found, using device: cpu")

    try:
        ef = embedding_functions.SentenceTransformerEmbeddingFunction(
            model_name=LOCAL_EMBEDDING_MODEL,
            device=device,
            trust_remote_code=True
        )
        logging.info("Embedding function defined.")
        return ef
    except Exception as e:
        st.error(f"Failed to initialize embedding function ({LOCAL_EMBEDDING_MODEL}): {e}")
        logging.exception(f"Failed to initialize embedding function: {e}")
        return None


# --- Function to Create and Populate DB ---
# This integrates logic from embed_and_store_local_chroma_ef.py
# Use a simple flag file to check if initialization was done in this session/container lifetime
INIT_FLAG_FILE = os.path.join(DB_PATH, ".initialized")


def _load_documents(input_path):
    """Read a JSONL file into parallel (documents, metadatas, ids) lists.

    Each line is expected to be a JSON object with a 'text' field and an
    optional 'metadata' dict. Lines without text are skipped; lines that fail
    to parse are logged and skipped. IDs are "doc_<line index>".
    """
    documents, metadatas, ids = [], [], []
    with open(input_path, 'r', encoding='utf-8') as f:
        lines = f.readlines()

    total = len(lines)
    progress_bar = st.progress(0, text="Loading data...")
    for i, line in enumerate(lines):
        try:
            data = json.loads(line)
            text = data.get('text')
            if text:
                metadata = data.get('metadata', {})
                if not isinstance(metadata, dict):
                    metadata = {}
                # Append to all three lists together so they can never drift apart.
                documents.append(text)
                metadatas.append(metadata)
                ids.append(f"doc_{i}")
        except Exception as e:
            logging.warning(f"Error processing line {i+1}: {e}")
        # Refresh the bar periodically rather than once per line.
        if (i + 1) % PROGRESS_UPDATE_EVERY == 0 or i + 1 == total:
            progress_bar.progress((i + 1) / total, text=f"Loading data... {i+1}/{total}")
    progress_bar.empty()
    return documents, metadatas, ids


def _add_in_batches(collection, documents, metadatas, ids):
    """Add documents to the collection in EMBEDDING_BATCH_SIZE batches.

    A failing batch is logged and skipped so one bad batch does not abort the
    whole run.

    Returns:
        (total_added, error_count)
    """
    total_added = 0
    error_count = 0
    num_batches = (len(documents) + EMBEDDING_BATCH_SIZE - 1) // EMBEDDING_BATCH_SIZE
    progress_bar = st.progress(0, text="Embedding documents (this takes time)...")
    for batch_no in range(num_batches):
        start_idx = batch_no * EMBEDDING_BATCH_SIZE
        end_idx = start_idx + EMBEDDING_BATCH_SIZE
        batch_ids = ids[start_idx:end_idx]
        try:
            collection.add(
                documents=documents[start_idx:end_idx],
                metadatas=metadatas[start_idx:end_idx],
                ids=batch_ids,
            )
            total_added += len(batch_ids)
        except Exception as e:
            logging.error(f"Error adding batch starting at index {start_idx}: {e}")
            error_count += 1
        progress_bar.progress((batch_no + 1) / num_batches, text=f"Embedding documents... Batch {batch_no+1}/{num_batches}")
    progress_bar.empty()
    return total_added, error_count


def initialize_database():
    """Create and populate the ChromaDB collection from INPUT_FILE if needed.

    A flag file (INIT_FLAG_FILE) marks a completed initialization. When it is
    present the function returns immediately. Otherwise the collection is
    rebuilt from scratch and the flag is written only if at least one
    document was stored.

    Returns:
        True when the database is ready, False when initialization failed.
    """
    # Check if DB exists and is initialized (using flag file for ephemeral systems)
    if os.path.exists(INIT_FLAG_FILE):
        logging.info("Initialization flag file found. Assuming DB is ready.")
        return True

    # DB path exists but maybe wasn't fully initialized: rebuild fully for simplicity.
    if os.path.exists(DB_PATH) and os.listdir(DB_PATH):
        logging.warning("DB path exists but initialization flag not found. Re-initializing.")

    st.warning(f"ChromaDB not found or needs initialization at {DB_PATH}. Initializing and embedding data... This may take a while.")
    logging.info("Database not found or needs initialization. Running embedding process...")

    try:
        ef = get_embedding_function()
        if not ef:
            return False  # Stop if embedding function failed

        # Load Data
        logging.info(f"Loading data from {INPUT_FILE}...")
        if not os.path.exists(INPUT_FILE):
            st.error(f"Source data file '{INPUT_FILE}' not found. Cannot create database.")
            logging.error(f"Source data file '{INPUT_FILE}' not found.")
            return False

        documents, metadatas, ids = _load_documents(INPUT_FILE)
        logging.info(f"Loaded {len(documents)} valid documents.")
        if not documents:
            st.error("No valid documents loaded from source file.")
            logging.error("No valid documents loaded.")
            return False

        # Setup Vector DB
        logging.info(f"Initializing ChromaDB client at path: {DB_PATH}")
        chroma_client = chromadb.PersistentClient(path=DB_PATH)
        try:
            chroma_client.delete_collection(name=COLLECTION_NAME)
            logging.info(f"Deleted existing collection (if any): {COLLECTION_NAME}")
        except Exception as e:
            # Best effort: typically the collection simply does not exist yet.
            logging.info(f"No existing collection deleted ({COLLECTION_NAME}): {e}")

        logging.info(f"Creating new collection '{COLLECTION_NAME}' with embedding function.")
        collection = chroma_client.create_collection(
            name=COLLECTION_NAME,
            embedding_function=ef,
            metadata={"hnsw:space": "cosine"}
        )
        logging.info(f"Created new collection '{COLLECTION_NAME}'.")

        # Add Documents in Batches
        logging.info("Adding documents to ChromaDB (ChromaDB will embed)...")
        start_time = time.time()
        total_added, error_count = _add_in_batches(collection, documents, metadatas, ids)
        end_time = time.time()

        logging.info("Finished adding documents process.")
        logging.info(f"Successfully added {total_added} documents to ChromaDB.")
        if error_count > 0:
            logging.warning(f"Encountered errors in {error_count} batches during add.")
        logging.info(f"Document adding took {end_time - start_time:.2f} seconds.")

        # Do not mark an empty collection as initialized: every batch failed,
        # so the next run should retry instead of serving an empty database.
        if total_added == 0:
            st.error("Failed to initialize database: no documents could be added.")
            logging.error("No documents were added to ChromaDB; initialization flag not written.")
            return False

        # Create flag file on success
        os.makedirs(DB_PATH, exist_ok=True)
        with open(INIT_FLAG_FILE, 'w') as f:
            f.write('initialized')
        st.success(f"Database initialized successfully with {total_added} documents.")
        return True
    except Exception as e:
        st.error(f"Failed to initialize database: {e}")
        logging.exception(f"An unexpected error occurred during database initialization: {e}")
        return False


# --- Caching Functions ---
# Modified to depend on successful DB initialization
@st.cache_resource
def load_chromadb_collection():
    """Return the ChromaDB collection, initializing the database first if needed.

    On any failure an error is shown and the script run is halted with
    st.stop() rather than returning None, so that a failed attempt is not
    stored by st.cache_resource and is retried on the next run.
    """
    if not initialize_database():
        st.error("Database initialization failed. Cannot load collection.")
        st.stop()

    logging.info(f"Attempting to load ChromaDB collection: {COLLECTION_NAME}")
    ef = get_embedding_function()
    if not ef:
        # get_embedding_function has already reported the error.
        st.stop()

    try:
        _client = chromadb.PersistentClient(path=DB_PATH)
        # Supply the same embedding function used at creation so that
        # query_texts are embedded with LOCAL_EMBEDDING_MODEL.
        collection = _client.get_collection(name=COLLECTION_NAME, embedding_function=ef)
    except Exception as e:
        st.error(f"Failed to load ChromaDB collection '{COLLECTION_NAME}' after initialization attempt: {e}")
        logging.error(f"Failed to load ChromaDB collection after initialization attempt: {e}")
        st.stop()
        return None  # Should not be reached if st.stop() works

    logging.info(f"Collection '{COLLECTION_NAME}' loaded successfully.")
    return collection


# --- Helper Functions ---
def query_hf_inference(prompt, client_instance=None, model_name=HF_GENERATION_MODEL):
    """Sends the prompt to the HF Inference API using the initialized client.

    Args:
        prompt: Full prompt text.
        client_instance: Client to use; defaults to the module-level generation_client.
        model_name: Used in log and error messages only.

    Returns:
        The stripped generated text, or a string starting with "Error:" on
        failure (this function does not raise).
    """
    client = client_instance or generation_client
    if not client:
        logging.error("HF Inference client not initialized in query_hf_inference.")
        return "Error: HF Inference client failed to initialize."

    try:
        response_text = client.text_generation(
            prompt,
            max_new_tokens=MAX_NEW_TOKENS,
        )
    except Exception as e:
        logging.exception(f"An unexpected error occurred while querying HF Inference API ({model_name}): {e}")
        return f"Error: An unexpected error occurred while generating the answer using {model_name}."

    if not response_text:
        logging.warning(f"Received empty response from HF Inference API ({model_name}) for prompt: {prompt[:100]}...")
        return "Error: Received empty response from generation model."
    return response_text.strip()


def generate_query_variations(query, llm_func, model_name=HF_GENERATION_MODEL, num_variations=3):
    """Uses LLM (HF Inference API) to generate alternative phrasings.

    Args:
        query: The user's original question.
        llm_func: Callable(prompt, model_name=...) returning generated text,
            or a string starting with "Error:".
        model_name: Passed through to llm_func.
        num_variations: Maximum number of variations to return.

    Returns:
        Up to num_variations non-empty lines from the model output; an empty
        list on any failure.
    """
    prompt = f"""Given the user query: "{query}"

Generate {num_variations} alternative phrasings or related queries someone might use to find the same information.
Focus on synonyms, different levels of specificity, and related concepts.
Return ONLY the generated queries, each on a new line, without any preamble or numbering.

Example Query: "who is the digital humanities liaison?"
Example Output:
digital scholarship librarian contact
staff directory digital humanities
Steve Zweibel digital humanities role

Example Query: "when are the next graduation dates?"
Example Output:
graduation deadlines academic calendar
dissertation deposit deadline
commencement schedule

User Query: "{query}"
Output:"""
    logging.info(f"Generating query variations for: {query} using {model_name}")
    try:
        response = llm_func(prompt, model_name=model_name)
        if response.startswith("Error:"):
            logging.error(f"Query variation generation failed: {response}")
            return []
        variations = [line.strip() for line in response.split('\n') if line.strip()]
        logging.info(f"Generated variations: {variations}")
        return variations[:num_variations]
    except Exception as e:
        logging.error(f"Failed to generate query variations: {e}")
        return []


def generate_prompt(query, context_chunks):
    """Generates a prompt for the LLM.

    Args:
        query: The user's question.
        context_chunks: Retrieved text chunks, joined with blank lines.

    Returns:
        The standard RAG prompt string.
    """
    context_str = "\n\n".join(context_chunks)
    liaison_directory_url = "https://libguides.gc.cuny.edu/directory/subject"
    prompt = f"""Based on the following context from the library guides, answer the user's question.
If the context doesn't contain the answer, state that you couldn't find the information in the guides.
If your answer identifies a specific librarian or subject liaison, please also include this link to the main subject liaison directory: {liaison_directory_url}

Context:
---
{context_str}
---

Question: {query}

Answer:"""
    return prompt


# --- Routing Prompt Definition ---
ROUTING_PROMPT_TEMPLATE = """You are a query routing assistant for a library chatbot. Your task is to classify the user's query into one of the following categories based on its intent:

Categories:
- RAG: The user is asking a general question about library services, policies, staff, or resources described in the library guides.
- HOURS: The user is asking about the library's opening or closing times, today's hours, or general operating hours.
- RESEARCH_QUERY: The user is asking for help starting research, finding databases/articles on a topic, or general research assistance.
- CATALOG_SEARCH: The user is asking if the library has a specific known item (book, journal title, article) or where to find it.
- ILL_REQUEST: The user is asking about Interlibrary Loan, requesting items not held by the library, or checking ILL status.
- ACCOUNT_INFO: The user is asking about their library account, fines, renewals, or logging in.
- TECH_SUPPORT: The user is reporting a problem with accessing resources, broken links, or other technical issues.
- EVENTS_CALENDAR: The user is asking about upcoming library events, workshops, or the events calendar.

Analyze the user's query below and determine the most appropriate category. Respond with ONLY the category name (RAG, HOURS, RESEARCH_QUERY, CATALOG_SEARCH, ILL_REQUEST, ACCOUNT_INFO, TECH_SUPPORT, or EVENTS_CALENDAR) and nothing else.

Examples:
Query: "who is the comp lit liaison?"
Response: RAG

Query: "how do I find articles on sociology?"
Response: RESEARCH_QUERY

Query: "when does the library close today?"
Response: HOURS

User Query: "{user_query}"
Response:"""

# --- Research Query Prompt Definition ---
RESEARCH_QUERY_PROMPT_TEMPLATE = """Based on the following context from the library guides, answer the user's research question.
1. Suggest 2-3 relevant databases or resources mentioned in the context that could help with their topic. If no specific databases are mentioned, suggest general multidisciplinary ones if appropriate based on the context.
2. Recommend contacting a subject librarian for further, more in-depth assistance.
3. Provide this link to the subject liaison directory: https://libguides.gc.cuny.edu/directory/subject

If the context doesn't seem relevant to the question, state that you couldn't find specific database recommendations in the guides but still recommend contacting a librarian using the provided directory link.

Context:
---
{context_str}
---

Question: {query}

Answer:"""
# --- End Prompt Definitions ---

# Categories the routing prompt asks the model to choose from.
KNOWN_ROUTES = (
    "RAG",
    "HOURS",
    "RESEARCH_QUERY",
    "CATALOG_SEARCH",
    "ILL_REQUEST",
    "ACCOUNT_INFO",
    "TECH_SUPPORT",
    "EVENTS_CALENDAR",
)

# Routes answered with a fixed message and link: route -> (message, url).
STATIC_ROUTES = {
    "HOURS": (
        "You can find the current library hours here:",
        "https://gc-cuny.libcal.com/hours",
    ),
    "CATALOG_SEARCH": (
        "To check for specific books, journals, or articles, please search the library catalog directly here:",
        "https://cuny-gc.primo.exlibrisgroup.com/discovery/search?vid=01CUNY_GC:CUNY_GC",
    ),
    "ILL_REQUEST": (
        "For Interlibrary Loan requests or questions, please use the ILL system here:",
        "https://ezproxy.gc.cuny.edu/login?url=https://gc-cuny.illiad.oclc.org/illiad/illiad.dll",
    ),
    "ACCOUNT_INFO": (
        "To manage your library account (renewals, fines, etc.), please log in here:",
        # Was "...CUNY_GC§ion=overview": "&sect" had been decoded as an HTML entity.
        "https://cuny-gc.primo.exlibrisgroup.com/discovery/account?vid=01CUNY_GC:CUNY_GC&section=overview",
    ),
    "TECH_SUPPORT": (
        "To report a problem with accessing e-resources or other technical issues, please use this form:",
        "https://docs.google.com/forms/d/e/1FAIpQLSdF3a-Au-jIYRDN-mxU3MpZSANQJWFx0VEN2if01iRucIXsZA/viewform",
    ),
    "EVENTS_CALENDAR": (
        "You can find information about upcoming library events and workshops on the calendar here:",
        "https://gc-cuny.libcal.com/calendar?cid=15537&t=d&d=0000-00-00&cal=15537&inc=0",
    ),
}


def normalize_route(raw_decision):
    """Map raw router output to one of KNOWN_ROUTES, defaulting to "RAG".

    Tolerates surrounding punctuation, markdown or extra words by returning
    the first known category name that appears as a token in the output.
    """
    for token in re.findall(r"[A-Z_]+", raw_decision.upper()):
        candidate = token.strip("_")
        if candidate in KNOWN_ROUTES:
            return candidate
    return "RAG"


def route_query(query):
    """Classify the query with the LLM and return a route from KNOWN_ROUTES.

    If the generation helper reports an error, the error is shown and the
    script run is halted. An unexpected exception falls back to "RAG".
    """
    logging.info(f"Routing query: {query}")
    routing_prompt = ROUTING_PROMPT_TEMPLATE.format(user_query=query)
    try:
        raw_decision = query_hf_inference(routing_prompt).strip().upper()
    except Exception as e:
        logging.error(f"LLM (HF API) routing failed: {e}. Defaulting to RAG.")
        return "RAG"

    logging.info(f"LLM (HF API) route decision: {raw_decision}")
    if raw_decision.startswith("ERROR:"):
        st.error(f"Routing failed: {raw_decision}")
        st.stop()
    return normalize_route(raw_decision)


def retrieve_context(collection, queries, n_results=INITIAL_N_RESULTS, top_k=TOP_K):
    """Run a vector search for all queries and pick the best unique chunks.

    Results of all queries are merged by document ID, keeping the smallest
    distance seen for each ID. IDs are then ranked by distance and the first
    top_k chunks with distinct text are selected.

    Args:
        collection: ChromaDB collection to query (it embeds the query texts).
        queries: List of query strings (original query plus variations).
        n_results: Candidates requested per query.
        top_k: Maximum number of chunks returned.

    Returns:
        (context_chunks, context_metadata_list) as parallel lists.
    """
    logging.info(f"Performing vector search for {len(queries)} queries (ChromaDB will embed)...")
    # Query ChromaDB using query_texts - it uses the collection's embedding function
    results = collection.query(
        query_texts=queries,  # Pass texts, not embeddings
        n_results=n_results,
        include=['documents', 'metadatas', 'distances']
    )

    # Process results (Combine results from variations)
    best_distance = {}
    docs_by_id = {}
    metas_by_id = {}
    if results and results.get('ids') and any(results['ids']):
        total_results = 0
        for q_idx, ids_list in enumerate(results['ids']):
            if not ids_list:
                continue
            total_results += len(ids_list)
            # Fall back to neutral placeholders when a field was not returned.
            distances = results['distances'][q_idx] if results.get('distances') else [float('inf')] * len(ids_list)
            docs = results['documents'][q_idx] if results.get('documents') else [""] * len(ids_list)
            metas = results['metadatas'][q_idx] if results.get('metadatas') else [{}] * len(ids_list)
            for rank, doc_id in enumerate(ids_list):
                distance = distances[rank]
                if doc_id not in best_distance or distance < best_distance[doc_id]:
                    best_distance[doc_id] = distance
                    docs_by_id[doc_id] = docs[rank]
                    metas_by_id[doc_id] = metas[rank]
        logging.info(f"Vector search retrieved {total_results} total results, {len(best_distance)} unique IDs.")
    else:
        logging.warning("Vector search returned no results.")

    # Rank unique results by distance
    ranked_ids = sorted(best_distance, key=best_distance.get)

    # --- Selection ---
    logging.info(f"Selecting top {top_k} unique results from Vector Search list...")
    final_ids = []
    seen_texts = set()
    for doc_id in ranked_ids:
        doc_text = docs_by_id.get(doc_id)
        if not doc_text:
            logging.warning(f"Document text not found in map for ID {doc_id} during final selection.")
            continue
        if doc_text in seen_texts:
            continue
        seen_texts.add(doc_text)
        final_ids.append(doc_id)
        if len(final_ids) >= top_k:
            break
    logging.info(f"Selected {len(final_ids)} final unique IDs after deduplication.")

    # Get final context chunks and metadata
    context_chunks = []
    context_metadata_list = []
    log_chunks = []
    for i, doc_id in enumerate(final_ids):
        chunk_text = docs_by_id[doc_id]
        context_chunks.append(chunk_text)
        context_metadata_list.append(metas_by_id.get(doc_id) or {})
        log_chunks.append(f"Chunk {i+1} (ID: {doc_id}): '{chunk_text[:70]}...'")
    logging.info(f"Selected {len(context_chunks)} unique context chunks for LLM.")
    if log_chunks:
        logging.info("--- DIAGNOSTIC: Final Context Chunks Sent to LLM:\n" + "\n".join(log_chunks))
    return context_chunks, context_metadata_list


def render_sidebar():
    """Render usage instructions and the active configuration in the sidebar."""
    st.sidebar.header("How to Use")
    st.sidebar.info(
        "1. Ensure your `HUGGING_FACE_HUB_TOKEN` is correctly set as a Space secret (`HF_TOKEN`) or in the `.env` file.\n"
        f"2. The app will automatically create/embed the database using `{LOCAL_EMBEDDING_MODEL}` on first run if needed (requires `{INPUT_FILE}` to be present).\n"
        "3. Enter your question in the text area.\n"
        "4. Click 'Ask'."
    )
    st.sidebar.header("Configuration")
    st.sidebar.markdown(f"**Embedding:** Local (`{LOCAL_EMBEDDING_MODEL}` via ChromaDB)")
    st.sidebar.markdown(f"**LLM (HF API):** `{HF_GENERATION_MODEL}`")
    st.sidebar.markdown(f"**ChromaDB Collection:** `{COLLECTION_NAME}`")
    st.sidebar.markdown("**Retrieval Mode:** Vector Search Only")
    st.sidebar.markdown(f"**Final Unique Chunks:** `{TOP_K}` (from initial `{INITIAL_N_RESULTS}` vector search)")


def answer_question(collection, query):
    """Route the query, then either show a static link or run retrieval + generation."""
    st.markdown("---")

    # --- LLM Routing Step ---
    with st.spinner("Routing query..."):
        route_decision = route_query(query)

    # --- Handle specific routes ---
    static_route = STATIC_ROUTES.get(route_decision)
    if static_route is not None:
        message, url = static_route
        st.info(f"{message} [{url}]({url})")
        return
    # --- End LLM Routing Step ---

    spinner_text = "Thinking... (RAG)" if route_decision != "RESEARCH_QUERY" else "Thinking... (Research Query)"
    with st.spinner(spinner_text):
        # 1. Generate Query Variations (using HF API)
        logging.info(f"Proceeding with retrieval for query (Route: {route_decision}): {query}")
        query_variations = generate_query_variations(query, query_hf_inference, HF_GENERATION_MODEL)
        all_queries = [query] + query_variations
        logging.info(f"--- DIAGNOSTIC: All queries for search: {all_queries}")

        # 2. Vector Search (ChromaDB handles query embedding internally)
        try:
            context_chunks, context_metadata_list = retrieve_context(collection, all_queries)
        except Exception as e:
            st.error(f"An error occurred during vector search/selection: {e}")
            logging.exception("Vector search/selection failed.")
            context_chunks, context_metadata_list = [], []

        # 3. Generate Final Prompt based on Route
        if route_decision == "RESEARCH_QUERY":
            logging.info("Using RESEARCH_QUERY prompt template.")
            final_prompt = RESEARCH_QUERY_PROMPT_TEMPLATE.format(context_str="\n\n".join(context_chunks), query=query)
        else:  # Default to standard RAG
            logging.info("Using standard RAG prompt template.")
            final_prompt = generate_prompt(query, context_chunks)

        # 4. Query HF Inference API LLM
        logging.info(f"Sending final prompt to HF Inference API model: {HF_GENERATION_MODEL}...")
        answer = query_hf_inference(final_prompt)
        logging.info(f"Received answer from HF Inference API: {answer[:100]}...")

    # 5. Display results
    if answer.startswith("Error:"):
        # Show the failure once, instead of repeating it under "Answer:".
        st.error(f"Answer generation failed: {answer}")
    else:
        st.subheader("Answer:")
        st.markdown(answer)
    st.markdown("---")

    with st.expander("Retrieved Context"):
        if context_chunks:
            for i, (chunk, metadata) in enumerate(zip(context_chunks, context_metadata_list)):
                st.markdown(f"**Chunk {i+1}:**")
                st.text(chunk)
                source_url = metadata.get('source_url')
                if source_url:
                    st.markdown(f"Source: [{source_url}]({source_url})")
                st.markdown("---")
        else:
            st.info("No specific context was retrieved from the guides to answer this question.")


# --- Streamlit App UI ---
st.title("📚 Ask the Library Guides (Local Embed + HF Gen)")

# Render the sidebar before anything that may call st.stop(), so it stays visible.
render_sidebar()

# Load resources (this now includes the initialization check)
collection = load_chromadb_collection()
if collection is None:
    st.error("Application cannot proceed: Failed to load or initialize ChromaDB collection.")
    st.stop()  # Stop if collection failed to load

# User input (only reached if collection loaded)
query = st.text_area("Enter your question:", height=100)

if st.button("Ask"):
    if not query or not query.strip():
        st.warning("Please enter a question.")
    else:
        answer_question(collection, query)