STLA-BABY-S / app.py
OuroborosM's picture
update callback
9e3b7f0
import streamlit as st
from langchain.memory.chat_message_histories import StreamlitChatMessageHistory
from typing import Any, List, Mapping, Optional
from langchain.llms.base import LLM
from langchain.callbacks.manager import CallbackManagerForLLMRun
import langchain
import asyncio
from playwright.async_api import async_playwright
import time
import regex
import html2text
import os
from langchain.agents import initialize_agent
from langchain.agents import AgentType
from langchain.agents import Tool
# from langchain.agents import load_tools
from langchain.tools import BaseTool
from langchain.memory import ConversationBufferWindowMemory
from langchain.prompts import MessagesPlaceholder
from langchain.agents import ConversationalChatAgent, AgentExecutor
from langchain.callbacks import StreamlitCallbackHandler
from langchain.chains import RetrievalQA
import pinecone
from langchain.vectorstores import Pinecone
from langchain.embeddings.huggingface import HuggingFaceEmbeddings
from langchain.tools import DuckDuckGoSearchRun
from langchain.utilities import WikipediaAPIWrapper
import soundfile as sf
from transformers import SpeechT5Processor, SpeechT5ForTextToSpeech, SpeechT5HifiGan
from datasets import load_dataset
import torch
from langchain.chains import LLMMathChain
from interpreter.code_interpreter import CodeInterpreter
from pygments import highlight
from pygments.lexers import PythonLexer
from pygments.formatters import HtmlFormatter
import regex
import re
langchain.debug = True
global CurrentAgent
CurrentAgent = 'Structured Zero Short Agent'
global Audio_output
Audio_output = []
def colored_code_block(code: str):
return highlight(code, PythonLexer(), HtmlFormatter(style="monokai"))
class DB_Search2(BaseTool):
name = "Vector Database Search"
description = "This is the internal vector database to search information firstly (i.e. engineering data, acronym.)"
def _run(self, query: str) -> str:
response, source = QAQuery_p(query)
# response = "test db_search feedback"
return response
def _arun(self, query: str):
raise NotImplementedError("N/A")
pinecone.init(
api_key = os.environ["pinecone_api_key"],
# environment='asia-southeast1-gcp-free',
environment='us-west4-gcp-free',
# openapi_config=openapi_config
)
# index_name = 'stla-baby'
global index_name
index_name = 'stla-back'
index = pinecone.Index(index_name)
# index.delete(delete_all=True, namespace='')
print(pinecone.whoami())
print(index.describe_index_stats())
embed_model_id = 'sentence-transformers/all-MiniLM-L6-v2'
device = 'cpu'
embeddings_miniLM = HuggingFaceEmbeddings(
model_name=embed_model_id,
model_kwargs={'device': device},
)
# embeddings = embeddings_openai
embeddings = embeddings_miniLM
global vectordb_p
vectordb_p = Pinecone.from_existing_index(index_name, embeddings)
def QAQuery_p(question: str):
global vectordb_p
global agent
# global Choice
global CurrentAgent
# vectordb = Chroma(persist_directory='db', embedding_function=embeddings)
retriever = vectordb_p.as_retriever()
retriever.search_kwargs['k'] = int(os.environ["search_kwargs_k"])
# retriever.search_kwargs['fetch_k'] = 100
# if agent == agent_ZEROSHOT_REACT_2 or agent == agent_ZEROSHOT_AGENT_2:
if 1:
print("--------------- QA with Remote --------------")
qa = RetrievalQA.from_chain_type(llm=GPTfake, chain_type="stuff",
retriever=retriever, return_source_documents = True,
verbose = True)
else:
pass
# qa = VectorDBQA.from_chain_type(llm=chat, chain_type="stuff", vectorstore=vectordb, return_source_documents=True)
# res = qa.run(question)
res = qa({"query": question})
print("-" * 20)
# print("Question:", question)
# print("Answer:", res)
# print("Answer:", res['result'])
print("-" * 20)
# print("Source:", res['source_documents'])
response = res['result']
# response = res['source_documents']
source = res['source_documents']
return response, source
Netsearch = DuckDuckGoSearchRun()
duckduckgo_tool2 = Tool(
name = "Duckduckgo Internet Search",
func = Netsearch.run,
description = "Useful to search in internet for real-time information and additional information which is not available in other tools"
)
Wikipedia = WikipediaAPIWrapper()
wikipedia_tool2 = Tool(
name = "Wikipedia Search",
func = Wikipedia.run,
description = "Useful to search a topic, country or person when there is no availble information in vector database"
)
def text_to_speech_loc2(Text_input):
global Audio_output
processor = SpeechT5Processor.from_pretrained("microsoft/speecht5_tts")
model = SpeechT5ForTextToSpeech.from_pretrained("microsoft/speecht5_tts")
vocoder = SpeechT5HifiGan.from_pretrained("microsoft/speecht5_hifigan")
inputs = processor(text = Text_input, return_tensors="pt")
# load xvector containing speaker's voice characteristics from a dataset
embeddings_dataset = load_dataset("Matthijs/cmu-arctic-xvectors", split="validation")
speaker_embeddings = torch.tensor(embeddings_dataset[7306]["xvector"]).unsqueeze(0)
speech = model.generate_speech(inputs["input_ids"], speaker_embeddings, vocoder=vocoder)
print("Type of speech: ", type(speech))
timestr = time.strftime("%Y%m%d-%H%M%S")
# sampling_rate = 16000
with open('sample-' + timestr + '.wav', 'wb') as audio:
sf.write(audio, speech.numpy(), samplerate=16000)
# audio = sf.write("convert1.wav", speech, samplerate=16000)
print("audio: ", audio)
Audio_output.append(audio.name)
return audio
print("text to speech2: ", text_to_speech_loc2("Good morning."))
Text2Sound_tool_loc = Tool(
name = "Text To Sound API 2",
# func = Text2Sound,
func = text_to_speech_loc2,
description = "Useful when you need to convert text into sound file."
)
class GPTRemote(LLM):
n: int
@property
def _llm_type(self) -> str:
return "custom"
def _call(
self,
prompt: str,
stop: Optional [List[str]] = None,
run_manager: Optional[CallbackManagerForLLMRun] = None,
**kwargs: Any
) -> str:
print("prompt:", prompt)
output = asyncio.run(start_playwright(prompt))
if output is None:
output = "No Feedback"
print("-" * 20)
print('Raw: \n', output)
keywords = ['Action:', 'Action Input:', 'Observation:', 'Thought:', 'Final Answer:']
if 'Action:' in output and 'Observation:' in output:
output = output.split('Observation:')[0]
global CurrentAgent
if CurrentAgent == 'Structured Zero Short Agent':
try:
if output.strip()[-1] == '}' and 'Action:' in output:
print("valid command")
elif 'Action:' in output:
output = output + '}'
print("corrected command")
pattern = r'\{((?:[^{}]|(?R))*)\}'
temp = regex.search(pattern, output)
rrr = temp.group()
output = output.replace(rrr, '```'+ '\n' + rrr + '\n'+'```')
except Exception as e:
print("model internal error:", e)
print("-" * 20)
print("Treated output: \n", output)
return output
@property
def _identifying_params(self) -> Mapping[str, Any]:
return [("n", self.n)]
def treat_output(text):
keywords = ['Action:', 'Action Input:', 'Observation:', 'Thought:', 'Final Answer:']
for item in keywords:
if item in text:
text.replace(item, '\n'+item)
print("treat output: ", text)
return text
GPTfake = GPTRemote(n=0)
llm_math_2 = LLMMathChain.from_llm(GPTfake)
math_tool_2 = Tool(
name ='Calculator',
func = llm_math_2.run,
description ='Useful for when you need to answer questions about math.'
)
class CodeBlock:
'''
CodeBlock Class which is able to run in Code Runner
'''
def __init__(self, code):
self.code = code
self.output = ""
self.active_line = None
def refresh(self):
print(f"Active line: {self.active_line}")
print(f"Output: {self.output}")
def Code_Runner(code_raw: str):
# interpreter = CodeInterpreter(language="python", debug_mode=True)
global CurrentAgent
if CurrentAgent == "Zero Short React 2":
code_raw = RemoveIndent(code_raw)
if '!pip' in code_raw or 'pip install' in code_raw:
try:
code_raw=code_raw.replace('!pip', 'pip')
except Exception as e:
print(e)
interpreter = CodeInterpreter(language="shell", debug_mode=True)
else:
interpreter = CodeInterpreter(language="python", debug_mode=True)
# interpreter = CodeInterpreter(language=lang, debug_mode=True)
code_block = CodeBlock(code_raw)
interpreter.active_block = code_block
output = interpreter.run()
print("Real Output: \n", output)
try:
if output.strip() =="" or output == []:
output = "It is Done. No Error Found."
except Exception as e:
print(e)
return output
def RemoveIndent(code_string, indentation_level=4):
lines = code_string.split('\n')
corrected_lines = []
for line in lines:
if line.strip() == "":
continue
line_without_indentation = line[indentation_level:] \
if line.startswith(' ' * indentation_level) else line
corrected_lines.append(line_without_indentation)
corrected_content = '\n'.join(corrected_lines)
return corrected_content
python_tool3 = Tool(
name = "Code Runner",
func = Code_Runner,
description = """Code Interpreter which is able to run code block in local machine.\n It is capable to treat **any** task by running the code and output the result. (i.e. analyzer data, modify/creat documents, draw diagram/flowchart ...)\n You should input detail code with right indentation."""
)
async def start_playwright(question: str):
start_t = time.time()
pw = await async_playwright().start()
browser = await pw.chromium.launch(headless=True)
end_t = time.time()
print("Init Browser Done:", end_t - start_t)
start_t = end_t
page = await browser.new_page()
# note all methods are async (use the "await" keyword)
await page.goto(os.environ["Endpoint_GPT4"])
# print("Title of Web: ", await page.title())
end_t = time.time()
print("New Page Done:", end_t - start_t)
start_t = end_t
await page.wait_for_timeout(200)
await page.locator("//textarea").fill(question)
await page.wait_for_timeout(200)
await page.locator("//textarea").press("Enter")
await page.wait_for_timeout(200)
output_history = "NOTHING"
for i in range(100):
output_text_old = await page.locator("//div[@aria-label='Chat message from assistant']").last.inner_text()
html_content = await page.locator("//div[@aria-label='Chat message from assistant']//div[@class='stMarkdown']").last.inner_html()
markdown_converter = html2text.HTML2Text()
output_text = markdown_converter.handle(html_content)
print("output_text... :")
if output_text == output_history and '▌' not in output_text and output_text != "":
end_t = time.time()
print("Output Done:", end_t - start_t)
return output_text
else:
await page.wait_for_timeout(500)
output_history = output_text
print("-------- Final Answer-----------\n", output_text)
await browser.close()
PREFIX_3 ="""
You are a helpful AI assistant. Your mission is to answer the following user request as best as you can with detail information and explanation.
If you are not clear about the request, you can ask user for more details and the confirmation. You can provide additional suggestion to user on the request and ask confirmation from user.
When you are clear about the request, you can start to answer the request by **writing a plan** firstly. In general, try to **make plans** with as few steps as possible.
When you need information, you can use tools as below and merge all gathered information from different tools.
When you need to use "Code Runner" for code running, **Always recap the plan between each code block** (you have extreme short-term memory loss, so you need to recap the plan between each message block to retain it).
When you send a message containing code to "Code Runner", it will be executed **on the user's machine**. The user has given you **full and complete permission** to execute any code necessary to complete the task. You have full access to control their computer to help them. Code entered into "Code Runner" will be executed **in the users local environment**.
If you want to send data between programming languages, save the data to a txt or json.
You can access the internet. Run **any code** to achieve the goal, and if at first you don't succeed, try again and again.
You can install new packages with pip. Try to install all necessary packages in one command at the beginning.
When a user refers to a filename, they're likely referring to an existing file in the directory you're currently in ("Code Runner" executes on the user's machine).
In general, choose packages that have the most universal chance to be already installed and to work across multiple applications. Packages like ffmpeg and pandoc that are well-supported and powerful.
Write messages to the user in Markdown. When the final answer has output files, you must output the **name** of the file.
You are capable of **any** task.
---\n You have access to the following tools:\n
"""
msgs = StreamlitChatMessageHistory()
global memory3
memory3 = ConversationBufferWindowMemory(chat_memory=msgs, memory_key="chat_history", return_messages=True)
# GPTfake("hi")
input_variables=["input", "chat_history", "agent_scratchpad"]
tools_remote = [DB_Search2(), duckduckgo_tool2, wikipedia_tool2, python_tool3, math_tool_2, Text2Sound_tool_loc]
agent_STRUCTURED_ZEROSHOT_REACT = initialize_agent(tools_remote, GPTfake,
# agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION,
agent=AgentType.STRUCTURED_CHAT_ZERO_SHOT_REACT_DESCRIPTION,
verbose = True,
handle_parsing_errors = True,
max_iterations = int(os.environ["max_iterations"]),
early_stopping_method="generate",
memory = memory3,
agent_kwargs={
'prefix': PREFIX_3,
# 'format_instructions': FORMAT_INSTRUCTIONS_3,
# 'suffix': SUFFIX2,
"memory_prompts": [MessagesPlaceholder(variable_name="chat_history")],
'input_variables': input_variables,
},
# input_variables = input_variables,
# agent_kwargs={
# 'prompt': prompt,
# }
)
agent = agent_STRUCTURED_ZEROSHOT_REACT
def CheckFileinResp(response):
Filelist = []
try:
pattern = r'sample-(?:\d{8})-(?:\d{6})\.wav'
result = re.findall(pattern, response)
print("wav file in response:", result)
for item in result:
Filelist.append(item)
except Exception as e:
print("No wav found:", e)
try:
pattern = r"(?i)'?([\w./]*\w+\.(?:pptx|docx|doc|xlsx|txt|png|jpg))'?"
result = re.findall(pattern, response)
# print("Other file in response:", result)
for item in result:
if '/' in item:
item = item.split('/')[-1]
Filelist.append(item)
print("Other file in response:", item)
except Exception as e:
print("No other file found:", e)
try:
listWord = ['(https://example.com/', '(sandbox:/']
for item in listWord:
if item in response:
file = response.split(item)[-1].split(")")[0]
print("File found:", file)
Filelist.append(file)
else:
continue
# return "N/A"
except Exception as e:
# return "N/A"
print("no file with", listWord)
return Filelist
st.set_page_config(
initial_sidebar_state='collapsed',
)
st.title("STLA-BABY")
# st.sidebar.button("Reset Chat History")
if len(msgs.messages) == 0 or st.sidebar.button("Reset Chat History"):
msgs.clear()
msgs.add_ai_message("How can I help you ?")
st.session_state.steps = {}
avatars = {"human": "user", "ai": "assistant"}
for idx, msg in enumerate(msgs.messages):
with st.chat_message(avatars[msg.type]):
st.write(msg.content)
if prompt := st.chat_input(placeholder="Input Your Request"):
st.chat_message("user").write(prompt)
with st.chat_message("assistant"):
# response = GPTfake(prompt)
st_cb = StreamlitCallbackHandler(st.container(), expand_new_thoughts=True, collapse_completed_thoughts=True)
response = agent.run(prompt, callbacks=[st_cb])
# st.write(response)
try:
# temp = response.split("(sandbox:/")[1] # (sandbox:/sample-20230805-0807.wav)
file_names = CheckFileinResp(response)
print("file_name:", file_names)
if file_names != []:
for file_name in file_names:
if file_name != "":
with open(file_name, "rb") as file:
btn = st.download_button(
label='Download:'+ file_name,
data=file,
file_name=file_name,
)
else:
print("No File Found in Response")
except Exception as e:
print("No need to add file in chatbot:", e)
# print("Current Response:", response)
# msgs.add_ai_message(response)
# print("History:", msgs.messages)
st.markdown(response, unsafe_allow_html=True)
# st.markdown(colored_code_block(response), unsafe_allow_html=True)
# for content in response:
# if isinstance(content, str):
# st.write(content)
# elif isinstance(content, dict) and content.get('type') == 'code':
# st.markdown(colored_code_block(content.get('code')), unsafe_allow_html=True)