LinkedinMonitor / services /dashboard_sync_handlers.py
GuglielmoTor's picture
Create dashboard_sync_handlers.py
d3914e5 verified
raw
history blame
8.56 kB
# handlers/dashboard_sync_handlers.py
import gradio as gr
import logging
# Assuming these functions are in the specified paths and are importable
from services.state_manager import process_and_store_bubble_token
from services.sync_logic import sync_all_linkedin_data_orchestrator
from ui.ui_generators import display_main_dashboard
class DashboardSyncHandlers:
def __init__(self, dashboard_components, token_state_ref, url_user_token_display_ref, org_urn_display_ref, status_box_ref):
self.components = dashboard_components
self.token_state = token_state_ref # gr.State object
self.url_user_token_display = url_user_token_display_ref # gr.Textbox object
self.org_urn_display = org_urn_display_ref # gr.Textbox object
self.status_box = status_box_ref # gr.Textbox object
logging.info("DashboardSyncHandlers initialized.")
def initial_load_sequence(self, url_token, org_urn_val, current_token_state_value):
"""
Handles the initial loading sequence after URL parameters are fetched.
This is called by app.load().then() or org_urn_display.change().
"""
logging.info(f"Initial load sequence triggered. URL Token: {'Set' if url_token else 'Not Set'}, Org URN: {org_urn_val}")
# process_and_store_bubble_token is expected to return:
# status_msg, new_state_dict, btn_update_dict (for sync_data_btn)
status_msg, new_state_dict, btn_update_dict = process_and_store_bubble_token(
url_token,
org_urn_val,
current_token_state_value # Pass the current value of the state
)
# display_main_dashboard expects the new state dictionary
dashboard_content_html = display_main_dashboard(new_state_dict)
# Returns:
# 1. Update for status_box (value)
# 2. New value for token_state (this will update the gr.State object)
# 3. Update for sync_data_btn (e.g., gr.update(visible=True, interactive=True))
# 4. Update for dashboard_display_html (value)
return status_msg, new_state_dict, btn_update_dict, dashboard_content_html
async def trigger_sync_and_refresh(self, current_token_state_value, url_token, org_urn_val):
"""
Orchestrates the full sync process when the sync button is clicked.
This combines sync_all_linkedin_data_orchestrator and subsequent updates.
Yields updates for a chained sequence.
"""
logging.info("Sync data button clicked. Starting sync process.")
# Part 1: sync_all_linkedin_data_orchestrator
# Expected to return: html_status_update_str, updated_token_state_dict
sync_status_html, updated_token_state_after_sync = await sync_all_linkedin_data_orchestrator(current_token_state_value)
yield sync_status_html, updated_token_state_after_sync # Updates sync_status_html_output, token_state
# Part 2: process_and_store_bubble_token again with potentially updated token_state
# This ensures the state is consistent after sync and reflects any new data fetched by sync_all_linkedin_data_orchestrator
# It might also re-evaluate if sync button should be visible/interactive
status_msg_after_sync, final_token_state_dict, sync_btn_update_after_sync = process_and_store_bubble_token(
url_token, # url_token and org_urn_val might be stale if not re-fetched, but usually static for a session
org_urn_val,
updated_token_state_after_sync # Use the state updated by sync_all_linkedin_data_orchestrator
)
yield status_msg_after_sync, final_token_state_dict, sync_btn_update_after_sync # Updates status_box, token_state, sync_data_btn
# Part 3: Refresh dashboard display
dashboard_html_after_sync = display_main_dashboard(final_token_state_dict)
yield dashboard_html_after_sync # Updates dashboard_display_html
# The subsequent calls to refresh_analytics_graphs_ui and run_agentic_pipeline_autonomously
# will be chained in app.py using .then() on this sync event, using the updated token_state.
def setup_event_handlers(self, initial_load_trigger_component, agentic_handlers_ref, analytics_handlers_ref):
"""
Sets up event handlers for the dashboard and sync tab.
initial_load_trigger_component is typically org_urn_display.
"""
logging.info("Setting up dashboard/sync event handlers.")
# Initial load sequence
# This is more complex due to the chained .then() calls for analytics and agentic pipeline
# The main app.py will handle the .then() chaining. This handler provides the core functions.
# The initial_load_sequence method itself will be the 'fn' for the first .change() or .load().
# Sync button click
# The sync_data_btn.click event will also have .then() chains in app.py
# We define the primary function here.
self.components['sync_data_btn'].click(
fn=self.trigger_sync_and_refresh,
inputs=[
self.token_state,
self.url_user_token_display, # Pass the component to get its current value
self.org_urn_display # Pass the component to get its current value
],
outputs=[
self.components['sync_status_html_output'], # Output 1 from sync_all_linkedin_data
self.token_state, # Output 2 from sync_all_linkedin_data
self.status_box, # Output 1 from process_and_store (after sync)
self.token_state, # Output 2 from process_and_store (after sync)
self.components['sync_data_btn'], # Output 3 from process_and_store (after sync)
self.components['dashboard_display_html'] # Output from display_main_dashboard (after sync)
],
show_progress="full",
api_name="sync_linkedin_data_full_flow"
# Note: The number of outputs here must match the total number of yields in trigger_sync_and_refresh
# trigger_sync_and_refresh yields:
# 1. sync_status_html (for sync_status_html_output)
# 2. updated_token_state_after_sync (for token_state)
# --- then ---
# 3. status_msg_after_sync (for status_box)
# 4. final_token_state_dict (for token_state)
# 5. sync_btn_update_after_sync (for sync_data_btn)
# --- then ---
# 6. dashboard_html_after_sync (for dashboard_display_html)
# This means the Gradio event needs to be structured to handle these yields if they are separate updates.
# However, Gradio's .click typically expects a single function that returns all outputs or a generator
# that yields tuples of updates for all outputs at each step.
# For simplicity, trigger_sync_and_refresh should be an async generator yielding tuples for all outputs.
# Let's refine trigger_sync_and_refresh to yield tuples.
)
logging.info("Dashboard/sync event handlers setup complete.")
async def refined_trigger_sync_and_refresh(self, current_token_state_value, url_token, org_urn_val):
"""
Refined orchestrator for sync process, yielding tuples for all outputs at each step.
Outputs:
1. self.components['sync_status_html_output']
2. self.token_state
3. self.status_box
4. self.components['sync_data_btn'] (this is tricky, might need to be separate or handled by token_state change)
5. self.components['dashboard_display_html']
Let's simplify the outputs for the direct click and handle subsequent updates via .then() in app.py
The click will call sync_all_linkedin_data_orchestrator.
Then, .then() will call process_and_store_bubble_token.
Then, .then() will call display_main_dashboard.
And further .then() for agentic and analytics.
So, this handler will provide the individual functions.
"""
pass # The original structure in app.py with chained .then() is better.
# This class will hold the functions called by those .then() chains.
# sync_all_linkedin_data_orchestrator
# process_and_store_bubble_token (already imported)
# display_main_dashboard (already imported)