|
import os |
|
import gradio as gr |
|
import google.generativeai as genai |
|
from langchain_google_genai import ChatGoogleGenerativeAI, GoogleGenerativeAIEmbeddings |
|
from langchain_community.document_loaders import PyPDFLoader |
|
from langchain.text_splitter import RecursiveCharacterTextSplitter |
|
from langchain_community.vectorstores import FAISS |
|
from langchain.prompts import PromptTemplate |
|
from langchain.chains import LLMChain |
|
from datetime import datetime |
|
import pytz |
|
import time |
|
import shutil |
|
|
|
|
|
# The Gemini API key is read from the environment; the app refuses to start
# without it so that a missing secret is reported immediately.
google_api_key = os.getenv("GOOGLE_API_KEY")

if google_api_key in (None, ""):
    raise ValueError("GOOGLE_API_KEY not found in environment variables. Please add it to Hugging Face Space secrets.")

# Hand the key to the google.generativeai client library.
genai.configure(api_key=google_api_key)
|
|
|
|
|
def get_current_datetime():
    """Return the current date and time in the Asia/Kolkata timezone.

    Returns:
        A ``(date, time)`` tuple of strings, formatted as
        ``"%B %d, %Y"`` (e.g. ``"January 05, 2025"``) and
        ``"%I:%M:%S %p"`` (e.g. ``"03:04:05 PM"``).
    """
    # Start from an aware UTC timestamp and convert it, so the result does
    # not depend on the host machine's local timezone.
    india_tz = pytz.timezone('Asia/Kolkata')
    now_in_india = datetime.now(pytz.UTC).astimezone(india_tz)

    return (
        now_in_india.strftime("%B %d, %Y"),
        now_in_india.strftime("%I:%M:%S %p"),
    )
|
|
|
|
|
def initialize_retriever():
    """Build a document retriever over ``Team1.pdf`` in the working directory.

    The PDF is loaded page by page, split into 1000-character chunks with a
    150-character overlap, embedded with the ``models/embedding-001`` model
    and indexed in a FAISS vector store.

    Returns:
        The vector store's retriever, configured with ``k=10``. If any step
        raises, the error is printed and a ``DummyRetriever`` instance is
        returned instead; its ``get_relevant_documents`` always yields ``[]``.
    """

    class DummyRetriever:
        """Fallback stand-in whose lookups always come back empty."""

        def get_relevant_documents(self, query):
            return []

    try:
        working_dir = os.getcwd()
        # Diagnostic output: show where we are looking and what is there.
        print(f"Current working directory: {working_dir}")
        print(f"Files in directory: {os.listdir(working_dir)}")

        pdf_path = os.path.join(working_dir, "Team1.pdf")
        print(f"Attempting to load PDF from: {pdf_path}")
        if not os.path.exists(pdf_path):
            raise FileNotFoundError(f"The file {pdf_path} does not exist")

        pages = PyPDFLoader(pdf_path).load()
        print(f"Successfully loaded {len(pages)} pages from the PDF")

        splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=150)
        chunks = splitter.split_documents(pages)
        print(f"Split into {len(chunks)} text chunks")

        embedder = GoogleGenerativeAIEmbeddings(model="models/embedding-001")
        index = FAISS.from_documents(chunks, embedder)
        print("Successfully created vector store")
        return index.as_retriever(search_kwargs={"k": 10})
    except Exception as exc:
        # Best-effort start-up: the UI should still come up when the
        # document index cannot be built.
        print(f"Error in initialize_retriever: {exc!s}")
        print("Returning dummy retriever due to error")
        return DummyRetriever()
|
|
|
|
|
def get_llm():
    """Create the Gemini chat model used for answering queries.

    Returns:
        A ``ChatGoogleGenerativeAI`` instance for ``gemini-2.0-flash`` with
        ``temperature=0``, or ``None`` if construction raises (the error is
        printed rather than propagated).
    """
    try:
        chat_model = ChatGoogleGenerativeAI(model="gemini-2.0-flash", temperature=0)
    except Exception as exc:
        print(f"Error initializing LLM: {exc!s}")
        return None
    return chat_model


