# Source page: LinkedinMonitor / services/agentic_handlers.py
# Last commit: "Update services/agentic_handlers.py" by GuglielmoTor (63f64cb, verified)
# File size: 21.4 kB
# services/agentic_handlers.py
import gradio as gr
import logging
from collections import defaultdict
import json # Added for JSON serialization/deserialization
# Import the agentic pipeline entry point and the UI formatters. When either
# module cannot be imported, inert stand-ins with the same names are defined so
# that the AgenticHandlers class below can still be created without NameErrors.
try:
    from run_agentic_pipeline import run_full_analytics_orchestration
    from ui.insights_ui_generator import (
        format_report_to_markdown,
        extract_key_results_for_selection,
        format_single_okr_for_display,
    )
except ImportError as import_error:
    logging.error(f"Could not import agentic pipeline modules for AgenticHandlers: {import_error}.")
    AGENTIC_MODULES_LOADED = False

    async def run_full_analytics_orchestration(*args, **kwargs):
        """Stand-in pipeline runner: accepts any arguments and returns None."""
        return None

    def format_report_to_markdown(report_string):
        """Stand-in report formatter: always returns the 'unavailable' notice."""
        return "Agentic modules not loaded. Report unavailable."

    def extract_key_results_for_selection(okrs_dict):
        """Stand-in Key Result extractor: always returns an empty list."""
        return []

    def format_single_okr_for_display(okr_data, **kwargs):
        """Stand-in OKR formatter: always returns the 'unavailable' notice."""
        return "Agentic modules not loaded. OKR display unavailable."
else:
    # Both imports succeeded, so the real implementations are in scope.
    AGENTIC_MODULES_LOADED = True
