Spaces:
Sleeping
Sleeping
# handlers/agentic_handlers.py | |
import gradio as gr | |
import logging | |
from collections import defaultdict | |
# Try to bring in the agentic pipeline entry point and the UI formatters.
# When any of them is missing, fall back to inert placeholders so the rest of
# this module (and the class below) can still be defined and used.
try:
    from run_agentic_pipeline import run_full_analytics_orchestration
    from ui.insights_ui_generator import (
        format_report_to_markdown,
        extract_key_results_for_selection,
        format_single_okr_for_display,
    )
except ImportError as import_error:
    logging.error(f"Could not import agentic pipeline modules for AgenticHandlers: {import_error}.")
    AGENTIC_MODULES_LOADED = False

    async def run_full_analytics_orchestration(*args, **kwargs):
        """Placeholder pipeline: accepts anything and produces no result."""
        return None

    def format_report_to_markdown(report_string):
        """Placeholder formatter: always returns the 'not loaded' notice."""
        return "Agentic modules not loaded. Report unavailable."

    def extract_key_results_for_selection(okrs_dict):
        """Placeholder extractor: always returns an empty list."""
        return []

    def format_single_okr_for_display(okr_data, **kwargs):
        """Placeholder OKR formatter: always returns the 'not loaded' notice."""
        return "Agentic modules not loaded. OKR display unavailable."
else:
    # Both imports succeeded, so the real implementations are in scope.
    AGENTIC_MODULES_LOADED = True
