LinkedinMonitor / sync_logic.py
GuglielmoTor's picture
Update sync_logic.py
6fbe851 verified
raw
history blame
34.4 kB
# sync_logic.py
"""
Handles the logic for syncing LinkedIn data: posts, mentions, and follower statistics.
Fetches data from LinkedIn APIs, uploads to Bubble, and logs sync attempts.
"""
import pandas as pd
import logging
import html
from datetime import timezone # Python's datetime
# Assuming Bubble_API_Calls contains bulk_upload_to_bubble
from Bubble_API_Calls import bulk_upload_to_bubble, fetch_linkedin_posts_data_from_bubble, update_record_in_bubble
# Assuming Linkedin_Data_API_Calls contains all necessary LinkedIn data fetching and processing functions
from Linkedin_Data_API_Calls import (
fetch_linkedin_posts_core,
fetch_comments,
analyze_sentiment, # For post comments
compile_detailed_posts,
prepare_data_for_bubble, # For posts, stats, comments
fetch_linkedin_mentions_core,
analyze_mentions_sentiment, # For individual mentions
compile_detailed_mentions, # Compiles to user-specified format
prepare_mentions_for_bubble # Prepares user-specified format for Bubble
)
# Assuming linkedin_follower_stats.py contains get_linkedin_follower_stats
from linkedin_follower_stats import get_linkedin_follower_stats
# Assuming config.py contains all necessary constants
from config import (
LINKEDIN_POST_URN_KEY, BUBBLE_POST_URN_COLUMN_NAME, BUBBLE_POSTS_TABLE_NAME,
BUBBLE_POST_STATS_TABLE_NAME, BUBBLE_POST_COMMENTS_TABLE_NAME,
BUBBLE_MENTIONS_TABLE_NAME, BUBBLE_MENTIONS_ID_COLUMN_NAME, BUBBLE_MENTIONS_DATE_COLUMN_NAME,
DEFAULT_MENTIONS_INITIAL_FETCH_COUNT, DEFAULT_MENTIONS_UPDATE_FETCH_COUNT,
BUBBLE_FOLLOWER_STATS_TABLE_NAME, FOLLOWER_STATS_TYPE_COLUMN, FOLLOWER_STATS_CATEGORY_COLUMN,
FOLLOWER_STATS_ORG_URN_COLUMN, FOLLOWER_STATS_ORGANIC_COLUMN, FOLLOWER_STATS_PAID_COLUMN,
LINKEDIN_CLIENT_ID_ENV_VAR, # Though client_id is usually passed in token_state
# NEW constants for logging
BUBBLE_OPERATIONS_LOG_TABLE_NAME, BUBBLE_OPERATIONS_LOG_DATE_COLUMN,
BUBBLE_OPERATIONS_LOG_SUBJECT_COLUMN, BUBBLE_OPERATIONS_LOG_ORG_URN_COLUMN,
LOG_SUBJECT_POSTS, LOG_SUBJECT_MENTIONS, LOG_SUBJECT_FOLLOWER_STATS,
BUBBLE_UNIQUE_ID_COLUMN_NAME
)
def _log_sync_attempt(org_urn, subject, token_state):
"""
Logs a sync attempt to the Bubble operations log table and updates
the operations log DataFrame in token_state.
"""
logging.info(f"Logging sync attempt for subject: {subject}, org_urn: {org_urn}")
if not org_urn:
logging.warning("Cannot log sync attempt: org_urn is missing.")
return token_state
try:
log_entry_data = {
BUBBLE_OPERATIONS_LOG_DATE_COLUMN: pd.Timestamp.now(tz='UTC').isoformat(),
BUBBLE_OPERATIONS_LOG_SUBJECT_COLUMN: subject,
BUBBLE_OPERATIONS_LOG_ORG_URN_COLUMN: org_urn
}
# Ensure data types are what Bubble expects, e.g., date as string
# bulk_upload_to_bubble should handle dicts with basic types.
upload_payload = [log_entry_data]
bulk_upload_to_bubble(upload_payload, BUBBLE_OPERATIONS_LOG_TABLE_NAME)
logging.info(f"Successfully logged sync attempt for {subject} to Bubble table '{BUBBLE_OPERATIONS_LOG_TABLE_NAME}'.")
# Update token_state with the new log entry to keep it fresh
current_log_df = token_state.get("bubble_operations_log_df", pd.DataFrame())
new_log_entry_df = pd.DataFrame(upload_payload) # DataFrame from the same data we uploaded
# Ensure date column is datetime before concat if it exists and is not empty
if not new_log_entry_df.empty and BUBBLE_OPERATIONS_LOG_DATE_COLUMN in new_log_entry_df.columns:
new_log_entry_df[BUBBLE_OPERATIONS_LOG_DATE_COLUMN] = pd.to_datetime(new_log_entry_df[BUBBLE_OPERATIONS_LOG_DATE_COLUMN], errors='coerce', utc=True)
if not current_log_df.empty and BUBBLE_OPERATIONS_LOG_DATE_COLUMN in current_log_df.columns:
# Ensure existing log df date column is also datetime
if not pd.api.types.is_datetime64_any_dtype(current_log_df[BUBBLE_OPERATIONS_LOG_DATE_COLUMN]):
current_log_df[BUBBLE_OPERATIONS_LOG_DATE_COLUMN] = pd.to_datetime(current_log_df[BUBBLE_OPERATIONS_LOG_DATE_COLUMN], errors='coerce', utc=True)
updated_log_df = pd.concat([current_log_df, new_log_entry_df], ignore_index=True)
# To ensure the get_last_sync_attempt_date always gets the absolute latest,
# we can sort and drop duplicates, keeping the last.
# However, simply appending and letting max() find the latest is also fine.
# For robustness, let's sort and keep the latest for each subject/org combo if multiple logs were made rapidly.
if not updated_log_df.empty and all(col in updated_log_df.columns for col in [BUBBLE_OPERATIONS_LOG_DATE_COLUMN, BUBBLE_OPERATIONS_LOG_SUBJECT_COLUMN, BUBBLE_OPERATIONS_LOG_ORG_URN_COLUMN]):
updated_log_df = updated_log_df.sort_values(by=BUBBLE_OPERATIONS_LOG_DATE_COLUMN).drop_duplicates(
subset=[BUBBLE_OPERATIONS_LOG_SUBJECT_COLUMN, BUBBLE_OPERATIONS_LOG_ORG_URN_COLUMN],
keep='last'
)
token_state["bubble_operations_log_df"] = updated_log_df
logging.info(f"Updated 'bubble_operations_log_df' in token_state after logging {subject}.")
except Exception as e:
logging.error(f"Failed to log sync attempt for {subject} or update token_state: {e}", exc_info=True)
return token_state
def _fetch_and_upload_new_posts(token_state, fetch_count_for_posts_api):
    """Fetch posts from LinkedIn, upload the ones not yet in Bubble, and return a status message."""
    client_id = token_state.get("client_id")
    token_dict = token_state.get("token")
    org_urn = token_state.get('org_urn')

    if not all([client_id, token_dict, org_urn]):
        logging.error(f"Posts sync: Prerequisite missing - client_id: {'OK' if client_id else 'Missing'}, token: {'OK' if token_dict else 'Missing'}, org_urn: {'OK' if org_urn else 'Missing'}")
        return "Posts: Config error (client_id, token, or org_urn missing). "

    bubble_posts_df_orig = token_state.get("bubble_posts_df")
    if bubble_posts_df_orig is None:
        bubble_posts_df_orig = pd.DataFrame()

    processed_raw_posts, stats_map, _ = fetch_linkedin_posts_core(client_id, token_dict, org_urn, count=fetch_count_for_posts_api)
    if not processed_raw_posts:
        logging.info("Posts sync: No raw posts returned from API.")
        return "Posts: None found via API. "

    # URNs are compared as strings on both sides.
    existing_post_urns = set()
    if not bubble_posts_df_orig.empty and BUBBLE_POST_URN_COLUMN_NAME in bubble_posts_df_orig.columns:
        existing_post_urns = set(bubble_posts_df_orig[BUBBLE_POST_URN_COLUMN_NAME].dropna().astype(str))

    new_raw_posts = [p for p in processed_raw_posts if str(p.get(LINKEDIN_POST_URN_KEY)) not in existing_post_urns]
    if not new_raw_posts:
        logging.info("Posts sync: All fetched posts were already found in Bubble.")
        return "Posts: All fetched already in Bubble. "

    logging.info(f"Posts sync: Processing {len(new_raw_posts)} new raw posts.")
    post_urns_to_process = [p[LINKEDIN_POST_URN_KEY] for p in new_raw_posts if p.get(LINKEDIN_POST_URN_KEY)]

    all_comments_data = fetch_comments(client_id, token_dict, post_urns_to_process, stats_map)
    sentiments_per_post = analyze_sentiment(all_comments_data)
    detailed_new_posts = compile_detailed_posts(new_raw_posts, stats_map, sentiments_per_post)
    li_posts, li_post_stats, li_post_comments = prepare_data_for_bubble(detailed_new_posts, all_comments_data)

    if not li_posts:
        logging.info("Posts sync: No new posts were prepared for Bubble upload.")
        return "Posts: No new ones to upload after processing. "

    # The return value of bulk_upload_to_bubble is treated as a success flag
    # elsewhere in this module. If the posts did not reach Bubble they must not
    # be added to token_state, otherwise the next sync would skip them.
    if not bulk_upload_to_bubble(li_posts, BUBBLE_POSTS_TABLE_NAME):
        logging.error(f"Posts sync: Failed to upload {len(li_posts)} new posts to Bubble.")
        return "Posts: Upload to Bubble failed. "

    updated_posts_df = pd.concat([bubble_posts_df_orig, pd.DataFrame(li_posts)], ignore_index=True)
    if BUBBLE_POST_URN_COLUMN_NAME in updated_posts_df.columns:
        updated_posts_df = updated_posts_df.drop_duplicates(subset=[BUBBLE_POST_URN_COLUMN_NAME], keep='last')
    token_state["bubble_posts_df"] = updated_posts_df
    logging.info(f"Posts sync: Uploaded {len(li_posts)} new posts to Bubble.")

    if li_post_stats:
        if bulk_upload_to_bubble(li_post_stats, BUBBLE_POST_STATS_TABLE_NAME):
            logging.info(f"Posts sync: Uploaded {len(li_post_stats)} post_stats entries.")
        else:
            logging.error(f"Posts sync: Failed to upload {len(li_post_stats)} post_stats entries.")

    if li_post_comments:
        if bulk_upload_to_bubble(li_post_comments, BUBBLE_POST_COMMENTS_TABLE_NAME):
            logging.info(f"Posts sync: Uploaded {len(li_post_comments)} post_comments entries.")
        else:
            logging.error(f"Posts sync: Failed to upload {len(li_post_comments)} post_comments entries.")

    return f"Posts: Synced {len(li_posts)} new. "


