"""HTTP runner that prepares a Daily room and launches a bot subprocess.

Exposes a single ``POST /start`` endpoint. Each request is turned into a bot
configuration, a Daily room is created (or an existing one is looked up), and
the matching bot module is started with ``python3 -m <module>``.
"""

import argparse
import json
import os
import shlex
import subprocess
from contextlib import asynccontextmanager
from typing import Any, Dict, Optional

import aiohttp
from bot_constants import (
    MAX_SESSION_TIME,
    REQUIRED_ENV_VARS,
)
from bot_registry import BotRegistry  # Changed from bot_definitions to bot_registry
from bot_runner_helpers import (
    determine_room_capabilities,
    ensure_prompt_config,
    process_dialin_request,
)
from fastapi import FastAPI, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from pipecat.transports.services.helpers.daily_rest import (
    DailyRESTHelper,
    DailyRoomParams,
    DailyRoomProperties,
    DailyRoomSipParams,
)

# Holds the shared DailyRESTHelper under the "rest" key; populated in lifespan().
daily_helpers = {}
bot_registry = BotRegistry()  # Instantiate BotRegistry


async def create_daily_room(
    room_url: Optional[str] = None, config_body: Optional[Dict[str, Any]] = None
):
    """Create a Daily room (or look up an existing one) and fetch a token.

    Args:
        room_url: URL of an existing room. When falsy, a new room is created
            with the capabilities derived from ``config_body``.
        config_body: Bot configuration passed to
            ``determine_room_capabilities`` when a new room is created.

    Returns:
        A dict with the keys ``room`` (room URL), ``token`` and
        ``sip_endpoint``.

    Raises:
        HTTPException: 500 if the given room cannot be looked up, or if no
            room or token could be obtained.
    """
    rest = daily_helpers["rest"]

    if not room_url:
        capabilities = determine_room_capabilities(config_body)

        # SIP parameters are only attached when dial-in is requested.
        sip_params = None
        if capabilities["enable_dialin"]:
            sip_params = DailyRoomSipParams(
                display_name="dialin-user", video=False, sip_mode="dial-in", num_endpoints=2
            )

        properties = DailyRoomProperties(sip=sip_params)
        if capabilities["enable_dialout"]:
            properties.enable_dialout = True

        capability_str = ", ".join(f"{k}={v}" for k, v in capabilities.items())
        print(f"Creating room with capabilities: {capability_str}")

        params = DailyRoomParams(properties=properties)
        print("Creating new room...")
        room = await rest.create_room(params=params)
    else:
        try:
            room = await rest.get_room_from_url(room_url)
        except Exception as exc:
            raise HTTPException(
                status_code=500, detail=f"Room not found: {room_url}"
            ) from exc

    # Validate the room before touching its attributes; previously room.url
    # was read first, which made the "not room" guard unreachable.
    if not room:
        raise HTTPException(status_code=500, detail="Failed to get room or token")

    print(f"Daily room: {room.url} {room.config.sip_endpoint}")

    token = await rest.get_token(room.url, MAX_SESSION_TIME)
    if not token:
        raise HTTPException(status_code=500, detail="Failed to get room or token")

    return {"room": room.url, "token": token, "sip_endpoint": room.config.sip_endpoint}


