pipecat / call_connection_manager.py
Deadmon's picture
Update call_connection_manager.py
21dbc84 verified
#
# Copyright (c) 2024–2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import json
import os
from typing import Any, Dict, List, Optional
from loguru import logger
class CallFlowState:
def __init__(self):
self.dialed_operator = False
self.operator_connected = False
self.current_operator_index = 0
self.operator_dialout_settings = []
self.summary_finished = False
self.voicemail_detected = False
self.human_detected = False
self.voicemail_message_left = False
self.call_terminated = False
self.participant_left_early = False
def set_operator_dialed(self):
self.dialed_operator = True
def set_operator_connected(self):
self.operator_connected = True
self.summary_finished = False
def set_operator_disconnected(self):
self.operator_connected = False
self.summary_finished = False
def set_summary_finished(self):
self.summary_finished = True
def set_operator_dialout_settings(self, settings):
self.operator_dialout_settings = settings
self.current_operator_index = 0
def get_current_dialout_setting(self):
if not self.operator_dialout_settings or self.current_operator_index >= len(self.operator_dialout_settings):
return None
return self.operator_dialout_settings[self.current_operator_index]
def move_to_next_operator(self):
self.current_operator_index += 1
return self.get_current_dialout_setting()
def set_voicemail_detected(self):
self.voicemail_detected = True
self.human_detected = False
def set_human_detected(self):
self.human_detected = True
self.voicemail_detected = False
def set_voicemail_message_left(self):
self.voicemail_message_left = True
def set_call_terminated(self):
self.call_terminated = True
def set_participant_left_early(self):
self.participant_left_early = True
class SessionManager:
def __init__(self):
self.session_ids = {"operator": None, "customer": None, "bot": None}
self.session_id_refs = {"operator": [None], "customer": [None], "bot": [None]}
self.call_flow_state = CallFlowState()
def set_session_id(self, participant_type, session_id):
if participant_type in self.session_ids:
self.session_ids[participant_type] = session_id
if participant_type in self.session_id_refs:
self.session_id_refs[participant_type][0] = session_id
def get_session_id(self, participant_type):
return self.session_ids.get(participant_type)
def get_session_id_ref(self, participant_type):
return self.session_id_refs.get(participant_type)
def is_participant_type(self, session_id, participant_type):
return self.session_ids.get(participant_type) == session_id
def reset_participant(self, participant_type):
if participant_type in self.session_ids:
self.session_ids[participant_type] = None
if participant_type in self.session_id_refs:
self.session_id_refs[participant_type][0] = None
if participant_type == "operator":
self.call_flow_state.set_operator_disconnected()
class CallConfigManager:
def __init__(self, body_data: Dict[str, Any] = None):
self.body = body_data or {}
self.dial_in_from_number = os.environ.get("HF_DIAL_IN_FROM_NUMBER", "+10000000001")
self.dial_out_to_number = os.environ.get("HF_DIAL_OUT_TO_NUMBER", "+10000000002")
self.operator_number = os.environ.get("HF_OPERATOR_NUMBER", "+10000000003")
self._initialize_maps()
self._build_reverse_lookup_maps()
def _initialize_maps(self):
self.CUSTOMER_MAP = {
"Dominic": {"phoneNumber": self.dial_in_from_number},
"Stewart": {"phoneNumber": self.dial_out_to_number},
"James": {"phoneNumber": "+10000000000", "callerId": "james-caller-id-uuid", "sipUri": "sip:[email protected]"},
"Sarah": {"sipUri": "sip:[email protected]"},
"Michael": {"phoneNumber": "+16505557890", "callerId": "michael-caller-id-uuid"},
}
self.CUSTOMER_TO_OPERATOR_MAP = {
"Dominic": ["Yunyoung", "Maria"],
"Stewart": "Yunyoung",
"James": "Yunyoung",
"Sarah": "Jennifer",
"Michael": "Paul",
"Default": "Yunyoung",
}
self.OPERATOR_CONTACT_MAP = {
"Paul": {"phoneNumber": "+12345678904", "callerId": "paul-caller-id-uuid"},
"Yunyoung": {"phoneNumber": self.operator_number},
"Maria": {"sipUri": "sip:[email protected]"},
"Jennifer": {"phoneNumber": "+14155559876", "callerId": "jennifer-caller-id-uuid"},
"Default": {"phoneNumber": self.operator_number},
}
def _build_reverse_lookup_maps(self):
self._PHONE_TO_CUSTOMER_MAP = {}
self._SIP_TO_CUSTOMER_MAP = {}
for customer_name, contact_info in self.CUSTOMER_MAP.items():
if "phoneNumber" in contact_info:
self._PHONE_TO_CUSTOMER_MAP[contact_info["phoneNumber"]] = customer_name
if "sipUri" in contact_info:
self._SIP_TO_CUSTOMER_MAP[contact_info["sipUri"]] = customer_name
@classmethod
def from_json_string(cls, json_string: str):
body_data = json.loads(json_string)
return cls(body_data)
def find_customer_by_contact(self, contact_info: str) -> Optional[str]:
if contact_info in self._PHONE_TO_CUSTOMER_MAP:
return self._PHONE_TO_CUSTOMER_MAP[contact_info]
if contact_info in self._SIP_TO_CUSTOMER_MAP:
return self._SIP_TO_CUSTOMER_MAP[contact_info]
return None
def get_customer_name(self, phone_number: str) -> Optional[str]:
return self.find_customer_by_contact(phone_number)
def get_operators_for_customer(self, customer_name: Optional[str]) -> List[str]:
if not customer_name or customer_name not in self.CUSTOMER_TO_OPERATOR_MAP:
return ["Default"]
operators = self.CUSTOMER_TO_OPERATOR_MAP[customer_name]
if isinstance(operators, str):
return [operators]
return operators
def get_operator_dialout_settings(self, operator_name: str) -> Dict[str, str]:
return self.OPERATOR_CONTACT_MAP.get(operator_name, self.OPERATOR_CONTACT_MAP["Default"])
def get_dialout_settings_for_caller(self, from_number: Optional[str] = None) -> List[Dict[str, str]]:
if not from_number:
return [self.get_operator_dialout_settings("Default")]
customer_name = self.get_customer_name(from_number)
operator_names = self.get_operators_for_customer(customer_name)
return [self.get_operator_dialout_settings(name) for name in operator_names]
def get_caller_info(self) -> Dict[str, Optional[str]]:
raw_dialin_settings = self.body.get("dialin_settings")
if not raw_dialin_settings:
return {"caller_number": None, "dialed_number": None}
dialed_number = raw_dialin_settings.get("To") or raw_dialin_settings.get("to")
caller_number = raw_dialin_settings.get("From") or raw_dialin_settings.get("from")
return {"caller_number": caller_number, "dialed_number": dialed_number}
def get_caller_number(self) -> Optional[str]:
return self.get_caller_info()["caller_number"]
async def start_dialout(self, transport, dialout_settings=None):
settings = dialout_settings or self.get_dialout_settings()
if not settings:
logger.warning("No dialout settings available")
return
for setting in settings:
if "phoneNumber" in setting:
logger.info(f"Dialing number: {setting['phoneNumber']}")
if "callerId" in setting:
logger.info(f"with callerId: {setting['callerId']}")
await transport.start_dialout({"phoneNumber": setting["phoneNumber"], "callerId": setting["callerId"]})
else:
logger.info("with no callerId")
await transport.start_dialout({"phoneNumber": setting["phoneNumber"]})
elif "sipUri" in setting:
logger.info(f"Dialing sipUri: {setting['sipUri']}")
await transport.start_dialout({"sipUri": setting["sipUri"]})
else:
logger.warning(f"Unknown dialout setting format: {setting}")
def get_dialout_settings(self) -> Optional[List[Dict[str, Any]]]:
if "dialout_settings" in self.body:
dialout_settings = self.body["dialout_settings"]
if isinstance(dialout_settings, dict):
return [dialout_settings]
elif isinstance(dialout_settings, list):
return dialout_settings
return None
def get_dialin_settings(self) -> Optional[Dict[str, Any]]:
raw_dialin_settings = self.body.get("dialin_settings")
if not raw_dialin_settings:
return None
dialin_settings = {
"call_id": raw_dialin_settings.get("call_id") or raw_dialin_settings.get("callId"),
"call_domain": raw_dialin_settings.get("call_domain") or raw_dialin_settings.get("callDomain"),
"to": raw_dialin_settings.get("to") or raw_dialin_settings.get("To"),
"from": raw_dialin_settings.get("from") or raw_dialin_settings.get("From"),
}
return dialin_settings
def get_prompt(self, prompt_name: str) -> Optional[str]:
prompts = self.body.get("prompts", [])
for prompt in prompts:
if prompt.get("name") == prompt_name:
return prompt.get("text")
return None
def get_transfer_mode(self) -> Optional[str]:
if "call_transfer" in self.body:
return self.body["call_transfer"].get("mode")
return None
def get_speak_summary(self) -> Optional[bool]:
if "call_transfer" in self.body:
return self.body["call_transfer"].get("speakSummary")
return None
def get_store_summary(self) -> Optional[bool]:
if "call_transfer" in self.body:
return self.body["call_transfer"].get("storeSummary")
return None
def is_test_mode(self) -> bool:
if "voicemail_detection" in self.body:
return bool(self.body["voicemail_detection"].get("testInPrebuilt"))
if "call_transfer" in self.body:
return bool(self.body["call_transfer"].get("testInPrebuilt"))
if "simple_dialin" in self.body:
return bool(self.body["simple_dialin"].get("testInPrebuilt"))
if "simple_dialout" in self.body:
return bool(self.body["simple_dialout"].get("testInPrebuilt"))
return False
def is_voicemail_detection_enabled(self) -> bool:
return bool(self.body.get("voicemail_detection"))
def customize_prompt(self, prompt: str, customer_name: Optional[str] = None) -> str:
if customer_name and prompt:
return prompt.replace("{customer_name}", customer_name)
return prompt
def create_system_message(self, content: str) -> Dict[str, str]:
return {"role": "system", "content": content}
def create_user_message(self, content: str) -> Dict[str, str]:
return {"role": "user", "content": content}
def get_customer_info_suffix(self, customer_name: Optional[str] = None, preposition: str = "for") -> str:
if not customer_name:
return ""
space_prefix = " " if preposition else ""
space_suffix = " " if preposition else ""
return f"{space_prefix}{preposition}{space_suffix}{customer_name}"