|
|
|
|
|
import os |
|
import json |
|
import base64 |
|
import asyncio |
|
from fastapi import FastAPI, Request, Response |
|
from twilio.twiml.voice_response import VoiceResponse, Gather, Hangup |
|
from twilio.rest import Client |
|
import openai |
|
from elevenlabs.client import ElevenLabs |
|
import logging |
|
from logging.handlers import RotatingFileHandler |
|
|
|
|
|
|
|
# --- Credentials -----------------------------------------------------------
# Each secret is read from the process environment; a missing variable
# yields None rather than raising here.
TWILIO_ACCOUNT_SID = os.getenv('TWILIO_ACCOUNT_SID')
TWILIO_AUTH_TOKEN = os.getenv('TWILIO_AUTH_TOKEN')
OPENAI_API_KEY = os.getenv('OPENAI_API_KEY')
ELEVENLABS_API_KEY = os.getenv('ELEVENLABS_API_KEY')

# ElevenLabs voice used for every synthesized utterance.
ELEVENLABS_VOICE_ID = 'Xb7hH8MSUjPSbSDYk0k2'

# --- Service clients (created once at import time) -------------------------
twilio_client = Client(TWILIO_ACCOUNT_SID, TWILIO_AUTH_TOKEN)
elevenlabs_client = ElevenLabs(api_key=ELEVENLABS_API_KEY)

# The openai module is configured globally instead of via a client object.
openai.api_key = OPENAI_API_KEY
|
|
|
|
|
# Module logger: INFO and above go to a size-rotated file (app.log, rotated
# at 10 MiB, five backups kept).
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

log_formatter = logging.Formatter(
    fmt='%(asctime)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s'
)

log_handler = RotatingFileHandler(
    filename='app.log',
    maxBytes=10 * 1024 * 1024,
    backupCount=5,
)
log_handler.setFormatter(log_formatter)

logger.addHandler(log_handler)
|
|
|
# ASGI application that the route decorators below register against.
app = FastAPI()

# In-memory per-call state, keyed by the CallSid form field. Each value is a
# dict with "history" (list of chat messages) and "language" entries.
# Nothing here is persisted: the contents are lost when the process exits.
conversations: dict = {}
|
|
|
@app.post("/")
async def handle_incoming_call(request: Request):
    """Handle incoming calls and start the conversation.

    Reads ``CallSid`` and ``lang`` from the posted form, creates a fresh
    entry in ``conversations`` for the call, asks the chat model for an
    opening greeting based on ``sales_script.txt``, converts it to speech
    and returns TwiML that plays the audio and then gathers the caller's
    spoken reply (posted to ``/gather``).

    If anything fails while producing the greeting, a fixed spoken apology
    is used instead and the call still proceeds to the gather step.

    Returns:
        Response: the TwiML document with media type ``application/xml``.
    """
    form = await request.form()
    call_sid = form.get('CallSid', 'Unknown')
    user_language = form.get('lang', 'en-US')
    logger.info("Incoming call %s in %s", call_sid, user_language)

    # Keep a local reference: the status webhook may remove the registry
    # entry while this handler is awaiting the worker thread.
    conversation = {"history": [], "language": user_language}
    conversations[call_sid] = conversation

    def _create_greeting():
        """Blocking: read the sales script and ask the model for a greeting."""
        with open('sales_script.txt', 'r', encoding='utf-8') as f:
            sales_script = f.read()
        completion = openai.chat.completions.create(
            model="gpt-4o",
            messages=[
                {"role": "system", "content": f"You are a world-class AI sales agent for bracketworks.io. Your goal is to sell credit cards. Start the conversation with a friendly greeting in {user_language}."},
                {"role": "user", "content": sales_script},
            ],
        )
        return completion.choices[0].message.content

    def _synthesize_greeting(text):
        """Blocking: turn *text* into base64-encoded audio via ElevenLabs."""
        audio_stream = elevenlabs_client.generate(
            text=text,
            voice=ELEVENLABS_VOICE_ID,
            model="eleven_multilingual_v2",
            stream=True,
        )
        return base64.b64encode(b"".join(audio_stream)).decode('utf-8')

    response = VoiceResponse()
    try:
        # The OpenAI / ElevenLabs calls and the file read are synchronous;
        # running them directly in this coroutine would stall the event loop
        # (and every other in-flight call) until they return.
        loop = asyncio.get_running_loop()
        greeting = await loop.run_in_executor(None, _create_greeting)
        conversation["history"].append({"role": "assistant", "content": greeting})

        audio_base64 = await loop.run_in_executor(None, _synthesize_greeting, greeting)
        # NOTE(review): Twilio documents <Play> as taking an audio URL;
        # confirm that an inline data: URI is actually accepted, otherwise
        # the audio has to be served from an HTTP endpoint.
        response.play(f"data:audio/mp3;base64,{audio_base64}")
    except Exception as e:
        # Top-level boundary for the webhook: log with traceback and fall
        # back to a spoken message so the caller never gets an HTTP error.
        logger.exception("Error in initial greeting: %s", e)
        response.say("Hello, thank you for calling. I am having some technical difficulties.")

    # NOTE(review): the Gather follows the audio instead of wrapping it, so
    # bargeIn presumably does not apply to the greeting - confirm intent.
    gather = Gather(input='speech', action='/gather', speechTimeout='auto', bargeIn=True, language=user_language)
    response.append(gather)
    return Response(content=str(response), media_type="application/xml")
