# NOTE: removed Hugging Face Spaces page residue ("Spaces: / Running / Running")
# that was captured with the source and is not part of the module.
# state_manager.py
"""
Manages the application state, including token processing,
initial data loading from Bubble, and determining sync requirements
based on the operations log.
"""
import pandas as pd
import logging
import os
from datetime import timezone  # Python's datetime, not to be confused with pandas'
import gradio as gr
# Assuming Bubble_API_Calls contains fetch_linkedin_token_from_bubble and fetch_linkedin_posts_data_from_bubble
from Bubble_API_Calls import (
    fetch_linkedin_token_from_bubble,
    fetch_linkedin_posts_data_from_bubble  # This is generic, used for all tables
)
# Assuming config.py contains all necessary constants
from config import (
    DEFAULT_INITIAL_FETCH_COUNT, DEFAULT_POSTS_UPDATE_FETCH_COUNT,
    BUBBLE_POST_DATE_COLUMN_NAME, BUBBLE_POSTS_TABLE_NAME,
    BUBBLE_POST_STATS_TABLE_NAME,
    BUBBLE_MENTIONS_TABLE_NAME, BUBBLE_MENTIONS_DATE_COLUMN_NAME,
    BUBBLE_FOLLOWER_STATS_TABLE_NAME, FOLLOWER_STATS_TYPE_COLUMN, FOLLOWER_STATS_CATEGORY_COLUMN,
    LINKEDIN_CLIENT_ID_ENV_VAR,
    BUBBLE_OPERATIONS_LOG_TABLE_NAME, BUBBLE_OPERATIONS_LOG_DATE_COLUMN,
    BUBBLE_OPERATIONS_LOG_SUBJECT_COLUMN, BUBBLE_OPERATIONS_LOG_ORG_URN_COLUMN,
    LOG_SUBJECT_POSTS, LOG_SUBJECT_MENTIONS, LOG_SUBJECT_FOLLOWER_STATS
)
def check_token_status(token_state):
    """Return a short status string describing whether a LinkedIn token is present in state."""
    has_token = bool(token_state) and bool(token_state.get("token"))
    if has_token:
        return "β Token available"
    return "β Token not available"
def get_last_sync_attempt_date(operations_log_df, subject, org_urn):
    """
    Look up the most recent sync-attempt timestamp for one (subject, org_urn) pair.

    Args:
        operations_log_df (pd.DataFrame): Operations log rows fetched from Bubble.
            Must contain the columns named by BUBBLE_OPERATIONS_LOG_DATE_COLUMN,
            BUBBLE_OPERATIONS_LOG_SUBJECT_COLUMN and BUBBLE_OPERATIONS_LOG_ORG_URN_COLUMN.
        subject (str): The subject of the sync operation (e.g. "post", "mention").
        org_urn (str): The organization URN the log entry must match.

    Returns:
        pd.Timestamp: The latest matching attempt date as a UTC-aware timestamp,
        or pd.NaT when no matching entry (or no parseable date) exists.
    """
    if operations_log_df.empty or not org_urn:
        return pd.NaT

    required_cols = [
        BUBBLE_OPERATIONS_LOG_DATE_COLUMN,
        BUBBLE_OPERATIONS_LOG_SUBJECT_COLUMN,
        BUBBLE_OPERATIONS_LOG_ORG_URN_COLUMN,
    ]
    missing = [col for col in required_cols if col not in operations_log_df.columns]
    if missing:
        logging.warning(f"Operations log DF is missing one or more required columns: {required_cols}")
        return pd.NaT

    try:
        # Compare as strings so mixed dtypes (e.g. numeric URNs) still match.
        subject_mask = operations_log_df[BUBBLE_OPERATIONS_LOG_SUBJECT_COLUMN].astype(str) == str(subject)
        org_mask = operations_log_df[BUBBLE_OPERATIONS_LOG_ORG_URN_COLUMN].astype(str) == str(org_urn)
        relevant_rows = operations_log_df[subject_mask & org_mask]
        if relevant_rows.empty:
            return pd.NaT
        # Coerce unparseable dates to NaT, normalize everything to UTC,
        # then keep the most recent surviving timestamp.
        parsed_dates = pd.to_datetime(relevant_rows[BUBBLE_OPERATIONS_LOG_DATE_COLUMN], errors='coerce', utc=True)
        return parsed_dates.dropna().max()
    except Exception as e:
        logging.error(f"Error processing operations log for last sync attempt date: {e}", exc_info=True)
        return pd.NaT
