GuglielmoTor committed on
Commit
5f0b7f9
·
verified ·
1 Parent(s): bd2859f

Update run_agentic_pipeline.py

Browse files
Files changed (1) hide show
  1. run_agentic_pipeline.py +82 -276
run_agentic_pipeline.py CHANGED
@@ -1,300 +1,106 @@
1
  # run_agentic_pipeline.py
2
- import asyncio
3
- import os
4
- import json
 
 
5
  import logging
6
- from datetime import datetime
7
- import pandas as pd
8
- from typing import Dict, Any, Optional
9
  import gradio as gr
10
 
11
-
12
- # Assuming this script is at the same level as 'app.py' and 'insight_and_tasks/' is a subfolder
13
- # If 'insight_and_tasks' is not in python path, you might need to adjust sys.path
14
- # For example, if insight_and_tasks is a sibling of the dir containing this file:
15
- # import sys
16
- # script_dir = os.path.dirname(os.path.abspath(__file__))
17
- # project_root = os.path.dirname(script_dir) # Or navigate to the correct root
18
- # sys.path.insert(0, project_root)
19
-
20
- os.environ["GOOGLE_GENAI_USE_VERTEXAI"] = "False"
21
- GOOGLE_API_KEY = os.environ.get("GEMINI_API_KEY")
22
- os.environ["GOOGLE_API_KEY"] = GOOGLE_API_KEY
23
-
24
- # Imports from your project structure
25
- from features.insight_and_tasks.orchestrators.linkedin_analytics_orchestrator import EnhancedLinkedInAnalyticsOrchestrator
26
- # setup_logging might be called in app.py, if not, call it here or ensure it's called once.
27
- # from insight_and_tasks.utils.logging_config import setup_logging
28
- from data_processing.analytics_data_processing import prepare_filtered_analytics_data
29
- # Placeholder for UI generator import - to be created later
30
- # from .insights_ui_generator import format_orchestration_results_for_ui
31
-
32
  try:
33
  from ui.insights_ui_generator import (
34
  format_report_to_markdown,
35
  extract_key_results_for_selection,
36
  format_single_okr_for_display
37
  )
 
38
  AGENTIC_MODULES_LOADED = True
39
  except ImportError as e:
40
- logging.error(f"Could not import agentic pipeline modules: {e}. Tabs 3 and 4 will be disabled.")
41
  AGENTIC_MODULES_LOADED = False
42
- async def run_full_analytics_orchestration(*args, **kwargs): return None # Placeholder
43
- def format_report_to_markdown(report_string): return "Agentic modules not loaded. Report unavailable." # Placeholder
44
- def extract_key_results_for_selection(okrs_dict): return [] # Placeholder
45
- def format_single_okr_for_display(okr_data, **kwargs): return "Agentic modules not loaded. OKR display unavailable." # Placeholder
46
-
47
- from services.report_data_handler import save_report_results, save_actionable_okrs, fetch_and_reconstruct_data_from_bubble
48
 
49
- logger = logging.getLogger(__name__)
50
 
51
-
52
- async def run_full_analytics_orchestration(
53
- token_state: Dict[str, Any],
54
- date_filter_selection: str,
55
- custom_start_date: Optional[datetime],
56
- custom_end_date: Optional[datetime]
57
- ) -> Optional[Dict[str, Any]]:
58
  """
59
- Runs the full analytics pipeline using data from token_state and date filters,
60
- and returns the raw orchestration results.
61
- Args:
62
- token_state: Gradio token_state containing raw data and config.
63
- date_filter_selection: String for date filter type.
64
- custom_start_date: Optional custom start date.
65
- custom_end_date: Optional custom end date.
66
- Returns:
67
- A dictionary containing the results from the analytics orchestrator,
68
- or None if a critical error occurs.
69
  """