async def start_bot(room_details: Dict[str, str], body: Dict[str, Any], example: str) -> bool:
    """Launch the bot module as a detached subprocess.

    Args:
        room_details: Dict containing at least the ``room`` URL and ``token``.
        body: Bot configuration; serialized to JSON and passed via ``-b``.
        example: Module name run with ``python3 -m``.

    Returns:
        True once the subprocess has been spawned.

    Raises:
        HTTPException: 500 if the subprocess cannot be started.
    """
    room_url = room_details["room"]
    token = room_details["token"]

    body_json = json.dumps(body)
    print(f"++++ Body JSON: {body_json}")

    # Pass an argument list straight to Popen. Building a quoted command
    # string and re-splitting it corrupts any JSON that contains quotes or
    # backslashes inside string values.
    command_parts = ["python3", "-m", example, "-u", room_url, "-t", token, "-b", body_json]
    print(f"Starting bot. Example: {example}, Room: {room_url}")

    try:
        subprocess.Popen(
            command_parts, bufsize=1, cwd=os.path.dirname(os.path.abspath(__file__))
        )
    except Exception as e:
        raise HTTPException(
            status_code=500, detail=f"Failed to start subprocess: {e}"
        ) from e
    return True


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Create the shared Daily REST helper on startup and close it on shutdown."""
    aiohttp_session = aiohttp.ClientSession()
    daily_helpers["rest"] = DailyRESTHelper(
        daily_api_key=os.environ.get("HF_DAILY_API_KEY", ""),
        daily_api_url=os.environ.get("DAILY_API_URL", "https://api.daily.co/v1"),
        aiohttp_session=aiohttp_session,
    )
    try:
        yield
    finally:
        # Close the session even when an exception is thrown into the generator.
        await aiohttp_session.close()


app = FastAPI(lifespan=lifespan)

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


def _build_start_response(
    body: Dict[str, Any], bot_type_name: str, room_details: Dict[str, Any]
) -> Dict[str, Any]:
    """Assemble the JSON payload returned by a successful ``/start`` call."""
    response = {"status": "Bot started", "bot_type": bot_type_name}

    bot_type = bot_registry.get_bot(bot_type_name)
    if bot_type.has_test_mode(body):
        response["room_url"] = room_details["room"]

    if "llm" in body:
        response["llm_provider"] = body["llm"]

    # Report only the first dial-out target, preferring a phone number.
    dialout_settings = body.get("dialout_settings")
    if dialout_settings:
        first_setting = dialout_settings[0]
        if "phoneNumber" in first_setting:
            response["dialing_to"] = f"phone:{first_setting['phoneNumber']}"
        elif "sipUri" in first_setting:
            response["dialing_to"] = f"sip:{first_setting['sipUri']}"

    return response


@app.post("/start")
async def handle_start_request(request: Request) -> JSONResponse:
    """Handle ``POST /start``: build a bot config, get a room, start the bot.

    The request body is handled in one of three ways:
      * it contains ``test``: respond immediately with ``{"test": true}``;
      * it contains ``From``, ``To``, ``callId`` and ``callDomain``: it is
        passed to ``process_dialin_request``;
      * it contains ``config``: the config is passed to
        ``bot_registry.setup_configuration``.

    Raises:
        HTTPException: 400 for invalid JSON, an unrecognized request shape,
            a configuration that matches no bot type, or any other
            processing error. HTTPExceptions raised by the helpers (for
            example the 500s from room creation or subprocess start) keep
            their own status code.
    """
    room_url = os.environ.get("HF_DAILY_SAMPLE_ROOM_URL", None)

    try:
        data = await request.json()

        if "test" in data:
            return JSONResponse({"test": True})

        if all(key in data for key in ["From", "To", "callId", "callDomain"]):
            body = await process_dialin_request(data)
        elif "config" in data:
            body = bot_registry.setup_configuration(data["config"])
        else:
            raise HTTPException(status_code=400, detail="Invalid request format")

        body = ensure_prompt_config(body)

        bot_type_name = bot_registry.detect_bot_type(body)
        if not bot_type_name:
            raise HTTPException(
                status_code=400, detail="Configuration doesn't match any supported scenario"
            )

        room_details = await create_daily_room(room_url, body)
        await start_bot(room_details, body, bot_type_name)

        return JSONResponse(_build_start_response(body, bot_type_name, room_details))

    except HTTPException:
        # Keep the status code and detail chosen at the raise site instead of
        # rewrapping everything as a generic 400 below.
        raise
    except json.JSONDecodeError:
        raise HTTPException(status_code=400, detail="Invalid JSON in request body")
    except Exception as e:
        raise HTTPException(
            status_code=400, detail=f"Request processing error: {str(e)}"
        ) from e


if __name__ == "__main__":
    # Every name in REQUIRED_ENV_VARS must be present with an "HF_" prefix.
    for env_var in REQUIRED_ENV_VARS:
        hf_env_var = f"HF_{env_var}"
        if hf_env_var not in os.environ:
            raise Exception(f"Missing environment variable: {hf_env_var}.")

    parser = argparse.ArgumentParser(description="Pipecat Bot Runner")
    parser.add_argument(
        "--host", type=str, default=os.environ.get("HOST", "0.0.0.0"), help="Host address"
    )
    parser.add_argument(
        "--port", type=int, default=os.environ.get("PORT", 7860), help="Port number"
    )
    # NOTE(review): store_true combined with default=True means reload is
    # always enabled; there is currently no flag to turn it off.
    parser.add_argument(
        "--reload", action="store_true", default=True, help="Reload code on change"
    )

    config = parser.parse_args()

    try:
        import uvicorn

        uvicorn.run("bot_runner:app", host=config.host, port=config.port, reload=config.reload)
    except KeyboardInterrupt:
        print("Pipecat runner shutting down...")