|
|
|
@app.post("/gather")
async def gather_speech(request: Request):
    """Handle user speech and generate AI response.

    Reads ``CallSid``, ``SpeechResult`` and an optional ``language`` from
    the posted form. If the transcript contains "bye" (case-insensitive)
    the call is ended with a spoken goodbye. Otherwise the utterance is
    added to the call's history, the chat model produces a reply guided by
    ``sales_script.txt``, the reply is converted to speech, and the
    returned TwiML plays it and gathers the next utterance.

    If generating or synthesizing the reply fails, a fixed spoken apology
    is used and the gather loop continues.

    Returns:
        Response: the TwiML document with media type ``application/xml``.
    """
    form = await request.form()
    call_sid = form.get('CallSid', 'Unknown')
    speech_text = form.get('SpeechResult', '')
    stored_language = conversations.get(call_sid, {}).get('language', 'en-US')
    user_language = form.get('language', stored_language)
    logger.info("Speech from %s: '%s' in %s", call_sid, speech_text, user_language)

    response = VoiceResponse()

    # Lowercase only for keyword matching; the transcript kept in history
    # retains its casing so names and e-mail addresses are not mangled.
    if "bye" in speech_text.lower():
        response.say("Thank you for your time. Goodbye.")
        response.hangup()
        return Response(content=str(response), media_type="application/xml")

    # Create the entry when it is missing (e.g. the process restarted
    # mid-call) so the caller's utterance still reaches the model and the
    # reply is recorded instead of being appended to a throwaway list.
    conversation = conversations.setdefault(
        call_sid, {"history": [], "language": user_language}
    )
    history = conversation["history"]
    history.append({"role": "user", "content": speech_text})
    conversation["language"] = user_language

    # Snapshot taken before leaving the event loop thread.
    prior_messages = list(history)

    def _generate_reply():
        """Blocking: ask the chat model for the next line of the dialogue."""
        with open('sales_script.txt', 'r', encoding='utf-8') as f:
            sales_script = f.read()
        completion = openai.chat.completions.create(
            model="gpt-4o",
            messages=[
                {"role": "system", "content": f"You are a persuasive AI sales agent for bracketworks.io. Your goal is to sell credit cards based on the following script: {sales_script}. Respond in {user_language}."},
            ] + prior_messages,
            stream=True,
        )
        # Chunks with an empty ``choices`` list are skipped rather than
        # indexed, so they cannot raise IndexError.
        return "".join(
            chunk.choices[0].delta.content or ""
            for chunk in completion
            if chunk.choices
        )

    def _synthesize_reply(text):
        """Blocking: turn *text* into base64-encoded audio via ElevenLabs."""
        audio_stream = elevenlabs_client.generate(
            text=text,
            voice=ELEVENLABS_VOICE_ID,
            model="eleven_multilingual_v2",
            stream=True,
        )
        return base64.b64encode(b"".join(audio_stream)).decode('utf-8')

    try:
        # Synchronous network / file work is pushed to the default executor
        # so the event loop keeps serving other webhooks meanwhile.
        loop = asyncio.get_running_loop()
        ai_response_text = await loop.run_in_executor(None, _generate_reply)
        history.append({"role": "assistant", "content": ai_response_text})

        audio_base64 = await loop.run_in_executor(None, _synthesize_reply, ai_response_text)
        # NOTE(review): Twilio documents <Play> as taking an audio URL;
        # confirm that an inline data: URI is actually accepted.
        response.play(f"data:audio/mp3;base64,{audio_base64}")
    except Exception as e:
        # Webhook boundary: log with traceback, answer with a spoken
        # apology, and keep the conversation loop alive.
        logger.exception("Error during AI response generation: %s", e)
        response.say("I am sorry, I am having trouble processing your request.")

    gather = Gather(input='speech', action='/gather', speechTimeout='auto', bargeIn=True, language=user_language)
    response.append(gather)
    return Response(content=str(response), media_type="application/xml")