70
- if not GOOGLE_API_KEY:
71
- logger.critical("GOOGLE_API_KEY is not set. Analytics pipeline cannot run.")
72
- return None
73
-
74
- logger.info("Starting full analytics orchestration process...")
75
-
76
- # 1. Prepare and filter data
77
- try:
78
- (
79
- filtered_posts_df,
80
- filtered_mentions_df,
81
- _date_filtered_follower_stats_df, # This might be used if FollowerAgent specifically needs pre-filtered time series
82
- raw_follower_stats_df, # FollowerAgent typically processes raw historical for some metrics
83
- _start_dt, # Filtered start date, for logging or context if needed
84
- _end_dt # Filtered end date
85
- ) = prepare_filtered_analytics_data(
86
- token_state, date_filter_selection, custom_start_date, custom_end_date
87
- )
88
- logger.info(f"Data prepared: Posts({len(filtered_posts_df)}), Mentions({len(filtered_mentions_df)}), FollowerStatsRaw({len(raw_follower_stats_df)})")
89
-
90
- except Exception as e:
91
- logger.error(f"Error during data preparation: {e}", exc_info=True)
92
- return None
93
-
94
- # Check if essential dataframes are empty after filtering, which might make analysis trivial or erroneous
95
- if filtered_posts_df.empty and filtered_mentions_df.empty and raw_follower_stats_df.empty:
96
- logger.warning("All essential DataFrames are empty after filtering. Orchestration might yield limited results.")
97
- # Depending on requirements, you might return a specific message or empty results structure.
98
-
99
- # 2. Initialize and run the orchestrator
100
- try:
101
- # You can pass a specific model name or let the orchestrator use its default
102
- llm_model_for_run = "gemini-2.5-flash-preview-05-20" #token_state.get("config_llm_model_override") # Example: if you store this in token_state
103
-
104
- orchestrator = EnhancedLinkedInAnalyticsOrchestrator(
105
- api_key=GOOGLE_API_KEY,
106
- llm_model_name=llm_model_for_run, # Pass None to use orchestrator's default
107
- current_date_for_tasks=datetime.utcnow().date()
108
- )
109
-
110
- logger.info("Orchestrator initialized. Generating full analysis and tasks...")
111
- # The orchestrator expects the primary follower stats DF to be the one it can process for
112
- # time-series ('follower_gains_monthly') and demographics.
113
- # The `raw_follower_stats_df` is usually better for this, as FollowerAgent does its own processing.
114
- orchestration_results = await orchestrator.generate_full_analysis_and_tasks(
115
- follower_stats_df=raw_follower_stats_df, # Pass the full history for followers
116
- post_df=filtered_posts_df,
117
- mentions_df=filtered_mentions_df
118
- )
119
- logger.info("Orchestration process completed.")
120
- return orchestration_results
121
-
122
- except Exception as e:
123
- logger.critical(f"Critical error during analytics orchestration: {e}", exc_info=True)
124
- return None
125
-
126
-
127
-
128
- async def run_agentic_pipeline_autonomously(current_token_state_val, orchestration_raw_results_st,selected_key_result_ids_st, key_results_for_selection_st):
129
- logging.info(f"Agentic pipeline check triggered for token_state update. Current token: {'Set' if current_token_state_val.get('token') else 'Not Set'}")
130
- # Initial state before pipeline runs or if skipped
131
- initial_yield = (
132
- gr.update(value="Pipeline AI: In attesa dei dati necessari..."), # agentic_report_display_md
133
- gr.update(choices=[], value=[], interactive=False), # key_results_cbg
134
- gr.update(value="Pipeline AI: In attesa dei dati necessari..."), # okr_detail_display_md
135
- orchestration_raw_results_st, # Preserve current raw results
136
- selected_key_result_ids_st, # Preserve current selection
137
- key_results_for_selection_st, # Preserve current options
138
- "Pipeline AI: In attesa dei dati..." # agentic_pipeline_status_md
139
- )
140
-
141
- if not current_token_state_val or not current_token_state_val.get("token"):
142
- logging.info("Agentic pipeline: Token not available in token_state. Skipping.")
143
- yield initial_yield
144
- return
145
-
146
- logging.info("Agentic pipeline starting autonomously with 'Sempre' filter.")
147
- # Update status to indicate processing
148
- yield (
149
- gr.update(value="Analisi AI (Sempre) in corso..."),
150
- gr.update(choices=[], value=[], interactive=False), # Keep CBG disabled during run
151
- gr.update(value="Dettagli OKR (Sempre) in corso di generazione..."),
152
- orchestration_raw_results_st, # Preserve
153
- selected_key_result_ids_st, # Preserve
154
- key_results_for_selection_st, # Preserve
155
- "Esecuzione pipeline AI (Sempre)..."
156
  )
157
 
158
  if not AGENTIC_MODULES_LOADED:
