# sync_logic.py
"""
Handles the logic for syncing LinkedIn data: posts, mentions, and follower statistics.
Fetches data from LinkedIn APIs, uploads it to Bubble, and logs each sync attempt.
"""
import pandas as pd
import logging
import html
# Assuming Bubble_API_Calls contains bulk_upload_to_bubble
from Bubble_API_Calls import bulk_upload_to_bubble, fetch_linkedin_posts_data_from_bubble, update_record_in_bubble
# Assuming Linkedin_Data_API_Calls contains all necessary LinkedIn data fetching and processing functions
from Linkedin_Data_API_Calls import (
    fetch_linkedin_posts_core,
    fetch_comments,
    analyze_sentiment,  # For post comments
    compile_detailed_posts,
    prepare_data_for_bubble,  # For posts, stats, comments
    fetch_linkedin_mentions_core,
    analyze_mentions_sentiment,  # For individual mentions
    compile_detailed_mentions,  # Compiles to user-specified format
    prepare_mentions_for_bubble  # Prepares user-specified format for Bubble
)
# Assuming linkedin_follower_stats.py contains get_linkedin_follower_stats
from linkedin_follower_stats import get_linkedin_follower_stats
# Assuming config.py contains all necessary constants
from config import (
    LINKEDIN_POST_URN_KEY, BUBBLE_POST_URN_COLUMN_NAME, BUBBLE_POSTS_TABLE_NAME,
    BUBBLE_POST_STATS_TABLE_NAME, BUBBLE_POST_COMMENTS_TABLE_NAME,
    BUBBLE_MENTIONS_TABLE_NAME, BUBBLE_MENTIONS_ID_COLUMN_NAME, BUBBLE_MENTIONS_DATE_COLUMN_NAME,
    DEFAULT_MENTIONS_INITIAL_FETCH_COUNT, DEFAULT_MENTIONS_UPDATE_FETCH_COUNT,
    BUBBLE_FOLLOWER_STATS_TABLE_NAME, FOLLOWER_STATS_TYPE_COLUMN, FOLLOWER_STATS_CATEGORY_COLUMN,
    FOLLOWER_STATS_ORG_URN_COLUMN, FOLLOWER_STATS_ORGANIC_COLUMN, FOLLOWER_STATS_PAID_COLUMN,
    LINKEDIN_CLIENT_ID_ENV_VAR,  # Though client_id is usually passed in token_state
    # Constants for sync-attempt logging
    BUBBLE_OPERATIONS_LOG_TABLE_NAME, BUBBLE_OPERATIONS_LOG_DATE_COLUMN,
    BUBBLE_OPERATIONS_LOG_SUBJECT_COLUMN, BUBBLE_OPERATIONS_LOG_ORG_URN_COLUMN,
    LOG_SUBJECT_POSTS, LOG_SUBJECT_MENTIONS, LOG_SUBJECT_FOLLOWER_STATS,
    BUBBLE_UNIQUE_ID_COLUMN_NAME
)
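
# The Bubble helpers imported above are assumed, based purely on how this module
# calls them, to have roughly the following shapes. This is an illustrative sketch
# of the expected interface, not the actual implementation in Bubble_API_Calls.py:
#
#   bulk_upload_to_bubble(records: list[dict], table_name: str) -> bool
#       Creates one Bubble record per dict; returns truthy on success.
#   update_record_in_bubble(table_name: str, record_id: str, fields: dict) -> bool
#       PATCHes the given fields onto an existing record; returns truthy on success.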


def _log_sync_attempt(org_urn, subject, token_state):
    """
    Logs a sync attempt to the Bubble operations log table and updates
    the operations log DataFrame in token_state.
    """
    logging.info(f"Logging sync attempt for subject: {subject}, org_urn: {org_urn}")
    if not org_urn:
        logging.warning("Cannot log sync attempt: org_urn is missing.")
        return token_state
    try:
        log_entry_data = {
            BUBBLE_OPERATIONS_LOG_DATE_COLUMN: pd.Timestamp.now(tz='UTC').isoformat(),
            BUBBLE_OPERATIONS_LOG_SUBJECT_COLUMN: subject,
            BUBBLE_OPERATIONS_LOG_ORG_URN_COLUMN: org_urn
        }
        # Ensure data types are what Bubble expects, e.g., the date as a string.
        # bulk_upload_to_bubble should handle dicts with basic types.
        upload_payload = [log_entry_data]
        bulk_upload_to_bubble(upload_payload, BUBBLE_OPERATIONS_LOG_TABLE_NAME)
        logging.info(f"Successfully logged sync attempt for {subject} to Bubble table '{BUBBLE_OPERATIONS_LOG_TABLE_NAME}'.")

        # Update token_state with the new log entry to keep it fresh
        current_log_df = token_state.get("bubble_operations_log_df", pd.DataFrame())
        new_log_entry_df = pd.DataFrame(upload_payload)  # DataFrame from the same data we uploaded

        # Ensure the date column is datetime on both DataFrames before concat
        if not new_log_entry_df.empty and BUBBLE_OPERATIONS_LOG_DATE_COLUMN in new_log_entry_df.columns:
            new_log_entry_df[BUBBLE_OPERATIONS_LOG_DATE_COLUMN] = pd.to_datetime(new_log_entry_df[BUBBLE_OPERATIONS_LOG_DATE_COLUMN], errors='coerce', utc=True)
        if not current_log_df.empty and BUBBLE_OPERATIONS_LOG_DATE_COLUMN in current_log_df.columns:
            if not pd.api.types.is_datetime64_any_dtype(current_log_df[BUBBLE_OPERATIONS_LOG_DATE_COLUMN]):
                current_log_df[BUBBLE_OPERATIONS_LOG_DATE_COLUMN] = pd.to_datetime(current_log_df[BUBBLE_OPERATIONS_LOG_DATE_COLUMN], errors='coerce', utc=True)

        updated_log_df = pd.concat([current_log_df, new_log_entry_df], ignore_index=True)

        # Keep only the latest entry per (subject, org_urn) so that
        # get_last_sync_attempt_date always sees the most recent timestamp, even
        # if several logs were written in quick succession.
        if not updated_log_df.empty and all(col in updated_log_df.columns for col in [BUBBLE_OPERATIONS_LOG_DATE_COLUMN, BUBBLE_OPERATIONS_LOG_SUBJECT_COLUMN, BUBBLE_OPERATIONS_LOG_ORG_URN_COLUMN]):
            updated_log_df = updated_log_df.sort_values(by=BUBBLE_OPERATIONS_LOG_DATE_COLUMN).drop_duplicates(
                subset=[BUBBLE_OPERATIONS_LOG_SUBJECT_COLUMN, BUBBLE_OPERATIONS_LOG_ORG_URN_COLUMN],
                keep='last'
            )

        token_state["bubble_operations_log_df"] = updated_log_df
        logging.info(f"Updated 'bubble_operations_log_df' in token_state after logging {subject}.")
    except Exception as e:
        logging.error(f"Failed to log sync attempt for {subject} or update token_state: {e}", exc_info=True)
    return token_state
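

# Illustrative sketch only: the get_last_sync_attempt_date lookup referenced in the
# comment above presumably lives in state_manager and reads bubble_operations_log_df
# roughly along these lines. The helper below is a hypothetical stand-in kept here
# for documentation purposes; it is not the real implementation and is not called.
def _illustrative_last_sync_attempt_date(log_df, subject, org_urn):
    """Return the most recent logged sync timestamp for (subject, org_urn), or None."""
    required_cols = [BUBBLE_OPERATIONS_LOG_DATE_COLUMN,
                     BUBBLE_OPERATIONS_LOG_SUBJECT_COLUMN,
                     BUBBLE_OPERATIONS_LOG_ORG_URN_COLUMN]
    if log_df is None or log_df.empty or not all(col in log_df.columns for col in required_cols):
        return None
    # Filter to the subject/org pair, coerce dates, and take the latest one
    mask = (log_df[BUBBLE_OPERATIONS_LOG_SUBJECT_COLUMN] == subject) & \
           (log_df[BUBBLE_OPERATIONS_LOG_ORG_URN_COLUMN] == org_urn)
    dates = pd.to_datetime(log_df.loc[mask, BUBBLE_OPERATIONS_LOG_DATE_COLUMN], errors='coerce', utc=True).dropna()
    return dates.max() if not dates.empty else None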


def _sync_linkedin_posts_internal(token_state, fetch_count_for_posts_api):
    """Internal logic for syncing LinkedIn posts."""
    # This function is called by the orchestrator only if fetch_count_for_posts_api > 0,
    # so an attempt to sync posts is indeed happening.
    logging.info(f"Posts sync: Starting fetch for {fetch_count_for_posts_api} posts.")
    client_id = token_state.get("client_id")
    token_dict = token_state.get("token")
    org_urn = token_state.get('org_urn')
    bubble_posts_df_orig = token_state.get("bubble_posts_df", pd.DataFrame()).copy()
    posts_sync_message = ""
    attempt_logged = False  # Flag to ensure the attempt is logged exactly once

    try:
        # Basic checks before the API call
        if not all([client_id, token_dict, org_urn]):
            posts_sync_message = "Posts: Config error (client_id, token, or org_urn missing). "
            logging.error(f"Posts sync: Prerequisite missing - client_id: {'OK' if client_id else 'Missing'}, token: {'OK' if token_dict else 'Missing'}, org_urn: {'OK' if org_urn else 'Missing'}")
            # Log the attempt even on a config error, since state_manager decided a sync *should* occur
            token_state = _log_sync_attempt(org_urn, LOG_SUBJECT_POSTS, token_state)
            attempt_logged = True
            return posts_sync_message, token_state

        processed_raw_posts, stats_map, _ = fetch_linkedin_posts_core(client_id, token_dict, org_urn, count=fetch_count_for_posts_api)

        if not processed_raw_posts:
            posts_sync_message = "Posts: None found via API. "
            logging.info("Posts sync: No raw posts returned from API.")
            # Log the attempt, as the API was called
            token_state = _log_sync_attempt(org_urn, LOG_SUBJECT_POSTS, token_state)
            attempt_logged = True
            return posts_sync_message, token_state

        existing_post_urns = set()
        if not bubble_posts_df_orig.empty and BUBBLE_POST_URN_COLUMN_NAME in bubble_posts_df_orig.columns:
            existing_post_urns = set(bubble_posts_df_orig[BUBBLE_POST_URN_COLUMN_NAME].dropna().astype(str))

        new_raw_posts = [p for p in processed_raw_posts if str(p.get(LINKEDIN_POST_URN_KEY)) not in existing_post_urns]

        if not new_raw_posts:
            posts_sync_message = "Posts: All fetched already in Bubble. "
            logging.info("Posts sync: All fetched posts were already found in Bubble.")
            # Log the attempt, as the API was called and its results processed
            token_state = _log_sync_attempt(org_urn, LOG_SUBJECT_POSTS, token_state)
            attempt_logged = True
            return posts_sync_message, token_state

        logging.info(f"Posts sync: Processing {len(new_raw_posts)} new raw posts.")
        post_urns_to_process = [p[LINKEDIN_POST_URN_KEY] for p in new_raw_posts if p.get(LINKEDIN_POST_URN_KEY)]

        all_comments_data = fetch_comments(client_id, token_dict, post_urns_to_process, stats_map)
        sentiments_per_post = analyze_sentiment(all_comments_data)
        detailed_new_posts = compile_detailed_posts(new_raw_posts, stats_map, sentiments_per_post)
        li_posts, li_post_stats, li_post_comments = prepare_data_for_bubble(detailed_new_posts, all_comments_data)

        if li_posts:
            bulk_upload_to_bubble(li_posts, BUBBLE_POSTS_TABLE_NAME)
            updated_posts_df = pd.concat([bubble_posts_df_orig, pd.DataFrame(li_posts)], ignore_index=True)
            token_state["bubble_posts_df"] = updated_posts_df.drop_duplicates(subset=[BUBBLE_POST_URN_COLUMN_NAME], keep='last')
            logging.info(f"Posts sync: Uploaded {len(li_posts)} new posts to Bubble.")

            if li_post_stats:
                bulk_upload_to_bubble(li_post_stats, BUBBLE_POST_STATS_TABLE_NAME)
                logging.info(f"Posts sync: Uploaded {len(li_post_stats)} post_stats entries.")
            if li_post_comments:
                bulk_upload_to_bubble(li_post_comments, BUBBLE_POST_COMMENTS_TABLE_NAME)
                logging.info(f"Posts sync: Uploaded {len(li_post_comments)} post_comments entries.")
            posts_sync_message = f"Posts: Synced {len(li_posts)} new. "
        else:
            posts_sync_message = "Posts: No new ones to upload after processing. "
            logging.info("Posts sync: No new posts were prepared for Bubble upload.")
    except ValueError as ve:
        posts_sync_message = f"Posts Error: {html.escape(str(ve))}. "
        logging.error(f"Posts sync: ValueError: {ve}", exc_info=True)
    except Exception as e:
        logging.exception("Posts sync: Unexpected error during processing.")
        posts_sync_message = f"Posts: Unexpected error ({type(e).__name__}). "
    finally:
        # Log the sync attempt if it hasn't been logged already (e.g. due to an
        # early exit) and if the basic condition (org_urn) for logging is met.
        if not attempt_logged and org_urn:
            token_state = _log_sync_attempt(org_urn, LOG_SUBJECT_POSTS, token_state)
    return posts_sync_message, token_state


def sync_linkedin_mentions(token_state):
    """Fetches new LinkedIn mentions and uploads them to Bubble, if scheduled by state_manager."""
    logging.info("Starting LinkedIn mentions sync process check.")
    if not token_state.get("mentions_should_sync_now", False):
        logging.info("Mentions sync: Not scheduled by state_manager based on the operations log. Skipping.")
        return "Mentions: Sync not currently required by schedule. ", token_state

    logging.info("Mentions sync: Proceeding as scheduled by state_manager.")

    if not token_state or not token_state.get("token"):
        logging.error("Mentions sync: Access denied. No LinkedIn token.")
        # Still log an attempt if org_urn is available, since a sync was scheduled
        org_urn_for_log = token_state.get('org_urn') if token_state else None
        if org_urn_for_log:
            token_state = _log_sync_attempt(org_urn_for_log, LOG_SUBJECT_MENTIONS, token_state)
        return "Mentions: No token. ", token_state

    client_id = token_state.get("client_id")
    token_dict = token_state.get("token")
    org_urn = token_state.get('org_urn')
    bubble_mentions_df_orig = token_state.get("bubble_mentions_df", pd.DataFrame()).copy()
    mentions_sync_message = ""
    attempt_logged = False

    if not org_urn or not client_id or client_id == "ENV VAR MISSING":
        logging.error("Mentions sync: Configuration error (Org URN or Client ID missing).")
        if org_urn:  # Log if possible
            token_state = _log_sync_attempt(org_urn, LOG_SUBJECT_MENTIONS, token_state)
            attempt_logged = True
        return "Mentions: Config error. ", token_state

    # Determine fetch count: initial if there is no mentions data yet, update otherwise
    fetch_count_for_mentions_api = DEFAULT_MENTIONS_INITIAL_FETCH_COUNT \
        if bubble_mentions_df_orig.empty else DEFAULT_MENTIONS_UPDATE_FETCH_COUNT
    logging.info(f"Mentions sync: Fetch count set to {fetch_count_for_mentions_api}.")

    try:
        processed_raw_mentions = fetch_linkedin_mentions_core(client_id, token_dict, org_urn, count=fetch_count_for_mentions_api)
        if not processed_raw_mentions:
            logging.info("Mentions sync: No new mentions found via API.")
            mentions_sync_message = "Mentions: None found via API. "
            token_state = _log_sync_attempt(org_urn, LOG_SUBJECT_MENTIONS, token_state)
            attempt_logged = True
            return mentions_sync_message, token_state

        existing_mention_ids = set()
        if not bubble_mentions_df_orig.empty and BUBBLE_MENTIONS_ID_COLUMN_NAME in bubble_mentions_df_orig.columns:
            existing_mention_ids = set(bubble_mentions_df_orig[BUBBLE_MENTIONS_ID_COLUMN_NAME].dropna().astype(str))

        sentiments_map = analyze_mentions_sentiment(processed_raw_mentions)
        all_compiled_mentions = compile_detailed_mentions(processed_raw_mentions, sentiments_map)

        new_compiled_mentions_to_upload = [
            m for m in all_compiled_mentions if str(m.get("id")) not in existing_mention_ids
        ]

        if not new_compiled_mentions_to_upload:
            logging.info("Mentions sync: All fetched mentions are already in Bubble.")
            mentions_sync_message = "Mentions: All fetched already in Bubble. "
            token_state = _log_sync_attempt(org_urn, LOG_SUBJECT_MENTIONS, token_state)
            attempt_logged = True
            return mentions_sync_message, token_state

        bubble_ready_mentions = prepare_mentions_for_bubble(new_compiled_mentions_to_upload)
        if bubble_ready_mentions:
            bulk_upload_to_bubble(bubble_ready_mentions, BUBBLE_MENTIONS_TABLE_NAME)
            logging.info(f"Successfully uploaded {len(bubble_ready_mentions)} new mentions to Bubble.")
            updated_mentions_df = pd.concat([bubble_mentions_df_orig, pd.DataFrame(bubble_ready_mentions)], ignore_index=True)
            token_state["bubble_mentions_df"] = updated_mentions_df.drop_duplicates(subset=[BUBBLE_MENTIONS_ID_COLUMN_NAME], keep='last')
            mentions_sync_message = f"Mentions: Synced {len(bubble_ready_mentions)} new. "
        else:
            logging.info("Mentions sync: No new mentions were prepared for Bubble upload.")
            mentions_sync_message = "Mentions: No new ones to upload. "
    except ValueError as ve:
        logging.error(f"ValueError during mentions sync: {ve}", exc_info=True)
        mentions_sync_message = f"Mentions Error: {html.escape(str(ve))}. "
    except Exception as e:
        logging.exception("Unexpected error in sync_linkedin_mentions.")
        mentions_sync_message = f"Mentions: Unexpected error ({type(e).__name__}). "
    finally:
        if not attempt_logged and org_urn:
            token_state = _log_sync_attempt(org_urn, LOG_SUBJECT_MENTIONS, token_state)
    return mentions_sync_message, token_state


def _clean_key_component(component_value, is_category_identifier=False):
    """
    Helper to consistently clean key components.
    For non-date category identifiers, converts to lowercase for case-insensitive matching.
    """
    if pd.isna(component_value) or component_value is None:
        return "NONE_VALUE"  # Consistent placeholder for None/NaN
    cleaned_value = str(component_value).strip()
    if is_category_identifier:  # Apply lowercasing only to general category text, not dates or URNs/types
        return cleaned_value.lower()
    return cleaned_value
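
# Illustrative behaviour of _clean_key_component (hypothetical example values):
#   _clean_key_component("  Engineering  ")                               -> "Engineering"
#   _clean_key_component("  Engineering  ", is_category_identifier=True)  -> "engineering"
#   _clean_key_component(None)                                            -> "NONE_VALUE"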


def sync_linkedin_follower_stats(token_state):
    """
    Fetches new/updated LinkedIn follower statistics and uploads/updates them in Bubble,
    if scheduled by state_manager. Includes detailed logging for debugging key mismatches.
    """
    logging.info("DEBUG: Starting LinkedIn follower stats sync process check.")
    if not token_state.get("fs_should_sync_now", False):
        logging.info("DEBUG: Follower Stats sync: Not scheduled by state_manager. Skipping.")
        return "Follower Stats: Sync not currently required by schedule. ", token_state

    logging.info("DEBUG: Follower Stats sync: Proceeding as scheduled by state_manager.")

    if not token_state or not token_state.get("token"):
        logging.error("DEBUG: Follower Stats sync: Access denied. No LinkedIn token.")
        org_urn_for_log = token_state.get('org_urn') if token_state else None
        if org_urn_for_log:
            token_state = _log_sync_attempt(org_urn_for_log, LOG_SUBJECT_FOLLOWER_STATS, token_state)
        return "Follower Stats: No token. ", token_state

    client_id = token_state.get("client_id")
    token_dict = token_state.get("token")
    org_urn = token_state.get('org_urn')
    bubble_follower_stats_df_orig = token_state.get("bubble_follower_stats_df", pd.DataFrame()).copy()
    follower_stats_sync_message = ""
    attempt_logged = False

    if not org_urn or not client_id or client_id == "ENV VAR MISSING":
        logging.error("DEBUG: Follower Stats sync: Configuration error (Org URN or Client ID missing).")
        if org_urn:
            token_state = _log_sync_attempt(org_urn, LOG_SUBJECT_FOLLOWER_STATS, token_state)
            attempt_logged = True
        return "Follower Stats: Config error. ", token_state

    if not bubble_follower_stats_df_orig.empty and BUBBLE_UNIQUE_ID_COLUMN_NAME not in bubble_follower_stats_df_orig.columns:
        logging.error(f"DEBUG: Follower Stats sync: Critical error - '{BUBBLE_UNIQUE_ID_COLUMN_NAME}' column missing in bubble_follower_stats_df. Cannot proceed with updates.")
        if org_urn:
            token_state = _log_sync_attempt(org_urn, LOG_SUBJECT_FOLLOWER_STATS, token_state)
            attempt_logged = True
        return f"Follower Stats: Config error ({BUBBLE_UNIQUE_ID_COLUMN_NAME} missing). ", token_state

    logging.info(f"DEBUG: Follower stats sync proceeding for org_urn: {org_urn}")
    try:
        api_follower_stats = get_linkedin_follower_stats(client_id, token_dict, org_urn)
        if not api_follower_stats:  # Expected to be a list of dicts
            logging.info(f"DEBUG: Follower Stats sync: No stats found via API for org {org_urn}. API returned: {api_follower_stats}")
            follower_stats_sync_message = "Follower Stats: None found via API. "
            token_state = _log_sync_attempt(org_urn, LOG_SUBJECT_FOLLOWER_STATS, token_state)
            attempt_logged = True
            return follower_stats_sync_message, token_state

        stats_for_bulk_upload = []
        records_to_update_via_patch = []
        existing_stats_map = {}
        stats_required_cols = [
            FOLLOWER_STATS_ORG_URN_COLUMN, FOLLOWER_STATS_TYPE_COLUMN,
            FOLLOWER_STATS_CATEGORY_COLUMN, FOLLOWER_STATS_ORGANIC_COLUMN,
            FOLLOWER_STATS_PAID_COLUMN, BUBBLE_UNIQUE_ID_COLUMN_NAME
        ]

        logging.info("DEBUG: Populating existing_stats_map from Bubble data...")
        if not bubble_follower_stats_df_orig.empty and all(col in bubble_follower_stats_df_orig.columns for col in stats_required_cols):
            for index, row in bubble_follower_stats_df_orig.iterrows():
                org_urn_val = _clean_key_component(row[FOLLOWER_STATS_ORG_URN_COLUMN])
                type_val = _clean_key_component(row[FOLLOWER_STATS_TYPE_COLUMN])
                category_raw_val = row[FOLLOWER_STATS_CATEGORY_COLUMN]
                bubble_id_val = row.get(BUBBLE_UNIQUE_ID_COLUMN_NAME)

                if pd.isna(bubble_id_val):
                    logging.warning(f"DEBUG: Row index {index} from Bubble data has missing Bubble ID ('{BUBBLE_UNIQUE_ID_COLUMN_NAME}'). Cannot use for updates. Data: {row.to_dict()}")
                    continue

                category_identifier = ""
                if type_val == 'follower_gains_monthly':  # Type is already cleaned
                    parsed_date = pd.to_datetime(category_raw_val, errors='coerce')
                    if pd.isna(parsed_date):  # pd.isna also covers NaT
                        logging.warning(f"DEBUG: Could not parse date for existing monthly gain: '{category_raw_val}' from Bubble row index {index}. Skipping for map.")
                        continue
                    category_identifier = parsed_date.strftime('%Y-%m-%d')  # Date format, not lowercased
                else:
                    # Apply lowercasing for general text categories for case-insensitive matching
                    category_identifier = _clean_key_component(category_raw_val, is_category_identifier=True)

                key = (org_urn_val, type_val, category_identifier)
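                # e.g. ("urn:li:organization:12345", "follower_industry", "information technology")
                # (illustrative values only; actual type and category strings come from the API)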
                # Ensure counts are numeric when storing them in the map
                existing_organic_count = pd.to_numeric(row[FOLLOWER_STATS_ORGANIC_COLUMN], errors='coerce')
                existing_paid_count = pd.to_numeric(row[FOLLOWER_STATS_PAID_COLUMN], errors='coerce')
                existing_organic_count = 0 if pd.isna(existing_organic_count) else int(existing_organic_count)
                existing_paid_count = 0 if pd.isna(existing_paid_count) else int(existing_paid_count)

                existing_stats_map[key] = (
                    existing_organic_count,
                    existing_paid_count,
                    str(bubble_id_val)  # Ensure the Bubble ID is a string
                )
                logging.debug(f"DEBUG: Added to existing_stats_map: Key={key}, BubbleID={str(bubble_id_val)}, OrgCounts={existing_organic_count}, PaidCounts={existing_paid_count}")
        elif not bubble_follower_stats_df_orig.empty:
            logging.warning(f"DEBUG: Follower Stats: Bubble data is missing one or more required columns for map: {stats_required_cols}.")
        else:
            logging.info("DEBUG: Follower Stats: bubble_follower_stats_df_orig is empty. existing_stats_map will be empty.")

        logging.info(f"DEBUG: Processing {len(api_follower_stats)} stats from API...")
        for i, stat_from_api in enumerate(api_follower_stats):  # api_follower_stats is a list of dicts
            api_org_urn = _clean_key_component(stat_from_api.get(FOLLOWER_STATS_ORG_URN_COLUMN))
            api_type = _clean_key_component(stat_from_api.get(FOLLOWER_STATS_TYPE_COLUMN))
            api_category_raw = stat_from_api.get(FOLLOWER_STATS_CATEGORY_COLUMN)

            api_category_identifier = ""
            if api_type == 'follower_gains_monthly':  # API type is already cleaned
                parsed_date = pd.to_datetime(api_category_raw, errors='coerce')
                if pd.isna(parsed_date):  # pd.isna also covers NaT
                    logging.warning(f"DEBUG: API stat index {i}: Could not parse date for API monthly gain: '{api_category_raw}'. Skipping.")
                    continue
                api_category_identifier = parsed_date.strftime('%Y-%m-%d')  # Date format, not lowercased
            else:
                # Apply lowercasing for general text categories for case-insensitive matching
                api_category_identifier = _clean_key_component(api_category_raw, is_category_identifier=True)

            key_from_api = (api_org_urn, api_type, api_category_identifier)
            logging.debug(f"DEBUG: API stat index {i}: Generated Key={key_from_api}, RawData={stat_from_api}")

            # Ensure API counts are numeric
            api_organic_count = pd.to_numeric(stat_from_api.get(FOLLOWER_STATS_ORGANIC_COLUMN), errors='coerce')
            api_paid_count = pd.to_numeric(stat_from_api.get(FOLLOWER_STATS_PAID_COLUMN), errors='coerce')
            api_organic_count = 0 if pd.isna(api_organic_count) else int(api_organic_count)
            api_paid_count = 0 if pd.isna(api_paid_count) else int(api_paid_count)

            if key_from_api not in existing_stats_map:
                logging.info(f"DEBUG: API stat index {i}: Key={key_from_api} NOT FOUND in existing_stats_map. Adding for BULK UPLOAD.")
                stats_for_bulk_upload.append(stat_from_api)
            else:
                existing_organic, existing_paid, bubble_id = existing_stats_map[key_from_api]  # Counts are already ints from the map
                logging.info(f"DEBUG: API stat index {i}: Key={key_from_api} FOUND in existing_stats_map. BubbleID={bubble_id}. ExistingCounts(O/P): {existing_organic}/{existing_paid}. APICounts(O/P): {api_organic_count}/{api_paid_count}.")

                fields_to_update_in_bubble = {}
                if api_organic_count > existing_organic:
                    fields_to_update_in_bubble[FOLLOWER_STATS_ORGANIC_COLUMN] = api_organic_count
                    logging.debug(f"DEBUG: API stat index {i}: Organic count update: API({api_organic_count}) > Bubble({existing_organic}) for BubbleID {bubble_id}")
                if api_paid_count > existing_paid:
                    fields_to_update_in_bubble[FOLLOWER_STATS_PAID_COLUMN] = api_paid_count
                    logging.debug(f"DEBUG: API stat index {i}: Paid count update: API({api_paid_count}) > Bubble({existing_paid}) for BubbleID {bubble_id}")

                if fields_to_update_in_bubble:
                    records_to_update_via_patch.append((bubble_id, fields_to_update_in_bubble))
                    logging.info(f"DEBUG: API stat index {i}: Queued for PATCH update. BubbleID={bubble_id}, Updates={fields_to_update_in_bubble}")
                else:
                    logging.info(f"DEBUG: API stat index {i}: API counts do not exceed the existing counts. No update needed for BubbleID={bubble_id}.")

        num_bulk_uploaded = 0
        if stats_for_bulk_upload:
            logging.info(f"DEBUG: Attempting to bulk upload {len(stats_for_bulk_upload)} new follower stat entries.")
            if bulk_upload_to_bubble(stats_for_bulk_upload, BUBBLE_FOLLOWER_STATS_TABLE_NAME):
                num_bulk_uploaded = len(stats_for_bulk_upload)
                logging.info(f"Successfully bulk-uploaded {num_bulk_uploaded} new follower stat entries to Bubble for org {org_urn}.")
            else:
                logging.error(f"Failed to bulk-upload {len(stats_for_bulk_upload)} new follower stat entries for org {org_urn}.")

        num_patched_updated = 0
        successfully_patched_ids_and_data_temp = []  # Tracks what actually succeeded, for the token_state update below
        if records_to_update_via_patch:
            logging.info(f"DEBUG: Attempting to PATCH update {len(records_to_update_via_patch)} follower stat entries.")
            for bubble_id, fields_to_update in records_to_update_via_patch:
                if update_record_in_bubble(BUBBLE_FOLLOWER_STATS_TABLE_NAME, bubble_id, fields_to_update):
                    num_patched_updated += 1
                    successfully_patched_ids_and_data_temp.append({'bubble_id': bubble_id, 'fields': fields_to_update})
                else:
                    logging.error(f"Failed to update record {bubble_id} via PATCH for follower stats for org {org_urn}.")
            logging.info(f"Attempted to update {len(records_to_update_via_patch)} follower stat entries via PATCH; {num_patched_updated} succeeded for org {org_urn}.")

        if not stats_for_bulk_upload and not records_to_update_via_patch:
            logging.info(f"DEBUG: Follower Stats sync: Data for org {org_urn} is up-to-date or no changes met the update criteria.")
            follower_stats_sync_message = "Follower Stats: Data up-to-date or no qualifying changes. "
        else:
            follower_stats_sync_message = f"Follower Stats: Synced (New: {num_bulk_uploaded}, Updated: {num_patched_updated}). "

        # --- Update token_state's follower stats DataFrame ---
        current_data_for_state_df = bubble_follower_stats_df_orig.copy()

        if num_patched_updated > 0:  # Apply only the patches that actually succeeded
            for item in successfully_patched_ids_and_data_temp:
                bubble_id = item['bubble_id']
                fields_updated = item['fields']
                idx = current_data_for_state_df[current_data_for_state_df[BUBBLE_UNIQUE_ID_COLUMN_NAME] == bubble_id].index
                if not idx.empty:
                    for col, value in fields_updated.items():
                        current_data_for_state_df.loc[idx, col] = value

        if num_bulk_uploaded > 0:  # Apply only the bulk uploads that actually succeeded
            successfully_created_stats = stats_for_bulk_upload[:num_bulk_uploaded]  # Slice based on the success count
            if successfully_created_stats:
                newly_created_df = pd.DataFrame(successfully_created_stats)
                if not newly_created_df.empty:
                    for col in current_data_for_state_df.columns:
                        if col not in newly_created_df.columns:
                            newly_created_df[col] = pd.NA
                    aligned_newly_created_df = newly_created_df.reindex(columns=current_data_for_state_df.columns).fillna(pd.NA)
                    current_data_for_state_df = pd.concat([current_data_for_state_df, aligned_newly_created_df], ignore_index=True)

        if not current_data_for_state_df.empty:
            monthly_part = current_data_for_state_df[current_data_for_state_df[FOLLOWER_STATS_TYPE_COLUMN] == 'follower_gains_monthly'].copy()
            if not monthly_part.empty:
                # Normalize the category column to 'YYYY-MM-DD' strings before deduplication
                monthly_part.loc[:, FOLLOWER_STATS_CATEGORY_COLUMN] = pd.to_datetime(monthly_part[FOLLOWER_STATS_CATEGORY_COLUMN], errors='coerce').dt.strftime('%Y-%m-%d')
                monthly_part = monthly_part.drop_duplicates(
                    subset=[FOLLOWER_STATS_ORG_URN_COLUMN, FOLLOWER_STATS_TYPE_COLUMN, FOLLOWER_STATS_CATEGORY_COLUMN],
                    keep='last'
                )

            demographics_part = current_data_for_state_df[current_data_for_state_df[FOLLOWER_STATS_TYPE_COLUMN] != 'follower_gains_monthly'].copy()
            if not demographics_part.empty:
                # For demographics, the category is already cleaned (and lowercased) if it was text.
                # Ensure all subset columns exist before drop_duplicates.
                demo_subset_cols = [FOLLOWER_STATS_ORG_URN_COLUMN, FOLLOWER_STATS_TYPE_COLUMN, FOLLOWER_STATS_CATEGORY_COLUMN]
                if all(col in demographics_part.columns for col in demo_subset_cols):
                    # Clean the category column again here to match the key generation for demographics
                    demographics_part.loc[:, FOLLOWER_STATS_CATEGORY_COLUMN] = demographics_part[FOLLOWER_STATS_CATEGORY_COLUMN].apply(lambda x: _clean_key_component(x, is_category_identifier=True))
                    demographics_part = demographics_part.drop_duplicates(
                        subset=demo_subset_cols,
                        keep='last'
                    )
                else:
                    logging.warning(f"DEBUG: Demographics part missing one of {demo_subset_cols} for deduplication.")

            if monthly_part.empty and demographics_part.empty:
                token_state["bubble_follower_stats_df"] = pd.DataFrame(columns=bubble_follower_stats_df_orig.columns)
            elif monthly_part.empty:  # only demographics_part may have data
                token_state["bubble_follower_stats_df"] = demographics_part.reset_index(drop=True) if not demographics_part.empty else pd.DataFrame(columns=bubble_follower_stats_df_orig.columns)
            elif demographics_part.empty:  # only monthly_part may have data
                token_state["bubble_follower_stats_df"] = monthly_part.reset_index(drop=True) if not monthly_part.empty else pd.DataFrame(columns=bubble_follower_stats_df_orig.columns)
            else:  # both have data
                token_state["bubble_follower_stats_df"] = pd.concat([monthly_part, demographics_part], ignore_index=True)
        else:  # current_data_for_state_df ended up empty
            token_state["bubble_follower_stats_df"] = pd.DataFrame(columns=bubble_follower_stats_df_orig.columns)
    except ValueError as ve:
        logging.error(f"DEBUG: ValueError during follower stats sync for {org_urn}: {ve}", exc_info=True)
        follower_stats_sync_message = f"Follower Stats Error: {html.escape(str(ve))}. "
    except Exception as e:  # Catch any other unexpected error
        logging.exception(f"DEBUG: Unexpected error in sync_linkedin_follower_stats for {org_urn}.")  # .exception logs the stack trace
        follower_stats_sync_message = f"Follower Stats: Unexpected error ({type(e).__name__}). "
    finally:
        if not attempt_logged and org_urn:  # Ensure the attempt is logged even after an early exit
            token_state = _log_sync_attempt(org_urn, LOG_SUBJECT_FOLLOWER_STATS, token_state)
    return follower_stats_sync_message, token_state


def sync_all_linkedin_data_orchestrator(token_state):
    """Orchestrates the syncing of all LinkedIn data types (posts, mentions, follower stats)."""
    logging.info("Starting sync_all_linkedin_data_orchestrator process.")
    if not token_state or not token_state.get("token"):
        logging.error("Sync All: Access denied. LinkedIn token not available.")
        return "<p style='color:red; text-align:center;'>❌ Access denied. LinkedIn token not available.</p>", token_state

    org_urn = token_state.get('org_urn')
    client_id = token_state.get("client_id")

    posts_sync_message = ""
    mentions_sync_message = ""
    follower_stats_sync_message = ""

    if not org_urn:
        logging.error("Sync All: Org URN missing in token_state.")
        return "<p style='color:red;'>❌ Config error: Org URN missing.</p>", token_state
    if not client_id or client_id == "ENV VAR MISSING":
        logging.error("Sync All: Client ID missing or not set in token_state.")
        return "<p style='color:red;'>❌ Config error: Client ID missing.</p>", token_state

    # --- Sync Posts ---
    fetch_count_for_posts_api = token_state.get('fetch_count_for_api', 0)
    if fetch_count_for_posts_api == 0:
        # state_manager determined, based on the operations log, that no post sync is needed
        posts_sync_message = "Posts: Sync not currently required by schedule. "
        logging.info("Posts sync: Skipped as fetch_count_for_posts_api is 0 (determined by state_manager).")
    else:
        posts_sync_message, token_state = _sync_linkedin_posts_internal(token_state, fetch_count_for_posts_api)
        # _sync_linkedin_posts_internal handles its own attempt logging internally

    # --- Sync Mentions ---
    # sync_linkedin_mentions checks token_state.get("mentions_should_sync_now")
    # and logs its attempt internally.
    mentions_sync_message, token_state = sync_linkedin_mentions(token_state)

    # --- Sync Follower Stats ---
    # sync_linkedin_follower_stats checks token_state.get("fs_should_sync_now")
    # and logs its attempt internally.
    follower_stats_sync_message, token_state = sync_linkedin_follower_stats(token_state)

    logging.info(f"Sync process complete. Messages: Posts: [{posts_sync_message.strip()}], Mentions: [{mentions_sync_message.strip()}], Follower Stats: [{follower_stats_sync_message.strip()}]")
    final_message = f"<p style='color:green; text-align:center;'>✅ Sync Attempted. {posts_sync_message} {mentions_sync_message} {follower_stats_sync_message}</p>"
    return final_message, token_state
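

if __name__ == "__main__":
    # Minimal smoke-test sketch, not part of the production flow. It assumes a
    # token_state shaped like the one state_manager builds for the functions above;
    # every value below is an illustrative placeholder, and the LinkedIn/Bubble
    # calls will not succeed without real credentials and data.
    logging.basicConfig(level=logging.INFO)
    demo_token_state = {
        "token": None,  # a real OAuth token dict would go here
        "client_id": "YOUR_CLIENT_ID",  # placeholder
        "org_urn": "urn:li:organization:0000000",  # placeholder
        "fetch_count_for_api": 0,  # 0 means the orchestrator skips the posts sync
        "mentions_should_sync_now": False,
        "fs_should_sync_now": False,
        "bubble_posts_df": pd.DataFrame(),
        "bubble_mentions_df": pd.DataFrame(),
        "bubble_follower_stats_df": pd.DataFrame(),
        "bubble_operations_log_df": pd.DataFrame(),
    }
    html_message, demo_token_state = sync_all_linkedin_data_orchestrator(demo_token_state)
    print(html_message)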