import os
import json
import re
import shutil  # needed by clear_cache: FAISS saves a directory, not a single file
import gradio as gr
import pandas as pd
import requests
import random
import urllib.parse
import spacy
import nltk
import numpy as np
from nltk.tokenize import word_tokenize, sent_tokenize
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
from typing import List, Dict
from tempfile import NamedTemporaryFile
from bs4 import BeautifulSoup
from langchain.prompts import PromptTemplate
from langchain.chains import LLMChain
from langchain_core.prompts import ChatPromptTemplate
from langchain_community.vectorstores import FAISS
from langchain_community.document_loaders import PyPDFLoader
from langchain_core.output_parsers import StrOutputParser
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_community.llms import HuggingFaceHub
from langchain_core.documents import Document

huggingface_token = os.environ.get("HUGGINGFACE_TOKEN")

# Download necessary NLTK data
nltk.download('punkt')
nltk.download('averaged_perceptron_tagger')
class ContextDrivenChatbot:
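    """Keeps a sliding window of recent user questions and uses TF-IDF cosine
    similarity to decide whether a new question continues the current topic
    (prepend the history) or starts a new one (use the question as-is)."""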
    def __init__(self, history_size=5):
        self.history = []
        self.history_size = history_size
        self.vectorizer = TfidfVectorizer()
        nltk.download('punkt', quiet=True)
        nltk.download('averaged_perceptron_tagger', quiet=True)

    def add_to_history(self, text):
        self.history.append(text)
        if len(self.history) > self.history_size:
            self.history.pop(0)

    def get_context(self):
        return " ".join(self.history)

    def is_follow_up_question(self, question):
        tokens = word_tokenize(question.lower())
        follow_up_indicators = set(['it', 'this', 'that', 'these', 'those', 'he', 'she', 'they', 'them'])
        return any(token in follow_up_indicators for token in tokens)

    def extract_topics(self, text):
        tokens = nltk.pos_tag(word_tokenize(text))
        return [word for word, pos in tokens if pos.startswith('NN')]
    def get_most_relevant_context(self, question):
        if not self.history:
            return question

        # Create a combined context from history
        combined_context = self.get_context()

        # Vectorize the context and the question
        vectors = self.vectorizer.fit_transform([combined_context, question])

        # Calculate similarity
        similarity = cosine_similarity(vectors[0], vectors[1])[0][0]

        # If similarity is low, it might be a new topic
        if similarity < 0.3:  # This threshold can be adjusted
            return question

        # Otherwise, prepend the context
        return f"{combined_context} {question}"
    def process_question(self, question):
        contextualized_question = self.get_most_relevant_context(question)

        # Extract topics from the question
        topics = self.extract_topics(question)

        # Check if it's a follow-up question
        if self.is_follow_up_question(question):
            # If it's a follow-up, make sure to include previous context
            contextualized_question = f"{self.get_context()} {question}"

        # Add the new question to history
        self.add_to_history(question)

        return contextualized_question, topics
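# Illustrative usage of ContextDrivenChatbot (not executed by the app; the
# question strings below are hypothetical):
#
#   bot = ContextDrivenChatbot(history_size=5)
#   q1, topics1 = bot.process_question("What is FAISS?")
#   q2, topics2 = bot.process_question("How does it index vectors?")
#   # "it" marks q2 as a follow-up, so q2 comes back prefixed with q1's text.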
def load_document(file: NamedTemporaryFile) -> List[Document]:
    """Loads and splits the document into pages."""
    loader = PyPDFLoader(file.name)
    return loader.load_and_split()
def update_vectors(files):
    if not files:
        return "Please upload at least one PDF file."

    embed = get_embeddings()
    total_chunks = 0
    all_data = []
    for file in files:
        data = load_document(file)
        all_data.extend(data)
        total_chunks += len(data)

    if os.path.exists("faiss_database"):
        database = FAISS.load_local("faiss_database", embed, allow_dangerous_deserialization=True)
        database.add_documents(all_data)
    else:
        database = FAISS.from_documents(all_data, embed)

    database.save_local("faiss_database")

    return f"Vector store updated successfully. Processed {total_chunks} chunks from {len(files)} files."
def get_embeddings():
    return HuggingFaceEmbeddings(model_name="sentence-transformers/all-mpnet-base-v2")
def clear_cache():
    if os.path.exists("faiss_database"):
        # FAISS.save_local writes a directory (index.faiss + index.pkl),
        # so os.remove would fail here; remove the whole tree instead.
        shutil.rmtree("faiss_database")
        return "Cache cleared successfully."
    else:
        return "No cache to clear."
def get_model(temperature, top_p, repetition_penalty):
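    """Builds a HuggingFaceHub client for Mistral-7B-Instruct-v0.3 with the
    user-selected sampling parameters."""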
    return HuggingFaceHub(
        repo_id="mistralai/Mistral-7B-Instruct-v0.3",
        model_kwargs={
            "temperature": temperature,
            "top_p": top_p,
            "repetition_penalty": repetition_penalty,
            "max_length": 1000
        },
        huggingfacehub_api_token=huggingface_token
    )
def generate_chunked_response(model, prompt, max_tokens=1000, max_chunks=5):
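    """Calls the model up to max_chunks times, feeding back what has been
    generated so far, and stops early once a chunk ends in sentence-final
    punctuation. This works around per-call output-length limits."""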
    full_response = ""
    for i in range(max_chunks):
        try:
            chunk = model(prompt + full_response, max_new_tokens=max_tokens)
            chunk = chunk.strip()
            full_response += chunk
            # Stop once the model has produced a complete sentence
            if chunk.endswith((".", "!", "?")):
                break
        except Exception as e:
            print(f"Error in generate_chunked_response: {e}")
            break
    return full_response.strip()
def extract_text_from_webpage(html):
    soup = BeautifulSoup(html, 'html.parser')
    for script in soup(["script", "style"]):
        script.extract()  # Remove scripts and styles
    text = soup.get_text()
    lines = (line.strip() for line in text.splitlines())
    # Split multi-phrase lines on double spaces (splitting on a single space
    # would put every word on its own line), then drop empty chunks
    chunks = (phrase.strip() for line in lines for phrase in line.split("  "))
    text = '\n'.join(chunk for chunk in chunks if chunk)
    return text
_useragent_list = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Edge/91.0.864.59 Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Edge/91.0.864.59 Safari/537.36",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Safari/537.36",
]
def google_search(term, num_results=5, lang="en", timeout=5, safe="active", ssl_verify=None):
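    """Scrapes Google search result pages for `term` with a randomly chosen
    User-Agent, fetches each result link, and returns a list of
    {"link": ..., "text": ...} dicts with visible text capped at
    max_chars_per_page characters. Note that scraping Google directly is
    brittle and may be rate-limited or blocked."""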
    escaped_term = urllib.parse.quote_plus(term)
    start = 0
    all_results = []
    max_chars_per_page = 8000

    print(f"Starting Google search for term: '{term}'")

    with requests.Session() as session:
        while start < num_results:
            try:
                user_agent = random.choice(_useragent_list)
                headers = {
                    'User-Agent': user_agent
                }
                resp = session.get(
                    url="https://www.google.com/search",
                    headers=headers,
                    params={
                        "q": term,
                        "num": num_results - start,
                        "hl": lang,
                        "start": start,
                        "safe": safe,
                    },
                    timeout=timeout,
                    verify=ssl_verify,
                )
                resp.raise_for_status()
                print(f"Successfully retrieved search results page (start={start})")
            except requests.exceptions.RequestException as e:
                print(f"Error retrieving search results: {e}")
                break

            soup = BeautifulSoup(resp.text, "html.parser")
            result_block = soup.find_all("div", attrs={"class": "g"})
            if not result_block:
                print("No results found on this page")
                break

            print(f"Found {len(result_block)} results on this page")
            for result in result_block:
                link = result.find("a", href=True)
                if link:
                    link = link["href"]
                    print(f"Processing link: {link}")
                    try:
                        webpage = session.get(link, headers=headers, timeout=timeout)
                        webpage.raise_for_status()
                        visible_text = extract_text_from_webpage(webpage.text)
                        if len(visible_text) > max_chars_per_page:
                            visible_text = visible_text[:max_chars_per_page] + "..."
                        all_results.append({"link": link, "text": visible_text})
                        print(f"Successfully extracted text from {link}")
                    except requests.exceptions.RequestException as e:
                        print(f"Error retrieving webpage content: {e}")
                        all_results.append({"link": link, "text": None})
                else:
                    print("No link found for this result")
                    all_results.append({"link": None, "text": None})
            start += len(result_block)

    print(f"Search completed. Total results: {len(all_results)}")

    if not all_results:
        print("No search results found. Returning a default message.")
        return [{"link": None, "text": "No information found in the web search results."}]

    return all_results
def ask_question(question, temperature, top_p, repetition_penalty, web_search, chatbot):
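    """Answers a question from web search results (if web_search is set) or
    from the local FAISS store of uploaded PDFs. Retries up to max_attempts
    times, shrinking the context when the model rejects an over-long prompt."""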
    if not question:
        return "Please enter a question."

    model = get_model(temperature, top_p, repetition_penalty)
    embed = get_embeddings()

    if os.path.exists("faiss_database"):
        database = FAISS.load_local("faiss_database", embed, allow_dangerous_deserialization=True)
    else:
        database = None

    max_attempts = 3
    context_reduction_factor = 0.7

    contextualized_question, topics = chatbot.process_question(question)

    if web_search:
        search_results = google_search(contextualized_question)
        all_answers = []

        # Build the web documents and index them once, outside the retry loop,
        # so retries don't add duplicate documents to the vector store
        web_docs = [Document(page_content=result["text"], metadata={"source": result["link"]}) for result in search_results if result["text"]]
        if database is None:
            database = FAISS.from_documents(web_docs, embed)
        else:
            database.add_documents(web_docs)
        database.save_local("faiss_database")

        for attempt in range(max_attempts):
            try:
                context_str = "\n".join([f"Source: {doc.metadata['source']}\nContent: {doc.page_content}" for doc in web_docs])

                # On retries, shrink the context the same way the document
                # branch does, so over-long prompts can eventually succeed
                if attempt > 0:
                    words = context_str.split()
                    context_str = " ".join(words[:int(len(words) * context_reduction_factor)])

                prompt_template = """
Answer the question based on the following web search results and conversation context:

Web Search Results:
{context}

Conversation Context: {conv_context}
Current Question: {question}
Topics: {topics}

If the web search results don't contain relevant information, state that the information is not available in the search results.
Provide a summarized and direct answer to the question without mentioning the web search or these instructions.
Do not include any source information in your answer.
"""
                prompt_val = ChatPromptTemplate.from_template(prompt_template)
                formatted_prompt = prompt_val.format(
                    context=context_str,
                    conv_context=chatbot.get_context(),
                    question=question,
                    topics=", ".join(topics)
                )

                full_response = generate_chunked_response(model, formatted_prompt)

                answer_patterns = [
                    r"Provide a concise and direct answer to the question without mentioning the web search or these instructions:",
                    r"Provide a concise and direct answer to the question:",
                    r"Answer:",
                    r"Provide a summarized and direct answer to the original question without mentioning the web search or these instructions:",
                    r"Do not include any source information in your answer."
                ]

                for pattern in answer_patterns:
                    match = re.split(pattern, full_response, flags=re.IGNORECASE)
                    if len(match) > 1:
                        answer = match[-1].strip()
                        break
                else:
                    answer = full_response.strip()

                all_answers.append(answer)
                break
            except Exception as e:
                print(f"Error in ask_question (attempt {attempt + 1}): {e}")
                if "Input validation error" in str(e) and attempt < max_attempts - 1:
                    print("Reducing context length for next attempt")
                elif attempt == max_attempts - 1:
                    all_answers.append("I apologize, but I'm having trouble processing the query due to its length or complexity.")

        answer = "\n\n".join(all_answers)
        sources = set(doc.metadata['source'] for doc in web_docs)
        sources_section = "\n\nSources:\n" + "\n".join(f"- {source}" for source in sources)
        answer += sources_section

        return answer
    else:
        if database is None:
            return "No documents available. Please upload documents or enable web search to answer questions."

        for attempt in range(max_attempts):
            try:
                retriever = database.as_retriever()
                relevant_docs = retriever.get_relevant_documents(contextualized_question)
                context_str = "\n".join([doc.page_content for doc in relevant_docs])

                # On retries, shrink the context so the prompt fits the model's input limit
                if attempt > 0:
                    words = context_str.split()
                    context_str = " ".join(words[:int(len(words) * context_reduction_factor)])

                prompt_template = """
Answer the question based on the following context:

Context:
{context}

Current Question: {question}

If the context doesn't contain relevant information, state that the information is not available.
Provide a summarized and direct answer to the question.
Do not include any source information in your answer.
"""
                prompt_val = ChatPromptTemplate.from_template(prompt_template)
                formatted_prompt = prompt_val.format(context=context_str, question=contextualized_question)

                full_response = generate_chunked_response(model, formatted_prompt)

                answer_patterns = [
                    r"Provide a concise and direct answer to the question without mentioning the web search or these instructions:",
                    r"Provide a concise and direct answer to the question:",
                    r"Answer:",
                    r"Provide a summarized and direct answer to the original question without mentioning the web search or these instructions:",
                    r"Do not include any source information in your answer."
                ]

                for pattern in answer_patterns:
                    match = re.split(pattern, full_response, flags=re.IGNORECASE)
                    if len(match) > 1:
                        answer = match[-1].strip()
                        break
                else:
                    answer = full_response.strip()

                return answer
            except Exception as e:
                print(f"Error in ask_question (attempt {attempt + 1}): {e}")
                if "Input validation error" in str(e) and attempt < max_attempts - 1:
                    print("Reducing context length for next attempt")
                elif attempt == max_attempts - 1:
                    return "I apologize, but I'm having trouble processing your question due to its length or complexity. Could you please try rephrasing it more concisely?"

    return "An unexpected error occurred. Please try again later."
# Gradio interface
with gr.Blocks() as demo:
    gr.Markdown("# Context-Driven Conversational Chatbot")

    with gr.Row():
        file_input = gr.Files(label="Upload your PDF documents", file_types=[".pdf"])
        update_button = gr.Button("Upload PDF")

    update_output = gr.Textbox(label="Update Status")
    update_button.click(update_vectors, inputs=[file_input], outputs=update_output)

    with gr.Row():
        with gr.Column(scale=2):
            chatbot = gr.Chatbot(label="Conversation")
            question_input = gr.Textbox(label="Ask a question")
            submit_button = gr.Button("Submit")
        with gr.Column(scale=1):
            temperature_slider = gr.Slider(label="Temperature", minimum=0.0, maximum=1.0, value=0.5, step=0.1)
            top_p_slider = gr.Slider(label="Top P", minimum=0.0, maximum=1.0, value=0.9, step=0.1)
            repetition_penalty_slider = gr.Slider(label="Repetition Penalty", minimum=1.0, maximum=2.0, value=1.0, step=0.1)
            web_search_checkbox = gr.Checkbox(label="Enable Web Search", value=False)

    context_driven_chatbot = ContextDrivenChatbot()

    def chat(question, history, temperature, top_p, repetition_penalty, web_search):
        answer = ask_question(question, temperature, top_p, repetition_penalty, web_search, context_driven_chatbot)
        history.append((question, answer))
        return "", history

    submit_button.click(chat, inputs=[question_input, chatbot, temperature_slider, top_p_slider, repetition_penalty_slider, web_search_checkbox], outputs=[question_input, chatbot])

    clear_button = gr.Button("Clear Cache")
    clear_output = gr.Textbox(label="Cache Status")
    clear_button.click(clear_cache, inputs=[], outputs=clear_output)

if __name__ == "__main__":
    demo.launch()