159
- logging.warning("Agentic modules not loaded. Skipping autonomous pipeline.")
160
- yield (
161
- gr.update(value="Moduli AI non caricati. Report non disponibile."),
162
- gr.update(choices=[], value=[], interactive=False),
163
- gr.update(value="Moduli AI non caricati. OKR non disponibili."),
164
- None, [], [], "Pipeline AI: Moduli non caricati."
165
- )
166
- return
167
-
168
- if not current_token_state_val.get("agentic_pipeline_should_run_now", False):
169
- logging.info("Fetching existing data from Bubble as pipeline run is not required.")
170
-
171
- report_df = current_token_state_val.get('bubble_agentic_analysis_data')
172
-
173
- # Call the new function to get reconstructed data
174
- retrieved_data = fetch_and_reconstruct_data_from_bubble(report_df)
175
-
176
- if not retrieved_data:
177
- logging.warning(f"No data found in Bubble for org_urn {org_urn}. Informing user.")
178
- yield (
179
- gr.update(value="Nessun dato di analisi precedente trovato in Bubble."),
180
- gr.update(choices=[], value=[], interactive=False),
181
- gr.update(value="Eseguire la pipeline per generare un nuovo report."),
182
- None, [], [], "Pipeline AI: Dati non disponibili"
183
- )
184
- return
185
-
186
- # If data is found, format it for the UI
187
- report_str = retrieved_data.get('report_str')
188
- actionable_okrs = retrieved_data.get('actionable_okrs')
189
-
190
- agentic_report_md_update = gr.update(value=format_report_to_markdown(report_str))
191
-
192
  krs_for_ui_selection_list = extract_key_results_for_selection(actionable_okrs)
193
  kr_choices_for_cbg = [(kr['kr_description'], kr['unique_kr_id']) for kr in krs_for_ui_selection_list]
194
  key_results_cbg_update = gr.update(choices=kr_choices_for_cbg, value=[], interactive=True)
195
-
196
- all_okrs_md_parts = []
197
- if actionable_okrs and isinstance(actionable_okrs.get("okrs"), list):
198
- for okr_idx, okr_item in enumerate(actionable_okrs["okrs"]):
199
- all_okrs_md_parts.append(format_single_okr_for_display(okr_item, accepted_kr_indices=None, okr_main_index=okr_idx))
200
-
201
- if not all_okrs_md_parts:
202
- okr_detail_display_md_update = gr.update(value="Nessun OKR trovato per il report più recente.")
203
- else:
204
- okr_detail_display_md_update = gr.update(value="\n\n---\n\n".join(all_okrs_md_parts))
205
-
206
- # Yield the updates for the Gradio interface
207
- yield (
208
- agentic_report_md_update,
209
- key_results_cbg_update,
210
- okr_detail_display_md_update,
211
- retrieved_data, # Store full retrieved data in state
212
- [], # Reset selected KRs state
213
- krs_for_ui_selection_list, # Update state with list of KR dicts
214
- "Pipeline AI: Dati caricati da Bubble"
215
- )
216
- return
217
-
218
-
219
- try:
220
- # Parameters for 'Sempre' filter for the agentic pipeline
221
- date_filter_val_agentic = "Sempre"
222
- custom_start_val_agentic = None
223
- custom_end_val_agentic = None
224
-
225
- orchestration_output = await run_full_analytics_orchestration(
226
- current_token_state_val,
227
- date_filter_val_agentic,
228
- custom_start_val_agentic,
229
- custom_end_val_agentic
230
- )
231
- agentic_status_text = "Pipeline AI (Sempre) completata."
232
- logging.info(f"Autonomous agentic pipeline finished. Output keys: {orchestration_output.keys() if orchestration_output else 'None'}")
233
-
234
- if orchestration_output:
235
- orchestration_results_update = orchestration_output # Store full results in state
236
- report_str = orchestration_output.get('comprehensive_analysis_report', "Nessun report dettagliato fornito.")
237
- agentic_report_md_update = gr.update(value=format_report_to_markdown(report_str))
238
-
239
- quarter = orchestration_output.get('quarter', "quarter non disponibile")
240
- year = orchestration_output.get('year', "year non disponibile")
241
- org_urn = current_token_state_val.get('org_urn')
242
-
243
- try:
244
- report_id = save_report_results(org_urn=org_urn, report_markdown=report_str, quarter=quarter, year=year, report_type='Quarter')
245
- except Exception as e:
246
- logging.error(f"error saving report {e}")
247
-
248
- actionable_okrs = orchestration_output.get('actionable_okrs_and_tasks') # This is the dict containing 'okrs' list
249
- metrics = orchestration_output.get('detailed_metrics')
250
- try:
251
- save_actionable_okrs(org_urn, actionable_okrs, report_id, metrics)
252
- except Exception as e:
253
- logging.error(f"error saving report {e}")
254
-
255
- krs_for_ui_selection_list = extract_key_results_for_selection(actionable_okrs) # Expects the dict
256
-
257
- krs_for_selection_update = krs_for_ui_selection_list # Update state with list of KR dicts
258
-
259
- # Choices for CheckboxGroup: list of (label, value) tuples
260
- kr_choices_for_cbg = [(kr['kr_description'], kr['unique_kr_id']) for kr in krs_for_ui_selection_list]
261
- key_results_cbg_update = gr.update(choices=kr_choices_for_cbg, value=[], interactive=True) # Reset selection
262
-
263
- # Display all OKRs by default after pipeline run
264
- all_okrs_md_parts = []
265
- if actionable_okrs and isinstance(actionable_okrs.get("okrs"), list):
266
- for okr_idx, okr_item in enumerate(actionable_okrs["okrs"]):
267
- all_okrs_md_parts.append(format_single_okr_for_display(okr_item, accepted_kr_indices=None, okr_main_index=okr_idx))
268
-
269
- if not all_okrs_md_parts:
270
- okr_detail_display_md_update = gr.update(value="Nessun OKR generato o trovato (Sempre).")
271
- else:
272
- okr_detail_display_md_update = gr.update(value="\n\n---\n\n".join(all_okrs_md_parts))
273
-
274
- selected_krs_update = [] # Reset selected KRs state
275
- else:
276
- agentic_report_md_update = gr.update(value="Nessun report generato dalla pipeline AI (Sempre).")
277
- key_results_cbg_update = gr.update(choices=[], value=[], interactive=False)
278
- okr_detail_display_md_update = gr.update(value="Nessun OKR generato o errore nella pipeline AI (Sempre).")
279
- orchestration_results_update = None
280
- selected_krs_update = []
281
- krs_for_selection_update = []
282
 
