# state_manager.py
"""
Manages the application state, including token processing, initial data loading
from Bubble, and determining sync requirements.
"""
import logging
import os
from datetime import datetime, timedelta, timezone  # Added timezone to ensure it's available

import gradio as gr
import pandas as pd

# Assuming Bubble_API_Calls contains fetch_linkedin_token_from_bubble and fetch_linkedin_posts_data_from_bubble
from Bubble_API_Calls import (
    fetch_linkedin_token_from_bubble,
    fetch_linkedin_posts_data_from_bubble
)
# Assuming config.py contains all necessary constants
from config import (
    DEFAULT_INITIAL_FETCH_COUNT,
    BUBBLE_POST_DATE_COLUMN_NAME,
    BUBBLE_POSTS_TABLE_NAME,
    BUBBLE_MENTIONS_TABLE_NAME,
    BUBBLE_MENTIONS_DATE_COLUMN_NAME,
    BUBBLE_FOLLOWER_STATS_TABLE_NAME,
    FOLLOWER_STATS_TYPE_COLUMN,
    FOLLOWER_STATS_CATEGORY_COLUMN,
    LINKEDIN_CLIENT_ID_ENV_VAR
)


def check_token_status(token_state):
    """Return a status string saying whether ``token_state`` holds a truthy "token" entry."""
    return "✅ Token available" if token_state and token_state.get("token") else "❌ Token not available"


def _fetch_linkedin_token(url_user_token):
    """Fetch the LinkedIn token dict from Bubble; return None when unavailable or invalid."""
    # The URL token may carry an error marker string instead of a real token;
    # those markers are treated as "no token".
    if not (url_user_token and "not found" not in url_user_token and "Could not access" not in url_user_token):
        logging.info("No valid URL user token provided for LinkedIn token fetch, or an error was indicated.")
        return None

    logging.info(f"Attempting to fetch LinkedIn token from Bubble with user token: {url_user_token}")
    try:
        parsed_linkedin_token = fetch_linkedin_token_from_bubble(url_user_token)
    except Exception as e:
        logging.error(f"❌ Exception while fetching LinkedIn token from Bubble: {e}", exc_info=True)
        return None

    if isinstance(parsed_linkedin_token, dict) and "access_token" in parsed_linkedin_token:
        logging.info("✅ LinkedIn Token successfully fetched from Bubble.")
        return parsed_linkedin_token

    logging.warning(f"❌ Failed to fetch a valid LinkedIn token from Bubble. Response: {parsed_linkedin_token}")
    return None


def _fetch_bubble_table(org_urn, table_name, label):
    """Fetch one Bubble table for ``org_urn``; return an empty DataFrame on any failure."""
    logging.info(f"Attempting to fetch {label} from Bubble for org_urn: {org_urn}")
    try:
        fetched_df, error_message = fetch_linkedin_posts_data_from_bubble(org_urn, table_name)
    except Exception as e:
        logging.error(f"❌ Error fetching {label} from Bubble: {e}.", exc_info=True)
        return pd.DataFrame()

    if error_message:
        logging.warning(f"Error fetching {table_name} from Bubble: {error_message}.")
    if error_message or fetched_df is None:
        return pd.DataFrame()
    return fetched_df


def _days_since(timestamp_utc):
    """Return whole calendar days between ``timestamp_utc`` and today (both normalized, UTC)."""
    return (pd.Timestamp('now', tz='UTC').normalize() - timestamp_utc.normalize()).days


