# Ai-Agent-Streamlit / webhook_server.py
# Uploaded by MaroofTechSorcerer ("Upload 3 files", commit ea6dab8, verified)
import os
import json
import base64
import asyncio
from fastapi import FastAPI, Request, Response
from twilio.twiml.voice_response import VoiceResponse, Gather, Hangup
from twilio.rest import Client
import openai
from elevenlabs.client import ElevenLabs
import logging
from logging.handlers import RotatingFileHandler
# --- Configuration ---
# Credentials are pulled from the process environment rather than a config
# file; a variable that is not set resolves to None.
TWILIO_ACCOUNT_SID = os.getenv('TWILIO_ACCOUNT_SID')
TWILIO_AUTH_TOKEN = os.getenv('TWILIO_AUTH_TOKEN')
OPENAI_API_KEY = os.getenv('OPENAI_API_KEY')
ELEVENLABS_API_KEY = os.getenv('ELEVENLABS_API_KEY')
# Hardcoded ElevenLabs voice ID for "Alice".
ELEVENLABS_VOICE_ID = 'Xb7hH8MSUjPSbSDYk0k2'

# --- API clients ---
twilio_client = Client(TWILIO_ACCOUNT_SID, TWILIO_AUTH_TOKEN)
openai.api_key = OPENAI_API_KEY
elevenlabs_client = ElevenLabs(api_key=ELEVENLABS_API_KEY)

# --- Logging ---
# Rotating file log: app.log rolls over at 10 MiB and keeps five backups.
log_formatter = logging.Formatter(
    fmt='%(asctime)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s'
)
log_handler = RotatingFileHandler(
    filename='app.log',
    maxBytes=10 * 1024 * 1024,
    backupCount=5,
)
log_handler.setFormatter(log_formatter)
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
logger.addHandler(log_handler)

# --- Web application ---
app = FastAPI()

# In-memory conversation state. Each entry maps a CallSid to a dict with the
# chat "history" (list of role/content messages) and the call's "language".
conversations = {}
def _greeting_generate_text(user_language):
    """Return the opening line for a new call (blocking).

    Reads ``sales_script.txt`` and asks the OpenAI chat model for a greeting in
    ``user_language``. Both the file read and the API call are synchronous, so
    callers inside the event loop should run this in a worker thread.
    """
    with open('sales_script.txt', 'r') as f:
        sales_script = f.read()
    completion = openai.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": f"You are a world-class AI sales agent for bracketworks.io. Your goal is to sell credit cards. Start the conversation with a friendly greeting in {user_language}."},
            {"role": "user", "content": sales_script},
        ],
    )
    return completion.choices[0].message.content


def _greeting_synthesize_audio(text):
    """Return ``text`` spoken by the ElevenLabs voice as base64 audio (blocking)."""
    audio_stream = elevenlabs_client.generate(
        text=text,
        voice=ELEVENLABS_VOICE_ID,
        model="eleven_multilingual_v2",
        stream=True,
    )
    return base64.b64encode(b"".join(audio_stream)).decode('utf-8')


@app.post("/")
async def handle_incoming_call(request: Request):
    """Handle an incoming call and start the conversation.

    Reads ``CallSid`` and ``lang`` from the posted form, creates a fresh
    conversation entry for the call, generates a spoken greeting and returns
    TwiML that plays it and then gathers the caller's speech (posted to
    ``/gather``). If anything fails while producing the greeting, a fixed
    spoken apology is used instead so the call still proceeds to the gather.

    Returns:
        An ``application/xml`` response containing the TwiML document.
    """
    form = await request.form()
    call_sid = form.get('CallSid', 'Unknown')
    user_language = form.get('lang', 'en-US')  # Default to English
    logger.info("Incoming call %s in %s", call_sid, user_language)

    # Keep a local reference to the state: another request may remove the
    # entry from `conversations` while this handler is awaiting slow work.
    state = {"history": [], "language": user_language}
    conversations[call_sid] = state

    response = VoiceResponse()
    try:
        # The OpenAI/ElevenLabs clients used here are synchronous; running
        # them in the default executor keeps the event loop free to serve
        # other calls while this one waits on the network.
        loop = asyncio.get_running_loop()
        greeting = await loop.run_in_executor(
            None, _greeting_generate_text, user_language
        )
        state["history"].append({"role": "assistant", "content": greeting})
        audio_base64 = await loop.run_in_executor(
            None, _greeting_synthesize_audio, greeting
        )
        # NOTE(review): Twilio documents <Play> as taking the URL of an audio
        # file; confirm that a base64 data: URI is accepted (and fits TwiML
        # size limits), otherwise serve the audio from an HTTP endpoint.
        response.play(f"data:audio/mp3;base64,{audio_base64}")
    except Exception as e:
        # Top-level boundary for the greeting: log with traceback and fall
        # back to a spoken message rather than failing the webhook.
        logger.exception("Error in initial greeting: %s", e)
        response.say("Hello, thank you for calling. I am having some technical difficulties.")

    gather = Gather(input='speech', action='/gather', speechTimeout='auto', bargeIn=True, language=user_language)
    response.append(gather)
    return Response(content=str(response), media_type="application/xml")
