# state_manager.py
"""
Manages the application state, including token processing,
initial data loading from Bubble, and determining sync requirements
based on the operations log.
"""
import logging
import os

import pandas as pd
import gradio as gr

# Assuming Bubble_API_Calls contains fetch_linkedin_token_from_bubble
# and fetch_linkedin_posts_data_from_bubble.
from Bubble_API_Calls import (
    fetch_linkedin_token_from_bubble,
    fetch_linkedin_posts_data_from_bubble  # Generic fetcher, used for all tables.
)

# Assuming config.py contains all necessary constants.
from config import (
    DEFAULT_INITIAL_FETCH_COUNT, DEFAULT_POSTS_UPDATE_FETCH_COUNT,
    BUBBLE_POST_DATE_COLUMN_NAME, BUBBLE_POSTS_TABLE_NAME,
    BUBBLE_POST_STATS_TABLE_NAME,
    BUBBLE_MENTIONS_TABLE_NAME, BUBBLE_MENTIONS_DATE_COLUMN_NAME,
    BUBBLE_FOLLOWER_STATS_TABLE_NAME, FOLLOWER_STATS_TYPE_COLUMN, FOLLOWER_STATS_CATEGORY_COLUMN,
    LINKEDIN_CLIENT_ID_ENV_VAR,
    BUBBLE_OPERATIONS_LOG_TABLE_NAME, BUBBLE_OPERATIONS_LOG_DATE_COLUMN,
    BUBBLE_OPERATIONS_LOG_SUBJECT_COLUMN, BUBBLE_OPERATIONS_LOG_ORG_URN_COLUMN,
    LOG_SUBJECT_POSTS, LOG_SUBJECT_MENTIONS, LOG_SUBJECT_FOLLOWER_STATS
)
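
# For reference, config.py is assumed to define plain string constants along
# these lines (the values shown are illustrative, not authoritative):
#   BUBBLE_OPERATIONS_LOG_TABLE_NAME = "operations_log"
#   BUBBLE_OPERATIONS_LOG_DATE_COLUMN = "date"
#   BUBBLE_OPERATIONS_LOG_SUBJECT_COLUMN = "subject"
#   BUBBLE_OPERATIONS_LOG_ORG_URN_COLUMN = "organization_urn"
#   LOG_SUBJECT_POSTS, LOG_SUBJECT_MENTIONS = "post", "mention"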

def check_token_status(token_state):
    """Checks the status of the LinkedIn token."""
    return "✅ Token available" if token_state and token_state.get("token") else "❌ Token not available"

def get_last_sync_attempt_date(operations_log_df, subject, org_urn):
    """
    Retrieves the last sync attempt date for a given subject and organization URN
    from the operations log DataFrame.

    Args:
        operations_log_df (pd.DataFrame): DataFrame containing operations log data.
            Expected columns (defined in config):
            BUBBLE_OPERATIONS_LOG_DATE_COLUMN,
            BUBBLE_OPERATIONS_LOG_SUBJECT_COLUMN,
            BUBBLE_OPERATIONS_LOG_ORG_URN_COLUMN.
        subject (str): The subject of the sync operation (e.g., "post", "mention").
        org_urn (str): The organization URN.

    Returns:
        pd.Timestamp: The last sync attempt date (UTC), or pd.NaT if no relevant
            log entry is found.
    """
    if operations_log_df.empty or not org_urn:
        return pd.NaT

    # Ensure the required columns exist.
    required_cols = [
        BUBBLE_OPERATIONS_LOG_DATE_COLUMN,
        BUBBLE_OPERATIONS_LOG_SUBJECT_COLUMN,
        BUBBLE_OPERATIONS_LOG_ORG_URN_COLUMN,
    ]
    if not all(col in operations_log_df.columns for col in required_cols):
        logging.warning(f"Operations log DF is missing one or more required columns: {required_cols}")
        return pd.NaT

    try:
        # Filter for the specific subject and organization URN. Cast both sides
        # to str so the comparison is type-consistent, especially for org_urn.
        filtered_df = operations_log_df[
            (operations_log_df[BUBBLE_OPERATIONS_LOG_SUBJECT_COLUMN].astype(str) == str(subject)) &
            (operations_log_df[BUBBLE_OPERATIONS_LOG_ORG_URN_COLUMN].astype(str) == str(org_urn))
        ]
        if filtered_df.empty:
            return pd.NaT

        # Convert the date column to timezone-aware datetimes (UTC) and take the
        # latest. Dates should ideally be stored in UTC or converted upon fetch;
        # errors='coerce' turns unparsable values into NaT, which dropna() discards.
        dates = pd.to_datetime(filtered_df[BUBBLE_OPERATIONS_LOG_DATE_COLUMN], errors='coerce', utc=True)
        return dates.dropna().max()
    except Exception as e:
        logging.error(f"Error processing operations log for last sync attempt date: {e}", exc_info=True)
        return pd.NaT
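
# Minimal usage sketch (the literal date strings and URN below are illustrative;
# the column-name constants come from config):
#
#   ops_log = pd.DataFrame({
#       BUBBLE_OPERATIONS_LOG_DATE_COLUMN: ["2024-05-01T00:00:00Z", "2024-05-08T00:00:00Z"],
#       BUBBLE_OPERATIONS_LOG_SUBJECT_COLUMN: [LOG_SUBJECT_POSTS, LOG_SUBJECT_POSTS],
#       BUBBLE_OPERATIONS_LOG_ORG_URN_COLUMN: ["urn:li:organization:123"] * 2,
#   })
#   get_last_sync_attempt_date(ops_log, LOG_SUBJECT_POSTS, "urn:li:organization:123")
#   # -> Timestamp('2024-05-08 00:00:00+0000', tz='UTC')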

def process_and_store_bubble_token(url_user_token, org_urn, token_state):
    """
    Processes the user token, fetches the LinkedIn token, fetches existing Bubble
    data (posts, mentions, follower stats, operations log), and determines whether
    a sync is needed for each data type based on the operations log.
    Updates the token state and the sync button's UI state.
    """
    logging.info(f"Processing token with URL user token: '{url_user_token}', Org URN: '{org_urn}'")

    new_state = token_state.copy() if token_state else {}
    new_state.update({
        "token": new_state.get("token"),  # Preserve the existing token, if any.
        "client_id": new_state.get("client_id"),
        "org_urn": org_urn,
        "bubble_posts_df": new_state.get("bubble_posts_df", pd.DataFrame()),
        "fetch_count_for_api": 0,  # Determined below from the operations log.
        "bubble_mentions_df": new_state.get("bubble_mentions_df", pd.DataFrame()),
        "mentions_should_sync_now": False,  # Determined below from the operations log.
        "bubble_follower_stats_df": new_state.get("bubble_follower_stats_df", pd.DataFrame()),
        "fs_should_sync_now": False,  # Determined below from the operations log.
        "bubble_operations_log_df": new_state.get("bubble_operations_log_df", pd.DataFrame()),
        "url_user_token_temp_storage": url_user_token
    })

    button_update = gr.update(visible=False, interactive=False, value="🔄 Sync LinkedIn Data")

    client_id = os.environ.get(LINKEDIN_CLIENT_ID_ENV_VAR)
    new_state["client_id"] = client_id if client_id else "ENV VAR MISSING"
    if not client_id:
        logging.error(f"CRITICAL ERROR: '{LINKEDIN_CLIENT_ID_ENV_VAR}' environment variable not set.")

    if url_user_token and "not found" not in url_user_token and "Could not access" not in url_user_token:
        logging.info(f"Attempting to fetch LinkedIn token from Bubble with user token: {url_user_token}")
        try:
            parsed_linkedin_token = fetch_linkedin_token_from_bubble(url_user_token)
            if isinstance(parsed_linkedin_token, dict) and "access_token" in parsed_linkedin_token:
                new_state["token"] = parsed_linkedin_token
                logging.info("✅ LinkedIn token successfully fetched from Bubble.")
            else:
                new_state["token"] = None
                logging.warning(f"❌ Failed to fetch a valid LinkedIn token from Bubble. Response: {parsed_linkedin_token}")
        except Exception as e:
            new_state["token"] = None
            logging.error(f"❌ Exception while fetching LinkedIn token from Bubble: {e}", exc_info=True)
    else:
        new_state["token"] = None
        logging.info("No valid URL user token provided for LinkedIn token fetch, or an error was indicated.")

    current_org_urn = new_state.get("org_urn")
    if current_org_urn:
        data_tables_to_fetch = {
            "bubble_posts_df": BUBBLE_POSTS_TABLE_NAME,
            "bubble_mentions_df": BUBBLE_MENTIONS_TABLE_NAME,
            "bubble_follower_stats_df": BUBBLE_FOLLOWER_STATS_TABLE_NAME,
            "bubble_operations_log_df": BUBBLE_OPERATIONS_LOG_TABLE_NAME,
            "bubble_post_stats_df": BUBBLE_POST_STATS_TABLE_NAME
        }
        for state_key, table_name in data_tables_to_fetch.items():
            logging.info(f"Attempting to fetch {table_name} from Bubble for org_urn: {current_org_urn}")
            try:
                fetched_df, error_message = fetch_linkedin_posts_data_from_bubble(current_org_urn, table_name)
                new_state[state_key] = pd.DataFrame() if error_message or fetched_df is None else fetched_df
                if error_message:
                    logging.warning(f"Error fetching {table_name} from Bubble: {error_message}.")
                # The operations log date column may arrive as strings; parse it
                # into timezone-aware (UTC) datetimes.
                if (state_key == "bubble_operations_log_df"
                        and not new_state[state_key].empty
                        and BUBBLE_OPERATIONS_LOG_DATE_COLUMN in new_state[state_key].columns):
                    new_state[state_key][BUBBLE_OPERATIONS_LOG_DATE_COLUMN] = pd.to_datetime(
                        new_state[state_key][BUBBLE_OPERATIONS_LOG_DATE_COLUMN], errors='coerce', utc=True
                    )
            except Exception as e:
                logging.error(f"❌ Error fetching {table_name} from Bubble: {e}.", exc_info=True)
                new_state[state_key] = pd.DataFrame()
    else:
        logging.warning("Org URN not available in state. Cannot fetch data from Bubble.")
        for key in ["bubble_posts_df", "bubble_mentions_df", "bubble_follower_stats_df",
                    "bubble_operations_log_df", "bubble_post_stats_df"]:
            new_state[key] = pd.DataFrame()

    # --- Determine sync needs based on the operations log ---
    ops_log_df = new_state.get("bubble_operations_log_df", pd.DataFrame())
    now_utc = pd.Timestamp.now(tz='UTC')

    # 1. Posts sync logic
    last_post_sync_attempt = get_last_sync_attempt_date(ops_log_df, LOG_SUBJECT_POSTS, current_org_urn)
    if pd.isna(last_post_sync_attempt):
        logging.info(f"ℹ️ No previous '{LOG_SUBJECT_POSTS}' sync attempt logged. Setting to fetch initial {DEFAULT_INITIAL_FETCH_COUNT} posts.")
        new_state['fetch_count_for_api'] = DEFAULT_INITIAL_FETCH_COUNT
    else:
        days_since_last_attempt = (now_utc.normalize() - last_post_sync_attempt.normalize()).days
        if days_since_last_attempt >= 7:
            # The fetch count could be scaled by the number of weeks elapsed; for
            # simplicity, the fixed DEFAULT_POSTS_UPDATE_FETCH_COUNT is used.
            new_state['fetch_count_for_api'] = DEFAULT_POSTS_UPDATE_FETCH_COUNT
            logging.info(f"Posts sync attempt is {days_since_last_attempt} days old. Setting fetch count to {new_state['fetch_count_for_api']}.")
        else:
            new_state['fetch_count_for_api'] = 0
            logging.info(f"Posts sync attempt was recent ({days_since_last_attempt} days ago). No new posts fetch scheduled based on log.")

    # 2. Mentions sync logic
    last_mention_sync_attempt = get_last_sync_attempt_date(ops_log_df, LOG_SUBJECT_MENTIONS, current_org_urn)
    if pd.isna(last_mention_sync_attempt):
        new_state['mentions_should_sync_now'] = True
        logging.info(f"Mentions sync needed: No previous '{LOG_SUBJECT_MENTIONS}' sync attempt logged.")
    else:
        days_since_last_attempt_mentions = (now_utc.normalize() - last_mention_sync_attempt.normalize()).days
        if days_since_last_attempt_mentions >= 7:
            new_state['mentions_should_sync_now'] = True
            logging.info(f"Mentions sync needed: Last attempt was {days_since_last_attempt_mentions} days ago.")
        else:
            new_state['mentions_should_sync_now'] = False
            logging.info(f"Mentions sync attempt was recent ({days_since_last_attempt_mentions} days ago). Sync not scheduled.")

    # 3. Follower stats sync logic
    last_fs_sync_attempt = get_last_sync_attempt_date(ops_log_df, LOG_SUBJECT_FOLLOWER_STATS, current_org_urn)
    fs_df_current = new_state.get("bubble_follower_stats_df", pd.DataFrame())

    demographics_missing = False
    if fs_df_current.empty:
        demographics_missing = True  # If the entire table is empty, demographics are missing.
        logging.info("Follower stats: Main table is empty; considering demographics missing.")
    elif FOLLOWER_STATS_TYPE_COLUMN not in fs_df_current.columns:
        demographics_missing = True  # Without the type column, demographics cannot be checked.
        logging.info(f"Follower stats: Column '{FOLLOWER_STATS_TYPE_COLUMN}' is missing; considering demographics missing.")
    else:
        # Demographic data is any row whose type is NOT 'follower_gains_monthly'.
        if fs_df_current[fs_df_current[FOLLOWER_STATS_TYPE_COLUMN] != 'follower_gains_monthly'].empty:
            demographics_missing = True
            logging.info("Follower stats: Demographic data (non-monthly types) is missing.")

    time_based_need_fs = False
    if pd.isna(last_fs_sync_attempt):
        time_based_need_fs = True
        logging.info(f"Follower stats sync needed: No previous '{LOG_SUBJECT_FOLLOWER_STATS}' sync attempt logged.")
    else:
        start_of_current_month = now_utc.normalize().replace(day=1)
        # last_fs_sync_attempt should already be timezone-aware, since
        # get_last_sync_attempt_date returns UTC timestamps; localize defensively.
        if last_fs_sync_attempt.tzinfo is None:
            last_fs_sync_attempt = last_fs_sync_attempt.tz_localize('UTC')
        if last_fs_sync_attempt < start_of_current_month:
            time_based_need_fs = True
            logging.info(f"Follower stats sync needed: Last attempt {last_fs_sync_attempt.date()} is before current month start {start_of_current_month.date()}.")

    if time_based_need_fs or demographics_missing:
        new_state['fs_should_sync_now'] = True
        if demographics_missing and not time_based_need_fs:
            logging.info("Follower stats sync triggered: Demographic data missing, even though the last sync attempt is recent.")
        elif time_based_need_fs:
            logging.info("Follower stats sync triggered by schedule.")
    else:
        new_state['fs_should_sync_now'] = False
        logging.info("Follower stats sync not currently required by schedule or data presence.")

    # Update the sync button based on the determined needs.
    sync_actions = []
    if new_state.get('fetch_count_for_api', 0) > 0:
        sync_actions.append(f"Posts ({new_state['fetch_count_for_api']})")
    if new_state.get('mentions_should_sync_now', False):
        sync_actions.append("Mentions")
    if new_state.get('fs_should_sync_now', False):
        sync_actions.append("Follower Stats")

    if new_state["token"] and sync_actions:
        button_label = f"🔄 Sync LinkedIn Data ({', '.join(sync_actions)})"
        button_update = gr.update(value=button_label, visible=True, interactive=True)
    elif new_state["token"]:
        button_label = "✅ Data Up-to-Date (based on sync log)"
        button_update = gr.update(value=button_label, visible=True, interactive=False)
    else:  # No token available.
        button_update = gr.update(visible=False, interactive=False, value="🔄 Sync LinkedIn Data")

    token_status_message = check_token_status(new_state)
    logging.info(f"Token processing complete. Status: {token_status_message}. Button: {button_update.get('value', 'N/A') if button_update else 'N/A'}. Sync actions needed: {sync_actions}")
    return token_status_message, new_state, button_update
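
# Minimal wiring sketch (assumption: a Gradio Blocks app; the component names
# below are illustrative and not part of this module):
#
#   with gr.Blocks() as demo:
#       token_state = gr.State({})
#       status_md = gr.Markdown()
#       sync_btn = gr.Button("🔄 Sync LinkedIn Data", visible=False)
#       url_token_tb = gr.Textbox(visible=False)
#       org_urn_tb = gr.Textbox(visible=False)
#       demo.load(
#           process_and_store_bubble_token,
#           inputs=[url_token_tb, org_urn_tb, token_state],
#           outputs=[status_md, token_state, sync_btn],
#       )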