webhook-listener / main.py
pagezyhf's picture
pagezyhf HF Staff
Update main.py
d7f040a verified
from fastapi import FastAPI, Request, Response
import os
import requests
import json
import logging
# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# Configurations
SLACK_WEBHOOK_URL = os.environ.get("SLACK_WEBHOOK_URL")
MODEL_CATALOG_WEBHOOK = os.environ.get("MODEL_CATALOG_WEBHOOK")
MODEL_CATALOG_WEBHOOK_SECRET = os.environ.get("MODEL_CATALOG_WEBHOOK_SECRET")
SIMSHIP_WEBHOOK = os.environ.get("SIMSHIP_WEBHOOK")
SIMSHIP_WEBHOOK_SECRET = os.environ.get("SIMSHIP_WEBHOOK_SECRET")
def send_slack_message(message: str):
"""Send a message to Slack using webhook URL"""
if not SLACK_WEBHOOK_URL:
logger.warning(f"No Slack webhook URL configured. Message: {message}")
return
payload = {"text": message}
try:
response = requests.post(SLACK_WEBHOOK_URL, json=payload)
response.raise_for_status()
logger.info(f"Slack message sent successfully: {message}")
except requests.exceptions.RequestException as e:
logger.error(f"Failed to send Slack message: {e}")
def process_model_catalog_webhook(data: dict):
"""
Process model catalog webhook
"""
event = data.get("event", {})
repo = data.get("repo", {})
movedTo = data.get("movedTo", {})
# repo name changed
if (
repo.get("type") == "model" and
event.get("scope") == "repo" and
event.get("action") == "move"
):
message = f"πŸ”„ Model in the catalog renamed: {repo.get('name', '')} β†’ https://hf.co/{movedTo.get('name', '')}"
send_slack_message(message)
# repo deleted
elif (
repo.get("type") == "model" and
event.get("scope") == "repo" and
event.get("action") == "delete"
):
message = f"πŸ—‘οΈ Model in the catalog deleted: https://hf.co/{repo.get('name', '')}"
send_slack_message(message)
# other events
else:
pass
def process_simship_webhook(data: dict):
"""
Process simship webhook
"""
event = data.get("event", {})
repo = data.get("repo", {})
updatedConfig = data.get("updatedConfig", {})
# repo creation
if (
repo.get("type") == "model" and
event.get("scope") == "repo" and
event.get("action") == "create"
):
message = f"πŸ†• SimShip Model created: https://hf.co/{repo.get('name', '')}"
send_slack_message(message)
# repo visibility update
elif (
repo.get("type") == "model" and
event.get("scope") == "repo.config" and
event.get("action") == "update" and
updatedConfig.get("private") is False
):
message = f"πŸ†• SimShip Model made public: https://hf.co/{repo.get('name', '')}"
send_slack_message(message)
# other events
else:
pass
app = FastAPI()
@app.post("/webhook")
async def webhook(request: Request):
logger.info(f"Received webhook request from {request.client.host}")
if request.headers.get("X-Webhook-Secret") not in {MODEL_CATALOG_WEBHOOK_SECRET, SIMSHIP_WEBHOOK_SECRET}:
logger.warning("Invalid webhook secret received")
return Response("Invalid secret", status_code=401)
data = await request.json()
logger.info(f"Webhook payload: {json.dumps(data, indent=2)}")
webhook_id = data.get("webhook", {}).get("id", None)
if webhook_id == MODEL_CATALOG_WEBHOOK:
logger.info("Processing model catalog webhook")
process_model_catalog_webhook(data)
elif webhook_id == SIMSHIP_WEBHOOK:
logger.info("Processing simship webhook")
process_simship_webhook(data)
else:
logger.warning("Invalid webhook ID received")
return Response("Invalid webhook ID", status_code=401)
return Response("Webhook notification received and processed!", status_code=200)