# state_manager.py
"""
Manages the application state, including token processing,
initial data loading from Bubble, and determining sync requirements
based on the operations log.
"""
import pandas as pd
import logging
import os
from datetime import timezone # Python's datetime, not to be confused with pandas'
import gradio as gr
# Assuming Bubble_API_Calls contains fetch_linkedin_token_from_bubble and fetch_linkedin_posts_data_from_bubble
from Bubble_API_Calls import (
fetch_linkedin_token_from_bubble,
fetch_linkedin_posts_data_from_bubble # This is generic, used for all tables
)
# Assuming config.py contains all necessary constants
from config import (
DEFAULT_INITIAL_FETCH_COUNT, DEFAULT_POSTS_UPDATE_FETCH_COUNT,
BUBBLE_POST_DATE_COLUMN_NAME, BUBBLE_POSTS_TABLE_NAME,
BUBBLE_POST_STATS_TABLE_NAME,
BUBBLE_MENTIONS_TABLE_NAME, BUBBLE_MENTIONS_DATE_COLUMN_NAME,
BUBBLE_FOLLOWER_STATS_TABLE_NAME, FOLLOWER_STATS_TYPE_COLUMN, FOLLOWER_STATS_CATEGORY_COLUMN,
LINKEDIN_CLIENT_ID_ENV_VAR,
BUBBLE_OPERATIONS_LOG_TABLE_NAME, BUBBLE_OPERATIONS_LOG_DATE_COLUMN,
BUBBLE_OPERATIONS_LOG_SUBJECT_COLUMN, BUBBLE_OPERATIONS_LOG_ORG_URN_COLUMN,
LOG_SUBJECT_POSTS, LOG_SUBJECT_MENTIONS, LOG_SUBJECT_FOLLOWER_STATS
)
def check_token_status(token_state):
"""Checks the status of the LinkedIn token."""
return "βœ… Token available" if token_state and token_state.get("token") else "❌ Token not available"
def get_last_sync_attempt_date(operations_log_df, subject, org_urn):
"""
Retrieves the last sync attempt date for a given subject and organization URN
from the operations log DataFrame.
Args:
operations_log_df (pd.DataFrame): DataFrame containing operations log data.
Expected columns defined in config:
BUBBLE_OPERATIONS_LOG_DATE_COLUMN,
BUBBLE_OPERATIONS_LOG_SUBJECT_COLUMN,
BUBBLE_OPERATIONS_LOG_ORG_URN_COLUMN.
subject (str): The subject of the sync operation (e.g., "post", "mention").
org_urn (str): The organization URN.
Returns:
pd.Timestamp: The last sync attempt date (UTC), or pd.NaT if no relevant log entry is found.
"""
if operations_log_df.empty or not org_urn:
return pd.NaT
# Ensure required columns exist
required_cols = [BUBBLE_OPERATIONS_LOG_DATE_COLUMN, BUBBLE_OPERATIONS_LOG_SUBJECT_COLUMN, BUBBLE_OPERATIONS_LOG_ORG_URN_COLUMN]
if not all(col in operations_log_df.columns for col in required_cols):
logging.warning(f"Operations log DF is missing one or more required columns: {required_cols}")
return pd.NaT
try:
# Filter for the specific subject and organization URN
# Ensure data types are consistent for comparison, especially org_urn
filtered_df = operations_log_df[
(operations_log_df[BUBBLE_OPERATIONS_LOG_SUBJECT_COLUMN].astype(str) == str(subject)) &
(operations_log_df[BUBBLE_OPERATIONS_LOG_ORG_URN_COLUMN].astype(str) == str(org_urn))
]
if filtered_df.empty:
return pd.NaT
# Convert date column to datetime objects (UTC) and find the maximum (latest)
# The dates should ideally be stored in UTC or converted upon fetch.
# Assuming fetch_linkedin_posts_data_from_bubble handles date parsing correctly or provides strings.
dates = pd.to_datetime(filtered_df[BUBBLE_OPERATIONS_LOG_DATE_COLUMN], errors='coerce', utc=True)
return dates.dropna().max()
except Exception as e:
logging.error(f"Error processing operations log for last sync attempt date: {e}", exc_info=True)
return pd.NaT
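
# Illustrative usage of get_last_sync_attempt_date (placeholders only; shown as a comment
# so nothing executes at import time):
#
#   sample_log = pd.DataFrame({
#       BUBBLE_OPERATIONS_LOG_DATE_COLUMN: ["2024-05-01T08:00:00Z"],
#       BUBBLE_OPERATIONS_LOG_SUBJECT_COLUMN: [LOG_SUBJECT_POSTS],
#       BUBBLE_OPERATIONS_LOG_ORG_URN_COLUMN: ["urn:li:organization:12345"],
#   })
#   get_last_sync_attempt_date(sample_log, LOG_SUBJECT_POSTS, "urn:li:organization:12345")
#   # -> Timestamp('2024-05-01 08:00:00+0000', tz='UTC')
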
def process_and_store_bubble_token(url_user_token, org_urn, token_state):
"""
Processes user token, fetches LinkedIn token, fetches existing Bubble data (posts, mentions, follower stats, operations log),
and determines if a sync is needed for each data type based on the operations log.
Updates token state and UI for the sync button.
"""
logging.info(f"Processing token with URL user token: '{url_user_token}', Org URN: '{org_urn}'")
new_state = token_state.copy() if token_state else {}
new_state.update({
"token": new_state.get("token"), # Preserve existing token if any
"client_id": new_state.get("client_id"),
"org_urn": org_urn,
"bubble_posts_df": new_state.get("bubble_posts_df", pd.DataFrame()),
"fetch_count_for_api": 0, # Will be determined based on log
"bubble_mentions_df": new_state.get("bubble_mentions_df", pd.DataFrame()),
"mentions_should_sync_now": False, # Will be determined based on log
"bubble_follower_stats_df": new_state.get("bubble_follower_stats_df", pd.DataFrame()),
"fs_should_sync_now": False, # Will be determined based on log
"bubble_operations_log_df": new_state.get("bubble_operations_log_df", pd.DataFrame()), # NEW
"url_user_token_temp_storage": url_user_token
})
    button_update = gr.update(visible=False, interactive=False, value="🔄 Sync LinkedIn Data")
client_id = os.environ.get(LINKEDIN_CLIENT_ID_ENV_VAR)
new_state["client_id"] = client_id if client_id else "ENV VAR MISSING"
if not client_id: logging.error(f"CRITICAL ERROR: '{LINKEDIN_CLIENT_ID_ENV_VAR}' environment variable not set.")
if url_user_token and "not found" not in url_user_token and "Could not access" not in url_user_token:
logging.info(f"Attempting to fetch LinkedIn token from Bubble with user token: {url_user_token}")
try:
parsed_linkedin_token = fetch_linkedin_token_from_bubble(url_user_token)
if isinstance(parsed_linkedin_token, dict) and "access_token" in parsed_linkedin_token:
new_state["token"] = parsed_linkedin_token
logging.info("βœ… LinkedIn Token successfully fetched from Bubble.")
else:
new_state["token"] = None
logging.warning(f"❌ Failed to fetch a valid LinkedIn token from Bubble. Response: {parsed_linkedin_token}")
except Exception as e:
new_state["token"] = None
logging.error(f"❌ Exception while fetching LinkedIn token from Bubble: {e}", exc_info=True)
else:
new_state["token"] = None
logging.info("No valid URL user token provided for LinkedIn token fetch, or an error was indicated.")
current_org_urn = new_state.get("org_urn")
if current_org_urn:
data_tables_to_fetch = {
"bubble_posts_df": BUBBLE_POSTS_TABLE_NAME,
"bubble_mentions_df": BUBBLE_MENTIONS_TABLE_NAME,
"bubble_follower_stats_df": BUBBLE_FOLLOWER_STATS_TABLE_NAME,
"bubble_operations_log_df": BUBBLE_OPERATIONS_LOG_TABLE_NAME, # NEW
"bubble_post_stats_df": BUBBLE_POST_STATS_TABLE_NAME
}
for state_key, table_name in data_tables_to_fetch.items():
logging.info(f"Attempting to fetch {table_name} from Bubble for org_urn: {current_org_urn}")
try:
fetched_df, error_message = fetch_linkedin_posts_data_from_bubble(current_org_urn, table_name)
new_state[state_key] = pd.DataFrame() if error_message or fetched_df is None else fetched_df
if error_message: logging.warning(f"Error fetching {table_name} from Bubble: {error_message}.")
# Ensure date column in operations log is parsed correctly if it's fetched as string
if state_key == "bubble_operations_log_df" and not new_state[state_key].empty and BUBBLE_OPERATIONS_LOG_DATE_COLUMN in new_state[state_key].columns:
new_state[state_key][BUBBLE_OPERATIONS_LOG_DATE_COLUMN] = pd.to_datetime(new_state[state_key][BUBBLE_OPERATIONS_LOG_DATE_COLUMN], errors='coerce', utc=True)
except Exception as e:
logging.error(f"❌ Error fetching {table_name} from Bubble: {e}.", exc_info=True)
new_state[state_key] = pd.DataFrame()
else:
logging.warning("Org URN not available in state. Cannot fetch data from Bubble.")
        for key in ["bubble_posts_df", "bubble_mentions_df", "bubble_follower_stats_df", "bubble_operations_log_df", "bubble_post_stats_df"]:
new_state[key] = pd.DataFrame()
# --- Determine sync needs based on Operations Log ---
ops_log_df = new_state.get("bubble_operations_log_df", pd.DataFrame())
now_utc = pd.Timestamp.now(tz='UTC')
# 1. Posts Sync Logic
last_post_sync_attempt = get_last_sync_attempt_date(ops_log_df, LOG_SUBJECT_POSTS, current_org_urn)
if pd.isna(last_post_sync_attempt):
logging.info(f"ℹ️ No previous '{LOG_SUBJECT_POSTS}' sync attempt logged. Setting to fetch initial {DEFAULT_INITIAL_FETCH_COUNT} posts.")
new_state['fetch_count_for_api'] = DEFAULT_INITIAL_FETCH_COUNT
else:
days_since_last_attempt = (now_utc.normalize() - last_post_sync_attempt.normalize()).days
if days_since_last_attempt >= 7:
            # A dynamic count based on how many weeks have elapsed could be used here;
            # for simplicity, the fixed DEFAULT_POSTS_UPDATE_FETCH_COUNT is requested.
new_state['fetch_count_for_api'] = DEFAULT_POSTS_UPDATE_FETCH_COUNT
logging.info(f"Posts sync attempt is {days_since_last_attempt} days old. Setting fetch count to {new_state['fetch_count_for_api']}.")
else:
new_state['fetch_count_for_api'] = 0
logging.info(f"Posts sync attempt was recent ({days_since_last_attempt} days ago). No new posts fetch scheduled based on log.")
# 2. Mentions Sync Logic
last_mention_sync_attempt = get_last_sync_attempt_date(ops_log_df, LOG_SUBJECT_MENTIONS, current_org_urn)
if pd.isna(last_mention_sync_attempt):
new_state['mentions_should_sync_now'] = True
logging.info(f"Mentions sync needed: No previous '{LOG_SUBJECT_MENTIONS}' sync attempt logged.")
else:
days_since_last_attempt_mentions = (now_utc.normalize() - last_mention_sync_attempt.normalize()).days
if days_since_last_attempt_mentions >= 7:
new_state['mentions_should_sync_now'] = True
logging.info(f"Mentions sync needed: Last attempt was {days_since_last_attempt_mentions} days ago.")
else:
new_state['mentions_should_sync_now'] = False
logging.info(f"Mentions sync attempt was recent ({days_since_last_attempt_mentions} days ago). Sync not scheduled.")
# 3. Follower Stats Sync Logic
last_fs_sync_attempt = get_last_sync_attempt_date(ops_log_df, LOG_SUBJECT_FOLLOWER_STATS, current_org_urn)
fs_df_current = new_state.get("bubble_follower_stats_df", pd.DataFrame())
demographics_missing = False
if fs_df_current.empty:
demographics_missing = True # If entire table is empty, demographics are missing
logging.info("Follower stats: Main table is empty, considering demographics missing.")
elif FOLLOWER_STATS_TYPE_COLUMN not in fs_df_current.columns:
demographics_missing = True # If type column is missing, cannot check demographics
logging.info(f"Follower stats: Column '{FOLLOWER_STATS_TYPE_COLUMN}' is missing, considering demographics missing.")
else:
# Check if any rows exist that are NOT 'follower_gains_monthly'
if fs_df_current[fs_df_current[FOLLOWER_STATS_TYPE_COLUMN] != 'follower_gains_monthly'].empty:
demographics_missing = True
logging.info("Follower stats: Demographic data (non-monthly types) is missing.")
time_based_need_fs = False
if pd.isna(last_fs_sync_attempt):
time_based_need_fs = True
logging.info(f"Follower stats sync needed: No previous '{LOG_SUBJECT_FOLLOWER_STATS}' sync attempt logged.")
else:
start_of_current_month = now_utc.normalize().replace(day=1)
# Ensure last_fs_sync_attempt is timezone-aware (should be by get_last_sync_attempt_date)
if last_fs_sync_attempt.tzinfo is None: # Should not happen if get_last_sync_attempt_date works
last_fs_sync_attempt = last_fs_sync_attempt.tz_localize('UTC')
if last_fs_sync_attempt < start_of_current_month:
time_based_need_fs = True
logging.info(f"Follower stats sync needed: Last attempt {last_fs_sync_attempt.date()} is before current month start {start_of_current_month.date()}.")
if time_based_need_fs or demographics_missing:
new_state['fs_should_sync_now'] = True
if demographics_missing and not time_based_need_fs:
logging.info("Follower stats sync triggered: Demographic data missing, even if last sync attempt is recent.")
elif time_based_need_fs:
logging.info("Follower stats sync triggered by schedule.")
else:
new_state['fs_should_sync_now'] = False
logging.info("Follower stats sync not currently required by schedule or data presence.")
# Update Sync Button based on determined needs
sync_actions = []
if new_state.get('fetch_count_for_api', 0) > 0:
sync_actions.append(f"Posts ({new_state['fetch_count_for_api']})")
if new_state.get('mentions_should_sync_now', False):
sync_actions.append("Mentions")
if new_state.get('fs_should_sync_now', False):
sync_actions.append("Follower Stats")
if new_state["token"] and sync_actions:
button_label = f"πŸ”„ Sync LinkedIn Data ({', '.join(sync_actions)})"
button_update = gr.update(value=button_label, visible=True, interactive=True)
elif new_state["token"]:
button_label = "βœ… Data Up-to-Date (based on sync log)"
button_update = gr.update(value=button_label, visible=True, interactive=False)
else: # No token
        button_update = gr.update(visible=False, interactive=False, value="🔄 Sync LinkedIn Data")
token_status_message = check_token_status(new_state)
logging.info(f"Token processing complete. Status: {token_status_message}. Button: {button_update.get('value', 'N/A') if button_update else 'N/A'}. Sync actions needed: {sync_actions}")
return token_status_message, new_state, button_update
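

if __name__ == "__main__":
    # Minimal local sketch (not part of the Gradio app flow): exercises the pure helpers with
    # synthetic data so the sync-date logic can be checked without Bubble or LinkedIn access.
    # It assumes config.py and Bubble_API_Calls import cleanly in this environment; the URN
    # and dates below are placeholders.
    logging.basicConfig(level=logging.INFO)
    demo_org = "urn:li:organization:12345"
    demo_log = pd.DataFrame({
        BUBBLE_OPERATIONS_LOG_DATE_COLUMN: ["2024-05-01T08:00:00Z", "2024-04-02T09:30:00Z"],
        BUBBLE_OPERATIONS_LOG_SUBJECT_COLUMN: [LOG_SUBJECT_POSTS, LOG_SUBJECT_MENTIONS],
        BUBBLE_OPERATIONS_LOG_ORG_URN_COLUMN: [demo_org, demo_org],
    })
    print("Last posts sync attempt:", get_last_sync_attempt_date(demo_log, LOG_SUBJECT_POSTS, demo_org))
    print("Last mentions sync attempt:", get_last_sync_attempt_date(demo_log, LOG_SUBJECT_MENTIONS, demo_org))
    print("Last follower-stats sync attempt:", get_last_sync_attempt_date(demo_log, LOG_SUBJECT_FOLLOWER_STATS, demo_org))
    print("Token status for an empty state:", check_token_status({}))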