283
- yield (
284
- agentic_report_md_update,
285
- key_results_cbg_update,
286
- okr_detail_display_md_update,
287
- orchestration_results_update, # state
288
- selected_krs_update, # state
289
- krs_for_selection_update, # state
290
- agentic_status_text
291
- )
292
- except Exception as e:
293
- logging.error(f"Error during autonomous agentic pipeline execution: {e}", exc_info=True)
294
- agentic_status_text = f"Errore pipeline AI (Sempre): {str(e)}"
295
- yield (
296
- gr.update(value=f"Errore generazione report AI (Sempre): {str(e)}"),
297
- gr.update(choices=[], value=[], interactive=False),
298
- gr.update(value=f"Errore generazione OKR AI (Sempre): {str(e)}"),
299
- None, [], [], agentic_status_text # Reset states on error
300
- )
 
1
  # run_agentic_pipeline.py
2
+ """
3
+ This module is responsible for loading and displaying pre-computed AI analysis
4
+ results (reports and OKRs) that have been fetched from Bubble.io. It does not
5
+ perform any new analysis.
6
+ """
7
  import logging
 
 
 
8
  import gradio as gr
9
 
10
+ # UI formatting and data reconstruction functions are still needed
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
11
# Optional dependencies: the UI formatting helpers and the Bubble data handler
# may be absent in some deployments. When the import fails, inert fallback
# stubs are installed so the rest of the app keeps working (tabs 3 and 4
# simply show "unavailable" text instead of crashing).
try:
    from ui.insights_ui_generator import (
        format_report_to_markdown,
        extract_key_results_for_selection,
        format_single_okr_for_display
    )
    from services.report_data_handler import fetch_and_reconstruct_data_from_bubble

    AGENTIC_MODULES_LOADED = True
except ImportError as e:
    logging.error(f"Could not import agentic pipeline display modules: {e}. Tabs 3 and 4 will be disabled.")
    AGENTIC_MODULES_LOADED = False

    # Fallback stubs mirroring the real helpers' call signatures.
    def format_report_to_markdown(report_string):
        """Stub used when the real report formatter is unavailable."""
        return "Agentic modules not loaded. Report unavailable."

    def extract_key_results_for_selection(okrs_dict):
        """Stub used when the real KR extractor is unavailable."""
        return []

    def format_single_okr_for_display(okr_data, **kwargs):
        """Stub used when the real OKR formatter is unavailable."""
        return "Agentic modules not loaded. OKR display unavailable."

    def fetch_and_reconstruct_data_from_bubble(df):
        """Stub used when the Bubble data handler is unavailable."""
        return None
 
27
 
 
28
 
