Spaces:
Running
Running
# -*- coding: utf-8 -*-
import gradio as gr | |
import json | |
import os | |
import logging | |
import html | |
import pandas as pd | |
from datetime import datetime, timedelta, timezone # Added timezone | |
# Import functions from your custom modules | |
from analytics_fetch_and_rendering import fetch_and_render_analytics | |
from gradio_utils import get_url_user_token | |
from Bubble_API_Calls import ( | |
fetch_linkedin_token_from_bubble, | |
bulk_upload_to_bubble, | |
fetch_linkedin_posts_data_from_bubble # This will be used for posts, mentions, and follower stats | |
) | |
from Linkedin_Data_API_Calls import ( | |
fetch_linkedin_posts_core, | |
fetch_comments, | |
analyze_sentiment, # For post comments | |
compile_detailed_posts, | |
prepare_data_for_bubble, # For posts, stats, comments | |
fetch_linkedin_mentions_core, | |
analyze_mentions_sentiment, # For individual mentions | |
compile_detailed_mentions, # Compiles to user-specified format | |
prepare_mentions_for_bubble # Prepares user-specified format for Bubble | |
) | |
# Import follower stats function | |
from linkedin_follower_stats import get_linkedin_follower_stats | |
# --- Logging setup: timestamped, level-tagged records at INFO and above ---
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)

# --- Posts ---
# Number of posts requested when Bubble holds no usable post data.
DEFAULT_INITIAL_FETCH_COUNT = 10
# Key holding the post URN in the post dicts returned by the LinkedIn fetch.
LINKEDIN_POST_URN_KEY = 'id'
# Bubble column assumed to hold the unique post ID.
BUBBLE_POST_URN_COLUMN_NAME = 'id'
# Bubble column assumed to hold the post publication date.
BUBBLE_POST_DATE_COLUMN_NAME = 'published_at'

# --- Mentions ---
BUBBLE_MENTIONS_TABLE_NAME = "LI_mentions"
# Bubble column assumed to hold the unique mention ID.
BUBBLE_MENTIONS_ID_COLUMN_NAME = "id"
# Bubble column assumed to hold the mention date.
BUBBLE_MENTIONS_DATE_COLUMN_NAME = "date"
# Fetch sizes: first-time load vs. refresh of stale data.
DEFAULT_MENTIONS_INITIAL_FETCH_COUNT = 20
DEFAULT_MENTIONS_UPDATE_FETCH_COUNT = 10

# --- Follower statistics ---
BUBBLE_FOLLOWER_STATS_TABLE_NAME = "LI_follower_stats"
# Demographics: category name (e.g. "Engineering"); monthly gains: 'YYYY-MM-DD' date string.
FOLLOWER_STATS_CATEGORY_COLUMN = "category_name"
# Stat kind, e.g. "follower_seniority" or "follower_gains_monthly".
FOLLOWER_STATS_TYPE_COLUMN = "follower_count_type"
# URN of the organization the stat belongs to.
FOLLOWER_STATS_ORG_URN_COLUMN = "organization_urn"
FOLLOWER_STATS_ORGANIC_COLUMN = "follower_count_organic"
FOLLOWER_STATS_PAID_COLUMN = "follower_count_paid"
def check_token_status(token_state):
    """Return a short status string saying whether a LinkedIn token is held.

    The token counts as available only when ``token_state`` is truthy and
    its ``"token"`` entry is truthy as well.
    """
    has_token = bool(token_state) and bool(token_state.get("token"))
    if has_token:
        return "β Token available"
    return "β Token not available"
def _load_bubble_table(org_urn, table_name, label):
    """Fetch one Bubble table for ``org_urn``, falling back to an empty DataFrame.

    ``label`` is the human-readable data type used in log messages. A reported
    error message, a raised exception, or a ``None`` result all yield an empty
    DataFrame so callers can rely on ``.empty``.
    """
    logging.info(f"Attempting to fetch {label} from Bubble for org_urn: {org_urn}")
    try:
        fetched_df, error_message = fetch_linkedin_posts_data_from_bubble(org_urn, table_name)
    except Exception as e:
        logging.error(f"β Error fetching {label} from Bubble: {e}.", exc_info=True)
        return pd.DataFrame()
    if error_message:
        logging.warning(f"Error fetching {table_name} from Bubble: {error_message}.")
        return pd.DataFrame()
    return pd.DataFrame() if fetched_df is None else fetched_df


