File size: 7,245 Bytes
ea6dab8 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 |
import os
import json
import base64
import asyncio
from fastapi import FastAPI, Request, Response
from twilio.twiml.voice_response import VoiceResponse, Gather, Hangup
from twilio.rest import Client
import openai
from elevenlabs.client import ElevenLabs
import logging
from logging.handlers import RotatingFileHandler
# --- Configuration ---
# Every credential is taken from the process environment; nothing is read
# from a config file. A variable that is not set yields None here.
TWILIO_ACCOUNT_SID = os.getenv('TWILIO_ACCOUNT_SID')
TWILIO_AUTH_TOKEN = os.getenv('TWILIO_AUTH_TOKEN')
OPENAI_API_KEY = os.getenv('OPENAI_API_KEY')
ELEVENLABS_API_KEY = os.getenv('ELEVENLABS_API_KEY')
# Fixed ElevenLabs voice used for all synthesized speech (labelled "Alice").
ELEVENLABS_VOICE_ID = 'Xb7hH8MSUjPSbSDYk0k2'

# --- API clients (module-level, shared by all request handlers) ---
twilio_client = Client(TWILIO_ACCOUNT_SID, TWILIO_AUTH_TOKEN)
openai.api_key = OPENAI_API_KEY
elevenlabs_client = ElevenLabs(api_key=ELEVENLABS_API_KEY)

# --- Logging ---
# Rotating file log: app.log, at most 10 MiB per file, 5 rotated backups kept.
log_formatter = logging.Formatter(
    '%(asctime)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s'
)
log_handler = RotatingFileHandler(
    filename='app.log',
    maxBytes=10 * 1024 * 1024,
    backupCount=5,
)
log_handler.setFormatter(log_formatter)

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
logger.addHandler(log_handler)

# --- Web application ---
app = FastAPI()

# Per-call conversation state, held only in this process's memory.
# Maps CallSid -> {"history": [chat messages], "language": language tag}.
conversations = {}
def _incoming_language_tag(raw_value, fallback='en-US'):
    """Return ``raw_value`` if it looks like a language tag, else ``fallback``.

    The value arrives in the webhook form and is later placed into both the
    LLM system prompt and the ``<Gather language=...>`` attribute, so only
    short ASCII tags made of alphanumeric parts joined by ``-`` (for example
    ``en-US``) are accepted.
    """
    tag = str(raw_value).strip() if raw_value else ''
    is_plausible = (
        0 < len(tag) <= 35
        and tag.isascii()
        and all(part.isalnum() for part in tag.split('-'))
    )
    return tag if is_plausible else fallback


def _incoming_greeting_text(user_language):
    """Blocking helper: read the sales script and ask the model for a greeting."""
    with open('sales_script.txt', 'r', encoding='utf-8') as f:
        sales_script = f.read()
    completion = openai.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": f"You are a world-class AI sales agent for bracketworks.io. Your goal is to sell credit cards. Start the conversation with a friendly greeting in {user_language}."},
            {"role": "user", "content": sales_script},
        ],
    )
    return completion.choices[0].message.content


def _incoming_greeting_audio(text):
    """Blocking helper: synthesize ``text`` and return the audio as base64 text."""
    audio_stream = elevenlabs_client.generate(
        text=text,
        voice=ELEVENLABS_VOICE_ID,
        model="eleven_multilingual_v2",
        stream=True,
    )
    # Joining the stream is what actually pulls the audio over the network,
    # so it must happen here, off the event loop.
    return base64.b64encode(b"".join(audio_stream)).decode('utf-8')


