Spaces:
Sleeping
Sleeping
# services/report_data_handler.py | |
import json  # For handling JSON data
import logging
from typing import Any, Dict, List, Optional, Tuple

import pandas as pd

from apis.Bubble_API_Calls import fetch_linkedin_posts_data_from_bubble, bulk_upload_to_bubble
from config import (
    BUBBLE_REPORT_TABLE_NAME,
    BUBBLE_OKR_TABLE_NAME,
    BUBBLE_KEY_RESULTS_TABLE_NAME,
    BUBBLE_TASKS_TABLE_NAME,
    BUBBLE_KR_UPDATE_TABLE_NAME,
)
logger = logging.getLogger(__name__) | |
def fetch_latest_agentic_analysis(org_urn: str):
    """
    Load the agentic analysis records stored in Bubble for an organization.

    Args:
        org_urn: URN of the organization whose report records are requested.

    Returns:
        A ``(dataframe, None)`` tuple when the fetch reported no error and
        produced a non-empty dataframe. ``(None, None)`` in every other case:
        a missing ``org_urn``, a fetch error (which is only logged, never
        returned to the caller), or an empty / missing result.
    """
    if not org_urn:
        logger.warning("fetch_latest_agentic_analysis: org_urn is missing.")
        return None, None

    frame, fetch_error = fetch_linkedin_posts_data_from_bubble(
        data_type=BUBBLE_REPORT_TABLE_NAME,
        org_urn=org_urn,
    )

    # Short-circuit order matters: `frame` is only inspected when no error
    # was reported, and `.empty` is only read when `frame` is not None.
    has_rows = not fetch_error and frame is not None and not frame.empty
    if has_rows:
        logger.info(f"Agentic analysis data fetched for org_urn {org_urn}")
        return frame, None

    logger.info(f"No existing agentic analysis found in Bubble for org_urn {org_urn} or error: {fetch_error}")
    return None, None
def save_report_results(
    org_urn: str,
    report_markdown: str,
    quarter,
    year,
    report_type: str,
):
    """Save one agentic report record to Bubble and return its record ID.

    Args:
        org_urn: URN of the organization the report belongs to.
        report_markdown: Report body; stored as ``"N/A"`` when empty or None.
        quarter: Quarter the report covers (stored as given).
        year: Year the report covers (stored as given).
        report_type: Label stored in the ``report_type`` field.

    Returns:
        The ID of the created record on success, or ``False`` when
        ``org_urn`` is missing or the upload reports a failure.
    """
    if not org_urn:
        logger.error("Cannot save agentic results: org_urn missing.")
        return False

    payload = {
        "organization_urn": org_urn,
        "report_text": report_markdown or "N/A",
        "quarter": quarter,
        "year": year,
        "report_type": report_type,
    }

    logger.info(f"Attempting to save agentic analysis to Bubble for org_urn: {org_urn}")
    upload_result = bulk_upload_to_bubble([payload], BUBBLE_REPORT_TABLE_NAME)

    if not upload_result:
        logger.error(f"Failed to save agentic analysis to Bubble. {upload_result}")
        return False

    logger.info(f"Successfully saved agentic analysis to Bubble. Record ID: {upload_result}")

    # NOTE(review): the sibling save_* functions treat the upload result as a
    # list of IDs, while this function originally indexed it with ['id'],
    # which raises TypeError on a list. Both shapes are accepted here; confirm
    # the actual return type of bulk_upload_to_bubble.
    if isinstance(upload_result, dict):
        return upload_result["id"]
    if isinstance(upload_result, (list, tuple)):
        # Exactly one payload was uploaded, so the first ID is the report's ID.
        return upload_result[0]
    return upload_result
