# sync_logic.py
"""
Handles the logic for syncing LinkedIn data: posts, mentions, and follower statistics.
Fetches data from the LinkedIn APIs, uploads it to Bubble, and logs each sync attempt.
"""
import pandas as pd
import logging
import html

# Assuming Bubble_API_Calls contains bulk_upload_to_bubble
from Bubble_API_Calls import bulk_upload_to_bubble, fetch_linkedin_posts_data_from_bubble  # fetch added for log updates

# Assuming Linkedin_Data_API_Calls contains all necessary LinkedIn data fetching and processing functions
from Linkedin_Data_API_Calls import (
    fetch_linkedin_posts_core,
    fetch_comments,
    analyze_sentiment,  # For post comments
    compile_detailed_posts,
    prepare_data_for_bubble,  # For posts, stats, comments
    fetch_linkedin_mentions_core,
    analyze_mentions_sentiment,  # For individual mentions
    compile_detailed_mentions,  # Compiles to the user-specified format
    prepare_mentions_for_bubble  # Prepares the user-specified format for Bubble
)

# Assuming linkedin_follower_stats.py contains get_linkedin_follower_stats
from linkedin_follower_stats import get_linkedin_follower_stats

# Assuming config.py contains all necessary constants
from config import (
    LINKEDIN_POST_URN_KEY, BUBBLE_POST_URN_COLUMN_NAME, BUBBLE_POSTS_TABLE_NAME,
    BUBBLE_POST_STATS_TABLE_NAME, BUBBLE_POST_COMMENTS_TABLE_NAME,
    BUBBLE_MENTIONS_TABLE_NAME, BUBBLE_MENTIONS_ID_COLUMN_NAME, BUBBLE_MENTIONS_DATE_COLUMN_NAME,
    DEFAULT_MENTIONS_INITIAL_FETCH_COUNT, DEFAULT_MENTIONS_UPDATE_FETCH_COUNT,
    BUBBLE_FOLLOWER_STATS_TABLE_NAME, FOLLOWER_STATS_TYPE_COLUMN, FOLLOWER_STATS_CATEGORY_COLUMN,
    FOLLOWER_STATS_ORG_URN_COLUMN, FOLLOWER_STATS_ORGANIC_COLUMN, FOLLOWER_STATS_PAID_COLUMN,
    LINKEDIN_CLIENT_ID_ENV_VAR,  # Though client_id is usually passed in token_state
    # Constants for the operations log
    BUBBLE_OPERATIONS_LOG_TABLE_NAME, BUBBLE_OPERATIONS_LOG_DATE_COLUMN,
    BUBBLE_OPERATIONS_LOG_SUBJECT_COLUMN, BUBBLE_OPERATIONS_LOG_ORG_URN_COLUMN,
    LOG_SUBJECT_POSTS, LOG_SUBJECT_MENTIONS, LOG_SUBJECT_FOLLOWER_STATS
)