def _sync_linkedin_posts_internal(token_state, fetch_count_for_posts_api):
    """
    Sync LinkedIn posts into Bubble and log the attempt.

    The caller (the orchestrator) only invokes this when a post sync is due, so
    every call counts as an attempt and is written to the operations log exactly
    once, whatever the outcome.

    Args:
        token_state: Session-state dict. Reads ``client_id``, ``token``,
            ``org_urn`` and ``bubble_posts_df``; ``bubble_posts_df`` is replaced
            when new posts are uploaded successfully.
        fetch_count_for_posts_api: Number of posts to request from LinkedIn.

    Returns:
        Tuple ``(status_message, token_state)``. Errors are reported through the
        message; ``ValueError`` text is HTML-escaped before being included.
    """
    logging.info(f"Posts sync: Starting fetch for {fetch_count_for_posts_api} posts.")
    org_urn = token_state.get('org_urn')

    try:
        posts_sync_message = _fetch_and_upload_new_posts(token_state, fetch_count_for_posts_api)
    except ValueError as ve:
        posts_sync_message = f"Posts Error: {html.escape(str(ve))}. "
        logging.error(f"Posts sync: ValueError: {ve}", exc_info=True)
    except Exception as e:
        logging.exception("Posts sync: Unexpected error during processing.")
        posts_sync_message = f"Posts: Unexpected error ({type(e).__name__}). "

    # Single logging point for every outcome. _log_sync_attempt itself warns and
    # does nothing when org_urn is missing.
    token_state = _log_sync_attempt(org_urn, LOG_SUBJECT_POSTS, token_state)
    return posts_sync_message, token_state
