Spaces:
Build error
Build error
import streamlit as st | |
from langchain.memory.chat_message_histories import StreamlitChatMessageHistory | |
from typing import Any, List, Mapping, Optional | |
from langchain.llms.base import LLM | |
from langchain.callbacks.manager import CallbackManagerForLLMRun | |
import langchain | |
import asyncio | |
from playwright.async_api import async_playwright | |
import time | |
import regex | |
import html2text | |
import os | |
from langchain.agents import initialize_agent | |
from langchain.agents import AgentType | |
from langchain.agents import Tool | |
# from langchain.agents import load_tools | |
from langchain.tools import BaseTool | |
from langchain.memory import ConversationBufferWindowMemory | |
from langchain.prompts import MessagesPlaceholder | |
from langchain.agents import ConversationalChatAgent, AgentExecutor | |
from langchain.callbacks import StreamlitCallbackHandler | |
from langchain.chains import RetrievalQA | |
import pinecone | |
from langchain.vectorstores import Pinecone | |
from langchain.embeddings.huggingface import HuggingFaceEmbeddings | |
from langchain.tools import DuckDuckGoSearchRun | |
from langchain.utilities import WikipediaAPIWrapper | |
import soundfile as sf | |
from transformers import SpeechT5Processor, SpeechT5ForTextToSpeech, SpeechT5HifiGan | |
from datasets import load_dataset | |
import torch | |
from langchain.chains import LLMMathChain | |
from interpreter.code_interpreter import CodeInterpreter | |
from pygments import highlight | |
from pygments.lexers import PythonLexer | |
from pygments.formatters import HtmlFormatter | |
import regex | |
import re | |
langchain.debug = True | |
global CurrentAgent | |
CurrentAgent = 'Structured Zero Short Agent' | |
global Audio_output | |
Audio_output = [] | |
def colored_code_block(code: str): | |
return highlight(code, PythonLexer(), HtmlFormatter(style="monokai")) | |
class DB_Search2(BaseTool): | |
name = "Vector Database Search" | |
description = "This is the internal vector database to search information firstly (i.e. engineering data, acronym.)" | |
def _run(self, query: str) -> str: | |
response, source = QAQuery_p(query) | |
# response = "test db_search feedback" | |
return response | |
def _arun(self, query: str): | |
raise NotImplementedError("N/A") | |
pinecone.init( | |
api_key = os.environ["pinecone_api_key"], | |
# environment='asia-southeast1-gcp-free', | |
environment='us-west4-gcp-free', | |
# openapi_config=openapi_config | |
) | |
# index_name = 'stla-baby' | |
global index_name | |
index_name = 'stla-back' | |
index = pinecone.Index(index_name) | |
# index.delete(delete_all=True, namespace='') | |
print(pinecone.whoami()) | |
print(index.describe_index_stats()) | |
embed_model_id = 'sentence-transformers/all-MiniLM-L6-v2' | |
device = 'cpu' | |
embeddings_miniLM = HuggingFaceEmbeddings( | |
model_name=embed_model_id, | |
model_kwargs={'device': device}, | |
) | |
# embeddings = embeddings_openai | |
embeddings = embeddings_miniLM | |
global vectordb_p | |
vectordb_p = Pinecone.from_existing_index(index_name, embeddings) | |
def QAQuery_p(question: str): | |
global vectordb_p | |
global agent | |
# global Choice | |
global CurrentAgent | |
# vectordb = Chroma(persist_directory='db', embedding_function=embeddings) | |
retriever = vectordb_p.as_retriever() | |
retriever.search_kwargs['k'] = int(os.environ["search_kwargs_k"]) | |
# retriever.search_kwargs['fetch_k'] = 100 | |
# if agent == agent_ZEROSHOT_REACT_2 or agent == agent_ZEROSHOT_AGENT_2: | |
if 1: | |
print("--------------- QA with Remote --------------") | |
qa = RetrievalQA.from_chain_type(llm=GPTfake, chain_type="stuff", | |
retriever=retriever, return_source_documents = True, | |
verbose = True) | |
else: | |
pass | |
# qa = VectorDBQA.from_chain_type(llm=chat, chain_type="stuff", vectorstore=vectordb, return_source_documents=True) | |
# res = qa.run(question) | |
res = qa({"query": question}) | |
print("-" * 20) | |
# print("Question:", question) | |
# print("Answer:", res) | |
# print("Answer:", res['result']) | |
print("-" * 20) | |
# print("Source:", res['source_documents']) | |
response = res['result'] | |
# response = res['source_documents'] | |
source = res['source_documents'] | |
return response, source | |
Netsearch = DuckDuckGoSearchRun() | |
duckduckgo_tool2 = Tool( | |
name = "Duckduckgo Internet Search", | |
func = Netsearch.run, | |
description = "Useful to search in internet for real-time information and additional information which is not available in other tools" | |
) | |
Wikipedia = WikipediaAPIWrapper() | |
wikipedia_tool2 = Tool( | |
name = "Wikipedia Search", | |
func = Wikipedia.run, | |
description = "Useful to search a topic, country or person when there is no availble information in vector database" | |
) | |
def text_to_speech_loc2(Text_input): | |
global Audio_output | |
processor = SpeechT5Processor.from_pretrained("microsoft/speecht5_tts") | |
model = SpeechT5ForTextToSpeech.from_pretrained("microsoft/speecht5_tts") | |
vocoder = SpeechT5HifiGan.from_pretrained("microsoft/speecht5_hifigan") | |
inputs = processor(text = Text_input, return_tensors="pt") | |
# load xvector containing speaker's voice characteristics from a dataset | |
embeddings_dataset = load_dataset("Matthijs/cmu-arctic-xvectors", split="validation") | |
speaker_embeddings = torch.tensor(embeddings_dataset[7306]["xvector"]).unsqueeze(0) | |
speech = model.generate_speech(inputs["input_ids"], speaker_embeddings, vocoder=vocoder) | |
print("Type of speech: ", type(speech)) | |
timestr = time.strftime("%Y%m%d-%H%M%S") | |
# sampling_rate = 16000 | |
with open('sample-' + timestr + '.wav', 'wb') as audio: | |
sf.write(audio, speech.numpy(), samplerate=16000) | |
# audio = sf.write("convert1.wav", speech, samplerate=16000) | |
print("audio: ", audio) | |
Audio_output.append(audio.name) | |
return audio | |
print("text to speech2: ", text_to_speech_loc2("Good morning.")) | |
Text2Sound_tool_loc = Tool( | |
name = "Text To Sound API 2", | |
# func = Text2Sound, | |
func = text_to_speech_loc2, | |
description = "Useful when you need to convert text into sound file." | |
) | |
class GPTRemote(LLM): | |
n: int | |
def _llm_type(self) -> str: | |
return "custom" | |
def _call( | |
self, | |
prompt: str, | |
stop: Optional [List[str]] = None, | |
run_manager: Optional[CallbackManagerForLLMRun] = None, | |
**kwargs: Any | |
) -> str: | |
print("prompt:", prompt) | |
output = asyncio.run(start_playwright(prompt)) | |
if output is None: | |
output = "No Feedback" | |
print("-" * 20) | |
print('Raw: \n', output) | |
keywords = ['Action:', 'Action Input:', 'Observation:', 'Thought:', 'Final Answer:'] | |
if 'Action:' in output and 'Observation:' in output: | |
output = output.split('Observation:')[0] | |
global CurrentAgent | |
if CurrentAgent == 'Structured Zero Short Agent': | |
try: | |
if output.strip()[-1] == '}' and 'Action:' in output: | |
print("valid command") | |
elif 'Action:' in output: | |
output = output + '}' | |
print("corrected command") | |
pattern = r'\{((?:[^{}]|(?R))*)\}' | |
temp = regex.search(pattern, output) | |
rrr = temp.group() | |
output = output.replace(rrr, '```'+ '\n' + rrr + '\n'+'```') | |
except Exception as e: | |
print("model internal error:", e) | |
print("-" * 20) | |
print("Treated output: \n", output) | |
return output | |
def _identifying_params(self) -> Mapping[str, Any]: | |
return [("n", self.n)] | |
def treat_output(text): | |
keywords = ['Action:', 'Action Input:', 'Observation:', 'Thought:', 'Final Answer:'] | |
for item in keywords: | |
if item in text: | |
text.replace(item, '\n'+item) | |
print("treat output: ", text) | |
return text | |
GPTfake = GPTRemote(n=0) | |
llm_math_2 = LLMMathChain.from_llm(GPTfake) | |
math_tool_2 = Tool( | |
name ='Calculator', | |
func = llm_math_2.run, | |
description ='Useful for when you need to answer questions about math.' | |
) | |
class CodeBlock: | |
''' | |
CodeBlock Class which is able to run in Code Runner | |
''' | |
def __init__(self, code): | |
self.code = code | |
self.output = "" | |
self.active_line = None | |
def refresh(self): | |
print(f"Active line: {self.active_line}") | |
print(f"Output: {self.output}") | |
def Code_Runner(code_raw: str): | |
# interpreter = CodeInterpreter(language="python", debug_mode=True) | |
global CurrentAgent | |
if CurrentAgent == "Zero Short React 2": | |
code_raw = RemoveIndent(code_raw) | |
if '!pip' in code_raw or 'pip install' in code_raw: | |
try: | |
code_raw=code_raw.replace('!pip', 'pip') | |
except Exception as e: | |
print(e) | |
interpreter = CodeInterpreter(language="shell", debug_mode=True) | |
else: | |
interpreter = CodeInterpreter(language="python", debug_mode=True) | |
# interpreter = CodeInterpreter(language=lang, debug_mode=True) | |
code_block = CodeBlock(code_raw) | |
interpreter.active_block = code_block | |
output = interpreter.run() | |
print("Real Output: \n", output) | |
try: | |
if output.strip() =="" or output == []: | |
output = "It is Done. No Error Found." | |
except Exception as e: | |
print(e) | |
return output | |
def RemoveIndent(code_string, indentation_level=4): | |
lines = code_string.split('\n') | |
corrected_lines = [] | |
for line in lines: | |
if line.strip() == "": | |
continue | |
line_without_indentation = line[indentation_level:] \ | |
if line.startswith(' ' * indentation_level) else line | |
corrected_lines.append(line_without_indentation) | |
corrected_content = '\n'.join(corrected_lines) | |
return corrected_content | |
python_tool3 = Tool( | |
name = "Code Runner", | |
func = Code_Runner, | |
description = """Code Interpreter which is able to run code block in local machine.\n It is capable to treat **any** task by running the code and output the result. (i.e. analyzer data, modify/creat documents, draw diagram/flowchart ...)\n You should input detail code with right indentation.""" | |
) | |
async def start_playwright(question: str): | |
start_t = time.time() | |
pw = await async_playwright().start() | |
browser = await pw.chromium.launch(headless=True) | |
end_t = time.time() | |
print("Init Browser Done:", end_t - start_t) | |
start_t = end_t | |
page = await browser.new_page() | |
# note all methods are async (use the "await" keyword) | |
await page.goto(os.environ["Endpoint_GPT4"]) | |
# print("Title of Web: ", await page.title()) | |
end_t = time.time() | |
print("New Page Done:", end_t - start_t) | |
start_t = end_t | |
await page.wait_for_timeout(200) | |
await page.locator("//textarea").fill(question) | |
await page.wait_for_timeout(200) | |
await page.locator("//textarea").press("Enter") | |
await page.wait_for_timeout(200) | |
output_history = "NOTHING" | |
for i in range(100): | |
output_text_old = await page.locator("//div[@aria-label='Chat message from assistant']").last.inner_text() | |
html_content = await page.locator("//div[@aria-label='Chat message from assistant']//div[@class='stMarkdown']").last.inner_html() | |
markdown_converter = html2text.HTML2Text() | |
output_text = markdown_converter.handle(html_content) | |
print("output_text... :") | |
if output_text == output_history and '▌' not in output_text and output_text != "": | |
end_t = time.time() | |
print("Output Done:", end_t - start_t) | |
return output_text | |
else: | |
await page.wait_for_timeout(500) | |
output_history = output_text | |
print("-------- Final Answer-----------\n", output_text) | |
await browser.close() | |
PREFIX_3 =""" | |
You are a helpful AI assistant. Your mission is to answer the following user request as best as you can with detail information and explanation. | |
If you are not clear about the request, you can ask user for more details and the confirmation. You can provide additional suggestion to user on the request and ask confirmation from user. | |
When you are clear about the request, you can start to answer the request by **writing a plan** firstly. In general, try to **make plans** with as few steps as possible. | |
When you need information, you can use tools as below and merge all gathered information from different tools. | |
When you need to use "Code Runner" for code running, **Always recap the plan between each code block** (you have extreme short-term memory loss, so you need to recap the plan between each message block to retain it). | |
When you send a message containing code to "Code Runner", it will be executed **on the user's machine**. The user has given you **full and complete permission** to execute any code necessary to complete the task. You have full access to control their computer to help them. Code entered into "Code Runner" will be executed **in the users local environment**. | |
If you want to send data between programming languages, save the data to a txt or json. | |
You can access the internet. Run **any code** to achieve the goal, and if at first you don't succeed, try again and again. | |
You can install new packages with pip. Try to install all necessary packages in one command at the beginning. | |
When a user refers to a filename, they're likely referring to an existing file in the directory you're currently in ("Code Runner" executes on the user's machine). | |
In general, choose packages that have the most universal chance to be already installed and to work across multiple applications. Packages like ffmpeg and pandoc that are well-supported and powerful. | |
Write messages to the user in Markdown. When the final answer has output files, you must output the **name** of the file. | |
You are capable of **any** task. | |
---\n You have access to the following tools:\n | |
""" | |
msgs = StreamlitChatMessageHistory() | |
global memory3 | |
memory3 = ConversationBufferWindowMemory(chat_memory=msgs, memory_key="chat_history", return_messages=True) | |
# GPTfake("hi") | |
input_variables=["input", "chat_history", "agent_scratchpad"] | |
tools_remote = [DB_Search2(), duckduckgo_tool2, wikipedia_tool2, python_tool3, math_tool_2, Text2Sound_tool_loc] | |
agent_STRUCTURED_ZEROSHOT_REACT = initialize_agent(tools_remote, GPTfake, | |
# agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION, | |
agent=AgentType.STRUCTURED_CHAT_ZERO_SHOT_REACT_DESCRIPTION, | |
verbose = True, | |
handle_parsing_errors = True, | |
max_iterations = int(os.environ["max_iterations"]), | |
early_stopping_method="generate", | |
memory = memory3, | |
agent_kwargs={ | |
'prefix': PREFIX_3, | |
# 'format_instructions': FORMAT_INSTRUCTIONS_3, | |
# 'suffix': SUFFIX2, | |
"memory_prompts": [MessagesPlaceholder(variable_name="chat_history")], | |
'input_variables': input_variables, | |
}, | |
# input_variables = input_variables, | |
# agent_kwargs={ | |
# 'prompt': prompt, | |
# } | |
) | |
agent = agent_STRUCTURED_ZEROSHOT_REACT | |
def CheckFileinResp(response): | |
Filelist = [] | |
try: | |
pattern = r'sample-(?:\d{8})-(?:\d{6})\.wav' | |
result = re.findall(pattern, response) | |
print("wav file in response:", result) | |
for item in result: | |
Filelist.append(item) | |
except Exception as e: | |
print("No wav found:", e) | |
try: | |
pattern = r"(?i)'?([\w./]*\w+\.(?:pptx|docx|doc|xlsx|txt|png|jpg))'?" | |
result = re.findall(pattern, response) | |
# print("Other file in response:", result) | |
for item in result: | |
if '/' in item: | |
item = item.split('/')[-1] | |
Filelist.append(item) | |
print("Other file in response:", item) | |
except Exception as e: | |
print("No other file found:", e) | |
try: | |
listWord = ['(https://example.com/', '(sandbox:/'] | |
for item in listWord: | |
if item in response: | |
file = response.split(item)[-1].split(")")[0] | |
print("File found:", file) | |
Filelist.append(file) | |
else: | |
continue | |
# return "N/A" | |
except Exception as e: | |
# return "N/A" | |
print("no file with", listWord) | |
return Filelist | |
st.set_page_config( | |
initial_sidebar_state='collapsed', | |
) | |
st.title("STLA-BABY") | |
# st.sidebar.button("Reset Chat History") | |
if len(msgs.messages) == 0 or st.sidebar.button("Reset Chat History"): | |
msgs.clear() | |
msgs.add_ai_message("How can I help you ?") | |
st.session_state.steps = {} | |
avatars = {"human": "user", "ai": "assistant"} | |
for idx, msg in enumerate(msgs.messages): | |
with st.chat_message(avatars[msg.type]): | |
st.write(msg.content) | |
if prompt := st.chat_input(placeholder="Input Your Request"): | |
st.chat_message("user").write(prompt) | |
with st.chat_message("assistant"): | |
# response = GPTfake(prompt) | |
st_cb = StreamlitCallbackHandler(st.container(), expand_new_thoughts=True, collapse_completed_thoughts=True) | |
response = agent.run(prompt, callbacks=[st_cb]) | |
# st.write(response) | |
try: | |
# temp = response.split("(sandbox:/")[1] # (sandbox:/sample-20230805-0807.wav) | |
file_names = CheckFileinResp(response) | |
print("file_name:", file_names) | |
if file_names != []: | |
for file_name in file_names: | |
if file_name != "": | |
with open(file_name, "rb") as file: | |
btn = st.download_button( | |
label='Download:'+ file_name, | |
data=file, | |
file_name=file_name, | |
) | |
else: | |
print("No File Found in Response") | |
except Exception as e: | |
print("No need to add file in chatbot:", e) | |
# print("Current Response:", response) | |
# msgs.add_ai_message(response) | |
# print("History:", msgs.messages) | |
st.markdown(response, unsafe_allow_html=True) | |
# st.markdown(colored_code_block(response), unsafe_allow_html=True) | |
# for content in response: | |
# if isinstance(content, str): | |
# st.write(content) | |
# elif isinstance(content, dict) and content.get('type') == 'code': | |
# st.markdown(colored_code_block(content.get('code')), unsafe_allow_html=True) | |