|
import asyncio |
|
import os |
|
import sys |
|
import time |
|
import logging |
|
from pipecat.frames import TextFrame, UserStartedSpeakingFrame, UserStoppedSpeakingFrame, TTSStartedFrame, TTSStoppedFrame |
|
from pipecat.pipeline.pipeline import Pipeline |
|
from pipecat.pipeline.runner import PipelineRunner |
|
from pipecat.pipeline.task import PipelineParams |
|
from pipecat.processors.frame_processor import FrameProcessor, FrameDirection |
|
from pipecat.services.elevenlabs.tts import ElevenLabsTTSService |
|
from pipecat.services.deepgram.stt import DeepgramSTTService |
|
from pipecat.transports.services.daily import DailyParams, DailyTransport |
|
from pipecat.audio.vad.silero import SileroVADAnalyzer |
|
from pipecat.services.openai.llm import OpenAILLMService |
|
from elevenlabs import ElevenLabs |
|
|
|
# Root logging: INFO and above, each line stamped with time, logger name and level.
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# Seconds without activity before a silence event fires (env-overridable).
SILENCE_TIMEOUT_SECONDS = float(os.getenv("SILENCE_TIMEOUT_SECONDS", 10))
# Silence-event count at which the call is ended (env-overridable).
MAX_SILENCE_PROMPTS = int(os.getenv("MAX_SILENCE_PROMPTS", 3))

# Prompt text pushed on a silence event while the call continues.
SILENCE_PROMPT_TEXT = "Are you still there?"
# Prompt text pushed just before the pipeline is asked to stop.
GOODBYE_PROMPT_TEXT = "It seems you're no longer there. I'm hanging up now. Goodbye."
|
|
|
class SilenceAndCallLogicProcessor(FrameProcessor):
    """Detect prolonged silence, prompt the caller, and eventually hang up.

    Every frame is forwarded unchanged. A background task wakes once per
    second; when ``SILENCE_TIMEOUT_SECONDS`` have passed without activity
    (and the bot is not speaking) it counts a silence event. Below
    ``MAX_SILENCE_PROMPTS`` events it pushes ``SILENCE_PROMPT_TEXT``
    downstream; on reaching it, it pushes ``GOODBYE_PROMPT_TEXT``, waits
    two seconds and calls ``pipeline.stop_when_done()``.

    Args:
        tts_service: Service whose ``stop()`` coroutine is awaited by ``stop()``.
        pipeline: Object exposing a ``stop_when_done()`` coroutine used to end
            the call. May be ``None`` at construction as long as it is
            assigned before the goodbye path runs.
        app: Owner exposing a mutable ``current_call_stats`` dict; its
            ``"silence_events"`` entry is reset here and incremented on every
            silence event.
    """

    def __init__(self, tts_service, pipeline, app):
        super().__init__()
        self.tts_service = tts_service
        self.pipeline = pipeline
        self.app = app
        # Monotonic clock: elapsed-time checks must not jump when the wall
        # clock is adjusted (NTP sync, manual changes).
        self.last_activity_ts = time.monotonic()
        self.silence_prompts_count = 0
        self._bot_is_speaking = False
        self._silence_check_task = None
        self.app.current_call_stats["silence_events"] = 0

    async def start(self):
        """Reset silence state and (re)start the background silence check."""
        self.last_activity_ts = time.monotonic()
        self.silence_prompts_count = 0
        self._bot_is_speaking = False
        if self._silence_check_task:
            self._silence_check_task.cancel()
        self._silence_check_task = asyncio.create_task(self._check_silence_loop())

    async def stop(self):
        """Cancel the background silence check, then stop the TTS service."""
        if self._silence_check_task:
            self._silence_check_task.cancel()
            try:
                await self._silence_check_task
            except asyncio.CancelledError:
                pass
        # NOTE(review): this stops a service the pipeline also owns; confirm
        # the TTS service's stop() accepts being called with no arguments.
        await self.tts_service.stop()

    def _reset_activity_timer(self):
        """Record user activity: restart the timer and clear the prompt count."""
        self.last_activity_ts = time.monotonic()
        self.silence_prompts_count = 0

    async def process_frame(self, frame, direction):
        """Update activity/speaking state from ``frame`` and forward it.

        User frames (``UserStartedSpeakingFrame`` / ``TextFrame``) reset the
        silence timer and prompt count. Downstream ``TTSStartedFrame`` /
        ``TTSStoppedFrame`` track whether the bot is speaking; the end of bot
        speech also restarts the timer.
        """
        # The pipecat base class does its own per-frame bookkeeping and must
        # run before any subclass logic.
        await super().process_frame(frame, direction)

        # Nothing else starts the watcher, so begin once frames flow. A task
        # that was stopped or finished is not restarted here.
        if self._silence_check_task is None:
            await self.start()

        # User speech/transcripts travel downstream from the transport/STT
        # toward this processor; accept either direction so activity is seen
        # regardless of where the processor sits in the pipeline.
        if isinstance(frame, (UserStartedSpeakingFrame, TextFrame)):
            self._reset_activity_timer()

        if isinstance(frame, TTSStartedFrame) and direction == FrameDirection.DOWNSTREAM:
            self._bot_is_speaking = True
        elif isinstance(frame, TTSStoppedFrame) and direction == FrameDirection.DOWNSTREAM:
            self._bot_is_speaking = False
            self.last_activity_ts = time.monotonic()

        await self.push_frame(frame, direction)

    async def _check_silence_loop(self):
        """Poll once per second and react when the silence timeout elapses.

        Errors are logged rather than dropped with the task, so a failure is
        visible instead of silently disabling silence detection.
        """
        try:
            while True:
                await asyncio.sleep(1)
                if self._bot_is_speaking:
                    continue
                if time.monotonic() - self.last_activity_ts <= SILENCE_TIMEOUT_SECONDS:
                    continue

                self.app.current_call_stats["silence_events"] += 1
                self.silence_prompts_count += 1

                if self.silence_prompts_count >= MAX_SILENCE_PROMPTS:
                    await self.push_frame(TextFrame(GOODBYE_PROMPT_TEXT), FrameDirection.DOWNSTREAM)
                    # Short grace period so the goodbye can be delivered.
                    await asyncio.sleep(2)
                    await self.pipeline.stop_when_done()
                    return

                await self.push_frame(TextFrame(SILENCE_PROMPT_TEXT), FrameDirection.DOWNSTREAM)
                # Restart the timer so the next prompt waits a full timeout.
                # Speaking state is left to the TTS started/stopped frames
                # instead of being toggled here, which could clobber them.
                self.last_activity_ts = time.monotonic()
        except asyncio.CancelledError:
            raise
        except Exception:
            logger.exception("Silence check loop failed; silence detection disabled")
