File size: 5,777 Bytes
1366db9
 
 
 
 
 
 
 
 
 
 
73c72bb
1366db9
 
73c72bb
1366db9
73c72bb
 
1366db9
 
 
 
 
73c72bb
1366db9
 
 
73c72bb
 
1366db9
 
 
 
73c72bb
 
1366db9
73c72bb
 
 
1366db9
73c72bb
1366db9
73c72bb
 
 
 
 
 
 
 
 
 
 
 
 
1366db9
 
73c72bb
1366db9
 
 
 
 
73c72bb
 
 
 
 
 
 
 
 
1366db9
 
73c72bb
1366db9
73c72bb
 
1366db9
73c72bb
 
 
1366db9
73c72bb
1366db9
73c72bb
 
 
 
 
1366db9
73c72bb
 
1366db9
73c72bb
 
 
 
 
 
 
 
 
 
1366db9
73c72bb
1366db9
 
 
73c72bb
1366db9
 
 
73c72bb
 
1366db9
73c72bb
 
 
1366db9
73c72bb
1366db9
73c72bb
1366db9
 
 
 
 
 
 
 
 
 
73c72bb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
#
# Copyright (c) 2024–2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import argparse
import asyncio
import os
import sys
from typing import Optional

from loguru import logger
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import (
    AudioRawFrame,
    EndTaskFrame,
    Frame,
    LLMMessagesFrame,
    TranscriptionFrame,
)
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport

from call_connection_manager import CallConfigManager, SessionManager

# Drop loguru's pre-installed handler (id 0) and log everything from DEBUG up to stderr.
logger.remove(handler_id=0)
logger.add(sink=sys.stderr, level="DEBUG")

class VoicemailDetectionProcessor(FrameProcessor):
    """Decide from transcriptions whether a voicemail system or a human answered.

    Only downstream ``TranscriptionFrame``s are inspected. Until voicemail has
    been detected, each non-blank transcription is checked (case-insensitively)
    for any of ``VOICEMAIL_PHRASES``:

    * a match records voicemail on the session's call-flow state and sends a
      system message to the LLM;
    * no match records a human.

    Every frame, inspected or not, is forwarded unchanged in its original
    direction.
    """

    # Lower-case substrings that mark a transcription as a voicemail greeting.
    VOICEMAIL_PHRASES = ("voicemail", "leave a message")

    def __init__(self, session_manager, call_config_manager, task=None):
        """Store the collaborators used during classification.

        Args:
            session_manager: Object whose ``call_flow_state`` records the
                detection result (``voicemail_detected``,
                ``set_voicemail_detected()``, ``set_human_detected()``).
            call_config_manager: Builds the injected system message via
                ``create_system_message``.
            task: Pipeline task used to queue the system message. It is
                optional because the task is normally built from the pipeline
                that contains this processor, so it can be assigned to
                ``self.task`` afterwards. If it is still ``None`` when
                voicemail is detected, the message is pushed downstream from
                this processor instead.
        """
        super().__init__()
        self.session_manager = session_manager
        self.call_config_manager = call_config_manager
        self.task = task
        # Keeps "Human detected" from being logged for every single transcription.
        self._human_logged = False

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        """Classify downstream transcriptions, then forward ``frame`` unchanged."""
        await super().process_frame(frame, direction)
        if direction == FrameDirection.DOWNSTREAM and isinstance(frame, TranscriptionFrame):
            await self._handle_transcription(frame.text)
        await self.push_frame(frame, direction)

    async def _handle_transcription(self, text):
        """Update the call-flow state from the text of one transcription."""
        logger.debug(f"Transcription: {text}")
        call_flow_state = self.session_manager.call_flow_state
        if call_flow_state.voicemail_detected:
            # Voicemail was already detected; later transcriptions change nothing.
            return

        normalized = (text or "").strip().lower()
        if not normalized:
            # An empty transcription is evidence of neither a human nor voicemail.
            return

        if any(phrase in normalized for phrase in self.VOICEMAIL_PHRASES):
            logger.info("Voicemail detected")
            call_flow_state.set_voicemail_detected()
            content = "Voicemail detected, leaving a message."
            message = self.call_config_manager.create_system_message(content)
            await self._send_llm_messages([message])
        else:
            if not self._human_logged:
                logger.info("Human detected")
                self._human_logged = True
            call_flow_state.set_human_detected()

    async def _send_llm_messages(self, messages):
        """Deliver ``messages`` via the task, or downstream if no task is attached."""
        llm_frame = LLMMessagesFrame(messages)
        if self.task is not None:
            await self.task.queue_frames([llm_frame])
        else:
            await self.push_frame(llm_frame, FrameDirection.DOWNSTREAM)

