# Source page header (repository listing metadata, not part of the module):
# LinkedinMonitor / state_manager.py
# GuglielmoTor's picture
# Create state_manager.py
# fed4e5b verified
# raw
# history blame
# 13.4 kB
# state_manager.py
"""
Manages the application state, including token processing,
initial data loading from Bubble, and determining sync requirements.
"""
import pandas as pd
import logging
import os
from datetime import datetime, timedelta, timezone # Added timezone to ensure it's available
import gradio as gr
# Assuming Bubble_API_Calls contains fetch_linkedin_token_from_bubble and fetch_linkedin_posts_data_from_bubble
from Bubble_API_Calls import (
fetch_linkedin_token_from_bubble,
fetch_linkedin_posts_data_from_bubble
)
# Assuming config.py contains all necessary constants
from config import (
DEFAULT_INITIAL_FETCH_COUNT, BUBBLE_POST_DATE_COLUMN_NAME, BUBBLE_POSTS_TABLE_NAME,
BUBBLE_MENTIONS_TABLE_NAME, BUBBLE_MENTIONS_DATE_COLUMN_NAME,
BUBBLE_FOLLOWER_STATS_TABLE_NAME, FOLLOWER_STATS_TYPE_COLUMN, FOLLOWER_STATS_CATEGORY_COLUMN,
LINKEDIN_CLIENT_ID_ENV_VAR
)
def check_token_status(token_state):
    """Return a short, human-readable status line for the LinkedIn token.

    Args:
        token_state: The application state mapping (or a falsy value when no
            state exists yet). Only its ``"token"`` entry is inspected.

    Returns:
        The "Token available" message when the state holds a truthy token,
        otherwise the "Token not available" message.
    """
    # A missing/empty state and a state without a token are reported the same way.
    if not token_state:
        return "❌ Token not available"
    if not token_state.get("token"):
        return "❌ Token not available"
    return "βœ… Token available"
def _fetch_linkedin_token(url_user_token):
    """Exchange the URL user token for a LinkedIn token stored in Bubble.

    Returns the token dict when Bubble answers with a dict containing
    ``"access_token"``; returns ``None`` in every other case (no usable user
    token, unexpected response, or an exception during the call).
    """
    # The caller may pass an error text instead of a token; those texts contain
    # "not found" / "Could not access". Non-string values are treated as unusable
    # so the substring checks below cannot raise TypeError.
    usable = (
        isinstance(url_user_token, str)
        and bool(url_user_token)
        and "not found" not in url_user_token
        and "Could not access" not in url_user_token
    )
    if not usable:
        logging.info("No valid URL user token provided for LinkedIn token fetch, or an error was indicated.")
        return None

    # NOTE(review): this writes the raw user token to the logs; consider masking it.
    logging.info(f"Attempting to fetch LinkedIn token from Bubble with user token: {url_user_token}")
    try:
        parsed_linkedin_token = fetch_linkedin_token_from_bubble(url_user_token)
    except Exception as e:
        logging.error(f"❌ Exception while fetching LinkedIn token from Bubble: {e}", exc_info=True)
        return None

    if isinstance(parsed_linkedin_token, dict) and "access_token" in parsed_linkedin_token:
        logging.info("βœ… LinkedIn Token successfully fetched from Bubble.")
        return parsed_linkedin_token

    logging.warning(f"❌ Failed to fetch a valid LinkedIn token from Bubble. Response: {parsed_linkedin_token}")
    return None


def _load_bubble_table(org_urn, table_name, label):
    """Fetch one Bubble table for ``org_urn``; return an empty DataFrame on any failure.

    ``label`` is the human-readable data type name used in log messages
    ("posts", "mentions", "follower stats").
    """
    logging.info(f"Attempting to fetch {label} from Bubble for org_urn: {org_urn}")
    try:
        fetched_df, error_message = fetch_linkedin_posts_data_from_bubble(org_urn, table_name)
    except Exception as e:
        logging.error(f"❌ Error fetching {label} from Bubble: {e}.", exc_info=True)
        return pd.DataFrame()

    if error_message:
        logging.warning(f"Error fetching {table_name} from Bubble: {error_message}.")
        return pd.DataFrame()
    # Covers None as well as any unexpected non-DataFrame payload, which would
    # otherwise fail later on `.empty`.
    if not isinstance(fetched_df, pd.DataFrame):
        return pd.DataFrame()
    return fetched_df


def _latest_utc_timestamp(date_values):
    """Return the newest parseable value of ``date_values`` as a UTC Timestamp (NaT if none)."""
    # utc=True yields one tz-aware dtype even for mixed naive/offset inputs.
    return pd.to_datetime(date_values, errors='coerce', utc=True).dropna().max()


def _days_since(timestamp_utc):
    """Return whole calendar days (UTC) between ``timestamp_utc`` and today."""
    today_utc = pd.Timestamp.now(tz='UTC').normalize()
    return (today_utc - timestamp_utc.normalize()).days


