LinkedinMonitor / ui /agentic_module.py
GuglielmoTor's picture
Create agentic_module.py
1cbff0c verified
raw
history blame
12.2 kB
# ui/agentic_module.py
import gradio as gr
import logging
from collections import defaultdict
# --- Module Imports ---
# Import the real pipeline entry point and the UI formatting helpers.
# If any of them cannot be imported, the failure is logged, the
# AGENTIC_MODULES_LOADED flag is set to False, and same-named stand-ins are
# defined so the rest of this module can still reference these names.
try:
    from run_agentic_pipeline import run_full_analytics_orchestration
    from ui.insights_ui_generator import (
        format_report_to_markdown,
        extract_key_results_for_selection,
        format_single_okr_for_display
    )
    AGENTIC_MODULES_LOADED = True
except ImportError as e:
    logging.error(f"Could not import agentic pipeline modules in agentic_module.py: {e}.")
    AGENTIC_MODULES_LOADED = False
    # Fallback stubs: they accept the same calls as the real functions and
    # return an empty result (None / []) or a fixed "not loaded" message.
    async def run_full_analytics_orchestration(*args, **kwargs): return None
    def format_report_to_markdown(report_string): return "Agentic modules not loaded. Report unavailable."
    def extract_key_results_for_selection(okrs_dict): return []
    def format_single_okr_for_display(okr_data, **kwargs): return "Agentic modules not loaded. OKR display unavailable."
# Module-level logger used by the handlers below.
logger = logging.getLogger(__name__)
# References to the Gradio components created by build_and_wire_tabs().
# Each one stays None until that function has assigned it.
_agentic_report_display_md = None  # gr.Markdown that shows the AI analysis report
_key_results_cbg = None  # gr.CheckboxGroup listing the selectable Key Results
_okr_detail_display_md = None  # gr.Markdown that shows the formatted OKR details
_agentic_pipeline_status_md = None  # gr.Markdown that shows the pipeline status line
def handle_update_okr_display(selected_kr_unique_ids: list, raw_orchestration_results: dict, all_krs_for_selection: list):
    """Build the OKR detail markdown for the current Key Result selection.

    Wired as the ``change`` handler of the Key Results checkbox group.

    Args:
        selected_kr_unique_ids: Unique IDs of the Key Results currently
            ticked. Empty or ``None`` means "no selection", in which case
            every OKR is rendered with all of its Key Results.
        raw_orchestration_results: The stored pipeline output; the OKRs are
            read from ``["actionable_okrs_and_tasks"]["okrs"]``.
        all_krs_for_selection: List of dicts carrying ``unique_kr_id``,
            ``okr_index`` and ``kr_index``. It is used to map a selected ID
            back to its position. ``None`` is treated as an empty list, and
            entries missing any of the three keys are ignored.

    Returns:
        A ``gr.update(value=...)`` carrying the markdown (or an explanatory
        message) for the OKR detail component.
    """
    if not raw_orchestration_results or not AGENTIC_MODULES_LOADED:
        return gr.update(value="Nessun dato dalla pipeline AI o moduli non caricati.")

    actionable_okrs_dict = raw_orchestration_results.get("actionable_okrs_and_tasks")
    if not actionable_okrs_dict or not isinstance(actionable_okrs_dict.get("okrs"), list):
        return gr.update(value="Nessun OKR trovato nei risultati della pipeline.")
    okrs_list = actionable_okrs_dict["okrs"]

    # Map each unique KR id to its (okr_index, kr_index) position. The state
    # may not have been populated yet (None), and malformed entries must not
    # crash the handler, so only complete dict entries are indexed.
    required_keys = ("unique_kr_id", "okr_index", "kr_index")
    kr_id_to_indices = {
        kr_info["unique_kr_id"]: (kr_info["okr_index"], kr_info["kr_index"])
        for kr_info in (all_krs_for_selection or [])
        if isinstance(kr_info, dict) and all(key in kr_info for key in required_keys)
    }

    # Group the selected KR indices by the OKR they belong to.
    selected_krs_by_okr_idx = defaultdict(list)
    for kr_unique_id in (selected_kr_unique_ids or []):
        if kr_unique_id in kr_id_to_indices:
            okr_idx, kr_idx = kr_id_to_indices[kr_unique_id]
            selected_krs_by_okr_idx[okr_idx].append(kr_idx)
        else:
            logger.warning(f"Selected KR ID '{kr_unique_id}' not found in kr_id_to_indices map.")

    if not okrs_list:
        return gr.update(value="Nessun OKR generato.")

    output_md_parts = []
    for okr_idx, okr_data in enumerate(okrs_list):
        if not selected_kr_unique_ids:
            # No selection: show every OKR with all of its Key Results.
            output_md_parts.append(
                format_single_okr_for_display(okr_data, accepted_kr_indices=None, okr_main_index=okr_idx)
            )
        elif okr_idx in selected_krs_by_okr_idx:
            # Selection active: show only OKRs owning at least one selected KR.
            output_md_parts.append(
                format_single_okr_for_display(
                    okr_data,
                    accepted_kr_indices=selected_krs_by_okr_idx[okr_idx],
                    okr_main_index=okr_idx,
                )
            )

    if not output_md_parts:
        # Only reachable with an active selection that matched no OKR;
        # without a selection every OKR is appended above.
        return gr.update(
            value="Nessun OKR corrisponde alla selezione corrente o i KR selezionati non hanno task dettagliati."
        )
    return gr.update(value="\n\n---\n\n".join(output_md_parts))