@app.post("/")
async def handle_incoming_call(request: Request):
    """Handle an incoming call webhook and start the conversation.

    Reads ``CallSid`` and ``lang`` from the posted form, creates a fresh
    entry in ``conversations`` for the call, generates a greeting with the
    chat model, synthesizes it with ElevenLabs and returns TwiML that plays
    the greeting inside a speech ``<Gather>`` posting to ``/gather``.

    If any step of greeting generation fails, the error is logged with its
    traceback and a fixed spoken apology is used instead.

    Args:
        request: The incoming FastAPI request carrying the webhook form.

    Returns:
        A ``Response`` whose body is the TwiML document (``application/xml``).
    """
    form = await request.form()
    call_sid = form.get('CallSid', 'Unknown')
    # Validate before the value reaches the prompt or the TwiML attribute.
    user_language = _incoming_language_tag(form.get('lang'))
    logger.info("Incoming call %s in %s", call_sid, user_language)

    # Initialize conversation history. A local reference is kept because the
    # dict entry can be removed by the /status handler while this coroutine
    # is suspended on the awaits below.
    state = {"history": [], "language": user_language}
    conversations[call_sid] = state

    response = VoiceResponse()
    # The prompt is nested inside <Gather> so that bargeIn can interrupt it
    # and speech during playback is captured.
    gather = Gather(input='speech', action='/gather', speechTimeout='auto', bargeIn=True, language=user_language)

    # The OpenAI and ElevenLabs SDK calls are synchronous; running them in
    # the default executor keeps the event loop free for other calls.
    loop = asyncio.get_running_loop()
    try:
        greeting = await loop.run_in_executor(None, _incoming_greeting_text, user_language)
        state["history"].append({"role": "assistant", "content": greeting})

        audio_base64 = await loop.run_in_executor(None, _incoming_greeting_audio, greeting)
        # NOTE(review): Twilio documents <Play> as taking a URL that Twilio
        # fetches; a data: URI may be rejected (and inflates the TwiML).
        # Confirm, and if so serve the audio from an HTTP endpoint instead.
        gather.play(f"data:audio/mp3;base64,{audio_base64}")
    except Exception as e:
        logger.exception("Error in initial greeting: %s", e)
        gather.say("Hello, thank you for calling. I am having some technical difficulties.")

    response.append(gather)
    return Response(content=str(response), media_type="application/xml")
def _gather_language_tag(raw_value, fallback):
    """Return ``raw_value`` if it looks like a language tag, else ``fallback``.

    Only short ASCII tags made of alphanumeric parts joined by ``-`` (for
    example ``en-US``) are accepted, because the value is placed into the
    LLM system prompt and the ``<Gather language=...>`` attribute.
    """
    tag = str(raw_value).strip() if raw_value else ''
    is_plausible = (
        0 < len(tag) <= 35
        and tag.isascii()
        and all(part.isalnum() for part in tag.split('-'))
    )
    return tag if is_plausible else fallback


def _gather_reply_text(history, user_language):
    """Blocking helper: ask the chat model for the next sales-agent reply.

    ``history`` is the list of prior chat messages (already including the
    caller's latest utterance). Returns the reply text, or ``""`` when the
    model returns no content.
    """
    with open('sales_script.txt', 'r', encoding='utf-8') as f:
        sales_script = f.read()
    # The full reply is needed before speech synthesis can start, so a
    # plain (non-streaming) completion is requested.
    completion = openai.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": f"You are a persuasive AI sales agent for bracketworks.io. Your goal is to sell credit cards based on the following script: {sales_script}. Respond in {user_language}."},
        ] + history,
    )
    return completion.choices[0].message.content or ""


def _gather_reply_audio(text):
    """Blocking helper: synthesize ``text`` and return the audio as base64 text."""
    audio_stream = elevenlabs_client.generate(
        text=text,
        voice=ELEVENLABS_VOICE_ID,
        model="eleven_multilingual_v2",
        stream=True,
    )
    # Consuming the stream performs the network transfer; keep it off the
    # event loop.
    return base64.b64encode(b"".join(audio_stream)).decode('utf-8')


@app.post("/gather")
async def gather_speech(request: Request):
    """Handle one turn of caller speech and answer with the AI's reply.

    Reads ``CallSid``, ``SpeechResult`` and an optional ``language`` from the
    posted form. If the utterance contains "bye" (case-insensitive) the call
    is ended with a spoken goodbye. Otherwise the utterance is appended to
    the call's history, a reply is generated and synthesized, and TwiML is
    returned that plays the reply inside a new speech ``<Gather>`` posting
    back to this endpoint.

    If the call has no entry in ``conversations`` (for example after a
    process restart), a new entry is created so the utterance is still sent
    to the model.

    Args:
        request: The incoming FastAPI request carrying the webhook form.

    Returns:
        A ``Response`` whose body is the TwiML document (``application/xml``).
    """
    form = await request.form()
    call_sid = form.get('CallSid', 'Unknown')
    # Keep the transcript's original casing for the history (names and
    # e-mail addresses matter later); lower-case only for keyword matching.
    spoken_text = str(form.get('SpeechResult', '') or '')
    speech_text = spoken_text.lower()
    stored_language = conversations.get(call_sid, {}).get('language', 'en-US')
    user_language = _gather_language_tag(form.get('language'), stored_language)
    logger.info("Speech from %s: '%s' in %s", call_sid, spoken_text, user_language)

    response = VoiceResponse()
    if "bye" in speech_text:
        response.say("Thank you for your time. Goodbye.")
        response.hangup()
        return Response(content=str(response), media_type="application/xml")

    # setdefault (rather than an `in` check) guarantees the utterance is
    # recorded even when no state exists for this call yet.
    state = conversations.setdefault(call_sid, {"history": [], "language": user_language})
    state["history"].append({"role": "user", "content": spoken_text})
    state["language"] = user_language  # Update language if it changes

    # The reply is nested inside <Gather> so that bargeIn can interrupt it.
    gather = Gather(input='speech', action='/gather', speechTimeout='auto', bargeIn=True, language=user_language)

    # The SDK calls are synchronous; run them in the default executor so the
    # event loop can keep serving other calls.
    loop = asyncio.get_running_loop()
    try:
        # A copy of the history is handed to the worker thread so later
        # mutations on the event loop cannot race with it.
        ai_response_text = await loop.run_in_executor(
            None, _gather_reply_text, list(state["history"]), user_language
        )
        state["history"].append({"role": "assistant", "content": ai_response_text})

        audio_base64 = await loop.run_in_executor(None, _gather_reply_audio, ai_response_text)
        # NOTE(review): Twilio documents <Play> as taking a URL that Twilio
        # fetches; a data: URI may be rejected (and inflates the TwiML).
        # Confirm, and if so serve the audio from an HTTP endpoint instead.
        gather.play(f"data:audio/mp3;base64,{audio_base64}")
    except Exception as e:
        logger.exception("Error during AI response generation: %s", e)
        gather.say("I am sorry, I am having trouble processing your request.")

    response.append(gather)
    return Response(content=str(response), media_type="application/xml")