def _upload_new_mentions(token_state, processed_raw_mentions, bubble_mentions_df_orig):
    """Upload the fetched mentions that are not yet in Bubble and return a status message."""
    # IDs are compared as strings on both sides.
    existing_mention_ids = set()
    if not bubble_mentions_df_orig.empty and BUBBLE_MENTIONS_ID_COLUMN_NAME in bubble_mentions_df_orig.columns:
        existing_mention_ids = set(bubble_mentions_df_orig[BUBBLE_MENTIONS_ID_COLUMN_NAME].dropna().astype(str))

    sentiments_map = analyze_mentions_sentiment(processed_raw_mentions)
    all_compiled_mentions = compile_detailed_mentions(processed_raw_mentions, sentiments_map)

    new_compiled_mentions_to_upload = [
        m for m in all_compiled_mentions if str(m.get("id")) not in existing_mention_ids
    ]
    if not new_compiled_mentions_to_upload:
        logging.info("Mentions sync: All fetched mentions are already in Bubble.")
        return "Mentions: All fetched already in Bubble. "

    bubble_ready_mentions = prepare_mentions_for_bubble(new_compiled_mentions_to_upload)
    if not bubble_ready_mentions:
        logging.info("Mentions sync: No new mentions were prepared for Bubble upload.")
        return "Mentions: No new ones to upload. "

    # The return value of bulk_upload_to_bubble is treated as a success flag
    # elsewhere in this module. Mentions that did not reach Bubble must not be
    # added to token_state, otherwise the next sync would skip them.
    if not bulk_upload_to_bubble(bubble_ready_mentions, BUBBLE_MENTIONS_TABLE_NAME):
        logging.error(f"Mentions sync: Failed to upload {len(bubble_ready_mentions)} new mentions to Bubble.")
        return "Mentions: Upload to Bubble failed. "

    logging.info(f"Successfully uploaded {len(bubble_ready_mentions)} new mentions to Bubble.")
    updated_mentions_df = pd.concat([bubble_mentions_df_orig, pd.DataFrame(bubble_ready_mentions)], ignore_index=True)
    if BUBBLE_MENTIONS_ID_COLUMN_NAME in updated_mentions_df.columns:
        updated_mentions_df = updated_mentions_df.drop_duplicates(subset=[BUBBLE_MENTIONS_ID_COLUMN_NAME], keep='last')
    token_state["bubble_mentions_df"] = updated_mentions_df
    return f"Mentions: Synced {len(bubble_ready_mentions)} new. "