def _blank_pipeline_outputs(report_text, okr_text, status_text):
    """Return the 7-item output tuple that clears the KR choices and stored state."""
    return (
        gr.update(value=report_text),                        # report display
        gr.update(choices=[], value=[], interactive=False),  # key results checkbox group
        gr.update(value=okr_text),                           # OKR detail display
        None,                                                # orchestration raw results state
        [],                                                  # selected key result ids state
        [],                                                  # key results for selection state
        status_text,                                         # pipeline status markdown
    )


async def handle_run_agentic_pipeline(current_token_state_val, orchestration_raw_results_st_val, key_results_for_selection_st_val, selected_key_result_ids_st_val):
    """Run the agentic analytics pipeline with the 'Sempre' filter and stream UI updates.

    This is an async generator. Every yielded value is a 7-tuple in this
    fixed order:

        (report display update, key-results checkbox update,
         OKR detail display update, orchestration raw results,
         selected key result ids, key results for selection, status text)

    Args:
        current_token_state_val: Token state mapping; the pipeline runs only
            when it holds a truthy ``"token"`` entry. ``None`` or an empty
            mapping is treated as "token not available".
        orchestration_raw_results_st_val: Previously stored pipeline results,
            passed through unchanged while the pipeline is running.
        key_results_for_selection_st_val: Previously stored Key Result list,
            passed through unchanged while the pipeline is running.
        selected_key_result_ids_st_val: Previously stored selection, passed
            through unchanged while the pipeline is running.

    Yields:
        One "waiting" tuple when no token is available. Otherwise an
        "in progress" tuple followed by exactly one final tuple: the results,
        a "modules not loaded" notice, or an error notice.
    """
    # Evaluate the token once, before logging, so a None token state reaches
    # the guard below instead of raising AttributeError inside the log call.
    has_token = bool(current_token_state_val and current_token_state_val.get("token"))
    logger.info(f"Agentic pipeline check triggered. Current token: {'Set' if has_token else 'Not Set'}")

    if not has_token:
        logger.info("Agentic pipeline: Token not available in token_state. Skipping.")
        yield _blank_pipeline_outputs(
            "Pipeline AI: In attesa dei dati necessari...",
            "Pipeline AI: In attesa dei dati necessari...",
            "Pipeline AI: In attesa dei dati...",
        )
        return

    logger.info("Agentic pipeline starting autonomously with 'Sempre' filter.")
    yield (
        gr.update(value="Analisi AI (Sempre) in corso..."),
        gr.update(choices=[], value=[], interactive=False),
        gr.update(value="Dettagli OKR (Sempre) in corso di generazione..."),
        orchestration_raw_results_st_val,  # preserve existing results while running
        selected_key_result_ids_st_val,
        key_results_for_selection_st_val,
        "Esecuzione pipeline AI (Sempre)...",
    )

    if not AGENTIC_MODULES_LOADED:
        logger.warning("Agentic modules not loaded. Skipping autonomous pipeline.")
        yield _blank_pipeline_outputs(
            "Moduli AI non caricati. Report non disponibile.",
            "Moduli AI non caricati. OKR non disponibili.",
            "Pipeline AI: Moduli non caricati.",
        )
        return

    try:
        date_filter_val_agentic = "Sempre"
        custom_start_val_agentic = None
        custom_end_val_agentic = None

        orchestration_output = await run_full_analytics_orchestration(
            current_token_state_val, date_filter_val_agentic,
            custom_start_val_agentic, custom_end_val_agentic
        )
        agentic_status_text = "Pipeline AI (Sempre) completata."
        logger.info(f"Autonomous agentic pipeline finished. Output keys: {orchestration_output.keys() if orchestration_output else 'None'}")

        if not orchestration_output:
            yield _blank_pipeline_outputs(
                "Nessun report generato dalla pipeline AI (Sempre).",
                "Nessun OKR generato o errore nella pipeline AI (Sempre).",
                agentic_status_text,
            )
            return

        report_str = orchestration_output.get('comprehensive_analysis_report')
        agentic_report_md_update = gr.update(value=format_report_to_markdown(report_str))

        actionable_okrs = orchestration_output.get('actionable_okrs_and_tasks')
        # List of dicts describing each Key Result; stored in state as-is.
        krs_for_ui_selection_list = extract_key_results_for_selection(actionable_okrs)
        # CheckboxGroup choices are (label, value) pairs.
        kr_choices_for_cbg = [
            (kr['kr_description'], kr['unique_kr_id'])
            for kr in krs_for_ui_selection_list
            if isinstance(kr, dict)
        ]

        # Default OKR view: everything, as if no Key Result were selected.
        all_okrs_md_parts = []
        if actionable_okrs and isinstance(actionable_okrs.get("okrs"), list):
            all_okrs_md_parts = [
                format_single_okr_for_display(okr_item, accepted_kr_indices=None, okr_main_index=okr_idx)
                for okr_idx, okr_item in enumerate(actionable_okrs["okrs"])
            ]
        if all_okrs_md_parts:
            okr_detail_md = "\n\n---\n\n".join(all_okrs_md_parts)
        else:
            okr_detail_md = "Nessun OKR generato o trovato (Sempre)."

        yield (
            agentic_report_md_update,
            gr.update(choices=kr_choices_for_cbg, value=[], interactive=True),
            gr.update(value=okr_detail_md),
            orchestration_output,
            [],  # reset the selection for the fresh result set
            krs_for_ui_selection_list,
            agentic_status_text,
        )
    except Exception as e:
        # Top-level UI boundary: log with traceback and surface the message
        # in the interface instead of letting the event handler fail.
        logger.error(f"Error during autonomous agentic pipeline execution: {e}", exc_info=True)
        yield _blank_pipeline_outputs(
            f"Errore generazione report AI (Sempre): {str(e)}",
            f"Errore generazione OKR AI (Sempre): {str(e)}",
            f"Errore pipeline AI (Sempre): {str(e)}",
        )