class AgenticHandlers:
    """Event handlers that drive the agentic report and OKR tabs of the Gradio UI.

    The instance keeps the two component dictionaries (report tab and OKR tab)
    plus references to the shared state objects, and exposes:

    * an async generator that runs the analytics orchestration and yields UI
      and state updates while it progresses;
    * a callback that re-renders the OKR detail view when Key Results are
      (de)selected;
    * a method that wires that callback to the Key Result checkbox group.
    """

    # Separator placed between individually formatted OKRs in the detail view.
    _OKR_SEPARATOR = "\n\n---\n\n"

    def __init__(self, agentic_report_components, agentic_okrs_components,
                 token_state_ref, orchestration_raw_results_st_ref,
                 key_results_for_selection_st_ref, selected_key_result_ids_st_ref):
        """Store component dictionaries and shared state references.

        Args:
            agentic_report_components: Mapping of report-tab component names
                to components (looked up with ``.get``).
            agentic_okrs_components: Mapping of OKR-tab component names to
                components (looked up with ``.get``).
            token_state_ref: Reference to the token state.
            orchestration_raw_results_st_ref: State holding the raw pipeline output.
            key_results_for_selection_st_ref: State holding the Key Result
                descriptors used to build the checkbox choices.
            selected_key_result_ids_st_ref: State holding the selected Key Result ids.
        """
        self.report_components = agentic_report_components
        self.okrs_components = agentic_okrs_components

        # References to global states.
        self.token_state = token_state_ref
        self.orchestration_raw_results_st = orchestration_raw_results_st_ref
        self.key_results_for_selection_st = key_results_for_selection_st_ref
        # Updated by the checkbox group, but read here when preserving state.
        self.selected_key_result_ids_st = selected_key_result_ids_st_ref

        # Remember whether the real pipeline modules were importable.
        self.agentic_modules_really_loaded = AGENTIC_MODULES_LOADED
        logging.info(f"AgenticHandlers initialized. Modules loaded: {self.agentic_modules_really_loaded}")

    @staticmethod
    def _update_if_present(components, key, **changes):
        """Return ``gr.update(**changes)`` when ``components[key]`` exists, else a no-op update."""
        if components.get(key):
            return gr.update(**changes)
        return gr.update()

    async def run_agentic_pipeline_autonomously_on_update(self, current_token_state_val):
        """Run the agentic pipeline and yield updates for the report and OKR tabs.

        Intended to be triggered by changes in the token state (e.g. after the
        initial load or after a sync). The pipeline is always run with the
        "Sempre" date filter and no custom date range.

        Args:
            current_token_state_val: Current token state mapping; the run is
                skipped when it is empty/``None`` or has no ``"token"`` entry.

        Yields:
            7-tuples, in this order: status markdown update, report markdown
            update, Key Result checkbox group update, OKR detail markdown
            update, raw orchestration results, selected Key Result ids,
            Key Results available for selection.
        """
        # Evaluate the token once, guarding against a None/empty state so the
        # log line below cannot fail before the "no token" branch is reached.
        has_token = bool(current_token_state_val and current_token_state_val.get("token"))
        logging.info(f"Agentic pipeline auto-trigger. Token: {'Set' if has_token else 'Not Set'}")

        waiting_text = "Pipeline AI: In attesa dei dati necessari..."

        # Preserve the existing state values in case this is a re-run.
        # NOTE(review): `.value` is read straight off the state references; for a
        # gr.State this is presumably the component's initial value rather than
        # the live per-session value — confirm against the caller's wiring.
        preserved_results = self.orchestration_raw_results_st.value
        preserved_selected_krs = self.selected_key_result_ids_st.value
        preserved_krs_for_selection = self.key_results_for_selection_st.value

        # Components may be missing if their tab was not fully rendered; in that
        # case the corresponding output stays a no-op update throughout.
        status_update = self._update_if_present(
            self.report_components, "agentic_pipeline_status_md", value=waiting_text)
        report_update = gr.update()  # No change to the report display yet.
        cbg_update = self._update_if_present(
            self.okrs_components, "key_results_cbg", choices=[], value=[], interactive=False)
        detail_update = self._update_if_present(
            self.okrs_components, "okr_detail_display_md", value=waiting_text)

        if not has_token:
            logging.info("Agentic pipeline: Token not available in token_state. Skipping actual run.")
            yield (
                status_update, report_update, cbg_update, detail_update,
                preserved_results, preserved_selected_krs, preserved_krs_for_selection
            )
            return

        # Announce that the analysis is in progress.
        status_update = self._update_if_present(
            self.report_components, "agentic_pipeline_status_md",
            value="Analisi AI (Sempre) in corso...")
        detail_update = self._update_if_present(
            self.okrs_components, "okr_detail_display_md",
            value="Dettagli OKR (Sempre) in corso di generazione...")
        yield (
            status_update, report_update, cbg_update, detail_update,
            preserved_results, preserved_selected_krs, preserved_krs_for_selection
        )

        if not self.agentic_modules_really_loaded:
            logging.warning("Agentic modules not loaded. Skipping autonomous pipeline actual run.")
            error_status = "Moduli AI non caricati. Operazione non disponibile."
            status_update = self._update_if_present(
                self.report_components, "agentic_pipeline_status_md", value=error_status)
            report_update = self._update_if_present(
                self.report_components, "agentic_report_display_md", value=error_status)
            detail_update = self._update_if_present(
                self.okrs_components, "okr_detail_display_md", value=error_status)
            yield (
                status_update, report_update, cbg_update, detail_update,
                None, [], []  # Reset states.
            )
            return

        try:
            # The autonomous pipeline always uses the "Sempre" filter.
            date_filter_val_agentic = "Sempre"
            custom_start_val_agentic = None
            custom_end_val_agentic = None

            logging.info("Agentic pipeline: Calling run_full_analytics_orchestration...")
            orchestration_output = await run_full_analytics_orchestration(
                current_token_state_val,
                date_filter_val_agentic,
                custom_start_val_agentic,
                custom_end_val_agentic
            )
            final_status_text = "Pipeline AI (Sempre) completata."
            logging.info(f"Autonomous agentic pipeline finished. Output keys: {orchestration_output.keys() if orchestration_output else 'None'}")

            results_state_val = None
            selected_krs_state_val = []  # Selection is always reset after a run.
            krs_for_selection_state_val = []

            if orchestration_output:
                results_state_val = orchestration_output  # Keep the raw output.

                report_str = orchestration_output.get(
                    'comprehensive_analysis_report', "Nessun report testuale fornito.")
                # Only format the report when there is a component to show it in.
                if self.report_components.get("agentic_report_display_md"):
                    report_update = gr.update(value=format_report_to_markdown(report_str))

                actionable_okrs = orchestration_output.get('actionable_okrs_and_tasks')
                # Stored in state so the checkbox change handler can map ids back to OKRs.
                krs_for_selection_state_val = extract_key_results_for_selection(actionable_okrs)
                kr_choices = [
                    (kr['kr_description'], kr['unique_kr_id'])
                    for kr in krs_for_selection_state_val
                ]
                cbg_update = self._update_if_present(
                    self.okrs_components, "key_results_cbg",
                    choices=kr_choices, value=[], interactive=True)

                # Show every OKR in full until the user selects Key Results.
                okr_md_parts = []
                if actionable_okrs and isinstance(actionable_okrs.get("okrs"), list):
                    okr_md_parts = [
                        format_single_okr_for_display(
                            okr_item, accepted_kr_indices=None, okr_main_index=okr_idx)
                        for okr_idx, okr_item in enumerate(actionable_okrs["okrs"])
                    ]
                if okr_md_parts:
                    detail_text = self._OKR_SEPARATOR.join(okr_md_parts)
                else:
                    detail_text = "Nessun OKR generato o trovato (Sempre)."
                detail_update = self._update_if_present(
                    self.okrs_components, "okr_detail_display_md", value=detail_text)
            else:
                # The pipeline produced nothing.
                final_status_text = "Pipeline AI (Sempre): Nessun risultato prodotto."
                report_update = self._update_if_present(
                    self.report_components, "agentic_report_display_md",
                    value="Nessun report generato dalla pipeline AI (Sempre).")
                cbg_update = self._update_if_present(
                    self.okrs_components, "key_results_cbg",
                    choices=[], value=[], interactive=False)
                detail_update = self._update_if_present(
                    self.okrs_components, "okr_detail_display_md",
                    value="Nessun OKR generato o errore nella pipeline AI (Sempre).")

            status_update = self._update_if_present(
                self.report_components, "agentic_pipeline_status_md", value=final_status_text)
            yield (
                status_update, report_update, cbg_update, detail_update,
                results_state_val, selected_krs_state_val, krs_for_selection_state_val
            )
        except Exception as e:
            # UI boundary: surface any pipeline failure in the tabs instead of
            # letting the generator die, and reset the shared states.
            logging.error(f"Error during autonomous agentic pipeline execution: {e}", exc_info=True)
            status_update = self._update_if_present(
                self.report_components, "agentic_pipeline_status_md",
                value=f"Errore pipeline AI (Sempre): {e}")
            report_update = self._update_if_present(
                self.report_components, "agentic_report_display_md",
                value=f"Errore generazione report AI (Sempre): {e}")
            cbg_update = self._update_if_present(
                self.okrs_components, "key_results_cbg",
                choices=[], value=[], interactive=False)
            detail_update = self._update_if_present(
                self.okrs_components, "okr_detail_display_md",
                value=f"Errore generazione OKR AI (Sempre): {e}")
            yield (
                status_update, report_update, cbg_update, detail_update,
                None, [], []  # Reset states on error.
            )

    def update_okr_display_on_kr_selection(self, selected_kr_unique_ids: list,
                                           raw_orchestration_results: dict,
                                           all_krs_for_selection_list: list):
        """Re-render the OKR detail view for the current Key Result selection.

        With no selection, every OKR is shown with all of its Key Results.
        With a selection, only OKRs owning at least one selected Key Result are
        shown, each restricted to its selected Key Results.

        Args:
            selected_kr_unique_ids: Unique Key Result ids currently ticked in
                the checkbox group.
            raw_orchestration_results: Raw pipeline output; its
                ``"actionable_okrs_and_tasks"`` entry is read.
            all_krs_for_selection_list: Key Result descriptors providing
                ``unique_kr_id``, ``okr_index`` and ``kr_index``.

        Returns:
            A ``gr.update`` carrying the markdown for the OKR detail display.
        """
        if not self.agentic_modules_really_loaded:
            return gr.update(value="Moduli AI non caricati. Impossibile visualizzare i dettagli OKR.")
        if not raw_orchestration_results:
            return gr.update(value="Nessun dato dalla pipeline AI o moduli non caricati.")

        actionable_okrs_dict = raw_orchestration_results.get("actionable_okrs_and_tasks")
        if not actionable_okrs_dict or not isinstance(actionable_okrs_dict.get("okrs"), list):
            return gr.update(value="Nessun OKR trovato nei risultati della pipeline.")

        okrs_list = actionable_okrs_dict["okrs"]
        if not okrs_list:
            return gr.update(value="Nessun OKR generato.")

        # unique_kr_id -> (index of parent OKR, index of the KR within that OKR).
        kr_id_to_indices = {
            kr_info['unique_kr_id']: (kr_info['okr_index'], kr_info['kr_index'])
            for kr_info in (all_krs_for_selection_list or [])
        }

        # Group the selected KR indices by their parent OKR; unknown ids are ignored.
        selected_krs_by_okr_idx = defaultdict(list)
        for kr_unique_id in (selected_kr_unique_ids or []):
            if kr_unique_id in kr_id_to_indices:
                okr_idx, kr_idx_in_okr = kr_id_to_indices[kr_unique_id]
                selected_krs_by_okr_idx[okr_idx].append(kr_idx_in_okr)

        if selected_kr_unique_ids:
            # Selection active: keep only OKRs that own a selected KR.
            # Membership tests do not create entries in the defaultdict.
            output_md_parts = [
                format_single_okr_for_display(
                    okr_data,
                    accepted_kr_indices=selected_krs_by_okr_idx[okr_idx],
                    okr_main_index=okr_idx
                )
                for okr_idx, okr_data in enumerate(okrs_list)
                if okr_idx in selected_krs_by_okr_idx
            ]
        else:
            # No selection: None tells the formatter to show all KRs of each OKR.
            output_md_parts = [
                format_single_okr_for_display(
                    okr_data, accepted_kr_indices=None, okr_main_index=okr_idx)
                for okr_idx, okr_data in enumerate(okrs_list)
            ]

        if output_md_parts:
            final_md = self._OKR_SEPARATOR.join(output_md_parts)
        elif selected_kr_unique_ids:
            final_md = "Nessun OKR corrisponde alla selezione corrente o i KR selezionati non hanno task dettagliati."
        else:
            # Defensive: an empty okrs_list already returned above.
            final_md = "Nessun OKR generato."
        return gr.update(value=final_md)

    def setup_event_handlers(self):
        """Wire the Key Result checkbox group to the OKR detail display.

        Does nothing (beyond logging a warning) when the agentic modules are
        not loaded or when either the checkbox group or the detail display
        component is missing from the OKR components mapping.
        """
        if not self.agentic_modules_really_loaded:
            logging.warning("Agentic modules not loaded. Skipping agentic event handler setup.")
            return

        key_results_cbg = self.okrs_components.get("key_results_cbg")
        if not key_results_cbg:
            logging.warning("key_results_cbg component not found for agentic OKR handler setup.")
            return

        # The handler's only output is the detail display; without it there is
        # nothing to wire, and indexing the mapping directly would raise KeyError.
        okr_detail_display_md = self.okrs_components.get("okr_detail_display_md")
        if not okr_detail_display_md:
            logging.warning("okr_detail_display_md component not found for agentic OKR handler setup.")
            return

        key_results_cbg.change(
            fn=self.update_okr_display_on_kr_selection,
            inputs=[
                key_results_cbg,                    # Current value of the checkbox group
                self.orchestration_raw_results_st,  # State
                self.key_results_for_selection_st   # State
            ],
            outputs=[okr_detail_display_md],
            api_name="update_okr_display_on_kr_selection"
        )
        logging.info("Agentic OKR selection handler setup complete.")