|
import argparse |
|
import json |
|
import os |
|
import shlex |
|
import subprocess |
|
from contextlib import asynccontextmanager |
|
from typing import Any, Dict |
|
|
|
import aiohttp |
|
from bot_constants import ( |
|
MAX_SESSION_TIME, |
|
REQUIRED_ENV_VARS, |
|
) |
|
from bot_registry import BotRegistry |
|
from bot_runner_helpers import ( |
|
determine_room_capabilities, |
|
ensure_prompt_config, |
|
process_dialin_request, |
|
) |
|
from fastapi import FastAPI, HTTPException, Request |
|
from fastapi.middleware.cors import CORSMiddleware |
|
from fastapi.responses import JSONResponse |
|
|
|
from pipecat.transports.services.helpers.daily_rest import ( |
|
DailyRESTHelper, |
|
DailyRoomParams, |
|
DailyRoomProperties, |
|
DailyRoomSipParams, |
|
) |
|
|
|
# Shared Daily REST helper instances keyed by name. The "rest" entry is
# populated by the application's lifespan handler at startup.
daily_helpers: Dict[str, Any] = {}

# Registry used by the /start handler to build the bot configuration,
# detect the bot type and look up the matching bot.
bot_registry = BotRegistry()
|
|
|
async def create_daily_room(room_url: str = None, config_body: Dict[str, Any] = None):
    """Create a new Daily room (or fetch an existing one) and get a token for it.

    Args:
        room_url: URL of an existing room to reuse. When falsy, a new room is
            created whose properties are derived from ``config_body``.
        config_body: Bot configuration handed to
            ``determine_room_capabilities`` to decide whether the new room
            needs dial-in (SIP) and/or dial-out enabled.

    Returns:
        A dict with the keys ``"room"`` (the room URL), ``"token"`` and
        ``"sip_endpoint"``.

    Raises:
        HTTPException: Status 500 when the existing room cannot be fetched,
            or when no room or no token could be obtained.
    """
    if not room_url:
        capabilities = determine_room_capabilities(config_body)

        # SIP parameters are only attached when dial-in is requested.
        sip_params = None
        if capabilities["enable_dialin"]:
            sip_params = DailyRoomSipParams(
                display_name="dialin-user", video=False, sip_mode="dial-in", num_endpoints=2
            )

        properties = DailyRoomProperties(sip=sip_params)
        if capabilities["enable_dialout"]:
            properties.enable_dialout = True

        capability_str = ", ".join(f"{k}={v}" for k, v in capabilities.items())
        print(f"Creating room with capabilities: {capability_str}")

        params = DailyRoomParams(properties=properties)
        print("Creating new room...")
        room = await daily_helpers["rest"].create_room(params=params)
    else:
        try:
            room = await daily_helpers["rest"].get_room_from_url(room_url)
        except Exception as exc:
            # Keep the original cause attached for debugging.
            raise HTTPException(status_code=500, detail=f"Room not found: {room_url}") from exc

    # Validate the room *before* touching its attributes; previously this
    # check ran only after room.url / room.config had been dereferenced.
    if not room:
        raise HTTPException(status_code=500, detail="Failed to get room or token")

    print(f"Daily room: {room.url} {room.config.sip_endpoint}")

    token = await daily_helpers["rest"].get_token(room.url, MAX_SESSION_TIME)
    if not token:
        raise HTTPException(status_code=500, detail="Failed to get room or token")

    return {"room": room.url, "token": token, "sip_endpoint": room.config.sip_endpoint}
|
|
|
async def start_bot(room_details: Dict[str, str], body: Dict[str, Any], example: str) -> bool:
    """Spawn the bot module as a subprocess for the given room.

    The process is started with ``python3 -m <example>`` from this file's
    directory and is not waited on.

    Args:
        room_details: Mapping providing the ``"room"`` URL and ``"token"``.
        body: Bot configuration; serialized to JSON and passed via ``-b``.
        example: Name of the Python module to run with ``-m``.

    Returns:
        True once the subprocess has been launched.

    Raises:
        HTTPException: Status 500 if the subprocess cannot be started.
    """
    room_url = room_details["room"]
    token = room_details["token"]

    body_json = json.dumps(body)
    print(f"++++ Body JSON: {body_json}")

    # Build the argument vector directly instead of formatting a command
    # string and re-splitting it: hand-escaping quotes breaks as soon as the
    # JSON contains a backslash or an embedded quote, and whitespace in any
    # value would shift the arguments.
    command_parts = [
        "python3",
        "-m",
        str(example),
        "-u",
        str(room_url),
        "-t",
        str(token),
        "-b",
        body_json,
    ]
    print(f"Starting bot. Example: {example}, Room: {room_url}")

    try:
        subprocess.Popen(command_parts, bufsize=1, cwd=os.path.dirname(os.path.abspath(__file__)))
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to start subprocess: {e}") from e
    return True
