Spaces:
Sleeping
Sleeping
import os | |
import base64 | |
import math | |
import random | |
import re | |
import json | |
import yaml | |
import uvicorn | |
import httpx | |
import secrets | |
from fastapi import FastAPI, Request, Response, HTTPException, Security, Depends, Path, Query, status | |
from fastapi.security.api_key import APIKeyHeader | |
from fastapi.openapi.utils import get_openapi | |
from fastapi.encoders import jsonable_encoder | |
from fastapi.openapi.docs import get_swagger_ui_html, get_redoc_html | |
from starlette.middleware.base import BaseHTTPMiddleware | |
from starlette.responses import HTMLResponse, JSONResponse, RedirectResponse | |
from typing import Annotated | |
from cachetools import TTLCache | |
# ---------- Start Cache ---------- # | |
# Shared in-memory store for fetched tiles: at most 1000 entries, each kept
# for a TTL of 21600 (6 hours, assuming cachetools' default clock in seconds).
cache = TTLCache(
    maxsize=1000,
    ttl=21600,
)
# ---------- End Cache ---------- # | |
# ---------- Start App ---------- # | |
# The FastAPI application object. The built-in OpenAPI/Swagger/ReDoc routes
# are switched off (all three URLs are None).
app = FastAPI(
    # NOTE(review): debug=True presumably exposes tracebacks on server
    # errors -- confirm this is intended outside development.
    debug=True,
    title="🗺️ Map Tiles Proxy Service",
    # summary = "Proxy for Map Tiles Service | Control Your Little Globe 🌍",
    description=(
        "<br><hr><br>"
        "<blockquote>🗺️ **Map Tiles Proxy Service** is an app that acts as a proxy for tile maps from multiple sources, allowing you to easily aggregate map services from various providers in one place ✨</blockquote>"
        "<br><hr><br>"
    ),
    version="template",
    license_info={
        "name": "Web Sparkle © 2025 by MeowSkyKung is licensed under CC BY-SA 4.0",
        "url": "https://creativecommons.org/licenses/by-sa/4.0/",
    },
    # contact = {
    #     "name": "MeowIsWorking",
    #     "url": "https://meowverse.azurewebsites.net",
    #     "email": "[email protected]"
    # },
    # The tag entries below are used as documentation sections.
    openapi_tags=[
        {
            "name": "🎉 Features 🎉",
            "description": (
                "* 🔐 Basic Authentication to prevent unauthorized access\n"
                "* 🧭 Restricts geographic bounds and zoom levels to control map access precisely\n"
                "* 🚀 Caching system to improve performance and reduce redundant requests\n"
                "* 🔄 Supports subdomain switching for load balancing\n"
            ),
        },
        {
            "name": "📥 Installation 📥",
            "description": (
                "1. Clone this repository\n"
                "2. Configure the `config.yaml` file as shown in the repo\n"
                "3. Set the Environment Secret `PASSWORD` for Basic Auth\n"
            ),
        },
        {
            "name": "🚀 Usage 🚀",
            "description": (
                "* Use the API via the endpoint `/serviceId/{z}/{x}/{y}` with Basic Auth\n"
                "* View each service’s details via the root endpoint `/`\n"
            ),
        },
        {
            "name": "💖 Supporting by 💖",
            "description": (
                "* Like Me on Hugging Face 🌟\n"
                "* Donate on Ko-Fi ☕\n"
            ),
        },
    ],
    openapi_url=None,
    docs_url=None,
    redoc_url=None,
)
# ---------- End App ---------- # | |
# ---------- Start Config ---------- # | |
# Path of the YAML file that declares the proxied tile services.
CONFIG_FILE = "config.yaml"
# serviceId -> service definition, filled in by load().
configs = {}


