Spaces:
Running
Running
File size: 4,009 Bytes
5c146d9 6ad8e7e 5c146d9 e192d93 5c146d9 e192d93 76617ef e192d93 fa18c1a 76617ef e192d93 5c146d9 e192d93 5c146d9 6ad8e7e e192d93 5c146d9 e192d93 5c146d9 e192d93 5c146d9 e192d93 5c146d9 e192d93 5c146d9 e192d93 6ad8e7e 5c146d9 e192d93 5c146d9 e192d93 5c146d9 e192d93 6ad8e7e 5c146d9 e192d93 5c146d9 e192d93 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 |
import os
import streamlit as st
import speech_recognition as sr
from gtts import gTTS
import google.generativeai as genai
import base64
from transformers import pipeline
import asyncio
# Ensure event loop exists (Fix for Streamlit async issue)
try:
asyncio.get_running_loop()
except RuntimeError:
asyncio.set_event_loop(asyncio.new_event_loop())
# Configure Generative AI (Ensure to use a secure way to handle API keys)
GOOGLE_API_KEY = "------------------------------------------------"
genai.configure(api_key=GOOGLE_API_KEY)
# Initialize recognizer
recognizer = sr.Recognizer()
# Emotion Detection Model
emotion_model = pipeline("text-classification", model="bhadresh-savani/distilbert-base-uncased-emotion")
def detect_emotion(text):
"""Detects emotion from text."""
try:
return emotion_model(text)[0]['label']
except Exception as e:
return f"Error detecting emotion: {str(e)}"
def listen_to_customer():
"""Captures voice input and converts it to text."""
with sr.Microphone() as source:
st.write("Listening...")
audio = recognizer.listen(source)
try:
return recognizer.recognize_google(audio)
except (sr.UnknownValueError, sr.RequestError):
return None
def process_text(customer_input):
"""Processes customer input using Generative AI."""
try:
model = genai.GenerativeModel('gemini-1.5-flash')
response = model.generate_content(customer_input)
return response.text
except Exception as e:
return f"Error in AI response: {str(e)}"
def text_to_speech(text, voice_option, language):
"""Converts AI response text to speech."""
try:
lang_code = {"English": "en", "Spanish": "es", "French": "fr", "Hindi": "hi"}.get(language, "en")
tts = gTTS(text=text, lang=lang_code, tld='com' if voice_option == "Male" else 'co.uk')
file_path = "response.mp3"
tts.save(file_path)
return file_path
except Exception as e:
st.error(f"Text-to-Speech Error: {str(e)}")
return None
def autoplay_audio(file_path):
"""Autoplays generated speech audio in Streamlit."""
try:
with open(file_path, "rb") as f:
data = f.read()
b64 = base64.b64encode(data).decode()
st.markdown(f"""
<audio controls autoplay>
<source src="data:audio/mp3;base64,{b64}" type="audio/mp3">
</audio>
""", unsafe_allow_html=True)
except Exception as e:
st.error(f"Error playing audio: {str(e)}")
def main():
st.title("Vocacity AI Voice Agent ποΈ")
st.sidebar.header("Settings")
language = st.sidebar.selectbox("Choose Language:", ["English", "Spanish", "French", "Hindi"])
voice_option = st.sidebar.selectbox("Choose AI Voice:", ["Male", "Female"])
clear_chat = st.sidebar.button("ποΈ Clear Chat")
if "chat_history" not in st.session_state:
st.session_state.chat_history = []
user_text_input = st.text_input("Type your query here:", "")
if st.button("ποΈ Speak"):
customer_input = listen_to_customer()
else:
customer_input = user_text_input.strip() if user_text_input else None
if customer_input:
emotion = detect_emotion(customer_input)
ai_response = process_text(customer_input)
st.session_state.chat_history.append((customer_input, ai_response))
st.write(f"**AI Response:** {ai_response} (Emotion: {emotion})")
audio_file = text_to_speech(ai_response, voice_option, language)
if audio_file:
autoplay_audio(audio_file)
os.remove(audio_file)
st.write("### Chat History")
for user, ai in st.session_state.chat_history[-5:]:
st.write(f"π€ {user}")
st.write(f"π€ {ai}")
if clear_chat:
st.session_state.chat_history = []
st.experimental_rerun()
if __name__ == "__main__":
main()
|