# services/report_data_handler.py
"""
This module is responsible for fetching pre-computed agentic analysis data
(reports, OKRs, etc.) from Bubble.io and reconstructing it into a nested
dictionary format that the Gradio UI can easily display.
"""
import pandas as pd
import logging
from typing import Dict, Any, Optional, Tuple
# This is the only function needed from the Bubble API module for this handler
from apis.Bubble_API_Calls import fetch_linkedin_posts_data_from_bubble
from config import (
BUBBLE_REPORT_TABLE_NAME,
BUBBLE_OKR_TABLE_NAME,
BUBBLE_KEY_RESULTS_TABLE_NAME,
BUBBLE_TASKS_TABLE_NAME
)
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def fetch_latest_agentic_analysis(org_urn: str) -> Tuple[Optional[pd.DataFrame], Optional[str]]:
    """
    Fetch all agentic analysis report records for a given org_urn from Bubble.

    This function is called once during the initial data load.

    Args:
        org_urn: Organization URN used as the Bubble lookup constraint.

    Returns:
        A ``(dataframe, error)`` tuple:
        - ``(DataFrame, None)`` on success; the DataFrame is empty when no
          reports exist for the organization (empty result is not an error).
        - ``(None, error_message)`` when the input is missing or the fetch fails.
    """
    # Lazy %-style logging args: the message is only formatted if the record
    # is actually emitted (stdlib logging convention).
    logger.info("Fetching latest agentic analysis data from Bubble for org_urn: %s", org_urn)
    if not org_urn:
        logger.warning("fetch_latest_agentic_analysis: org_urn is missing.")
        return None, "org_urn is missing."
    try:
        report_data_df, error = fetch_linkedin_posts_data_from_bubble(
            data_type=BUBBLE_REPORT_TABLE_NAME,
            constraint_value=org_urn,
            constraint_key='organization_urn',
            constraint_type='equals'
        )
        if error:
            logger.error("Error fetching agentic reports from Bubble for org_urn %s: %s", org_urn, error)
            return None, str(error)
        if report_data_df is None or report_data_df.empty:
            logger.info("No existing agentic analysis found in Bubble for org_urn %s.", org_urn)
            return pd.DataFrame(), None  # Return empty DataFrame, no error
        logger.info("Successfully fetched %d agentic report records for org_urn %s", len(report_data_df), org_urn)
        return report_data_df, None
    except Exception as e:
        # Broad top-level boundary: report the failure to the caller as a string.
        logger.exception("An unexpected error occurred in fetch_latest_agentic_analysis for org_urn %s: %s", org_urn, e)
        return None, str(e)
def _group_child_records(df: Optional[pd.DataFrame], parent_key: str) -> Dict[Any, list]:
    """Group child rows into ``{parent_id: [record_dict, ...]}``.

    Iterating the groupby directly replaces the deprecated
    ``groupby(...).apply(lambda x: x.to_dict('records'))`` pattern while
    producing the same mapping (records keep all columns, including the
    grouping column). Returns ``{}`` for a None or empty DataFrame.
    """
    if df is None or df.empty:
        return {}
    return {parent_id: group.to_dict('records') for parent_id, group in df.groupby(parent_key)}