def _reply_generate_text(history, user_language):
    """Return the assistant's next reply for ``history`` (blocking).

    Reads ``sales_script.txt``, sends the system prompt followed by the
    conversation ``history`` to the OpenAI chat model with streaming enabled,
    and concatenates the streamed text. Synchronous; run in a worker thread
    when called from the event loop.
    """
    with open('sales_script.txt', 'r') as f:
        sales_script = f.read()
    completion = openai.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": f"You are a persuasive AI sales agent for bracketworks.io. Your goal is to sell credit cards based on the following script: {sales_script}. Respond in {user_language}."},
        ] + history,
        stream=True,
    )
    # Skip chunks that carry no choices so indexing [0] cannot raise.
    return "".join(
        chunk.choices[0].delta.content or ""
        for chunk in completion
        if chunk.choices
    )


def _reply_synthesize_audio(text):
    """Return ``text`` spoken by the ElevenLabs voice as base64 audio (blocking)."""
    audio_stream = elevenlabs_client.generate(
        text=text,
        voice=ELEVENLABS_VOICE_ID,
        model="eleven_multilingual_v2",
        stream=True,
    )
    return base64.b64encode(b"".join(audio_stream)).decode('utf-8')


@app.post("/gather")
async def gather_speech(request: Request):
    """Handle the caller's speech and generate the AI response.

    Reads ``CallSid``, ``SpeechResult`` and ``language`` from the posted form.
    If the utterance contains "bye" (case-insensitive) the call is ended with
    a spoken goodbye. Otherwise the utterance is added to the call's history,
    a reply is generated and synthesized, and TwiML is returned that plays the
    reply and gathers the next utterance. On any failure while generating the
    reply, a fixed spoken apology is used instead.

    Returns:
        An ``application/xml`` response containing the TwiML document.
    """
    form = await request.form()
    call_sid = form.get('CallSid', 'Unknown')
    # Keep the transcript's original casing for the model and the stored
    # history; only the goodbye check below needs a case-insensitive view.
    speech_text = form.get('SpeechResult', '')
    user_language = form.get('language', conversations.get(call_sid, {}).get('language', 'en-US'))
    logger.info("Speech from %s: '%s' in %s", call_sid, speech_text, user_language)

    response = VoiceResponse()
    if "bye" in speech_text.lower():
        response.say("Thank you for your time. Goodbye.")
        response.hangup()
        return Response(content=str(response), media_type="application/xml")

    # Create the entry when the call is unknown (e.g. state was lost) so the
    # utterance still reaches the model and the reply is actually recorded,
    # instead of being appended to a throwaway list.
    state = conversations.setdefault(call_sid, {"history": [], "language": user_language})
    state["history"].append({"role": "user", "content": speech_text})
    state["language"] = user_language  # Update language if it changes

    try:
        # The OpenAI/ElevenLabs clients used here are synchronous; running
        # them in the default executor keeps the event loop responsive.
        loop = asyncio.get_running_loop()
        ai_response_text = await loop.run_in_executor(
            None, _reply_generate_text, list(state["history"]), user_language
        )
        state["history"].append({"role": "assistant", "content": ai_response_text})
        audio_base64 = await loop.run_in_executor(
            None, _reply_synthesize_audio, ai_response_text
        )
        # NOTE(review): Twilio documents <Play> as taking the URL of an audio
        # file; confirm that a base64 data: URI is accepted (and fits TwiML
        # size limits), otherwise serve the audio from an HTTP endpoint.
        response.play(f"data:audio/mp3;base64,{audio_base64}")
    except Exception as e:
        # Top-level boundary for reply generation: log with traceback and
        # fall back to a spoken message rather than failing the webhook.
        logger.exception("Error during AI response generation: %s", e)
        response.say("I am sorry, I am having trouble processing your request.")

    gather = Gather(input='speech', action='/gather', speechTimeout='auto', bargeIn=True, language=user_language)
    response.append(gather)
    return Response(content=str(response), media_type="application/xml")