def _fetch_linkedin_token(url_user_token, new_state):
    """Fetch the LinkedIn token from Bubble into new_state["token"] (None on failure)."""
    # Error placeholder strings produced upstream are treated as "no token provided".
    if url_user_token and "not found" not in url_user_token and "Could not access" not in url_user_token:
        logging.info(f"Attempting to fetch LinkedIn token from Bubble with user token: {url_user_token}")
        try:
            parsed_linkedin_token = fetch_linkedin_token_from_bubble(url_user_token)
            if isinstance(parsed_linkedin_token, dict) and "access_token" in parsed_linkedin_token:
                new_state["token"] = parsed_linkedin_token
                logging.info("β LinkedIn Token successfully fetched from Bubble.")
            else:
                new_state["token"] = None
                logging.warning(f"β Failed to fetch a valid LinkedIn token from Bubble. Response: {parsed_linkedin_token}")
        except Exception as e:
            new_state["token"] = None
            logging.error(f"β Exception while fetching LinkedIn token from Bubble: {e}", exc_info=True)
    else:
        new_state["token"] = None
        logging.info("No valid URL user token provided for LinkedIn token fetch, or an error was indicated.")


def _load_bubble_tables(new_state):
    """Fetch every Bubble table for new_state["org_urn"] into its state key (empty DF on error)."""
    data_tables_to_fetch = {
        "bubble_posts_df": BUBBLE_POSTS_TABLE_NAME,
        "bubble_mentions_df": BUBBLE_MENTIONS_TABLE_NAME,
        "bubble_follower_stats_df": BUBBLE_FOLLOWER_STATS_TABLE_NAME,
        "bubble_operations_log_df": BUBBLE_OPERATIONS_LOG_TABLE_NAME,
        "bubble_post_stats_df": BUBBLE_POST_STATS_TABLE_NAME
    }
    current_org_urn = new_state.get("org_urn")
    if not current_org_urn:
        logging.warning("Org URN not available in state. Cannot fetch data from Bubble.")
        # FIX: reset every fetched table; the original reset loop omitted
        # "bubble_post_stats_df", leaving it stale (or absent) in this branch.
        for state_key in data_tables_to_fetch:
            new_state[state_key] = pd.DataFrame()
        return
    for state_key, table_name in data_tables_to_fetch.items():
        logging.info(f"Attempting to fetch {table_name} from Bubble for org_urn: {current_org_urn}")
        try:
            fetched_df, error_message = fetch_linkedin_posts_data_from_bubble(current_org_urn, table_name)
            new_state[state_key] = pd.DataFrame() if error_message or fetched_df is None else fetched_df
            if error_message:
                logging.warning(f"Error fetching {table_name} from Bubble: {error_message}.")
            # Parse the operations-log date column eagerly so later comparisons are UTC-aware.
            if (state_key == "bubble_operations_log_df"
                    and not new_state[state_key].empty
                    and BUBBLE_OPERATIONS_LOG_DATE_COLUMN in new_state[state_key].columns):
                new_state[state_key][BUBBLE_OPERATIONS_LOG_DATE_COLUMN] = pd.to_datetime(
                    new_state[state_key][BUBBLE_OPERATIONS_LOG_DATE_COLUMN], errors='coerce', utc=True)
        except Exception as e:
            logging.error(f"β Error fetching {table_name} from Bubble: {e}.", exc_info=True)
            new_state[state_key] = pd.DataFrame()