def sync_linkedin_mentions(token_state):
    """
    Fetch new LinkedIn mentions and upload them to Bubble when a sync is scheduled.

    The sync runs only if ``token_state["mentions_should_sync_now"]`` is truthy.
    Once past that check, the attempt is written to the operations log (when an
    org URN is available) regardless of the outcome.

    Args:
        token_state: Session-state dict, or ``None``. Reads
            ``mentions_should_sync_now``, ``token``, ``client_id``, ``org_urn``
            and ``bubble_mentions_df``; ``bubble_mentions_df`` is replaced when
            new mentions are uploaded successfully.

    Returns:
        Tuple ``(status_message, token_state)``. Errors are reported through the
        message; ``ValueError`` text is HTML-escaped before being included.
    """
    logging.info("Starting LinkedIn mentions sync process check.")

    # A missing/empty state cannot have a sync scheduled; checking it here also
    # keeps the .get() calls below from failing on None.
    if not token_state or not token_state.get("mentions_should_sync_now", False):
        logging.info("Mentions sync: Not scheduled by state_manager based on operations log. Skipping.")
        return "Mentions: Sync not currently required by schedule. ", token_state

    logging.info("Mentions sync: Proceeding as scheduled by state_manager.")

    client_id = token_state.get("client_id")
    token_dict = token_state.get("token")
    org_urn = token_state.get('org_urn')

    if not token_dict:
        logging.error("Mentions sync: Access denied. No LinkedIn token.")
        # A sync was scheduled, so the attempt is still logged when possible.
        if org_urn:
            token_state = _log_sync_attempt(org_urn, LOG_SUBJECT_MENTIONS, token_state)
        return "Mentions: No token. ", token_state

    if not org_urn or not client_id or client_id == "ENV VAR MISSING":
        logging.error("Mentions sync: Configuration error (Org URN or Client ID missing).")
        if org_urn:
            token_state = _log_sync_attempt(org_urn, LOG_SUBJECT_MENTIONS, token_state)
        return "Mentions: Config error. ", token_state

    bubble_mentions_df_orig = token_state.get("bubble_mentions_df")
    if bubble_mentions_df_orig is None:
        bubble_mentions_df_orig = pd.DataFrame()

    # Initial (larger) fetch when nothing is stored yet, update fetch otherwise.
    fetch_count_for_mentions_api = (
        DEFAULT_MENTIONS_INITIAL_FETCH_COUNT
        if bubble_mentions_df_orig.empty
        else DEFAULT_MENTIONS_UPDATE_FETCH_COUNT
    )
    logging.info(f"Mentions sync: Fetch count set to {fetch_count_for_mentions_api}.")

    try:
        processed_raw_mentions = fetch_linkedin_mentions_core(client_id, token_dict, org_urn, count=fetch_count_for_mentions_api)
        if not processed_raw_mentions:
            logging.info("Mentions sync: No new mentions found via API.")
            mentions_sync_message = "Mentions: None found via API. "
        else:
            mentions_sync_message = _upload_new_mentions(token_state, processed_raw_mentions, bubble_mentions_df_orig)
    except ValueError as ve:
        logging.error(f"ValueError during mentions sync: {ve}", exc_info=True)
        mentions_sync_message = f"Mentions Error: {html.escape(str(ve))}. "
    except Exception as e:
        logging.exception("Unexpected error in sync_linkedin_mentions.")
        mentions_sync_message = f"Mentions: Unexpected error ({type(e).__name__}). "

    # org_urn is known to be set here; log the attempt once for every outcome.
    token_state = _log_sync_attempt(org_urn, LOG_SUBJECT_MENTIONS, token_state)
    return mentions_sync_message, token_state