def load():
    """Read ``CONFIG_FILE`` and register every tile service it declares.

    Each entry of the top-level ``tileServices`` list is stored in the
    module-level ``configs`` dict under its ``serviceId``, and the list of
    registered ids is published as ``app.state.urls``.

    An empty document, or a missing/null ``tileServices`` key, registers
    nothing instead of raising.

    Raises:
        OSError: if ``CONFIG_FILE`` cannot be opened.
        KeyError: if a service entry has no ``serviceId``.
    """
    # Explicit encoding so parsing does not depend on the host locale.
    with open(CONFIG_FILE, "r", encoding="utf-8") as f:
        # safe_load yields None for an empty document; treat it as "no config".
        data = yaml.safe_load(f) or {}
    # ``tileServices: `` (null) must behave like an absent key.
    services = data.get("tileServices") or []
    # ``configs`` is mutated in place, never rebound, so no ``global`` is needed.
    configs.update({service["serviceId"]: service for service in services})
    app.state.urls = list(configs.keys())


load()
# ---------- End Config ---------- # | |
# ---------- Start Limit ---------- # | |
def xyz_to_bbox(z, x, y):
    """Return the geographic bounding box of an XYZ tile.

    Args:
        z: Zoom level; the grid has ``2 ** z`` tiles per axis.
        x: Tile column index.
        y: Tile row index (row 0 is the northernmost row).

    Returns:
        A tuple ``(west, south, east, north)`` in degrees, i.e.
        ``(minLon, minLat, maxLon, maxLat)``.
    """
    n = 2.0 ** z
    west = x / n * 360.0 - 180.0
    east = (x + 1) / n * 360.0 - 180.0
    # Row y is the tile's top edge and row y + 1 its bottom edge; latitude
    # decreases as y grows, so the top edge is the northern bound.
    north = math.degrees(math.atan(math.sinh(math.pi * (1 - 2 * y / n))))
    south = math.degrees(math.atan(math.sinh(math.pi * (1 - 2 * (y + 1) / n))))
    return west, south, east, north
# ---------- End Limit ---------- # | |
# ---------- Start Switch ---------- # | |
def switch(template: str) -> str:
    """Expand every ``{switch:a,b,c}`` placeholder to one randomly chosen option.

    All placeholders in ``template`` are expanded, so none is left behind
    for a later ``str.format`` call to trip over. Placeholders with
    identical text receive the same choice within one call. Whitespace
    around each option is ignored.

    Args:
        template: URL template that may contain ``{switch:...}`` placeholders.

    Returns:
        The template with the placeholders substituted; a template without
        any placeholder is returned unchanged.
    """
    # Remembers the pick per distinct placeholder text so repeats agree.
    picked = {}

    def _choose(match):
        token = match.group(0)
        if token not in picked:
            options = [option.strip() for option in match.group(1).split(",")]
            picked[token] = random.choice(options)
        return picked[token]

    return re.sub(r"\{switch:([^}]+)\}", _choose, template)
# ---------- End Switch ---------- # | |
# ---------- Start Auth ---------- # | |
# Shared secret compared against the password part of incoming credentials.
# The module refuses to import when it is unset or empty.
PASSWORD = os.environ.get("PASSWORD")
if not PASSWORD:
    raise RuntimeError("Missing PASSWORD environment secret")

# username -> number of successful authentications (in-memory only).
stats = dict()

# Credentials are read from the X-Authorization header. auto_error=False
# leaves handling of a missing header to the dependency that consumes it.
header = APIKeyHeader(
    name="X-Authorization",
    scheme_name="X-Authorization",
    description="base64 encoded username:password",
    auto_error=False,
)
def authenticate(credentials: Annotated[str, Security(header)]):
    """Validate the ``X-Authorization`` credentials and return the username.

    The header value is expected to be base64 of ``username:password``.
    Only the password is checked (against ``PASSWORD``); the username is
    used for the per-user counter in ``stats`` and the usage log line.

    Args:
        credentials: Raw header value supplied by the ``header`` security
            scheme. Presumably ``None`` when the header is absent, since the
            scheme is declared with ``auto_error=False``.

    Returns:
        The username part of the decoded credentials.

    Raises:
        HTTPException: 401 when the header is missing or malformed, or when
            the password does not match.
    """
    try:
        decoded = base64.b64decode(credentials).decode("utf-8")
        username, password = decoded.split(":", 1)
    except (TypeError, ValueError):
        # TypeError: no header value (None).
        # ValueError: invalid base64 (binascii.Error), invalid UTF-8
        # (UnicodeDecodeError), or no ":" separator to unpack.
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid auth header",
        ) from None
    # Constant-time comparison so timing does not leak how much matched.
    if not secrets.compare_digest(password.encode("utf-8"), PASSWORD.encode("utf-8")):
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Incorrect password")
    stats[username] = stats.get(username, 0) + 1
    print(f"User '{username}' used the service {stats[username]} times.")
    return username