def _determine_posts_fetch_count(ops_log_df, org_urn, now_utc):
    """Return how many posts the next sync should fetch, based on the operations log.

    DEFAULT_INITIAL_FETCH_COUNT when no attempt was ever logged,
    DEFAULT_POSTS_UPDATE_FETCH_COUNT when the last attempt is >= 7 days old,
    0 when the last attempt is recent.
    """
    last_attempt = get_last_sync_attempt_date(ops_log_df, LOG_SUBJECT_POSTS, org_urn)
    if pd.isna(last_attempt):
        logging.info(f"βΉοΈ No previous '{LOG_SUBJECT_POSTS}' sync attempt logged. Setting to fetch initial {DEFAULT_INITIAL_FETCH_COUNT} posts.")
        return DEFAULT_INITIAL_FETCH_COUNT
    days_since_last_attempt = (now_utc.normalize() - last_attempt.normalize()).days
    if days_since_last_attempt >= 7:
        logging.info(f"Posts sync attempt is {days_since_last_attempt} days old. Setting fetch count to {DEFAULT_POSTS_UPDATE_FETCH_COUNT}.")
        return DEFAULT_POSTS_UPDATE_FETCH_COUNT
    logging.info(f"Posts sync attempt was recent ({days_since_last_attempt} days ago). No new posts fetch scheduled based on log.")
    return 0


def _mentions_sync_needed(ops_log_df, org_urn, now_utc):
    """Return True when mentions were never synced or the last attempt is >= 7 days old."""
    last_attempt = get_last_sync_attempt_date(ops_log_df, LOG_SUBJECT_MENTIONS, org_urn)
    if pd.isna(last_attempt):
        logging.info(f"Mentions sync needed: No previous '{LOG_SUBJECT_MENTIONS}' sync attempt logged.")
        return True
    days_since_last_attempt = (now_utc.normalize() - last_attempt.normalize()).days
    if days_since_last_attempt >= 7:
        logging.info(f"Mentions sync needed: Last attempt was {days_since_last_attempt} days ago.")
        return True
    logging.info(f"Mentions sync attempt was recent ({days_since_last_attempt} days ago). Sync not scheduled.")
    return False


def _follower_stats_sync_needed(ops_log_df, org_urn, fs_df, now_utc):
    """Return True when follower stats need a sync.

    A sync is required either on schedule (never attempted, or last attempt
    before the start of the current month) or because demographic rows
    (any type other than 'follower_gains_monthly') are missing from the table.
    """
    demographics_missing = False
    if fs_df.empty:
        demographics_missing = True  # entire table empty => demographics definitely missing
        logging.info("Follower stats: Main table is empty, considering demographics missing.")
    elif FOLLOWER_STATS_TYPE_COLUMN not in fs_df.columns:
        demographics_missing = True  # cannot check types without the column
        logging.info(f"Follower stats: Column '{FOLLOWER_STATS_TYPE_COLUMN}' is missing, considering demographics missing.")
    elif fs_df[fs_df[FOLLOWER_STATS_TYPE_COLUMN] != 'follower_gains_monthly'].empty:
        demographics_missing = True  # only monthly-gain rows present
        logging.info("Follower stats: Demographic data (non-monthly types) is missing.")

    last_attempt = get_last_sync_attempt_date(ops_log_df, LOG_SUBJECT_FOLLOWER_STATS, org_urn)
    time_based_need = False
    if pd.isna(last_attempt):
        time_based_need = True
        logging.info(f"Follower stats sync needed: No previous '{LOG_SUBJECT_FOLLOWER_STATS}' sync attempt logged.")
    else:
        start_of_current_month = now_utc.normalize().replace(day=1)
        # Defensive: get_last_sync_attempt_date should already return UTC-aware dates.
        if last_attempt.tzinfo is None:
            last_attempt = last_attempt.tz_localize('UTC')
        if last_attempt < start_of_current_month:
            time_based_need = True
            logging.info(f"Follower stats sync needed: Last attempt {last_attempt.date()} is before current month start {start_of_current_month.date()}.")

    if time_based_need or demographics_missing:
        if demographics_missing and not time_based_need:
            logging.info("Follower stats sync triggered: Demographic data missing, even if last sync attempt is recent.")
        elif time_based_need:
            logging.info("Follower stats sync triggered by schedule.")
        return True
    logging.info("Follower stats sync not currently required by schedule or data presence.")
    return False