def _format_monthly_gain_date(raw_value):
    """Return raw_value formatted as 'YYYY-MM-DD', or None when it cannot be parsed as a date."""
    try:
        parsed = pd.to_datetime(raw_value, errors='coerce')
        if pd.isna(parsed):
            return None
        return parsed.strftime('%Y-%m-%d')
    except Exception:
        return None


def _follower_count_changed(api_value, stored_value):
    """Return True when the API count differs from the stored one, treating missing values safely."""
    api_missing = pd.isna(api_value)
    stored_missing = pd.isna(stored_value)
    if api_missing or stored_missing:
        # Comparing against pd.NA/NaN directly is ambiguous or always unequal.
        return not (api_missing and stored_missing)
    return api_value != stored_value


def _build_existing_follower_stats_map(bubble_follower_stats_df_orig):
    """Map (org_urn, type, category) -> (organic, paid, bubble_id) for the stats already held in state."""
    existing_stats_map = {}
    if bubble_follower_stats_df_orig.empty:
        return existing_stats_map

    stats_required_cols = [
        FOLLOWER_STATS_ORG_URN_COLUMN, FOLLOWER_STATS_TYPE_COLUMN,
        FOLLOWER_STATS_CATEGORY_COLUMN, FOLLOWER_STATS_ORGANIC_COLUMN,
        FOLLOWER_STATS_PAID_COLUMN, BUBBLE_UNIQUE_ID_COLUMN_NAME,
    ]
    if not all(col in bubble_follower_stats_df_orig.columns for col in stats_required_cols):
        logging.warning(f"Follower Stats: Data in Bubble is missing one or more required columns for update logic: {stats_required_cols}. Will treat all API stats as new if not matched by key elements.")
        return existing_stats_map

    for _, row in bubble_follower_stats_df_orig.iterrows():
        category_identifier = str(row[FOLLOWER_STATS_CATEGORY_COLUMN])
        # Monthly gains are keyed by date, normalised so that differently
        # formatted dates for the same day still match.
        if row[FOLLOWER_STATS_TYPE_COLUMN] == 'follower_gains_monthly':
            category_identifier = _format_monthly_gain_date(row[FOLLOWER_STATS_CATEGORY_COLUMN])
            if category_identifier is None:
                logging.warning(f"Could not parse date for existing monthly gain: {row[FOLLOWER_STATS_CATEGORY_COLUMN]}. Skipping this entry for map.")
                continue
        key = (
            str(row[FOLLOWER_STATS_ORG_URN_COLUMN]),
            str(row[FOLLOWER_STATS_TYPE_COLUMN]),
            category_identifier,
        )
        existing_stats_map[key] = (
            row[FOLLOWER_STATS_ORGANIC_COLUMN],
            row[FOLLOWER_STATS_PAID_COLUMN],
            row[BUBBLE_UNIQUE_ID_COLUMN_NAME],
        )
    return existing_stats_map


def _plan_follower_stats_changes(api_follower_stats, existing_stats_map):
    """Split API stats into records to create and (bubble_id, changed_fields) pairs to patch."""
    stats_for_bulk_upload = []
    records_to_update_via_patch = []

    for stat_from_api in api_follower_stats:
        api_type = str(stat_from_api.get(FOLLOWER_STATS_TYPE_COLUMN))
        api_category_raw = stat_from_api.get(FOLLOWER_STATS_CATEGORY_COLUMN)
        api_category_identifier = str(api_category_raw)
        if api_type == 'follower_gains_monthly':
            api_category_identifier = _format_monthly_gain_date(api_category_raw)
            if api_category_identifier is None:
                logging.warning(f"Could not parse date from API for monthly gain: {api_category_raw}. Skipping this API stat.")
                continue

        key = (
            str(stat_from_api.get(FOLLOWER_STATS_ORG_URN_COLUMN)),
            api_type,
            api_category_identifier,
        )
        if key not in existing_stats_map:
            # Category/month not seen before: create it.
            stats_for_bulk_upload.append(stat_from_api)
            continue

        existing_organic, existing_paid, bubble_id = existing_stats_map[key]
        if pd.isna(bubble_id):
            # Rows added to the state frame after an in-session upload carry no
            # Bubble id, so there is nothing a PATCH could address.
            logging.warning(f"Follower Stats: Existing record for {key} has no Bubble id; skipping update.")
            continue

        api_organic_count = stat_from_api.get(FOLLOWER_STATS_ORGANIC_COLUMN, 0)
        api_paid_count = stat_from_api.get(FOLLOWER_STATS_PAID_COLUMN, 0)
        fields_to_update_in_bubble = {}
        if _follower_count_changed(api_organic_count, existing_organic):
            fields_to_update_in_bubble[FOLLOWER_STATS_ORGANIC_COLUMN] = api_organic_count
        if _follower_count_changed(api_paid_count, existing_paid):
            fields_to_update_in_bubble[FOLLOWER_STATS_PAID_COLUMN] = api_paid_count
        if fields_to_update_in_bubble:
            records_to_update_via_patch.append((bubble_id, fields_to_update_in_bubble))

    return stats_for_bulk_upload, records_to_update_via_patch