def _log_sync_attempt(org_urn, subject, token_state):
    """
    Logs a sync attempt to the Bubble operations log table and updates
    the operations log DataFrame in token_state.
    """
    logging.info(f"Logging sync attempt for subject: {subject}, org_urn: {org_urn}")
    if not org_urn:
        logging.warning("Cannot log sync attempt: org_urn is missing.")
        return token_state
    try:
        log_entry_data = {
            BUBBLE_OPERATIONS_LOG_DATE_COLUMN: pd.Timestamp.now(tz='UTC').isoformat(),
            BUBBLE_OPERATIONS_LOG_SUBJECT_COLUMN: subject,
            BUBBLE_OPERATIONS_LOG_ORG_URN_COLUMN: org_urn
        }
        # Ensure data types are what Bubble expects, e.g., the date as a string.
        # bulk_upload_to_bubble should handle dicts with basic types.
        upload_payload = [log_entry_data]
        bulk_upload_to_bubble(upload_payload, BUBBLE_OPERATIONS_LOG_TABLE_NAME)
        logging.info(f"Successfully logged sync attempt for {subject} to Bubble table '{BUBBLE_OPERATIONS_LOG_TABLE_NAME}'.")

        # Update token_state with the new log entry to keep it fresh.
        current_log_df = token_state.get("bubble_operations_log_df", pd.DataFrame())
        new_log_entry_df = pd.DataFrame(upload_payload)  # DataFrame from the same data we uploaded

        # Ensure the date column is datetime before concat, if it exists and is not empty.
        if not new_log_entry_df.empty and BUBBLE_OPERATIONS_LOG_DATE_COLUMN in new_log_entry_df.columns:
            new_log_entry_df[BUBBLE_OPERATIONS_LOG_DATE_COLUMN] = pd.to_datetime(
                new_log_entry_df[BUBBLE_OPERATIONS_LOG_DATE_COLUMN], errors='coerce', utc=True)

        if not current_log_df.empty and BUBBLE_OPERATIONS_LOG_DATE_COLUMN in current_log_df.columns:
            # Ensure the existing log DataFrame's date column is also datetime.
            if not pd.api.types.is_datetime64_any_dtype(current_log_df[BUBBLE_OPERATIONS_LOG_DATE_COLUMN]):
                current_log_df[BUBBLE_OPERATIONS_LOG_DATE_COLUMN] = pd.to_datetime(
                    current_log_df[BUBBLE_OPERATIONS_LOG_DATE_COLUMN], errors='coerce', utc=True)

        updated_log_df = pd.concat([current_log_df, new_log_entry_df], ignore_index=True)

        # To ensure get_last_sync_attempt_date always sees the absolute latest entry,
        # sort by date and keep only the last log per subject/org combination. Simply
        # appending and letting max() find the latest would also work, but deduplicating
        # is more robust when multiple logs are written in rapid succession.
        if not updated_log_df.empty and all(col in updated_log_df.columns for col in
                                            [BUBBLE_OPERATIONS_LOG_DATE_COLUMN,
                                             BUBBLE_OPERATIONS_LOG_SUBJECT_COLUMN,
                                             BUBBLE_OPERATIONS_LOG_ORG_URN_COLUMN]):
            updated_log_df = updated_log_df.sort_values(by=BUBBLE_OPERATIONS_LOG_DATE_COLUMN).drop_duplicates(
                subset=[BUBBLE_OPERATIONS_LOG_SUBJECT_COLUMN, BUBBLE_OPERATIONS_LOG_ORG_URN_COLUMN],
                keep='last'
            )
        token_state["bubble_operations_log_df"] = updated_log_df
        logging.info(f"Updated 'bubble_operations_log_df' in token_state after logging {subject}.")
    except Exception as e:
        logging.error(f"Failed to log sync attempt for {subject} or update token_state: {e}", exc_info=True)
    return token_state


def _sync_linkedin_posts_internal(token_state, fetch_count_for_posts_api):
    """Internal logic for syncing LinkedIn posts."""
    # The orchestrator only calls this function if fetch_count_for_posts_api > 0,
    # so an attempt to sync posts is indeed happening.
    logging.info(f"Posts sync: Starting fetch for {fetch_count_for_posts_api} posts.")
    client_id = token_state.get("client_id")
    token_dict = token_state.get("token")
    org_urn = token_state.get('org_urn')
    bubble_posts_df_orig = token_state.get("bubble_posts_df", pd.DataFrame()).copy()
    posts_sync_message = ""
    attempt_logged = False  # Flag to ensure the attempt is logged exactly once

    try:
        # Basic checks before the API call.
        if not all([client_id, token_dict, org_urn]):
            posts_sync_message = "Posts: Config error (client_id, token, or org_urn missing). "