# --- Data Saving Functions --- | |
def save_objectives(
    org_urn: str,
    report_id,
    objectives_data: List[Dict[str, Any]]
) -> Optional[List[str]]:
    """
    Saves Objective records to Bubble.

    Args:
        org_urn: The URN of the organization; used for logging only (it is
            not part of the uploaded payload).
        report_id: Value stored in each objective's ``report`` field, linking
            the objective to its parent report record.
        objectives_data: A list of objective dictionaries from the main data structure.

    Returns:
        A list of the newly created Bubble record IDs for the objectives,
        an empty list when there is nothing to save, or None on failure.
    """
    if not objectives_data:
        logger.info("No objectives to save.")
        return []

    payloads = [
        {
            "objective_description": objective.get("objective_description"),
            "objective_timeline": objective.get("objective_timeline"),
            "objective_owner": objective.get("objective_owner"),
            "report": report_id,
        }
        for objective in objectives_data
    ]

    logger.info(f"Attempting to save {len(payloads)} objectives for org_urn: {org_urn}")
    # NOTE(review): the original referenced BUBBLE_OBJECTIVES_TABLE_NAME, which
    # is not imported from config; BUBBLE_OKR_TABLE_NAME is the imported
    # objectives/OKR table constant (key results link to it via "okr").
    # Confirm this is the intended table.
    objective_ids = bulk_upload_to_bubble(payloads, BUBBLE_OKR_TABLE_NAME)

    # The upload helper is described elsewhere in this module as returning
    # False on failure, so both failure sentinels are handled.
    if objective_ids is None or objective_ids is False:
        logger.error("Failed to save objectives to Bubble.")
        return None

    logger.info(f"Successfully saved {len(objective_ids)} objectives.")
    return objective_ids
def save_key_results(
    org_urn: str,
    objectives_with_ids: List[Tuple[Dict[str, Any], str]]
) -> Optional[List[Tuple[Dict[str, Any], str]]]:
    """
    Saves Key Result records to Bubble, linking them to their parent objectives.

    Args:
        org_urn: The URN of the organization; used for logging only.
        objectives_with_ids: A list of tuples, where each tuple contains an
            original objective dictionary and its new Bubble ID.
            Example: [(objective_dict, 'bubble_id_123'), ...]

    Returns:
        A list of tuples containing the original key result data and its new
        Bubble ID (needed to save the tasks in the next step), an empty list
        when there are no key results, or None on failure.
    """
    key_result_payloads = []
    # Preserves the original KR data in upload order so it can be paired with
    # the returned IDs.
    key_results_to_process = []

    for objective_data, parent_objective_id in objectives_with_ids:
        # `or []` also covers an explicit "key_results": None entry.
        for kr in objective_data.get("key_results") or []:
            key_results_to_process.append(kr)
            key_result_payloads.append({
                "okr": parent_objective_id,  # Link to parent
                "description": kr.get("key_result_description"),
                "target_metric": kr.get("target_metric"),
                "target_value": kr.get("target_value"),
                "kr_type": kr.get("key_result_type"),
                "data_subject": kr.get("data_subject"),
            })

    if not key_result_payloads:
        logger.info("No key results to save.")
        return []

    logger.info(f"Attempting to save {len(key_result_payloads)} key results for org_urn: {org_urn}")
    key_result_ids = bulk_upload_to_bubble(key_result_payloads, BUBBLE_KEY_RESULTS_TABLE_NAME)

    # The upload helper is described elsewhere in this module as returning
    # False on failure, so both failure sentinels are handled.
    if key_result_ids is None or key_result_ids is False:
        logger.error("Failed to save key results to Bubble.")
        return None

    logger.info(f"Successfully saved {len(key_result_ids)} key results.")
    if len(key_result_ids) != len(key_results_to_process):
        # zip() below stops at the shorter sequence; surface the mismatch
        # instead of silently dropping key results (and their tasks).
        logger.warning(
            f"Key result ID count ({len(key_result_ids)}) does not match "
            f"uploaded key results ({len(key_results_to_process)})."
        )

    # Combine the original KR data with their new IDs
    return list(zip(key_results_to_process, key_result_ids))