def _merge_follower_stats_into_state_df(bubble_follower_stats_df_orig, successful_updates, created_stats):
    """Return the state frame after applying successful patches and appending successfully created rows."""
    merged_df = bubble_follower_stats_df_orig.copy()

    if successful_updates and BUBBLE_UNIQUE_ID_COLUMN_NAME in merged_df.columns:
        bubble_ids = merged_df[BUBBLE_UNIQUE_ID_COLUMN_NAME]
        for bubble_id, fields_updated in successful_updates.items():
            row_mask = bubble_ids == bubble_id
            for col, value in fields_updated.items():
                merged_df.loc[row_mask, col] = value

    if created_stats:
        # concat aligns on the union of columns, so new rows are kept even when
        # the original frame is empty or has a different column set.
        merged_df = pd.concat([merged_df, pd.DataFrame(created_stats)], ignore_index=True)

    if merged_df.empty:
        return pd.DataFrame(columns=bubble_follower_stats_df_orig.columns)

    key_cols = [FOLLOWER_STATS_ORG_URN_COLUMN, FOLLOWER_STATS_TYPE_COLUMN, FOLLOWER_STATS_CATEGORY_COLUMN]
    if not all(col in merged_df.columns for col in key_cols):
        logging.warning("Follower Stats: Missing key columns for deduplication in token_state update. Skipping.")
        return merged_df.reset_index(drop=True)

    # Monthly gains are deduplicated on a normalised date; everything else on
    # the raw category. keep='last' prefers rows appended by this sync.
    monthly_part = merged_df[merged_df[FOLLOWER_STATS_TYPE_COLUMN] == 'follower_gains_monthly'].copy()
    if not monthly_part.empty:
        monthly_part[FOLLOWER_STATS_CATEGORY_COLUMN] = pd.to_datetime(monthly_part[FOLLOWER_STATS_CATEGORY_COLUMN], errors='coerce').dt.strftime('%Y-%m-%d')
        monthly_part = monthly_part.drop_duplicates(subset=key_cols, keep='last')

    demographics_part = merged_df[merged_df[FOLLOWER_STATS_TYPE_COLUMN] != 'follower_gains_monthly']
    if not demographics_part.empty:
        demographics_part = demographics_part.drop_duplicates(subset=key_cols, keep='last')

    non_empty_parts = [part for part in (monthly_part, demographics_part) if not part.empty]
    if not non_empty_parts:
        return pd.DataFrame(columns=bubble_follower_stats_df_orig.columns)
    if len(non_empty_parts) == 1:
        return non_empty_parts[0].reset_index(drop=True)
    return pd.concat(non_empty_parts, ignore_index=True)