" logging.error(f"Posts sync: Prerequisite missing - client_id: {'OK' if client_id else 'Missing'}, token: {'OK' if token_dict else 'Missing'}, org_urn: {'OK' if org_urn else 'Missing'}") # Log attempt even if config error, as state_manager decided a sync *should* occur token_state = _log_sync_attempt(org_urn, LOG_SUBJECT_POSTS, token_state) attempt_logged = True return posts_sync_message, token_state processed_raw_posts, stats_map, _ = fetch_linkedin_posts_core(client_id, token_dict, org_urn, count=fetch_count_for_posts_api) if not processed_raw_posts: posts_sync_message = "Posts: None found via API. " logging.info("Posts sync: No raw posts returned from API.") # Log attempt as API was called token_state = _log_sync_attempt(org_urn, LOG_SUBJECT_POSTS, token_state) attempt_logged = True return posts_sync_message, token_state existing_post_urns = set() if not bubble_posts_df_orig.empty and BUBBLE_POST_URN_COLUMN_NAME in bubble_posts_df_orig.columns: existing_post_urns = set(bubble_posts_df_orig[BUBBLE_POST_URN_COLUMN_NAME].dropna().astype(str)) new_raw_posts = [p for p in processed_raw_posts if str(p.get(LINKEDIN_POST_URN_KEY)) not in existing_post_urns] if not new_raw_posts: posts_sync_message = "Posts: All fetched already in Bubble. " logging.info("Posts sync: All fetched posts were already found in Bubble.") # Log attempt as API was called and processed token_state = _log_sync_attempt(org_urn, LOG_SUBJECT_POSTS, token_state) attempt_logged = True return posts_sync_message, token_state logging.info(f"Posts sync: Processing {len(new_raw_posts)} new raw posts.") post_urns_to_process = [p[LINKEDIN_POST_URN_KEY] for p in new_raw_posts if p.get(LINKEDIN_POST_URN_KEY)] all_comments_data = fetch_comments(client_id, token_dict, post_urns_to_process, stats_map) sentiments_per_post = analyze_sentiment(all_comments_data) detailed_new_posts = compile_detailed_posts(new_raw_posts, stats_map, sentiments_per_post) li_posts, li_post_stats, li_post_comments = prepare_data_for_bubble(detailed_new_posts, all_comments_data) if li_posts: bulk_upload_to_bubble(li_posts, BUBBLE_POSTS_TABLE_NAME) updated_posts_df = pd.concat([bubble_posts_df_orig, pd.DataFrame(li_posts)], ignore_index=True) token_state["bubble_posts_df"] = updated_posts_df.drop_duplicates(subset=[BUBBLE_POST_URN_COLUMN_NAME], keep='last') logging.info(f"Posts sync: Uploaded {len(li_posts)} new posts to Bubble.") if li_post_stats: bulk_upload_to_bubble(li_post_stats, BUBBLE_POST_STATS_TABLE_NAME) logging.info(f"Posts sync: Uploaded {len(li_post_stats)} post_stats entries.") if li_post_comments: bulk_upload_to_bubble(li_post_comments, BUBBLE_POST_COMMENTS_TABLE_NAME) logging.info(f"Posts sync: Uploaded {len(li_post_comments)} post_comments entries.") posts_sync_message = f"Posts: Synced {len(li_posts)} new. " else: posts_sync_message = "Posts: No new ones to upload after processing. " logging.info("Posts sync: No new posts were prepared for Bubble upload.") except ValueError as ve: posts_sync_message = f"Posts Error: {html.escape(str(ve))}. " logging.error(f"Posts sync: ValueError: {ve}", exc_info=True) except Exception as e: logging.exception("Posts sync: Unexpected error during processing.") posts_sync_message = f"Posts: Unexpected error ({type(e).__name__}). " finally: # Log the sync attempt if it hasn't been logged already (e.g. due to early exit) # and if basic conditions (org_urn) for logging are met. 
        if not attempt_logged and org_urn:
            token_state = _log_sync_attempt(org_urn, LOG_SUBJECT_POSTS, token_state)
    return posts_sync_message, token_state