# CallStatus values after which no further webhooks for the call are expected.
# NOTE(review): list taken from Twilio's documented final call states — confirm.
_STATUS_TERMINAL = frozenset({'completed', 'busy', 'failed', 'no-answer', 'canceled'})


def _status_extract_and_save(call_sid, history):
    """Blocking helper: extract contact details from a call and write them to disk.

    Sends the flattened conversation to the chat model, parses the JSON
    answer and stores it as ``application_data/<call id>.json``.

    Raises:
        Exception: Whatever the OpenAI call, ``json.loads`` or the file
            write raises; the caller logs it.
    """
    conversation_text = "\n".join(f"{msg['role']}: {msg['content']}" for msg in history)
    completion = openai.chat.completions.create(
        model="gpt-4o",
        # JSON mode: the model must return a single JSON object, so the
        # json.loads below does not trip over prose or code fences.
        response_format={"type": "json_object"},
        messages=[
            {"role": "system", "content": 'Extract Name, Phone Number, and Email from the conversation. Also provide a brief summary. Respond in JSON format with keys: "name", "phone", "email", "summary".'},
            {"role": "user", "content": conversation_text},
        ],
    )
    extracted_data = json.loads(completion.choices[0].message.content)

    os.makedirs('application_data', exist_ok=True)
    # CallSid comes from the request body; strip anything that could act as
    # a path separator or traversal before using it as a file name.
    safe_name = "".join(ch for ch in str(call_sid) if ch.isalnum() or ch in "-_") or "unknown"
    with open(os.path.join('application_data', f"{safe_name}.json"), "w", encoding='utf-8') as f:
        json.dump(extracted_data, f)


@app.post("/status")
async def call_status(request: Request):
    """Handle call status updates and data extraction.

    Reads ``CallSid`` and ``CallStatus`` from the posted form and logs them.
    When the status is terminal, the call's entry is removed from
    ``conversations`` so finished calls do not accumulate in memory. When
    the status is ``completed`` and the call had state, contact details are
    extracted from the conversation and saved under ``application_data/``;
    failures there are logged and do not affect the HTTP response.

    Args:
        request: The incoming FastAPI request carrying the webhook form.

    Returns:
        An empty ``Response`` with status code 200.
    """
    form = await request.form()
    call_sid = form.get('CallSid', 'Unknown')
    status = form.get('CallStatus', 'Unknown')
    logger.info("Call %s status: %s", call_sid, status)

    if status in _STATUS_TERMINAL:
        # Pop first: cleanup then happens regardless of how extraction ends,
        # and a repeated callback for the same call finds nothing to redo.
        state = conversations.pop(call_sid, None)
        if state is not None and status == 'completed':
            try:
                # Model call and file write are blocking; keep them off the
                # event loop.
                await asyncio.get_running_loop().run_in_executor(
                    None, _status_extract_and_save, call_sid, state["history"]
                )
                logger.info("Saved extracted data for %s", call_sid)
            except Exception as e:
                logger.exception("Error extracting data for %s: %s", call_sid, e)

    return Response(status_code=200)
if __name__ == "__main__":
    # Script entry point: uvicorn is imported only when the module is run
    # directly, then serves the app on every interface at port 8080.
    from uvicorn import run as serve

    serve(app, host="0.0.0.0", port=8080)
|