Spaces:
Running
Running
# state_manager.py | |
""" | |
Manages the application state, including token processing, | |
initial data loading from Bubble, and determining sync requirements. | |
""" | |
import pandas as pd | |
import logging | |
import os | |
from datetime import datetime, timedelta, timezone # Added timezone to ensure it's available | |
import gradio as gr | |
# Assuming Bubble_API_Calls contains fetch_linkedin_token_from_bubble and fetch_linkedin_posts_data_from_bubble | |
from Bubble_API_Calls import ( | |
fetch_linkedin_token_from_bubble, | |
fetch_linkedin_posts_data_from_bubble | |
) | |
# Assuming config.py contains all necessary constants | |
from config import ( | |
DEFAULT_INITIAL_FETCH_COUNT, BUBBLE_POST_DATE_COLUMN_NAME, BUBBLE_POSTS_TABLE_NAME, | |
BUBBLE_MENTIONS_TABLE_NAME, BUBBLE_MENTIONS_DATE_COLUMN_NAME, | |
BUBBLE_FOLLOWER_STATS_TABLE_NAME, FOLLOWER_STATS_TYPE_COLUMN, FOLLOWER_STATS_CATEGORY_COLUMN, | |
LINKEDIN_CLIENT_ID_ENV_VAR | |
) | |
def check_token_status(token_state): | |
"""Checks the status of the LinkedIn token.""" | |
return "β Token available" if token_state and token_state.get("token") else "β Token not available" | |
def process_and_store_bubble_token(url_user_token, org_urn, token_state): | |
""" | |
Processes user token, fetches LinkedIn token, fetches existing Bubble data (posts, mentions, follower stats), | |
and determines if an initial fetch or update is needed for each data type. | |
Updates token state and UI for the sync button. | |
""" | |
logging.info(f"Processing token with URL user token: '{url_user_token}', Org URN: '{org_urn}'") | |
# Initialize or update state safely | |
new_state = token_state.copy() if token_state else { | |
"token": None, "client_id": None, "org_urn": None, | |
"bubble_posts_df": pd.DataFrame(), "fetch_count_for_api": 0, | |
"bubble_mentions_df": pd.DataFrame(), | |
"bubble_follower_stats_df": pd.DataFrame(), | |
"url_user_token_temp_storage": None | |
} | |
new_state.update({ | |
"org_urn": org_urn, | |
"bubble_posts_df": new_state.get("bubble_posts_df", pd.DataFrame()), | |
"fetch_count_for_api": new_state.get("fetch_count_for_api", 0), | |
"bubble_mentions_df": new_state.get("bubble_mentions_df", pd.DataFrame()), | |
"bubble_follower_stats_df": new_state.get("bubble_follower_stats_df", pd.DataFrame()), | |
"url_user_token_temp_storage": url_user_token | |
}) | |
button_update = gr.update(visible=False, interactive=False, value="π Sync LinkedIn Data") # Default to hidden | |
client_id = os.environ.get(LINKEDIN_CLIENT_ID_ENV_VAR) | |
new_state["client_id"] = client_id if client_id else "ENV VAR MISSING" | |
if not client_id: logging.error(f"CRITICAL ERROR: '{LINKEDIN_CLIENT_ID_ENV_VAR}' environment variable not set.") | |
# Fetch LinkedIn Token from Bubble | |
if url_user_token and "not found" not in url_user_token and "Could not access" not in url_user_token: | |
logging.info(f"Attempting to fetch LinkedIn token from Bubble with user token: {url_user_token}") | |
try: | |
parsed_linkedin_token = fetch_linkedin_token_from_bubble(url_user_token) | |
if isinstance(parsed_linkedin_token, dict) and "access_token" in parsed_linkedin_token: | |
new_state["token"] = parsed_linkedin_token | |
logging.info("β LinkedIn Token successfully fetched from Bubble.") | |
else: | |
new_state["token"] = None | |
logging.warning(f"β Failed to fetch a valid LinkedIn token from Bubble. Response: {parsed_linkedin_token}") | |
except Exception as e: | |
new_state["token"] = None | |
logging.error(f"β Exception while fetching LinkedIn token from Bubble: {e}", exc_info=True) | |
else: | |
new_state["token"] = None | |
logging.info("No valid URL user token provided for LinkedIn token fetch, or an error was indicated.") | |
# Fetch existing data from Bubble if Org URN is available | |
current_org_urn = new_state.get("org_urn") | |
if current_org_urn: | |
# Fetch Posts from Bubble | |
logging.info(f"Attempting to fetch posts from Bubble for org_urn: {current_org_urn}") | |
try: | |
fetched_posts_df, error_message_posts = fetch_linkedin_posts_data_from_bubble(current_org_urn, BUBBLE_POSTS_TABLE_NAME) | |
new_state["bubble_posts_df"] = pd.DataFrame() if error_message_posts or fetched_posts_df is None else fetched_posts_df | |
if error_message_posts: logging.warning(f"Error fetching {BUBBLE_POSTS_TABLE_NAME} from Bubble: {error_message_posts}.") | |
except Exception as e: | |
logging.error(f"β Error fetching posts from Bubble: {e}.", exc_info=True) | |
new_state["bubble_posts_df"] = pd.DataFrame() | |
# Fetch Mentions from Bubble | |
logging.info(f"Attempting to fetch mentions from Bubble for org_urn: {current_org_urn}") | |
try: | |
fetched_mentions_df, error_message_mentions = fetch_linkedin_posts_data_from_bubble(current_org_urn, BUBBLE_MENTIONS_TABLE_NAME) | |
new_state["bubble_mentions_df"] = pd.DataFrame() if error_message_mentions or fetched_mentions_df is None else fetched_mentions_df | |
if error_message_mentions: logging.warning(f"Error fetching {BUBBLE_MENTIONS_TABLE_NAME} from Bubble: {error_message_mentions}.") | |
except Exception as e: | |
logging.error(f"β Error fetching mentions from Bubble: {e}.", exc_info=True) | |
new_state["bubble_mentions_df"] = pd.DataFrame() | |
# Fetch Follower Stats from Bubble | |
logging.info(f"Attempting to fetch follower stats from Bubble for org_urn: {current_org_urn}") | |
try: | |
fetched_follower_stats_df, error_message_fs = fetch_linkedin_posts_data_from_bubble(current_org_urn, BUBBLE_FOLLOWER_STATS_TABLE_NAME) | |
new_state["bubble_follower_stats_df"] = pd.DataFrame() if error_message_fs or fetched_follower_stats_df is None else fetched_follower_stats_df | |
if error_message_fs: logging.warning(f"Error fetching {BUBBLE_FOLLOWER_STATS_TABLE_NAME} from Bubble: {error_message_fs}.") | |
except Exception as e: | |
logging.error(f"β Error fetching follower stats from Bubble: {e}.", exc_info=True) | |
new_state["bubble_follower_stats_df"] = pd.DataFrame() | |
else: | |
logging.warning("Org URN not available in state. Cannot fetch data from Bubble.") | |
new_state["bubble_posts_df"] = pd.DataFrame() | |
new_state["bubble_mentions_df"] = pd.DataFrame() | |
new_state["bubble_follower_stats_df"] = pd.DataFrame() | |
# Determine fetch count for Posts API | |
if new_state["bubble_posts_df"].empty: | |
logging.info(f"βΉοΈ No posts in Bubble. Setting to fetch initial {DEFAULT_INITIAL_FETCH_COUNT} posts.") | |
new_state['fetch_count_for_api'] = DEFAULT_INITIAL_FETCH_COUNT | |
else: | |
try: | |
df_posts_check = new_state["bubble_posts_df"].copy() | |
if BUBBLE_POST_DATE_COLUMN_NAME not in df_posts_check.columns or df_posts_check[BUBBLE_POST_DATE_COLUMN_NAME].isnull().all(): | |
logging.warning(f"Date column '{BUBBLE_POST_DATE_COLUMN_NAME}' for posts missing or all null values. Triggering initial fetch.") | |
new_state['fetch_count_for_api'] = DEFAULT_INITIAL_FETCH_COUNT | |
else: | |
df_posts_check[BUBBLE_POST_DATE_COLUMN_NAME] = pd.to_datetime(df_posts_check[BUBBLE_POST_DATE_COLUMN_NAME], errors='coerce', utc=True) | |
last_post_date_utc = df_posts_check[BUBBLE_POST_DATE_COLUMN_NAME].dropna().max() | |
if pd.isna(last_post_date_utc): | |
logging.warning("No valid post dates found after conversion. Triggering initial fetch.") | |
new_state['fetch_count_for_api'] = DEFAULT_INITIAL_FETCH_COUNT | |
else: | |
days_diff = (pd.Timestamp('now', tz='UTC').normalize() - last_post_date_utc.normalize()).days | |
if days_diff >= 7: | |
new_state['fetch_count_for_api'] = max(1, days_diff // 7) * 10 | |
logging.info(f"Posts data is {days_diff} days old. Setting fetch count to {new_state['fetch_count_for_api']}.") | |
else: | |
new_state['fetch_count_for_api'] = 0 | |
logging.info("Posts data is recent. No new posts fetch needed based on date.") | |
except Exception as e: | |
logging.error(f"Error processing post dates: {e}. Defaulting to initial fetch for posts.", exc_info=True) | |
new_state['fetch_count_for_api'] = DEFAULT_INITIAL_FETCH_COUNT | |
# Determine if Mentions need sync | |
mentions_need_sync = False | |
if new_state["bubble_mentions_df"].empty: | |
mentions_need_sync = True | |
logging.info("Mentions need sync: Bubble mentions DF is empty.") | |
else: | |
if BUBBLE_MENTIONS_DATE_COLUMN_NAME not in new_state["bubble_mentions_df"].columns or \ | |
new_state["bubble_mentions_df"][BUBBLE_MENTIONS_DATE_COLUMN_NAME].isnull().all(): | |
mentions_need_sync = True | |
logging.info(f"Mentions need sync: Date column '{BUBBLE_MENTIONS_DATE_COLUMN_NAME}' missing or all null values.") | |
else: | |
df_mentions_check = new_state["bubble_mentions_df"].copy() | |
df_mentions_check[BUBBLE_MENTIONS_DATE_COLUMN_NAME] = pd.to_datetime(df_mentions_check[BUBBLE_MENTIONS_DATE_COLUMN_NAME], errors='coerce', utc=True) | |
last_mention_date_utc = df_mentions_check[BUBBLE_MENTIONS_DATE_COLUMN_NAME].dropna().max() | |
if pd.isna(last_mention_date_utc) or \ | |
(pd.Timestamp('now', tz='UTC').normalize() - last_mention_date_utc.normalize()).days >= 7: | |
mentions_need_sync = True | |
logging.info(f"Mentions need sync: Last mention date {last_mention_date_utc} is old or invalid.") | |
else: | |
logging.info(f"Mentions up-to-date. Last mention: {last_mention_date_utc}") | |
# Determine if Follower Stats need sync | |
follower_stats_need_sync = False | |
fs_df = new_state.get("bubble_follower_stats_df", pd.DataFrame()) | |
if fs_df.empty: | |
follower_stats_need_sync = True | |
logging.info("Follower stats need sync: Bubble follower stats DF is empty.") | |
else: | |
monthly_gains_df = fs_df[fs_df[FOLLOWER_STATS_TYPE_COLUMN] == 'follower_gains_monthly'].copy() | |
if monthly_gains_df.empty: | |
follower_stats_need_sync = True | |
logging.info("Follower stats need sync: No monthly gains data in Bubble.") | |
elif FOLLOWER_STATS_CATEGORY_COLUMN not in monthly_gains_df.columns: | |
follower_stats_need_sync = True | |
logging.info(f"Follower stats need sync: Date column '{FOLLOWER_STATS_CATEGORY_COLUMN}' missing in monthly gains.") | |
else: | |
monthly_gains_df.loc[:, FOLLOWER_STATS_CATEGORY_COLUMN] = pd.to_datetime(monthly_gains_df[FOLLOWER_STATS_CATEGORY_COLUMN], errors='coerce').dt.normalize() | |
last_gain_date = monthly_gains_df[FOLLOWER_STATS_CATEGORY_COLUMN].dropna().max() | |
if pd.isna(last_gain_date): | |
follower_stats_need_sync = True | |
logging.info("Follower stats need sync: No valid dates in monthly gains after conversion.") | |
else: | |
if last_gain_date.tzinfo is None or last_gain_date.tzinfo.utcoffset(last_gain_date) is None: | |
last_gain_date = last_gain_date.tz_localize('UTC') # Localize naive to UTC | |
else: | |
last_gain_date = last_gain_date.tz_convert('UTC') # Convert aware to UTC | |
start_of_current_month = pd.Timestamp('now', tz='UTC').normalize().replace(day=1) | |
if last_gain_date < start_of_current_month: | |
follower_stats_need_sync = True | |
logging.info(f"Follower stats need sync: Last gain date {last_gain_date} is before current month start {start_of_current_month}.") | |
else: | |
logging.info(f"Follower monthly gains up-to-date. Last gain recorded on: {last_gain_date}") | |
if fs_df[fs_df[FOLLOWER_STATS_TYPE_COLUMN] != 'follower_gains_monthly'].empty: | |
follower_stats_need_sync = True | |
logging.info("Follower stats need sync: Demographic data (non-monthly types) missing.") | |
# Update Sync Button based on token and needed actions | |
sync_actions = [] | |
if new_state['fetch_count_for_api'] > 0: | |
sync_actions.append(f"{new_state['fetch_count_for_api']} Posts") | |
if mentions_need_sync: # This flag is set based on data freshness | |
sync_actions.append("Mentions") | |
if follower_stats_need_sync: # This flag is set based on data freshness | |
sync_actions.append("Follower Stats") | |
if new_state["token"] and sync_actions: | |
button_label = f"π Sync LinkedIn Data ({', '.join(sync_actions)})" | |
button_update = gr.update(value=button_label, visible=True, interactive=True) | |
elif new_state["token"]: | |
button_label = "β Data Up-to-Date" | |
button_update = gr.update(value=button_label, visible=True, interactive=False) | |
else: | |
button_update = gr.update(visible=False, interactive=False) | |
token_status_message = check_token_status(new_state) | |
logging.info(f"Token processing complete. Status: {token_status_message}. Button: {button_update}. Sync actions: {sync_actions}") | |
return token_status_message, new_state, button_update | |