|
|
|
class PhoneChatbotApp:
    """Phone bot that wires STT, LLM and TTS services over a Daily transport.

    ``run()`` validates configuration, creates the services and transport,
    and runs them. The transport's ``pipeline_params_hook`` builds the
    pipeline (including the silence processor). A per-call statistics dict
    is logged by ``_log_call_summary()`` when the run ends.
    """

    # strftime pattern used for the start/end times in the call summary.
    _TIMESTAMP_FORMAT = "%Y-%m-%d %H:%M:%S"

    def __init__(self):
        self.daily_transport = None
        self.pipeline = None
        self.stt_service = None
        self.tts_service = None
        self.llm_service = None
        self.silence_processor = None
        self.call_start_time = None
        self.current_call_stats = self._new_call_stats(None)

    @classmethod
    def _format_timestamp(cls, epoch_seconds):
        """Format an epoch timestamp as local time for the call summary."""
        return time.strftime(cls._TIMESTAMP_FORMAT, time.localtime(epoch_seconds))

    @classmethod
    def _new_call_stats(cls, start_epoch):
        """Return a fresh stats dict; ``start_epoch`` may be None (no call yet)."""
        return {
            "duration_seconds": 0,
            "silence_events": 0,
            "start_time": None if start_epoch is None else cls._format_timestamp(start_epoch),
            "end_time": None,
            "ended_by_silence": False,
        }

    def _reset_call_stats(self):
        """Mark now as the call start and reset ``current_call_stats``."""
        self.call_start_time = time.time()
        self.current_call_stats = self._new_call_stats(self.call_start_time)

    async def _log_call_summary(self):
        """Finalize duration/end time (if a call started) and log all stats."""
        if self.call_start_time is not None:
            call_end_time = time.time()
            self.current_call_stats["duration_seconds"] = round(call_end_time - self.call_start_time, 2)
            self.current_call_stats["end_time"] = self._format_timestamp(call_end_time)
            if (
                self.silence_processor is not None
                and self.silence_processor.silence_prompts_count >= MAX_SILENCE_PROMPTS
            ):
                self.current_call_stats["ended_by_silence"] = True
        logger.info("--- Post-Call Summary ---")
        for key, value in self.current_call_stats.items():
            logger.info(f" {key.replace('_', ' ').title()}: {value}")
        logger.info("-------------------------")

    def setup_pipeline_hook(self, pipeline_params: PipelineParams, room_url: str, token: str):
        """Build the call pipeline and store it on ``pipeline_params``.

        Resets the call stats, creates the silence processor, builds the
        pipeline STT -> LLM -> TTS -> silence processor, and sets
        ``pipeline_params.pipeline`` / ``pipeline_params.params``.
        ``room_url`` and ``token`` are accepted for the hook signature but
        not used here.

        Returns:
            The same ``pipeline_params`` object, updated in place.
        """
        self._reset_call_stats()
        # The processor needs the pipeline (to end the call) and the pipeline
        # needs the processor in its list, so create the processor first and
        # attach the pipeline once it exists.
        self.silence_processor = SilenceAndCallLogicProcessor(
            tts_service=self.tts_service,
            pipeline=None,
            app=self,
        )
        # Pass the processor to the constructor rather than appending to the
        # pipeline afterwards: pipecat's Pipeline links neighbouring
        # processors when it is constructed, so a late append is not wired in.
        self.pipeline = Pipeline([
            self.stt_service,
            self.llm_service,
            self.tts_service,
            self.silence_processor,
        ])
        self.silence_processor.pipeline = self.pipeline
        # NOTE(review): as the last processor, prompts it pushes downstream do
        # not pass through the TTS service, and this list has no transport
        # input/output processors; confirm the intended topology.
        pipeline_params.pipeline = self.pipeline
        pipeline_params.params = PipelineParams(allow_interruptions=True, enable_metrics=True)
        return pipeline_params

    def validate_voice_id(self, voice_id: str) -> bool:
        """Return True if ElevenLabs can fetch ``voice_id``, else log and return False.

        Any failure (unknown voice, auth or network error) counts as invalid.
        """
        try:
            client = ElevenLabs(api_key=os.environ.get("elevenlabs"))
            client.voices.get(voice_id=voice_id)
            return True
        except Exception as e:
            logger.error(f"Failed to validate ElevenLabs voice ID {voice_id}: {e}")
            return False

    async def run(self):
        """Validate configuration, create services and transport, and run the call.

        Exits the process with status 1 if a required environment variable is
        missing or the ElevenLabs voice ID cannot be validated. Always logs
        the call summary and attempts teardown when the runner returns.
        """
        required_keys = ["deepgram", "elevenlabs", "dailyco", "azure_openai"]
        missing_keys = [key for key in required_keys if not os.environ.get(key)]
        if missing_keys:
            logger.error(f"Missing environment variables: {', '.join(missing_keys)}")
            sys.exit(1)

        voice_id = os.environ.get("ELEVENLABS_VOICE_ID", "cgSgspJ2msm6clMCkdW9")
        if not self.validate_voice_id(voice_id):
            logger.error(f"Invalid ElevenLabs voice ID: {voice_id}")
            sys.exit(1)

        self.stt_service = DeepgramSTTService(
            api_key=os.environ.get("deepgram"),
            input_audio_format="linear16",
        )
        self.tts_service = ElevenLabsTTSService(
            api_key=os.environ.get("elevenlabs"),
            voice_id=voice_id,
        )
        # NOTE(review): "azure_openai" is required above but no credential is
        # passed here; confirm how the LLM service is meant to authenticate.
        self.llm_service = OpenAILLMService(
            preprompt="You are a friendly and helpful phone assistant."
        )
        self.daily_transport = DailyTransport(
            os.environ.get("DAILY_DOMAIN", "your-username.daily.co"),
            os.environ.get("dailyco"),
            None,
            None,
            "Pipecat Phone Demo",
            vad_analyzer=SileroVADAnalyzer(),
            daily_params=DailyParams(
                audio_in_enabled=True,
                audio_out_enabled=True,
                transcription_enabled=False,
            ),
        )
        self.daily_transport.pipeline_params_hook = self.setup_pipeline_hook

        runner = PipelineRunner()
        try:
            await runner.run(self.daily_transport)
        except KeyboardInterrupt:
            logger.info("Ctrl+C pressed, shutting down")
        except Exception as e:
            logger.exception(f"An error occurred: {e}")
        finally:
            await self._shutdown()

    async def _shutdown(self):
        """Log the call summary, then tear down; one failing step does not skip the next."""
        await self._log_call_summary()
        if self.pipeline is not None:
            try:
                await self.pipeline.stop_when_done()
            except Exception:
                logger.exception("Failed to stop pipeline")
        if self.silence_processor is not None:
            try:
                await self.silence_processor.stop()
            except Exception:
                logger.exception("Failed to stop silence processor")
|
|