|
|
|
@app.post("/status")
async def call_status(request: Request):
    """Handle call status updates and data extraction.

    Reads ``CallSid`` and ``CallStatus`` from the posted form. When a call
    reaches a final status its entry is removed from ``conversations``.
    For ``completed`` calls the recorded history is first sent to the chat
    model to extract name, phone, e-mail and a summary, and the resulting
    JSON object is written to ``application_data/<call sid>.json``.

    Extraction errors are logged and swallowed; the endpoint always
    answers 200.

    Returns:
        Response: an empty response with status code 200.
    """
    form = await request.form()
    call_sid = form.get('CallSid', 'Unknown')
    status = form.get('CallStatus', 'Unknown')
    logger.info("Call %s status: %s", call_sid, status)

    # Statuses after which no further webhooks for this call are expected
    # (per Twilio's call status list - verify if new values are added).
    final_statuses = ('completed', 'busy', 'failed', 'no-answer', 'canceled')
    if status not in final_statuses:
        return Response(status_code=200)

    # Remove the state up front: this frees memory for every finished call,
    # not only completed ones, and a repeated callback cannot process the
    # same conversation twice.
    conversation = conversations.pop(call_sid, None)
    if conversation is None or status != 'completed':
        return Response(status_code=200)

    def _extract_and_save():
        """Blocking: summarize the conversation and write it to disk."""
        conversation_text = "\n".join(
            f"{msg['role']}: {msg['content']}" for msg in conversation["history"]
        )
        completion = openai.chat.completions.create(
            model="gpt-4o",
            # JSON mode: without it the model may wrap the object in prose
            # or code fences, which json.loads would reject.
            response_format={"type": "json_object"},
            messages=[
                {"role": "system", "content": 'Extract Name, Phone Number, and Email from the conversation. Also provide a brief summary. Respond in JSON format with keys: "name", "phone", "email", "summary".'},
                {"role": "user", "content": conversation_text},
            ],
        )
        extracted_data = json.loads(completion.choices[0].message.content)

        # CallSid arrives in the request body, so it is untrusted: keep only
        # characters that cannot form a path component such as "../".
        safe_name = "".join(
            ch for ch in call_sid if ch.isalnum() or ch in "-_"
        ) or "unknown"

        os.makedirs('application_data', exist_ok=True)
        target = os.path.join('application_data', f"{safe_name}.json")
        with open(target, "w", encoding="utf-8") as f:
            json.dump(extracted_data, f)

    try:
        # Network and disk work is synchronous; keep it off the event loop.
        await asyncio.get_running_loop().run_in_executor(None, _extract_and_save)
        logger.info("Saved extracted data for %s", call_sid)
    except Exception as e:
        # Best-effort post-processing: never fail the status webhook.
        logger.exception("Error extracting data for %s: %s", call_sid, e)

    return Response(status_code=200)
|
|
|
if __name__ == "__main__":
    # Development entry point: running this file directly starts the app
    # under uvicorn. The import is kept local so uvicorn is only required
    # when the file is executed as a script.
    import uvicorn

    # Bind to all interfaces on port 8080.
    bind_host, bind_port = "0.0.0.0", 8080
    uvicorn.run(app, host=bind_host, port=bind_port)
|
|