pipecat / bot_runner.py
Deadmon's picture
Update bot_runner.py
a916901 verified
import argparse
import json
import os
import shlex
import subprocess
from contextlib import asynccontextmanager
from typing import Any, Dict
import aiohttp
from bot_constants import (
MAX_SESSION_TIME,
REQUIRED_ENV_VARS,
)
from bot_registry import BotRegistry # Changed from bot_definitions to bot_registry
from bot_runner_helpers import (
determine_room_capabilities,
ensure_prompt_config,
process_dialin_request,
)
from fastapi import FastAPI, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from pipecat.transports.services.helpers.daily_rest import (
DailyRESTHelper,
DailyRoomParams,
DailyRoomProperties,
DailyRoomSipParams,
)
# Shared service handles. The app's lifespan handler stores the DailyRESTHelper
# under the "rest" key, and the room helpers read it from there.
daily_helpers: Dict[str, Any] = {}
# Registry used by the /start endpoint to build configurations, detect the bot
# type for a request, and look up the matching bot.
bot_registry = BotRegistry()
async def create_daily_room(room_url: str = None, config_body: Dict[str, Any] = None) -> Dict[str, Any]:
    """Create a Daily room (or look up an existing one) and issue a token for it.

    Args:
        room_url: URL of an existing room. When empty or None, a new room is
            created instead.
        config_body: Bot configuration passed to ``determine_room_capabilities``
            to decide whether a new room needs dial-in and/or dial-out. Only
            used when a new room is created.

    Returns:
        A dict with the keys ``"room"`` (room URL), ``"token"`` and
        ``"sip_endpoint"``.

    Raises:
        HTTPException: 500 if the existing room cannot be fetched, or if no
            room or token could be obtained.
    """
    rest = daily_helpers["rest"]

    if not room_url:
        capabilities = determine_room_capabilities(config_body)

        # SIP parameters are only attached when dial-in is requested.
        sip_params = None
        if capabilities["enable_dialin"]:
            sip_params = DailyRoomSipParams(
                display_name="dialin-user", video=False, sip_mode="dial-in", num_endpoints=2
            )

        properties = DailyRoomProperties(sip=sip_params)
        if capabilities["enable_dialout"]:
            properties.enable_dialout = True

        capability_str = ", ".join(f"{k}={v}" for k, v in capabilities.items())
        print(f"Creating room with capabilities: {capability_str}")

        params = DailyRoomParams(properties=properties)
        print("Creating new room...")
        room = await rest.create_room(params=params)
    else:
        try:
            room = await rest.get_room_from_url(room_url)
        except Exception as exc:
            # Keep the original failure attached as the cause for debugging.
            raise HTTPException(status_code=500, detail=f"Room not found: {room_url}") from exc

    # Validate the room *before* touching its attributes; otherwise a missing
    # room surfaces as an AttributeError instead of a clean HTTP error.
    if not room:
        raise HTTPException(status_code=500, detail="Failed to get room or token")

    print(f"Daily room: {room.url} {room.config.sip_endpoint}")

    token = await rest.get_token(room.url, MAX_SESSION_TIME)
    if not token:
        raise HTTPException(status_code=500, detail="Failed to get room or token")

    return {"room": room.url, "token": token, "sip_endpoint": room.config.sip_endpoint}
