import asyncio
import itertools
from typing import List

from loguru import logger
from PIL import Image

from pipecat.frames.frames import (
    AudioRawFrame,
    CancelFrame,
    EndFrame,
    Frame,
    ImageRawFrame,
    MetricsFrame,
    SpriteFrame,
    StartFrame,
    StartInterruptionFrame,
    StopInterruptionFrame,
    SystemFrame,
    TransportMessageFrame)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.transports.base_transport import TransportParams


class BaseOutputTransport(FrameProcessor):
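    """Base class for output transports.

    It receives frames from the pipeline and writes audio, images and
    transport messages out using the transport-specific methods implemented
    by its subclasses.
    """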

    def __init__(self, params: TransportParams, **kwargs):
        super().__init__(**kwargs)

        self._params = params

        # Images cycled by the camera task when camera output is not live.
        self._camera_images = None

        # Output audio is written in 20ms chunks: 10ms of 16-bit samples
        # (2 bytes per sample) times the number of channels, times 2.
        audio_bytes_10ms = int(self._params.audio_out_sample_rate / 100) * \
            self._params.audio_out_channels * 2
        self._audio_chunk_size = audio_bytes_10ms * 2

        # Set once the transport has fully stopped (see stop()).
        self._stopped_event = asyncio.Event()

        # Queue and task that serialize frames going out of the transport.
        self._create_sink_task()

        # Queue and task used to push frames further downstream.
        self._create_push_task()
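
    # start() is called when a StartFrame reaches the transport. If camera
    # output is enabled it starts the task that writes camera images at the
    # configured framerate; stop() tears that task down and signals that the
    # transport has stopped.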
    async def start(self, frame: StartFrame):
        if self._params.camera_out_enabled:
            self._camera_out_queue = asyncio.Queue()
            self._camera_out_task = self.get_event_loop().create_task(self._camera_out_task_handler())

    async def stop(self):
        if self._params.camera_out_enabled:
            self._camera_out_task.cancel()
            await self._camera_out_task

        self._stopped_event.set()
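
    # Transport-specific subclasses are expected to override the following
    # methods; the base implementations are no-ops.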
    async def send_message(self, frame: TransportMessageFrame):
        pass

    async def send_metrics(self, frame: MetricsFrame):
        pass

    async def write_frame_to_camera(self, frame: ImageRawFrame):
        pass

    async def write_raw_audio_frames(self, frames: bytes):
        pass

    async def cleanup(self):
        if self._sink_task:
            self._sink_task.cancel()
            await self._sink_task

        self._push_frame_task.cancel()
        await self._push_frame_task
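
    # Frame processing: system frames (start, cancel, interruptions, metrics)
    # are handled right away and pushed through, audio is re-chunked, and any
    # other frame is queued for the sink task.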
    async def process_frame(self, frame: Frame, direction: FrameDirection):
        await super().process_frame(frame, direction)

        if isinstance(frame, StartFrame):
            await self.start(frame)
            await self.push_frame(frame, direction)
        elif isinstance(frame, CancelFrame):
            await self.stop()
            await self.push_frame(frame, direction)
        elif isinstance(frame, (StartInterruptionFrame, StopInterruptionFrame)):
            await self._handle_interruptions(frame)
            await self.push_frame(frame, direction)
        elif isinstance(frame, MetricsFrame):
            await self.send_metrics(frame)
            await self.push_frame(frame, direction)
        elif isinstance(frame, SystemFrame):
            await self.push_frame(frame, direction)
        elif isinstance(frame, AudioRawFrame):
            await self._handle_audio(frame)
        else:
            await self._sink_queue.put(frame)

        # Don't return until the transport has actually stopped when the
        # pipeline is cancelled or has ended.
        if isinstance(frame, (CancelFrame, EndFrame)):
            await self._stopped_event.wait()
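
    # On an interruption the sink and push tasks are cancelled and recreated,
    # dropping any output frames that were still queued.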
    async def _handle_interruptions(self, frame: Frame):
        if not self.interruptions_allowed:
            return

        if isinstance(frame, StartInterruptionFrame):
            # Cancel and recreate the sink task.
            self._sink_task.cancel()
            await self._sink_task
            self._create_sink_task()
            # Cancel and recreate the push frame task.
            self._push_frame_task.cancel()
            await self._push_frame_task
            self._create_push_task()

    async def _handle_audio(self, frame: AudioRawFrame):
        # Split incoming audio into fixed-size chunks before queueing it for
        # the sink task.
        audio = frame.audio
        for i in range(0, len(audio), self._audio_chunk_size):
            chunk = AudioRawFrame(audio[i:i + self._audio_chunk_size],
                                  sample_rate=frame.sample_rate,
                                  num_channels=frame.num_channels)
            await self._sink_queue.put(chunk)

    def _create_sink_task(self):
        loop = self.get_event_loop()
        self._sink_queue = asyncio.Queue()
        self._sink_task = loop.create_task(self._sink_task_handler())
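
    # The sink task drains the sink queue: audio is buffered and written out
    # in fixed-size chunks, images update the camera output, transport
    # messages are sent, and any other frame is forwarded downstream.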
    async def _sink_task_handler(self):
        # Audio is accumulated here until there is at least one full chunk to
        # write out.
        buffer = bytearray()
        while True:
            try:
                frame = await self._sink_queue.get()
                if isinstance(frame, AudioRawFrame) and self._params.audio_out_enabled:
                    buffer.extend(frame.audio)
                    buffer = await self._maybe_send_audio(buffer)
                elif isinstance(frame, ImageRawFrame) and self._params.camera_out_enabled:
                    await self._set_camera_image(frame)
                elif isinstance(frame, SpriteFrame) and self._params.camera_out_enabled:
                    await self._set_camera_images(frame.images)
                elif isinstance(frame, TransportMessageFrame):
                    await self.send_message(frame)
                else:
                    await self._internal_push_frame(frame)

                if isinstance(frame, EndFrame):
                    await self.stop()

                self._sink_queue.task_done()
            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.exception(f"{self} error processing sink queue: {e}")

    def _create_push_task(self):
        loop = self.get_event_loop()
        self._push_queue = asyncio.Queue()
        self._push_frame_task = loop.create_task(self._push_frame_task_handler())
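
    # Frames leaving the transport are pushed downstream from this dedicated
    # task so pushing can be cancelled on interruptions together with the
    # sink task.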
    async def _internal_push_frame(
            self,
            frame: Frame | None,
            direction: FrameDirection | None = FrameDirection.DOWNSTREAM):
        await self._push_queue.put((frame, direction))

    async def _push_frame_task_handler(self):
        while True:
            try:
                (frame, direction) = await self._push_queue.get()
                await self.push_frame(frame, direction)
            except asyncio.CancelledError:
                break
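
    #
    # Camera out
    #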

    async def send_image(self, frame: ImageRawFrame | SpriteFrame):
        await self.process_frame(frame, FrameDirection.DOWNSTREAM)

    async def _draw_image(self, frame: ImageRawFrame):
        desired_size = (self._params.camera_out_width, self._params.camera_out_height)

        if frame.size != desired_size:
            image = Image.frombytes(frame.format, frame.size, frame.image)
            resized_image = image.resize(desired_size)
            logger.warning(f"{frame} does not have the expected size {desired_size}, resizing")
            # PIL's Image.format is None for in-memory images, so keep the
            # incoming frame's format (it is used as the PIL mode above).
            frame = ImageRawFrame(resized_image.tobytes(), resized_image.size, frame.format)

        await self.write_frame_to_camera(frame)

    async def _set_camera_image(self, image: ImageRawFrame):
        if self._params.camera_out_is_live:
            await self._camera_out_queue.put(image)
        else:
            self._camera_images = itertools.cycle([image])

    async def _set_camera_images(self, images: List[ImageRawFrame]):
        self._camera_images = itertools.cycle(images)
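
    # The camera task either writes images as they arrive (live mode) or keeps
    # re-drawing the current image/sprite cycle at the configured framerate.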
    async def _camera_out_task_handler(self):
        while True:
            try:
                if self._params.camera_out_is_live:
                    image = await self._camera_out_queue.get()
                    await self._draw_image(image)
                    self._camera_out_queue.task_done()
                elif self._camera_images:
                    image = next(self._camera_images)
                    await self._draw_image(image)
                    await asyncio.sleep(1.0 / self._params.camera_out_framerate)
                else:
                    await asyncio.sleep(1.0 / self._params.camera_out_framerate)
            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.exception(f"{self} error writing to camera: {e}")

    async def send_audio(self, frame: AudioRawFrame):
        await self.process_frame(frame, FrameDirection.DOWNSTREAM)

    async def _maybe_send_audio(self, buffer: bytearray) -> bytearray:
        # Write one chunk if enough audio has accumulated and return the
        # remainder for further buffering.
        if len(buffer) >= self._audio_chunk_size:
            await self.write_raw_audio_frames(bytes(buffer[:self._audio_chunk_size]))
            buffer = buffer[self._audio_chunk_size:]
        return buffer