Deadmon commited on
Commit
c5d854a
·
verified ·
1 Parent(s): 64382cc

Update bot_runner_helpers.py

Browse files
Files changed (1) hide show
  1. bot_runner_helpers.py +24 -192
bot_runner_helpers.py CHANGED
@@ -1,211 +1,43 @@
1
- # bot_runner_helpers.py
2
- from typing import Any, Dict, Optional
3
 
4
- from bot_constants import (
5
- DEFAULT_CALLTRANSFER_MODE,
6
- DEFAULT_DIALIN_EXAMPLE,
7
- DEFAULT_SPEAK_SUMMARY,
8
- DEFAULT_STORE_SUMMARY,
9
- DEFAULT_TEST_IN_PREBUILT,
10
- )
11
- from call_connection_manager import CallConfigManager
12
 
13
- # ----------------- Configuration Helpers ----------------- #
14
-
15
-
16
- def determine_room_capabilities(config_body: Optional[Dict[str, Any]] = None) -> Dict[str, bool]:
17
- """Determine room capabilities based on the configuration.
18
-
19
- This function examines the configuration to determine which capabilities
20
- the Daily room should have enabled.
21
-
22
- Args:
23
- config_body: Configuration dictionary that determines room capabilities
24
-
25
- Returns:
26
- Dictionary of capability flags
27
- """
28
  capabilities = {
29
  "enable_dialin": False,
30
  "enable_dialout": False,
31
- # Add more capabilities here in the future as needed
32
  }
33
-
34
- if not config_body:
35
  return capabilities
36
 
37
- # Check for dialin capability
38
- capabilities["enable_dialin"] = "dialin_settings" in config_body
39
-
40
- # Check for dialout capability - needed for outbound calls or transfers
41
- has_dialout_settings = "dialout_settings" in config_body
42
-
43
- # Check if there's a transfer to an operator configured
44
- has_call_transfer = "call_transfer" in config_body
45
-
46
- # Enable dialout if any condition requires it
47
- capabilities["enable_dialout"] = has_dialout_settings or has_call_transfer
48
 
49
  return capabilities
50
 
51
 
52
- def ensure_dialout_settings_array(body: Dict[str, Any]) -> Dict[str, Any]:
53
- """Ensures dialout_settings is an array of objects.
54
-
55
- Args:
56
- body: The configuration dictionary
57
-
58
- Returns:
59
- Updated configuration with dialout_settings as an array
60
- """
61
- if "dialout_settings" in body:
62
- # Convert to array if it's not already one
63
- if not isinstance(body["dialout_settings"], list):
64
- body["dialout_settings"] = [body["dialout_settings"]]
65
-
66
- return body
67
-
68
-
69
- def ensure_prompt_config(body: Dict[str, Any]) -> Dict[str, Any]:
70
- """Ensures the body has appropriate prompts settings, but doesn't add defaults.
71
-
72
- Only makes sure the prompt section exists, allowing the bot script to handle defaults.
73
-
74
- Args:
75
- body: The configuration dictionary
76
-
77
- Returns:
78
- Updated configuration with prompt settings section
79
- """
80
- if "prompts" not in body:
81
- body["prompts"] = []
82
- return body
83
-
84
-
85
- def create_call_transfer_settings(body: Dict[str, Any]) -> Dict[str, Any]:
86
- """Create call transfer settings based on configuration and customer mapping.
87
-
88
- Args:
89
- body: The configuration dictionary
90
-
91
- Returns:
92
- Call transfer settings dictionary
93
- """
94
- # Default transfer settings
95
- transfer_settings = {
96
- "mode": DEFAULT_CALLTRANSFER_MODE,
97
- "speakSummary": DEFAULT_SPEAK_SUMMARY,
98
- "storeSummary": DEFAULT_STORE_SUMMARY,
99
- "testInPrebuilt": DEFAULT_TEST_IN_PREBUILT,
100
- }
101
-
102
- # If call_transfer already exists, merge the defaults with the existing settings
103
- # This ensures all required fields exist while preserving user-specified values
104
- if "call_transfer" in body:
105
- existing_settings = body["call_transfer"]
106
- # Update defaults with existing settings (existing values will override defaults)
107
- for key, value in existing_settings.items():
108
- transfer_settings[key] = value
109
- else:
110
- # No existing call_transfer - check if we have dialin settings for customer lookup
111
- if "dialin_settings" in body:
112
- # Create a temporary routing manager just for customer lookup
113
- call_config_manager = CallConfigManager(body)
114
-
115
- # Get caller info
116
- caller_info = call_config_manager.get_caller_info()
117
- from_number = caller_info.get("caller_number")
118
-
119
- if from_number:
120
- # Get customer name from phone number
121
- customer_name = call_config_manager.get_customer_name(from_number)
122
-
123
- # If we know the customer name, add it to the config for the bot to use
124
- if customer_name:
125
- transfer_settings["customerName"] = customer_name
126
-
127
- return transfer_settings
128
-
129
-
130
- def create_simple_dialin_settings(body: Dict[str, Any]) -> Dict[str, Any]:
131
- """Create simple dialin settings based on configuration.
132
-
133
- Args:
134
- body: The configuration dictionary
135
-
136
- Returns:
137
- Simple dialin settings dictionary
138
- """
139
- # Default simple dialin settings
140
- simple_dialin_settings = {
141
- "testInPrebuilt": DEFAULT_TEST_IN_PREBUILT,
142
- }
143
-
144
- # If simple_dialin already exists, merge the defaults with the existing settings
145
- if "simple_dialin" in body:
146
- existing_settings = body["simple_dialin"]
147
- # Update defaults with existing settings (existing values will override defaults)
148
- for key, value in existing_settings.items():
149
- simple_dialin_settings[key] = value
150
-
151
- return simple_dialin_settings
152
-
153
-
154
- def create_simple_dialout_settings(body: Dict[str, Any]) -> Dict[str, Any]:
155
- """Create simple dialout settings based on configuration.
156
-
157
- Args:
158
- body: The configuration dictionary
159
-
160
- Returns:
161
- Simple dialout settings dictionary
162
- """
163
- # Default simple dialout settings
164
- simple_dialout_settings = {
165
- "testInPrebuilt": DEFAULT_TEST_IN_PREBUILT,
166
- }
167
-
168
- # If simple_dialout already exists, merge the defaults with the existing settings
169
- if "simple_dialout" in body:
170
- existing_settings = body["simple_dialout"]
171
- # Update defaults with existing settings (existing values will override defaults)
172
- for key, value in existing_settings.items():
173
- simple_dialout_settings[key] = value
174
-
175
- return simple_dialout_settings
176
-
177
-
178
  async def process_dialin_request(data: Dict[str, Any]) -> Dict[str, Any]:
179
- """Process incoming dial-in request data to create a properly formatted body.
180
-
181
- Converts camelCase fields received from webhook to snake_case format
182
- for internal consistency across the codebase.
183
-
184
- Args:
185
- data: Raw dialin data from webhook
186
-
187
- Returns:
188
- Properly formatted configuration with snake_case keys
189
- """
190
- # Create base body with dialin settings
191
  body = {
192
  "dialin_settings": {
193
- "to": data.get("To", ""),
194
- "from": data.get("From", ""),
195
- "call_id": data.get("callId", data.get("CallSid", "")), # Convert to snake_case
196
- "call_domain": data.get("callDomain", ""), # Convert to snake_case
197
  }
198
  }
 
199
 
200
- # Use the global default to determine which example to run for dialin webhooks
201
- example = DEFAULT_DIALIN_EXAMPLE
202
-
203
- # Configure the bot based on the example
204
- if example == "call_transfer":
205
- # Create call transfer settings
206
- body["call_transfer"] = create_call_transfer_settings(body)
207
- elif example == "simple_dialin":
208
- # Create simple dialin settings
209
- body["simple_dialin"] = create_simple_dialin_settings(body)
210
 
211
- return body
 
 
 
 
1
+ from typing import Dict, Any, Optional
2
+ from bot_definitions import bot_registry
3
 
 
 
 
 
 
 
 
 
4
 
5
+ def determine_room_capabilities(body: Dict[str, Any]) -> Dict[str, bool]:
6
+ bot_type = bot_registry.detect_bot_type(body)
7
+ bot = bot_registry.get_bot(bot_type)
 
 
 
 
 
 
 
 
 
 
 
 
8
  capabilities = {
9
  "enable_dialin": False,
10
  "enable_dialout": False,
 
11
  }
12
+ if not bot:
 
13
  return capabilities
14
 
15
+ if bot.name == "call_transfer":
16
+ call_transfer_config = body.get("call_transfer", {})
17
+ capabilities["enable_dialin"] = True
18
+ if call_transfer_config.get("mode") == "dialout":
19
+ capabilities["enable_dialout"] = True
20
+ elif bot.name == "simple_dialin":
21
+ capabilities["enable_dialin"] = True
22
+ elif bot.name in ["simple_dialout", "voicemail_detection"]:
23
+ capabilities["enable_dialout"] = True
 
 
24
 
25
  return capabilities
26
 
27
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
28
  async def process_dialin_request(data: Dict[str, Any]) -> Dict[str, Any]:
 
 
 
 
 
 
 
 
 
 
 
 
29
  body = {
30
  "dialin_settings": {
31
+ "callId": data.get("callId"),
32
+ "callDomain": data.get("callDomain"),
33
+ "From": data.get("From"),
34
+ "To": data.get("To"),
35
  }
36
  }
37
+ return body
38
 
 
 
 
 
 
 
 
 
 
 
39
 
40
+ def ensure_prompt_config(body: Dict[str, Any]) -> Dict[str, Any]:
41
+ if "prompts" not in body:
42
+ body["prompts"] = []
43
+ return body