# sync_logic.py
"""
Handles the logic for syncing LinkedIn data: posts, mentions, and follower statistics.
Fetches data from LinkedIn APIs, uploads to Bubble, and logs sync attempts.
"""
import pandas as pd
import logging
import html
# Assuming Bubble_API_Calls contains bulk_upload_to_bubble
from Bubble_API_Calls import bulk_upload_to_bubble, fetch_linkedin_posts_data_from_bubble, update_record_in_bubble
# Assuming Linkedin_Data_API_Calls contains all necessary LinkedIn data fetching and processing functions
from Linkedin_Data_API_Calls import (
fetch_linkedin_posts_core,
fetch_comments,
analyze_sentiment, # For post comments
compile_detailed_posts,
prepare_data_for_bubble, # For posts, stats, comments
fetch_linkedin_mentions_core,
analyze_mentions_sentiment, # For individual mentions
compile_detailed_mentions, # Compiles to user-specified format
prepare_mentions_for_bubble # Prepares user-specified format for Bubble
)
# Assuming linkedin_follower_stats.py contains get_linkedin_follower_stats
from linkedin_follower_stats import get_linkedin_follower_stats
# Assuming config.py contains all necessary constants
from config import (
LINKEDIN_POST_URN_KEY, BUBBLE_POST_URN_COLUMN_NAME, BUBBLE_POSTS_TABLE_NAME,
BUBBLE_POST_STATS_TABLE_NAME, BUBBLE_POST_COMMENTS_TABLE_NAME,
BUBBLE_MENTIONS_TABLE_NAME, BUBBLE_MENTIONS_ID_COLUMN_NAME, BUBBLE_MENTIONS_DATE_COLUMN_NAME,
DEFAULT_MENTIONS_INITIAL_FETCH_COUNT, DEFAULT_MENTIONS_UPDATE_FETCH_COUNT,
BUBBLE_FOLLOWER_STATS_TABLE_NAME, FOLLOWER_STATS_TYPE_COLUMN, FOLLOWER_STATS_CATEGORY_COLUMN,
FOLLOWER_STATS_ORG_URN_COLUMN, FOLLOWER_STATS_ORGANIC_COLUMN, FOLLOWER_STATS_PAID_COLUMN,
    BUBBLE_UNIQUE_ID_COLUMN_NAME,  # Bubble's unique record id column (assumed defined in config.py, like the other constants); required by the follower-stats update logic below
# NEW constants for logging
BUBBLE_OPERATIONS_LOG_TABLE_NAME, BUBBLE_OPERATIONS_LOG_DATE_COLUMN,
BUBBLE_OPERATIONS_LOG_SUBJECT_COLUMN, BUBBLE_OPERATIONS_LOG_ORG_URN_COLUMN,
LOG_SUBJECT_POSTS, LOG_SUBJECT_MENTIONS, LOG_SUBJECT_FOLLOWER_STATS
)
def _log_sync_attempt(org_urn, subject, token_state):
"""
Logs a sync attempt to the Bubble operations log table and updates
the operations log DataFrame in token_state.
"""
logging.info(f"Logging sync attempt for subject: {subject}, org_urn: {org_urn}")
if not org_urn:
logging.warning("Cannot log sync attempt: org_urn is missing.")
return token_state
try:
log_entry_data = {
BUBBLE_OPERATIONS_LOG_DATE_COLUMN: pd.Timestamp.now(tz='UTC').isoformat(),
BUBBLE_OPERATIONS_LOG_SUBJECT_COLUMN: subject,
BUBBLE_OPERATIONS_LOG_ORG_URN_COLUMN: org_urn
}
# Ensure data types are what Bubble expects, e.g., date as string
# bulk_upload_to_bubble should handle dicts with basic types.
upload_payload = [log_entry_data]
bulk_upload_to_bubble(upload_payload, BUBBLE_OPERATIONS_LOG_TABLE_NAME)
logging.info(f"Successfully logged sync attempt for {subject} to Bubble table '{BUBBLE_OPERATIONS_LOG_TABLE_NAME}'.")
# Update token_state with the new log entry to keep it fresh
current_log_df = token_state.get("bubble_operations_log_df", pd.DataFrame())
new_log_entry_df = pd.DataFrame(upload_payload) # DataFrame from the same data we uploaded
# Ensure date column is datetime before concat if it exists and is not empty
if not new_log_entry_df.empty and BUBBLE_OPERATIONS_LOG_DATE_COLUMN in new_log_entry_df.columns:
new_log_entry_df[BUBBLE_OPERATIONS_LOG_DATE_COLUMN] = pd.to_datetime(new_log_entry_df[BUBBLE_OPERATIONS_LOG_DATE_COLUMN], errors='coerce', utc=True)
if not current_log_df.empty and BUBBLE_OPERATIONS_LOG_DATE_COLUMN in current_log_df.columns:
# Ensure existing log df date column is also datetime
if not pd.api.types.is_datetime64_any_dtype(current_log_df[BUBBLE_OPERATIONS_LOG_DATE_COLUMN]):
current_log_df[BUBBLE_OPERATIONS_LOG_DATE_COLUMN] = pd.to_datetime(current_log_df[BUBBLE_OPERATIONS_LOG_DATE_COLUMN], errors='coerce', utc=True)
updated_log_df = pd.concat([current_log_df, new_log_entry_df], ignore_index=True)
        # Sort by date and keep only the latest entry per (subject, org) pair so that
        # get_last_sync_attempt_date always sees the most recent attempt, even when
        # several logs are written in quick succession.
if not updated_log_df.empty and all(col in updated_log_df.columns for col in [BUBBLE_OPERATIONS_LOG_DATE_COLUMN, BUBBLE_OPERATIONS_LOG_SUBJECT_COLUMN, BUBBLE_OPERATIONS_LOG_ORG_URN_COLUMN]):
updated_log_df = updated_log_df.sort_values(by=BUBBLE_OPERATIONS_LOG_DATE_COLUMN).drop_duplicates(
subset=[BUBBLE_OPERATIONS_LOG_SUBJECT_COLUMN, BUBBLE_OPERATIONS_LOG_ORG_URN_COLUMN],
keep='last'
)
token_state["bubble_operations_log_df"] = updated_log_df
logging.info(f"Updated 'bubble_operations_log_df' in token_state after logging {subject}.")
except Exception as e:
logging.error(f"Failed to log sync attempt for {subject} or update token_state: {e}", exc_info=True)
return token_state
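
# Illustrative helper (a sketch, not referenced elsewhere in this module): roughly how a
# scheduler such as state_manager's get_last_sync_attempt_date, mentioned in the comment
# inside _log_sync_attempt, could read the freshest attempt for a (subject, org) pair out
# of the in-memory log DataFrame maintained above. The function name is invented for this
# example; the real lookup lives outside this file.
def _latest_sync_attempt_date_example(log_df, org_urn, subject):
    """Return the most recent logged attempt date for (subject, org_urn), or pd.NaT."""
    required = [BUBBLE_OPERATIONS_LOG_DATE_COLUMN, BUBBLE_OPERATIONS_LOG_SUBJECT_COLUMN,
                BUBBLE_OPERATIONS_LOG_ORG_URN_COLUMN]
    if log_df is None or log_df.empty or not all(col in log_df.columns for col in required):
        return pd.NaT
    # Match on subject and org as strings, mirroring how the sync functions compare URNs.
    mask = (
        (log_df[BUBBLE_OPERATIONS_LOG_SUBJECT_COLUMN].astype(str) == str(subject))
        & (log_df[BUBBLE_OPERATIONS_LOG_ORG_URN_COLUMN].astype(str) == str(org_urn))
    )
    dates = pd.to_datetime(log_df.loc[mask, BUBBLE_OPERATIONS_LOG_DATE_COLUMN], errors='coerce', utc=True)
    return dates.max() if not dates.empty else pd.NaT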
def _sync_linkedin_posts_internal(token_state, fetch_count_for_posts_api):
"""Internal logic for syncing LinkedIn posts."""
    # The orchestrator calls this function only when fetch_count_for_posts_api > 0,
    # so a posts sync attempt is genuinely underway.
logging.info(f"Posts sync: Starting fetch for {fetch_count_for_posts_api} posts.")
client_id = token_state.get("client_id")
token_dict = token_state.get("token")
org_urn = token_state.get('org_urn')
bubble_posts_df_orig = token_state.get("bubble_posts_df", pd.DataFrame()).copy()
posts_sync_message = ""
attempt_logged = False # Flag to ensure log happens once
try:
# Basic checks before API call
if not all([client_id, token_dict, org_urn]):
posts_sync_message = "Posts: Config error (client_id, token, or org_urn missing). "
logging.error(f"Posts sync: Prerequisite missing - client_id: {'OK' if client_id else 'Missing'}, token: {'OK' if token_dict else 'Missing'}, org_urn: {'OK' if org_urn else 'Missing'}")
# Log attempt even if config error, as state_manager decided a sync *should* occur
token_state = _log_sync_attempt(org_urn, LOG_SUBJECT_POSTS, token_state)
attempt_logged = True
return posts_sync_message, token_state
processed_raw_posts, stats_map, _ = fetch_linkedin_posts_core(client_id, token_dict, org_urn, count=fetch_count_for_posts_api)
if not processed_raw_posts:
posts_sync_message = "Posts: None found via API. "
logging.info("Posts sync: No raw posts returned from API.")
# Log attempt as API was called
token_state = _log_sync_attempt(org_urn, LOG_SUBJECT_POSTS, token_state)
attempt_logged = True
return posts_sync_message, token_state
existing_post_urns = set()
if not bubble_posts_df_orig.empty and BUBBLE_POST_URN_COLUMN_NAME in bubble_posts_df_orig.columns:
existing_post_urns = set(bubble_posts_df_orig[BUBBLE_POST_URN_COLUMN_NAME].dropna().astype(str))
new_raw_posts = [p for p in processed_raw_posts if str(p.get(LINKEDIN_POST_URN_KEY)) not in existing_post_urns]
if not new_raw_posts:
posts_sync_message = "Posts: All fetched already in Bubble. "
logging.info("Posts sync: All fetched posts were already found in Bubble.")
# Log attempt as API was called and processed
token_state = _log_sync_attempt(org_urn, LOG_SUBJECT_POSTS, token_state)
attempt_logged = True
return posts_sync_message, token_state
logging.info(f"Posts sync: Processing {len(new_raw_posts)} new raw posts.")
post_urns_to_process = [p[LINKEDIN_POST_URN_KEY] for p in new_raw_posts if p.get(LINKEDIN_POST_URN_KEY)]
all_comments_data = fetch_comments(client_id, token_dict, post_urns_to_process, stats_map)
sentiments_per_post = analyze_sentiment(all_comments_data)
detailed_new_posts = compile_detailed_posts(new_raw_posts, stats_map, sentiments_per_post)
li_posts, li_post_stats, li_post_comments = prepare_data_for_bubble(detailed_new_posts, all_comments_data)
if li_posts:
bulk_upload_to_bubble(li_posts, BUBBLE_POSTS_TABLE_NAME)
updated_posts_df = pd.concat([bubble_posts_df_orig, pd.DataFrame(li_posts)], ignore_index=True)
token_state["bubble_posts_df"] = updated_posts_df.drop_duplicates(subset=[BUBBLE_POST_URN_COLUMN_NAME], keep='last')
logging.info(f"Posts sync: Uploaded {len(li_posts)} new posts to Bubble.")
if li_post_stats:
bulk_upload_to_bubble(li_post_stats, BUBBLE_POST_STATS_TABLE_NAME)
logging.info(f"Posts sync: Uploaded {len(li_post_stats)} post_stats entries.")
if li_post_comments:
bulk_upload_to_bubble(li_post_comments, BUBBLE_POST_COMMENTS_TABLE_NAME)
logging.info(f"Posts sync: Uploaded {len(li_post_comments)} post_comments entries.")
posts_sync_message = f"Posts: Synced {len(li_posts)} new. "
else:
posts_sync_message = "Posts: No new ones to upload after processing. "
logging.info("Posts sync: No new posts were prepared for Bubble upload.")
except ValueError as ve:
posts_sync_message = f"Posts Error: {html.escape(str(ve))}. "
logging.error(f"Posts sync: ValueError: {ve}", exc_info=True)
except Exception as e:
logging.exception("Posts sync: Unexpected error during processing.")
posts_sync_message = f"Posts: Unexpected error ({type(e).__name__}). "
finally:
# Log the sync attempt if it hasn't been logged already (e.g. due to early exit)
# and if basic conditions (org_urn) for logging are met.
if not attempt_logged and org_urn:
token_state = _log_sync_attempt(org_urn, LOG_SUBJECT_POSTS, token_state)
return posts_sync_message, token_state
def sync_linkedin_mentions(token_state):
"""Fetches new LinkedIn mentions and uploads them to Bubble, if scheduled by state_manager."""
logging.info("Starting LinkedIn mentions sync process check.")
if not token_state.get("mentions_should_sync_now", False):
logging.info("Mentions sync: Not scheduled by state_manager based on operations log. Skipping.")
return "Mentions: Sync not currently required by schedule. ", token_state
logging.info("Mentions sync: Proceeding as scheduled by state_manager.")
if not token_state or not token_state.get("token"):
logging.error("Mentions sync: Access denied. No LinkedIn token.")
# Still log an attempt if org_urn is available, as a sync was scheduled
org_urn_for_log = token_state.get('org_urn') if token_state else None
if org_urn_for_log:
token_state = _log_sync_attempt(org_urn_for_log, LOG_SUBJECT_MENTIONS, token_state)
return "Mentions: No token. ", token_state
client_id = token_state.get("client_id")
token_dict = token_state.get("token")
org_urn = token_state.get('org_urn')
bubble_mentions_df_orig = token_state.get("bubble_mentions_df", pd.DataFrame()).copy()
mentions_sync_message = ""
attempt_logged = False
if not org_urn or not client_id or client_id == "ENV VAR MISSING":
logging.error("Mentions sync: Configuration error (Org URN or Client ID missing).")
if org_urn: # Log if possible
token_state = _log_sync_attempt(org_urn, LOG_SUBJECT_MENTIONS, token_state)
attempt_logged = True
return "Mentions: Config error. ", token_state
# Determine fetch count: initial if no mentions data, update otherwise
fetch_count_for_mentions_api = DEFAULT_MENTIONS_INITIAL_FETCH_COUNT \
if bubble_mentions_df_orig.empty else DEFAULT_MENTIONS_UPDATE_FETCH_COUNT
logging.info(f"Mentions sync: Fetch count set to {fetch_count_for_mentions_api}.")
try:
processed_raw_mentions = fetch_linkedin_mentions_core(client_id, token_dict, org_urn, count=fetch_count_for_mentions_api)
if not processed_raw_mentions:
logging.info("Mentions sync: No new mentions found via API.")
mentions_sync_message = "Mentions: None found via API. "
token_state = _log_sync_attempt(org_urn, LOG_SUBJECT_MENTIONS, token_state)
attempt_logged = True
return mentions_sync_message, token_state
existing_mention_ids = set()
if not bubble_mentions_df_orig.empty and BUBBLE_MENTIONS_ID_COLUMN_NAME in bubble_mentions_df_orig.columns:
existing_mention_ids = set(bubble_mentions_df_orig[BUBBLE_MENTIONS_ID_COLUMN_NAME].dropna().astype(str))
sentiments_map = analyze_mentions_sentiment(processed_raw_mentions)
all_compiled_mentions = compile_detailed_mentions(processed_raw_mentions, sentiments_map)
new_compiled_mentions_to_upload = [
m for m in all_compiled_mentions if str(m.get("id")) not in existing_mention_ids
]
if not new_compiled_mentions_to_upload:
logging.info("Mentions sync: All fetched mentions are already in Bubble.")
mentions_sync_message = "Mentions: All fetched already in Bubble. "
token_state = _log_sync_attempt(org_urn, LOG_SUBJECT_MENTIONS, token_state)
attempt_logged = True
return mentions_sync_message, token_state
bubble_ready_mentions = prepare_mentions_for_bubble(new_compiled_mentions_to_upload)
if bubble_ready_mentions:
bulk_upload_to_bubble(bubble_ready_mentions, BUBBLE_MENTIONS_TABLE_NAME)
logging.info(f"Successfully uploaded {len(bubble_ready_mentions)} new mentions to Bubble.")
updated_mentions_df = pd.concat([bubble_mentions_df_orig, pd.DataFrame(bubble_ready_mentions)], ignore_index=True)
token_state["bubble_mentions_df"] = updated_mentions_df.drop_duplicates(subset=[BUBBLE_MENTIONS_ID_COLUMN_NAME], keep='last')
mentions_sync_message = f"Mentions: Synced {len(bubble_ready_mentions)} new. "
else:
logging.info("Mentions sync: No new mentions were prepared for Bubble upload.")
mentions_sync_message = "Mentions: No new ones to upload. "
except ValueError as ve:
logging.error(f"ValueError during mentions sync: {ve}", exc_info=True)
mentions_sync_message = f"Mentions Error: {html.escape(str(ve))}. "
except Exception as e:
logging.exception("Unexpected error in sync_linkedin_mentions.")
mentions_sync_message = f"Mentions: Unexpected error ({type(e).__name__}). "
finally:
if not attempt_logged and org_urn:
token_state = _log_sync_attempt(org_urn, LOG_SUBJECT_MENTIONS, token_state)
return mentions_sync_message, token_state
def sync_linkedin_follower_stats(token_state):
"""
Fetches new/updated LinkedIn follower statistics and uploads/updates them in Bubble,
if scheduled by state_manager.
    For both monthly gains and demographics, counts are updated whenever the value
    returned by LinkedIn differs from the one stored in Bubble; new records are
    created when the category/month does not yet exist.
"""
logging.info("Starting LinkedIn follower stats sync process check.")
if not token_state.get("fs_should_sync_now", False):
logging.info("Follower Stats sync: Not scheduled by state_manager. Skipping.")
return "Follower Stats: Sync not currently required by schedule. ", token_state
logging.info("Follower Stats sync: Proceeding as scheduled by state_manager.")
if not token_state or not token_state.get("token"):
logging.error("Follower Stats sync: Access denied. No LinkedIn token.")
org_urn_for_log = token_state.get('org_urn') if token_state else None
if org_urn_for_log:
token_state = _log_sync_attempt(org_urn_for_log, LOG_SUBJECT_FOLLOWER_STATS, token_state)
return "Follower Stats: No token. ", token_state
client_id = token_state.get("client_id")
token_dict = token_state.get("token")
org_urn = token_state.get('org_urn')
bubble_follower_stats_df_orig = token_state.get("bubble_follower_stats_df", pd.DataFrame()).copy()
follower_stats_sync_message = ""
attempt_logged = False
if not org_urn or not client_id or client_id == "ENV VAR MISSING":
logging.error("Follower Stats sync: Configuration error (Org URN or Client ID missing).")
if org_urn:
token_state = _log_sync_attempt(org_urn, LOG_SUBJECT_FOLLOWER_STATS, token_state)
attempt_logged = True
return "Follower Stats: Config error. ", token_state
logging.info(f"{bubble_follower_stats_df_orig.columns}")
# Ensure the BUBBLE_UNIQUE_ID_COLUMN_NAME exists in the DataFrame if it's not empty,
# as it's crucial for building the maps for updates.
if not bubble_follower_stats_df_orig.empty and BUBBLE_UNIQUE_ID_COLUMN_NAME not in bubble_follower_stats_df_orig.columns:
logging.error(f"Follower Stats sync: Critical error - '{BUBBLE_UNIQUE_ID_COLUMN_NAME}' column missing in bubble_follower_stats_df. Cannot proceed with updates.")
if org_urn:
token_state = _log_sync_attempt(org_urn, LOG_SUBJECT_FOLLOWER_STATS, token_state) # Log the attempt despite error
attempt_logged = True
return f"Follower Stats: Config error ({BUBBLE_UNIQUE_ID_COLUMN_NAME} missing). ", token_state
logging.info(f"Follower stats sync proceeding for org_urn: {org_urn}")
try:
api_follower_stats = get_linkedin_follower_stats(client_id, token_dict, org_urn)
if not api_follower_stats:
logging.info(f"Follower Stats sync: No stats found via API for org {org_urn}.")
follower_stats_sync_message = "Follower Stats: None found via API. "
token_state = _log_sync_attempt(org_urn, LOG_SUBJECT_FOLLOWER_STATS, token_state)
attempt_logged = True
return follower_stats_sync_message, token_state
stats_for_bulk_upload = []
records_to_update_via_patch = [] # List of tuples: (bubble_id, fields_to_update_dict)
# --- Prepare maps for existing data in Bubble for efficient lookup ---
# Key: (org_urn, type, category_identifier), Value: (organic, paid, bubble_record_id)
# For monthly gains, category_identifier is the formatted date string.
# For demographics, category_identifier is the FOLLOWER_STATS_CATEGORY_COLUMN value.
existing_stats_map = {}
stats_required_cols = [
FOLLOWER_STATS_ORG_URN_COLUMN, FOLLOWER_STATS_TYPE_COLUMN,
FOLLOWER_STATS_CATEGORY_COLUMN, FOLLOWER_STATS_ORGANIC_COLUMN, # Assuming these apply to monthly too
FOLLOWER_STATS_PAID_COLUMN, # Assuming these apply to monthly too
BUBBLE_UNIQUE_ID_COLUMN_NAME
]
if not bubble_follower_stats_df_orig.empty and all(col in bubble_follower_stats_df_orig.columns for col in stats_required_cols):
for _, row in bubble_follower_stats_df_orig.iterrows():
category_identifier = str(row[FOLLOWER_STATS_CATEGORY_COLUMN])
                # Monthly-gain categories are dates; normalise them to 'YYYY-MM-DD' so keys match.
                if row[FOLLOWER_STATS_TYPE_COLUMN] == 'follower_gains_monthly':
                    parsed_date = pd.to_datetime(row[FOLLOWER_STATS_CATEGORY_COLUMN], errors='coerce')
                    if pd.isna(parsed_date):  # errors='coerce' yields NaT on bad input; strftime on NaT would raise
                        logging.warning(f"Could not parse date for existing monthly gain: {row[FOLLOWER_STATS_CATEGORY_COLUMN]}. Skipping this entry for map.")
                        continue
                    category_identifier = parsed_date.strftime('%Y-%m-%d')
key = (
str(row[FOLLOWER_STATS_ORG_URN_COLUMN]),
str(row[FOLLOWER_STATS_TYPE_COLUMN]),
category_identifier
)
existing_stats_map[key] = (
row[FOLLOWER_STATS_ORGANIC_COLUMN], # Assuming monthly gains have this
row[FOLLOWER_STATS_PAID_COLUMN], # Assuming monthly gains have this
row[BUBBLE_UNIQUE_ID_COLUMN_NAME]
)
elif not bubble_follower_stats_df_orig.empty:
logging.warning(f"Follower Stats: Data in Bubble is missing one or more required columns for update logic: {stats_required_cols}. Will treat all API stats as new if not matched by key elements.")
# --- Process all stats from API (monthly gains and demographics) ---
for stat_from_api in api_follower_stats:
api_type = str(stat_from_api.get(FOLLOWER_STATS_TYPE_COLUMN))
api_category_raw = stat_from_api.get(FOLLOWER_STATS_CATEGORY_COLUMN)
api_category_identifier = str(api_category_raw)
            if api_type == 'follower_gains_monthly':
                parsed_api_date = pd.to_datetime(api_category_raw, errors='coerce')
                if pd.isna(parsed_api_date):  # errors='coerce' yields NaT on bad input; strftime on NaT would raise
                    logging.warning(f"Could not parse date from API for monthly gain: {api_category_raw}. Skipping this API stat.")
                    continue
                api_category_identifier = parsed_api_date.strftime('%Y-%m-%d')
key = (
str(stat_from_api.get(FOLLOWER_STATS_ORG_URN_COLUMN)),
api_type,
api_category_identifier
)
            # Monthly gains are assumed to carry the same organic/paid count fields as the
            # demographic stats; if they expose a single 'count' field instead, adjust here.
api_organic_count = stat_from_api.get(FOLLOWER_STATS_ORGANIC_COLUMN, 0)
api_paid_count = stat_from_api.get(FOLLOWER_STATS_PAID_COLUMN, 0)
if key not in existing_stats_map:
# This stat category/month is entirely new, add for bulk creation
stats_for_bulk_upload.append(stat_from_api)
else:
# Stat category/month exists, check if counts need updating
existing_organic, existing_paid, bubble_id = existing_stats_map[key]
fields_to_update_in_bubble = {}
if api_organic_count != existing_organic:
fields_to_update_in_bubble[FOLLOWER_STATS_ORGANIC_COLUMN] = api_organic_count
if api_paid_count != existing_paid:
fields_to_update_in_bubble[FOLLOWER_STATS_PAID_COLUMN] = api_paid_count
if fields_to_update_in_bubble: # If there's at least one field to update
records_to_update_via_patch.append((bubble_id, fields_to_update_in_bubble))
# --- Perform Bubble Operations ---
num_bulk_uploaded = 0
if stats_for_bulk_upload:
if bulk_upload_to_bubble(stats_for_bulk_upload, BUBBLE_FOLLOWER_STATS_TABLE_NAME):
num_bulk_uploaded = len(stats_for_bulk_upload)
logging.info(f"Successfully bulk-uploaded {num_bulk_uploaded} new follower stat entries to Bubble for org {org_urn}.")
else:
logging.error(f"Failed to bulk-upload {len(stats_for_bulk_upload)} new follower stat entries for org {org_urn}.")
        num_patched_updated = 0
        successful_patches = []  # (bubble_id, fields) pairs whose PATCH actually succeeded
        if records_to_update_via_patch:
            for bubble_id, fields_to_update in records_to_update_via_patch:
                if update_record_in_bubble(BUBBLE_FOLLOWER_STATS_TABLE_NAME, bubble_id, fields_to_update):
                    num_patched_updated += 1
                    successful_patches.append((bubble_id, fields_to_update))
                else:
                    logging.error(f"Failed to update record {bubble_id} via PATCH for follower stats for org {org_urn}.")
            logging.info(f"Attempted to update {len(records_to_update_via_patch)} follower stat entries via PATCH, {num_patched_updated} succeeded for org {org_urn}.")
if not stats_for_bulk_upload and not records_to_update_via_patch:
logging.info(f"Follower Stats sync: Data for org {org_urn} is up-to-date or no changes met update criteria.")
follower_stats_sync_message = "Follower Stats: Data up-to-date or no qualifying changes. "
else:
follower_stats_sync_message = f"Follower Stats: Synced (New: {num_bulk_uploaded}, Updated: {num_patched_updated}). "
# --- Update token_state's follower stats DataFrame ---
current_data_for_state_df = bubble_follower_stats_df_orig.copy()
        if successful_patches:
            # Reflect only the PATCH updates that actually succeeded, keyed by Bubble record id.
            successful_updates_map = dict(successful_patches)
            for index, row in current_data_for_state_df.iterrows():
                bubble_id_from_df = row.get(BUBBLE_UNIQUE_ID_COLUMN_NAME)
                if bubble_id_from_df in successful_updates_map:
                    for col, value in successful_updates_map[bubble_id_from_df].items():
                        current_data_for_state_df.loc[index, col] = value
if stats_for_bulk_upload and num_bulk_uploaded > 0:
            # bulk_upload_to_bubble succeeded for the whole batch above (it is treated as
            # all-or-nothing), so every record in stats_for_bulk_upload was created.
            successfully_created_stats = list(stats_for_bulk_upload)
if successfully_created_stats:
newly_created_df = pd.DataFrame(successfully_created_stats)
if not newly_created_df.empty:
for col in current_data_for_state_df.columns:
if col not in newly_created_df.columns:
newly_created_df[col] = pd.NA # Use pd.NA for missing values
# Align columns before concat to avoid issues with differing column orders or types
aligned_newly_created_df = newly_created_df.reindex(columns=current_data_for_state_df.columns).fillna(pd.NA)
current_data_for_state_df = pd.concat([current_data_for_state_df, aligned_newly_created_df], ignore_index=True)
if not current_data_for_state_df.empty:
            # Deduplicate after combining original, patched, and newly created rows. The
            # natural key differs by type: monthly gains key on
            # (org_urn, 'follower_gains_monthly', date string), demographics on
            # (org_urn, type, category). Newer API data was concatenated last, so
            # drop_duplicates(keep='last') retains the freshest row per key.
monthly_part = current_data_for_state_df[current_data_for_state_df[FOLLOWER_STATS_TYPE_COLUMN] == 'follower_gains_monthly']
if not monthly_part.empty:
# Ensure category is consistently formatted for monthly gains before deduplication
monthly_part_copy = monthly_part.copy() # To avoid SettingWithCopyWarning
monthly_part_copy[FOLLOWER_STATS_CATEGORY_COLUMN] = pd.to_datetime(monthly_part_copy[FOLLOWER_STATS_CATEGORY_COLUMN], errors='coerce').dt.strftime('%Y-%m-%d')
monthly_part = monthly_part_copy.drop_duplicates(
subset=[FOLLOWER_STATS_ORG_URN_COLUMN, FOLLOWER_STATS_TYPE_COLUMN, FOLLOWER_STATS_CATEGORY_COLUMN],
keep='last'
)
demographics_part = current_data_for_state_df[current_data_for_state_df[FOLLOWER_STATS_TYPE_COLUMN] != 'follower_gains_monthly']
if not demographics_part.empty:
demo_subset_cols = [FOLLOWER_STATS_ORG_URN_COLUMN, FOLLOWER_STATS_TYPE_COLUMN, FOLLOWER_STATS_CATEGORY_COLUMN]
if all(col in demographics_part.columns for col in demo_subset_cols):
demographics_part = demographics_part.drop_duplicates(
subset=demo_subset_cols,
keep='last'
)
else:
logging.warning("Follower Stats: Missing columns for demographic deduplication in token_state update. Skipping.")
            if monthly_part.empty and demographics_part.empty:
                token_state["bubble_follower_stats_df"] = pd.DataFrame(columns=bubble_follower_stats_df_orig.columns)
            elif monthly_part.empty:
                token_state["bubble_follower_stats_df"] = demographics_part.reset_index(drop=True)
            elif demographics_part.empty:
                token_state["bubble_follower_stats_df"] = monthly_part.reset_index(drop=True)
            else:
                token_state["bubble_follower_stats_df"] = pd.concat([monthly_part, demographics_part], ignore_index=True)
else:
token_state["bubble_follower_stats_df"] = pd.DataFrame(columns=bubble_follower_stats_df_orig.columns)
except ValueError as ve:
logging.error(f"ValueError during follower stats sync for {org_urn}: {ve}", exc_info=True)
follower_stats_sync_message = f"Follower Stats Error: {html.escape(str(ve))}. "
except Exception as e:
logging.exception(f"Unexpected error in sync_linkedin_follower_stats for {org_urn}.")
follower_stats_sync_message = f"Follower Stats: Unexpected error ({type(e).__name__}). "
finally:
if not attempt_logged and org_urn:
token_state = _log_sync_attempt(org_urn, LOG_SUBJECT_FOLLOWER_STATS, token_state)
return follower_stats_sync_message, token_state
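
# Illustrative sketch (invented for documentation, not called by the sync flow): the
# create-vs-patch decision above reduces to keying each stat on (org_urn, type, category)
# and patching only the count fields that differ from what Bubble already holds. It uses
# the same map and record shapes as sync_linkedin_follower_stats; monthly-gain categories
# must already be normalised to 'YYYY-MM-DD' strings before keying.
def _follower_stat_action_example(existing_stats_map, stat_from_api):
    """Return ('create', stat) | ('patch', bubble_id, changed_fields) | ('noop', bubble_id)."""
    key = (
        str(stat_from_api.get(FOLLOWER_STATS_ORG_URN_COLUMN)),
        str(stat_from_api.get(FOLLOWER_STATS_TYPE_COLUMN)),
        str(stat_from_api.get(FOLLOWER_STATS_CATEGORY_COLUMN)),
    )
    if key not in existing_stats_map:
        return ("create", stat_from_api)  # unseen category/month: bulk-create it
    existing_organic, existing_paid, bubble_id = existing_stats_map[key]
    changed = {}
    if stat_from_api.get(FOLLOWER_STATS_ORGANIC_COLUMN, 0) != existing_organic:
        changed[FOLLOWER_STATS_ORGANIC_COLUMN] = stat_from_api.get(FOLLOWER_STATS_ORGANIC_COLUMN, 0)
    if stat_from_api.get(FOLLOWER_STATS_PAID_COLUMN, 0) != existing_paid:
        changed[FOLLOWER_STATS_PAID_COLUMN] = stat_from_api.get(FOLLOWER_STATS_PAID_COLUMN, 0)
    return ("patch", bubble_id, changed) if changed else ("noop", bubble_id)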
def sync_all_linkedin_data_orchestrator(token_state):
"""Orchestrates the syncing of all LinkedIn data types (Posts, Mentions, Follower Stats)."""
logging.info("Starting sync_all_linkedin_data_orchestrator process.")
if not token_state or not token_state.get("token"):
logging.error("Sync All: Access denied. LinkedIn token not available.")
return "<p style='color:red; text-align:center;'>β Access denied. LinkedIn token not available.</p>", token_state
org_urn = token_state.get('org_urn')
client_id = token_state.get("client_id")
posts_sync_message = ""
mentions_sync_message = ""
follower_stats_sync_message = ""
if not org_urn:
logging.error("Sync All: Org URN missing in token_state.")
return "<p style='color:red;'>β Config error: Org URN missing.</p>", token_state
if not client_id or client_id == "ENV VAR MISSING":
logging.error("Sync All: Client ID missing or not set in token_state.")
return "<p style='color:red;'>β Config error: Client ID missing.</p>", token_state
# --- Sync Posts ---
fetch_count_for_posts_api = token_state.get('fetch_count_for_api', 0)
if fetch_count_for_posts_api == 0:
# This means state_manager determined no post sync is needed based on log
posts_sync_message = "Posts: Sync not currently required by schedule. "
logging.info("Posts sync: Skipped as fetch_count_for_posts_api is 0 (determined by state_manager).")
# Log an "attempt" to sync posts which resulted in a skip due to schedule.
# This keeps the log fresh, indicating a check was made.
token_state = _log_sync_attempt(org_urn, LOG_SUBJECT_POSTS, token_state)
else:
posts_sync_message, token_state = _sync_linkedin_posts_internal(token_state, fetch_count_for_posts_api)
# _sync_linkedin_posts_internal now handles its own logging internally
# --- Sync Mentions ---
# sync_linkedin_mentions will check token_state.get("mentions_should_sync_now")
# and log its attempt internally.
mentions_sync_message, token_state = sync_linkedin_mentions(token_state)
# --- Sync Follower Stats ---
# sync_linkedin_follower_stats will check token_state.get("fs_should_sync_now")
# and log its attempt internally.
follower_stats_sync_message, token_state = sync_linkedin_follower_stats(token_state)
logging.info(f"Sync process complete. Messages: Posts: [{posts_sync_message.strip()}], Mentions: [{mentions_sync_message.strip()}], Follower Stats: [{follower_stats_sync_message.strip()}]")
final_message = f"<p style='color:green; text-align:center;'>β
Sync Attempted. {posts_sync_message} {mentions_sync_message} {follower_stats_sync_message}</p>"
return final_message, token_state
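
if __name__ == "__main__":
    # Minimal smoke-test sketch, not part of the production flow. These token_state keys
    # are the ones this module reads; the values are placeholders, and a real run needs a
    # valid LinkedIn token dict plus reachable Bubble and LinkedIn endpoints. With
    # token=None the orchestrator exits early with the access-denied message.
    logging.basicConfig(level=logging.INFO)
    demo_token_state = {
        "token": None,
        "client_id": "YOUR_CLIENT_ID",            # placeholder
        "org_urn": "urn:li:organization:000000",  # placeholder
        "fetch_count_for_api": 0,                 # 0 => posts sync skipped by schedule
        "mentions_should_sync_now": False,        # normally set by state_manager
        "fs_should_sync_now": False,              # normally set by state_manager
    }
    message, demo_token_state = sync_all_linkedin_data_orchestrator(demo_token_state)
    print(message)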