import datetime
from threading import Lock
from typing import Tuple, Optional

from langchain import ConversationChain, LLMChain

from config.config import (
    MAX_TALKING_HEAD_TEXT_LENGTH,
    LOOPING_TALKING_HEAD_VIDEO_PATH,
    TALKING_HEAD_WIDTH,
)
from utilities.html_stuff import do_html_video_speak, create_html_video, do_html_audio_speak
from utilities.transform_text import transform_text


def reset_memory(history, memory):
    """Clear the conversation memory and return a fresh, empty chat history."""
    memory.clear()
    history = []
    return history, history, memory


class ChatWrapper:
    """Serializes chat turns with a lock so only one request runs at a time."""

    def __init__(self):
        self.lock = Lock()

    def __call__(
        self, api_key: str, inp: str, history: Optional[Tuple[str, str]],
        chain: Optional[ConversationChain], trace_chain: bool, speak_text: bool,
        talking_head: bool, monologue: bool, express_chain: Optional[LLMChain],
        num_words, formality,
        anticipation_level, joy_level, trust_level, fear_level,
        surprise_level, sadness_level, disgust_level, anger_level,
        lang_level, translate_to, literary_style,
        qa_chain, docsearch, use_embeddings, force_translate,
    ):
        """Execute the chat functionality."""
        self.lock.acquire()
        try:
            print("\n==== date/time: " + str(datetime.datetime.now()) + " ====")
            print("inp: " + inp)
            print("trace_chain: ", trace_chain)
            print("speak_text: ", speak_text)
            print("talking_head: ", talking_head)
            print("monologue: ", monologue)
            history = history or []

            # If chain is None, that is because no API key was provided.
            output = "Please paste your OpenAI key from openai.com to use this app. " + str(datetime.datetime.now())
            hidden_text = output

            if chain:
                # Set OpenAI key
                import openai
                openai.api_key = api_key

                if not monologue:
                    if use_embeddings:
                        # Answer from the user-supplied documents via the QA chain.
                        if inp and inp.strip() != "":
                            if docsearch:
                                docs = docsearch.similarity_search(inp)
                                output = str(qa_chain.run(input_documents=docs, question=inp))
                            else:
                                output, hidden_text = "Please supply some text in the Embeddings tab.", None
                        else:
                            output, hidden_text = "What's on your mind?", None
                    else:
                        # Normal conversation turn; run_chain is defined elsewhere in this
                        # project and optionally captures the chain's intermediate trace.
                        output, hidden_text = run_chain(chain, inp, capture_hidden_text=trace_chain)
                else:
                    # Monologue mode: skip the LLM and express/transform the input directly.
                    output, hidden_text = inp, None

            # Apply the selected expression options (tone, emotion, language, style).
            output = transform_text(output, express_chain, num_words, formality,
                                    anticipation_level, joy_level, trust_level, fear_level,
                                    surprise_level, sadness_level, disgust_level, anger_level,
                                    lang_level, translate_to, literary_style, force_translate)

            text_to_display = output
            if trace_chain:
                text_to_display = hidden_text + "\n\n" + output
            history.append((inp, text_to_display))

            html_video, temp_file, html_audio, temp_aud_file = None, None, None, None
            if speak_text:
                if talking_head:
                    if len(output) <= MAX_TALKING_HEAD_TEXT_LENGTH:
                        # Short responses are lip-synced by the talking head video.
                        html_video, temp_file = do_html_video_speak(output, translate_to)
                    else:
                        # Too long for the talking head: show the looping video and speak audio only.
                        temp_file = LOOPING_TALKING_HEAD_VIDEO_PATH
                        html_video = create_html_video(temp_file, TALKING_HEAD_WIDTH)
                        html_audio, temp_aud_file = do_html_audio_speak(output, translate_to)
                else:
                    html_audio, temp_aud_file = do_html_audio_speak(output, translate_to)
            else:
                if talking_head:
                    temp_file = LOOPING_TALKING_HEAD_VIDEO_PATH
                    html_video = create_html_video(temp_file, TALKING_HEAD_WIDTH)
                else:
                    # html_audio, temp_aud_file = do_html_audio_speak(output, translate_to)
                    # html_video = create_html_video(temp_file, "128")
                    pass
        finally:
            # Always release the lock, even if the chain or speech synthesis raised.
            self.lock.release()
        return history, history, html_video, temp_file, html_audio, temp_aud_file, ""
        # return history, history, html_audio, temp_aud_file, ""
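

# ---------------------------------------------------------------------------
# Usage sketch (illustrative only, not part of the app wiring): ChatWrapper is
# meant to be registered as a UI callback whose inputs match the argument list
# above and whose outputs match the returned tuple. The direct call below is a
# minimal sketch that only exercises the "no API key" path (chain=None), with
# every expression option left at a neutral placeholder value; it assumes
# transform_text returns its input unchanged when no options are selected.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    chat = ChatWrapper()
    history, _, html_video, temp_file, html_audio, temp_aud_file, _ = chat(
        api_key="",              # no key supplied, so the wrapper answers with its key prompt
        inp="Hello there",
        history=None,
        chain=None,              # no ConversationChain has been built yet
        trace_chain=False,
        speak_text=False,
        talking_head=False,
        monologue=False,
        express_chain=None,
        num_words=0,
        formality="N/A",
        anticipation_level="N/A", joy_level="N/A", trust_level="N/A",
        fear_level="N/A", surprise_level="N/A", sadness_level="N/A",
        disgust_level="N/A", anger_level="N/A",
        lang_level="N/A", translate_to="N/A", literary_style="N/A",
        qa_chain=None, docsearch=None, use_embeddings=False,
        force_translate=False,
    )
    print(history[-1][1])        # prints the "please paste your OpenAI key" prompt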