def _compute_posts_fetch_count(posts_df):
    """Return how many posts to request from the API, based on stored post dates.

    Returns ``DEFAULT_INITIAL_FETCH_COUNT`` when nothing usable is stored (or on
    any processing error), ``0`` when the newest post is less than 7 days old,
    and otherwise 10 posts per full week of staleness.
    """
    if posts_df.empty:
        logging.info(f"ℹ️ No posts in Bubble. Setting to fetch initial {DEFAULT_INITIAL_FETCH_COUNT} posts.")
        return DEFAULT_INITIAL_FETCH_COUNT

    try:
        if (BUBBLE_POST_DATE_COLUMN_NAME not in posts_df.columns
                or posts_df[BUBBLE_POST_DATE_COLUMN_NAME].isnull().all()):
            logging.warning(f"Date column '{BUBBLE_POST_DATE_COLUMN_NAME}' for posts missing or all null values. Triggering initial fetch.")
            return DEFAULT_INITIAL_FETCH_COUNT

        last_post_date_utc = _latest_utc_timestamp(posts_df[BUBBLE_POST_DATE_COLUMN_NAME])
        if pd.isna(last_post_date_utc):
            logging.warning("No valid post dates found after conversion. Triggering initial fetch.")
            return DEFAULT_INITIAL_FETCH_COUNT

        days_diff = _days_since(last_post_date_utc)
        if days_diff >= 7:
            fetch_count = (days_diff // 7) * 10
            logging.info(f"Posts data is {days_diff} days old. Setting fetch count to {fetch_count}.")
            return fetch_count

        logging.info("Posts data is recent. No new posts fetch needed based on date.")
        return 0
    except Exception as e:
        logging.error(f"Error processing post dates: {e}. Defaulting to initial fetch for posts.", exc_info=True)
        return DEFAULT_INITIAL_FETCH_COUNT


def _mentions_need_sync(mentions_df):
    """Return True when stored mentions are missing, undated, or at least 7 days old."""
    if mentions_df.empty:
        logging.info("Mentions need sync: Bubble mentions DF is empty.")
        return True

    # Same defensive wrapper as the posts check: a malformed table must not
    # abort the whole state update.
    try:
        if (BUBBLE_MENTIONS_DATE_COLUMN_NAME not in mentions_df.columns
                or mentions_df[BUBBLE_MENTIONS_DATE_COLUMN_NAME].isnull().all()):
            logging.info(f"Mentions need sync: Date column '{BUBBLE_MENTIONS_DATE_COLUMN_NAME}' missing or all null values.")
            return True

        last_mention_date_utc = _latest_utc_timestamp(mentions_df[BUBBLE_MENTIONS_DATE_COLUMN_NAME])
        if pd.isna(last_mention_date_utc) or _days_since(last_mention_date_utc) >= 7:
            logging.info(f"Mentions need sync: Last mention date {last_mention_date_utc} is old or invalid.")
            return True

        logging.info(f"Mentions up-to-date. Last mention: {last_mention_date_utc}")
        return False
    except Exception as e:
        logging.error(f"Error processing mention dates: {e}. Defaulting to mentions sync.", exc_info=True)
        return True


def _monthly_gains_stale(monthly_gains_df):
    """Return True when monthly follower gains have no entry dated in the current UTC month."""
    if monthly_gains_df.empty:
        logging.info("Follower stats need sync: No monthly gains data in Bubble.")
        return True
    if FOLLOWER_STATS_CATEGORY_COLUMN not in monthly_gains_df.columns:
        logging.info(f"Follower stats need sync: Date column '{FOLLOWER_STATS_CATEGORY_COLUMN}' missing in monthly gains.")
        return True

    last_gain_date = _latest_utc_timestamp(monthly_gains_df[FOLLOWER_STATS_CATEGORY_COLUMN])
    if pd.isna(last_gain_date):
        logging.info("Follower stats need sync: No valid dates in monthly gains after conversion.")
        return True

    last_gain_date = last_gain_date.normalize()
    start_of_current_month = pd.Timestamp.now(tz='UTC').normalize().replace(day=1)
    if last_gain_date < start_of_current_month:
        logging.info(f"Follower stats need sync: Last gain date {last_gain_date} is before current month start {start_of_current_month}.")
        return True

    logging.info(f"Follower monthly gains up-to-date. Last gain recorded on: {last_gain_date}")
    return False


def _follower_stats_need_sync(fs_df):
    """Return True when follower stats are missing, stale, or lack demographic rows."""
    if fs_df.empty:
        logging.info("Follower stats need sync: Bubble follower stats DF is empty.")
        return True
    # Without the type column the rows cannot be classified at all; indexing it
    # blindly would raise KeyError.
    if FOLLOWER_STATS_TYPE_COLUMN not in fs_df.columns:
        logging.info(f"Follower stats need sync: Type column '{FOLLOWER_STATS_TYPE_COLUMN}' missing.")
        return True

    try:
        is_monthly = fs_df[FOLLOWER_STATS_TYPE_COLUMN] == 'follower_gains_monthly'
        need_sync = _monthly_gains_stale(fs_df[is_monthly])
        # Every row being a monthly-gain row means no other (demographic) types exist.
        if is_monthly.all():
            need_sync = True
            logging.info("Follower stats need sync: Demographic data (non-monthly types) missing.")
        return need_sync
    except Exception as e:
        logging.error(f"Error processing follower stats: {e}. Defaulting to follower stats sync.", exc_info=True)
        return True


def process_and_store_bubble_token(url_user_token, org_urn, token_state):
    """Refresh the application state from Bubble and decide what needs syncing.

    Steps: read the LinkedIn client id from the environment, exchange the URL
    user token for a LinkedIn token, load the posts / mentions / follower-stats
    tables from Bubble for ``org_urn``, then work out per data type whether a
    sync is needed and configure the sync button to match.

    Args:
        url_user_token: User token taken from the URL (may be empty or an
            error text, in which case no LinkedIn token is fetched).
        org_urn: Organization URN used to query Bubble; when falsy, all three
            DataFrames in the state are reset to empty.
        token_state: Previous state mapping, or a falsy value on first use.
            It is shallow-copied, never mutated.

    Returns:
        A ``(token_status_message, new_state, button_update)`` tuple:
        the status line from ``check_token_status``, the updated state dict,
        and a ``gr.update(...)`` for the sync button.
    """
    # NOTE(review): this writes the raw user token to the logs; consider masking it.
    logging.info(f"Processing token with URL user token: '{url_user_token}', Org URN: '{org_urn}'")

    # Every state key is (re)assigned below, so an empty dict is a sufficient base.
    new_state = token_state.copy() if token_state else {}
    new_state["org_urn"] = org_urn
    new_state["url_user_token_temp_storage"] = url_user_token

    client_id = os.environ.get(LINKEDIN_CLIENT_ID_ENV_VAR)
    new_state["client_id"] = client_id if client_id else "ENV VAR MISSING"
    if not client_id:
        logging.error(f"CRITICAL ERROR: '{LINKEDIN_CLIENT_ID_ENV_VAR}' environment variable not set.")

    new_state["token"] = _fetch_linkedin_token(url_user_token)

    # Load what Bubble already has; without an Org URN nothing can be queried.
    current_org_urn = new_state.get("org_urn")
    if current_org_urn:
        new_state["bubble_posts_df"] = _load_bubble_table(current_org_urn, BUBBLE_POSTS_TABLE_NAME, "posts")
        new_state["bubble_mentions_df"] = _load_bubble_table(current_org_urn, BUBBLE_MENTIONS_TABLE_NAME, "mentions")
        new_state["bubble_follower_stats_df"] = _load_bubble_table(
            current_org_urn, BUBBLE_FOLLOWER_STATS_TABLE_NAME, "follower stats"
        )
    else:
        logging.warning("Org URN not available in state. Cannot fetch data from Bubble.")
        new_state["bubble_posts_df"] = pd.DataFrame()
        new_state["bubble_mentions_df"] = pd.DataFrame()
        new_state["bubble_follower_stats_df"] = pd.DataFrame()

    new_state["fetch_count_for_api"] = _compute_posts_fetch_count(new_state["bubble_posts_df"])
    mentions_need_sync = _mentions_need_sync(new_state["bubble_mentions_df"])
    follower_stats_need_sync = _follower_stats_need_sync(new_state["bubble_follower_stats_df"])

    sync_actions = []
    if new_state["fetch_count_for_api"] > 0:
        sync_actions.append(f"{new_state['fetch_count_for_api']} Posts")
    if mentions_need_sync:
        sync_actions.append("Mentions")
    if follower_stats_need_sync:
        sync_actions.append("Follower Stats")

    # The button is only shown when a token exists; it is clickable only when
    # there is something to sync.
    # NOTE(review): the emoji prefixes in these labels look mis-encoded in this
    # copy of the file; confirm against the version on disk.
    if not new_state["token"]:
        button_update = gr.update(visible=False, interactive=False)
    elif sync_actions:
        button_label = f"πŸ”„ Sync LinkedIn Data ({', '.join(sync_actions)})"
        button_update = gr.update(value=button_label, visible=True, interactive=True)
    else:
        button_label = "βœ… Data Up-to-Date"
        button_update = gr.update(value=button_label, visible=True, interactive=False)

    token_status_message = check_token_status(new_state)
    logging.info(f"Token processing complete. Status: {token_status_message}. Button: {button_update}. Sync actions: {sync_actions}")
    return token_status_message, new_state, button_update