def _determine_posts_fetch_count(posts_df):
    """Return how many posts to request from the API given the posts already stored in Bubble."""
    if posts_df.empty:
        logging.info(f"ℹ️ No posts in Bubble. Setting to fetch initial {DEFAULT_INITIAL_FETCH_COUNT} posts.")
        return DEFAULT_INITIAL_FETCH_COUNT

    try:
        if BUBBLE_POST_DATE_COLUMN_NAME not in posts_df.columns or posts_df[BUBBLE_POST_DATE_COLUMN_NAME].isnull().all():
            logging.warning(f"Date column '{BUBBLE_POST_DATE_COLUMN_NAME}' for posts missing or all null values. Triggering initial fetch.")
            return DEFAULT_INITIAL_FETCH_COUNT

        # Parse into a separate Series so the stored DataFrame is never mutated.
        post_dates = pd.to_datetime(posts_df[BUBBLE_POST_DATE_COLUMN_NAME], errors='coerce', utc=True)
        last_post_date_utc = post_dates.dropna().max()
        if pd.isna(last_post_date_utc):
            logging.warning("No valid post dates found after conversion. Triggering initial fetch.")
            return DEFAULT_INITIAL_FETCH_COUNT

        days_diff = _days_since(last_post_date_utc)
        if days_diff >= 7:
            # Ten posts per full week of staleness.
            fetch_count = max(1, days_diff // 7) * 10
            logging.info(f"Posts data is {days_diff} days old. Setting fetch count to {fetch_count}.")
            return fetch_count

        logging.info("Posts data is recent. No new posts fetch needed based on date.")
        return 0
    except Exception as e:
        logging.error(f"Error processing post dates: {e}. Defaulting to initial fetch for posts.", exc_info=True)
        return DEFAULT_INITIAL_FETCH_COUNT


def _mentions_need_sync(mentions_df):
    """Return True when mentions stored in Bubble are missing, undated, or at least 7 days old."""
    if mentions_df.empty:
        logging.info("Mentions need sync: Bubble mentions DF is empty.")
        return True

    try:
        if BUBBLE_MENTIONS_DATE_COLUMN_NAME not in mentions_df.columns or \
                mentions_df[BUBBLE_MENTIONS_DATE_COLUMN_NAME].isnull().all():
            logging.info(f"Mentions need sync: Date column '{BUBBLE_MENTIONS_DATE_COLUMN_NAME}' missing or all null values.")
            return True

        mention_dates = pd.to_datetime(mentions_df[BUBBLE_MENTIONS_DATE_COLUMN_NAME], errors='coerce', utc=True)
        last_mention_date_utc = mention_dates.dropna().max()
        if pd.isna(last_mention_date_utc) or _days_since(last_mention_date_utc) >= 7:
            logging.info(f"Mentions need sync: Last mention date {last_mention_date_utc} is old or invalid.")
            return True

        logging.info(f"Mentions up-to-date. Last mention: {last_mention_date_utc}")
        return False
    except Exception as e:
        # Mirror the posts check: a parsing problem must not abort token processing.
        logging.error(f"Error processing mention dates: {e}. Defaulting to mentions sync.", exc_info=True)
        return True


def _follower_stats_need_sync(fs_df):
    """Return True when follower stats in Bubble lack current monthly gains or demographic rows."""
    if fs_df.empty:
        logging.info("Follower stats need sync: Bubble follower stats DF is empty.")
        return True

    # Without the type column the rows cannot be classified; indexing it would raise KeyError.
    if FOLLOWER_STATS_TYPE_COLUMN not in fs_df.columns:
        logging.warning(f"Follower stats need sync: Type column '{FOLLOWER_STATS_TYPE_COLUMN}' missing in follower stats.")
        return True

    try:
        need_sync = False
        monthly_gains_df = fs_df[fs_df[FOLLOWER_STATS_TYPE_COLUMN] == 'follower_gains_monthly']

        if monthly_gains_df.empty:
            need_sync = True
            logging.info("Follower stats need sync: No monthly gains data in Bubble.")
        elif FOLLOWER_STATS_CATEGORY_COLUMN not in monthly_gains_df.columns:
            need_sync = True
            logging.info(f"Follower stats need sync: Date column '{FOLLOWER_STATS_CATEGORY_COLUMN}' missing in monthly gains.")
        else:
            # Keep parsed dates in their own Series rather than writing them back
            # into the (differently typed) source column.
            gain_dates = pd.to_datetime(monthly_gains_df[FOLLOWER_STATS_CATEGORY_COLUMN], errors='coerce').dt.normalize()
            last_gain_date = gain_dates.dropna().max()
            if pd.isna(last_gain_date):
                need_sync = True
                logging.info("Follower stats need sync: No valid dates in monthly gains after conversion.")
            else:
                if last_gain_date.tzinfo is None or last_gain_date.tzinfo.utcoffset(last_gain_date) is None:
                    last_gain_date = last_gain_date.tz_localize('UTC')  # Localize naive to UTC
                else:
                    last_gain_date = last_gain_date.tz_convert('UTC')  # Convert aware to UTC

                start_of_current_month = pd.Timestamp('now', tz='UTC').normalize().replace(day=1)
                if last_gain_date < start_of_current_month:
                    need_sync = True
                    logging.info(f"Follower stats need sync: Last gain date {last_gain_date} is before current month start {start_of_current_month}.")
                else:
                    logging.info(f"Follower monthly gains up-to-date. Last gain recorded on: {last_gain_date}")

        # Rows of any other type are treated as demographic data; none at all means a sync is needed.
        if fs_df[fs_df[FOLLOWER_STATS_TYPE_COLUMN] != 'follower_gains_monthly'].empty:
            need_sync = True
            logging.info("Follower stats need sync: Demographic data (non-monthly types) missing.")

        return need_sync
    except Exception as e:
        # Mirror the posts check: a parsing problem must not abort token processing.
        logging.error(f"Error processing follower stats dates: {e}. Defaulting to follower stats sync.", exc_info=True)
        return True


def _build_sync_button_update(has_token, sync_actions):
    """Build the Gradio update for the sync button from token availability and pending actions."""
    if has_token and sync_actions:
        button_label = f"🔄 Sync LinkedIn Data ({', '.join(sync_actions)})"
        return gr.update(value=button_label, visible=True, interactive=True)
    if has_token:
        button_label = "✅ Data Up-to-Date"
        return gr.update(value=button_label, visible=True, interactive=False)
    return gr.update(visible=False, interactive=False)


def process_and_store_bubble_token(url_user_token, org_urn, token_state):
    """
    Processes user token, fetches LinkedIn token, fetches existing Bubble data
    (posts, mentions, follower stats), and determines if an initial fetch or
    update is needed for each data type. Updates token state and UI for the
    sync button.

    Args:
        url_user_token: User token taken from the URL; may instead hold an error
            marker containing "not found" or "Could not access".
        org_urn: Organization URN used to query Bubble; falsy skips all Bubble fetches.
        token_state: Previous state dict, or a falsy value to start from defaults.
            It is shallow-copied, not mutated.

    Returns:
        A tuple ``(token_status_message, new_state, button_update)`` where
        ``new_state`` is the updated state dict and ``button_update`` is a
        ``gr.update(...)`` for the sync button.
    """
    # NOTE(review): this logs the raw URL user token; consider masking it if logs are shared.
    logging.info(f"Processing token with URL user token: '{url_user_token}', Org URN: '{org_urn}'")

    # Initialize or update state safely
    new_state = token_state.copy() if token_state else {
        "token": None,
        "client_id": None,
        "org_urn": None,
        "bubble_posts_df": pd.DataFrame(),
        "fetch_count_for_api": 0,
        "bubble_mentions_df": pd.DataFrame(),
        "bubble_follower_stats_df": pd.DataFrame(),
        "url_user_token_temp_storage": None
    }
    new_state.update({
        "org_urn": org_urn,
        "bubble_posts_df": new_state.get("bubble_posts_df", pd.DataFrame()),
        "fetch_count_for_api": new_state.get("fetch_count_for_api", 0),
        "bubble_mentions_df": new_state.get("bubble_mentions_df", pd.DataFrame()),
        "bubble_follower_stats_df": new_state.get("bubble_follower_stats_df", pd.DataFrame()),
        "url_user_token_temp_storage": url_user_token
    })

    client_id = os.environ.get(LINKEDIN_CLIENT_ID_ENV_VAR)
    new_state["client_id"] = client_id if client_id else "ENV VAR MISSING"
    if not client_id:
        logging.error(f"CRITICAL ERROR: '{LINKEDIN_CLIENT_ID_ENV_VAR}' environment variable not set.")

    # Fetch LinkedIn Token from Bubble
    new_state["token"] = _fetch_linkedin_token(url_user_token)

    # Fetch existing data from Bubble if Org URN is available
    current_org_urn = new_state.get("org_urn")
    if current_org_urn:
        new_state["bubble_posts_df"] = _fetch_bubble_table(current_org_urn, BUBBLE_POSTS_TABLE_NAME, "posts")
        new_state["bubble_mentions_df"] = _fetch_bubble_table(current_org_urn, BUBBLE_MENTIONS_TABLE_NAME, "mentions")
        new_state["bubble_follower_stats_df"] = _fetch_bubble_table(
            current_org_urn, BUBBLE_FOLLOWER_STATS_TABLE_NAME, "follower stats"
        )
    else:
        logging.warning("Org URN not available in state. Cannot fetch data from Bubble.")
        new_state["bubble_posts_df"] = pd.DataFrame()
        new_state["bubble_mentions_df"] = pd.DataFrame()
        new_state["bubble_follower_stats_df"] = pd.DataFrame()

    # Determine what needs syncing for each data type
    new_state['fetch_count_for_api'] = _determine_posts_fetch_count(new_state["bubble_posts_df"])
    mentions_need_sync = _mentions_need_sync(new_state["bubble_mentions_df"])
    follower_stats_need_sync = _follower_stats_need_sync(
        new_state.get("bubble_follower_stats_df", pd.DataFrame())
    )

    # Update Sync Button based on token and needed actions
    sync_actions = []
    if new_state['fetch_count_for_api'] > 0:
        sync_actions.append(f"{new_state['fetch_count_for_api']} Posts")
    if mentions_need_sync:
        sync_actions.append("Mentions")
    if follower_stats_need_sync:
        sync_actions.append("Follower Stats")

    button_update = _build_sync_button_update(new_state["token"], sync_actions)

    token_status_message = check_token_status(new_state)
    logging.info(f"Token processing complete. Status: {token_status_message}. Button: {button_update}. Sync actions: {sync_actions}")
    return token_status_message, new_state, button_update