def fetch_and_reconstruct_data_from_bubble(report_series: pd.Series, session_cache: dict) -> Tuple[Optional[Dict[str, Any]], dict]:
    """
    Fetch a report's child items (OKRs -> Key Results -> Tasks) from Bubble and
    reconstruct the nested dictionary the UI expects, using a session-specific
    cache to avoid redundant API calls.

    Args:
        report_series: A pandas Series representing a single report to be
            processed. Must contain a Bubble '_id'.
        session_cache: The session-specific cache dictionary from a Gradio
            State, keyed by report '_id'.

    Returns:
        A tuple containing:
        - The reconstructed data dictionary (None on failure).
        - The updated session_cache dictionary (mutated in place on success).
    """
    logger.info("Attempting to get or reconstruct data for a Bubble report using session cache.")
    if report_series is None or report_series.empty:
        logger.warning("Cannot reconstruct data, the provided report Series is empty.")
        return None, session_cache
    report_id = report_series.get('_id')
    if not report_id:
        logger.error("Fetched report series is missing a Bubble '_id', cannot reconstruct children.")
        return None, session_cache
    # --- CACHE CHECK ---
    if report_id in session_cache:
        logger.info("CACHE HIT: Found reconstructed data for report ID %s in session cache.", report_id)
        return session_cache[report_id], session_cache
    logger.info("CACHE MISS: No data for report ID %s. Starting reconstruction from Bubble.io.", report_id)
    try:
        # 1. Fetch all related OKRs using the report_id
        okrs_df, error = fetch_linkedin_posts_data_from_bubble(
            data_type=BUBBLE_OKR_TABLE_NAME,
            constraint_value=report_id,
            constraint_key='report',
            constraint_type='equals'
        )
        if error:
            logger.error("Error fetching OKRs for report_id %s: %s", report_id, error)
            return None, session_cache
        if okrs_df is None:
            # A fetch may succeed yet return no frame; normalize so the
            # '.empty' checks below are always safe.
            okrs_df = pd.DataFrame()
        # 2. Fetch all related Key Results using the OKR IDs
        okr_ids = okrs_df['_id'].tolist() if not okrs_df.empty else []
        krs_df = pd.DataFrame()
        if okr_ids:
            krs_df, error = fetch_linkedin_posts_data_from_bubble(
                data_type=BUBBLE_KEY_RESULTS_TABLE_NAME,
                constraint_value=okr_ids,
                constraint_key='okr',
                constraint_type='in'
            )
            if error or krs_df is None:
                if error:
                    logger.error("Error fetching Key Results: %s", error)
                # BUG FIX: a failed fetch can hand back None; fall back to an
                # empty frame so 'krs_df.empty' below cannot raise
                # AttributeError (previously masked by the broad except).
                krs_df = pd.DataFrame()
        # 3. Fetch all related Tasks using the Key Result IDs
        kr_ids = krs_df['_id'].tolist() if not krs_df.empty else []
        tasks_df = pd.DataFrame()
        if kr_ids:
            tasks_df, error = fetch_linkedin_posts_data_from_bubble(
                data_type=BUBBLE_TASKS_TABLE_NAME,
                constraint_value=kr_ids,
                constraint_key='key_result',
                constraint_type='in'
            )
            if error or tasks_df is None:
                if error:
                    logger.error("Error fetching Tasks: %s", error)
                # Same None-guard as for Key Results above.
                tasks_df = pd.DataFrame()
        # 4. Reconstruct the nested dictionary: parent-id -> list of child records
        tasks_by_kr_id = _group_child_records(tasks_df, 'key_result')
        krs_by_okr_id = _group_child_records(krs_df, 'okr')
        reconstructed_okrs = []
        if not okrs_df.empty:
            for okr_data in okrs_df.to_dict('records'):
                okr_id = okr_data['_id']
                key_results_list = krs_by_okr_id.get(okr_id, [])
                for kr_data in key_results_list:
                    kr_data['tasks'] = tasks_by_kr_id.get(kr_data['_id'], [])
                okr_data['key_results'] = key_results_list
                reconstructed_okrs.append(okr_data)
        # 5. Assemble the final payload for the UI
        final_reconstructed_data = {
            "report_str": report_series.get("report_text", "Report text not found."),
            "quarter": report_series.get("quarter"),
            "year": report_series.get("year"),
            "actionable_okrs": {"okrs": reconstructed_okrs},
            "report_id": report_id
        }
        # --- STORE IN SESSION CACHE ---
        session_cache[report_id] = final_reconstructed_data
        logger.info("Successfully reconstructed and cached data for report %s in the current session.", report_id)
        return final_reconstructed_data, session_cache
    except Exception as e:
        logger.exception("An unexpected error occurred during data reconstruction: %s", e)
        return None, session_cache