# Twilio call statuses after which the call is over, so its in-memory
# conversation state can be released.
_STATUS_TERMINAL = frozenset({'completed', 'busy', 'failed', 'no-answer', 'canceled'})


def _status_save_extracted_data(call_sid, history):
    """Extract contact details from ``history`` and write them to disk (blocking).

    Sends the flattened conversation to the OpenAI chat model, parses the JSON
    reply and writes it to ``application_data/<call_sid>.json``. Synchronous;
    run in a worker thread when called from the event loop.

    Raises:
        Exception: whatever the OpenAI client, ``json.loads`` or the file
            write raises; the caller is expected to log it.
    """
    conversation_text = "\n".join(f"{msg['role']}: {msg['content']}" for msg in history)
    completion = openai.chat.completions.create(
        model="gpt-4o",
        # Ask for JSON mode so the reply is a bare JSON object rather than
        # prose or a fenced code block that json.loads would reject.
        response_format={"type": "json_object"},
        messages=[
            {"role": "system", "content": 'Extract Name, Phone Number, and Email from the conversation. Also provide a brief summary. Respond in JSON format with keys: "name", "phone", "email", "summary".'},
            {"role": "user", "content": conversation_text},
        ],
    )
    extracted_data = json.loads(completion.choices[0].message.content)

    os.makedirs('application_data', exist_ok=True)
    # CallSid arrives in an HTTP form and is therefore untrusted: replace
    # anything that is not alphanumeric, '-' or '_' so it cannot contain path
    # separators or '..'. An alphanumeric SID is left unchanged.
    safe_name = "".join(ch if ch.isalnum() or ch in "-_" else "_" for ch in call_sid)
    with open(os.path.join('application_data', f"{safe_name}.json"), "w", encoding="utf-8") as f:
        json.dump(extracted_data, f)


@app.post("/status")
async def call_status(request: Request):
    """Handle call status updates and data extraction.

    Reads ``CallSid`` and ``CallStatus`` from the posted form. When the call
    has reached a terminal status its conversation state is removed from
    memory; if the status is ``completed`` and a conversation was recorded,
    contact details are extracted from it and saved under
    ``application_data/``. Extraction errors are logged, never raised.

    Returns:
        An empty response with status code 200.
    """
    form = await request.form()
    call_sid = form.get('CallSid', 'Unknown')
    status = form.get('CallStatus', 'Unknown')
    logger.info("Call %s status: %s", call_sid, status)

    if status in _STATUS_TERMINAL:
        # Pop first: this releases the state for every terminal status (not
        # only 'completed') and makes a duplicate callback a harmless no-op.
        state = conversations.pop(call_sid, None)
        if state is not None and status == 'completed':
            try:
                # The OpenAI client used here is synchronous; run it in the
                # default executor so the event loop is not blocked.
                loop = asyncio.get_running_loop()
                await loop.run_in_executor(
                    None, _status_save_extracted_data, call_sid, state["history"]
                )
                logger.info("Saved extracted data for %s", call_sid)
            except Exception as e:
                logger.exception("Error extracting data for %s: %s", call_sid, e)

    return Response(status_code=200)
if __name__ == '__main__':
    # Script entry point: uvicorn is imported only when the module is run
    # directly, then serves the app on all interfaces at port 8080.
    from uvicorn import run as serve

    serve(app, host="0.0.0.0", port=8080)