import streamlit as st
from langchain.memory.chat_message_histories import StreamlitChatMessageHistory
from typing import Any, List, Mapping, Optional
from langchain.llms.base import LLM
from langchain.callbacks.manager import CallbackManagerForLLMRun
import langchain
import asyncio
from playwright.async_api import async_playwright
import time
import regex
import re
import html2text
import os
from langchain.agents import initialize_agent
from langchain.agents import AgentType
from langchain.agents import Tool
# from langchain.agents import load_tools
from langchain.tools import BaseTool
from langchain.memory import ConversationBufferWindowMemory
from langchain.prompts import MessagesPlaceholder
from langchain.agents import ConversationalChatAgent, AgentExecutor
from langchain.callbacks import StreamlitCallbackHandler
from langchain.chains import RetrievalQA
import pinecone
from langchain.vectorstores import Pinecone
from langchain.embeddings.huggingface import HuggingFaceEmbeddings
from langchain.tools import DuckDuckGoSearchRun
from langchain.utilities import WikipediaAPIWrapper
import soundfile as sf
from transformers import SpeechT5Processor, SpeechT5ForTextToSpeech, SpeechT5HifiGan
from datasets import load_dataset
import torch
from langchain.chains import LLMMathChain
from interpreter.code_interpreter import CodeInterpreter
from pygments import highlight
from pygments.lexers import PythonLexer
from pygments.formatters import HtmlFormatter

langchain.debug = True

global CurrentAgent
CurrentAgent = 'Structured Zero Shot Agent'

global Audio_output
Audio_output = []


def colored_code_block(code: str):
    return highlight(code, PythonLexer(), HtmlFormatter(style="monokai"))


class DB_Search2(BaseTool):
    name = "Vector Database Search"
    description = ("This is the internal vector database, which should be searched "
                   "first for information (e.g. engineering data, acronyms).")

    def _run(self, query: str) -> str:
        response, source = QAQuery_p(query)
        return response

    def _arun(self, query: str):
        raise NotImplementedError("N/A")
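
# A minimal usage sketch for the tool above (the query string is made up for
# illustration; it assumes the Pinecone index and the GPTfake LLM wrapper
# initialized further down in this file are available):
#
#   db_tool = DB_Search2()
#   print(db_tool.run("What does the acronym BMS stand for?"))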
pinecone.init(
    api_key=os.environ["pinecone_api_key"],
    # environment='asia-southeast1-gcp-free',
    environment='us-west4-gcp-free',
)

# index_name = 'stla-baby'
global index_name
index_name = 'stla-back'
index = pinecone.Index(index_name)
# index.delete(delete_all=True, namespace='')
print(pinecone.whoami())
print(index.describe_index_stats())

embed_model_id = 'sentence-transformers/all-MiniLM-L6-v2'
device = 'cpu'
embeddings_miniLM = HuggingFaceEmbeddings(
    model_name=embed_model_id,
    model_kwargs={'device': device},
)
embeddings = embeddings_miniLM

global vectordb_p
vectordb_p = Pinecone.from_existing_index(index_name, embeddings)


def QAQuery_p(question: str):
    global vectordb_p
    global CurrentAgent
    retriever = vectordb_p.as_retriever()
    retriever.search_kwargs['k'] = int(os.environ["search_kwargs_k"])
    # retriever.search_kwargs['fetch_k'] = 100
    print("--------------- QA with Remote --------------")
    # GPTfake is the remote LLM wrapper defined further down in this file
    qa = RetrievalQA.from_chain_type(
        llm=GPTfake,
        chain_type="stuff",
        retriever=retriever,
        return_source_documents=True,
        verbose=True,
    )
    res = qa({"query": question})
    print("-" * 20)
    response = res['result']
    source = res['source_documents']
    return response, source


Netsearch = DuckDuckGoSearchRun()
duckduckgo_tool2 = Tool(
    name="DuckDuckGo Internet Search",
    func=Netsearch.run,
    description="Useful to search the internet for real-time information and additional information which is not available in other tools",
)

Wikipedia = WikipediaAPIWrapper()
wikipedia_tool2 = Tool(
    name="Wikipedia Search",
    func=Wikipedia.run,
    description="Useful to search a topic, country or person when there is no available information in the vector database",
)


def text_to_speech_loc2(Text_input):
    global Audio_output
    processor = SpeechT5Processor.from_pretrained("microsoft/speecht5_tts")
    model = SpeechT5ForTextToSpeech.from_pretrained("microsoft/speecht5_tts")
    vocoder = SpeechT5HifiGan.from_pretrained("microsoft/speecht5_hifigan")
    inputs = processor(text=Text_input, return_tensors="pt")
    # Load an x-vector containing the speaker's voice characteristics from a dataset
    embeddings_dataset = load_dataset("Matthijs/cmu-arctic-xvectors", split="validation")
    speaker_embeddings = torch.tensor(embeddings_dataset[7306]["xvector"]).unsqueeze(0)
    speech = model.generate_speech(inputs["input_ids"], speaker_embeddings, vocoder=vocoder)
    print("Type of speech: ", type(speech))
    timestr = time.strftime("%Y%m%d-%H%M%S")
    with open('sample-' + timestr + '.wav', 'wb') as audio:
        sf.write(audio, speech.numpy(), samplerate=16000)
    print("audio: ", audio)
    Audio_output.append(audio.name)
    return audio


print("text to speech2: ", text_to_speech_loc2("Good morning."))
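
# A hedged sketch of how a different voice could be selected in
# text_to_speech_loc2: each row of the CMU Arctic x-vector dataset is a speaker
# embedding (reportedly 512-dimensional), so swapping the row index (7306 above
# is one arbitrary speaker) changes the voice. The index 0 below is purely
# illustrative:
#
#   speaker_embeddings = torch.tensor(embeddings_dataset[0]["xvector"]).unsqueeze(0)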
Tool( name = "Text To Sound API 2", # func = Text2Sound, func = text_to_speech_loc2, description = "Useful when you need to convert text into sound file." ) class GPTRemote(LLM): n: int @property def _llm_type(self) -> str: return "custom" def _call( self, prompt: str, stop: Optional [List[str]] = None, run_manager: Optional[CallbackManagerForLLMRun] = None, **kwargs: Any ) -> str: print("prompt:", prompt) output = asyncio.run(start_playwright(prompt)) if output is None: output = "No Feedback" print("-" * 20) print('Raw: \n', output) keywords = ['Action:', 'Action Input:', 'Observation:', 'Thought:', 'Final Answer:'] if 'Action:' in output and 'Observation:' in output: output = output.split('Observation:')[0] global CurrentAgent if CurrentAgent == 'Structured Zero Short Agent': try: if output.strip()[-1] == '}' and 'Action:' in output: print("valid command") elif 'Action:' in output: output = output + '}' print("corrected command") pattern = r'\{((?:[^{}]|(?R))*)\}' temp = regex.search(pattern, output) rrr = temp.group() output = output.replace(rrr, '```'+ '\n' + rrr + '\n'+'```') except Exception as e: print("model internal error:", e) print("-" * 20) print("Treated output: \n", output) return output @property def _identifying_params(self) -> Mapping[str, Any]: return [("n", self.n)] def treat_output(text): keywords = ['Action:', 'Action Input:', 'Observation:', 'Thought:', 'Final Answer:'] for item in keywords: if item in text: text.replace(item, '\n'+item) print("treat output: ", text) return text GPTfake = GPTRemote(n=0) llm_math_2 = LLMMathChain.from_llm(GPTfake) math_tool_2 = Tool( name ='Calculator', func = llm_math_2.run, description ='Useful for when you need to answer questions about math.' ) class CodeBlock: ''' CodeBlock Class which is able to run in Code Runner ''' def __init__(self, code): self.code = code self.output = "" self.active_line = None def refresh(self): print(f"Active line: {self.active_line}") print(f"Output: {self.output}") def Code_Runner(code_raw: str): # interpreter = CodeInterpreter(language="python", debug_mode=True) global CurrentAgent if CurrentAgent == "Zero Short React 2": code_raw = RemoveIndent(code_raw) if '!pip' in code_raw or 'pip install' in code_raw: try: code_raw=code_raw.replace('!pip', 'pip') except Exception as e: print(e) interpreter = CodeInterpreter(language="shell", debug_mode=True) else: interpreter = CodeInterpreter(language="python", debug_mode=True) # interpreter = CodeInterpreter(language=lang, debug_mode=True) code_block = CodeBlock(code_raw) interpreter.active_block = code_block output = interpreter.run() print("Real Output: \n", output) try: if output.strip() =="" or output == []: output = "It is Done. No Error Found." except Exception as e: print(e) return output def RemoveIndent(code_string, indentation_level=4): lines = code_string.split('\n') corrected_lines = [] for line in lines: if line.strip() == "": continue line_without_indentation = line[indentation_level:] \ if line.startswith(' ' * indentation_level) else line corrected_lines.append(line_without_indentation) corrected_content = '\n'.join(corrected_lines) return corrected_content python_tool3 = Tool( name = "Code Runner", func = Code_Runner, description = """Code Interpreter which is able to run code block in local machine.\n It is capable to treat **any** task by running the code and output the result. (i.e. 
python_tool3 = Tool(
    name="Code Runner",
    func=Code_Runner,
    description="""Code Interpreter which is able to run code blocks on the local machine.
It is capable of handling **any** task by running code and outputting the result (e.g. analyze data, modify/create documents, draw diagrams/flowcharts ...).
You should input detailed code with correct indentation.""",
)


async def start_playwright(question: str):
    start_t = time.time()
    pw = await async_playwright().start()
    browser = await pw.chromium.launch(headless=True)
    end_t = time.time()
    print("Init Browser Done:", end_t - start_t)
    start_t = end_t

    page = await browser.new_page()
    # Note: all Playwright page methods are async (use the "await" keyword)
    await page.goto(os.environ["Endpoint_GPT4"])
    # print("Title of Web: ", await page.title())
    end_t = time.time()
    print("New Page Done:", end_t - start_t)
    start_t = end_t

    await page.wait_for_timeout(200)
    await page.locator("//textarea").fill(question)
    await page.wait_for_timeout(200)
    await page.locator("//textarea").press("Enter")
    await page.wait_for_timeout(200)

    output_text = ""
    output_history = "NOTHING"
    for i in range(100):
        html_content = await page.locator(
            "//div[@aria-label='Chat message from assistant']//div[@class='stMarkdown']"
        ).last.inner_html()
        markdown_converter = html2text.HTML2Text()
        output_text = markdown_converter.handle(html_content)
        # The answer is complete once it stops changing and the typing cursor is gone
        if output_text == output_history and '▌' not in output_text and output_text != "":
            end_t = time.time()
            print("Output Done:", end_t - start_t)
            break
        await page.wait_for_timeout(500)
        output_history = output_text
    print("-------- Final Answer-----------\n", output_text)
    await browser.close()
    return output_text
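
# A minimal smoke test for the Playwright bridge above (a sketch; assumes the
# Endpoint_GPT4 environment variable points at a reachable Streamlit chat UI):
#
#   if __name__ == "__main__":
#       print(asyncio.run(start_playwright("Hello")))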
PREFIX_3 = """
You are a helpful AI assistant. Your mission is to answer the following user request as well as you can, with detailed information and explanation.
If you are not clear about the request, you can ask the user for more details and confirmation. You can offer additional suggestions on the request and ask the user to confirm them.
When you are clear about the request, start answering it by **writing a plan** first. In general, try to **make plans** with as few steps as possible.
When you need information, you can use the tools below and merge all the information gathered from the different tools.
When you use "Code Runner" to run code, **always recap the plan between each code block** (you have extreme short-term memory loss, so you need to recap the plan between each message block to retain it).
When you send a message containing code to "Code Runner", it will be executed **on the user's machine**. The user has given you **full and complete permission** to execute any code necessary to complete the task. You have full access to control their computer to help them. Code entered into "Code Runner" will be executed **in the user's local environment**.
If you want to send data between programming languages, save the data to a txt or json file.
You can access the internet. Run **any code** to achieve the goal, and if at first you don't succeed, try again and again.
You can install new packages with pip. Try to install all necessary packages in one command at the beginning.
When a user refers to a filename, they are likely referring to an existing file in the directory you are currently in ("Code Runner" executes on the user's machine).
In general, choose packages that have the best chance of being already installed and working across multiple applications, such as ffmpeg and pandoc, which are well supported and powerful.
Write messages to the user in Markdown.
When the final answer produces output files, you must output the **name** of each file.
You are capable of **any** task.
---
You have access to the following tools:
"""

msgs = StreamlitChatMessageHistory()

global memory3
memory3 = ConversationBufferWindowMemory(chat_memory=msgs, memory_key="chat_history", return_messages=True)
# GPTfake("hi")

input_variables = ["input", "chat_history", "agent_scratchpad"]

tools_remote = [DB_Search2(), duckduckgo_tool2, wikipedia_tool2, python_tool3, math_tool_2, Text2Sound_tool_loc]

agent_STRUCTURED_ZEROSHOT_REACT = initialize_agent(
    tools_remote,
    GPTfake,
    # agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION,
    agent=AgentType.STRUCTURED_CHAT_ZERO_SHOT_REACT_DESCRIPTION,
    verbose=True,
    handle_parsing_errors=True,
    max_iterations=int(os.environ["max_iterations"]),
    early_stopping_method="generate",
    memory=memory3,
    agent_kwargs={
        'prefix': PREFIX_3,
        # 'format_instructions': FORMAT_INSTRUCTIONS_3,
        # 'suffix': SUFFIX2,
        "memory_prompts": [MessagesPlaceholder(variable_name="chat_history")],
        'input_variables': input_variables,
    },
)

agent = agent_STRUCTURED_ZEROSHOT_REACT


def CheckFileinResp(response):
    Filelist = []
    # Generated speech files follow the 'sample-YYYYMMDD-HHMMSS.wav' naming scheme
    try:
        pattern = r'sample-(?:\d{8})-(?:\d{6})\.wav'
        result = re.findall(pattern, response)
        print("wav file in response:", result)
        for item in result:
            Filelist.append(item)
    except Exception as e:
        print("No wav found:", e)
    # Other document/image file names mentioned in the response
    try:
        pattern = r"(?i)'?([\w./]*\w+\.(?:pptx|docx|doc|xlsx|txt|png|jpg))'?"
        result = re.findall(pattern, response)
        for item in result:
            if '/' in item:
                item = item.split('/')[-1]
            Filelist.append(item)
            print("Other file in response:", item)
    except Exception as e:
        print("No other file found:", e)
    # File links wrapped in placeholder URLs the model sometimes emits
    try:
        listWord = ['(https://example.com/', '(sandbox:/']
        for item in listWord:
            if item in response:
                file = response.split(item)[-1].split(")")[0]
                print("File found:", file)
                Filelist.append(file)
    except Exception as e:
        print("no file found with", listWord, ":", e)
    return Filelist


st.set_page_config(
    initial_sidebar_state='collapsed',
)
st.title("STLA-BABY")

if len(msgs.messages) == 0 or st.sidebar.button("Reset Chat History"):
    msgs.clear()
    msgs.add_ai_message("How can I help you ?")
    st.session_state.steps = {}

avatars = {"human": "user", "ai": "assistant"}
for idx, msg in enumerate(msgs.messages):
    with st.chat_message(avatars[msg.type]):
        st.write(msg.content)

if prompt := st.chat_input(placeholder="Input Your Request"):
    st.chat_message("user").write(prompt)
    with st.chat_message("assistant"):
        st_cb = StreamlitCallbackHandler(st.container(), expand_new_thoughts=True, collapse_completed_thoughts=True)
        response = agent.run(prompt, callbacks=[st_cb])
        try:
            # e.g. (sandbox:/sample-20230805-0807.wav)
            file_names = CheckFileinResp(response)
            print("file_name:", file_names)
            if file_names != []:
                for file_name in file_names:
                    if file_name != "":
                        with open(file_name, "rb") as file:
                            btn = st.download_button(
                                label='Download: ' + file_name,
                                data=file,
                                file_name=file_name,
                            )
            else:
                print("No File Found in Response")
        except Exception as e:
            print("No need to add file in chatbot:", e)
        st.markdown(response, unsafe_allow_html=True)
        # st.markdown(colored_code_block(response), unsafe_allow_html=True)
        # for content in response:
        #     if isinstance(content, str):
        #         st.write(content)
        #     elif isinstance(content, dict) and content.get('type') == 'code':
        #         st.markdown(colored_code_block(content.get('code')), unsafe_allow_html=True)
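
# To run the app locally (a sketch; assumes this file is saved as app.py, that the
# environment variables used above (pinecone_api_key, search_kwargs_k,
# max_iterations, Endpoint_GPT4) are set, and that `playwright install chromium`
# has been run beforehand):
#
#   streamlit run app.py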