# ---------- End Auth ---------- # | |
# ---------- Start Middleware ---------- # | |
class DebugHeadersMiddleware(BaseHTTPMiddleware):
    """Debug aid that prints every request's headers, then forwards the request.

    Values of credential-bearing headers are masked so that secrets (in
    particular the ``X-Authorization`` credentials this service relies on)
    never end up in the process output.
    """

    # Lower-cased names of headers whose values must not be printed.
    SENSITIVE_HEADERS = frozenset(
        {"authorization", "x-authorization", "proxy-authorization", "cookie"}
    )

    async def dispatch(self, request: Request, next):
        """Print the (redacted) headers and delegate to the next handler.

        Args:
            request: The incoming request.
            next: Callable that forwards the request down the stack and
                returns its response.

        Returns:
            The downstream response, unmodified.
        """
        # A list of pairs keeps repeated header names visible.
        printable = [
            (name, "<redacted>" if name.lower() in self.SENSITIVE_HEADERS else value)
            for name, value in request.headers.items()
        ]
        print("Request headers:", printable)
        return await next(request)
class TileValidationMiddleware(BaseHTTPMiddleware):
    """Reject tile requests that fall outside a service's configured limits.

    Any path with at least four segments is treated as
    ``/<serviceId>/<z>/<x>/<y>``; shorter paths pass through untouched.
    """

    async def dispatch(self, request: Request, next):
        """Validate the service id, zoom level and tile position.

        Args:
            request: The incoming request.
            next: Callable that forwards the request down the stack.

        Returns:
            A 404 JSON response for an unknown service, a 422 JSON response
            for non-integer coordinates, an out-of-range zoom, or a tile
            that is not fully inside the configured bounds; otherwise the
            downstream response.
        """
        parts = request.url.path.strip("/").split("/")
        if len(parts) < 4:
            return await next(request)

        service_id = parts[0]
        config = configs.get(service_id)
        if not config:
            return JSONResponse(
                content={"detail": f"Service '{service_id}' not found"},
                status_code=status.HTTP_404_NOT_FOUND,
            )

        try:
            z, x, y = (int(part) for part in parts[1:4])
        except ValueError as e:
            return JSONResponse(
                content={"detail": f"Invalid tile parameters: {str(e)}"},
                status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
            )

        if not (config.get("minZoom", 0) <= z <= config.get("maxZoom", 20)):
            return JSONResponse(
                content={"detail": "Zoom level out of bounds"},
                status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
            )

        out_of_box = JSONResponse(
            content={"detail": "Tile out of bounding box"},
            status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
        )

        # Indices outside the 2**z grid name no real tile; rejecting them
        # here also keeps xyz_to_bbox away from inputs that overflow sinh().
        tiles_per_axis = 2 ** z
        if not (0 <= x < tiles_per_axis and 0 <= y < tiles_per_axis):
            return out_of_box

        # xyz_to_bbox returns (west, south, east, north): min longitude,
        # min latitude, max longitude, max latitude.
        west, south, east, north = xyz_to_bbox(z, x, y)
        if (
            south < config.get("minLat", -90.0)
            or north > config.get("maxLat", 90.0)
            or west < config.get("minLon", -180.0)
            or east > config.get("maxLon", 180.0)
        ):
            return out_of_box

        return await next(request)
# Install both middlewares: header logging first, tile validation second.
for middleware_class in (DebugHeadersMiddleware, TileValidationMiddleware):
    app.add_middleware(middleware_class)
