GuglielmoTor commited on
Commit
4cc5e6b
·
verified ·
1 Parent(s): 5a20be2

Update services/report_data_handler.py

Browse files
Files changed (1) hide show
  1. services/report_data_handler.py +197 -165
services/report_data_handler.py CHANGED
@@ -12,101 +12,123 @@ from config import (
12
  import json # For handling JSON data
13
  from typing import List, Dict, Any, Optional, Tuple
14
 
 
 
 
15
  logger = logging.getLogger(__name__)
16
 
17
- def fetch_latest_agentic_analysis(org_urn: str):
18
  """
19
  Fetches all agentic analysis data for a given org_urn from Bubble.
20
- Returns the full dataframe and any error, or None, None.
21
  """
 
22
  if not org_urn:
23
  logger.warning("fetch_latest_agentic_analysis: org_urn is missing.")
24
- return None, None
25
-
26
- report_data_df, error = fetch_linkedin_posts_data_from_bubble(
27
- data_type=BUBBLE_REPORT_TABLE_NAME,
28
- org_urn=org_urn
29
- )
30
-
31
- if error or report_data_df is None or report_data_df.empty:
32
- logger.info(f"No existing agentic analysis found in Bubble for org_urn {org_urn} or error: {error}")
33
- return None, None
34
-
35
- logger.info(f"Agentic analysis data fetched for org_urn {org_urn}")
36
- return report_data_df, None # Return full dataframe and no error
 
 
 
 
 
 
 
 
 
37
 
38
 
39
  def save_report_results(
40
  org_urn: str,
41
  report_markdown: str,
42
- quarter,
43
- year,
44
  report_type: str,
45
- ):
46
- """Saves the agentic pipeline results to Bubble."""
 
47
  if not org_urn:
48
- logger.error("Cannot save agentic results: org_urn missing.")
49
- return False
50
-
51
- payload = {
52
- "organization_urn": org_urn,
53
- "report_text": report_markdown if report_markdown else "N/A",
54
- "quarter": quarter,
55
- "year": year,
56
- "report_type": report_type,
57
- }
58
- logger.info(f"Attempting to save agentic analysis to Bubble for org_urn: {org_urn}")
59
- success_ids = bulk_upload_to_bubble([payload], BUBBLE_REPORT_TABLE_NAME)
60
-
61
- if success_ids: # bulk_upload_to_bubble returns list of IDs or False
62
- logger.info(f"Successfully saved agentic analysis to Bubble. Record ID: {success_ids}")
63
- return success_ids['id']
64
- else:
65
- logger.error(f"Failed to save agentic analysis to Bubble. {success_ids}")
66
- return False
 
 
 
 
 
 
 
67
 
68
 
69
  # --- Data Saving Functions ---
70
 
71
  def save_objectives(
72
  org_urn: str,
73
- report_id,
74
  objectives_data: List[Dict[str, Any]]
75
  ) -> Optional[List[str]]:
76
  """
77
  Saves Objective records to Bubble.
78
-
79
- Args:
80
- org_urn: The URN of the organization to associate these objectives with.
81
- objectives_data: A list of objective dictionaries from the main data structure.
82
-
83
- Returns:
84
- A list of the newly created Bubble record IDs for the objectives, or None on failure.
85
  """
 
86
  if not objectives_data:
87
  logger.info("No objectives to save.")
88
  return []
89
 
90
- payloads = []
91
- for objective in objectives_data:
92
- payloads.append({
93
- #"organization_urn": org_urn,
94
- "objective_description": objective.get("objective_description"),
95
- "objective_timeline": objective.get("objective_timeline"),
96
- "objective_owner": objective.get("objective_owner"),
97
- "report": report_id
98
- })
99
-
100
- logger.info(f"Attempting to save {len(payloads)} objectives for org_urn: {org_urn}")
101
- objective_ids = bulk_upload_to_bubble(payloads, BUBBLE_OBJECTIVES_TABLE_NAME)
102
-
103
- if objective_ids is None:
104
- logger.error("Failed to save objectives to Bubble.")
 
 
 
 
 
 
 
 
 
105
  return None
106
 
107
- logger.info(f"Successfully saved {len(objective_ids)} objectives.")
108
- return objective_ids
109
-
110
 
111
  def save_key_results(
112
  org_urn: str,
@@ -114,49 +136,48 @@ def save_key_results(
114
  ) -> Optional[List[Tuple[Dict[str, Any], str]]]:
115
  """
116
  Saves Key Result records to Bubble, linking them to their parent objectives.
117
-
118
- Args:
119
- org_urn: The URN of the organization.
120
- objectives_with_ids: A list of tuples, where each tuple contains an
121
- original objective dictionary and its new Bubble ID.
122
- Example: [(objective_dict, 'bubble_id_123'), ...]
123
-
124
- Returns:
125
- A list of tuples containing the original key result data and its new Bubble ID,
126
- or None on failure. This is needed to save the tasks in the next step.
127
  """
 
128
  key_result_payloads = []
129
  # This list preserves the original KR data in the correct order to match the returned IDs
130
  key_results_to_process = []
131
-
132
- for objective_data, parent_objective_id in objectives_with_ids:
133
- for kr in objective_data.get("key_results", []):
134
- key_results_to_process.append(kr)
135
- key_result_payloads.append({
136
- #"organization_urn": org_urn,
137
- "okr": parent_objective_id, # Link to parent
138
- "description": kr.get("key_result_description"),
139
- "target_metric": kr.get("target_metric"),
140
- "target_value": kr.get("target_value"),
141
- "kr_type": kr.get("key_result_type"),
142
- "data_subject": kr.get("data_subject"),
143
- })
144
-
145
- if not key_result_payloads:
146
- logger.info("No key results to save.")
147
  return []
148
 
149
- logger.info(f"Attempting to save {len(key_result_payloads)} key results for org_urn: {org_urn}")
150
- key_result_ids = bulk_upload_to_bubble(key_result_payloads, BUBBLE_KEY_RESULTS_TABLE_NAME)
151
-
152
- if key_result_ids is None:
153
- logger.error("Failed to save key results to Bubble.")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
154
  return None
155
 
156
- logger.info(f"Successfully saved {len(key_result_ids)} key results.")
157
- # Combine the original KR data with their new IDs
158
- return list(zip(key_results_to_process, key_result_ids))
159
-
160
 
161
  def save_tasks(
162
  org_urn: str,
@@ -164,80 +185,91 @@ def save_tasks(
164
  ) -> Optional[List[str]]:
165
  """
166
  Saves Task records to Bubble, linking them to their parent key results.
167
-
168
- Args:
169
- org_urn: The URN of the organization.
170
- key_results_with_ids: A list of tuples from the save_key_results function.
171
- Example: [(key_result_dict, 'bubble_id_456'), ...]
172
-
173
- Returns:
174
- A list of the newly created Bubble record IDs for the tasks, or None on failure.
175
  """
176
- task_payloads = []
177
- for key_result_data, parent_key_result_id in key_results_with_ids:
178
- for task in key_result_data.get("tasks", []):
179
- task_payloads.append({
180
- #"organization_urn": org_urn,
181
- "key_result": parent_key_result_id, # Link to parent
182
- "description": task.get("task_description"),
183
- "objective_deliverable": task.get("objective_deliverable"),
184
- "category": task.get("task_category"),
185
- #"task_type": task.get("task_type"),
186
- "priority": task.get("priority"),
187
- "priority_justification": task.get("priority_justification"),
188
- "effort": task.get("effort"),
189
- "timeline": task.get("timeline"),
190
- #"data_subject": task.get("data_subject"),
191
- "responsible_party": task.get("responsible_party"),
192
- "success_criteria_metrics": task.get("success_criteria_metrics"),
193
- "dependencies": task.get("dependencies_prerequisites"),
194
- "why": task.get("why_proposed"),
195
- })
196
-
197
- if not task_payloads:
198
- logger.info("No tasks to save.")
199
  return []
200
-
201
- logger.info(f"Attempting to save {len(task_payloads)} tasks for org_urn: {org_urn}")
202
- task_ids = bulk_upload_to_bubble(task_payloads, BUBBLE_TASKS_TABLE_NAME)
203
-
204
- if task_ids is None:
205
- logger.error("Failed to save tasks to Bubble.")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
206
  return None
207
 
208
- logger.info(f"Successfully saved {len(task_ids)} tasks.")
209
- return task_ids
210
-
211
 
212
  # --- Orchestrator Function ---
213
 
214
- def save_actionable_okrs(org_urn: str, actionable_okrs: Dict[str, Any], report_id):
215
  """
216
  Orchestrates the sequential saving of objectives, key results, and tasks.
217
-
218
- This function shows how to correctly call the individual save functions
219
- in the right order, passing the IDs from one step to the next.
220
  """
221
- logger.info(f"--- Starting OKR save process for org_urn: {org_urn} ---")
222
- objectives_data = actionable_okrs.get("okrs", [])
223
-
224
- # Step 1: Save the top-level objectives
225
- objective_ids = save_objectives(org_urn, objectives_data, report_id)
226
- if objective_ids is None:
227
- logger.error("OKR save process aborted due to failure in saving objectives.")
228
- return
229
-
230
- # Combine the original objective data with their new IDs for the next step
231
- objectives_with_ids = list(zip(objectives_data, objective_ids))
232
-
233
- # Step 2: Save the key results, linking them to the objectives
234
- key_results_with_ids = save_key_results(org_urn, objectives_with_ids)
235
- if key_results_with_ids is None:
236
- logger.error("OKR save process aborted due to failure in saving key results.")
237
- return
238
-
239
- # Step 3: Save the tasks, linking them to the key results
240
- save_tasks(org_urn, key_results_with_ids)
241
-
242
- logger.info(f"--- OKR save process completed for org_urn: {org_urn} ---")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
243
 
 
12
  import json # For handling JSON data
13
  from typing import List, Dict, Any, Optional, Tuple
14
 
15
+ # It's good practice to configure the logger at the application entry point,
16
+ # but setting a default handler here prevents "No handler found" warnings.
17
+ logging.basicConfig(level=logging.INFO)
18
  logger = logging.getLogger(__name__)
19
 
20
+ def fetch_latest_agentic_analysis(org_urn: str) -> Tuple[Optional[pd.DataFrame], Optional[str]]:
21
  """
22
  Fetches all agentic analysis data for a given org_urn from Bubble.
23
+ Returns the full dataframe and any error message, or None, None.
24
  """
25
+ logger.info(f"Starting fetch_latest_agentic_analysis for org_urn: {org_urn}")
26
  if not org_urn:
27
  logger.warning("fetch_latest_agentic_analysis: org_urn is missing.")
28
+ return None, "org_urn is missing."
29
+
30
+ try:
31
+ report_data_df, error = fetch_linkedin_posts_data_from_bubble(
32
+ data_type=BUBBLE_REPORT_TABLE_NAME,
33
+ org_urn=org_urn
34
+ )
35
+
36
+ if error:
37
+ logger.error(f"Error fetching data from Bubble for org_urn {org_urn}: {error}")
38
+ return None, str(error)
39
+
40
+ if report_data_df is None or report_data_df.empty:
41
+ logger.info(f"No existing agentic analysis found in Bubble for org_urn {org_urn}.")
42
+ return None, None
43
+
44
+ logger.info(f"Successfully fetched {len(report_data_df)} records for org_urn {org_urn}")
45
+ return report_data_df, None # Return full dataframe and no error
46
+
47
+ except Exception as e:
48
+ logger.exception(f"An unexpected error occurred in fetch_latest_agentic_analysis for org_urn {org_urn}: {e}")
49
+ return None, str(e)
50
 
51
 
52
  def save_report_results(
53
  org_urn: str,
54
  report_markdown: str,
55
+ quarter: int,
56
+ year: int,
57
  report_type: str,
58
+ ) -> Optional[str]:
59
+ """Saves the agentic pipeline results to Bubble. Returns the new record ID or None."""
60
+ logger.info(f"Starting save_report_results for org_urn: {org_urn}")
61
  if not org_urn:
62
+ logger.error("Cannot save agentic results: org_urn is missing.")
63
+ return None
64
+
65
+ try:
66
+ payload = {
67
+ "organization_urn": org_urn,
68
+ "report_text": report_markdown if report_markdown else "N/A",
69
+ "quarter": quarter,
70
+ "year": year,
71
+ "report_type": report_type,
72
+ }
73
+ logger.info(f"Attempting to save agentic analysis to Bubble for org_urn: {org_urn}")
74
+ response = bulk_upload_to_bubble([payload], BUBBLE_REPORT_TABLE_NAME)
75
+
76
+ # Assuming bulk_upload_to_bubble returns a list of IDs on success and None or False on failure
77
+ if response and isinstance(response, list) and len(response) > 0:
78
+ record_id = response[0]
79
+ logger.info(f"Successfully saved agentic analysis to Bubble. Record ID: {record_id}")
80
+ return record_id
81
+ else:
82
+ logger.error(f"Failed to save agentic analysis to Bubble. Response: {response}")
83
+ return None
84
+
85
+ except Exception as e:
86
+ logger.exception(f"An unexpected error occurred in save_report_results for org_urn {org_urn}: {e}")
87
+ return None
88
 
89
 
90
  # --- Data Saving Functions ---
91
 
92
  def save_objectives(
93
  org_urn: str,
94
+ report_id: str,
95
  objectives_data: List[Dict[str, Any]]
96
  ) -> Optional[List[str]]:
97
  """
98
  Saves Objective records to Bubble.
99
+ Returns a list of the newly created Bubble record IDs for the objectives, or None on failure.
 
 
 
 
 
 
100
  """
101
+ logger.info(f"Starting save_objectives for report_id: {report_id}")
102
  if not objectives_data:
103
  logger.info("No objectives to save.")
104
  return []
105
 
106
+ try:
107
+ payloads = [
108
+ {
109
+ "objective_description": obj.get("objective_description"),
110
+ "objective_timeline": obj.get("objective_timeline"),
111
+ "objective_owner": obj.get("objective_owner"),
112
+ "report": report_id,
113
+ # "organization_urn": org_urn # Assuming 'report' links to the org
114
+ }
115
+ for obj in objectives_data
116
+ ]
117
+
118
+ logger.info(f"Attempting to save {len(payloads)} objectives for report_id: {report_id}")
119
+ objective_ids = bulk_upload_to_bubble(payloads, BUBBLE_OKR_TABLE_NAME) # Corrected table name
120
+
121
+ if objective_ids is None:
122
+ logger.error(f"Failed to save objectives to Bubble for report_id: {report_id}. The upload function returned None.")
123
+ return None
124
+
125
+ logger.info(f"Successfully saved {len(objective_ids)} objectives.")
126
+ return objective_ids
127
+
128
+ except Exception as e:
129
+ logger.exception(f"An unexpected error occurred in save_objectives for report_id {report_id}: {e}")
130
  return None
131
 
 
 
 
132
 
133
  def save_key_results(
134
  org_urn: str,
 
136
  ) -> Optional[List[Tuple[Dict[str, Any], str]]]:
137
  """
138
  Saves Key Result records to Bubble, linking them to their parent objectives.
139
+ Returns a list of tuples containing the original key result data and its new Bubble ID, or None on failure.
 
 
 
 
 
 
 
 
 
140
  """
141
+ logger.info(f"Starting save_key_results for {len(objectives_with_ids)} objectives.")
142
  key_result_payloads = []
143
  # This list preserves the original KR data in the correct order to match the returned IDs
144
  key_results_to_process = []
145
+
146
+ if not objectives_with_ids:
147
+ logger.info("No objectives provided to save_key_results.")
 
 
 
 
 
 
 
 
 
 
 
 
 
148
  return []
149
 
150
+ try:
151
+ for objective_data, parent_objective_id in objectives_with_ids:
152
+ for kr in objective_data.get("key_results", []):
153
+ key_results_to_process.append(kr)
154
+ key_result_payloads.append({
155
+ "okr": parent_objective_id,
156
+ "description": kr.get("key_result_description"),
157
+ "target_metric": kr.get("target_metric"),
158
+ "target_value": kr.get("target_value"),
159
+ "kr_type": kr.get("key_result_type"),
160
+ "data_subject": kr.get("data_subject"),
161
+ })
162
+
163
+ if not key_result_payloads:
164
+ logger.info("No key results to save.")
165
+ return []
166
+
167
+ logger.info(f"Attempting to save {len(key_result_payloads)} key results for org_urn: {org_urn}")
168
+ key_result_ids = bulk_upload_to_bubble(key_result_payloads, BUBBLE_KEY_RESULTS_TABLE_NAME)
169
+
170
+ if key_result_ids is None:
171
+ logger.error(f"Failed to save key results to Bubble for org_urn: {org_urn}.")
172
+ return None
173
+
174
+ logger.info(f"Successfully saved {len(key_result_ids)} key results.")
175
+ return list(zip(key_results_to_process, key_result_ids))
176
+
177
+ except Exception as e:
178
+ logger.exception(f"An unexpected error occurred in save_key_results for org_urn {org_urn}: {e}")
179
  return None
180
 
 
 
 
 
181
 
182
  def save_tasks(
183
  org_urn: str,
 
185
  ) -> Optional[List[str]]:
186
  """
187
  Saves Task records to Bubble, linking them to their parent key results.
188
+ Returns a list of the newly created Bubble record IDs for the tasks, or None on failure.
 
 
 
 
 
 
 
189
  """
190
+ logger.info(f"Starting save_tasks for {len(key_results_with_ids)} key results.")
191
+ if not key_results_with_ids:
192
+ logger.info("No key results provided to save_tasks.")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
193
  return []
194
+
195
+ try:
196
+ task_payloads = []
197
+ for key_result_data, parent_key_result_id in key_results_with_ids:
198
+ for task in key_result_data.get("tasks", []):
199
+ task_payloads.append({
200
+ "key_result": parent_key_result_id,
201
+ "description": task.get("task_description"),
202
+ "objective_deliverable": task.get("objective_deliverable"),
203
+ "category": task.get("task_category"),
204
+ "priority": task.get("priority"),
205
+ "priority_justification": task.get("priority_justification"),
206
+ "effort": task.get("effort"),
207
+ "timeline": task.get("timeline"),
208
+ "responsible_party": task.get("responsible_party"),
209
+ "success_criteria_metrics": task.get("success_criteria_metrics"),
210
+ "dependencies": task.get("dependencies_prerequisites"),
211
+ "why": task.get("why_proposed"),
212
+ })
213
+
214
+ if not task_payloads:
215
+ logger.info("No tasks to save.")
216
+ return []
217
+
218
+ logger.info(f"Attempting to save {len(task_payloads)} tasks for org_urn: {org_urn}")
219
+ task_ids = bulk_upload_to_bubble(task_payloads, BUBBLE_TASKS_TABLE_NAME)
220
+
221
+ if task_ids is None:
222
+ logger.error(f"Failed to save tasks to Bubble for org_urn: {org_urn}.")
223
+ return None
224
+
225
+ logger.info(f"Successfully saved {len(task_ids)} tasks.")
226
+ return task_ids
227
+
228
+ except Exception as e:
229
+ logger.exception(f"An unexpected error occurred in save_tasks for org_urn {org_urn}: {e}")
230
  return None
231
 
 
 
 
232
 
233
  # --- Orchestrator Function ---
234
 
235
+ def save_actionable_okrs(org_urn: str, actionable_okrs: Dict[str, Any], report_id: str):
236
  """
237
  Orchestrates the sequential saving of objectives, key results, and tasks.
 
 
 
238
  """
239
+ logger.info(f"--- Starting OKR save process for org_urn: {org_urn}, report_id: {report_id} ---")
240
+
241
+ try:
242
+ objectives_data = actionable_okrs.get("okrs", [])
243
+ if not objectives_data:
244
+ logger.warning(f"No OKRs found in the input for org_urn: {org_urn}. Aborting save process.")
245
+ return
246
+
247
+ # Step 1: Save the top-level objectives
248
+ # Corrected the argument order from your original code.
249
+ objective_ids = save_objectives(org_urn, report_id, objectives_data)
250
+ if objective_ids is None:
251
+ logger.error("OKR save process aborted due to failure in saving objectives.")
252
+ return
253
+
254
+ # Combine the original objective data with their new IDs for the next step
255
+ objectives_with_ids = list(zip(objectives_data, objective_ids))
256
+
257
+ # Step 2: Save the key results, linking them to the objectives
258
+ key_results_with_ids = save_key_results(org_urn, objectives_with_ids)
259
+ if key_results_with_ids is None:
260
+ logger.error("OKR save process aborted due to failure in saving key results.")
261
+ return
262
+
263
+ # Step 3: Save the tasks, linking them to the key results
264
+ task_ids = save_tasks(org_urn, key_results_with_ids)
265
+ if task_ids is None:
266
+ logger.error("Task saving failed, but objectives and key results were saved.")
267
+ # Decide if you want to consider the whole process a failure.
268
+ # For now, we just log the error and complete.
269
+ return
270
+
271
+ logger.info(f"--- OKR save process completed successfully for org_urn: {org_urn} ---")
272
+
273
+ except Exception as e:
274
+ logger.exception(f"An unhandled exception occurred during the save_actionable_okrs orchestration for org_urn {org_urn}: {e}")
275