paulpierre's picture
added function descriptions, cleaned up formatting
c095e79
from fastapi import (
HTTPException,
FastAPI,
Depends,
)
import requests
import logging
import asyncio
import httpx
import yaml
import sys
import os
# ---------
# Constants
# ---------
CREDENTIALS_READY = False
RETRY_LIMIT = 10
RETRY_INTERVAL = 15
# ----------------
# Environment vars
# ----------------
NGROK_HOST = os.getenv("NGROK_HOST", "ngrok")
NGROK_PORT = os.getenv("NGROK_PORT", 4040)
NGROK_INTERNAL_WEBHOOK_HOST = os.getenv("NGROK_INTERNAL_WEBHOOK_HOST", "rasa-core")
NGROK_INTERNAL_WEBHOOK_PORT = os.getenv("NGROK_INTERNAL_WEBHOOK_PORT", 5005)
NGROK_API_URL = f"http://{NGROK_HOST}:{NGROK_PORT}"
TELEGRAM_ACCESS_TOKEN = os.getenv("TELEGRAM_ACCESS_TOKEN", None)
TELEGRAM_BOTNAME = os.getenv("TELEGRAM_BOTNAME", None)
CREDENTIALS_PATH = os.getenv("CREDENTIALS_PATH", "/app/rasa/credentials.yml")
# -------
# Logging
# -------
logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)
logger = logging.getLogger(__name__)
logger.debug(
f"NGROK_HOST: {NGROK_HOST}:{NGROK_PORT}\nNGROK_API_URL: {NGROK_API_URL}\nNGROK_INTERNAL_WEBHOOK_HOST: {NGROK_INTERNAL_WEBHOOK_HOST}:{NGROK_INTERNAL_WEBHOOK_PORT}"
)
# ---------------------------------
# Wait for ngrok API to come online
# ---------------------------------
async def wait_for_ngrok_api():
while True:
try:
async with httpx.AsyncClient() as client:
response = await client.get(f"{NGROK_API_URL}/api/tunnels")
response.raise_for_status()
logger.debug("ngrok API is online.")
return True
except httpx.RequestError:
logger.debug("ngrok API is offline. Waiting...")
await asyncio.sleep(RETRY_INTERVAL)
# -------------------------------------
# Fetch list of active tunnels on ngrok
# -------------------------------------
async def get_active_tunnels():
try:
response = requests.get(f"{NGROK_API_URL}/api/tunnels")
response.raise_for_status()
tunnels = response.json()["tunnels"]
except requests.exceptions.HTTPError:
tunnels = []
return tunnels
# -----------------
# Stop ngrok tunnel
# -----------------
async def stop_tunnel(tunnel):
tunnel_id = tunnel["name"]
response = requests.delete(f"{NGROK_API_URL}/api/tunnels/{tunnel_id}")
response.raise_for_status()
# ----------------------
# Stop all ngrok tunnels
# ----------------------
async def stop_all_tunnels():
active_tunnels = await get_active_tunnels()
if not active_tunnels:
logger.debug("No active tunnels found.")
else:
for tunnel in active_tunnels:
logger.debug(f"Stopping tunnel: {tunnel['name']} ({tunnel['public_url']})")
await stop_tunnel(tunnel)
# -------------------------------------
# Get the first ngrok tunnel w/ retries
# -------------------------------------
async def get_tunnel(retry=0):
if retry > RETRY_LIMIT:
raise Exception(
f"Could not create ngrok tunnel. Exceed retry limit of {RETRY_LIMIT} attempts."
)
active_tunnels = await get_active_tunnels()
if len(active_tunnels) == 0:
logger.debug(f"No active tunnels found. Trying again in {RETRY_INTERVAL}s..")
await asyncio.sleep(RETRY_INTERVAL)
retry += 1
return await get_tunnel(retry=retry)
else:
return active_tunnels[0]["public_url"]
# -------------------
# Create ngrok tunnel
# -------------------
async def create_tunnel():
response = requests.post(
f"{NGROK_API_URL}/api/tunnels",
json={
"addr": f"{NGROK_INTERNAL_WEBHOOK_HOST}:{NGROK_INTERNAL_WEBHOOK_PORT}",
"proto": "http",
"name": NGROK_INTERNAL_WEBHOOK_HOST,
},
)
try:
response.raise_for_status()
return response.json()["public_url"]
except requests.exceptions.HTTPError as e:
logger.warning(f"Error creating ngrok tunnel: {e}")
return False
# ----------------------------
# Update Rasa credentials file
# ----------------------------
async def update_credentials_file(ngrok_url):
global CREDENTIALS_READY
try:
with open(CREDENTIALS_PATH, "r") as file:
credentials = yaml.safe_load(file)
credentials["custom_telegram.CustomTelegramInput"][
"webhook_url"
] = f"{ngrok_url}/webhooks/telegram/webhook"
credentials["custom_telegram.CustomTelegramInput"][
"access_token"
] = TELEGRAM_ACCESS_TOKEN
credentials["custom_telegram.CustomTelegramInput"]["verify"] = TELEGRAM_BOTNAME
with open(CREDENTIALS_PATH, "w") as file:
yaml.safe_dump(credentials, file)
CREDENTIALS_READY = True
except Exception as e:
logger.warning(f"Error updating {CREDENTIALS_PATH}: {e}")
sys.exit(1)
# -----------------
# FastAPI endpoints
# -----------------
app = FastAPI()
# -------------
# Startup event
# -------------
@app.on_event("startup")
async def startup_event():
env = os.getenv("ENV", None)
if env and env.lower() in ["dev", "development", "local"]:
await wait_for_ngrok_api()
url = await get_tunnel()
if not url:
logger.debug("No active tunnels found. Creating one...")
url = await create_tunnel()
logger.debug(f"Tunnel url: {url}")
await update_credentials_file(url)
else:
logger.debug("Not in dev environment. Skipping.")
# ---------------------
# Endpoint dependencies
# ---------------------
async def check_endpoint_availability():
if not CREDENTIALS_READY:
raise HTTPException(status_code=403, detail="Endpoint not available yet")
return True
# ---------------------
# Health check endpoint
# ---------------------
# This endpoint is used by docker-compose to check if the
# container is ready. If it is ready, Rasa core can start
@app.get("/", dependencies=[Depends(check_endpoint_availability)])
async def health_check():
return {"status": "ok"}