|
from fastapi import ( |
|
HTTPException, |
|
FastAPI, |
|
Depends, |
|
) |
|
import requests |
|
import logging |
|
import asyncio |
|
import httpx |
|
import yaml |
|
import sys |
|
import os |
|
|
|
|
|
|
|
|
|
CREDENTIALS_READY = False |
|
RETRY_LIMIT = 10 |
|
RETRY_INTERVAL = 15 |
|
|
|
|
|
|
|
|
|
NGROK_HOST = os.getenv("NGROK_HOST", "ngrok") |
|
NGROK_PORT = os.getenv("NGROK_PORT", 4040) |
|
NGROK_INTERNAL_WEBHOOK_HOST = os.getenv("NGROK_INTERNAL_WEBHOOK_HOST", "rasa-core") |
|
NGROK_INTERNAL_WEBHOOK_PORT = os.getenv("NGROK_INTERNAL_WEBHOOK_PORT", 5005) |
|
NGROK_API_URL = f"http://{NGROK_HOST}:{NGROK_PORT}" |
|
TELEGRAM_ACCESS_TOKEN = os.getenv("TELEGRAM_ACCESS_TOKEN", None) |
|
TELEGRAM_BOTNAME = os.getenv("TELEGRAM_BOTNAME", None) |
|
CREDENTIALS_PATH = os.getenv("CREDENTIALS_PATH", "/app/rasa/credentials.yml") |
|
|
|
|
|
|
|
|
|
logging.basicConfig(stream=sys.stdout, level=logging.DEBUG) |
|
logger = logging.getLogger(__name__) |
|
logger.debug( |
|
f"NGROK_HOST: {NGROK_HOST}:{NGROK_PORT}\nNGROK_API_URL: {NGROK_API_URL}\nNGROK_INTERNAL_WEBHOOK_HOST: {NGROK_INTERNAL_WEBHOOK_HOST}:{NGROK_INTERNAL_WEBHOOK_PORT}" |
|
) |
|
|
|
|
|
|
|
|
|
|
|
async def wait_for_ngrok_api(): |
|
|
|
while True: |
|
try: |
|
async with httpx.AsyncClient() as client: |
|
response = await client.get(f"{NGROK_API_URL}/api/tunnels") |
|
response.raise_for_status() |
|
logger.debug("ngrok API is online.") |
|
return True |
|
except httpx.RequestError: |
|
logger.debug("ngrok API is offline. Waiting...") |
|
await asyncio.sleep(RETRY_INTERVAL) |
|
|
|
|
|
|
|
|
|
|
|
async def get_active_tunnels(): |
|
try: |
|
response = requests.get(f"{NGROK_API_URL}/api/tunnels") |
|
response.raise_for_status() |
|
tunnels = response.json()["tunnels"] |
|
except requests.exceptions.HTTPError: |
|
tunnels = [] |
|
return tunnels |
|
|
|
|
|
|
|
|
|
|
|
async def stop_tunnel(tunnel): |
|
tunnel_id = tunnel["name"] |
|
response = requests.delete(f"{NGROK_API_URL}/api/tunnels/{tunnel_id}") |
|
response.raise_for_status() |
|
|
|
|
|
|
|
|
|
|
|
async def stop_all_tunnels(): |
|
active_tunnels = await get_active_tunnels() |
|
if not active_tunnels: |
|
logger.debug("No active tunnels found.") |
|
else: |
|
for tunnel in active_tunnels: |
|
logger.debug(f"Stopping tunnel: {tunnel['name']} ({tunnel['public_url']})") |
|
await stop_tunnel(tunnel) |
|
|
|
|
|
|
|
|
|
|
|
async def get_tunnel(retry=0): |
|
if retry > RETRY_LIMIT: |
|
raise Exception( |
|
f"Could not create ngrok tunnel. Exceed retry limit of {RETRY_LIMIT} attempts." |
|
) |
|
|
|
active_tunnels = await get_active_tunnels() |
|
if len(active_tunnels) == 0: |
|
logger.debug(f"No active tunnels found. Trying again in {RETRY_INTERVAL}s..") |
|
await asyncio.sleep(RETRY_INTERVAL) |
|
retry += 1 |
|
return await get_tunnel(retry=retry) |
|
else: |
|
return active_tunnels[0]["public_url"] |
|
|
|
|
|
|
|
|
|
|
|
async def create_tunnel(): |
|
response = requests.post( |
|
f"{NGROK_API_URL}/api/tunnels", |
|
json={ |
|
"addr": f"{NGROK_INTERNAL_WEBHOOK_HOST}:{NGROK_INTERNAL_WEBHOOK_PORT}", |
|
"proto": "http", |
|
"name": NGROK_INTERNAL_WEBHOOK_HOST, |
|
}, |
|
) |
|
try: |
|
response.raise_for_status() |
|
return response.json()["public_url"] |
|
except requests.exceptions.HTTPError as e: |
|
logger.warning(f"Error creating ngrok tunnel: {e}") |
|
return False |
|
|
|
|
|
|
|
|
|
|
|
async def update_credentials_file(ngrok_url): |
|
global CREDENTIALS_READY |
|
try: |
|
with open(CREDENTIALS_PATH, "r") as file: |
|
credentials = yaml.safe_load(file) |
|
|
|
credentials["custom_telegram.CustomTelegramInput"][ |
|
"webhook_url" |
|
] = f"{ngrok_url}/webhooks/telegram/webhook" |
|
credentials["custom_telegram.CustomTelegramInput"][ |
|
"access_token" |
|
] = TELEGRAM_ACCESS_TOKEN |
|
credentials["custom_telegram.CustomTelegramInput"]["verify"] = TELEGRAM_BOTNAME |
|
|
|
with open(CREDENTIALS_PATH, "w") as file: |
|
yaml.safe_dump(credentials, file) |
|
|
|
CREDENTIALS_READY = True |
|
except Exception as e: |
|
logger.warning(f"Error updating {CREDENTIALS_PATH}: {e}") |
|
sys.exit(1) |
|
|
|
|
|
|
|
|
|
|
|
|
|
app = FastAPI() |
|
|
|
|
|
|
|
|
|
|
|
@app.on_event("startup") |
|
async def startup_event(): |
|
env = os.getenv("ENV", None) |
|
if env and env.lower() in ["dev", "development", "local"]: |
|
await wait_for_ngrok_api() |
|
url = await get_tunnel() |
|
if not url: |
|
logger.debug("No active tunnels found. Creating one...") |
|
url = await create_tunnel() |
|
logger.debug(f"Tunnel url: {url}") |
|
await update_credentials_file(url) |
|
else: |
|
logger.debug("Not in dev environment. Skipping.") |
|
|
|
|
|
|
|
|
|
|
|
async def check_endpoint_availability(): |
|
if not CREDENTIALS_READY: |
|
raise HTTPException(status_code=403, detail="Endpoint not available yet") |
|
return True |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@app.get("/", dependencies=[Depends(check_endpoint_availability)]) |
|
async def health_check(): |
|
return {"status": "ok"} |
|
|