# # Copyright (c) 2024, Daily # # SPDX-License-Identifier: BSD 2-Clause License # import re from pipecat.frames.frames import EndFrame, Frame, InterimTranscriptionFrame, TextFrame from pipecat.processors.frame_processor import FrameDirection, FrameProcessor class SentenceAggregator(FrameProcessor): """This frame processor aggregates text frames into complete sentences. Frame input/output: TextFrame("Hello,") -> None TextFrame(" world.") -> TextFrame("Hello world.") Doctest: >>> async def print_frames(aggregator, frame): ... async for frame in aggregator.process_frame(frame): ... print(frame.text) >>> aggregator = SentenceAggregator() >>> asyncio.run(print_frames(aggregator, TextFrame("Hello,"))) >>> asyncio.run(print_frames(aggregator, TextFrame(" world."))) Hello, world. """ def __init__(self): super().__init__() self._aggregation = "" async def process_frame(self, frame: Frame, direction: FrameDirection): await super().process_frame(frame, direction) # We ignore interim description at this point. if isinstance(frame, InterimTranscriptionFrame): return if isinstance(frame, TextFrame): m = re.search("(.*[?.!])(.*)", frame.text) if m: await self.push_frame(TextFrame(self._aggregation + m.group(1))) self._aggregation = m.group(2) else: self._aggregation += frame.text elif isinstance(frame, EndFrame): if self._aggregation: await self.push_frame(TextFrame(self._aggregation)) await self.push_frame(frame) else: await self.push_frame(frame, direction)