def sync_linkedin_mentions(token_state):
    """Fetches new LinkedIn mentions and uploads them to Bubble, if scheduled by state_manager."""
    logging.info("Starting LinkedIn mentions sync process check.")

    if not token_state.get("mentions_should_sync_now", False):
        logging.info("Mentions sync: Not scheduled by state_manager based on operations log. Skipping.")
        return "Mentions: Sync not currently required by schedule. ", token_state

    logging.info("Mentions sync: Proceeding as scheduled by state_manager.")

    if not token_state or not token_state.get("token"):
        logging.error("Mentions sync: Access denied. No LinkedIn token.")
        # Still log an attempt if org_urn is available, since a sync was scheduled.
        org_urn_for_log = token_state.get('org_urn') if token_state else None
        if org_urn_for_log:
            token_state = _log_sync_attempt(org_urn_for_log, LOG_SUBJECT_MENTIONS, token_state)
        return "Mentions: No token. ", token_state

    client_id = token_state.get("client_id")
    token_dict = token_state.get("token")
    org_urn = token_state.get('org_urn')
    bubble_mentions_df_orig = token_state.get("bubble_mentions_df", pd.DataFrame()).copy()
    mentions_sync_message = ""
    attempt_logged = False

    if not org_urn or not client_id or client_id == "ENV VAR MISSING":
        logging.error("Mentions sync: Configuration error (Org URN or Client ID missing).")
        if org_urn:  # Log if possible
            token_state = _log_sync_attempt(org_urn, LOG_SUBJECT_MENTIONS, token_state)
            attempt_logged = True
        return "Mentions: Config error. ", token_state

    # Determine the fetch count: initial if there is no mentions data yet, update otherwise.
    fetch_count_for_mentions_api = DEFAULT_MENTIONS_INITIAL_FETCH_COUNT \
        if bubble_mentions_df_orig.empty else DEFAULT_MENTIONS_UPDATE_FETCH_COUNT
    logging.info(f"Mentions sync: Fetch count set to {fetch_count_for_mentions_api}.")

    try:
        processed_raw_mentions = fetch_linkedin_mentions_core(client_id, token_dict, org_urn, count=fetch_count_for_mentions_api)
        if not processed_raw_mentions:
            logging.info("Mentions sync: No new mentions found via API.")
            mentions_sync_message = "Mentions: None found via API. "
            token_state = _log_sync_attempt(org_urn, LOG_SUBJECT_MENTIONS, token_state)
            attempt_logged = True
            return mentions_sync_message, token_state

        existing_mention_ids = set()
        if not bubble_mentions_df_orig.empty and BUBBLE_MENTIONS_ID_COLUMN_NAME in bubble_mentions_df_orig.columns:
            existing_mention_ids = set(bubble_mentions_df_orig[BUBBLE_MENTIONS_ID_COLUMN_NAME].dropna().astype(str))

        sentiments_map = analyze_mentions_sentiment(processed_raw_mentions)
        all_compiled_mentions = compile_detailed_mentions(processed_raw_mentions, sentiments_map)

        new_compiled_mentions_to_upload = [
            m for m in all_compiled_mentions if str(m.get("id")) not in existing_mention_ids
        ]

        if not new_compiled_mentions_to_upload:
            logging.info("Mentions sync: All fetched mentions are already in Bubble.")
            mentions_sync_message = "Mentions: All fetched already in Bubble. "