def build_and_wire_tabs(orchestration_raw_results_st, key_results_for_selection_st, selected_key_result_ids_st):
    """Build the two agentic tabs and wire the Key Result selection handler.

    Creates the "Agentic Analysis Report" and "Agentic OKRs & Tasks" tab
    items, stores the created components in this module's globals, and, when
    the agentic modules are loaded, connects the checkbox group's ``change``
    event to ``handle_update_okr_display``.

    Args:
        orchestration_raw_results_st: State component passed as the second
            input of the ``change`` handler.
        key_results_for_selection_st: State component passed as the third
            input of the ``change`` handler.
        selected_key_result_ids_st: Accepted but not used inside this function.

    Returns:
        A list of the four visible components updated by the pipeline:
        report markdown, Key Results checkbox group, OKR detail markdown and
        pipeline status markdown.
    """
    global _agentic_report_display_md, _key_results_cbg, _okr_detail_display_md, _agentic_pipeline_status_md
    # Both tabs are only visible when the agentic modules were imported.
    with gr.TabItem("3️⃣ Agentic Analysis Report", id="tab_agentic_report", visible=AGENTIC_MODULES_LOADED):
        gr.Markdown("## 🤖 Comprehensive Analysis Report (AI Generated)")
        _agentic_pipeline_status_md = gr.Markdown("Stato Pipeline AI (filtro 'Sempre'): In attesa...", visible=True)
        gr.Markdown("Questo report è generato da un agente AI con filtro 'Sempre' sui dati disponibili. Rivedi criticamente.")
        _agentic_report_display_md = gr.Markdown("La pipeline AI si avvierà automaticamente dopo il caricamento iniziale dei dati o dopo una sincronizzazione.")
        # NOTE(review): this notice is created inside a tab whose visibility is
        # also AGENTIC_MODULES_LOADED, so it may never be shown - confirm intent.
        if not AGENTIC_MODULES_LOADED:
            gr.Markdown("🔴 **Error:** Agentic pipeline modules could not be loaded. This tab is disabled.")
    with gr.TabItem("4️⃣ Agentic OKRs & Tasks", id="tab_agentic_okrs", visible=AGENTIC_MODULES_LOADED):
        gr.Markdown("## 🎯 AI Generated OKRs and Actionable Tasks (filtro 'Sempre')")
        gr.Markdown("Basato sull'analisi AI (filtro 'Sempre'), l'agente ha proposto i seguenti OKR e task. Seleziona i Key Results per dettagli.")
        if not AGENTIC_MODULES_LOADED:
            gr.Markdown("🔴 **Error:** Agentic pipeline modules could not be loaded. This tab is disabled.")
        # Left column: Key Result selector. Right column: rendered OKR details.
        with gr.Row():
            with gr.Column(scale=1):
                gr.Markdown("### Suggested Key Results (da analisi 'Sempre')")
                _key_results_cbg = gr.CheckboxGroup(label="Select Key Results", choices=[], value=[], interactive=True)
            with gr.Column(scale=3):
                gr.Markdown("### Detailed OKRs and Tasks for Selected Key Results")
                _okr_detail_display_md = gr.Markdown("I dettagli OKR appariranno qui dopo l'esecuzione della pipeline AI.")
    # Re-render the OKR details whenever the Key Result selection changes.
    if AGENTIC_MODULES_LOADED:
        _key_results_cbg.change(
            fn=handle_update_okr_display, # Returns a single gr.update() for the OKR detail markdown
            inputs=[_key_results_cbg, orchestration_raw_results_st, key_results_for_selection_st],
            outputs=[_okr_detail_display_md]
        )
    # Visible components updated by handle_run_agentic_pipeline.
    # That handler yields 7 values; this list holds only the 4 components.
    # The three State outputs sit between the OKR detail display and the
    # status markdown in the yield tuple (see the commented placeholders) and
    # are presumably inserted by the caller - verify against the call site.
    agentic_pipeline_outputs_components = [
        _agentic_report_display_md,
        _key_results_cbg,
        _okr_detail_display_md,
        # orchestration_raw_results_st, # State
        # selected_key_result_ids_st, # State
        # key_results_for_selection_st, # State
        _agentic_pipeline_status_md
    ]
    return agentic_pipeline_outputs_components