File size: 6,000 Bytes
0245be8 c095e79 0245be8 c095e79 0245be8 c095e79 0245be8 c095e79 0245be8 c095e79 0245be8 c095e79 0245be8 c095e79 0245be8 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 |
from fastapi import (
HTTPException,
FastAPI,
Depends,
)
import requests
import logging
import asyncio
import httpx
import yaml
import sys
import os
# ---------
# Constants
# ---------
CREDENTIALS_READY = False
RETRY_LIMIT = 10
RETRY_INTERVAL = 15
# ----------------
# Environment vars
# ----------------
NGROK_HOST = os.getenv("NGROK_HOST", "ngrok")
NGROK_PORT = os.getenv("NGROK_PORT", 4040)
NGROK_INTERNAL_WEBHOOK_HOST = os.getenv("NGROK_INTERNAL_WEBHOOK_HOST", "rasa-core")
NGROK_INTERNAL_WEBHOOK_PORT = os.getenv("NGROK_INTERNAL_WEBHOOK_PORT", 5005)
NGROK_API_URL = f"http://{NGROK_HOST}:{NGROK_PORT}"
TELEGRAM_ACCESS_TOKEN = os.getenv("TELEGRAM_ACCESS_TOKEN", None)
TELEGRAM_BOTNAME = os.getenv("TELEGRAM_BOTNAME", None)
CREDENTIALS_PATH = os.getenv("CREDENTIALS_PATH", "/app/rasa/credentials.yml")
# -------
# Logging
# -------
logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)
logger = logging.getLogger(__name__)
logger.debug(
f"NGROK_HOST: {NGROK_HOST}:{NGROK_PORT}\nNGROK_API_URL: {NGROK_API_URL}\nNGROK_INTERNAL_WEBHOOK_HOST: {NGROK_INTERNAL_WEBHOOK_HOST}:{NGROK_INTERNAL_WEBHOOK_PORT}"
)
# ---------------------------------
# Wait for ngrok API to come online
# ---------------------------------
async def wait_for_ngrok_api():
while True:
try:
async with httpx.AsyncClient() as client:
response = await client.get(f"{NGROK_API_URL}/api/tunnels")
response.raise_for_status()
logger.debug("ngrok API is online.")
return True
except httpx.RequestError:
logger.debug("ngrok API is offline. Waiting...")
await asyncio.sleep(RETRY_INTERVAL)
# -------------------------------------
# Fetch list of active tunnels on ngrok
# -------------------------------------
async def get_active_tunnels():
try:
response = requests.get(f"{NGROK_API_URL}/api/tunnels")
response.raise_for_status()
tunnels = response.json()["tunnels"]
except requests.exceptions.HTTPError:
tunnels = []
return tunnels
# -----------------
# Stop ngrok tunnel
# -----------------
async def stop_tunnel(tunnel):
tunnel_id = tunnel["name"]
response = requests.delete(f"{NGROK_API_URL}/api/tunnels/{tunnel_id}")
response.raise_for_status()
# ----------------------
# Stop all ngrok tunnels
# ----------------------
async def stop_all_tunnels():
active_tunnels = await get_active_tunnels()
if not active_tunnels:
logger.debug("No active tunnels found.")
else:
for tunnel in active_tunnels:
logger.debug(f"Stopping tunnel: {tunnel['name']} ({tunnel['public_url']})")
await stop_tunnel(tunnel)
# -------------------------------------
# Get the first ngrok tunnel w/ retries
# -------------------------------------
async def get_tunnel(retry=0):
if retry > RETRY_LIMIT:
raise Exception(
f"Could not create ngrok tunnel. Exceed retry limit of {RETRY_LIMIT} attempts."
)
active_tunnels = await get_active_tunnels()
if len(active_tunnels) == 0:
logger.debug(f"No active tunnels found. Trying again in {RETRY_INTERVAL}s..")
await asyncio.sleep(RETRY_INTERVAL)
retry += 1
return await get_tunnel(retry=retry)
else:
return active_tunnels[0]["public_url"]
# -------------------
# Create ngrok tunnel
# -------------------
async def create_tunnel():
response = requests.post(
f"{NGROK_API_URL}/api/tunnels",
json={
"addr": f"{NGROK_INTERNAL_WEBHOOK_HOST}:{NGROK_INTERNAL_WEBHOOK_PORT}",
"proto": "http",
"name": NGROK_INTERNAL_WEBHOOK_HOST,
},
)
try:
response.raise_for_status()
return response.json()["public_url"]
except requests.exceptions.HTTPError as e:
logger.warning(f"Error creating ngrok tunnel: {e}")
return False
# ----------------------------
# Update Rasa credentials file
# ----------------------------
async def update_credentials_file(ngrok_url):
global CREDENTIALS_READY
try:
with open(CREDENTIALS_PATH, "r") as file:
credentials = yaml.safe_load(file)
credentials["custom_telegram.CustomTelegramInput"][
"webhook_url"
] = f"{ngrok_url}/webhooks/telegram/webhook"
credentials["custom_telegram.CustomTelegramInput"][
"access_token"
] = TELEGRAM_ACCESS_TOKEN
credentials["custom_telegram.CustomTelegramInput"]["verify"] = TELEGRAM_BOTNAME
with open(CREDENTIALS_PATH, "w") as file:
yaml.safe_dump(credentials, file)
CREDENTIALS_READY = True
except Exception as e:
logger.warning(f"Error updating {CREDENTIALS_PATH}: {e}")
sys.exit(1)
# -----------------
# FastAPI endpoints
# -----------------
app = FastAPI()
# -------------
# Startup event
# -------------
@app.on_event("startup")
async def startup_event():
env = os.getenv("ENV", None)
if env and env.lower() in ["dev", "development", "local"]:
await wait_for_ngrok_api()
url = await get_tunnel()
if not url:
logger.debug("No active tunnels found. Creating one...")
url = await create_tunnel()
logger.debug(f"Tunnel url: {url}")
await update_credentials_file(url)
else:
logger.debug("Not in dev environment. Skipping.")
# ---------------------
# Endpoint dependencies
# ---------------------
async def check_endpoint_availability():
if not CREDENTIALS_READY:
raise HTTPException(status_code=403, detail="Endpoint not available yet")
return True
# ---------------------
# Health check endpoint
# ---------------------
# This endpoint is used by docker-compose to check if the
# container is ready. If it is ready, Rasa core can start
@app.get("/", dependencies=[Depends(check_endpoint_availability)])
async def health_check():
return {"status": "ok"}
|