async def start_bot(room_details: Dict[str, str], body: Dict[str, Any], example: str) -> bool:
    """Spawn a bot module as a subprocess for the given room, without waiting for it.

    Args:
        room_details: Mapping providing the ``"room"`` URL and the ``"token"``.
        body: Bot configuration; serialized to JSON and passed via ``-b``.
        example: Name of the module to run with ``python3 -m``.

    Returns:
        True once the subprocess has been started.

    Raises:
        HTTPException: 500 if the subprocess cannot be started.
    """
    room_url = room_details["room"]
    token = room_details["token"]

    body_json = json.dumps(body)
    print(f"++++ Body JSON: {body_json}")

    # Build the argument vector directly instead of formatting a command string
    # and re-splitting it: hand-escaping the JSON breaks as soon as a value
    # contains a quote or backslash, and whitespace in any argument would be
    # split into several arguments.
    command_parts = [
        "python3",
        "-m",
        str(example),
        "-u",
        str(room_url),
        "-t",
        str(token),
        "-b",
        body_json,
    ]
    print(f"Starting bot. Example: {example}, Room: {room_url}")

    try:
        # Run from this file's directory so `-m <example>` resolves the
        # sibling bot modules.
        subprocess.Popen(command_parts, bufsize=1, cwd=os.path.dirname(os.path.abspath(__file__)))
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to start subprocess: {e}") from e
    return True
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Provide the shared Daily REST helper for the lifetime of the application.

    On startup an aiohttp session is opened and a ``DailyRESTHelper`` built on
    it is stored under ``daily_helpers["rest"]``. The session is closed when
    the application shuts down.

    Args:
        app: The FastAPI application this lifespan is attached to (not used
            in the body).
    """
    # `async with` closes the session even if helper construction fails or an
    # exception is thrown into the generator at the yield point.
    async with aiohttp.ClientSession() as aiohttp_session:
        daily_helpers["rest"] = DailyRESTHelper(
            daily_api_key=os.environ.get("HF_DAILY_API_KEY", ""),
            daily_api_url=os.environ.get("DAILY_API_URL", "https://api.daily.co/v1"),
            aiohttp_session=aiohttp_session,
        )
        yield
# ASGI application. `lifespan` creates the Daily REST helper on startup and
# closes its HTTP session on shutdown.
app = FastAPI(lifespan=lifespan)
# Accept cross-origin requests from any origin, with any method and header.
# NOTE(review): wildcard origins together with allow_credentials=True is very
# permissive — confirm this is intended outside of development.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
@app.post("/start")
async def handle_start_request(request: Request) -> JSONResponse:
    """Handle ``POST /start``: build a bot configuration, get a room, start the bot.

    The JSON payload is interpreted as one of:

    * a webhook test (contains ``"test"``), answered immediately;
    * a dial-in webhook (contains ``From``, ``To``, ``callId`` and
      ``callDomain``), handled by ``process_dialin_request``;
    * a configuration request (contains ``"config"``), handled by the bot
      registry.

    Args:
        request: Incoming request whose body is the JSON payload.

    Returns:
        A JSONResponse with ``status`` and ``bot_type``, plus ``room_url`` /
        ``llm_provider`` for test-mode bots and ``dialing_to`` when dial-out
        settings are present.

    Raises:
        HTTPException: 400 for invalid JSON, unsupported payloads, and
            unexpected processing errors. HTTPExceptions raised by the room and
            bot helpers are propagated with their own status code.
    """
    # Optional pre-existing room; when unset a new room is created per request.
    room_url = os.environ.get("HF_DAILY_SAMPLE_ROOM_URL", None)

    try:
        data = await request.json()

        # Every branch below does key lookups, so require a JSON object.
        if not isinstance(data, dict):
            raise HTTPException(status_code=400, detail="Invalid request format")

        if "test" in data:
            return JSONResponse({"test": True})

        if all(key in data for key in ["From", "To", "callId", "callDomain"]):
            body = await process_dialin_request(data)
        elif "config" in data:
            body = bot_registry.setup_configuration(data["config"])
        else:
            raise HTTPException(status_code=400, detail="Invalid request format")

        body = ensure_prompt_config(body)

        bot_type_name = bot_registry.detect_bot_type(body)
        if not bot_type_name:
            raise HTTPException(
                status_code=400, detail="Configuration doesn't match any supported scenario"
            )

        room_details = await create_daily_room(room_url, body)
        await start_bot(room_details, body, bot_type_name)

        bot_type = bot_registry.get_bot(bot_type_name)
        response = {"status": "Bot started", "bot_type": bot_type_name}

        if bot_type.has_test_mode(body):
            response["room_url"] = room_details["room"]
            # NOTE(review): nesting of the "llm" check under the test-mode
            # branch is assumed — confirm against the intended behavior.
            if "llm" in body:
                response["llm_provider"] = body["llm"]

        # Report the first dial-out target, if any.
        dialout_settings = body.get("dialout_settings")
        if dialout_settings:
            first_setting = dialout_settings[0]
            if "phoneNumber" in first_setting:
                response["dialing_to"] = f"phone:{first_setting['phoneNumber']}"
            elif "sipUri" in first_setting:
                response["dialing_to"] = f"sip:{first_setting['sipUri']}"

        return JSONResponse(response)

    except json.JSONDecodeError as e:
        raise HTTPException(status_code=400, detail="Invalid JSON in request body") from e
    except HTTPException:
        # Deliberate HTTP errors (including 500s from the room/bot helpers)
        # must keep their status code and detail instead of being re-wrapped
        # as a generic 400 by the handler below.
        raise
    except Exception as e:
        raise HTTPException(status_code=400, detail=f"Request processing error: {str(e)}") from e
if __name__ == "__main__":
for env_var in REQUIRED_ENV_VARS:
hf_env_var = f"HF_{env_var}"
if hf_env_var not in os.environ:
raise Exception(f"Missing environment variable: {hf_env_var}.")
parser = argparse.ArgumentParser(description="Pipecat Bot Runner")
parser.add_argument("--host", type=str, default=os.environ.get("HOST", "0.0.0.0"), help="Host address")
parser.add_argument("--port", type=int, default=os.environ.get("PORT", 7860), help="Port number")
parser.add_argument("--reload", action="store_true", default=True, help="Reload code on change")
config = parser.parse_args()
try:
import uvicorn
uvicorn.run("bot_runner:app", host=config.host, port=config.port, reload=config.reload)
except KeyboardInterrupt:
print("Pipecat runner shutting down...")