def _push_follower_stats_to_bubble(token_state, bubble_follower_stats_df_orig, api_follower_stats, org_urn):
    """Create/patch follower stats in Bubble, refresh token_state, and return a status message."""
    existing_stats_map = _build_existing_follower_stats_map(bubble_follower_stats_df_orig)
    stats_for_bulk_upload, records_to_update_via_patch = _plan_follower_stats_changes(api_follower_stats, existing_stats_map)

    num_bulk_uploaded = 0
    if stats_for_bulk_upload:
        if bulk_upload_to_bubble(stats_for_bulk_upload, BUBBLE_FOLLOWER_STATS_TABLE_NAME):
            num_bulk_uploaded = len(stats_for_bulk_upload)
            logging.info(f"Successfully bulk-uploaded {num_bulk_uploaded} new follower stat entries to Bubble for org {org_urn}.")
        else:
            logging.error(f"Failed to bulk-upload {len(stats_for_bulk_upload)} new follower stat entries for org {org_urn}.")

    num_patched_updated = 0
    # Remember exactly which patches succeeded; failures can occur anywhere in
    # the list, so a plain success count is not enough to update the state.
    successful_updates = {}
    if records_to_update_via_patch:
        for bubble_id, fields_to_update in records_to_update_via_patch:
            if update_record_in_bubble(BUBBLE_FOLLOWER_STATS_TABLE_NAME, bubble_id, fields_to_update):
                num_patched_updated += 1
                successful_updates.setdefault(bubble_id, {}).update(fields_to_update)
            else:
                logging.error(f"Failed to update record {bubble_id} via PATCH for follower stats for org {org_urn}.")
        logging.info(f"Attempted to update {len(records_to_update_via_patch)} follower stat entries via PATCH, {num_patched_updated} succeeded for org {org_urn}.")

    if not stats_for_bulk_upload and not records_to_update_via_patch:
        logging.info(f"Follower Stats sync: Data for org {org_urn} is up-to-date or no changes met update criteria.")
        follower_stats_sync_message = "Follower Stats: Data up-to-date or no qualifying changes. "
    else:
        follower_stats_sync_message = f"Follower Stats: Synced (New: {num_bulk_uploaded}, Updated: {num_patched_updated}). "

    # The bulk upload is all-or-nothing, so created rows are mirrored only on success.
    created_stats = stats_for_bulk_upload if num_bulk_uploaded > 0 else []
    token_state["bubble_follower_stats_df"] = _merge_follower_stats_into_state_df(
        bubble_follower_stats_df_orig, successful_updates, created_stats
    )
    return follower_stats_sync_message


def sync_linkedin_follower_stats(token_state):
    """
    Fetch LinkedIn follower statistics and create/update them in Bubble when scheduled.

    The sync runs only if ``token_state["fs_should_sync_now"]`` is truthy. Stats
    whose (org URN, type, category) key is not yet stored are bulk-created;
    stored stats whose organic or paid count differs from the API value are
    patched. For ``follower_gains_monthly`` the category is a date and is
    compared in ``YYYY-MM-DD`` form. ``token_state["bubble_follower_stats_df"]``
    is then refreshed with the successful changes and deduplicated per key.
    Once past the schedule check, the attempt is written to the operations log
    (when an org URN is available) regardless of the outcome.

    Args:
        token_state: Session-state dict, or ``None``. Reads
            ``fs_should_sync_now``, ``token``, ``client_id``, ``org_urn`` and
            ``bubble_follower_stats_df``.

    Returns:
        Tuple ``(status_message, token_state)``. Errors are reported through the
        message; ``ValueError`` text is HTML-escaped before being included.
    """
    logging.info("Starting LinkedIn follower stats sync process check.")

    # A missing/empty state cannot have a sync scheduled; checking it here also
    # keeps the .get() calls below from failing on None.
    if not token_state or not token_state.get("fs_should_sync_now", False):
        logging.info("Follower Stats sync: Not scheduled by state_manager. Skipping.")
        return "Follower Stats: Sync not currently required by schedule. ", token_state

    logging.info("Follower Stats sync: Proceeding as scheduled by state_manager.")

    client_id = token_state.get("client_id")
    token_dict = token_state.get("token")
    org_urn = token_state.get('org_urn')

    if not token_dict:
        logging.error("Follower Stats sync: Access denied. No LinkedIn token.")
        # A sync was scheduled, so the attempt is still logged when possible.
        if org_urn:
            token_state = _log_sync_attempt(org_urn, LOG_SUBJECT_FOLLOWER_STATS, token_state)
        return "Follower Stats: No token. ", token_state

    if not org_urn or not client_id or client_id == "ENV VAR MISSING":
        logging.error("Follower Stats sync: Configuration error (Org URN or Client ID missing).")
        if org_urn:
            token_state = _log_sync_attempt(org_urn, LOG_SUBJECT_FOLLOWER_STATS, token_state)
        return "Follower Stats: Config error. ", token_state

    bubble_follower_stats_df_orig = token_state.get("bubble_follower_stats_df")
    if bubble_follower_stats_df_orig is None:
        bubble_follower_stats_df_orig = pd.DataFrame()
    logging.info(f"{bubble_follower_stats_df_orig.columns}")

    # Without the Bubble id column existing rows cannot be addressed for updates.
    if not bubble_follower_stats_df_orig.empty and BUBBLE_UNIQUE_ID_COLUMN_NAME not in bubble_follower_stats_df_orig.columns:
        logging.error(f"Follower Stats sync: Critical error - '{BUBBLE_UNIQUE_ID_COLUMN_NAME}' column missing in bubble_follower_stats_df. Cannot proceed with updates.")
        token_state = _log_sync_attempt(org_urn, LOG_SUBJECT_FOLLOWER_STATS, token_state)
        return f"Follower Stats: Config error ({BUBBLE_UNIQUE_ID_COLUMN_NAME} missing). ", token_state

    logging.info(f"Follower stats sync proceeding for org_urn: {org_urn}")

    try:
        api_follower_stats = get_linkedin_follower_stats(client_id, token_dict, org_urn)
        if not api_follower_stats:
            logging.info(f"Follower Stats sync: No stats found via API for org {org_urn}.")
            follower_stats_sync_message = "Follower Stats: None found via API. "
        else:
            follower_stats_sync_message = _push_follower_stats_to_bubble(
                token_state, bubble_follower_stats_df_orig, api_follower_stats, org_urn
            )
    except ValueError as ve:
        logging.error(f"ValueError during follower stats sync for {org_urn}: {ve}", exc_info=True)
        follower_stats_sync_message = f"Follower Stats Error: {html.escape(str(ve))}. "
    except Exception as e:
        logging.exception(f"Unexpected error in sync_linkedin_follower_stats for {org_urn}.")
        follower_stats_sync_message = f"Follower Stats: Unexpected error ({type(e).__name__}). "

    # org_urn is known to be set here; log the attempt once for every outcome.
    token_state = _log_sync_attempt(org_urn, LOG_SUBJECT_FOLLOWER_STATS, token_state)
    return follower_stats_sync_message, token_state
