"""Argilla webhook demo: listen for server events and show them in Gradio.

The module registers several webhook listeners on the Argilla webhook
server, pushes every received event onto an in-memory queue, and mounts a
small Gradio UI on the same server that displays the queued events.
"""

import os
from datetime import datetime
from queue import Queue

import argilla as rg
import gradio as gr
import uvicorn
from argilla.webhooks import webhook_listener

# Suffix that marks a dataset as an "answering space".
ANSWERING_SUFFIX = "_responder_preguntas"

client = rg.Argilla()
server = rg.get_webhook_server()
incoming_events = Queue()

# Set up the webhook listeners.
# Delete all existing webhooks first. This must run BEFORE the listeners
# below are declared, otherwise the freshly registered ones would be
# deleted as well.
for webhook in client.webhooks:
    print(f"Deleting webhook: {webhook.url}")
    webhook.delete()


# NOTE(review): the other listeners in this file declare keyword parameters
# (``record``/``response``/``dataset``, ``type``, ``timestamp``) while this
# one takes a single ``event`` -- confirm the listener framework supports
# this signature.
@webhook_listener(events="response.created")
async def update_validation_space_on_answer(event):
    """
    Webhook listener that triggers when a new response is added to an
    answering space.

    It will automatically update the corresponding validation space with
    the new response. Responses that belong to datasets whose name does not
    end in ``_responder_preguntas`` are ignored.

    Args:
        event: The webhook event containing the user response data
    """
    response = event.response
    record = response.record

    dataset_name = record.dataset.name
    if not dataset_name.endswith(ANSWERING_SUFFIX):
        return  # Not an answering space, ignore

    # Extract the country from the dataset name. Only the trailing suffix is
    # stripped, so a country name that happens to contain the suffix text
    # elsewhere is left intact.
    country = dataset_name[: -len(ANSWERING_SUFFIX)]
    validation_dataset_name = f"{country}_validar_respuestas"

    # Best-effort lookup: any failure falls back to creating the space.
    try:
        validation_dataset = client.datasets(validation_dataset_name)
    except Exception as e:
        print(f"Error connecting to validation dataset: {e}")
        validation_dataset = None

    # The lookup may also signal "not found" by returning None instead of
    # raising; treat that exactly like the error path above.
    if validation_dataset is None:
        # NOTE(review): create_validation_space is not defined in the part
        # of the file visible here -- confirm it is provided elsewhere.
        validation_dataset = create_validation_space(country)

    validation_record = {
        "question": record.fields["question"],
        "answer": response.value,
    }

    validation_dataset.records.log(records=[validation_record])
    print(f"Added new response to validation space for {country}")


# Create a webhook for record events
@rg.webhook_listener(events=["record.deleted", "record.completed"])
async def record_events(record: rg.Record, type: str, timestamp: datetime):
    """Log a record event and enqueue it for display in the UI."""
    print(f"Received event type {type} at {timestamp}: ", record)

    incoming_events.put({"event": type, "data": record})


# Create a webhook for dataset events
@rg.webhook_listener(events=["dataset.created", "dataset.updated", "dataset.deleted", "dataset.published"])
async def dataset_events(dataset: rg.Dataset, type: str, timestamp: datetime):
    """Log a dataset event and enqueue it for display in the UI."""
    print(f"Received event type {type} at {timestamp}: ", dataset)

    incoming_events.put({"event": type, "data": dataset})


# Create a webhook for response events
@rg.webhook_listener(events=["response.created", "response.updated"])
async def response_events(response: rg.UserResponse, type: str, timestamp: datetime):
    """Log a response event and enqueue it for display in the UI."""
    print(f"Received event type {type} at {timestamp}: ", response)

    incoming_events.put({"event": type, "data": response})


def read_next_event():
    """Return the next queued event.

    ``Queue.get()`` blocks while the queue is empty, so this call only
    returns once an event is available.
    """
    event = incoming_events.get()

    return event


with gr.Blocks() as demo:
    argilla_server = client.http_client.base_url
    gr.Markdown("## Argilla Webhooks")
    gr.Markdown(f"""
    This demo shows the incoming events from the [Argilla Server]({argilla_server}).

    The application defines three webhook listeners for the following events:
    - Record events: `record.deleted`, `record.completed`
    - Dataset events: `dataset.created`, `dataset.updated`, `dataset.deleted`, `dataset.published`
    - Response events: `response.created`, `response.updated`

    The events are stored in a queue and displayed in the JSON component and the incoming events is updated every second.

    You can view the incoming events in the JSON component below.

    This application is just a demonstration of how to use the Argilla webhook listeners.

    You can visit the [Argilla documentation](https://docs.argilla.io/dev/how_to_guides/webhooks) for more information.
    """)
    json_component = gr.JSON(label="Incoming argilla events:", value={})
    # Poll once per second for the next queued event.
    gr.Timer(1, active=True).tick(read_next_event, outputs=json_component)

# Serve the Gradio UI from the same FastAPI app that receives the webhooks.
gr.mount_gradio_app(server, demo, path="/")

# Start the FastAPI server
uvicorn.run(server, host="0.0.0.0", port=7860)