# Shared model instance, created once at import time; None when start-up failed.
llm = get_llm()
|
|
|
|
|
def rag_query(query, retriever):
    """Answer *query* using passages retrieved from the indexed document.

    Args:
        query: The user's question.
        retriever: Object used to look up passages. Its ``invoke`` method is
            used when present, otherwise ``get_relevant_documents``.

    Returns:
        The model's answer text, or a human-readable message when the
        retriever or model is unavailable, nothing relevant was found, or
        processing raised an exception.
    """
    if retriever is None:
        return "Error: Could not initialize document retriever. Please check if Team1.pdf exists."

    # Same guard and message as general_query: without it a failed model
    # start-up surfaces as "'NoneType' object has no attribute 'invoke'".
    if llm is None:
        return "Error: Could not initialize language model. Please check your API key."

    current_date, current_time = get_current_datetime()

    try:
        # Prefer the Runnable-style `invoke`; LangChain deprecates
        # get_relevant_documents in its favour. Simple stand-ins that only
        # expose the legacy method keep working through the fallback.
        retrieve = getattr(retriever, "invoke", None) or retriever.get_relevant_documents
        docs = retrieve(query)

        if not docs:
            return "No relevant information found in the document. Try a general query instead."

        context = "\n".join(doc.page_content for doc in docs)
        prompt = (
            f"Context:\n{context}\n"
            f"Current Date: {current_date}\n"
            f"Current Time: {current_time}\n"
            f"Question: {query}\n"
            "Answer directly and concisely, using the current date and time information if relevant:"
        )

        response = llm.invoke(prompt)
        return response.content
    except Exception as e:
        # UI boundary: report the failure in the response box instead of
        # raising into the Gradio event handler.
        return f"Error in RAG processing: {str(e)}"
|
|
|
|
|
def general_query(query):
    """Answer *query* directly with the language model, without document context.

    The current date and time (from ``get_current_datetime``) are placed in
    the prompt so the model can use them when relevant.

    Args:
        query: The user's question.

    Returns:
        The model's answer text, or a human-readable error message when the
        model is unavailable or the call raises.
    """
    if llm is None:
        return "Error: Could not initialize language model. Please check your API key."

    current_date, current_time = get_current_datetime()

    try:
        prompt_template = (
            "Current Date: {date}\n"
            "Current Time: {time}\n"
            "Answer the following query, using the current date and time information if relevant: {query}"
        )
        prompt = PromptTemplate.from_template(prompt_template)

        # The query is passed as a template variable, so braces typed by the
        # user are inserted verbatim rather than parsed as placeholders.
        prompt_text = prompt.format(date=current_date, time=current_time, query=query)

        # Call the model directly, as rag_query does, instead of building a
        # deprecated LLMChain and using Chain.run on every request.
        response = llm.invoke(prompt_text)
        return response.content
    except Exception as e:
        # UI boundary: report the failure in the response box instead of
        # raising into the Gradio event handler.
        return f"Error in general query: {str(e)}"
|
|
|
|
|
def file_not_found_message():
    """Return the user-facing notice shown when Team1.pdf is unavailable."""
    sentences = [
        "The Team1.pdf file could not be found.",
        "Team Query mode will not work properly.",
        "Please ensure the PDF is correctly uploaded to the Hugging Face Space.",
    ]
    return " ".join(sentences)
|
|
|
|
|
def query_router(query, method, retriever):
    """Dispatch *query* to the handler selected by *method*.

    Args:
        query: The user's question.
        method: ``"Team Query"`` for document-backed answers or
            ``"General Query"`` for a plain model answer.
        retriever: The retriever produced at start-up; may be ``None`` or the
            ``DummyRetriever`` fallback when the document could not be indexed.

    Returns:
        The handler's response text, the missing-PDF notice when Team Query
        is selected without a usable retriever, or ``"Invalid selection!"``
        for any other *method*.
    """
    if method == "Team Query":
        # initialize_retriever falls back to a DummyRetriever *instance*, so
        # `isinstance(retriever, type)` alone (true only for class objects)
        # never catches it; match the fallback by its class name as well.
        retriever_unavailable = (
            retriever is None
            or isinstance(retriever, type)
            or type(retriever).__name__ == "DummyRetriever"
        )
        if retriever_unavailable:
            return file_not_found_message()
        return rag_query(query, retriever)

    if method == "General Query":
        return general_query(query)

    return "Invalid selection!"