async def main(room_url: str, token: str, body: Optional[str]):
    """Run the voicemail-detection bot in a Daily room until the pipeline ends.

    Args:
        room_url: Daily room URL to join.
        token: Daily meeting token for ``room_url``.
        body: JSON configuration string, parsed with
            ``CallConfigManager.from_json_string``. When it is empty or
            ``None``, a default ``CallConfigManager()`` is used.
    """
    call_config_manager = CallConfigManager.from_json_string(body) if body else CallConfigManager()
    dialout_settings = call_config_manager.get_dialout_settings()
    test_mode = call_config_manager.is_test_mode()
    session_manager = SessionManager()

    # ------------ TRANSPORT SETUP ------------
    transport_params = DailyParams(
        api_url=os.environ.get("DAILY_API_URL", "https://api.daily.co/v1"),
        api_key=os.environ.get("HF_DAILY_API_KEY", ""),
        audio_in_enabled=True,
        audio_out_enabled=True,
        video_out_enabled=False,
        vad_analyzer=SileroVADAnalyzer(),
        transcription_enabled=False,  # Deepgram will handle transcription
    )

    transport = DailyTransport(room_url, token, "Voicemail Detection Bot", transport_params)
    tts = CartesiaTTSService(
        api_key=os.environ.get("HF_CARTESIA_API_KEY", ""),
        voice_id="b7d50908-b17c-442d-ad8d-810c63997ed9",
    )
    stt = DeepgramSTTService(
        api_key=os.environ.get("HF_DEEPGRAM_API_KEY", ""),
        model="nova-2",
    )
    llm = OpenAILLMService(api_key=os.environ.get("HF_OPENAI_API_KEY"))

    # ------------ LLM AND CONTEXT SETUP ------------
    system_instruction = """You are a friendly, helpful robot. If a human answers, greet them and ask how you can assist. If a voicemail is detected, leave a brief message: 'Hello, this is a test call from Pipecat. Please call us back at your convenience.'"""
    messages = [call_config_manager.create_system_message(system_instruction)]
    context = OpenAILLMContext(messages)
    context_aggregator = llm.create_context_aggregator(context)

    # ------------ PIPELINE SETUP ------------
    # The detector needs the PipelineTask in order to queue frames, but the
    # task is built from the pipeline that contains the detector. Create the
    # detector without a task and attach the task once it exists. Passing
    # `task` here directly raised UnboundLocalError.
    voicemail_detector = VoicemailDetectionProcessor(session_manager, call_config_manager, None)

    pipeline = Pipeline([
        transport.input(),
        stt,
        voicemail_detector,
        context_aggregator.user(),
        llm,
        tts,
        transport.output(),
        context_aggregator.assistant(),
    ])

    task = PipelineTask(pipeline, params=PipelineParams(allow_interruptions=True))
    voicemail_detector.task = task

    @transport.event_handler("on_first_participant_joined")
    async def on_first_participant_joined(transport, participant):
        await task.queue_frames([context_aggregator.user().get_context_frame()])

    @transport.event_handler("on_participant_left")
    async def on_participant_left(transport, participant, reason):
        logger.debug(f"Participant left: {participant}, reason: {reason}")
        await task.cancel()

    # ------------ DIALOUT ------------
    # Dial out only after the bot has joined the room. The transport joins once
    # `runner.run` starts the pipeline, so dialing before that point is too early.
    @transport.event_handler("on_joined")
    async def on_joined(transport, data):
        if not test_mode:
            await call_config_manager.start_dialout(transport, dialout_settings)

    # ------------ RUN PIPELINE ------------
    runner = PipelineRunner()
    await runner.run(task)

if __name__ == "__main__":
    # Command-line entry point: parse the room/config arguments and run the bot.
    parser = argparse.ArgumentParser(description="Pipecat Voicemail Detection Bot")
    parser.add_argument("-u", "--url", type=str, help="Room URL")
    parser.add_argument("-t", "--token", type=str, help="Room Token")
    parser.add_argument("-b", "--body", type=str, help="JSON configuration string")
    args = parser.parse_args()
    logger.info(f"Room URL: {args.url}")
    # The meeting token is a credential; log only whether one was supplied.
    logger.info(f"Token provided: {bool(args.token)}")
    logger.info(f"Body provided: {bool(args.body)}")
    asyncio.run(main(args.url, args.token, args.body))