def save_tasks(
    org_urn: str,
    key_results_with_ids: List[Tuple[Dict[str, Any], str]]
) -> Optional[List[str]]:
    """
    Saves Task records to Bubble, linking them to their parent key results.

    Args:
        org_urn: The URN of the organization; used for logging only.
        key_results_with_ids: A list of tuples from the save_key_results function.
            Example: [(key_result_dict, 'bubble_id_456'), ...]

    Returns:
        A list of the newly created Bubble record IDs for the tasks, an empty
        list when there are no tasks, or None on failure.
    """
    task_payloads = []
    for key_result_data, parent_key_result_id in key_results_with_ids:
        # `or []` also covers an explicit "tasks": None entry.
        for task in key_result_data.get("tasks") or []:
            task_payloads.append({
                "key_result": parent_key_result_id,  # Link to parent
                "description": task.get("task_description"),
                "objective_deliverable": task.get("objective_deliverable"),
                "category": task.get("task_category"),
                "priority": task.get("priority"),
                "priority_justification": task.get("priority_justification"),
                "effort": task.get("effort"),
                "timeline": task.get("timeline"),
                "responsible_party": task.get("responsible_party"),
                "success_criteria_metrics": task.get("success_criteria_metrics"),
                "dependencies": task.get("dependencies_prerequisites"),
                "why": task.get("why_proposed"),
            })

    if not task_payloads:
        logger.info("No tasks to save.")
        return []

    logger.info(f"Attempting to save {len(task_payloads)} tasks for org_urn: {org_urn}")
    task_ids = bulk_upload_to_bubble(task_payloads, BUBBLE_TASKS_TABLE_NAME)

    # The upload helper is described elsewhere in this module as returning
    # False on failure, so both failure sentinels are handled.
    if task_ids is None or task_ids is False:
        logger.error("Failed to save tasks to Bubble.")
        return None

    logger.info(f"Successfully saved {len(task_ids)} tasks.")
    return task_ids
# --- Orchestrator Function --- | |
def save_actionable_okrs(org_urn: str, actionable_okrs: Dict[str, Any], report_id):
    """
    Orchestrates the sequential saving of objectives, key results, and tasks.

    Each step feeds the IDs it created into the next one: objectives are
    saved first, their IDs are used to link key results, and the key result
    IDs are used to link tasks. The process stops at the first failing step.

    Args:
        org_urn: The URN of the organization; forwarded to each save step.
        actionable_okrs: Dictionary whose ``"okrs"`` entry holds the list of
            objective dictionaries (each optionally nesting key results and tasks).
        report_id: Identifier of the parent report, stored on each objective.

    Returns:
        None. Progress and failures are reported through the module logger.
    """
    logger.info(f"--- Starting OKR save process for org_urn: {org_urn} ---")

    # `or []` also covers an explicit "okrs": None entry.
    objectives_data = actionable_okrs.get("okrs") or []

    # Step 1: Save the top-level objectives.
    # Keyword arguments are used because save_objectives takes report_id
    # before objectives_data; passing them positionally swapped the two.
    objective_ids = save_objectives(
        org_urn=org_urn,
        report_id=report_id,
        objectives_data=objectives_data,
    )
    if objective_ids is None:
        logger.error("OKR save process aborted due to failure in saving objectives.")
        return

    # Combine the original objective data with their new IDs for the next step
    objectives_with_ids = list(zip(objectives_data, objective_ids))

    # Step 2: Save the key results, linking them to the objectives
    key_results_with_ids = save_key_results(org_urn, objectives_with_ids)
    if key_results_with_ids is None:
        logger.error("OKR save process aborted due to failure in saving key results.")
        return

    # Step 3: Save the tasks, linking them to the key results
    task_ids = save_tasks(org_urn, key_results_with_ids)
    if task_ids is None:
        # Do not report completion when the final step failed.
        logger.error("OKR save process aborted due to failure in saving tasks.")
        return

    logger.info(f"--- OKR save process completed for org_urn: {org_urn} ---")