File size: 1,826 Bytes
8d7f55c |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 |
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import re
from pipecat.frames.frames import EndFrame, Frame, InterimTranscriptionFrame, TextFrame
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
class SentenceAggregator(FrameProcessor):
    """Aggregate incoming text frames into complete sentences.

    Text from ``TextFrame``s is buffered until a frame containing
    sentence-ending punctuation (``.``, ``?`` or ``!``) arrives. At that point
    the buffered text plus everything up to and including the last such
    punctuation mark is pushed as a single ``TextFrame``; whatever follows the
    punctuation becomes the start of the next buffer.

    Frame input/output:

        TextFrame("Hello,")  -> nothing pushed (text is buffered)
        TextFrame(" world.") -> TextFrame("Hello, world.")

    ``InterimTranscriptionFrame``s are dropped. An ``EndFrame`` flushes any
    buffered text as a final ``TextFrame`` before the ``EndFrame`` itself is
    pushed. All other frames are pushed unchanged in the direction they were
    received.
    """

    # Group 1: everything up to and including the LAST sentence terminator.
    # Group 2: the trailing remainder that still has to be completed.
    # DOTALL lets "." match newlines, so multi-line text is never silently
    # dropped from either group.
    _SENTENCE_END = re.compile(r"(.*[?.!])(.*)", re.DOTALL)

    def __init__(self):
        """Create an aggregator with an empty sentence buffer."""
        super().__init__()
        # Text received so far that does not yet end a sentence.
        self._aggregation = ""

    async def process_frame(self, frame: Frame, direction: FrameDirection) -> None:
        """Process one frame, pushing complete sentences as they form.

        Args:
            frame: The incoming frame.
            direction: The direction the frame is travelling in; used when
                forwarding frames this processor does not handle itself.
        """
        # Give the base class the chance to do its own handling first.
        await super().process_frame(frame, direction)

        # We ignore interim transcriptions at this point.
        if isinstance(frame, InterimTranscriptionFrame):
            return

        if isinstance(frame, TextFrame):
            match = self._SENTENCE_END.search(frame.text)
            if match:
                # Emit the buffered text completed by this frame and keep the
                # leftover as the beginning of the next sentence.
                await self.push_frame(TextFrame(self._aggregation + match.group(1)))
                self._aggregation = match.group(2)
            else:
                self._aggregation += frame.text
        elif isinstance(frame, EndFrame):
            if self._aggregation:
                # Flush the unfinished sentence so no text is lost at the end
                # of the stream, then clear the buffer so it cannot be
                # emitted a second time.
                await self.push_frame(TextFrame(self._aggregation))
                self._aggregation = ""
            # As in the original code, the EndFrame is pushed without an
            # explicit direction (i.e. with push_frame's default).
            await self.push_frame(frame)
        else:
            await self.push_frame(frame, direction)
|