Spaces:
Runtime error
Runtime error
File size: 4,352 Bytes
b11e88c 666db96 b11e88c 40cff94 b11e88c 40cff94 2106106 40cff94 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 |
import subprocess
import sys

# Install the pinned fastrtc build at start-up, before fastrtc is imported
# further down. Invoking pip through the running interpreter guarantees the
# package lands in the same environment this script imports from; a bare
# "pip" on PATH may belong to a different Python installation.
# The return code is intentionally not checked (best effort, as before).
subprocess.run([sys.executable, "-m", "pip", "install", "fastrtc==0.0.3.post7"])
import asyncio
import base64
import os
import gradio as gr
from gradio.utils import get_space
import numpy as np
from dotenv import load_dotenv
from fastrtc import (
AdditionalOutputs,
AsyncStreamHandler,
Stream,
get_twilio_turn_credentials,
WebRTCError,
audio_to_float32,
)
from fastapi import FastAPI
from phonic.client import PhonicSTSClient, get_voices
# Fixed settings for the Phonic speech-to-speech websocket endpoint.
SAMPLE_RATE = 44_100
STS_URI = "wss://api.phonic.co/v1/sts/ws"

# Load environment variables via dotenv before reading the API key; a missing
# PHONIC_API_KEY raises KeyError here, at import time.
load_dotenv()
API_KEY = os.environ["PHONIC_API_KEY"]

# Voice catalogue fetched once at import; the ids populate the voice dropdown.
voices = get_voices(API_KEY)
voice_ids = [entry["id"] for entry in voices]
class PhonicHandler(AsyncStreamHandler):
    """Stream handler that relays audio between FastRTC and a Phonic STS session.

    ``start_up`` opens the Phonic connection and pushes everything the service
    sends onto ``output_queue``; ``emit`` pops from that queue; ``receive``
    forwards incoming frames to the Phonic client once it exists.
    """

    def __init__(self):
        super().__init__(input_sample_rate=SAMPLE_RATE, output_sample_rate=SAMPLE_RATE)
        # Holds (sample_rate, int16 ndarray) tuples and AdditionalOutputs
        # objects produced by start_up() and consumed by emit().
        self.output_queue = asyncio.Queue()
        # Assigned in start_up() once the Phonic connection is open.
        self.client = None

    def copy(self) -> AsyncStreamHandler:
        """Return a new, unconnected handler instance."""
        return PhonicHandler()

    async def start_up(self):
        """Open the Phonic session and pump its messages into ``output_queue``.

        Raises:
            WebRTCError: on any exception raised while connecting or while
                consuming the message stream (the original exception is
                attached as ``__cause__``).
        """
        await self.wait_for_args()
        # NOTE(review): index 1 is presumably the voice dropdown value (the
        # only additional input); confirm against the Stream's input order.
        voice_id = self.latest_args[1]
        try:
            async with PhonicSTSClient(STS_URI, API_KEY) as client:
                self.client = client
                sts_stream = client.sts(  # type: ignore
                    input_format="pcm_44100",
                    output_format="pcm_44100",
                    system_prompt="You are a helpful voice assistant. Respond conversationally.",
                    # welcome_message="Hello! I'm your voice assistant. How can I help you today?",
                    voice_id=voice_id,
                )
                async for message in sts_stream:
                    message_type = message.get("type")
                    if message_type == "audio_chunk":
                        # Audio arrives base64-encoded; decode and expose it
                        # as int16 samples at the configured sample rate.
                        audio_b64 = message["audio"]
                        audio_bytes = base64.b64decode(audio_b64)
                        await self.output_queue.put(
                            (SAMPLE_RATE, np.frombuffer(audio_bytes, dtype=np.int16))
                        )
                        # An audio chunk may also carry the assistant's text.
                        if text := message.get("text"):
                            msg = {"role": "assistant", "content": text}
                            await self.output_queue.put(AdditionalOutputs(msg))
                    elif message_type == "input_text":
                        # Transcript of what the user said.
                        msg = {"role": "user", "content": message["text"]}
                        await self.output_queue.put(AdditionalOutputs(msg))
        except Exception as e:
            # The try block spans the whole session, so this also reports
            # failures that occur mid-stream, not only connection errors.
            raise WebRTCError(f"Error starting up: {e}") from e

    async def emit(self):
        """Wait for and return the next queued audio frame or additional output.

        Raises:
            WebRTCError: if reading from the queue raises.
        """
        try:
            return await self.output_queue.get()
        except Exception as e:
            raise WebRTCError(f"Error emitting: {e}") from e

    async def receive(self, frame: tuple[int, np.ndarray]) -> None:
        """Forward one incoming audio frame to Phonic.

        Frames that arrive before ``start_up`` has set ``self.client`` are
        dropped.

        Raises:
            WebRTCError: if converting or sending the frame raises.
        """
        try:
            if not self.client:
                return
            audio_float32 = audio_to_float32(frame)
            await self.client.send_audio(audio_float32)  # type: ignore
        except Exception as e:
            raise WebRTCError(f"Error sending audio: {e}") from e

    async def shutdown(self):
        """Close the Phonic websocket (if one was opened) and run base cleanup."""
        if self.client:
            # Reaches into the client's private websocket attribute to close it.
            await self.client._websocket.close()
        # NOTE(review): the base-class result is returned without awaiting;
        # confirm the base shutdown() is a plain (non-async) method.
        return super().shutdown()
def add_to_chatbot(state, chatbot, message):
    """Record ``message`` in the conversation state.

    ``message`` is appended to ``state`` in place. The return value is
    ``(state, gr.skip())``; ``chatbot`` is accepted but not read.
    """
    state.append(message)
    chatbot_update = gr.skip()
    return state, chatbot_update
# Conversation history (list of {"role", "content"} dicts) and its display.
state = gr.State(value=[])
chatbot = gr.Chatbot(type="messages", value=[])

stream = Stream(
    handler=PhonicHandler(),
    mode="send-receive",
    modality="audio",
    additional_inputs=[
        gr.Dropdown(
            choices=voice_ids,
            # NOTE(review): assumes "katherine" is among voice_ids; confirm.
            value="katherine",
            label="Voice",
            info="Select a voice from the dropdown",
        )
    ],
    additional_outputs=[state, chatbot],
    additional_outputs_handler=add_to_chatbot,
    # Title emoji restored; the literal had been corrupted into mojibake.
    ui_args={"title": "Phonic Chat (Powered by FastRTC ⚡️)"},
    # TURN credentials and usage limits are only applied when get_space()
    # is truthy; otherwise each of these settings is None.
    rtc_configuration=get_twilio_turn_credentials() if get_space() else None,
    concurrency_limit=5 if get_space() else None,
    time_limit=90 if get_space() else None,
)

with stream.ui:
    # Whenever the state changes, copy its value into the chatbot.
    state.change(lambda s: s, inputs=state, outputs=chatbot)

# Mount the stream on a FastAPI app.
app = FastAPI()
stream.mount(app)
if __name__ == "__main__":
    # MODE=PHONE starts the phone interface; MODE=UI, any other value, or an
    # unset variable launches the Gradio UI. Both use port 7860.
    mode = os.getenv("MODE")
    if mode == "PHONE":
        stream.fastphone(host="0.0.0.0", port=7860)
    else:
        stream.ui.launch(server_port=7860)
|