29
def load_and_display_agentic_results(current_token_state, orchestration_raw_results_st, selected_key_result_ids_st, key_results_for_selection_st):
    """
    Load pre-computed agentic analysis and OKR data from the application state
    (fetched from Bubble) and format it for display in the Gradio UI.

    No new analysis is performed here; this only reconstructs and renders
    previously saved results.

    Args:
        current_token_state: Gradio state dict; read for the
            'bubble_agentic_analysis_data' DataFrame fetched from Bubble.
        orchestration_raw_results_st: Current raw-results state. Unused here;
            kept so the signature matches the Gradio event wiring.
        selected_key_result_ids_st: Current KR-selection state. Unused; see above.
        key_results_for_selection_st: Current KR-options state. Unused; see above.

    Returns:
        A 7-tuple for the Gradio outputs:
        (report markdown update, KR checkbox-group update, OKR detail markdown
        update, reconstructed-data state, selected-KR-ids state [reset],
        KR-options state, status message string).
    """
    logging.info("Loading and displaying pre-computed agentic results from state.")

    # Baseline "no data" tuple, reused (and patched) for the error paths.
    initial_yield_updates = (
        gr.update(value="Nessun dato di analisi trovato..."),  # agentic_report_display_md
        gr.update(choices=[], value=[], interactive=False),    # key_results_cbg
        gr.update(value="Nessun OKR trovato..."),              # okr_detail_display_md
        None,                                                  # orchestration_raw_results_st
        [],                                                    # selected_key_result_ids_st
        [],                                                    # key_results_for_selection_st
        "Stato: In attesa di dati"                             # agentic_pipeline_status_md
    )

    if not AGENTIC_MODULES_LOADED:
        logging.warning("Agentic display modules not loaded. Cannot display results.")
        error_updates = list(initial_yield_updates)
        error_updates[-1] = "Errore: Moduli AI non caricati."
        return tuple(error_updates)

    # The raw DataFrame fetched from Bubble's agentic analysis table.
    agentic_data_df = current_token_state.get('bubble_agentic_analysis_data')

    if agentic_data_df is None or agentic_data_df.empty:
        logging.warning("No agentic analysis data found in the application state.")
        return initial_yield_updates

    # Reconstruct the report string and OKR structures from the DataFrame.
    reconstructed_data = fetch_and_reconstruct_data_from_bubble(agentic_data_df)

    if not reconstructed_data:
        logging.warning("Could not reconstruct agentic data from the fetched DataFrame.")
        error_updates = list(initial_yield_updates)
        error_updates[0] = gr.update(value="I dati di analisi esistenti non sono nel formato corretto.")
        error_updates[2] = gr.update(value="Impossibile visualizzare gli OKR.")
        error_updates[-1] = "Stato: Errore formato dati"
        return tuple(error_updates)

    # --- Prepare UI updates with the reconstructed data ---
    report_str = reconstructed_data.get('report_str', "Nessun report di analisi trovato nei dati.")
    actionable_okrs = reconstructed_data.get('actionable_okrs')  # dict expected to hold an 'okrs' list

    # 1. Report tab.
    agentic_report_md_update = gr.update(value=format_report_to_markdown(report_str))

    # 2. OKR tab components.
    # An *empty* 'okrs' list is treated as "no OKRs" as well: previously it
    # fell into the truthy branch and rendered a blank panel ("".join of
    # nothing) instead of an explanatory message.
    if actionable_okrs and isinstance(actionable_okrs.get("okrs"), list) and actionable_okrs["okrs"]:
        krs_for_ui_selection_list = extract_key_results_for_selection(actionable_okrs)
        kr_choices_for_cbg = [(kr['kr_description'], kr['unique_kr_id']) for kr in krs_for_ui_selection_list]
        key_results_cbg_update = gr.update(choices=kr_choices_for_cbg, value=[], interactive=True)
        krs_for_selection_state_update = krs_for_ui_selection_list

        all_okrs_md_parts = [
            format_single_okr_for_display(okr_item, accepted_kr_indices=None, okr_main_index=okr_idx)
            for okr_idx, okr_item in enumerate(actionable_okrs["okrs"])
        ]
        okr_detail_display_md_update = gr.update(value="\n\n---\n\n".join(all_okrs_md_parts))
    else:
        # No OKRs present in the reconstructed data.
        krs_for_selection_state_update = []
        key_results_cbg_update = gr.update(choices=[], value=[], interactive=False)
        okr_detail_display_md_update = gr.update(value="Nessun OKR trovato nei dati di analisi caricati.")

    # Final updates for the Gradio interface.
    return (
        agentic_report_md_update,
        key_results_cbg_update,
        okr_detail_display_md_update,
        reconstructed_data,               # store the full reconstructed data dict in state
        [],                               # reset the selected KR IDs state
        krs_for_selection_state_update,   # state with all available KRs
        "Stato: Dati di analisi caricati correttamente da Bubble"
    )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
106