# ---------- End Middleware ---------- # | |
# ---------- Start Proxy ---------- # | |
async def proxy(
    request: Request,
    username: Annotated[str, Security(authenticate)],
    id: str = Path(..., description="Service name"),
    z: int = Path(..., description="Zoom level"),
    x: int = Path(..., description="Tile X coordinate"),
    y: int = Path(..., description="Tile Y coordinate"),
    key: str = Query(None, description="API key (required if serviceURL contains {key})")
):
    """Fetch one tile from the configured upstream service, with caching.

    The upstream URL is built from the service's ``serviceURL`` template:
    ``{switch:...}`` placeholders are resolved by ``switch`` and
    ``{z}``/``{x}``/``{y}``/``{key}`` are filled in. Successful responses
    are stored in ``cache`` under ``"<id>/<z>/<x>/<y>"``.

    Args:
        request: The incoming request; its query string supplies ``key``.
        username: Authenticated user name (resolved by ``authenticate``).
        id: Service id, looked up in ``configs``.
        z: Zoom level.
        x: Tile X coordinate.
        y: Tile Y coordinate.
        key: Upstream API key; declared here so that it is documented.

    Returns:
        A ``Response`` carrying the tile bytes and the upstream content type.

    Raises:
        HTTPException: 404 for an unknown service; 400 when a required key
            is missing or the URL template cannot be formatted; the
            upstream status code when the upstream answers with anything
            but 200; 502 when the upstream cannot be reached.
    """
    config = configs.get(id)
    if not config:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND , detail=f"Service '{id}' not found")
    try:
        key = request.query_params.get("key")
        fields = {"z": z, "x": x, "y": y}
        if "{key}" in config["serviceURL"]:
            if not key:
                raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Missing required 'key' parameter")
            fields["key"] = key
        url = switch(config["serviceURL"]).format(**fields)

        # NOTE(review): the cache key ignores ``key``, so a cached tile is
        # served to any caller that passes a non-empty key -- confirm intended.
        cache_key = f"{id}/{z}/{x}/{y}"
        # Single lookup: an entry cannot expire between a membership test
        # and the read.
        cached = cache.get(cache_key)
        if cached is not None:
            content, media_type = cached
        else:
            async with httpx.AsyncClient(http2=True) as client:
                resp = await client.get(url)
                if resp.status_code != 200:
                    raise HTTPException(resp.status_code, detail="Upstream service error")
                content = resp.content
                media_type = resp.headers.get("content-type", "application/octet-stream")
                cache[cache_key] = (content, media_type)
        return Response(content=content, media_type=media_type, headers={
            "Cache-Control": "public, max-age=21600, immutable"
        })
    except HTTPException:
        # Errors raised on purpose above must reach the client unchanged
        # instead of being re-labelled as a URL formatting problem.
        raise
    except httpx.HTTPError as e:
        # Connection/timeout/protocol failures are the upstream's fault.
        raise HTTPException(status_code=status.HTTP_502_BAD_GATEWAY, detail="Upstream service error") from e
    except (KeyError, IndexError, ValueError, httpx.InvalidURL) as e:
        # Missing ``serviceURL``, unknown/positional placeholders, a
        # malformed template, or a URL httpx refuses to parse.
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=f"Error formatting tile URL: {str(e)}") from e
# ---------- End Proxy ---------- # | |
# ---------- Start Root ---------- # | |
async def root(request: Request):
    """List every configured tile service with its URL template and limits.

    Args:
        request: The incoming request; its base URL prefixes each template.

    Returns:
        A 200 ``JSONResponse`` of the form ``{"available": {serviceId: info}}``
        where ``info`` holds the URL template, the configured zoom and
        lat/lon limits (``None`` when unset) and whether a key is required.
    """
    origin = str(request.base_url).rstrip("/")
    limit_fields = ("minZoom", "maxZoom", "minLat", "maxLat", "minLon", "maxLon")

    def describe(service_id):
        """Build the public summary of one service."""
        settings = configs[service_id]
        summary = {"url": f"{origin}/{service_id}/{{z}}/{{x}}/{{y}}?key="}
        for field in limit_fields:
            summary[field] = settings.get(field)
        summary["keyRequired"] = "{key}" in settings.get("serviceURL", "")
        return summary

    listing = {service_id: describe(service_id) for service_id in app.state.urls}
    return JSONResponse(content={"available": listing}, status_code=status.HTTP_200_OK)
