File size: 6,244 Bytes
1366db9 a916901 1366db9 a916901 1366db9 e1aef59 1366db9 e1aef59 1366db9 e1aef59 1366db9 e1aef59 1366db9 e1aef59 1366db9 e1aef59 1366db9 e1aef59 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 |
import argparse
import json
import os
import shlex
import subprocess
from contextlib import asynccontextmanager
from typing import Any, Dict
import aiohttp
from bot_constants import (
MAX_SESSION_TIME,
REQUIRED_ENV_VARS,
)
from bot_registry import BotRegistry # Changed from bot_definitions to bot_registry
from bot_runner_helpers import (
determine_room_capabilities,
ensure_prompt_config,
process_dialin_request,
)
from fastapi import FastAPI, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from pipecat.transports.services.helpers.daily_rest import (
DailyRESTHelper,
DailyRoomParams,
DailyRoomProperties,
DailyRoomSipParams,
)
daily_helpers = {}
bot_registry = BotRegistry() # Instantiate BotRegistry
async def create_daily_room(room_url: str = None, config_body: Dict[str, Any] = None):
if not room_url:
capabilities = determine_room_capabilities(config_body)
sip_params = None
if capabilities["enable_dialin"]:
sip_params = DailyRoomSipParams(
display_name="dialin-user", video=False, sip_mode="dial-in", num_endpoints=2
)
properties = DailyRoomProperties(sip=sip_params)
if capabilities["enable_dialout"]:
properties.enable_dialout = True
capability_str = ", ".join([f"{k}={v}" for k, v in capabilities.items()])
print(f"Creating room with capabilities: {capability_str}")
params = DailyRoomParams(properties=properties)
print("Creating new room...")
room = await daily_helpers["rest"].create_room(params=params)
else:
try:
room = await daily_helpers["rest"].get_room_from_url(room_url)
except Exception:
raise HTTPException(status_code=500, detail=f"Room not found: {room_url}")
print(f"Daily room: {room.url} {room.config.sip_endpoint}")
token = await daily_helpers["rest"].get_token(room.url, MAX_SESSION_TIME)
if not room or not token:
raise HTTPException(status_code=500, detail="Failed to get room or token")
return {"room": room.url, "token": token, "sip_endpoint": room.config.sip_endpoint}
async def start_bot(room_details: Dict[str, str], body: Dict[str, Any], example: str) -> bool:
room_url = room_details["room"]
token = room_details["token"]
body_json = json.dumps(body).replace('"', '\\"')
print(f"++++ Body JSON: {body_json}")
bot_proc = f'python3 -m {example} -u {room_url} -t {token} -b "{body_json}"'
print(f"Starting bot. Example: {example}, Room: {room_url}")
try:
command_parts = shlex.split(bot_proc)
subprocess.Popen(command_parts, bufsize=1, cwd=os.path.dirname(os.path.abspath(__file__)))
return True
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to start subprocess: {e}")
@asynccontextmanager
async def lifespan(app: FastAPI):
aiohttp_session = aiohttp.ClientSession()
daily_helpers["rest"] = DailyRESTHelper(
daily_api_key=os.environ.get("HF_DAILY_API_KEY", ""),
daily_api_url=os.environ.get("DAILY_API_URL", "https://api.daily.co/v1"),
aiohttp_session=aiohttp_session,
)
yield
await aiohttp_session.close()
app = FastAPI(lifespan=lifespan)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
@app.post("/start")
async def handle_start_request(request: Request) -> JSONResponse:
room_url = os.environ.get("HF_DAILY_SAMPLE_ROOM_URL", None)
try:
data = await request.json()
if "test" in data:
return JSONResponse({"test": True})
if all(key in data for key in ["From", "To", "callId", "callDomain"]):
body = await process_dialin_request(data)
elif "config" in data:
body = bot_registry.setup_configuration(data["config"])
else:
raise HTTPException(status_code=400, detail="Invalid request format")
body = ensure_prompt_config(body)
bot_type_name = bot_registry.detect_bot_type(body)
if not bot_type_name:
raise HTTPException(status_code=400, detail="Configuration doesn't match any supported scenario")
room_details = await create_daily_room(room_url, body)
await start_bot(room_details, body, bot_type_name)
bot_type = bot_registry.get_bot(bot_type_name)
response = {"status": "Bot started", "bot_type": bot_type_name}
if bot_type.has_test_mode(body):
response["room_url"] = room_details["room"]
if "llm" in body:
response["llm_provider"] = body["llm"]
if "dialout_settings" in body and len(body["dialout_settings"]) > 0:
first_setting = body["dialout_settings"][0]
if "phoneNumber" in first_setting:
response["dialing_to"] = f"phone:{first_setting['phoneNumber']}"
elif "sipUri" in first_setting:
response["dialing_to"] = f"sip:{first_setting['sipUri']}"
return JSONResponse(response)
except json.JSONDecodeError:
raise HTTPException(status_code=400, detail="Invalid JSON in request body")
except Exception as e:
raise HTTPException(status_code=400, detail=f"Request processing error: {str(e)}")
if __name__ == "__main__":
for env_var in REQUIRED_ENV_VARS:
hf_env_var = f"HF_{env_var}"
if hf_env_var not in os.environ:
raise Exception(f"Missing environment variable: {hf_env_var}.")
parser = argparse.ArgumentParser(description="Pipecat Bot Runner")
parser.add_argument("--host", type=str, default=os.environ.get("HOST", "0.0.0.0"), help="Host address")
parser.add_argument("--port", type=int, default=os.environ.get("PORT", 7860), help="Port number")
parser.add_argument("--reload", action="store_true", default=True, help="Reload code on change")
config = parser.parse_args()
try:
import uvicorn
uvicorn.run("bot_runner:app", host=config.host, port=config.port, reload=config.reload)
except KeyboardInterrupt:
print("Pipecat runner shutting down...") |