File size: 6,834 Bytes
1366db9 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 |
#
# Copyright (c) 2024–2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import argparse
import asyncio
import os
import sys
from call_connection_manager import CallConfigManager
from dotenv import load_dotenv
from loguru import logger
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import EndTaskFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.llm_service import FunctionCallParams
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
# Load variables from a local .env file; override=True lets values from the
# .env file replace variables already set in the process environment.
load_dotenv(override=True)

# Replace loguru's default handler (id 0) with a DEBUG-level stderr sink.
# logger.remove(0) raises ValueError when handler 0 no longer exists (for
# example when another module already removed it before importing this one),
# so tolerate that case instead of failing at import time.
try:
    logger.remove(0)
except ValueError:
    pass
logger.add(sys.stderr, level="DEBUG")

# Daily REST API credentials and endpoint, read once at import time.
daily_api_key = os.getenv("DAILY_API_KEY", "")
daily_api_url = os.getenv("DAILY_API_URL", "https://api.daily.co/v1")
async def main(
    room_url: str,
    token: str,
    body: str,
) -> None:
    """Configure and run the simple dial-out bot in a Daily room.

    Builds a Daily transport, a Cartesia TTS service and an OpenAI LLM with a
    single ``terminate_call`` tool, wires them into a pipeline, registers the
    transport event handlers, and runs the pipeline task until the runner
    returns.

    Args:
        room_url: URL of the Daily room, passed to ``DailyTransport``.
        token: Daily room token, passed to ``DailyTransport``.
        body: JSON configuration string handed to
            ``CallConfigManager.from_json_string``. When falsy, a default
            ``CallConfigManager()`` is used instead.
    """
    # ------------ CONFIGURATION AND SETUP ------------
    # Create a config manager using the provided body
    call_config_manager = CallConfigManager.from_json_string(body) if body else CallConfigManager()
    # Get important configuration values
    dialout_settings = call_config_manager.get_dialout_settings()
    test_mode = call_config_manager.is_test_mode()
    # ------------ TRANSPORT SETUP ------------
    # Audio in/out only (no video out), with Silero VAD and transcription on
    transport_params = DailyParams(
        api_url=daily_api_url,
        api_key=daily_api_key,
        audio_in_enabled=True,
        audio_out_enabled=True,
        video_out_enabled=False,
        vad_analyzer=SileroVADAnalyzer(),
        transcription_enabled=True,
    )
    # Initialize transport with Daily
    transport = DailyTransport(
        room_url,
        token,
        "Simple Dial-out Bot",
        transport_params,
    )
    # Initialize TTS
    tts = CartesiaTTSService(
        api_key=os.getenv("CARTESIA_API_KEY", ""),
        voice_id="b7d50908-b17c-442d-ad8d-810c63997ed9",  # Use Helpful Woman voice by default
    )
    # ------------ FUNCTION DEFINITIONS ------------
    async def terminate_call(params: FunctionCallParams):
        """Tool handler the LLM invokes to end the call.

        Queues an ``EndTaskFrame`` upstream via ``params.llm``.
        """
        await params.llm.queue_frame(EndTaskFrame(), FrameDirection.UPSTREAM)
    # Define function schemas for tools
    # terminate_call takes no arguments, hence the empty properties/required
    terminate_call_function = FunctionSchema(
        name="terminate_call",
        description="Call this function to terminate the call.",
        properties={},
        required=[],
    )
    # Create tools schema
    tools = ToolsSchema(standard_tools=[terminate_call_function])
    # ------------ LLM AND CONTEXT SETUP ------------
    # Set up the system instruction for the LLM
    system_instruction = """You are Chatbot, a friendly, helpful robot. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way, but keep your responses brief. Start by introducing yourself. If the user ends the conversation, **IMMEDIATELY** call the `terminate_call` function. """
    # Initialize LLM
    llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
    # Register functions with the LLM
    # The registered name must match the FunctionSchema name declared above
    llm.register_function("terminate_call", terminate_call)
    # Create system message and initialize messages list
    messages = [call_config_manager.create_system_message(system_instruction)]
    # Initialize LLM context and aggregator
    context = OpenAILLMContext(messages, tools)
    context_aggregator = llm.create_context_aggregator(context)
    # ------------ PIPELINE SETUP ------------
    # Build pipeline
    pipeline = Pipeline(
        [
            transport.input(),  # Transport user input
            context_aggregator.user(),  # User responses
            llm,  # LLM
            tts,  # TTS
            transport.output(),  # Transport bot output
            context_aggregator.assistant(),  # Assistant spoken responses
        ]
    )
    # Create pipeline task
    task = PipelineTask(pipeline, params=PipelineParams(allow_interruptions=True))
    # ------------ EVENT HANDLERS ------------
    @transport.event_handler("on_joined")
    async def on_joined(transport, data):
        # Start dialout if needed
        # Skipped in test mode, and when no dialout settings were configured
        if not test_mode and dialout_settings:
            logger.debug("Dialout settings detected; starting dialout")
            await call_config_manager.start_dialout(transport, dialout_settings)
    @transport.event_handler("on_dialout_connected")
    async def on_dialout_connected(transport, data):
        logger.debug(f"Dial-out connected: {data}")
    @transport.event_handler("on_dialout_answered")
    async def on_dialout_answered(transport, data):
        logger.debug(f"Dial-out answered: {data}")
        # Automatically start capturing transcription for the participant
        await transport.capture_participant_transcription(data["sessionId"])
        # The bot will wait to hear the user before the bot speaks
    @transport.event_handler("on_first_participant_joined")
    async def on_first_participant_joined(transport, participant):
        # Only acts in test mode; outside test mode transcription capture is
        # started from on_dialout_answered instead
        if test_mode:
            logger.debug(f"First participant joined: {participant['id']}")
            await transport.capture_participant_transcription(participant["id"])
            # The bot will wait to hear the user before the bot speaks
    @transport.event_handler("on_participant_left")
    async def on_participant_left(transport, participant, reason):
        logger.debug(f"Participant left: {participant}, reason: {reason}")
        # Cancel the pipeline task whenever a participant leaves
        await task.cancel()
    # ------------ RUN PIPELINE ------------
    if test_mode:
        logger.debug("Running in test mode (can be tested in Daily Prebuilt)")
    runner = PipelineRunner()
    await runner.run(task)
if __name__ == "__main__":
    # Command-line entry point: parse the room details and the optional JSON
    # configuration string, then run the bot until main() returns.
    parser = argparse.ArgumentParser(description="Simple Dial-out Bot")
    parser.add_argument("-u", "--url", type=str, help="Room URL")
    parser.add_argument("-t", "--token", type=str, help="Room Token")
    parser.add_argument("-b", "--body", type=str, help="JSON configuration string")
    args = parser.parse_args()

    # Log the arguments for debugging. The room token is a credential, so
    # only record whether one was supplied (same treatment as the body)
    # instead of writing its value to the log.
    logger.info(f"Room URL: {args.url}")
    logger.info(f"Token provided: {bool(args.token)}")
    logger.info(f"Body provided: {bool(args.body)}")

    asyncio.run(main(args.url, args.token, args.body))
|