import os import gradio as gr import google.generativeai as genai from langchain_google_genai import ChatGoogleGenerativeAI, GoogleGenerativeAIEmbeddings from langchain_community.document_loaders import PyPDFLoader from langchain.text_splitter import RecursiveCharacterTextSplitter from langchain_community.vectorstores import FAISS from langchain.prompts import PromptTemplate from langchain.chains import LLMChain from datetime import datetime import pytz import time import shutil # Get API key from Hugging Face Spaces secrets google_api_key = os.environ.get("GOOGLE_API_KEY") if not google_api_key: raise ValueError("GOOGLE_API_KEY not found in environment variables. Please add it to Hugging Face Space secrets.") # Configure Google Generative AI genai.configure(api_key=google_api_key) # Function to get current date and time def get_current_datetime(): # Using UTC as default, but you can change to any timezone utc_now = datetime.now(pytz.UTC) # Convert to IST (Indian Standard Time) - modify as needed ist_timezone = pytz.timezone('Asia/Kolkata') ist_now = utc_now.astimezone(ist_timezone) # Format the datetime formatted_date = ist_now.strftime("%B %d, %Y") formatted_time = ist_now.strftime("%I:%M:%S %p") return formatted_date, formatted_time # Load PDF and create vector store def initialize_retriever(): try: # Get current directory current_dir = os.getcwd() print(f"Current working directory: {current_dir}") # List files in current directory for debugging print(f"Files in directory: {os.listdir(current_dir)}") # Use absolute path for the PDF pdf_path = os.path.join(current_dir, "RAG.pdf") print(f"Attempting to load PDF from: {pdf_path}") # Check if file exists if not os.path.exists(pdf_path): raise FileNotFoundError(f"The file {pdf_path} does not exist") # Load PDF loader = PyPDFLoader(pdf_path) documents = loader.load() print(f"Successfully loaded {len(documents)} pages from the PDF") # Split text into chunks text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=150) text_chunks = text_splitter.split_documents(documents) print(f"Split into {len(text_chunks)} text chunks") # Generate embeddings embeddings = GoogleGenerativeAIEmbeddings(model="models/embedding-001") # Store embeddings in FAISS index vectorstore = FAISS.from_documents(text_chunks, embeddings) print("Successfully created vector store") return vectorstore.as_retriever(search_kwargs={"k": 10}) except Exception as e: print(f"Error in initialize_retriever: {str(e)}") # Return a dummy retriever for graceful failure class DummyRetriever: def get_relevant_documents(self, query): return [] print("Returning dummy retriever due to error") return DummyRetriever() # Initialize LLM def get_llm(): try: return ChatGoogleGenerativeAI(model="gemini-2.0-flash", temperature=0) except Exception as e: print(f"Error initializing LLM: {str(e)}") return None llm = get_llm() # RAG query function def rag_query(query, retriever): if retriever is None: return "Error: Could not initialize document retriever. Please check if Team1.pdf exists." # Get current date and time for context current_date, current_time = get_current_datetime() try: # Retrieve relevant documents docs = retriever.get_relevant_documents(query) if not docs: return "No relevant information found in the document. Try a general query instead." # Create context from retrieved documents context = "\n".join([doc.page_content for doc in docs]) prompt = f"""Context:\n{context} Current Date: {current_date} Current Time: {current_time} Question: {query} Answer directly and concisely, using the current date and time information if relevant:""" response = llm.invoke(prompt) return response.content except Exception as e: return f"Error in RAG processing: {str(e)}" # General query function def general_query(query): if llm is None: return "Error: Could not initialize language model. Please check your API key." # Get current date and time for context current_date, current_time = get_current_datetime() try: # Define the prompt with date and time context prompt_template = """Current Date: {date} Current Time: {time} Answer the following query, using the current date and time information if relevant: {query}""" prompt = PromptTemplate.from_template(prompt_template) # Create an LLM Chain chain = LLMChain(llm=llm, prompt=prompt) # Run chatbot and get response response = chain.run(date=current_date, time=current_time, query=query) return response except Exception as e: return f"Error in general query: {str(e)}" # Function to handle the case when no PDF is found def file_not_found_message(): return ("The Team1.pdf file could not be found. Team Query mode will not work properly. " "Please ensure the PDF is correctly uploaded to the Hugging Face Space.") # Query router function def query_router(query, method, retriever): if method == "Team Query": if isinstance(retriever, type) or retriever is None: return file_not_found_message() return rag_query(query, retriever) elif method == "General Query": return general_query(query) return "Invalid selection!" # Function to reset input and output def reset_query_field(): return "","" # Reset only the query input # Function to update the clock def update_datetime(): date, time = get_current_datetime() return date, time # Debug function to check logo file availability def check_logo_file(logo_path): print(f"Looking for logo at: {os.path.abspath(logo_path)}") print(f"Logo file exists: {os.path.exists(logo_path)}") print(f"Files in directory: {os.listdir()}") return os.path.exists(logo_path) # In your main function, before creating the UI: # Ensure the static directory exists os.makedirs("static", exist_ok=True) # Copy the logo file to the static directory (if it's not already there) src_logo = "Equinix-LOGO.jpeg" dst_logo = os.path.join("static", "Equinix-LOGO.jpeg") if os.path.exists(src_logo) and not os.path.exists(dst_logo): shutil.copy(src_logo, dst_logo) print(f"Copied logo to {dst_logo}") # Main function to create and launch the Gradio interface def main(): # Initialize retriever print("Initializing retriever...") retriever = initialize_retriever() # Define custom CSS to style the logo and header custom_css = """ #header-container { width: 100%; background-color: black; padding: 15px 0; text-align: center; margin-bottom: 20px; } #logo-image { max-height: 80px; margin: 0 auto; display: block; } /* Hide download buttons and controls */ #logo-wrapper .svelte-1b6r3t8 { display: none !important; } .gradio-container .wrap.svelte-19hvt5v { display: none !important; } /* Hide any interactive elements in the header */ #header-container button, #header-container .gr-button { display: none !important; } /* Make date-time display more prominent */ .datetime-display input { font-size: 16px; text-align: center; font-weight: bold; } """ logo_path = "Equinix-LOGO.jpeg" # Make sure this file exists in the Space # Then in your UI code: with gr.Row(elem_id="header-container"): with gr.Column(): if os.path.exists(dst_logo): gr.HTML(f"""
Equinix Logo
""") else: # Fallback to text logo gr.HTML("""
■■■■
EQUINIX
""") # Title & Description gr.Markdown("

Equinix Chatbot for Automation Team

") # Date and Time Display with gr.Row(elem_classes="datetime-display"): date_display = gr.Textbox(label="Date", interactive=False) time_display = gr.Textbox(label="Time", interactive=False) # Update date and time initially date_val, time_val = get_current_datetime() date_display.value = date_val time_display.value = time_val # Add refresh button for time refresh_btn = gr.Button("Update Date & Time") refresh_btn.click(fn=update_datetime, inputs=[], outputs=[date_display, time_display]) gr.Markdown("

Ask me anything!

") # Input & Dropdown Section with gr.Row(): query_input = gr.Textbox(label="Enter your query") query_method = gr.Dropdown(["Team Query", "General Query"], label="Select Query Type", value="Team Query") # Output Textbox output_box = gr.Textbox(label="Response", interactive=False) # Buttons Section with gr.Row(): submit_button = gr.Button("Submit") reset_button = gr.Button("Reset Query") # Button Click Events submit_button.click( lambda query, method: query_router(query, method, retriever), inputs=[query_input, query_method], outputs=output_box ) # Reset only the query input reset_button.click(reset_query_field, inputs=[], outputs=[query_input,output_box]) # Update date and time on submission submit_button.click( fn=update_datetime, inputs=[], outputs=[date_display, time_display] ) # Launch UI ui.launch(share=True) if __name__ == "__main__": main()