# ---------- End Root ---------- # | |
async def play():
    """Serve a Swagger UI page with the OpenAPI schema embedded inline.

    The schema is generated from the app's routes, then every operation
    that declares a security requirement is rewritten to use an
    ``HTTPBasic`` scheme. A small script on the page patches
    ``window.fetch`` so that an outgoing ``Authorization: Basic ...``
    header is sent as ``X-Authorization`` with the ``Basic `` prefix
    removed.

    Returns:
        An ``HTMLResponse`` containing the complete page.
    """
    # Fetch the base schema.
    display = get_openapi(
        title=app.title,
        version=app.version,
        openapi_version=app.openapi_version,
        summary="",
        description="",
        terms_of_service="",
        contact="",
        license_info="",
        tags="",
        routes=app.routes,
        webhooks=app.webhooks.routes,
        servers=app.servers,
        separate_input_output_schemas=app.separate_input_output_schemas,
    )
    # Rewrite the security requirement of each path + method.
    for methods in display.get("paths", {}).values():
        for details in methods.values():
            if "security" in details:
                # Convert from [{"Authorization": []}] to [{"HTTPBasic": []}].
                details["security"] = [{"HTTPBasic": []}]
    # Replace components.securitySchemes; setdefault covers a schema that
    # has no "components" section at all.
    display.setdefault("components", {})["securitySchemes"] = {
        "HTTPBasic": {
            "type": "http",
            "scheme": "basic"
        }
    }
    # Default asset URLs of FastAPI's own Swagger page.
    swagger_defaults = get_swagger_ui_html.__kwdefaults__
    # Serialized once, outside the f-string (a backslash inside an f-string
    # expression is a SyntaxError before Python 3.12). "</" is escaped so a
    # "</script>" inside the schema cannot terminate the script element.
    spec_json = json.dumps(display, ensure_ascii=False).replace("</", "<\\/")
    html = f"""
<!DOCTYPE html>
<html>
<head>
<link type="text/css" rel="stylesheet" href="{swagger_defaults["swagger_css_url"]}">
<link rel="shortcut icon" href="{swagger_defaults["swagger_favicon_url"]}">
<title>{app.title} - Swagger UI</title>
</head>
<body>
<div id="swagger-ui"></div>
<script src="{swagger_defaults["swagger_js_url"]}"></script>
<script>
const spec = {spec_json};
const ui = SwaggerUIBundle({{
dom_id: '#swagger-ui',
spec: spec,
layout: 'BaseLayout',
presets: [
SwaggerUIBundle.presets.apis,
SwaggerUIBundle.SwaggerUIStandalonePreset
]
}})
</script>
<script>
(function() {{
const originalFetch = window.fetch;
window.fetch = function(input, init = {{}}) {{
if(init.headers) {{
if(init.headers instanceof Headers) {{
if(init.headers.has('Authorization')) {{
let authValue = init.headers.get('Authorization');
if (typeof authValue === 'string' && authValue.startsWith('Basic ')) {{
authValue = authValue.slice(6);
}}
init.headers.delete('Authorization');
init.headers.set('X-Authorization', authValue);
}}
}} else if (typeof init.headers === 'object') {{
if('Authorization' in init.headers) {{
let authValue = init.headers['Authorization'];
if (typeof authValue === 'string' && authValue.startsWith('Basic ')) {{
authValue = authValue.slice(6);
}}
init.headers['X-Authorization'] = authValue;
delete init.headers['Authorization'];
}}
}}
}}
return originalFetch(input, init);
}};
}})();
</script>
</body>
</html>
"""
    return HTMLResponse(html)
if __name__ == "__main__":
    # Start the server directly when this file is executed as a script.
    uvicorn.run(
        "app:app",
        host="0.0.0.0",
        port=7860,
        reload=False,
        log_level="debug",
    )