Spaces:
Running
Running
# services/report_data_handler.py | |
""" | |
This module is responsible for fetching pre-computed agentic analysis data | |
(reports, OKRs, etc.) from Bubble.io and reconstructing it into a nested | |
dictionary format that the Gradio UI can easily display. | |
""" | |
import pandas as pd | |
import logging | |
from typing import Dict, Any, Optional, Tuple | |
# This is the only function needed from the Bubble API module for this handler | |
from apis.Bubble_API_Calls import fetch_linkedin_posts_data_from_bubble | |
from config import ( | |
BUBBLE_REPORT_TABLE_NAME, | |
BUBBLE_OKR_TABLE_NAME, | |
BUBBLE_KEY_RESULTS_TABLE_NAME, | |
BUBBLE_TASKS_TABLE_NAME | |
) | |
logging.basicConfig(level=logging.INFO) | |
logger = logging.getLogger(__name__) | |
def fetch_latest_agentic_analysis(org_urn: str) -> Tuple[Optional[pd.DataFrame], Optional[str]]: | |
""" | |
Fetches all agentic analysis report data for a given org_urn from Bubble. | |
This function is called once during the initial data load. | |
""" | |
logger.info(f"Fetching latest agentic analysis data from Bubble for org_urn: {org_urn}") | |
if not org_urn: | |
logger.warning("fetch_latest_agentic_analysis: org_urn is missing.") | |
return None, "org_urn is missing." | |
try: | |
# We fetch all reports and will sort them later if needed, but typically the | |
# external process should manage providing the "latest" or "active" report. | |
report_data_df, error = fetch_linkedin_posts_data_from_bubble( | |
data_type=BUBBLE_REPORT_TABLE_NAME, | |
constraint_value=org_urn, | |
constraint_key='organization_urn', | |
constraint_type='equals' | |
) | |
if error: | |
logger.error(f"Error fetching agentic reports from Bubble for org_urn {org_urn}: {error}") | |
return None, str(error) | |
if report_data_df is None or report_data_df.empty: | |
logger.info(f"No existing agentic analysis found in Bubble for org_urn {org_urn}.") | |
return pd.DataFrame(), None # Return empty DataFrame, no error | |
logger.info(f"Successfully fetched {len(report_data_df)} agentic report records for org_urn {org_urn}") | |
return report_data_df, None | |
except Exception as e: | |
logger.exception(f"An unexpected error occurred in fetch_latest_agentic_analysis for org_urn {org_urn}: {e}") | |
return None, str(e) | |
def fetch_and_reconstruct_data_from_bubble(report_df: pd.DataFrame) -> Optional[Dict[str, Any]]: | |
""" | |
Takes a DataFrame of report data, fetches all related child items (OKRs, KRs, Tasks) | |
from Bubble, and reconstructs the full nested dictionary expected by the UI. | |
Args: | |
report_df: The DataFrame containing one or more reports, fetched previously. | |
Returns: | |
A dictionary containing the reconstructed data ('report_str', 'actionable_okrs'), | |
or None if the report is not found or a critical error occurs. | |
""" | |
logger.info("Starting data reconstruction from fetched Bubble data.") | |
if report_df is None or report_df.empty: | |
logger.warning("Cannot reconstruct data, the provided report DataFrame is empty.") | |
return None | |
try: | |
# Assuming the most recent report is desired if multiple are returned. | |
# You might need more sophisticated logic here to select the "active" report. | |
latest_report = report_df.sort_values(by='Created Date', ascending=False).iloc[0] | |
report_id = latest_report.get('_id') | |
if not report_id: | |
logger.error("Fetched report is missing a Bubble '_id', cannot reconstruct children.") | |
return None | |
logger.info(f"Reconstructing data for the latest report, ID: {report_id}") | |
# 1. Fetch all related OKRs using the report_id | |
okrs_df, error = fetch_linkedin_posts_data_from_bubble( | |
data_type=BUBBLE_OKR_TABLE_NAME, | |
constraint_value=report_id, | |
constraint_key='report', | |
constraint_type='equals' | |
) | |
if error: | |
logger.error(f"Error fetching OKRs for report_id {report_id}: {error}") | |
return None # Fail reconstruction if children can't be fetched | |
# 2. Fetch all related Key Results using the OKR IDs | |
okr_ids = okrs_df['_id'].tolist() if not okrs_df.empty else [] | |
krs_df = pd.DataFrame() | |
if okr_ids: | |
krs_df, error = fetch_linkedin_posts_data_from_bubble( | |
data_type=BUBBLE_KEY_RESULTS_TABLE_NAME, | |
constraint_value=okr_ids, | |
constraint_key='okr', | |
constraint_type='in' | |
) | |
if error: logger.error(f"Error fetching Key Results: {error}") | |
# 3. Fetch all related Tasks using the Key Result IDs | |
kr_ids = krs_df['_id'].tolist() if not krs_df.empty else [] | |
tasks_df = pd.DataFrame() | |
if kr_ids: | |
tasks_df, error = fetch_linkedin_posts_data_from_bubble( | |
data_type=BUBBLE_TASKS_TABLE_NAME, | |
constraint_value=kr_ids, | |
constraint_key='key_result', | |
constraint_type='in' | |
) | |
if error: logger.error(f"Error fetching Tasks: {error}") | |
# 4. Reconstruct the nested dictionary | |
tasks_by_kr_id = tasks_df.groupby('key_result').apply(lambda x: x.to_dict('records')).to_dict() if not tasks_df.empty else {} | |
krs_by_okr_id = krs_df.groupby('okr').apply(lambda x: x.to_dict('records')).to_dict() if not krs_df.empty else {} | |
reconstructed_okrs = [] | |
if not okrs_df.empty: | |
for okr_data in okrs_df.to_dict('records'): | |
okr_id = okr_data['_id'] | |
key_results_list = krs_by_okr_id.get(okr_id, []) | |
for kr_data in key_results_list: | |
kr_id = kr_data['_id'] | |
kr_data['tasks'] = tasks_by_kr_id.get(kr_id, []) | |
okr_data['key_results'] = key_results_list | |
reconstructed_okrs.append(okr_data) | |
# 5. Assemble the final payload for the UI | |
actionable_okrs = {"okrs": reconstructed_okrs} | |
final_reconstructed_data = { | |
"report_str": latest_report.get("report_text", "Report text not found."), | |
"quarter": latest_report.get("quarter"), | |
"year": latest_report.get("year"), | |
"actionable_okrs": actionable_okrs, | |
"report_id": report_id | |
} | |
logger.info("Successfully reconstructed nested data structure for the UI.") | |
return final_reconstructed_data | |
except Exception as e: | |
logger.exception(f"An unexpected error occurred during data reconstruction: {e}") | |
return None | |