|
|
|
|
|
def reset_query_field():
    """Return empty strings for the two outputs this handler is wired to.

    Returns:
        A tuple of two empty strings.
    """
    cleared_query = ""
    cleared_response = ""
    return cleared_query, cleared_response
|
|
|
|
|
def update_datetime():
    """Fetch fresh values for the date and time display boxes.

    Returns:
        The ``(date, time)`` string pair from ``get_current_datetime``.
    """
    current_date, current_time = get_current_datetime()
    return current_date, current_time
|
|
|
|
|
|
|
# Image shown at the top of the page, given as a relative path.
_LOGO_PATH = "Equinix-LOGO.jpeg"

# Page styling: grey centred layout, centred logo, and rules that hide
# download-related controls.
_CUSTOM_CSS = """
.gradio-container {
    background-color: #f0f0f0;
    text-align: center;
}
#logo img {
    display: block;
    margin: 0 auto;
    max-width: 200px; /* Adjust size */
}
/* Hide download buttons and controls */
.download-button {
    display: none !important;
}
/* Hide other download options */
.file-preview .download {
    display: none !important;
}
/* Hide the three dots menu that might contain download options */
.icon-button.secondary {
    display: none !important;
}
"""


def _build_demo(retriever):
    """Assemble the Gradio Blocks UI and wire its event handlers.

    Args:
        retriever: Retriever passed through to ``query_router`` for every
            submitted query.

    Returns:
        The configured ``gr.Blocks`` app, not yet launched.
    """
    # Seed the boxes at construction; they are refreshed again on page load.
    initial_date, initial_time = get_current_datetime()

    def route(query, method):
        # Close over the retriever so the handler only takes UI inputs.
        return query_router(query, method, retriever)

    with gr.Blocks(css=_CUSTOM_CSS) as demo:
        gr.Image(
            _LOGO_PATH,
            elem_id="logo",
            show_label=False,
            height=100,
            width=400,
            show_download_button=False,
        )

        gr.Markdown("<h1 style='text-align: center; color: black;'>Equinix Chatbot for Automation Team</h1>")

        with gr.Row(elem_classes="datetime-display"):
            date_display = gr.Textbox(label="Date", value=initial_date, interactive=False)
            time_display = gr.Textbox(label="Time", value=initial_time, interactive=False)

        refresh_btn = gr.Button("Update Date & Time")
        refresh_btn.click(fn=update_datetime, inputs=[], outputs=[date_display, time_display])

        gr.Markdown("<p style='text-align: center; color: black;'>Ask me anything!</p>")

        with gr.Row():
            query_input = gr.Textbox(label="Enter your query")
            query_method = gr.Dropdown(
                ["Team Query", "General Query"],
                label="Select Query Type",
                value="Team Query",
            )

        output_box = gr.Textbox(label="Response", interactive=False)

        with gr.Row():
            submit_button = gr.Button("Submit")
            reset_button = gr.Button("Reset Query")

        # Submitting answers the query and also refreshes the clock.
        submit_button.click(
            route,
            inputs=[query_input, query_method],
            outputs=output_box,
        )
        reset_button.click(reset_query_field, inputs=[], outputs=[query_input, output_box])
        submit_button.click(
            fn=update_datetime,
            inputs=[],
            outputs=[date_display, time_display],
        )

        # Values set only at start-up go stale for anyone opening the page
        # later, so refresh the clock on every page load.
        demo.load(fn=update_datetime, inputs=None, outputs=[date_display, time_display])

    return demo


def main():
    """Initialise the document retriever, build the UI and launch the app."""
    print("Initializing retriever...")
    retriever = initialize_retriever()

    demo = _build_demo(retriever)
    demo.launch(share=True)


if __name__ == "__main__":
    main()
|
|
|
|
|
|