Spaces:
Sleeping
Sleeping
import datetime | |
from threading import Lock | |
from typing import Tuple, Optional | |
from langchain import ConversationChain, LLMChain | |
from config.config import MAX_TALKING_HEAD_TEXT_LENGTH, LOOPING_TALKING_HEAD_VIDEO_PATH, TALKING_HEAD_WIDTH | |
from utilities.html_stuff import do_html_video_speak, create_html_video, do_html_audio_speak | |
from utilities.transform_text import transform_text | |
def reset_memory(history, memory): | |
memory.clear() | |
history = [] | |
return history, history, memory | |
class ChatWrapper: | |
def __init__(self): | |
self.lock = Lock() | |
def __call__( | |
self, api_key: str, inp: str, history: Optional[Tuple[str, str]], chain: Optional[ConversationChain], | |
trace_chain: bool, speak_text: bool, talking_head: bool, monologue: bool, express_chain: Optional[LLMChain], | |
num_words, formality, anticipation_level, joy_level, trust_level, | |
fear_level, surprise_level, sadness_level, disgust_level, anger_level, | |
lang_level, translate_to, literary_style, qa_chain, docsearch, use_embeddings, force_translate | |
): | |
"""Execute the chat functionality.""" | |
self.lock.acquire() | |
try: | |
print("\n==== date/time: " + str(datetime.datetime.now()) + " ====") | |
print("inp: " + inp) | |
print("trace_chain: ", trace_chain) | |
print("speak_text: ", speak_text) | |
print("talking_head: ", talking_head) | |
print("monologue: ", monologue) | |
history = history or [] | |
# If chain is None, that is because no API key was provided. | |
output = "Please paste your OpenAI key from openai.com to use this app. " + str(datetime.datetime.now()) | |
hidden_text = output | |
if chain: | |
# Set OpenAI key | |
import openai | |
openai.api_key = api_key | |
if not monologue: | |
if use_embeddings: | |
if inp and inp.strip() != "": | |
if docsearch: | |
docs = docsearch.similarity_search(inp) | |
output = str(qa_chain.run(input_documents=docs, question=inp)) | |
else: | |
output, hidden_text = "Please supply some text in the the Embeddings tab.", None | |
else: | |
output, hidden_text = "What's on your mind?", None | |
else: | |
output, hidden_text = run_chain(chain, inp, capture_hidden_text=trace_chain) | |
else: | |
output, hidden_text = inp, None | |
output = transform_text(output, express_chain, num_words, formality, anticipation_level, joy_level, | |
trust_level, | |
fear_level, surprise_level, sadness_level, disgust_level, anger_level, | |
lang_level, translate_to, literary_style, force_translate) | |
text_to_display = output | |
if trace_chain: | |
text_to_display = hidden_text + "\n\n" + output | |
history.append((inp, text_to_display)) | |
html_video, temp_file, html_audio, temp_aud_file = None, None, None, None | |
if speak_text: | |
if talking_head: | |
if len(output) <= MAX_TALKING_HEAD_TEXT_LENGTH: | |
html_video, temp_file = do_html_video_speak(output, translate_to) | |
else: | |
temp_file = LOOPING_TALKING_HEAD_VIDEO_PATH | |
html_video = create_html_video(temp_file, TALKING_HEAD_WIDTH) | |
html_audio, temp_aud_file = do_html_audio_speak(output, translate_to) | |
else: | |
html_audio, temp_aud_file = do_html_audio_speak(output, translate_to) | |
else: | |
if talking_head: | |
temp_file = LOOPING_TALKING_HEAD_VIDEO_PATH | |
html_video = create_html_video(temp_file, TALKING_HEAD_WIDTH) | |
else: | |
# html_audio, temp_aud_file = do_html_audio_speak(output, translate_to) | |
# html_video = create_html_video(temp_file, "128") | |
pass | |
except Exception as e: | |
raise e | |
finally: | |
self.lock.release() | |
return history, history, html_video, temp_file, html_audio, temp_aud_file, "" | |
# return history, history, html_audio, temp_aud_file, "" | |