Spaces:
Runtime error
Runtime error
| import os | |
| from queue import Queue | |
| import gradio as gr | |
| import argilla as rg | |
| from argilla.webhooks import webhook_listener | |
| # Initialize Argilla client | |
| client = rg.Argilla() | |
| # Get the webhook server | |
| server = rg.get_webhook_server() | |
| # Queue to store events for display | |
| incoming_events = Queue() | |
| # Set up the webhook listener for response creation | |
| async def update_validation_space_on_answer(response, type, timestamp): | |
| """ | |
| Webhook listener that triggers when a new response is added to an answering space. | |
| It will automatically update the corresponding validation space with the new response. | |
| """ | |
| try: | |
| # Store the event for display in the UI | |
| incoming_events.put({"event": type, "timestamp": str(timestamp)}) | |
| # Print the response object structure to understand its properties | |
| print(f"Response object: {dir(response)}") | |
| # Get the record from the response | |
| record = response.record | |
| # Check if this is from an answering space | |
| dataset_name = record.dataset.name | |
| if not dataset_name.endswith("_responder_preguntas"): | |
| print(f"Ignoring event from non-answering dataset: {dataset_name}") | |
| return # Not an answering space, ignore | |
| # Extract the country from the dataset name | |
| country = dataset_name.replace("_responder_preguntas", "") | |
| print(f"Processing response for country: {country}") | |
| # Connect to the validation space | |
| validation_dataset_name = f"{country}_validar_respuestas" | |
| try: | |
| validation_dataset = client.datasets(validation_dataset_name) | |
| print(f"Found validation dataset: {validation_dataset_name}") | |
| except Exception as e: | |
| print(f"Error connecting to validation dataset: {e}") | |
| # You would need to import the create_validation_space function | |
| from build_space import create_validation_space | |
| validation_dataset = create_validation_space(country) | |
| response_value = str(response) | |
| # if hasattr(response, 'values') and 'text' in response.values: | |
| # if isinstance(response.values['text'], dict) and 'value' in response.values['text']: | |
| # response_value = response.values['text']['value'] | |
| # else: | |
| # response_value = str(response.values['text']) | |
| # Create a validation record with the correct attribute | |
| # Instead of response.value, we need to find the correct attribute | |
| # Based on the error, let's try to get the value differently | |
| validation_record = { | |
| "question": record.fields["question"], | |
| "answer": response_value, | |
| } | |
| # Add the record to the validation space | |
| validation_dataset.records.log(records=[validation_record]) | |
| print(f"Added new response to validation space for {country}") | |
| except Exception as e: | |
| print(f"Error in webhook handler: {e}") | |
| # Store the error in the queue for display | |
| incoming_events.put({"event": "error", "error": str(e)}) | |
| # Function to read the next event from the queue | |
| def read_next_event(): | |
| if not incoming_events.empty(): | |
| return incoming_events.get() | |
| return {} | |
| # Create Gradio interface | |
| with gr.Blocks() as demo: | |
| argilla_server = client.http_client.base_url | |
| gr.Markdown("## Argilla Webhooks - Validation Space Updater") | |
| gr.Markdown(f""" | |
| This application listens for new responses in Argilla answering spaces and automatically | |
| adds them to the corresponding validation spaces. | |
| Connected to Argilla server: {argilla_server} | |
| The webhook listens for: | |
| - `response.created` events from datasets ending with `_responder_preguntas` | |
| You can view the incoming events and any errors in the JSON component below. | |
| """) | |
| json_component = gr.JSON(label="Incoming events and errors:", value={}) | |
| gr.Timer(1, active=True).tick(read_next_event, outputs=json_component) | |
| # Mount the Gradio app to the FastAPI server | |
| gr.mount_gradio_app(server, demo, path="/") | |
| # Start the FastAPI server | |
| if __name__ == "__main__": | |
| import uvicorn | |
| uvicorn.run(server, host="0.0.0.0", port=7860) |