def sync_all_linkedin_data_orchestrator(token_state):
    """
    Run the posts, mentions and follower-stats syncs in sequence.

    Posts are synced when ``token_state["fetch_count_for_api"]`` is a non-zero
    count; otherwise the skip is still written to the operations log. The
    mentions and follower-stats functions check their own schedule flags and
    log their own attempts.

    Args:
        token_state: Session-state dict, or ``None``. Must provide ``token``,
            ``org_urn`` and ``client_id`` for any sync to run.

    Returns:
        Tuple ``(html_message, token_state)``. ``html_message`` is a ``<p>``
        element: red when a prerequisite is missing, otherwise green and
        containing the three per-subject status messages.
    """
    logging.info("Starting sync_all_linkedin_data_orchestrator process.")

    if not token_state or not token_state.get("token"):
        logging.error("Sync All: Access denied. LinkedIn token not available.")
        return "<p style='color:red; text-align:center;'>❌ Access denied. LinkedIn token not available.</p>", token_state

    org_urn = token_state.get('org_urn')
    client_id = token_state.get("client_id")

    if not org_urn:
        logging.error("Sync All: Org URN missing in token_state.")
        return "<p style='color:red;'>❌ Config error: Org URN missing.</p>", token_state

    if not client_id or client_id == "ENV VAR MISSING":
        logging.error("Sync All: Client ID missing or not set in token_state.")
        return "<p style='color:red;'>❌ Config error: Client ID missing.</p>", token_state

    # --- Sync Posts ---
    fetch_count_for_posts_api = token_state.get('fetch_count_for_api', 0)
    # Falsy covers both 0 and a None stored under the key: nothing to fetch.
    if not fetch_count_for_posts_api:
        posts_sync_message = "Posts: Sync not currently required by schedule. "
        logging.info("Posts sync: Skipped as fetch_count_for_posts_api is 0 (determined by state_manager).")
        # The skipped check is logged too, which keeps the operations log fresh.
        token_state = _log_sync_attempt(org_urn, LOG_SUBJECT_POSTS, token_state)
    else:
        # _sync_linkedin_posts_internal logs its own attempt.
        posts_sync_message, token_state = _sync_linkedin_posts_internal(token_state, fetch_count_for_posts_api)

    # --- Sync Mentions ---
    mentions_sync_message, token_state = sync_linkedin_mentions(token_state)

    # --- Sync Follower Stats ---
    follower_stats_sync_message, token_state = sync_linkedin_follower_stats(token_state)

    logging.info(f"Sync process complete. Messages: Posts: [{posts_sync_message.strip()}], Mentions: [{mentions_sync_message.strip()}], Follower Stats: [{follower_stats_sync_message.strip()}]")
    final_message = f"<p style='color:green; text-align:center;'>✅ Sync Attempted. {posts_sync_message} {mentions_sync_message} {follower_stats_sync_message}</p>"
    return final_message, token_state