def _compute_posts_fetch_count(posts_df):
    """Return how many posts to request from the API given the posts already in Bubble."""
    if posts_df.empty:
        logging.info(f"βΉοΈ No posts in Bubble. Setting to fetch initial {DEFAULT_INITIAL_FETCH_COUNT} posts.")
        return DEFAULT_INITIAL_FETCH_COUNT
    try:
        if BUBBLE_POST_DATE_COLUMN_NAME not in posts_df.columns or posts_df[BUBBLE_POST_DATE_COLUMN_NAME].isnull().all():
            logging.warning(f"Date column '{BUBBLE_POST_DATE_COLUMN_NAME}' for posts missing or all null values. Triggering initial fetch.")
            return DEFAULT_INITIAL_FETCH_COUNT
        # Parse into a separate Series so the DataFrame kept in state is never mutated.
        post_dates = pd.to_datetime(posts_df[BUBBLE_POST_DATE_COLUMN_NAME], errors='coerce', utc=True)
        last_post_date_utc = post_dates.dropna().max()
        if pd.isna(last_post_date_utc):  # No valid dates found after conversion
            logging.warning("No valid post dates found after conversion. Triggering initial fetch.")
            return DEFAULT_INITIAL_FETCH_COUNT
        days_diff = (pd.Timestamp('now', tz='UTC').normalize() - last_post_date_utc.normalize()).days
        if days_diff >= 7:
            # Fetch more if data is older: 10 posts per full week of difference.
            fetch_count = max(1, days_diff // 7) * 10
            logging.info(f"Posts data is {days_diff} days old. Setting fetch count to {fetch_count}.")
            return fetch_count
        logging.info("Posts data is recent. No new posts fetch needed based on date.")
        return 0
    except Exception as e:
        logging.error(f"Error processing post dates: {e}. Defaulting to initial fetch for posts.", exc_info=True)
        return DEFAULT_INITIAL_FETCH_COUNT


def _mentions_sync_needed(mentions_df):
    """Return True when Bubble mentions are missing, undated, or at least 7 days old."""
    if mentions_df.empty:
        logging.info("Mentions need sync: Bubble mentions DF is empty.")
        return True
    try:
        if BUBBLE_MENTIONS_DATE_COLUMN_NAME not in mentions_df.columns or \
                mentions_df[BUBBLE_MENTIONS_DATE_COLUMN_NAME].isnull().all():
            logging.info(f"Mentions need sync: Date column '{BUBBLE_MENTIONS_DATE_COLUMN_NAME}' missing or all null values.")
            return True
        mention_dates = pd.to_datetime(mentions_df[BUBBLE_MENTIONS_DATE_COLUMN_NAME], errors='coerce', utc=True)
        last_mention_date_utc = mention_dates.dropna().max()
        if pd.isna(last_mention_date_utc) or \
                (pd.Timestamp('now', tz='UTC').normalize() - last_mention_date_utc.normalize()).days >= 7:
            logging.info(f"Mentions need sync: Last mention date {last_mention_date_utc} is old or invalid.")
            return True
        logging.info(f"Mentions up-to-date. Last mention: {last_mention_date_utc}")
        return False
    except Exception as e:
        # Same best-effort policy as the posts check: when in doubt, offer a sync.
        logging.error(f"Error processing mention dates: {e}. Defaulting to mentions sync.", exc_info=True)
        return True


def _follower_stats_sync_needed(fs_df):
    """Return True when follower stats in Bubble are missing or lack current data.

    A sync is needed when the frame is empty, lacks the type column, has no
    monthly-gain rows (or no category/date column), has its latest monthly gain
    dated before the first day of the current UTC month, or has no
    non-monthly (demographic) rows.
    """
    if fs_df is None or fs_df.empty:
        logging.info("Follower stats need sync: Bubble follower stats DF is empty.")
        return True
    if FOLLOWER_STATS_TYPE_COLUMN not in fs_df.columns:
        # Without the type column the rows cannot be classified; indexing it would raise KeyError.
        logging.info(f"Follower stats need sync: Type column '{FOLLOWER_STATS_TYPE_COLUMN}' missing.")
        return True

    needs_sync = False
    try:
        monthly_gains_df = fs_df[fs_df[FOLLOWER_STATS_TYPE_COLUMN] == 'follower_gains_monthly']
        if monthly_gains_df.empty:
            needs_sync = True
            logging.info("Follower stats need sync: No monthly gains data in Bubble.")
        elif FOLLOWER_STATS_CATEGORY_COLUMN not in monthly_gains_df.columns:
            needs_sync = True
            logging.info(f"Follower stats need sync: Date column '{FOLLOWER_STATS_CATEGORY_COLUMN}' missing in monthly gains.")
        else:
            # utc=True makes the parsed dates tz-aware; comparing a tz-naive
            # timestamp with the tz-aware month start below raises TypeError.
            gain_dates = pd.to_datetime(
                monthly_gains_df[FOLLOWER_STATS_CATEGORY_COLUMN], errors='coerce', utc=True
            ).dt.normalize()
            last_gain_date = gain_dates.dropna().max()
            if pd.isna(last_gain_date):  # No valid dates after conversion
                needs_sync = True
                logging.info("Follower stats need sync: No valid dates in monthly gains after conversion.")
            else:
                # Sync if the last recorded gain is for a month *before* the start of the
                # current month, so the previous month's data is fetched once available.
                start_of_current_month = pd.Timestamp('now', tz='UTC').normalize().replace(day=1)
                if last_gain_date < start_of_current_month:
                    needs_sync = True
                    logging.info(f"Follower stats need sync: Last gain date {last_gain_date} is before current month start {start_of_current_month}.")
                else:
                    logging.info(f"Follower monthly gains up-to-date. Last gain recorded on: {last_gain_date}")
        # Also trigger sync if demographic data (non-monthly gains) is missing entirely.
        if fs_df[fs_df[FOLLOWER_STATS_TYPE_COLUMN] != 'follower_gains_monthly'].empty:
            needs_sync = True
            logging.info("Follower stats need sync: Demographic data (non-monthly types) missing.")
    except Exception as e:
        logging.error(f"Error evaluating follower stats freshness: {e}. Defaulting to follower stats sync.", exc_info=True)
        return True
    return needs_sync


def process_and_store_bubble_token(url_user_token, org_urn, token_state):
    """Build the session state from the URL token and decide what needs syncing.

    Fetches the LinkedIn token from Bubble using ``url_user_token``, loads the
    existing posts, mentions and follower stats from Bubble for ``org_urn``,
    and determines for each data type whether an initial fetch or an update is
    needed.

    Args:
        url_user_token: User token taken from the URL, or an error-marker
            string containing "not found" / "Could not access".
        org_urn: Organization URN; when falsy no Bubble data is loaded.
        token_state: Previous state dict, or a falsy value for a fresh state.

    Returns:
        A tuple ``(token_status_message, new_state, button_update)`` where
        ``new_state`` is a shallow copy of ``token_state`` with refreshed
        entries and ``button_update`` is a ``gr.update`` for the sync button.
    """
    # NOTE(review): the raw user token is written to the log here and below — confirm
    # this is acceptable for the deployment's log retention.
    logging.info(f"Processing token with URL user token: '{url_user_token}', Org URN: '{org_urn}'")

    # Initialize or update state safely
    new_state = token_state.copy() if token_state else {
        "token": None, "client_id": None, "org_urn": None,
        "bubble_posts_df": pd.DataFrame(), "fetch_count_for_api": 0,
        "bubble_mentions_df": pd.DataFrame(),
        "bubble_follower_stats_df": pd.DataFrame(),
        "url_user_token_temp_storage": None
    }
    new_state.update({
        "org_urn": org_urn,
        "bubble_posts_df": new_state.get("bubble_posts_df", pd.DataFrame()),
        "fetch_count_for_api": new_state.get("fetch_count_for_api", 0),
        "bubble_mentions_df": new_state.get("bubble_mentions_df", pd.DataFrame()),
        "bubble_follower_stats_df": new_state.get("bubble_follower_stats_df", pd.DataFrame()),
        "url_user_token_temp_storage": url_user_token
    })

    client_id = os.environ.get("Linkedin_client_id")
    new_state["client_id"] = client_id if client_id else "ENV VAR MISSING"
    if not client_id:
        logging.error("CRITICAL ERROR: 'Linkedin_client_id' environment variable not set.")

    # Fetch LinkedIn Token from Bubble; any failure leaves the token as None.
    new_state["token"] = None
    if url_user_token and "not found" not in url_user_token and "Could not access" not in url_user_token:
        logging.info(f"Attempting to fetch LinkedIn token from Bubble with user token: {url_user_token}")
        try:
            parsed_linkedin_token = fetch_linkedin_token_from_bubble(url_user_token)
        except Exception as e:
            logging.error(f"β Exception while fetching LinkedIn token from Bubble: {e}", exc_info=True)
        else:
            if isinstance(parsed_linkedin_token, dict) and "access_token" in parsed_linkedin_token:
                new_state["token"] = parsed_linkedin_token
                logging.info("β LinkedIn Token successfully fetched from Bubble.")
            else:
                logging.warning(f"β Failed to fetch a valid LinkedIn token from Bubble. Response: {parsed_linkedin_token}")
    else:
        logging.info("No valid URL user token provided for LinkedIn token fetch, or an error was indicated.")

    # Fetch existing data from Bubble if Org URN is available
    current_org_urn = new_state.get("org_urn")
    if current_org_urn:
        new_state["bubble_posts_df"] = _load_bubble_table(current_org_urn, "LI_posts", "posts")
        new_state["bubble_mentions_df"] = _load_bubble_table(current_org_urn, BUBBLE_MENTIONS_TABLE_NAME, "mentions")
        new_state["bubble_follower_stats_df"] = _load_bubble_table(current_org_urn, BUBBLE_FOLLOWER_STATS_TABLE_NAME, "follower stats")
    else:
        logging.warning("Org URN not available in state. Cannot fetch data from Bubble.")
        new_state["bubble_posts_df"] = pd.DataFrame()
        new_state["bubble_mentions_df"] = pd.DataFrame()
        new_state["bubble_follower_stats_df"] = pd.DataFrame()

    # Decide, per data type, whether the API needs to be called.
    new_state['fetch_count_for_api'] = _compute_posts_fetch_count(new_state["bubble_posts_df"])
    mentions_need_sync = _mentions_sync_needed(new_state["bubble_mentions_df"])
    follower_stats_need_sync = _follower_stats_sync_needed(new_state["bubble_follower_stats_df"])

    # Update Sync Button based on token and needed actions
    sync_actions = []
    if new_state['fetch_count_for_api'] > 0:
        sync_actions.append(f"{new_state['fetch_count_for_api']} Posts")
    if mentions_need_sync:
        sync_actions.append("Mentions")
    if follower_stats_need_sync:
        sync_actions.append("Follower Stats")

    if new_state["token"] and sync_actions:  # Token present and actions needed
        button_label = f"π Sync LinkedIn Data ({', '.join(sync_actions)})"
        button_update = gr.update(value=button_label, visible=True, interactive=True)
    elif new_state["token"]:  # Token present but nothing to sync
        button_label = "β Data Up-to-Date"
        button_update = gr.update(value=button_label, visible=True, interactive=False)
    else:  # No token: keep the button hidden
        button_update = gr.update(visible=False, interactive=False)

    token_status_message = check_token_status(new_state)
    logging.info(f"Token processing complete. Status: {token_status_message}. Button: {button_update}. Sync actions: {sync_actions}")
    return token_status_message, new_state, button_update
def sync_linkedin_mentions(token_state):
    """Pull recent LinkedIn mentions and push the ones Bubble lacks.

    Returns a ``(status_fragment, token_state)`` tuple. After a successful
    upload ``token_state["bubble_mentions_df"]`` is replaced by the previous
    frame plus the uploaded rows, de-duplicated on the mention ID column.
    """
    logging.info("Starting LinkedIn mentions sync process.")

    if not (token_state and token_state.get("token")):
        logging.error("Mentions sync: Access denied. No LinkedIn token.")
        return "Mentions: No token. ", token_state

    app_client_id = token_state.get("client_id")
    li_token = token_state.get("token")
    organization = token_state.get('org_urn')
    # Private copy: the frame held in state is only swapped out after an upload.
    known_mentions = token_state.get("bubble_mentions_df", pd.DataFrame()).copy()

    if not organization or not app_client_id or app_client_id == "ENV VAR MISSING":
        logging.error("Mentions sync: Configuration error (Org URN or Client ID missing).")
        return "Mentions: Config error. ", token_state

    # Work out whether a fetch is warranted and how large it should be.
    # ``None`` means the stored mentions are fresh enough.
    date_col = BUBBLE_MENTIONS_DATE_COLUMN_NAME
    how_many = None
    if known_mentions.empty:
        how_many = DEFAULT_MENTIONS_INITIAL_FETCH_COUNT
        logging.info("Mentions sync needed: Bubble DF empty. Fetching initial count.")
    elif date_col not in known_mentions.columns or known_mentions[date_col].isnull().all():
        how_many = DEFAULT_MENTIONS_INITIAL_FETCH_COUNT
        logging.info(f"Mentions sync needed: Date column '{BUBBLE_MENTIONS_DATE_COLUMN_NAME}' missing or all null. Fetching initial count.")
    else:
        # Parsed into a standalone Series so the stored date values stay untouched.
        parsed_dates = pd.to_datetime(known_mentions[date_col], errors='coerce', utc=True)
        newest_mention = parsed_dates.dropna().max()
        if pd.isna(newest_mention) or \
                (pd.Timestamp('now', tz='UTC').normalize() - newest_mention.normalize()).days >= 7:
            how_many = DEFAULT_MENTIONS_UPDATE_FETCH_COUNT
            logging.info(f"Mentions sync needed: Last mention date {newest_mention} is old or invalid. Fetching update count.")

    if how_many is None:
        logging.info("Mentions data is fresh based on current check. No API fetch needed for mentions.")
        return "Mentions: Up-to-date. ", token_state

    logging.info(f"Mentions sync proceeding. Fetch count: {how_many}")
    try:
        fetched = fetch_linkedin_mentions_core(app_client_id, li_token, organization, count=how_many)
        if not fetched:
            logging.info("Mentions sync: No new mentions found via API.")
            return "Mentions: None found via API. ", token_state

        # IDs are compared as strings; missing IDs are ignored.
        id_col = BUBBLE_MENTIONS_ID_COLUMN_NAME
        if not known_mentions.empty and id_col in known_mentions.columns:
            seen_ids = set(known_mentions[id_col].dropna().astype(str))
        else:
            seen_ids = set()

        sentiment_by_mention = analyze_mentions_sentiment(fetched)
        compiled = compile_detailed_mentions(fetched, sentiment_by_mention)

        unseen = [mention for mention in compiled if str(mention.get("id")) not in seen_ids]
        if not unseen:
            logging.info("Mentions sync: All fetched mentions are already in Bubble.")
            return "Mentions: All fetched already in Bubble. ", token_state

        payload = prepare_mentions_for_bubble(unseen)
        if not payload:
            logging.info("Mentions sync: No new mentions were prepared for Bubble upload (possibly all filtered or empty after prep).")
            return "Mentions: No new ones to upload. ", token_state

        bulk_upload_to_bubble(payload, BUBBLE_MENTIONS_TABLE_NAME)
        logging.info(f"Successfully uploaded {len(payload)} new mentions to Bubble.")

        # Mirror the upload in the in-memory frame; on an ID clash the newer row wins.
        combined = pd.concat([known_mentions, pd.DataFrame(payload)], ignore_index=True)
        token_state["bubble_mentions_df"] = combined.drop_duplicates(subset=[id_col], keep='last')
        return f"Mentions: Synced {len(payload)} new. ", token_state
    except ValueError as ve:
        logging.error(f"ValueError during mentions sync: {ve}", exc_info=True)
        return f"Mentions Error: {html.escape(str(ve))}. ", token_state
    except Exception as e:
        logging.exception("Unexpected error in sync_linkedin_mentions.")
        return f"Mentions: Unexpected error ({type(e).__name__}). ", token_state
def sync_linkedin_follower_stats(token_state):
    """Fetch LinkedIn follower statistics and upload new or changed entries to Bubble.

    Monthly-gain entries are uploaded when their category (date string) is not
    yet present in Bubble; demographic entries are uploaded when their
    (organization, type, category) key is new or its organic/paid counts differ.

    Args:
        token_state: Session state dict holding ``"token"``, ``"client_id"``,
            ``"org_urn"`` and ``"bubble_follower_stats_df"``.

    Returns:
        A tuple ``(status_fragment, token_state)``. After a successful upload
        ``token_state["bubble_follower_stats_df"]`` is replaced by the merged,
        de-duplicated frame.
    """
    logging.info("Starting LinkedIn follower stats sync process.")
    if not token_state or not token_state.get("token"):
        logging.error("Follower Stats sync: Access denied. No LinkedIn token.")
        return "Follower Stats: No token. ", token_state

    client_id = token_state.get("client_id")
    token_dict = token_state.get("token")
    org_urn = token_state.get('org_urn')
    if not org_urn or not client_id or client_id == "ENV VAR MISSING":
        logging.error("Follower Stats sync: Configuration error (Org URN or Client ID missing).")
        return "Follower Stats: Config error. ", token_state

    monthly_type = 'follower_gains_monthly'
    existing_stats_df = token_state.get("bubble_follower_stats_df")
    # Work on a copy; tolerate a missing or None frame in state.
    existing_stats_df = pd.DataFrame() if existing_stats_df is None else existing_stats_df.copy()

    # Split existing rows once. A non-empty frame without the type column cannot be
    # classified (indexing it would raise KeyError), so it is treated as holding no
    # known monthly or demographic rows.
    has_type_column = FOLLOWER_STATS_TYPE_COLUMN in existing_stats_df.columns
    if not existing_stats_df.empty and has_type_column:
        existing_monthly_df = existing_stats_df[existing_stats_df[FOLLOWER_STATS_TYPE_COLUMN] == monthly_type]
        existing_demographics_df = existing_stats_df[existing_stats_df[FOLLOWER_STATS_TYPE_COLUMN] != monthly_type]
    else:
        existing_monthly_df = pd.DataFrame()
        existing_demographics_df = pd.DataFrame()

    # Determine if follower stats sync is needed.
    follower_stats_sync_is_needed_now = False
    if existing_stats_df.empty:
        follower_stats_sync_is_needed_now = True
        logging.info("Follower stats sync needed: Bubble DF is empty.")
    elif not has_type_column:
        follower_stats_sync_is_needed_now = True
        logging.info(f"Follower stats sync needed: Type column '{FOLLOWER_STATS_TYPE_COLUMN}' missing.")
    else:
        if existing_monthly_df.empty or FOLLOWER_STATS_CATEGORY_COLUMN not in existing_monthly_df.columns:
            follower_stats_sync_is_needed_now = True
            logging.info("Follower stats sync needed: Monthly gains data missing or date column absent.")
        else:
            # utc=True yields tz-aware dates; a tz-naive value cannot be compared
            # with the tz-aware month start (pandas raises TypeError).
            gain_dates = pd.to_datetime(
                existing_monthly_df[FOLLOWER_STATS_CATEGORY_COLUMN], errors='coerce', utc=True
            ).dt.normalize()
            last_gain_date = gain_dates.dropna().max()
            start_of_current_month = pd.Timestamp('now', tz='UTC').normalize().replace(day=1)
            if pd.isna(last_gain_date) or last_gain_date < start_of_current_month:
                follower_stats_sync_is_needed_now = True
                logging.info(f"Follower stats sync needed: Last gain date {last_gain_date} is old or invalid.")
        if existing_demographics_df.empty:
            follower_stats_sync_is_needed_now = True
            logging.info("Follower stats sync needed: Demographic data (non-monthly) is missing.")

    if not follower_stats_sync_is_needed_now:
        logging.info("Follower stats data is fresh based on current check. No API fetch needed.")
        return "Follower Stats: Data up-to-date. ", token_state

    logging.info(f"Follower stats sync proceeding for org_urn: {org_urn}")
    try:
        # Used below as an iterable of dicts, one per stat entry.
        api_follower_stats = get_linkedin_follower_stats(client_id, token_dict, org_urn)
        if not api_follower_stats:  # None or empty
            logging.info(f"Follower Stats sync: No stats found via API for org {org_urn}.")
            return "Follower Stats: None found via API. ", token_state

        # --- Monthly gains: upload entries whose date string is not yet in Bubble ---
        # NOTE(review): matching is by exact string; if Bubble returns dates in another
        # format than the API's 'YYYY-MM-DD', entries would be re-uploaded — confirm.
        existing_monthly_gain_dates = set()
        if FOLLOWER_STATS_CATEGORY_COLUMN in existing_monthly_df.columns:
            existing_monthly_gain_dates = set(existing_monthly_df[FOLLOWER_STATS_CATEGORY_COLUMN].astype(str).unique())
        new_stats_to_upload = [
            gain_stat for gain_stat in api_follower_stats
            if gain_stat.get(FOLLOWER_STATS_TYPE_COLUMN) == monthly_type
            and str(gain_stat.get(FOLLOWER_STATS_CATEGORY_COLUMN)) not in existing_monthly_gain_dates
        ]

        # --- Demographics: upload entries that are new or whose counts changed ---
        # Key: (org_urn, type, category_name) -> (organic_count, paid_count)
        demographic_key_columns = [
            FOLLOWER_STATS_ORG_URN_COLUMN, FOLLOWER_STATS_TYPE_COLUMN, FOLLOWER_STATS_CATEGORY_COLUMN
        ]
        demographic_count_columns = [FOLLOWER_STATS_ORGANIC_COLUMN, FOLLOWER_STATS_PAID_COLUMN]
        existing_demographics_map = {}
        if not existing_demographics_df.empty and all(
            col in existing_demographics_df.columns
            for col in demographic_key_columns + demographic_count_columns
        ):
            for _, row in existing_demographics_df.iterrows():
                key = tuple(str(row[col]) for col in demographic_key_columns)
                existing_demographics_map[key] = tuple(row[col] for col in demographic_count_columns)

        for demo_stat in api_follower_stats:
            if demo_stat.get(FOLLOWER_STATS_TYPE_COLUMN) == monthly_type:
                continue
            key = tuple(str(demo_stat.get(col)) for col in demographic_key_columns)
            api_counts = tuple(demo_stat.get(col, 0) for col in demographic_count_columns)
            if key not in existing_demographics_map or existing_demographics_map[key] != api_counts:
                new_stats_to_upload.append(demo_stat)

        if not new_stats_to_upload:
            logging.info(f"Follower Stats sync: Data for org {org_urn} is up-to-date or no changes found.")
            return "Follower Stats: Data up-to-date or no changes. ", token_state

        bulk_upload_to_bubble(new_stats_to_upload, BUBBLE_FOLLOWER_STATS_TABLE_NAME)
        logging.info(f"Successfully uploaded {len(new_stats_to_upload)} follower stat entries to Bubble for org {org_urn}.")

        # Update in-memory DataFrame: concatenate old and new, then drop duplicates per kind.
        temp_df = pd.concat([existing_stats_df, pd.DataFrame(new_stats_to_upload)], ignore_index=True)
        # Monthly gains are unique by organization and date (category_name).
        monthly_part = temp_df[temp_df[FOLLOWER_STATS_TYPE_COLUMN] == monthly_type].drop_duplicates(
            subset=[FOLLOWER_STATS_ORG_URN_COLUMN, FOLLOWER_STATS_CATEGORY_COLUMN],
            keep='last'
        )
        # Demographics are unique by organization, type and category; keep='last'
        # retains the freshly uploaded version of a changed entry.
        demographics_part = temp_df[temp_df[FOLLOWER_STATS_TYPE_COLUMN] != monthly_type].drop_duplicates(
            subset=demographic_key_columns,
            keep='last'
        )
        token_state["bubble_follower_stats_df"] = pd.concat([monthly_part, demographics_part], ignore_index=True)
        return f"Follower Stats: Synced {len(new_stats_to_upload)} entries. ", token_state
    except ValueError as ve:
        logging.error(f"ValueError during follower stats sync for {org_urn}: {ve}", exc_info=True)
        return f"Follower Stats Error: {html.escape(str(ve))}. ", token_state
    except Exception as e:
        logging.exception(f"Unexpected error in sync_linkedin_follower_stats for {org_urn}.")
        return f"Follower Stats: Unexpected error ({type(e).__name__}). ", token_state
def _sync_new_posts(token_state, client_id, token_dict, org_urn, fetch_count, known_posts_df):
    """Fetch, enrich and upload posts missing from Bubble; return the status fragment.

    When posts are uploaded, ``token_state["bubble_posts_df"]`` is replaced by
    ``known_posts_df`` plus the uploaded rows, de-duplicated on the post URN column.
    """
    if fetch_count == 0:
        logging.info("Posts sync: Skipped as fetch_count_for_posts_api is 0.")
        return "Posts: Already up-to-date. "

    logging.info(f"Posts sync: Starting fetch for {fetch_count} posts.")
    try:
        # The core fetch is unpacked as (posts, stats map, third value that is ignored here).
        raw_posts, stats_map, _ = fetch_linkedin_posts_core(client_id, token_dict, org_urn, count=fetch_count)
        if not raw_posts:
            logging.info("Posts sync: No raw posts returned from API.")
            return "Posts: None found via API. "

        known_urns = set()
        if not known_posts_df.empty and BUBBLE_POST_URN_COLUMN_NAME in known_posts_df.columns:
            known_urns = set(known_posts_df[BUBBLE_POST_URN_COLUMN_NAME].dropna().astype(str))

        # Keep only the posts Bubble does not have yet.
        unseen_posts = [post for post in raw_posts if str(post.get(LINKEDIN_POST_URN_KEY)) not in known_urns]
        if not unseen_posts:
            logging.info("Posts sync: All fetched posts were already found in Bubble.")
            return "Posts: All fetched already in Bubble. "

        logging.info(f"Posts sync: Processing {len(unseen_posts)} new raw posts.")
        urns_for_comments = [post[LINKEDIN_POST_URN_KEY] for post in unseen_posts if post.get(LINKEDIN_POST_URN_KEY)]
        comments_data = fetch_comments(client_id, token_dict, urns_for_comments, stats_map)
        sentiments = analyze_sentiment(comments_data)
        detailed_posts = compile_detailed_posts(unseen_posts, stats_map, sentiments)
        # Unpacked as (posts, post stats, post comments), each uploaded to its own table.
        li_posts, li_post_stats, li_post_comments = prepare_data_for_bubble(detailed_posts, comments_data)

        if not li_posts:
            logging.info("Posts sync: No new posts were prepared for Bubble upload.")
            return "Posts: No new ones to upload after processing. "

        bulk_upload_to_bubble(li_posts, "LI_posts")
        # Mirror the upload in the in-memory posts frame.
        merged_posts = pd.concat([known_posts_df, pd.DataFrame(li_posts)], ignore_index=True)
        token_state["bubble_posts_df"] = merged_posts.drop_duplicates(subset=[BUBBLE_POST_URN_COLUMN_NAME], keep='last')
        logging.info(f"Posts sync: Uploaded {len(li_posts)} new posts to Bubble.")

        if li_post_stats:
            bulk_upload_to_bubble(li_post_stats, "LI_post_stats")
            logging.info(f"Posts sync: Uploaded {len(li_post_stats)} post_stats entries.")
        if li_post_comments:
            bulk_upload_to_bubble(li_post_comments, "LI_post_comments")
            logging.info(f"Posts sync: Uploaded {len(li_post_comments)} post_comments entries.")
        return f"Posts: Synced {len(li_posts)} new. "
    except ValueError as ve:
        logging.error(f"Posts sync: ValueError: {ve}", exc_info=True)
        return f"Posts Error: {html.escape(str(ve))}. "
    except Exception as e:
        logging.exception("Posts sync: Unexpected error during processing.")
        return f"Posts: Unexpected error ({type(e).__name__}). "


def sync_all_linkedin_data(token_state):
    """Run the posts, mentions and follower-stats syncs in that order.

    Returns ``(html_message, token_state)``. The message is a red paragraph
    when the token, Org URN or client ID is missing; otherwise a green
    paragraph concatenating the three per-type status fragments.
    """
    logging.info("Starting sync_all_linkedin_data process.")

    if not (token_state and token_state.get("token")):
        logging.error("Sync All: Access denied. LinkedIn token not available.")
        return "<p style='color:red; text-align:center;'>β Access denied. LinkedIn token not available.</p>", token_state

    app_client_id = token_state.get("client_id")
    li_token = token_state.get("token")
    organization = token_state.get('org_urn')
    posts_to_fetch = token_state.get('fetch_count_for_api', 0)
    # Copy so the frame in state is only replaced once an upload has happened.
    known_posts_df = token_state.get("bubble_posts_df", pd.DataFrame()).copy()

    if not organization:
        logging.error("Sync All: Org URN missing in token_state.")
        return "<p style='color:red;'>β Config error: Org URN missing.</p>", token_state
    if not app_client_id or app_client_id == "ENV VAR MISSING":
        logging.error("Sync All: Client ID missing or not set.")
        return "<p style='color:red;'>β Config error: Client ID missing.</p>", token_state

    # Each step reports a short fragment; the mentions and follower-stats
    # functions hand back the state they were given (updated in place on success).
    posts_msg = _sync_new_posts(token_state, app_client_id, li_token, organization, posts_to_fetch, known_posts_df)
    mentions_msg, token_state = sync_linkedin_mentions(token_state)
    followers_msg, token_state = sync_linkedin_follower_stats(token_state)

    logging.info(f"Sync process complete. Messages: Posts: [{posts_msg.strip()}], Mentions: [{mentions_msg.strip()}], Follower Stats: [{followers_msg.strip()}]")
    final_message = f"<p style='color:green; text-align:center;'>β Sync Attempted. {posts_msg} {mentions_msg} {followers_msg}</p>"
    return final_message, token_state
def _recent_items_table_html(items_df, date_column, candidate_columns, item_label):
    """Render the first five rows of ``items_df`` (newest first) as HTML.

    Args:
        items_df: Non-empty DataFrame of posts or mentions.
        date_column: Name of the date column. When present it is parsed,
            used to order rows newest-first and shown as ``YYYY-MM-DD HH:MM``.
        candidate_columns: Column names to display; names that are not in
            ``items_df`` are skipped.
        item_label: Singular noun ("post" or "mention") used in messages.

    Returns:
        list[str]: HTML fragments to append to the dashboard.
    """
    columns_to_show = [col for col in candidate_columns if col in items_df.columns]
    if not columns_to_show:
        return [f"<p>No relevant {item_label} columns found to display.</p>"]

    fragments = []
    display_df = items_df.copy()
    if date_column in display_df.columns:
        try:
            parsed_dates = pd.to_datetime(display_df[date_column], errors='coerce')
            formatted_dates = parsed_dates.dt.strftime('%Y-%m-%d %H:%M')
            # Order on the real timestamps (not the minute-truncated strings);
            # positions are used so a non-unique index cannot duplicate rows.
            # Unparseable dates (NaT) end up last.
            newest_first = parsed_dates.reset_index(drop=True).sort_values(ascending=False).index
            display_df[date_column] = formatted_dates
            display_df = display_df.iloc[newest_first]
        except Exception as e:
            # Best effort: fall back to the unsorted, unformatted rows.
            logging.error(f"Error formatting {item_label} dates for display: {e}")
            fragments.append(f"<p>Error formatting {item_label} dates.</p>")

    # The text columns come from LinkedIn and the result is injected into a
    # gr.HTML component, so cell contents must be escaped to prevent markup
    # or script injection.
    fragments.append(
        display_df[columns_to_show].head().to_html(
            escape=True, index=False, classes="table table-striped table-sm"
        )
    )
    return fragments


def display_main_dashboard(token_state):
    """Generates HTML for the main dashboard display using data from token_state.

    Args:
        token_state: Dict-like app state. Reads ``"token"`` and the
            ``"bubble_posts_df"``, ``"bubble_mentions_df"`` and
            ``"bubble_follower_stats_df"`` DataFrames (each defaults to an
            empty DataFrame when the key is absent).

    Returns:
        str: An HTML string with recent posts, recent mentions and a
        follower-statistics summary, or an access-denied message when no
        token is available.
    """
    if not token_state or not token_state.get("token"):
        logging.warning("Dashboard display: Access denied. No token available.")
        return "β Access denied. No token available for dashboard."

    html_parts = ["<div style='padding:10px;'><h3>Dashboard Overview</h3>"]

    # Display Recent Posts
    posts_df = token_state.get("bubble_posts_df", pd.DataFrame())
    html_parts.append(f"<h4>Recent Posts ({len(posts_df)} in Bubble):</h4>")
    if not posts_df.empty:
        html_parts.extend(_recent_items_table_html(
            posts_df,
            BUBBLE_POST_DATE_COLUMN_NAME,
            [BUBBLE_POST_DATE_COLUMN_NAME, 'text', 'sentiment', 'summary_text', 'li_eb_label'],
            "post",
        ))
    else:
        html_parts.append("<p>No posts loaded from Bubble.</p>")
    html_parts.append("<hr/>")

    # Display Recent Mentions
    mentions_df = token_state.get("bubble_mentions_df", pd.DataFrame())
    html_parts.append(f"<h4>Recent Mentions ({len(mentions_df)} in Bubble):</h4>")
    if not mentions_df.empty:
        html_parts.extend(_recent_items_table_html(
            mentions_df,
            BUBBLE_MENTIONS_DATE_COLUMN_NAME,
            [BUBBLE_MENTIONS_DATE_COLUMN_NAME, "mention_text", "sentiment_label"],
            "mention",
        ))
    else:
        html_parts.append("<p>No mentions loaded from Bubble.</p>")
    html_parts.append("<hr/>")

    # Display Follower Statistics Summary
    follower_stats_df = token_state.get("bubble_follower_stats_df", pd.DataFrame())
    html_parts.append(f"<h4>Follower Statistics ({len(follower_stats_df)} entries in Bubble):</h4>")
    if follower_stats_df.empty:
        html_parts.append("<p>No follower statistics loaded from Bubble.</p>")
    elif FOLLOWER_STATS_TYPE_COLUMN not in follower_stats_df.columns:
        # Without the type column rows cannot be classified; report it
        # instead of letting a KeyError escape the Gradio callback.
        logging.warning(f"Dashboard display: Follower stats are missing the '{FOLLOWER_STATS_TYPE_COLUMN}' column.")
        html_parts.append("<p>No monthly follower gain data or required columns are missing.</p>")
    else:
        is_monthly_gain = follower_stats_df[FOLLOWER_STATS_TYPE_COLUMN] == 'follower_gains_monthly'

        # Latest Monthly Follower Gain
        monthly_gains = follower_stats_df[is_monthly_gain].copy()
        gain_columns = [FOLLOWER_STATS_CATEGORY_COLUMN, FOLLOWER_STATS_ORGANIC_COLUMN, FOLLOWER_STATS_PAID_COLUMN]
        if not monthly_gains.empty and all(col in monthly_gains.columns for col in gain_columns):
            try:
                # For monthly gains the category column holds the date.
                monthly_gains[FOLLOWER_STATS_CATEGORY_COLUMN] = pd.to_datetime(
                    monthly_gains[FOLLOWER_STATS_CATEGORY_COLUMN], errors='coerce'
                ).dt.strftime('%Y-%m-%d')
                # Rows whose date could not be parsed are dropped so they can
                # never be reported as the "latest" entry.
                latest_gain = (
                    monthly_gains.dropna(subset=[FOLLOWER_STATS_CATEGORY_COLUMN])
                    .sort_values(by=FOLLOWER_STATS_CATEGORY_COLUMN, ascending=False)
                    .head(1)
                )
                if not latest_gain.empty:
                    html_parts.append("<h5>Latest Monthly Follower Gain:</h5>")
                    html_parts.append(latest_gain[gain_columns].to_html(escape=True, index=False, classes="table table-sm"))
                else:
                    html_parts.append("<p>No valid monthly follower gain data to display after processing.</p>")
            except Exception as e:
                logging.error(f"Error formatting follower gain dates for display: {e}")
                html_parts.append("<p>Error displaying monthly follower gain data.</p>")
        else:
            html_parts.append("<p>No monthly follower gain data or required columns are missing.</p>")

        # Count of Demographic Entries (everything that is not a monthly gain)
        demographics_count = int((~is_monthly_gain).sum())
        html_parts.append(f"<p>Total demographic entries (seniority, industry, etc.): {demographics_count}</p>")

    html_parts.append("</div>")
    return "".join(html_parts)
def guarded_fetch_analytics(token_state):
    """Call ``fetch_and_render_analytics`` only when a token is available.

    Without a token an 8-item tuple is returned: an access-denied message
    followed by seven ``None`` placeholders, one per plot output wired to
    the analytics button. With a token, the stored credentials and the
    three cached DataFrames (empty DataFrames when a key is absent) are
    forwarded and the callee's result is returned unchanged.
    """
    has_token = bool(token_state) and bool(token_state.get("token"))
    if not has_token:
        logging.warning("Analytics fetch: Access denied. No token.")
        # One message plus seven empty plot slots.
        return ("β Access denied. No token.",) + (None,) * 7

    # Always hand DataFrames to the analytics function, even if empty.
    frame_keys = ("bubble_posts_df", "bubble_mentions_df", "bubble_follower_stats_df")
    cached_frames = [token_state.get(key, pd.DataFrame()) for key in frame_keys]

    logging.info("Calling fetch_and_render_analytics with current token_state data.")
    credentials = [token_state.get(key) for key in ("client_id", "token", "org_urn")]
    return fetch_and_render_analytics(*credentials, *cached_frames)
def run_mentions_tab_display(token_state):
    """Generates HTML and a plot for the Mentions tab.

    Args:
        token_state: Dict-like app state. Reads ``"token"`` and
            ``"bubble_mentions_df"`` (defaults to an empty DataFrame).

    Returns:
        tuple: ``(html, figure)``. ``html`` is a table of up to 20 mentions
        (newest first when the date column is present) or a status message.
        ``figure`` is a matplotlib bar chart of ``sentiment_label`` counts,
        or ``None`` when there is no token, no data, no ``sentiment_label``
        column, or plotting failed.
    """
    logging.info("Updating Mentions Tab display.")
    if not token_state or not token_state.get("token"):
        logging.warning("Mentions tab: Access denied. No token.")
        return ("β Access denied. No token available for mentions.", None)

    mentions_df = token_state.get("bubble_mentions_df", pd.DataFrame())
    if mentions_df.empty:
        logging.info("Mentions tab: No mentions data in Bubble.")
        return ("<p style='text-align:center;'>No mentions data in Bubble. Try syncing.</p>", None)

    html_parts = ["<h3 style='text-align:center;'>Recent Mentions</h3>"]

    # Define columns to display, ensuring they exist
    display_columns = [
        col for col in [BUBBLE_MENTIONS_DATE_COLUMN_NAME, "mention_text", "sentiment_label", BUBBLE_MENTIONS_ID_COLUMN_NAME]
        if col in mentions_df.columns
    ]
    mentions_df_display = mentions_df.copy()
    if BUBBLE_MENTIONS_DATE_COLUMN_NAME in mentions_df_display.columns:
        try:
            parsed_dates = pd.to_datetime(mentions_df_display[BUBBLE_MENTIONS_DATE_COLUMN_NAME], errors='coerce')
            formatted_dates = parsed_dates.dt.strftime('%Y-%m-%d %H:%M')
            # Order on the real timestamps rather than the formatted strings;
            # positional indexing avoids row duplication on a non-unique index.
            newest_first = parsed_dates.reset_index(drop=True).sort_values(ascending=False).index
            mentions_df_display[BUBBLE_MENTIONS_DATE_COLUMN_NAME] = formatted_dates
            mentions_df_display = mentions_df_display.iloc[newest_first]
        except Exception as e:
            # Best effort: keep showing the unsorted, unformatted rows.
            logging.error(f"Error formatting mention dates for tab display: {e}")
            html_parts.append("<p>Error formatting mention dates.</p>")

    if not display_columns or mentions_df_display[display_columns].empty:
        html_parts.append("<p>Required columns for mentions display are missing or no data after processing.</p>")
    else:
        # Mention text is written by third parties and is rendered inside a
        # gr.HTML component, so it must be escaped to prevent HTML injection.
        html_parts.append(
            mentions_df_display[display_columns].head(20).to_html(escape=True, index=False, classes="table table-sm")
        )
    mentions_html_output = "\n".join(html_parts)

    fig = None
    # mentions_df is known to be non-empty here (see the early return above).
    if "sentiment_label" in mentions_df.columns:
        try:
            import matplotlib.pyplot as plt
            plt.switch_backend('Agg')  # Non-interactive backend, required when serving from Gradio
            fig_plot, ax = plt.subplots(figsize=(6, 4))
            sentiment_counts = mentions_df["sentiment_label"].value_counts()
            sentiment_counts.plot(kind='bar', ax=ax, color=['#4CAF50', '#FFC107', '#F44336', '#9E9E9E', '#2196F3'])
            ax.set_title("Mention Sentiment Distribution")
            ax.set_ylabel("Count")
            # Operate on this figure's own axes instead of pyplot's implicit
            # "current" axes.
            plt.setp(ax.get_xticklabels(), rotation=45, ha='right')
            fig_plot.tight_layout()  # Prevent rotated labels from being clipped
            fig = fig_plot
            logging.info("Mentions tab: Sentiment distribution plot generated.")
        except Exception as e:
            logging.error(f"Error generating mentions plot: {e}", exc_info=True)
            fig = None
    else:
        logging.info("Mentions tab: Not enough data or 'sentiment_label' column missing for plot.")

    return mentions_html_output, fig
def _follower_monthly_gains_section(follower_stats_df):
    """Build the monthly-gains table and line plot for the Follower Stats tab.

    Args:
        follower_stats_df: Non-empty follower statistics DataFrame.

    Returns:
        tuple: ``(fragments, figure)`` where ``fragments`` is a list of HTML
        strings and ``figure`` is a matplotlib figure or ``None``.
    """
    no_data_msg = "<p>No monthly follower gain data available or required columns missing.</p>"
    required_columns = [
        FOLLOWER_STATS_TYPE_COLUMN,
        FOLLOWER_STATS_CATEGORY_COLUMN,
        FOLLOWER_STATS_ORGANIC_COLUMN,
        FOLLOWER_STATS_PAID_COLUMN,
    ]
    # Check the schema first: indexing a missing column would raise a
    # KeyError outside any try block and break the Gradio callback.
    missing_columns = [col for col in required_columns if col not in follower_stats_df.columns]
    if missing_columns:
        logging.warning(f"Follower stats tab: Columns missing for monthly gains: {missing_columns}")
        return [no_data_msg], None

    monthly_gains_df = follower_stats_df[
        (follower_stats_df[FOLLOWER_STATS_TYPE_COLUMN] == 'follower_gains_monthly') &
        (follower_stats_df[FOLLOWER_STATS_CATEGORY_COLUMN].notna()) &  # Date column
        (follower_stats_df[FOLLOWER_STATS_ORGANIC_COLUMN].notna()) &
        (follower_stats_df[FOLLOWER_STATS_PAID_COLUMN].notna())
    ].copy()
    if monthly_gains_df.empty:
        return [no_data_msg], None

    fragments = []
    figure = None
    try:
        # For monthly gains the category column holds the date string.
        monthly_gains_df[FOLLOWER_STATS_CATEGORY_COLUMN_DT] = pd.to_datetime(
            monthly_gains_df[FOLLOWER_STATS_CATEGORY_COLUMN], errors='coerce'
        )

        # Table: newest first, date shown as YYYY-MM.
        table_display_df = monthly_gains_df.sort_values(by=FOLLOWER_STATS_CATEGORY_COLUMN_DT, ascending=False).copy()
        fragments.append("<h4>Monthly Follower Gains (Last 13 Months):</h4>")
        table_display_df[FOLLOWER_STATS_CATEGORY_COLUMN] = table_display_df[FOLLOWER_STATS_CATEGORY_COLUMN_DT].dt.strftime('%Y-%m')
        fragments.append(
            table_display_df[[FOLLOWER_STATS_CATEGORY_COLUMN, FOLLOWER_STATS_ORGANIC_COLUMN, FOLLOWER_STATS_PAID_COLUMN]]
            .head(13)
            .to_html(escape=True, index=False, classes="table table-sm")
        )

        # Plot: rows with unparseable dates are dropped so the x and y
        # series always have the same length.
        plot_df = monthly_gains_df.dropna(subset=[FOLLOWER_STATS_CATEGORY_COLUMN_DT])
        if plot_df.empty:
            logging.info("Follower stats tab: No valid dates available for the monthly gains plot.")
        else:
            import matplotlib.pyplot as plt
            plt.switch_backend('Agg')
            # groupby sorts its keys; YYYY-MM strings sort chronologically,
            # so the totals come out in time order.
            totals_by_month = plot_df.groupby(plot_df[FOLLOWER_STATS_CATEGORY_COLUMN_DT].dt.strftime('%Y-%m'))
            organic_totals = totals_by_month[FOLLOWER_STATS_ORGANIC_COLUMN].sum()
            paid_totals = totals_by_month[FOLLOWER_STATS_PAID_COLUMN].sum()
            plot_dates = list(organic_totals.index)

            fig_gains, ax_gains = plt.subplots(figsize=(10, 5))  # Wider plot
            ax_gains.plot(plot_dates, organic_totals, marker='o', linestyle='-', label='Organic Gain')
            ax_gains.plot(plot_dates, paid_totals, marker='x', linestyle='--', label='Paid Gain')
            ax_gains.set_title("Monthly Follower Gains Over Time")
            ax_gains.set_ylabel("Follower Count")
            ax_gains.set_xlabel("Month (YYYY-MM)")
            plt.setp(ax_gains.get_xticklabels(), rotation=45, ha='right')
            ax_gains.legend()
            ax_gains.grid(True, linestyle='--', alpha=0.7)
            fig_gains.tight_layout()
            figure = fig_gains
            logging.info("Follower stats tab: Monthly gains plot generated.")
    except Exception as e:
        logging.error(f"Error processing or plotting monthly gains: {e}", exc_info=True)
        fragments.append("<p>Error displaying monthly follower gain data.</p>")
    return fragments, figure


def _follower_category_section(follower_stats_df, stats_type, label, bar_color):
    """Build the top-10 table and bar plot for one follower breakdown.

    Args:
        follower_stats_df: Non-empty follower statistics DataFrame.
        stats_type: Value of the type column selecting the breakdown
            (e.g. ``'follower_seniority'``).
        label: Capitalised display name ("Seniority" / "Industry"); its
            lower-case form is used in log and error messages.
        bar_color: Matplotlib colour for the bars.

    Returns:
        tuple: ``(fragments, figure)`` where ``fragments`` is a list of HTML
        strings and ``figure`` is a matplotlib figure or ``None``.
    """
    label_lower = label.lower()
    no_data_msg = f"<p>No follower {label_lower} data available or required columns missing.</p>"
    required_columns = [FOLLOWER_STATS_TYPE_COLUMN, FOLLOWER_STATS_CATEGORY_COLUMN, FOLLOWER_STATS_ORGANIC_COLUMN]
    # Same schema guard as for monthly gains: avoid an uncaught KeyError.
    missing_columns = [col for col in required_columns if col not in follower_stats_df.columns]
    if missing_columns:
        logging.warning(f"Follower stats tab: Columns missing for {label_lower} data: {missing_columns}")
        return [no_data_msg], None

    category_df = follower_stats_df[
        (follower_stats_df[FOLLOWER_STATS_TYPE_COLUMN] == stats_type) &
        (follower_stats_df[FOLLOWER_STATS_CATEGORY_COLUMN].notna()) &  # Category name
        (follower_stats_df[FOLLOWER_STATS_ORGANIC_COLUMN].notna())
    ].copy()
    if category_df.empty:
        return [no_data_msg], None

    fragments = []
    figure = None
    try:
        category_df_sorted = category_df.sort_values(by=FOLLOWER_STATS_ORGANIC_COLUMN, ascending=False)
        fragments.append(f"<h4>Followers by {label} (Top 10 Organic):</h4>")
        fragments.append(
            category_df_sorted[[FOLLOWER_STATS_CATEGORY_COLUMN, FOLLOWER_STATS_ORGANIC_COLUMN, FOLLOWER_STATS_PAID_COLUMN]]
            .head(10)
            .to_html(escape=True, index=False, classes="table table-sm")
        )

        import matplotlib.pyplot as plt
        plt.switch_backend('Agg')
        fig_category, ax_category = plt.subplots(figsize=(8, 5))
        top_n = category_df_sorted.nlargest(10, FOLLOWER_STATS_ORGANIC_COLUMN)
        ax_category.bar(top_n[FOLLOWER_STATS_CATEGORY_COLUMN], top_n[FOLLOWER_STATS_ORGANIC_COLUMN], color=bar_color)
        ax_category.set_title(f"Follower Distribution by {label} (Top 10 Organic)")
        ax_category.set_ylabel("Organic Follower Count")
        plt.setp(ax_category.get_xticklabels(), rotation=45, ha='right')
        ax_category.grid(axis='y', linestyle='--', alpha=0.7)
        fig_category.tight_layout()
        figure = fig_category
        logging.info(f"Follower stats tab: {label} distribution plot generated.")
    except Exception as e:
        logging.error(f"Error processing or plotting {label_lower} data: {e}", exc_info=True)
        fragments.append(f"<p>Error displaying follower {label_lower} data.</p>")
    return fragments, figure


def run_follower_stats_tab_display(token_state):
    """Generates HTML and plots for the Follower Stats tab.

    Args:
        token_state: Dict-like app state. Reads ``"token"`` and
            ``"bubble_follower_stats_df"`` (defaults to an empty DataFrame).

    Returns:
        tuple: ``(html, monthly_gains_fig, seniority_fig, industry_fig)``.
        Each figure is ``None`` when its data is unavailable or plotting
        failed; all three are ``None`` when there is no token or no data.
    """
    logging.info("Updating Follower Stats Tab display.")
    if not token_state or not token_state.get("token"):
        logging.warning("Follower stats tab: Access denied. No token.")
        return ("β Access denied. No token available for follower stats.", None, None, None)

    follower_stats_df_orig = token_state.get("bubble_follower_stats_df", pd.DataFrame())
    if follower_stats_df_orig.empty:
        logging.info("Follower stats tab: No follower stats data in Bubble.")
        return ("<p style='text-align:center;'>No follower stats data in Bubble. Try syncing.</p>", None, None, None)

    follower_stats_df = follower_stats_df_orig.copy()  # Work with a copy
    html_parts = ["<div style='padding:10px;'><h3 style='text-align:center;'>Follower Statistics Overview</h3>"]

    # --- Monthly Gains Table & Plot ---
    gains_html, plot_monthly_gains = _follower_monthly_gains_section(follower_stats_df)
    html_parts.extend(gains_html)
    html_parts.append("<hr/>")

    # --- Seniority Table & Plot ---
    seniority_html, plot_seniority_dist = _follower_category_section(
        follower_stats_df, 'follower_seniority', "Seniority", 'skyblue'
    )
    html_parts.extend(seniority_html)
    html_parts.append("<hr/>")

    # --- Industry Table & Plot ---
    industry_html, plot_industry_dist = _follower_category_section(
        follower_stats_df, 'follower_industry', "Industry", 'lightcoral'
    )
    html_parts.extend(industry_html)

    html_parts.append("</div>")
    follower_html_output = "\n".join(html_parts)
    return follower_html_output, plot_monthly_gains, plot_seniority_dist, plot_industry_dist
# --- Gradio UI Blocks ---
with gr.Blocks(theme=gr.themes.Soft(primary_hue="blue", secondary_hue="sky"),
               title="LinkedIn Organization Dashboard") as app:

    # Shared per-session state: credentials, org URN and the cached DataFrames.
    token_state = gr.State(value={
        "token": None, "client_id": None, "org_urn": None,
        "bubble_posts_df": pd.DataFrame(), "fetch_count_for_api": 0,  # post fetch count
        "bubble_mentions_df": pd.DataFrame(),
        "bubble_follower_stats_df": pd.DataFrame(),
        "url_user_token_temp_storage": None  # temporary slot for the URL token
    })

    gr.Markdown("# π LinkedIn Organization Dashboard")

    # Invisible textboxes that receive the URL parameters, plus the visible status box.
    url_user_token_display = gr.Textbox(label="User Token (from URL - Hidden)", interactive=False, visible=False)
    status_box = gr.Textbox(label="Overall LinkedIn Token Status", interactive=False, value="Initializing...")
    org_urn_display = gr.Textbox(label="Organization URN (from URL - Hidden)", interactive=False, visible=False)

    # On page load, read the URL parameters into the two hidden textboxes.
    app.load(fn=get_url_user_token, inputs=None, outputs=[url_user_token_display, org_urn_display], api_name="get_url_params", show_progress=False)

    def initial_load_sequence(url_token, org_urn_val, current_state):
        """Process the URL token, then render the dashboard from the resulting state.

        Returns the status message, the updated state, the sync-button update
        and the dashboard HTML, in that order.
        """
        logging.info(f"Initial load sequence triggered by org_urn_display change. Org URN: {org_urn_val}")
        # Token handling, Bubble fetch and sync-need evaluation happen in one call.
        status_text, updated_state, sync_btn_update = process_and_store_bubble_token(url_token, org_urn_val, current_state)
        # The Bubble data may still be empty at this point; the dashboard handles that.
        dashboard_html = display_main_dashboard(updated_state)
        return status_text, updated_state, sync_btn_update, dashboard_html

    with gr.Tabs():
        with gr.TabItem("1οΈβ£ Dashboard & Sync"):
            gr.Markdown("System checks for existing data from Bubble. The 'Sync' button activates if new data needs to be fetched from LinkedIn based on the last sync times and data availability.")
            # The sync button starts hidden and disabled.
            sync_data_btn = gr.Button("π Sync LinkedIn Data", variant="primary", visible=False, interactive=False)
            sync_status_html_output = gr.HTML("<p style='text-align:center;'>Sync status will appear here.</p>")
            dashboard_display_html = gr.HTML("<p style='text-align:center;'>Dashboard loading...</p>")

            # Initial load: app.load fills the hidden textboxes, and the change
            # of org_urn_display then runs initial_load_sequence, which fills
            # token_state, updates the sync button and renders the dashboard.
            org_urn_display.change(
                fn=initial_load_sequence,
                inputs=[url_user_token_display, org_urn_display, token_state],
                outputs=[status_box, token_state, sync_data_btn, dashboard_display_html],
                show_progress="full"
            )

            # Sync click pipeline, step 1: run the sync and store the new state.
            sync_event = sync_data_btn.click(
                fn=sync_all_linkedin_data,
                inputs=[token_state],
                outputs=[sync_status_html_output, token_state],
                show_progress="full"
            )
            # Step 2: re-evaluate sync needs and refresh the button.
            recheck_event = sync_event.then(
                fn=process_and_store_bubble_token,
                inputs=[url_user_token_display, org_urn_display, token_state],
                outputs=[status_box, token_state, sync_data_btn],
                show_progress=False
            )
            # Step 3: redraw the dashboard from the updated state.
            recheck_event.then(
                fn=display_main_dashboard,
                inputs=[token_state],
                outputs=[dashboard_display_html],
                show_progress=False
            )

        with gr.TabItem("2οΈβ£ Analytics"):
            fetch_analytics_btn = gr.Button("π Fetch/Refresh Full Analytics", variant="primary")

            # One markdown summary followed by seven plots.
            follower_count_md = gr.Markdown("Analytics data will load here...")
            with gr.Row():
                follower_plot = gr.Plot(label="Follower Demographics")
                growth_plot = gr.Plot(label="Follower Growth")
            with gr.Row():
                eng_rate_plot = gr.Plot(label="Engagement Rate")
            with gr.Row():
                interaction_plot = gr.Plot(label="Post Interactions")
            with gr.Row():
                eb_plot = gr.Plot(label="Engagement Benchmark")
            with gr.Row():
                mentions_vol_plot = gr.Plot(label="Mentions Volume")
                mentions_sentiment_plot = gr.Plot(label="Mentions Sentiment")

            fetch_analytics_btn.click(
                fn=guarded_fetch_analytics,
                inputs=[token_state],
                outputs=[
                    follower_count_md,
                    follower_plot,
                    growth_plot,
                    eng_rate_plot,
                    interaction_plot,
                    eb_plot,
                    mentions_vol_plot,
                    mentions_sentiment_plot,
                ],
                show_progress="full"
            )

        with gr.TabItem("3οΈβ£ Mentions"):
            refresh_mentions_display_btn = gr.Button("π Refresh Mentions Display (from local data)", variant="secondary")
            mentions_html = gr.HTML("Mentions data loads from Bubble after sync. Click refresh to view current local data.")
            mentions_sentiment_dist_plot = gr.Plot(label="Mention Sentiment Distribution")

            refresh_mentions_display_btn.click(
                fn=run_mentions_tab_display,
                inputs=[token_state],
                outputs=[mentions_html, mentions_sentiment_dist_plot],
                show_progress="full"
            )

        with gr.TabItem("4οΈβ£ Follower Stats"):
            refresh_follower_stats_btn = gr.Button("π Refresh Follower Stats Display (from local data)", variant="secondary")
            follower_stats_html = gr.HTML("Follower statistics load from Bubble after sync. Click refresh to view current local data.")
            with gr.Row():
                fs_plot_monthly_gains = gr.Plot(label="Monthly Follower Gains")
            with gr.Row():
                fs_plot_seniority = gr.Plot(label="Followers by Seniority (Top 10 Organic)")
                fs_plot_industry = gr.Plot(label="Followers by Industry (Top 10 Organic)")

            refresh_follower_stats_btn.click(
                fn=run_follower_stats_tab_display,
                inputs=[token_state],
                outputs=[follower_stats_html, fs_plot_monthly_gains, fs_plot_seniority, fs_plot_industry],
                show_progress="full"
            )
if __name__ == "__main__":
    # Warn early about configuration the app reads from the environment.
    if not os.environ.get("Linkedin_client_id"):
        logging.warning("WARNING: 'Linkedin_client_id' environment variable not set. The app may not function correctly for LinkedIn API calls.")

    bubble_env_vars = ("BUBBLE_APP_NAME", "BUBBLE_API_KEY_PRIVATE", "BUBBLE_API_ENDPOINT")
    if not all(os.environ.get(var_name) for var_name in bubble_env_vars):
        logging.warning("WARNING: One or more Bubble environment variables (BUBBLE_APP_NAME, BUBBLE_API_KEY_PRIVATE, BUBBLE_API_ENDPOINT) are not set. Bubble integration will fail.")

    # Plots depend on matplotlib; report whether it can be imported.
    try:
        import matplotlib
    except ImportError:
        logging.error("Matplotlib is not installed. Plots will not be generated. Please install it: pip install matplotlib")
    else:
        logging.info(f"Matplotlib version: {matplotlib.__version__} found.")

    # Start the Gradio server; debug=True makes Gradio log more verbosely.
    app.launch(server_name="0.0.0.0", server_port=7860, debug=True)