# File size: 5,777 bytes (web-viewer blame hashes and line-number gutter removed; they are not part of the program).
#
# Copyright (c) 2024–2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import argparse
import asyncio
import os
import sys
from loguru import logger
from call_connection_manager import CallConfigManager, SessionManager
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import (
AudioRawFrame,
EndTaskFrame,
Frame,
LLMMessagesFrame,
TranscriptionFrame,
)
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
# Swap loguru's default sink (handler id 0) for a stderr sink at DEBUG level.
# Logger.remove() raises ValueError when no handler with that id is active
# (for example when another imported module already removed it), so tolerate
# that case instead of failing while the module is being imported.
try:
    logger.remove(0)
except ValueError:
    pass
logger.add(sys.stderr, level="DEBUG")
class VoicemailDetectionProcessor(FrameProcessor):
    """Frame processor that labels a call as voicemail or human from transcriptions.

    Downstream ``TranscriptionFrame`` text is scanned for the phrases
    "voicemail" and "leave a message". A match marks the call flow state as
    voicemail and queues a system message on the pipeline task; any other
    transcription marks the state as human. Every frame is forwarded unchanged.
    """

    def __init__(self, session_manager, call_config_manager, task):
        """Store the collaborators used while classifying transcriptions.

        Args:
            session_manager: Object exposing ``call_flow_state``.
            call_config_manager: Object exposing ``create_system_message``.
            task: Pipeline task whose ``queue_frames`` receives the voicemail
                system message.
        """
        super().__init__()
        self.session_manager = session_manager
        self.call_config_manager = call_config_manager
        self.task = task

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        """Inspect downstream transcriptions, then pass the frame along.

        Args:
            frame: The frame travelling through the pipeline.
            direction: Direction in which the frame is travelling.
        """
        await super().process_frame(frame, direction)

        if direction == FrameDirection.DOWNSTREAM and isinstance(frame, TranscriptionFrame):
            await self._classify_transcription(frame)

        # Forward every frame, whether or not it was inspected.
        await self.push_frame(frame, direction)

    async def _classify_transcription(self, frame):
        """Update the call flow state from the text of one transcription frame."""
        logger.debug(f"Transcription: {frame.text}")

        # Once voicemail has been flagged there is nothing more to decide.
        if self.session_manager.call_flow_state.voicemail_detected:
            return

        heard = frame.text.lower()
        if "voicemail" not in heard and "leave a message" not in heard:
            logger.info("Human detected")
            self.session_manager.call_flow_state.set_human_detected()
            return

        logger.info("Voicemail detected")
        self.session_manager.call_flow_state.set_voicemail_detected()
        content = "Voicemail detected, leaving a message."
        message = self.call_config_manager.create_system_message(content)
        await self.task.queue_frames([LLMMessagesFrame([message])])
async def main(room_url: str, token: str, body: dict):
    """Build the voicemail-detection pipeline and run it until it finishes.

    Args:
        room_url: Room URL handed to ``DailyTransport``.
        token: Room token handed to ``DailyTransport``.
        body: Call configuration. When truthy it is passed to
            ``CallConfigManager.from_json_string`` (the command-line entry
            point supplies the raw ``--body`` string); when falsy a default
            ``CallConfigManager`` is used.
    """
    call_config_manager = CallConfigManager.from_json_string(body) if body else CallConfigManager()
    dialout_settings = call_config_manager.get_dialout_settings()
    test_mode = call_config_manager.is_test_mode()
    session_manager = SessionManager()

    # ------------ TRANSPORT SETUP ------------
    transport_params = DailyParams(
        api_url=os.environ.get("DAILY_API_URL", "https://api.daily.co/v1"),
        api_key=os.environ.get("HF_DAILY_API_KEY", ""),
        audio_in_enabled=True,
        audio_out_enabled=True,
        video_out_enabled=False,
        vad_analyzer=SileroVADAnalyzer(),
        transcription_enabled=False,  # Deepgram will handle transcription
    )
    transport = DailyTransport(room_url, token, "Voicemail Detection Bot", transport_params)

    tts = CartesiaTTSService(
        api_key=os.environ.get("HF_CARTESIA_API_KEY", ""),
        voice_id="b7d50908-b17c-442d-ad8d-810c63997ed9",
    )
    stt = DeepgramSTTService(
        api_key=os.environ.get("HF_DEEPGRAM_API_KEY", ""),
        model="nova-2",
    )
    llm = OpenAILLMService(api_key=os.environ.get("HF_OPENAI_API_KEY"))

    # ------------ LLM AND CONTEXT SETUP ------------
    system_instruction = """You are a friendly, helpful robot. If a human answers, greet them and ask how you can assist. If a voicemail is detected, leave a brief message: 'Hello, this is a test call from Pipecat. Please call us back at your convenience.'"""
    messages = [call_config_manager.create_system_message(system_instruction)]
    context = OpenAILLMContext(messages)
    context_aggregator = llm.create_context_aggregator(context)

    # ------------ PIPELINE SETUP ------------
    # The detector needs the task, but the task can only be built from a
    # pipeline that already contains the detector. Create the detector without
    # a task and attach it below; reading `task` here, before its assignment,
    # would raise UnboundLocalError.
    voicemail_detector = VoicemailDetectionProcessor(session_manager, call_config_manager, None)
    pipeline = Pipeline(
        [
            transport.input(),
            stt,
            voicemail_detector,
            context_aggregator.user(),
            llm,
            tts,
            transport.output(),
            context_aggregator.assistant(),
        ]
    )
    task = PipelineTask(pipeline, params=PipelineParams(allow_interruptions=True))
    voicemail_detector.task = task

    @transport.event_handler("on_first_participant_joined")
    async def on_first_participant_joined(transport, participant):
        # Queue the current context frame on the task when the first participant joins.
        await task.queue_frames([context_aggregator.user().get_context_frame()])

    @transport.event_handler("on_participant_left")
    async def on_participant_left(transport, participant, reason):
        logger.debug(f"Participant left: {participant}, reason: {reason}")
        await task.cancel()

    # ------------ DIALOUT ------------
    if not test_mode:
        await call_config_manager.start_dialout(transport, dialout_settings)

    # ------------ RUN PIPELINE ------------
    runner = PipelineRunner()
    await runner.run(task)
if __name__ == "__main__":
    # Command-line entry point: read the room URL, token and optional JSON
    # configuration, then run the bot until main() returns.
    parser = argparse.ArgumentParser(description="Pipecat Voicemail Detection Bot")
    parser.add_argument("-u", "--url", type=str, help="Room URL")
    parser.add_argument("-t", "--token", type=str, help="Room Token")
    parser.add_argument("-b", "--body", type=str, help="JSON configuration string")
    args = parser.parse_args()

    logger.info(f"Room URL: {args.url}")
    # The token is a credential, so record only whether one was supplied
    # rather than writing its value to the log.
    logger.info(f"Token provided: {bool(args.token)}")
    logger.info(f"Body provided: {bool(args.body)}")

    asyncio.run(main(args.url, args.token, args.body))