File size: 6,000 Bytes
0245be8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c095e79
 
 
0245be8
c095e79
0245be8
 
 
 
 
 
 
 
 
 
 
 
c095e79
 
 
0245be8
 
 
 
 
 
 
 
 
 
c095e79
 
 
0245be8
 
 
 
 
 
c095e79
 
 
0245be8
 
 
 
 
 
 
 
 
 
c095e79
 
 
0245be8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c095e79
 
 
0245be8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
from fastapi import (
    HTTPException,
    FastAPI,
    Depends,
)
import requests
import logging
import asyncio
import httpx
import yaml
import sys
import os

# ---------
# Constants
# ---------
CREDENTIALS_READY = False
RETRY_LIMIT = 10
RETRY_INTERVAL = 15

# ----------------
# Environment vars
# ----------------
NGROK_HOST = os.getenv("NGROK_HOST", "ngrok")
NGROK_PORT = os.getenv("NGROK_PORT", 4040)
NGROK_INTERNAL_WEBHOOK_HOST = os.getenv("NGROK_INTERNAL_WEBHOOK_HOST", "rasa-core")
NGROK_INTERNAL_WEBHOOK_PORT = os.getenv("NGROK_INTERNAL_WEBHOOK_PORT", 5005)
NGROK_API_URL = f"http://{NGROK_HOST}:{NGROK_PORT}"
TELEGRAM_ACCESS_TOKEN = os.getenv("TELEGRAM_ACCESS_TOKEN", None)
TELEGRAM_BOTNAME = os.getenv("TELEGRAM_BOTNAME", None)
CREDENTIALS_PATH = os.getenv("CREDENTIALS_PATH", "/app/rasa/credentials.yml")

# -------
# Logging
# -------
logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)
logger = logging.getLogger(__name__)
logger.debug(
    f"NGROK_HOST: {NGROK_HOST}:{NGROK_PORT}\nNGROK_API_URL: {NGROK_API_URL}\nNGROK_INTERNAL_WEBHOOK_HOST: {NGROK_INTERNAL_WEBHOOK_HOST}:{NGROK_INTERNAL_WEBHOOK_PORT}"
)


# ---------------------------------
# Wait for ngrok API to come online
# ---------------------------------
async def wait_for_ngrok_api():

    while True:
        try:
            async with httpx.AsyncClient() as client:
                response = await client.get(f"{NGROK_API_URL}/api/tunnels")
                response.raise_for_status()
                logger.debug("ngrok API is online.")
                return True
        except httpx.RequestError:
            logger.debug("ngrok API is offline. Waiting...")
            await asyncio.sleep(RETRY_INTERVAL)


# -------------------------------------
# Fetch list of active tunnels on ngrok
# -------------------------------------
async def get_active_tunnels():
    try:
        response = requests.get(f"{NGROK_API_URL}/api/tunnels")
        response.raise_for_status()
        tunnels = response.json()["tunnels"]
    except requests.exceptions.HTTPError:
        tunnels = []
    return tunnels


# -----------------
# Stop ngrok tunnel
# -----------------
async def stop_tunnel(tunnel):
    tunnel_id = tunnel["name"]
    response = requests.delete(f"{NGROK_API_URL}/api/tunnels/{tunnel_id}")
    response.raise_for_status()


# ----------------------
# Stop all ngrok tunnels
# ----------------------
async def stop_all_tunnels():
    active_tunnels = await get_active_tunnels()
    if not active_tunnels:
        logger.debug("No active tunnels found.")
    else:
        for tunnel in active_tunnels:
            logger.debug(f"Stopping tunnel: {tunnel['name']} ({tunnel['public_url']})")
            await stop_tunnel(tunnel)


# -------------------------------------
# Get the first ngrok tunnel w/ retries
# -------------------------------------
async def get_tunnel(retry=0):
    if retry > RETRY_LIMIT:
        raise Exception(
            f"Could not create ngrok tunnel. Exceed retry limit of {RETRY_LIMIT} attempts."
        )

    active_tunnels = await get_active_tunnels()
    if len(active_tunnels) == 0:
        logger.debug(f"No active tunnels found. Trying again in {RETRY_INTERVAL}s..")
        await asyncio.sleep(RETRY_INTERVAL)
        retry += 1
        return await get_tunnel(retry=retry)
    else:
        return active_tunnels[0]["public_url"]


# -------------------
# Create ngrok tunnel
# -------------------
async def create_tunnel():
    response = requests.post(
        f"{NGROK_API_URL}/api/tunnels",
        json={
            "addr": f"{NGROK_INTERNAL_WEBHOOK_HOST}:{NGROK_INTERNAL_WEBHOOK_PORT}",
            "proto": "http",
            "name": NGROK_INTERNAL_WEBHOOK_HOST,
        },
    )
    try:
        response.raise_for_status()
        return response.json()["public_url"]
    except requests.exceptions.HTTPError as e:
        logger.warning(f"Error creating ngrok tunnel: {e}")
        return False


# ----------------------------
# Update Rasa credentials file
# ----------------------------
async def update_credentials_file(ngrok_url):
    global CREDENTIALS_READY
    try:
        with open(CREDENTIALS_PATH, "r") as file:
            credentials = yaml.safe_load(file)

        credentials["custom_telegram.CustomTelegramInput"][
            "webhook_url"
        ] = f"{ngrok_url}/webhooks/telegram/webhook"
        credentials["custom_telegram.CustomTelegramInput"][
            "access_token"
        ] = TELEGRAM_ACCESS_TOKEN
        credentials["custom_telegram.CustomTelegramInput"]["verify"] = TELEGRAM_BOTNAME

        with open(CREDENTIALS_PATH, "w") as file:
            yaml.safe_dump(credentials, file)

        CREDENTIALS_READY = True
    except Exception as e:
        logger.warning(f"Error updating {CREDENTIALS_PATH}: {e}")
        sys.exit(1)


# -----------------
# FastAPI endpoints
# -----------------

app = FastAPI()


# -------------
# Startup event
# -------------
@app.on_event("startup")
async def startup_event():
    env = os.getenv("ENV", None)
    if env and env.lower() in ["dev", "development", "local"]:
        await wait_for_ngrok_api()
        url = await get_tunnel()
        if not url:
            logger.debug("No active tunnels found. Creating one...")
            url = await create_tunnel()
            logger.debug(f"Tunnel url: {url}")
        await update_credentials_file(url)
    else:
        logger.debug("Not in dev environment. Skipping.")


# ---------------------
# Endpoint dependencies
# ---------------------
async def check_endpoint_availability():
    if not CREDENTIALS_READY:
        raise HTTPException(status_code=403, detail="Endpoint not available yet")
    return True


# ---------------------
# Health check endpoint
# ---------------------
# This endpoint is used by docker-compose to check if the
# container is ready. If it is ready, Rasa core can start
@app.get("/", dependencies=[Depends(check_endpoint_availability)])
async def health_check():
    return {"status": "ok"}