class AgenticHandlers:
    """Gradio handlers for the agentic report tab and the OKR tab.

    The handlers produce ``gr.update`` objects for the UI components they are
    given and pass pipeline results to Gradio state as JSON strings.
    """

    # Longest Key Result label shown in the CheckboxGroup before truncation.
    _MAX_KR_LABEL_LEN = 100
    # Markdown separator placed between consecutive OKR renderings.
    _OKR_MD_SEPARATOR = "\n\n---\n\n"

    def __init__(self, agentic_report_components, agentic_okrs_components,
                 token_state_ref, orchestration_raw_results_st_ref,
                 key_results_for_selection_st_ref, selected_key_result_ids_st_ref):
        """Store the component mappings and the state references.

        Args:
            agentic_report_components: Mapping of report-tab components; the
                keys read here are ``agentic_pipeline_status_md`` and
                ``agentic_report_display_md``.
            agentic_okrs_components: Mapping of OKR-tab components; the keys
                read here are ``key_results_cbg`` and ``okr_detail_display_md``.
            token_state_ref: Reference to the token state (stored only).
            orchestration_raw_results_st_ref: State holding the raw pipeline
                results; its ``.value`` is read as the initial value.
            key_results_for_selection_st_ref: State holding the Key Results
                offered for selection; its ``.value`` is read as the initial
                value.
            selected_key_result_ids_st_ref: State holding the selected Key
                Result ids; its ``.value`` is read as the initial value.
        """
        self.report_components = agentic_report_components
        self.okrs_components = agentic_okrs_components

        # References to global states
        self.token_state = token_state_ref
        self.orchestration_raw_results_st = orchestration_raw_results_st_ref
        self.key_results_for_selection_st = key_results_for_selection_st_ref
        self.selected_key_result_ids_st = selected_key_result_ids_st_ref

        self.agentic_modules_really_loaded = AGENTIC_MODULES_LOADED
        logging.info(f"AgenticHandlers initialized. Modules loaded: {self.agentic_modules_really_loaded}")

    # ------------------------------------------------------------------
    # Pure helpers (no Gradio objects involved)
    # ------------------------------------------------------------------
    @staticmethod
    def _normalize_choices(choices):
        """Convert raw choices into a list of ``(display, value)`` string tuples.

        Two-element tuples are kept (both elements stringified and stripped),
        bare ``str``/``int`` items are used as both display and value, and
        anything else is logged and dropped.
        """
        normalized = []
        for choice in choices:
            if isinstance(choice, tuple) and len(choice) == 2:
                normalized.append((str(choice[0]).strip(), str(choice[1]).strip()))
            elif isinstance(choice, (str, int)):
                text = str(choice).strip()
                normalized.append((text, text))
            else:
                logging.warning(f"Invalid choice format: {choice}")
        return normalized

    @staticmethod
    def _build_kr_choices(key_results):
        """Build CheckboxGroup ``(label, id)`` choices from Key Result dicts.

        Items that are not dicts, or that lack ``kr_description`` or
        ``unique_kr_id``, are ignored. Labels longer than
        ``_MAX_KR_LABEL_LEN`` characters are cut and suffixed with ``...``.
        A non-list input yields an empty list.
        """
        if not isinstance(key_results, list):
            return []
        limit = AgenticHandlers._MAX_KR_LABEL_LEN
        kr_choices = []
        for kr in key_results:
            if not (isinstance(kr, dict) and 'kr_description' in kr and 'unique_kr_id' in kr):
                continue
            label = str(kr['kr_description']).strip()
            if len(label) > limit:
                # Reserve three characters for the ellipsis.
                label = label[:limit - 3] + "..."
            kr_choices.append((label, str(kr['unique_kr_id']).strip()))
        return kr_choices

    @staticmethod
    def _group_selected_kr_indices(selected_ids, krs_for_selection):
        """Group the selected Key Results by the OKR they belong to.

        Args:
            selected_ids: List of already-normalized (stripped string) KR ids.
            krs_for_selection: List of dicts carrying ``unique_kr_id``,
                ``okr_index`` and ``kr_index``; other items are logged and
                skipped.

        Returns:
            Dict mapping ``okr_index`` to the list of selected ``kr_index``
            values, in selection order. OKRs with no selected KR are absent.
        """
        required_keys = ('unique_kr_id', 'okr_index', 'kr_index')
        kr_id_to_indices = {}
        for kr_info in krs_for_selection:
            if isinstance(kr_info, dict) and all(key in kr_info for key in required_keys):
                kr_id = str(kr_info['unique_kr_id']).strip()
                kr_id_to_indices[kr_id] = (kr_info['okr_index'], kr_info['kr_index'])
            else:
                logging.warning(f"Skipping invalid kr_info item: {kr_info}")

        grouped = defaultdict(list)
        for kr_id in selected_ids:
            if kr_id in kr_id_to_indices:
                okr_idx, kr_idx_in_okr = kr_id_to_indices[kr_id]
                grouped[okr_idx].append(kr_idx_in_okr)
        return dict(grouped)

    @staticmethod
    def _list_state_json(state_value):
        """Serialize a list to JSON, substituting ``[]`` for any non-list value."""
        return json.dumps(state_value if isinstance(state_value, list) else [])

    # ------------------------------------------------------------------
    # Gradio update builders
    # ------------------------------------------------------------------
    @staticmethod
    def _markdown_update(components, key, text, fallback):
        """Return ``gr.update(value=text)`` when ``components.get(key)`` is truthy.

        Otherwise ``fallback`` is returned unchanged, so a missing component
        keeps whatever update was previously prepared for it.
        """
        return gr.update(value=text) if components.get(key) else fallback

    def _safe_checkbox_update(self, choices=None, value=None, interactive=True):
        """Build a ``gr.update`` for a CheckboxGroup from loosely typed inputs.

        Args:
            choices: Optional iterable of choices (see ``_normalize_choices``).
                ``None`` omits ``choices`` from the update.
            value: Optional selection. A list is converted to stripped strings
                with ``None`` entries dropped; any other non-``None`` value
                becomes an empty selection. ``None`` omits ``value`` from the
                update.
            interactive: Passed through as the ``interactive`` flag.

        Returns:
            The ``gr.update(...)`` result. If building it raises, the error is
            logged and an update with empty choices, empty value and
            ``interactive=False`` is returned instead.
        """
        try:
            update_kwargs = {}
            if choices is not None:
                update_kwargs['choices'] = self._normalize_choices(choices)
            if value is not None:
                if isinstance(value, list):
                    update_kwargs['value'] = [str(v).strip() for v in value if v is not None]
                else:
                    update_kwargs['value'] = []
            update_kwargs['interactive'] = interactive
            return gr.update(**update_kwargs)
        except Exception as e:
            logging.error(f"Error creating checkbox update: {e}")
            return gr.update(choices=[], value=[], interactive=False)

    # ------------------------------------------------------------------
    # Event handlers
    # ------------------------------------------------------------------
    async def run_agentic_pipeline_autonomously_on_update(self, current_token_state_val):
        """Run the agentic pipeline and stream UI/state updates.

        Intended to be triggered by changes in the token state.

        Args:
            current_token_state_val: Current token state value; the pipeline
                only runs when it is non-empty and has a truthy ``"token"``.

        Yields:
            7-tuples of ``(status markdown update, report markdown update,
            Key Result CheckboxGroup update, OKR detail markdown update,
            orchestration results JSON, selected KR ids JSON,
            KRs-for-selection JSON)``. The last three are JSON strings.
            Without a token a single "waiting" tuple is yielded; otherwise an
            "in progress" tuple is followed by one final tuple (result, no
            result, modules unavailable, or error).
        """
        # Evaluate token presence once, guarding against a None/empty token
        # state so the log line cannot fail before the availability check.
        token_available = bool(current_token_state_val and current_token_state_val.get("token"))
        logging.info(f"Agentic pipeline auto-trigger. Token: {'Set' if token_available else 'Not Set'}")

        waiting_text = "Pipeline AI: In attesa dei dati necessari..."
        report_status_md_update = self._markdown_update(
            self.report_components, "agentic_pipeline_status_md", waiting_text, gr.update()
        )
        report_display_md_update = gr.update()
        okrs_detail_md_update = self._markdown_update(
            self.okrs_components, "okr_detail_display_md", waiting_text, gr.update()
        )

        # Until the pipeline produces something, the state outputs echo the
        # states' initial values (serialized to JSON).
        initial_results_json = json.dumps(self.orchestration_raw_results_st.value)
        initial_selected_json = self._list_state_json(self.selected_key_result_ids_st.value)
        initial_krs_json = self._list_state_json(self.key_results_for_selection_st.value)

        if not token_available:
            logging.info("Agentic pipeline: Token not available in token_state. Skipping actual run.")
            yield (
                report_status_md_update,
                report_display_md_update,
                self._safe_checkbox_update(choices=[], value=[], interactive=False),
                okrs_detail_md_update,
                initial_results_json,
                initial_selected_json,
                initial_krs_json,
            )
            return

        # Show the "in progress" state with an emptied, disabled CheckboxGroup.
        report_status_md_update = self._markdown_update(
            self.report_components, "agentic_pipeline_status_md",
            "Analisi AI (Sempre) in corso...", report_status_md_update
        )
        okrs_detail_md_update = self._markdown_update(
            self.okrs_components, "okr_detail_display_md",
            "Dettagli OKR (Sempre) in corso di generazione...", okrs_detail_md_update
        )
        yield (
            report_status_md_update,
            report_display_md_update,
            self._safe_checkbox_update(choices=[], value=[], interactive=False),
            okrs_detail_md_update,
            initial_results_json,
            initial_selected_json,
            initial_krs_json,
        )

        if not self.agentic_modules_really_loaded:
            logging.warning("Agentic modules not loaded. Skipping autonomous pipeline actual run.")
            error_status = "Moduli AI non caricati. Operazione non disponibile."
            report_status_md_update = self._markdown_update(
                self.report_components, "agentic_pipeline_status_md", error_status, report_status_md_update
            )
            report_display_md_update = self._markdown_update(
                self.report_components, "agentic_report_display_md", error_status, report_display_md_update
            )
            okrs_detail_md_update = self._markdown_update(
                self.okrs_components, "okr_detail_display_md", error_status, okrs_detail_md_update
            )
            yield (
                report_status_md_update,
                report_display_md_update,
                self._safe_checkbox_update(choices=[], value=[], interactive=False),
                okrs_detail_md_update,
                json.dumps(None),
                json.dumps([]),
                json.dumps([]),
            )
            return

        try:
            # The autonomous run uses the fixed "Sempre" date filter and
            # leaves the custom start/end values unset.
            date_filter_val_agentic = "Sempre"
            custom_start_val_agentic = None
            custom_end_val_agentic = None

            logging.info("Agentic pipeline: Calling run_full_analytics_orchestration...")
            orchestration_output = await run_full_analytics_orchestration(
                current_token_state_val,
                date_filter_val_agentic,
                custom_start_val_agentic,
                custom_end_val_agentic
            )
            logging.info(f"Autonomous agentic pipeline finished. Output keys: {orchestration_output.keys() if orchestration_output else 'None'}")

            orchestration_results_update_val = None
            krs_for_selection_update_val = []
            # A fresh result always starts with no Key Result selected.
            selected_krs_update_val = []

            if orchestration_output:
                final_status_text = "Pipeline AI (Sempre) completata."
                orchestration_results_update_val = orchestration_output

                report_str = orchestration_output.get('comprehensive_analysis_report', "Nessun report testuale fornito.")
                # Only format the report when there is a component to show it.
                if self.report_components.get("agentic_report_display_md"):
                    report_display_md_update = gr.update(value=format_report_to_markdown(report_str))

                actionable_okrs = orchestration_output.get('actionable_okrs_and_tasks')
                krs_for_selection_update_val = extract_key_results_for_selection(actionable_okrs)
                final_okr_cbg_update = self._safe_checkbox_update(
                    choices=self._build_kr_choices(krs_for_selection_update_val),
                    value=[],
                    interactive=True
                )

                # Render every OKR in full; tolerate a missing or malformed
                # OKR container by rendering nothing.
                okr_items = actionable_okrs.get("okrs") if isinstance(actionable_okrs, dict) else None
                all_okrs_md_parts = []
                if isinstance(okr_items, list):
                    all_okrs_md_parts = [
                        format_single_okr_for_display(okr_item, accepted_kr_indices=None, okr_main_index=okr_idx)
                        for okr_idx, okr_item in enumerate(okr_items)
                    ]
                if all_okrs_md_parts:
                    okr_detail_text = self._OKR_MD_SEPARATOR.join(all_okrs_md_parts)
                else:
                    okr_detail_text = "Nessun OKR generato o trovato (Sempre)."
                okrs_detail_md_update = self._markdown_update(
                    self.okrs_components, "okr_detail_display_md", okr_detail_text, okrs_detail_md_update
                )
            else:
                final_status_text = "Pipeline AI (Sempre): Nessun risultato prodotto."
                report_display_md_update = self._markdown_update(
                    self.report_components, "agentic_report_display_md",
                    "Nessun report generato dalla pipeline AI (Sempre).", report_display_md_update
                )
                final_okr_cbg_update = self._safe_checkbox_update(choices=[], value=[], interactive=False)
                okrs_detail_md_update = self._markdown_update(
                    self.okrs_components, "okr_detail_display_md",
                    "Nessun OKR generato o errore nella pipeline AI (Sempre).", okrs_detail_md_update
                )

            report_status_md_update = self._markdown_update(
                self.report_components, "agentic_pipeline_status_md", final_status_text, report_status_md_update
            )

            # Serialization stays inside the try block so that a result that
            # cannot be encoded as JSON is reported through the error path.
            yield (
                report_status_md_update,
                report_display_md_update,
                final_okr_cbg_update,
                okrs_detail_md_update,
                json.dumps(orchestration_results_update_val),
                json.dumps(selected_krs_update_val),
                json.dumps(krs_for_selection_update_val),
            )
        except Exception as e:
            logging.error(f"Error during autonomous agentic pipeline execution: {e}", exc_info=True)
            report_status_md_update = self._markdown_update(
                self.report_components, "agentic_pipeline_status_md",
                f"Errore pipeline AI (Sempre): {str(e)}", report_status_md_update
            )
            report_display_md_update = self._markdown_update(
                self.report_components, "agentic_report_display_md",
                f"Errore generazione report AI (Sempre): {str(e)}", report_display_md_update
            )
            okrs_detail_md_update = self._markdown_update(
                self.okrs_components, "okr_detail_display_md",
                f"Errore generazione OKR AI (Sempre): {str(e)}", okrs_detail_md_update
            )
            yield (
                report_status_md_update,
                report_display_md_update,
                self._safe_checkbox_update(choices=[], value=[], interactive=False),
                okrs_detail_md_update,
                json.dumps(None),
                json.dumps([]),
                json.dumps([]),
            )

    def update_okr_display_on_kr_selection(self, selected_kr_unique_ids: list,
                                           raw_orchestration_results_json: str,
                                           all_krs_for_selection_list_json: str):
        """Refresh the OKR detail Markdown after the Key Result selection changes.

        Args:
            selected_kr_unique_ids: Ids selected in the CheckboxGroup; anything
                that is not a list is treated as an empty selection.
            raw_orchestration_results_json: JSON string of the pipeline results.
            all_krs_for_selection_list_json: JSON string of the list of Key
                Result descriptors (``unique_kr_id``, ``okr_index``,
                ``kr_index``).

        Returns:
            A ``gr.update`` whose value is the Markdown to display. With no
            selection every OKR is rendered in full; with a selection only the
            OKRs owning at least one selected Key Result are rendered,
            restricted to the selected Key Results. Explanatory messages are
            returned when data is missing, malformed or matches nothing.
        """
        if not self.agentic_modules_really_loaded:
            return gr.update(value="Moduli AI non caricati. Impossibile visualizzare i dettagli OKR.")

        # Normalize the selection to a list of stripped string ids.
        if not isinstance(selected_kr_unique_ids, list):
            selected_kr_unique_ids = []
        selected_ids = [str(kr_id).strip() for kr_id in selected_kr_unique_ids if kr_id is not None]

        parsed_orchestration_results = None
        try:
            if raw_orchestration_results_json:  # Skip empty/None payloads
                parsed_orchestration_results = json.loads(raw_orchestration_results_json)
        except (json.JSONDecodeError, TypeError) as e:
            logging.error(f"Failed to parse raw_orchestration_results_json: {raw_orchestration_results_json}. Error: {e}")
            return gr.update(value="Errore: Dati interni corrotti (orchestration results).")

        if not parsed_orchestration_results:  # None, or empty after parsing
            return gr.update(value="Nessun dato dalla pipeline AI (orchestration results).")

        parsed_krs_for_selection_list = []
        try:
            if all_krs_for_selection_list_json:  # Skip empty/None payloads
                parsed_krs_for_selection_list = json.loads(all_krs_for_selection_list_json)
        except (json.JSONDecodeError, TypeError) as e:
            logging.error(f"Failed to parse all_krs_for_selection_list_json: {all_krs_for_selection_list_json}. Error: {e}")
            return gr.update(value="Errore: Dati interni corrotti (krs for selection).")

        # The JSON may legitimately decode to null or another non-list value.
        if not isinstance(parsed_krs_for_selection_list, list):
            logging.warning(f"Parsed all_krs_for_selection_list is not a list: {type(parsed_krs_for_selection_list)}. Defaulting to empty list.")
            parsed_krs_for_selection_list = []

        actionable_okrs_dict = (
            parsed_orchestration_results.get("actionable_okrs_and_tasks")
            if isinstance(parsed_orchestration_results, dict) else None
        )
        # Anything other than a dict holding an "okrs" list counts as
        # unexpected data and is reported rather than raising.
        if not isinstance(actionable_okrs_dict, dict) or not isinstance(actionable_okrs_dict.get("okrs"), list):
            return gr.update(value="Nessun OKR trovato nei risultati della pipeline (o dati in formato imprevisto).")

        okrs_list = actionable_okrs_dict["okrs"]
        if not okrs_list:
            return gr.update(value="Nessun OKR generato.")

        selected_krs_by_okr_idx = self._group_selected_kr_indices(selected_ids, parsed_krs_for_selection_list)

        output_md_parts = []
        for okr_idx, okr_data in enumerate(okrs_list):
            if selected_ids:
                accepted_indices_for_this_okr = selected_krs_by_okr_idx.get(okr_idx)
                if accepted_indices_for_this_okr is None:
                    # A selection is active but none of it belongs to this OKR.
                    continue
            else:
                # No selection: show the OKR with all of its Key Results.
                accepted_indices_for_this_okr = None
            output_md_parts.append(
                format_single_okr_for_display(
                    okr_data,
                    accepted_kr_indices=accepted_indices_for_this_okr,
                    okr_main_index=okr_idx
                )
            )

        if output_md_parts:
            final_md = self._OKR_MD_SEPARATOR.join(output_md_parts)
        elif selected_ids:
            final_md = "Nessun OKR corrisponde alla selezione corrente o i KR selezionati non hanno task dettagliati."
        else:
            final_md = "Nessun OKR generato."
        return gr.update(value=final_md)

    def setup_event_handlers(self):
        """Sets up event handlers for the agentic OKRs tab.

        Wires the ``change`` event of ``key_results_cbg`` to
        ``update_okr_display_on_kr_selection``. Nothing is wired when the
        agentic modules are not loaded or a required component is missing;
        a warning is logged instead.
        """
        if not self.agentic_modules_really_loaded:
            logging.warning("Agentic modules not loaded. Skipping agentic event handler setup.")
            return

        key_results_cbg = self.okrs_components.get("key_results_cbg")
        if not key_results_cbg:
            logging.warning("key_results_cbg component not found for agentic OKR handler setup.")
            return

        okr_detail_display_md = self.okrs_components.get("okr_detail_display_md")
        if not okr_detail_display_md:
            # Without an output component there is nothing to update.
            logging.warning("okr_detail_display_md component not found for agentic OKR handler setup.")
            return

        key_results_cbg.change(
            fn=self.update_okr_display_on_kr_selection,
            inputs=[
                key_results_cbg,
                self.orchestration_raw_results_st,
                self.key_results_for_selection_st
            ],
            outputs=[okr_detail_display_md],
            api_name="update_okr_display_on_kr_selection"  # Keep api_name for Gradio
        )
        logging.info("Agentic OKR selection handler setup complete.")