def process_and_store_bubble_token(url_user_token, org_urn, token_state):
    """
    Process the user token, fetch the LinkedIn token from Bubble, load existing Bubble
    data (posts, post stats, mentions, follower stats, operations log), and determine
    whether each data type needs a sync based on the operations log.

    Args:
        url_user_token (str): User token extracted from the URL; error placeholder
            strings (containing "not found" / "Could not access") are treated as absent.
        org_urn (str): Organization URN used to scope all Bubble fetches.
        token_state (dict | None): Previous state dict; copied, never mutated in place.

    Returns:
        tuple: (token_status_message: str, new_state: dict, button_update: gr.update)
        where button_update configures the "Sync LinkedIn Data" button.
    """
    logging.info(f"Processing token with URL user token: '{url_user_token}', Org URN: '{org_urn}'")

    new_state = token_state.copy() if token_state else {}
    new_state.update({
        "token": new_state.get("token"),  # preserve an already-fetched token, if any
        "client_id": new_state.get("client_id"),
        "org_urn": org_urn,
        "bubble_posts_df": new_state.get("bubble_posts_df", pd.DataFrame()),
        "fetch_count_for_api": 0,  # determined below from the operations log
        "bubble_mentions_df": new_state.get("bubble_mentions_df", pd.DataFrame()),
        "mentions_should_sync_now": False,  # determined below from the operations log
        "bubble_follower_stats_df": new_state.get("bubble_follower_stats_df", pd.DataFrame()),
        "fs_should_sync_now": False,  # determined below from the operations log
        "bubble_operations_log_df": new_state.get("bubble_operations_log_df", pd.DataFrame()),
        # FIX: previously missing from the defaults, so this key could be absent
        # from state when the org URN was unavailable and the fetch loop never ran.
        "bubble_post_stats_df": new_state.get("bubble_post_stats_df", pd.DataFrame()),
        "url_user_token_temp_storage": url_user_token
    })

    client_id = os.environ.get(LINKEDIN_CLIENT_ID_ENV_VAR)
    new_state["client_id"] = client_id if client_id else "ENV VAR MISSING"
    if not client_id:
        logging.error(f"CRITICAL ERROR: '{LINKEDIN_CLIENT_ID_ENV_VAR}' environment variable not set.")

    _fetch_linkedin_token(url_user_token, new_state)
    _load_bubble_tables(new_state)

    # --- Determine sync needs based on the operations log ---
    ops_log_df = new_state.get("bubble_operations_log_df", pd.DataFrame())
    now_utc = pd.Timestamp.now(tz='UTC')
    current_org_urn = new_state.get("org_urn")

    new_state['fetch_count_for_api'] = _determine_posts_fetch_count(ops_log_df, current_org_urn, now_utc)
    new_state['mentions_should_sync_now'] = _mentions_sync_needed(ops_log_df, current_org_urn, now_utc)
    new_state['fs_should_sync_now'] = _follower_stats_sync_needed(
        ops_log_df, current_org_urn,
        new_state.get("bubble_follower_stats_df", pd.DataFrame()), now_utc)

    # Build the sync-button label from whatever actions are pending.
    sync_actions = []
    if new_state.get('fetch_count_for_api', 0) > 0:
        sync_actions.append(f"Posts ({new_state['fetch_count_for_api']})")
    if new_state.get('mentions_should_sync_now', False):
        sync_actions.append("Mentions")
    if new_state.get('fs_should_sync_now', False):
        sync_actions.append("Follower Stats")

    if new_state["token"] and sync_actions:
        button_label = f"π Sync LinkedIn Data ({', '.join(sync_actions)})"
        button_update = gr.update(value=button_label, visible=True, interactive=True)
    elif new_state["token"]:
        button_label = "β Data Up-to-Date (based on sync log)"
        button_update = gr.update(value=button_label, visible=True, interactive=False)
    else:  # no token: keep the button hidden
        button_update = gr.update(visible=False, interactive=False, value="π Sync LinkedIn Data")

    token_status_message = check_token_status(new_state)
    logging.info(f"Token processing complete. Status: {token_status_message}. Button: {button_update.get('value', 'N/A') if button_update else 'N/A'}. Sync actions needed: {sync_actions}")
    return token_status_message, new_state, button_update