# state_manager.py """ Manages the application state, including token processing, initial data loading from Bubble, and determining sync requirements based on the operations log. """ import pandas as pd import logging import os from datetime import timezone # Python's datetime, not to be confused with pandas' import gradio as gr # Assuming Bubble_API_Calls contains fetch_linkedin_token_from_bubble and fetch_linkedin_posts_data_from_bubble from Bubble_API_Calls import ( fetch_linkedin_token_from_bubble, fetch_linkedin_posts_data_from_bubble # This is generic, used for all tables ) # Assuming config.py contains all necessary constants from config import ( DEFAULT_INITIAL_FETCH_COUNT, DEFAULT_POSTS_UPDATE_FETCH_COUNT, BUBBLE_POST_DATE_COLUMN_NAME, BUBBLE_POSTS_TABLE_NAME, BUBBLE_POST_STATS_TABLE_NAME, BUBBLE_MENTIONS_TABLE_NAME, BUBBLE_MENTIONS_DATE_COLUMN_NAME, BUBBLE_FOLLOWER_STATS_TABLE_NAME, FOLLOWER_STATS_TYPE_COLUMN, FOLLOWER_STATS_CATEGORY_COLUMN, LINKEDIN_CLIENT_ID_ENV_VAR, BUBBLE_OPERATIONS_LOG_TABLE_NAME, BUBBLE_OPERATIONS_LOG_DATE_COLUMN, BUBBLE_OPERATIONS_LOG_SUBJECT_COLUMN, BUBBLE_OPERATIONS_LOG_ORG_URN_COLUMN, LOG_SUBJECT_POSTS, LOG_SUBJECT_MENTIONS, LOG_SUBJECT_FOLLOWER_STATS ) def check_token_status(token_state): """Checks the status of the LinkedIn token.""" return "✅ Token available" if token_state and token_state.get("token") else "❌ Token not available" def get_last_sync_attempt_date(operations_log_df, subject, org_urn): """ Retrieves the last sync attempt date for a given subject and organization URN from the operations log DataFrame. Args: operations_log_df (pd.DataFrame): DataFrame containing operations log data. Expected columns defined in config: BUBBLE_OPERATIONS_LOG_DATE_COLUMN, BUBBLE_OPERATIONS_LOG_SUBJECT_COLUMN, BUBBLE_OPERATIONS_LOG_ORG_URN_COLUMN. subject (str): The subject of the sync operation (e.g., "post", "mention"). org_urn (str): The organization URN. Returns: pd.Timestamp: The last sync attempt date (UTC), or pd.NaT if no relevant log entry is found. """ if operations_log_df.empty or not org_urn: return pd.NaT # Ensure required columns exist required_cols = [BUBBLE_OPERATIONS_LOG_DATE_COLUMN, BUBBLE_OPERATIONS_LOG_SUBJECT_COLUMN, BUBBLE_OPERATIONS_LOG_ORG_URN_COLUMN] if not all(col in operations_log_df.columns for col in required_cols): logging.warning(f"Operations log DF is missing one or more required columns: {required_cols}") return pd.NaT try: # Filter for the specific subject and organization URN # Ensure data types are consistent for comparison, especially org_urn filtered_df = operations_log_df[ (operations_log_df[BUBBLE_OPERATIONS_LOG_SUBJECT_COLUMN].astype(str) == str(subject)) & (operations_log_df[BUBBLE_OPERATIONS_LOG_ORG_URN_COLUMN].astype(str) == str(org_urn)) ] if filtered_df.empty: return pd.NaT # Convert date column to datetime objects (UTC) and find the maximum (latest) # The dates should ideally be stored in UTC or converted upon fetch. # Assuming fetch_linkedin_posts_data_from_bubble handles date parsing correctly or provides strings. 
        dates = pd.to_datetime(filtered_df[BUBBLE_OPERATIONS_LOG_DATE_COLUMN], errors='coerce', utc=True)
        return dates.dropna().max()
    except Exception as e:
        logging.error(f"Error processing operations log for last sync attempt date: {e}", exc_info=True)
        return pd.NaT
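
# Illustrative usage of get_last_sync_attempt_date (a sketch only, not part of the app flow;
# it assumes the BUBBLE_OPERATIONS_LOG_* constants resolve to string column names and that
# LOG_SUBJECT_POSTS is the subject value written by the sync code):
#
#   sample_log = pd.DataFrame({
#       BUBBLE_OPERATIONS_LOG_DATE_COLUMN: ["2024-05-01T10:00:00Z", "2024-05-08T10:00:00Z"],
#       BUBBLE_OPERATIONS_LOG_SUBJECT_COLUMN: [LOG_SUBJECT_POSTS, LOG_SUBJECT_POSTS],
#       BUBBLE_OPERATIONS_LOG_ORG_URN_COLUMN: ["urn:li:organization:123"] * 2,
#   })
#   get_last_sync_attempt_date(sample_log, LOG_SUBJECT_POSTS, "urn:li:organization:123")
#   # -> Timestamp('2024-05-08 10:00:00+0000', tz='UTC')
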

def process_and_store_bubble_token(url_user_token, org_urn, token_state):
    """
    Processes the user token, fetches the LinkedIn token, fetches existing Bubble data
    (posts, mentions, follower stats, operations log), and determines if a sync is
    needed for each data type based on the operations log.
    Updates token state and UI for the sync button.
    """
    logging.info(f"Processing token with URL user token: '{url_user_token}', Org URN: '{org_urn}'")

    new_state = token_state.copy() if token_state else {}
    new_state.update({
        "token": new_state.get("token"),  # Preserve existing token, if any
        "client_id": new_state.get("client_id"),
        "org_urn": org_urn,
        "bubble_posts_df": new_state.get("bubble_posts_df", pd.DataFrame()),
        "fetch_count_for_api": 0,  # Will be determined based on log
        "bubble_mentions_df": new_state.get("bubble_mentions_df", pd.DataFrame()),
        "mentions_should_sync_now": False,  # Will be determined based on log
        "bubble_follower_stats_df": new_state.get("bubble_follower_stats_df", pd.DataFrame()),
        "fs_should_sync_now": False,  # Will be determined based on log
        "bubble_operations_log_df": new_state.get("bubble_operations_log_df", pd.DataFrame()),  # NEW
        "url_user_token_temp_storage": url_user_token
    })

    button_update = gr.update(visible=False, interactive=False, value="🔄 Sync LinkedIn Data")

    client_id = os.environ.get(LINKEDIN_CLIENT_ID_ENV_VAR)
    new_state["client_id"] = client_id if client_id else "ENV VAR MISSING"
    if not client_id:
        logging.error(f"CRITICAL ERROR: '{LINKEDIN_CLIENT_ID_ENV_VAR}' environment variable not set.")

    if url_user_token and "not found" not in url_user_token and "Could not access" not in url_user_token:
        logging.info(f"Attempting to fetch LinkedIn token from Bubble with user token: {url_user_token}")
        try:
            parsed_linkedin_token = fetch_linkedin_token_from_bubble(url_user_token)
            if isinstance(parsed_linkedin_token, dict) and "access_token" in parsed_linkedin_token:
                new_state["token"] = parsed_linkedin_token
                logging.info("✅ LinkedIn Token successfully fetched from Bubble.")
            else:
                new_state["token"] = None
                logging.warning(f"❌ Failed to fetch a valid LinkedIn token from Bubble. Response: {parsed_linkedin_token}")
        except Exception as e:
            new_state["token"] = None
            logging.error(f"❌ Exception while fetching LinkedIn token from Bubble: {e}", exc_info=True)
    else:
        new_state["token"] = None
        logging.info("No valid URL user token provided for LinkedIn token fetch, or an error was indicated.")

    current_org_urn = new_state.get("org_urn")
    if current_org_urn:
        data_tables_to_fetch = {
            "bubble_posts_df": BUBBLE_POSTS_TABLE_NAME,
            "bubble_mentions_df": BUBBLE_MENTIONS_TABLE_NAME,
            "bubble_follower_stats_df": BUBBLE_FOLLOWER_STATS_TABLE_NAME,
            "bubble_operations_log_df": BUBBLE_OPERATIONS_LOG_TABLE_NAME,  # NEW
            "bubble_post_stats_df": BUBBLE_POST_STATS_TABLE_NAME
        }
        for state_key, table_name in data_tables_to_fetch.items():
            logging.info(f"Attempting to fetch {table_name} from Bubble for org_urn: {current_org_urn}")
            try:
                fetched_df, error_message = fetch_linkedin_posts_data_from_bubble(current_org_urn, table_name)
                new_state[state_key] = pd.DataFrame() if error_message or fetched_df is None else fetched_df
                if error_message:
                    logging.warning(f"Error fetching {table_name} from Bubble: {error_message}.")

                # Ensure the date column in the operations log is parsed correctly if it's fetched as strings
                if (state_key == "bubble_operations_log_df"
                        and not new_state[state_key].empty
                        and BUBBLE_OPERATIONS_LOG_DATE_COLUMN in new_state[state_key].columns):
                    new_state[state_key][BUBBLE_OPERATIONS_LOG_DATE_COLUMN] = pd.to_datetime(
                        new_state[state_key][BUBBLE_OPERATIONS_LOG_DATE_COLUMN], errors='coerce', utc=True
                    )
            except Exception as e:
                logging.error(f"❌ Error fetching {table_name} from Bubble: {e}.", exc_info=True)
                new_state[state_key] = pd.DataFrame()
    else:
        logging.warning("Org URN not available in state. Cannot fetch data from Bubble.")
        for key in ["bubble_posts_df", "bubble_mentions_df", "bubble_follower_stats_df", "bubble_operations_log_df"]:
            new_state[key] = pd.DataFrame()

    # --- Determine sync needs based on the Operations Log ---
    ops_log_df = new_state.get("bubble_operations_log_df", pd.DataFrame())
    now_utc = pd.Timestamp.now(tz='UTC')

    # 1. Posts Sync Logic
    last_post_sync_attempt = get_last_sync_attempt_date(ops_log_df, LOG_SUBJECT_POSTS, current_org_urn)
    if pd.isna(last_post_sync_attempt):
        logging.info(f"ℹ️ No previous '{LOG_SUBJECT_POSTS}' sync attempt logged. Setting to fetch initial {DEFAULT_INITIAL_FETCH_COUNT} posts.")
        new_state['fetch_count_for_api'] = DEFAULT_INITIAL_FETCH_COUNT
    else:
        days_since_last_attempt = (now_utc.normalize() - last_post_sync_attempt.normalize()).days
        if days_since_last_attempt >= 7:
            # A fetch count scaled to the number of elapsed weeks would also work;
            # for simplicity, use the fixed DEFAULT_POSTS_UPDATE_FETCH_COUNT.
            new_state['fetch_count_for_api'] = DEFAULT_POSTS_UPDATE_FETCH_COUNT
            logging.info(f"Posts sync attempt is {days_since_last_attempt} days old. Setting fetch count to {new_state['fetch_count_for_api']}.")
        else:
            new_state['fetch_count_for_api'] = 0
            logging.info(f"Posts sync attempt was recent ({days_since_last_attempt} days ago). No new posts fetch scheduled based on log.")
    # 2. Mentions Sync Logic
    last_mention_sync_attempt = get_last_sync_attempt_date(ops_log_df, LOG_SUBJECT_MENTIONS, current_org_urn)
    if pd.isna(last_mention_sync_attempt):
        new_state['mentions_should_sync_now'] = True
        logging.info(f"Mentions sync needed: No previous '{LOG_SUBJECT_MENTIONS}' sync attempt logged.")
    else:
        days_since_last_attempt_mentions = (now_utc.normalize() - last_mention_sync_attempt.normalize()).days
        if days_since_last_attempt_mentions >= 7:
            new_state['mentions_should_sync_now'] = True
            logging.info(f"Mentions sync needed: Last attempt was {days_since_last_attempt_mentions} days ago.")
        else:
            new_state['mentions_should_sync_now'] = False
            logging.info(f"Mentions sync attempt was recent ({days_since_last_attempt_mentions} days ago). Sync not scheduled.")

    # 3. Follower Stats Sync Logic
    last_fs_sync_attempt = get_last_sync_attempt_date(ops_log_df, LOG_SUBJECT_FOLLOWER_STATS, current_org_urn)
    fs_df_current = new_state.get("bubble_follower_stats_df", pd.DataFrame())

    demographics_missing = False
    if fs_df_current.empty:
        demographics_missing = True  # If the entire table is empty, demographics are missing
        logging.info("Follower stats: Main table is empty, considering demographics missing.")
    elif FOLLOWER_STATS_TYPE_COLUMN not in fs_df_current.columns:
        demographics_missing = True  # If the type column is missing, demographics cannot be checked
        logging.info(f"Follower stats: Column '{FOLLOWER_STATS_TYPE_COLUMN}' is missing, considering demographics missing.")
    else:
        # Check if any rows exist that are NOT 'follower_gains_monthly'
        if fs_df_current[fs_df_current[FOLLOWER_STATS_TYPE_COLUMN] != 'follower_gains_monthly'].empty:
            demographics_missing = True
            logging.info("Follower stats: Demographic data (non-monthly types) is missing.")

    time_based_need_fs = False
    if pd.isna(last_fs_sync_attempt):
        time_based_need_fs = True
        logging.info(f"Follower stats sync needed: No previous '{LOG_SUBJECT_FOLLOWER_STATS}' sync attempt logged.")
    else:
        start_of_current_month = now_utc.normalize().replace(day=1)
        # last_fs_sync_attempt should already be timezone-aware (see get_last_sync_attempt_date)
        if last_fs_sync_attempt.tzinfo is None:
            last_fs_sync_attempt = last_fs_sync_attempt.tz_localize('UTC')
        if last_fs_sync_attempt < start_of_current_month:
            time_based_need_fs = True
            logging.info(f"Follower stats sync needed: Last attempt {last_fs_sync_attempt.date()} is before current month start {start_of_current_month.date()}.")

    if time_based_need_fs or demographics_missing:
        new_state['fs_should_sync_now'] = True
        if demographics_missing and not time_based_need_fs:
            logging.info("Follower stats sync triggered: Demographic data missing, even if the last sync attempt is recent.")
        elif time_based_need_fs:
            logging.info("Follower stats sync triggered by schedule.")
    else:
        new_state['fs_should_sync_now'] = False
        logging.info("Follower stats sync not currently required by schedule or data presence.")

    # Update the Sync Button based on the determined needs
    sync_actions = []
    if new_state.get('fetch_count_for_api', 0) > 0:
        sync_actions.append(f"Posts ({new_state['fetch_count_for_api']})")
    if new_state.get('mentions_should_sync_now', False):
        sync_actions.append("Mentions")
    if new_state.get('fs_should_sync_now', False):
        sync_actions.append("Follower Stats")
log)" button_update = gr.update(value=button_label, visible=True, interactive=False) else: # No token button_update = gr.update(visible=False, interactive=False, value="🔄 Sync LinkedIn Data") token_status_message = check_token_status(new_state) logging.info(f"Token processing complete. Status: {token_status_message}. Button: {button_update.get('value', 'N/A') if button_update else 'N/A'}. Sync actions needed: {sync_actions}") return token_status_message, new_state, button_update