Spaces:
Running
Running
| from langchain.text_splitter import CharacterTextSplitter | |
| from langchain.embeddings import OpenAIEmbeddings | |
| from langchain.vectorstores import FAISS | |
| from langchain.chat_models import ChatOpenAI | |
| from langchain.memory import ConversationBufferMemory | |
| from langchain.chains import ConversationChain | |
| from langchain.chains import ConversationalRetrievalChain | |
| from langchain.document_loaders import UnstructuredFileLoader | |
| from typing import List, Dict, Tuple | |
| import gradio as gr | |
| import validators | |
| import requests | |
| import mimetypes | |
| import tempfile | |
| import os | |
| from langchain.chains.question_answering import load_qa_chain | |
| from langchain.llms import OpenAI | |
| from langchain.prompts import PromptTemplate | |
| from langchain.prompts.prompt import PromptTemplate | |
| import pandas as pd | |
| from langchain.agents import create_pandas_dataframe_agent | |
| from langchain.agents import ZeroShotAgent, Tool, AgentExecutor | |
| from langchain.agents.agent_types import AgentType | |
| from langchain.agents import create_csv_agent | |
| from langchain import OpenAI, LLMChain | |
| class ChatDocumentQA: | |
| def __init__(self) -> None: | |
| pass | |
| def _get_empty_state(self) -> Dict[str, None]: | |
| """Create an empty knowledge base.""" | |
| return {"knowledge_base": None} | |
| def _extract_text_from_pdfs(self, file_paths: List[str]) -> List[str]: | |
| """Extract text content from PDF files. | |
| Args: | |
| file_paths (List[str]): List of file paths. | |
| Returns: | |
| List[str]: Extracted text from the PDFs. | |
| """ | |
| docs = [] | |
| loaders = [UnstructuredFileLoader(file_obj, strategy="fast") for file_obj in file_paths] | |
| for loader in loaders: | |
| docs.extend(loader.load()) | |
| return docs | |
| def _get_content_from_url(self, urls: str) -> List[str]: | |
| """Fetch content from given URLs. | |
| Args: | |
| urls (str): Comma-separated URLs. | |
| Returns: | |
| List[str]: List of text content fetched from the URLs. | |
| """ | |
| file_paths = [] | |
| for url in urls.split(','): | |
| if validators.url(url): | |
| headers = {'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36',} | |
| r = requests.get(url, headers=headers) | |
| if r.status_code != 200: | |
| raise ValueError("Check the url of your file; returned status code %s" % r.status_code) | |
| content_type = r.headers.get("content-type") | |
| file_extension = mimetypes.guess_extension(content_type) | |
| temp_file = tempfile.NamedTemporaryFile(suffix=file_extension, delete=False) | |
| temp_file.write(r.content) | |
| file_paths.append(temp_file.name) | |
| docs = self._extract_text_from_pdfs(file_paths) | |
| return docs | |
| def _split_text_into_chunks(self, text: str) -> List[str]: | |
| """Split text into smaller chunks. | |
| Args: | |
| text (str): Input text to be split. | |
| Returns: | |
| List[str]: List of smaller text chunks. | |
| """ | |
| text_splitter = CharacterTextSplitter(separator="\n", chunk_size=500, chunk_overlap=100, length_function=len) | |
| chunks = text_splitter.split_documents(text) | |
| return chunks | |
| def _create_vector_store_from_text_chunks(self, text_chunks: List[str]) -> FAISS: | |
| """Create a vector store from text chunks. | |
| Args: | |
| text_chunks (List[str]): List of text chunks. | |
| Returns: | |
| FAISS: Vector store created from the text chunks. | |
| """ | |
| embeddings = OpenAIEmbeddings() | |
| return FAISS.from_documents(documents=text_chunks, embedding=embeddings) | |
| def _create_conversation_chain(self,vectorstore): | |
| _template = """Given the following conversation and a follow up question, rephrase the follow up question to be a standalone question, in its original language. | |
| Chat History: {chat_history} | |
| Follow Up Input: {question} | |
| Standalone question:""" | |
| CONDENSE_QUESTION_PROMPT = PromptTemplate.from_template(_template) | |
| memory = ConversationBufferMemory(memory_key="chat_history", return_messages=True) | |
| # llm = ChatOpenAI(temperature=0) | |
| llm=OpenAI(temperature=0) | |
| return ConversationalRetrievalChain.from_llm(llm=llm, retriever=vectorstore.as_retriever(), | |
| condense_question_prompt=CONDENSE_QUESTION_PROMPT, | |
| memory=memory) | |
| def _get_documents_knowledge_base(self, file_paths: List[str]) -> Tuple[str, Dict[str, FAISS]]: | |
| """Build knowledge base from uploaded files. | |
| Args: | |
| file_paths (List[str]): List of file paths. | |
| Returns: | |
| Tuple[str, Dict]: Tuple containing a status message and the knowledge base. | |
| """ | |
| file_path = file_paths[0].name | |
| file_extension = os.path.splitext(file_path)[1] | |
| if file_extension == '.pdf': | |
| pdf_docs = [file_path.name for file_path in file_paths] | |
| raw_text = self._extract_text_from_pdfs(pdf_docs) | |
| text_chunks = self._split_text_into_chunks(raw_text) | |
| vectorstore = self._create_vector_store_from_text_chunks(text_chunks) | |
| return "file uploaded", {"knowledge_base": vectorstore} | |
| elif file_extension == '.csv': | |
| # agent = self.create_agent(file_path) | |
| # tools = self.get_agent_tools(agent) | |
| # memory,tools,prompt = self.create_memory_for_csv_qa(tools) | |
| # agent_chain = self.create_agent_chain_for_csv_qa(memory,tools,prompt) | |
| agent_chain = create_csv_agent( | |
| OpenAI(temperature=0), | |
| file_path, | |
| verbose=True, | |
| agent_type=AgentType.ZERO_SHOT_REACT_DESCRIPTION, | |
| ) | |
| return "file uploaded", {"knowledge_base": agent_chain} | |
| else: | |
| return "file uploaded", "" | |
| def _get_urls_knowledge_base(self, urls: str) -> Tuple[str, Dict[str, FAISS]]: | |
| """Build knowledge base from URLs. | |
| Args: | |
| urls (str): Comma-separated URLs. | |
| Returns: | |
| Tuple[str, Dict]: Tuple containing a status message and the knowledge base. | |
| """ | |
| webpage_text = self._get_content_from_url(urls) | |
| text_chunks = self._split_text_into_chunks(webpage_text) | |
| vectorstore = self._create_vector_store_from_text_chunks(text_chunks) | |
| return "file uploaded", {"knowledge_base": vectorstore} | |
| #************************ | |
| # csv qa | |
| #************************ | |
| def create_agent(self,file_path): | |
| agent_chain = create_csv_agent( | |
| OpenAI(temperature=0), | |
| file_path, | |
| verbose=True, | |
| agent_type=AgentType.ZERO_SHOT_REACT_DESCRIPTION, | |
| ) | |
| return agent_chain | |
| def get_agent_tools(self,agent): | |
| # search = agent | |
| tools = [ | |
| Tool( | |
| name="dataframe qa", | |
| func=agent.run, | |
| description="useful for when you need to answer questions about table data and dataframe data", | |
| ) | |
| ] | |
| return tools | |
| def create_memory_for_csv_qa(self,tools): | |
| prefix = """Have a conversation with a human, answering the following questions about table data and dataframe data as best you can. You have access to the following tools:""" | |
| suffix = """Begin!" | |
| {chat_history} | |
| Question: {input} | |
| {agent_scratchpad}""" | |
| prompt = ZeroShotAgent.create_prompt( | |
| tools, | |
| prefix=prefix, | |
| suffix=suffix, | |
| input_variables=["input", "chat_history", "agent_scratchpad"], | |
| ) | |
| memory = ConversationBufferMemory(memory_key="chat_history",return_messages=True) | |
| return memory,tools,prompt | |
| def create_agent_chain_for_csv_qa(self,memory,tools,prompt): | |
| llm_chain = LLMChain(llm=OpenAI(temperature=0), prompt=prompt) | |
| agent = ZeroShotAgent(llm_chain=llm_chain, tools=tools, verbose=True) | |
| agent_chain = AgentExecutor.from_agent_and_tools( | |
| agent=agent, tools=tools, verbose=True, memory=memory | |
| ) | |
| return agent_chain | |
| def _get_response(self, message: str, chat_history: List[Tuple[str, str]], state: Dict[str, FAISS],file_paths) -> Tuple[str, List[Tuple[str, str]]]: | |
| """Get a response from the chatbot. | |
| Args: | |
| message (str): User's message/question. | |
| chat_history (List[Tuple[str, str]]): List of chat history as tuples of (user_message, bot_response). | |
| state (dict): State containing the knowledge base. | |
| Returns: | |
| Tuple[str, List[Tuple[str, str]]]: Tuple containing a status message and updated chat history. | |
| """ | |
| try: | |
| if file_paths: | |
| file_path = file_paths[0].name | |
| file_extension = os.path.splitext(file_path)[1] | |
| if file_extension == ".pdf": | |
| vectorstore = state["knowledge_base"] | |
| chat = self._create_conversation_chain(vectorstore) | |
| # user_ques = {"question": message} | |
| print("chat_history",chat_history) | |
| response = chat({"question": message,"chat_history": chat_history}) | |
| chat_history.append((message, response["answer"])) | |
| return "", chat_history | |
| elif file_extension == '.csv': | |
| agent_chain = state["knowledge_base"] | |
| response = agent_chain.run(input = message) | |
| chat_history.append((message, response)) | |
| return "", chat_history | |
| else: | |
| vectorstore = state["knowledge_base"] | |
| chat = self._create_conversation_chain(vectorstore) | |
| # user_ques = {"question": message} | |
| print("chat_history",chat_history) | |
| response = chat({"question": message,"chat_history": chat_history}) | |
| chat_history.append((message, response["answer"])) | |
| return "", chat_history | |
| except: | |
| chat_history.append((message, "Please Upload Document or URL")) | |
| return "", chat_history | |
| def gradio_interface(self) -> None: | |
| """Create a Gradio interface for the chatbot.""" | |
| with gr.Blocks(theme='freddyaboulton/test-blue') as demo: | |
| gr.HTML("""<center class="darkblue" text-align:center;padding:30px;'> | |
| <h1 style="color:#fff"> | |
| Virtual Assistant Chatbot | |
| </h1> | |
| </center>""") | |
| state = gr.State(self._get_empty_state()) | |
| chatbot = gr.Chatbot() | |
| with gr.Row(): | |
| with gr.Column(scale=0.85): | |
| msg = gr.Textbox(label="Question") | |
| with gr.Column(scale=0.15): | |
| file_output = gr.Textbox(label="File Status") | |
| with gr.Row(): | |
| with gr.Column(scale=0.85): | |
| clear = gr.ClearButton([msg, chatbot]) | |
| with gr.Column(scale=0.15): | |
| upload_button = gr.UploadButton( | |
| "Browse File", | |
| file_types=[".txt", ".pdf", ".doc", ".docx", ".csv"], | |
| file_count="multiple", variant="primary" | |
| ) | |
| with gr.Row(): | |
| with gr.Column(scale=1): | |
| input_url = gr.Textbox(label="urls") | |
| input_url.submit(self._get_urls_knowledge_base, input_url, [file_output, state]) | |
| upload_button.upload(self._get_documents_knowledge_base, upload_button, [file_output, state]) | |
| msg.submit(self._get_response, [msg, chatbot, state,upload_button], [msg, chatbot]) | |
| demo.launch() | |
| if __name__ == "__main__": | |
| chatdocumentqa = ChatDocumentQA() | |
| chatdocumentqa.gradio_interface() |