Spaces:
Sleeping
Sleeping
File size: 3,979 Bytes
a7c2cd2 30c85ad 32c2587 f039650 32c2587 f039650 4a7815d 32c2587 f039650 32c2587 ea4e048 32c2587 16eaf19 17145bf f40fa43 32c2587 95feb70 30c85ad 48225dd f420a36 f039650 bcf795e 32c2587 30c85ad f039650 f420a36 f039650 32c2587 ea4e048 30c85ad ea4e048 f420a36 48225dd f039650 30c85ad ea4e048 32c2587 ea4e048 32c2587 30c85ad 32c2587 a7c2cd2 876efb6 b0bc07c 4a7815d b0bc07c 4a7815d b0bc07c 30c85ad 32c2587 6020a54 16eaf19 f40fa43 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 |
import os
from datetime import datetime
from queue import Queue
import argilla as rg
import gradio as gr
# Queue shared between the webhook listeners (producers) and the UI (consumer)
incoming_events = Queue()

# Argilla client and the server object that is later handed to uvicorn
client = rg.Argilla()
server = rg.get_webhook_server()

# Start from a clean slate: remove every webhook the client currently lists
for webhook in client.webhooks:
    print(f"Deleting webhook: {webhook.url}")
    webhook.delete()
@rg.webhook_listener(events="response.created")
async def update_validation_space_on_answer(event):
    """
    Copy a newly created answer into the matching validation dataset.

    Responses whose dataset name does not end in ``_responder_preguntas`` are
    ignored. For the others, the record's ``question`` field and the response
    value are logged as a new record in ``<country>_validar_respuestas``.

    Args:
        event: The webhook event containing the user response data
    """
    # NOTE(review): the other listeners in this file are declared with
    # (response, type, timestamp) parameters; confirm that Argilla invokes
    # this listener with a single ``event`` object exposing ``.response``.
    answering_suffix = "_responder_preguntas"

    response = event.response
    record = response.record
    dataset_name = record.dataset.name
    if not dataset_name.endswith(answering_suffix):
        return  # Not an answering space, ignore

    # Strip only the trailing suffix; str.replace would also remove the same
    # text if it appeared earlier in the dataset name.
    country = dataset_name[: -len(answering_suffix)]
    validation_dataset_name = f"{country}_validar_respuestas"

    try:
        validation_dataset = client.datasets(validation_dataset_name)
    except Exception as e:
        print(f"Error connecting to validation dataset: {e}")
        validation_dataset = None

    # A lookup that yields None instead of raising takes the same fallback,
    # so the ``.records`` access below never runs on None.
    if validation_dataset is None:
        # NOTE(review): create_validation_space is not defined in the visible
        # part of this file; it has to be provided before this path can work.
        validation_dataset = create_validation_space(country)

    validation_record = {
        "question": record.fields["question"],
        "answer": response.value,
    }
    validation_dataset.records.log(records=[validation_record])
    print(f"Added new response to validation space for {country}")
# Listener for record deletion and completion events
@rg.webhook_listener(events=["record.deleted", "record.completed"])
async def record_events(record: rg.Record, type: str, timestamp: datetime):
    """Print the received record event and push it onto ``incoming_events``."""
    queued_item = {"event": type, "data": record}
    print(f"Received event type {type} at {timestamp}: ", record)
    incoming_events.put(queued_item)
# Listener for dataset lifecycle events
@rg.webhook_listener(
    events=[
        "dataset.created",
        "dataset.updated",
        "dataset.deleted",
        "dataset.published",
    ]
)
async def dataset_events(dataset: rg.Dataset, type: str, timestamp: datetime):
    """Print the received dataset event and push it onto ``incoming_events``."""
    queued_item = {"event": type, "data": dataset}
    print(f"Received event type {type} at {timestamp}: ", dataset)
    incoming_events.put(queued_item)
# Listener for created and updated user responses
@rg.webhook_listener(events=["response.created", "response.updated"])
async def response_events(response: rg.UserResponse, type: str, timestamp: datetime):
    """Print the received response event and push it onto ``incoming_events``."""
    queued_item = {"event": type, "data": response}
    print(f"Received event type {type} at {timestamp}: ", response)
    incoming_events.put(queued_item)
def read_next_event():
    """
    Return the oldest queued webhook event without blocking.

    Returns:
        The next ``{"event": ..., "data": ...}`` entry taken from
        ``incoming_events``, or a no-op ``gr.update()`` when the queue is
        empty so the bound output component keeps its current value.
    """
    # Local import: only ``Queue`` is imported from ``queue`` at file level.
    from queue import Empty

    try:
        # A plain get() has no timeout and would hold the periodic callback
        # until some webhook arrives; get_nowait() returns immediately.
        return incoming_events.get_nowait()
    except Empty:
        return gr.update()
with gr.Blocks() as demo:
    # Base URL of the Argilla server, linked from the introduction text
    argilla_server = client.http_client.base_url

    gr.Markdown("## Argilla Webhooks")

    # The text is kept flush-left so the rendered markdown is unchanged
    intro_markdown = f"""
This demo shows the incoming events from the [Argilla Server]({argilla_server}).
The application defines three webhook listeners for the following events:
- Record events: `record.deleted`, `record.completed`
- Dataset events: `dataset.created`, `dataset.updated`, `dataset.deleted`, `dataset.published`
- Response events: `response.created`, `response.updated`
The events are stored in a queue and displayed in the JSON component and the incoming events is updated every second.
You can view the incoming events in the JSON component below.
This application is just a demonstration of how to use the Argilla webhook listeners.
You can visit the [Argilla documentation](https://docs.argilla.io/dev/how_to_guides/webhooks) for more information.
"""
    gr.Markdown(intro_markdown)

    # JSON viewer fed by read_next_event on every tick of a one-second timer
    json_component = gr.JSON(label="Incoming argilla events:", value={})
    refresh_timer = gr.Timer(1, active=True)
    refresh_timer.tick(read_next_event, outputs=json_component)
# Attach the Gradio UI to the webhook server at the root path
gr.mount_gradio_app(app=server, blocks=demo, path="/")

# Start the FastAPI server
import uvicorn

SERVER_HOST = "0.0.0.0"
SERVER_PORT = 7860
uvicorn.run(app=server, host=SERVER_HOST, port=SERVER_PORT)
|