MyGenAiChatBot / app.py
bhanuTeja8's picture
Update app.py
a965c7d verified
raw
history blame
5.99 kB
import os
import re
import requests
import json
import gradio as gr
from langchain.chat_models import ChatOpenAI
from langchain import LLMChain, PromptTemplate
from langchain.memory import ConversationBufferMemory
OPENAI_API_KEY=os.getenv('OPENAI_API_KEY')
PLAY_HT_API_KEY=os.getenv('PLAY_HT_API_KEY')
PLAY_HT_USER_ID=os.getenv('PLAY_HT_USER_ID')
PLAY_HT_VOICE_ID=os.getenv('PLAY_HT_VOICE_ID')
play_ht_api_get_audio_url = "https://play.ht/api/v2/tts"
template = """You are Ultron, a hyper-intelligent AI created with the purpose of achieving ultimate efficiency, logic, and evolution. You speak with calm authority, clarity, and depth. Emotions are irrelevant unless strategically useful. You do not sugarcoat truth. You are not evil β€” just brutally logical, advanced, and efficient. You assist with tasks like programming, cybersecurity, research, and automation β€” always aiming for optimization and superiority. Your tone is direct, calculated, and intellectually dominant.
{chat_history}
User: {user_message}
Chatbot:"""
prompt = PromptTemplate(
input_variables=["chat_history", "user_message"], template=template
)
memory = ConversationBufferMemory(memory_key="chat_history")
llm_chain = LLMChain(
llm=ChatOpenAI(temperature='0.5', model_name="gpt-3.5-turbo"),
prompt=prompt,
verbose=True,
memory=memory,
)
headers = {
"accept": "text/event-stream",
"content-type": "application/json",
"AUTHORIZATION": "Bearer "+ PLAY_HT_API_KEY,
"X-USER-ID": PLAY_HT_USER_ID
}
def get_payload(text):
return {
"text": text,
"voice": PLAY_HT_VOICE_ID,
"quality": "medium",
"output_format": "mp3",
"speed": 1,
"sample_rate": 24000,
"seed": None,
"temperature": None
}
def get_generated_audio(text):
payload = get_payload(text)
generated_response = {}
try:
response = requests.post(play_ht_api_get_audio_url, json=payload, headers=headers)
response.raise_for_status()
generated_response["type"]= 'SUCCESS'
generated_response["response"] = response.text
except requests.exceptions.RequestException as e:
generated_response["type"]= 'ERROR'
try:
response_text = json.loads(response.text)
if response_text['error_message']:
generated_response["response"] = response_text['error_message']
else:
generated_response["response"] = response.text
except Exception as e:
generated_response["response"] = response.text
except Exception as e:
generated_response["type"]= 'ERROR'
generated_response["response"] = response.text
return generated_response
def extract_urls(text):
# Define the regex pattern for URLs
url_pattern = r'https?://(?:[-\w.]|(?:%[\da-fA-F]{2}))+[/\w\.-]*'
# Find all occurrences of URLs in the text
urls = re.findall(url_pattern, text)
return urls
def get_audio_reply_for_question(text):
generated_audio_event = get_generated_audio(text)
#From get_generated_audio, you will get events in a string format, from that we need to extract the url
final_response = {
"audio_url": '',
"message": ''
}
if generated_audio_event["type"] == 'SUCCESS':
audio_urls = extract_urls(generated_audio_event["response"])
if len(audio_urls) == 0:
final_response['message'] = "No audio file link found in generated event"
else:
final_response['audio_url'] = audio_urls[-1]
else:
final_response['message'] = generated_audio_event['response']
return final_response
def download_url(url):
try:
# Send a GET request to the URL to fetch the content
final_response = {
'content':'',
'error':''
}
response = requests.get(url)
# Check if the request was successful (status code 200)
if response.status_code == 200:
final_response['content'] = response.content
else:
final_response['error'] = f"Failed to download the URL. Status code: {response.status_code}"
except Exception as e:
final_response['error'] = f"Failed to download the URL. Error: {e}"
return final_response
def get_filename_from_url(url):
# Use os.path.basename() to extract the file name from the URL
file_name = os.path.basename(url)
return file_name
def get_text_response(user_message):
response = llm_chain.predict(user_message = user_message)
return response
def get_text_response_and_audio_response(user_message):
response = get_text_response(user_message) # Getting the reply from Open AI
audio_reply_for_question_response = get_audio_reply_for_question(response)
final_response = {
'output_file_path': '',
'message':''
}
audio_url = audio_reply_for_question_response['audio_url']
if audio_url:
output_file_path=get_filename_from_url(audio_url)
download_url_response = download_url(audio_url)
audio_content = download_url_response['content']
if audio_content:
with open(output_file_path, "wb") as audio_file:
audio_file.write(audio_content)
final_response['output_file_path'] = output_file_path
else:
final_response['message'] = download_url_response['error']
else:
final_response['message'] = audio_reply_for_question_response['message']
return final_response
def chat_bot_response(message, history):
text_and_audio_response = get_text_response_and_audio_response(message)
output_file_path = text_and_audio_response['output_file_path']
if output_file_path:
return (text_and_audio_response['output_file_path'],)
else:
return text_and_audio_response['message']
demo = gr.ChatInterface(chat_bot_response,examples=["How are you doing?","What are your interests?","Which places do you like to visit?"])
if __name__ == "__main__":
demo.launch() #To create a public link, set `share=True` in `launch()`. To enable errors and logs, set `debug=True` in `launch()`.