|
|
|
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Manage the shared HTTP session and Daily REST helper for the app.

    On startup an ``aiohttp.ClientSession`` is opened and a
    ``DailyRESTHelper`` using it is stored under ``daily_helpers["rest"]``.
    The API key is read from ``HF_DAILY_API_KEY`` and the API URL from
    ``DAILY_API_URL`` (default ``https://api.daily.co/v1``).

    Args:
        app: The FastAPI application this lifespan is attached to (unused).
    """
    # ``async with`` guarantees the session is closed even when helper
    # construction fails or an exception is raised into the generator at
    # the ``yield`` point; a bare close() after ``yield`` would be skipped.
    async with aiohttp.ClientSession() as aiohttp_session:
        daily_helpers["rest"] = DailyRESTHelper(
            daily_api_key=os.environ.get("HF_DAILY_API_KEY", ""),
            daily_api_url=os.environ.get("DAILY_API_URL", "https://api.daily.co/v1"),
            aiohttp_session=aiohttp_session,
        )
        yield
|
|
|
# The ASGI application; startup/shutdown resources are handled by ``lifespan``.
app = FastAPI(lifespan=lifespan)

# CORS is fully open: any origin, method and header is accepted.
# NOTE(review): wildcard origins combined with allow_credentials=True is very
# permissive -- confirm this is intended for the deployment.
_cors_options = {
    "allow_origins": ["*"],
    "allow_credentials": True,
    "allow_methods": ["*"],
    "allow_headers": ["*"],
}
app.add_middleware(CORSMiddleware, **_cors_options)
|
|
|
@app.post("/start")
async def handle_start_request(request: Request) -> JSONResponse:
    """Handle ``POST /start``: prepare a room and launch the matching bot.

    The JSON payload is interpreted as follows:

    * contains ``"test"``: respond with ``{"test": True}`` and do nothing else;
    * contains ``"From"``, ``"To"``, ``"callId"`` and ``"callDomain"``:
      treated as a dial-in request and passed to ``process_dialin_request``;
    * contains ``"config"``: the configuration is passed to
      ``bot_registry.setup_configuration``;
    * anything else is rejected.

    Args:
        request: Incoming request whose body is the JSON payload.

    Returns:
        A JSON response with ``status`` and ``bot_type``. ``room_url`` (and
        ``llm_provider`` when the body has an ``"llm"`` entry) is added when
        the bot reports test mode, and ``dialing_to`` is added when the first
        dial-out setting carries a phone number or SIP URI.

    Raises:
        HTTPException: Status 400 for invalid JSON, an unrecognised payload,
            a configuration matching no bot, or any other processing error;
            HTTP errors raised while creating the room or starting the bot
            are propagated with their own status code.
    """
    room_url = os.environ.get("HF_DAILY_SAMPLE_ROOM_URL", None)

    try:
        data = await request.json()

        if "test" in data:
            return JSONResponse({"test": True})

        if all(key in data for key in ["From", "To", "callId", "callDomain"]):
            body = await process_dialin_request(data)
        elif "config" in data:
            body = bot_registry.setup_configuration(data["config"])
        else:
            raise HTTPException(status_code=400, detail="Invalid request format")

        body = ensure_prompt_config(body)

        bot_type_name = bot_registry.detect_bot_type(body)
        if not bot_type_name:
            raise HTTPException(status_code=400, detail="Configuration doesn't match any supported scenario")

        room_details = await create_daily_room(room_url, body)
        await start_bot(room_details, body, bot_type_name)

        bot_type = bot_registry.get_bot(bot_type_name)
        response = {"status": "Bot started", "bot_type": bot_type_name}

        # NOTE(review): the "llm" check is assumed to be nested under the
        # test-mode branch -- confirm against the original layout.
        if bot_type.has_test_mode(body):
            response["room_url"] = room_details["room"]
            if "llm" in body:
                response["llm_provider"] = body["llm"]

        # Report the first dial-out target, preferring a phone number.
        if "dialout_settings" in body and len(body["dialout_settings"]) > 0:
            first_setting = body["dialout_settings"][0]
            if "phoneNumber" in first_setting:
                response["dialing_to"] = f"phone:{first_setting['phoneNumber']}"
            elif "sipUri" in first_setting:
                response["dialing_to"] = f"sip:{first_setting['sipUri']}"

        return JSONResponse(response)
    except HTTPException:
        # Deliberate HTTP errors (including 500s from room creation and bot
        # start-up) must keep their status and detail; without this clause
        # the generic handler below would re-wrap them as a 400.
        raise
    except json.JSONDecodeError:
        raise HTTPException(status_code=400, detail="Invalid JSON in request body")
    except Exception as e:
        raise HTTPException(status_code=400, detail=f"Request processing error: {str(e)}")
|
|
|
if __name__ == "__main__":
    # Fail fast when a required variable (looked up with an "HF_" prefix)
    # is missing from the environment.
    for env_var in REQUIRED_ENV_VARS:
        hf_env_var = f"HF_{env_var}"
        if hf_env_var not in os.environ:
            raise Exception(f"Missing environment variable: {hf_env_var}.")

    parser = argparse.ArgumentParser(description="Pipecat Bot Runner")
    parser.add_argument("--host", type=str, default=os.environ.get("HOST", "0.0.0.0"), help="Host address")
    # A string default taken from the environment is converted by argparse
    # using ``type=int``, exactly like a command-line value.
    parser.add_argument("--port", type=int, default=os.environ.get("PORT", 7860), help="Port number")
    parser.add_argument("--reload", action="store_true", default=True, help="Reload code on change")
    # ``--reload`` defaults to True, so on its own it could never be turned
    # off; ``--no-reload`` writes False to the same destination.
    parser.add_argument("--no-reload", dest="reload", action="store_false", help="Do not reload code on change")
    config = parser.parse_args()

    try:
        import uvicorn

        uvicorn.run("bot_runner:app", host=config.host, port=config.port, reload=config.reload)
    except KeyboardInterrupt:
        print("Pipecat runner shutting down...")