" token_state = _log_sync_attempt(org_urn, LOG_SUBJECT_MENTIONS, token_state) attempt_logged = True return mentions_sync_message, token_state bubble_ready_mentions = prepare_mentions_for_bubble(new_compiled_mentions_to_upload) if bubble_ready_mentions: bulk_upload_to_bubble(bubble_ready_mentions, BUBBLE_MENTIONS_TABLE_NAME) logging.info(f"Successfully uploaded {len(bubble_ready_mentions)} new mentions to Bubble.") updated_mentions_df = pd.concat([bubble_mentions_df_orig, pd.DataFrame(bubble_ready_mentions)], ignore_index=True) token_state["bubble_mentions_df"] = updated_mentions_df.drop_duplicates(subset=[BUBBLE_MENTIONS_ID_COLUMN_NAME], keep='last') mentions_sync_message = f"Mentions: Synced {len(bubble_ready_mentions)} new. " else: logging.info("Mentions sync: No new mentions were prepared for Bubble upload.") mentions_sync_message = "Mentions: No new ones to upload. " except ValueError as ve: logging.error(f"ValueError during mentions sync: {ve}", exc_info=True) mentions_sync_message = f"Mentions Error: {html.escape(str(ve))}. " except Exception as e: logging.exception("Unexpected error in sync_linkedin_mentions.") mentions_sync_message = f"Mentions: Unexpected error ({type(e).__name__}). " finally: if not attempt_logged and org_urn: token_state = _log_sync_attempt(org_urn, LOG_SUBJECT_MENTIONS, token_state) return mentions_sync_message, token_state def sync_linkedin_follower_stats(token_state): """Fetches new LinkedIn follower statistics and uploads them to Bubble, if scheduled.""" logging.info("Starting LinkedIn follower stats sync process check.") if not token_state.get("fs_should_sync_now", False): logging.info("Follower Stats sync: Not scheduled by state_manager. Skipping.") return "Follower Stats: Sync not currently required by schedule. ", token_state logging.info("Follower Stats sync: Proceeding as scheduled by state_manager.") if not token_state or not token_state.get("token"): logging.error("Follower Stats sync: Access denied. No LinkedIn token.") org_urn_for_log = token_state.get('org_urn') if token_state else None if org_urn_for_log: token_state = _log_sync_attempt(org_urn_for_log, LOG_SUBJECT_FOLLOWER_STATS, token_state) return "Follower Stats: No token. ", token_state client_id = token_state.get("client_id") token_dict = token_state.get("token") org_urn = token_state.get('org_urn') bubble_follower_stats_df_orig = token_state.get("bubble_follower_stats_df", pd.DataFrame()).copy() follower_stats_sync_message = "" attempt_logged = False if not org_urn or not client_id or client_id == "ENV VAR MISSING": logging.error("Follower Stats sync: Configuration error (Org URN or Client ID missing).") if org_urn: token_state = _log_sync_attempt(org_urn, LOG_SUBJECT_FOLLOWER_STATS, token_state) attempt_logged = True return "Follower Stats: Config error. ", token_state logging.info(f"Follower stats sync proceeding for org_urn: {org_urn}") try: api_follower_stats = get_linkedin_follower_stats(client_id, token_dict, org_urn) if not api_follower_stats: logging.info(f"Follower Stats sync: No stats found via API for org {org_urn}.") follower_stats_sync_message = "Follower Stats: None found via API. 
" token_state = _log_sync_attempt(org_urn, LOG_SUBJECT_FOLLOWER_STATS, token_state) attempt_logged = True return follower_stats_sync_message, token_state new_stats_to_upload = [] # Logic for comparing API stats with existing Bubble stats (monthly gains and demographics) # Monthly Gains api_monthly_gains = [s for s in api_follower_stats if s.get(FOLLOWER_STATS_TYPE_COLUMN) == 'follower_gains_monthly'] existing_monthly_gain_dates = set() if not bubble_follower_stats_df_orig.empty: bubble_monthly_df = bubble_follower_stats_df_orig[bubble_follower_stats_df_orig[FOLLOWER_STATS_TYPE_COLUMN] == 'follower_gains_monthly'] if FOLLOWER_STATS_CATEGORY_COLUMN in bubble_monthly_df.columns: # Convert to string for consistent comparison, assuming API data also provides date as string or convertible existing_monthly_gain_dates = set(pd.to_datetime(bubble_monthly_df[FOLLOWER_STATS_CATEGORY_COLUMN], errors='coerce').dt.strftime('%Y-%m-%d').dropna()) for gain_stat in api_monthly_gains: api_date_str = pd.to_datetime(gain_stat.get(FOLLOWER_STATS_CATEGORY_COLUMN), errors='coerce').strftime('%Y-%m-%d') if api_date_str != 'NaT' and api_date_str not in existing_monthly_gain_dates: new_stats_to_upload.append(gain_stat) # Demographics (overwrite logic: if API has it, and it's different or not present in Bubble, upload) api_demographics = [s for s in api_follower_stats if s.get(FOLLOWER_STATS_TYPE_COLUMN) != 'follower_gains_monthly'] # Create a map of existing demographics for quick lookup # Key: (org_urn, type, category), Value: (organic_count, paid_count) existing_demographics_map = {} if not bubble_follower_stats_df_orig.empty: bubble_demographics_df = bubble_follower_stats_df_orig[bubble_follower_stats_df_orig[FOLLOWER_STATS_TYPE_COLUMN] != 'follower_gains_monthly'] required_cols_demo = [ FOLLOWER_STATS_ORG_URN_COLUMN, FOLLOWER_STATS_TYPE_COLUMN, FOLLOWER_STATS_CATEGORY_COLUMN, FOLLOWER_STATS_ORGANIC_COLUMN, FOLLOWER_STATS_PAID_COLUMN ] if not bubble_demographics_df.empty and all(col in bubble_demographics_df.columns for col in required_cols_demo): for _, row in bubble_demographics_df.iterrows(): key = ( str(row[FOLLOWER_STATS_ORG_URN_COLUMN]), str(row[FOLLOWER_STATS_TYPE_COLUMN]), str(row[FOLLOWER_STATS_CATEGORY_COLUMN]) # Category can be various things ) existing_demographics_map[key] = ( row[FOLLOWER_STATS_ORGANIC_COLUMN], row[FOLLOWER_STATS_PAID_COLUMN] ) for demo_stat in api_demographics: key = ( str(demo_stat.get(FOLLOWER_STATS_ORG_URN_COLUMN)), str(demo_stat.get(FOLLOWER_STATS_TYPE_COLUMN)), str(demo_stat.get(FOLLOWER_STATS_CATEGORY_COLUMN)) ) api_counts = ( demo_stat.get(FOLLOWER_STATS_ORGANIC_COLUMN, 0), demo_stat.get(FOLLOWER_STATS_PAID_COLUMN, 0) ) # If key not in existing OR counts are different, then it's new/changed if key not in existing_demographics_map or existing_demographics_map[key] != api_counts: new_stats_to_upload.append(demo_stat) if not new_stats_to_upload: logging.info(f"Follower Stats sync: Data for org {org_urn} is up-to-date or no changes found after comparison.") follower_stats_sync_message = "Follower Stats: Data up-to-date or no changes. 
" token_state = _log_sync_attempt(org_urn, LOG_SUBJECT_FOLLOWER_STATS, token_state) attempt_logged = True return follower_stats_sync_message, token_state bulk_upload_to_bubble(new_stats_to_upload, BUBBLE_FOLLOWER_STATS_TABLE_NAME) logging.info(f"Successfully uploaded {len(new_stats_to_upload)} follower stat entries to Bubble for org {org_urn}.") # Update token_state's follower stats DataFrame temp_df = pd.concat([bubble_follower_stats_df_orig, pd.DataFrame(new_stats_to_upload)], ignore_index=True) # For monthly gains, keep last entry per org/date (category) monthly_part = temp_df[temp_df[FOLLOWER_STATS_TYPE_COLUMN] == 'follower_gains_monthly'].drop_duplicates( subset=[FOLLOWER_STATS_ORG_URN_COLUMN, FOLLOWER_STATS_CATEGORY_COLUMN], keep='last' ) # For demographics, keep last entry per org/type/category demographics_part = temp_df[temp_df[FOLLOWER_STATS_TYPE_COLUMN] != 'follower_gains_monthly'].drop_duplicates( subset=[FOLLOWER_STATS_ORG_URN_COLUMN, FOLLOWER_STATS_TYPE_COLUMN, FOLLOWER_STATS_CATEGORY_COLUMN], keep='last' ) token_state["bubble_follower_stats_df"] = pd.concat([monthly_part, demographics_part], ignore_index=True) follower_stats_sync_message = f"Follower Stats: Synced {len(new_stats_to_upload)} entries. " except ValueError as ve: logging.error(f"ValueError during follower stats sync for {org_urn}: {ve}", exc_info=True) follower_stats_sync_message = f"Follower Stats Error: {html.escape(str(ve))}. " except Exception as e: logging.exception(f"Unexpected error in sync_linkedin_follower_stats for {org_urn}.") follower_stats_sync_message = f"Follower Stats: Unexpected error ({type(e).__name__}). " finally: if not attempt_logged and org_urn: token_state = _log_sync_attempt(org_urn, LOG_SUBJECT_FOLLOWER_STATS, token_state) return follower_stats_sync_message, token_state def sync_all_linkedin_data_orchestrator(token_state): """Orchestrates the syncing of all LinkedIn data types (Posts, Mentions, Follower Stats).""" logging.info("Starting sync_all_linkedin_data_orchestrator process.") if not token_state or not token_state.get("token"): logging.error("Sync All: Access denied. LinkedIn token not available.") return "

❌ Access denied. LinkedIn token not available.

", token_state org_urn = token_state.get('org_urn') client_id = token_state.get("client_id") posts_sync_message = "" mentions_sync_message = "" follower_stats_sync_message = "" if not org_urn: logging.error("Sync All: Org URN missing in token_state.") return "

❌ Config error: Org URN missing.

", token_state if not client_id or client_id == "ENV VAR MISSING": logging.error("Sync All: Client ID missing or not set in token_state.") return "

❌ Config error: Client ID missing.

", token_state # --- Sync Posts --- fetch_count_for_posts_api = token_state.get('fetch_count_for_api', 0) if fetch_count_for_posts_api == 0: # This means state_manager determined no post sync is needed based on log posts_sync_message = "Posts: Sync not currently required by schedule. " logging.info("Posts sync: Skipped as fetch_count_for_posts_api is 0 (determined by state_manager).") # Log an "attempt" to sync posts which resulted in a skip due to schedule. # This keeps the log fresh, indicating a check was made. token_state = _log_sync_attempt(org_urn, LOG_SUBJECT_POSTS, token_state) else: posts_sync_message, token_state = _sync_linkedin_posts_internal(token_state, fetch_count_for_posts_api) # _sync_linkedin_posts_internal now handles its own logging internally # --- Sync Mentions --- # sync_linkedin_mentions will check token_state.get("mentions_should_sync_now") # and log its attempt internally. mentions_sync_message, token_state = sync_linkedin_mentions(token_state) # --- Sync Follower Stats --- # sync_linkedin_follower_stats will check token_state.get("fs_should_sync_now") # and log its attempt internally. follower_stats_sync_message, token_state = sync_linkedin_follower_stats(token_state) logging.info(f"Sync process complete. Messages: Posts: [{posts_sync_message.strip()}], Mentions: [{mentions_sync_message.strip()}], Follower Stats: [{follower_stats_sync_message.strip()}]") final_message = f"

✅ Sync Attempted. {posts_sync_message} {mentions_sync_message} {follower_stats_sync_message}

" return final_message, token_state