GuglielmoTor committed on
Commit 6b29e46 · verified · 1 Parent(s): 284b445

Update sync_logic.py

Files changed (1):
  1. sync_logic.py +203 -78
sync_logic.py CHANGED
@@ -9,7 +9,7 @@ import html
 from datetime import timezone # Python's datetime
 
 # Assuming Bubble_API_Calls contains bulk_upload_to_bubble
-from Bubble_API_Calls import bulk_upload_to_bubble, fetch_linkedin_posts_data_from_bubble # Added fetch for log update
+from Bubble_API_Calls import bulk_upload_to_bubble, fetch_linkedin_posts_data_from_bubble, update_record_in_bubble
 # Assuming Linkedin_Data_API_Calls contains all necessary LinkedIn data fetching and processing functions
 from Linkedin_Data_API_Calls import (
     fetch_linkedin_posts_core,
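The new import adds update_record_in_bubble, which the large hunk further down calls as update_record_in_bubble(table_name, bubble_id, fields_to_update) and treats as returning a truthy value on success. Its implementation is not part of this diff; a minimal sketch of such a helper against Bubble's Data API could look like the following (the endpoint pattern and the BUBBLE_APP_NAME / BUBBLE_API_KEY environment variables are assumptions, not code from this repository):

# Hypothetical sketch only: the real helper lives in Bubble_API_Calls and is not shown in this commit.
import os
import logging
import requests

def update_record_in_bubble(table_name, record_id, fields_to_update):
    """Patch selected fields on one Bubble record; return True on success."""
    # Assumed Bubble Data API endpoint layout: /api/1.1/obj/<type>/<unique id>
    base_url = f"https://{os.environ['BUBBLE_APP_NAME']}.bubbleapps.io/api/1.1/obj"
    headers = {"Authorization": f"Bearer {os.environ['BUBBLE_API_KEY']}"}
    try:
        resp = requests.patch(
            f"{base_url}/{table_name}/{record_id}",
            json=fields_to_update, headers=headers, timeout=30
        )
        resp.raise_for_status()
        return True
    except requests.RequestException as exc:
        logging.error(f"PATCH to Bubble failed for {table_name}/{record_id}: {exc}")
        return False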
@@ -268,7 +268,12 @@ def sync_linkedin_mentions(token_state):
 
 
 def sync_linkedin_follower_stats(token_state):
-    """Fetches new LinkedIn follower statistics and uploads them to Bubble, if scheduled."""
+    """
+    Fetches new/updated LinkedIn follower statistics and uploads/updates them in Bubble,
+    if scheduled by state_manager.
+    For both monthly gains and demographics, counts are updated only when they differ from the stored values.
+    Creates new records if the category/month doesn't exist.
+    """
     logging.info("Starting LinkedIn follower stats sync process check.")
 
     if not token_state.get("fs_should_sync_now", False):
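The function is driven entirely by token_state, so the call site is simple; the sketch below is hypothetical and assumes state_manager has already populated the token, org URN, cached follower-stats DataFrame, and the fs_should_sync_now flag:

# Hypothetical call site; the prepared token_state dict comes from the app's state_manager.
from sync_logic import sync_linkedin_follower_stats

def run_follower_stats_sync(token_state):
    # Returns a short status message plus the (possibly updated) state dict.
    message, token_state = sync_linkedin_follower_stats(token_state)
    print(message)  # e.g. "Follower Stats: Synced (New: 3, Updated: 2). "
    return token_state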
@@ -288,6 +293,7 @@ def sync_linkedin_follower_stats(token_state):
     token_dict = token_state.get("token")
     org_urn = token_state.get('org_urn')
     bubble_follower_stats_df_orig = token_state.get("bubble_follower_stats_df", pd.DataFrame()).copy()
+
     follower_stats_sync_message = ""
     attempt_logged = False
 
@@ -298,9 +304,19 @@ def sync_linkedin_follower_stats(token_state):
         attempt_logged = True
         return "Follower Stats: Config error. ", token_state
 
+    # Ensure the BUBBLE_UNIQUE_ID_COLUMN_NAME exists in the DataFrame if it's not empty,
+    # as it's crucial for building the maps for updates.
+    if not bubble_follower_stats_df_orig.empty and BUBBLE_UNIQUE_ID_COLUMN_NAME not in bubble_follower_stats_df_orig.columns:
+        logging.error(f"Follower Stats sync: Critical error - '{BUBBLE_UNIQUE_ID_COLUMN_NAME}' column missing in bubble_follower_stats_df. Cannot proceed with updates.")
+        if org_urn:
+            token_state = _log_sync_attempt(org_urn, LOG_SUBJECT_FOLLOWER_STATS, token_state) # Log the attempt despite error
+        attempt_logged = True
+        return f"Follower Stats: Config error ({BUBBLE_UNIQUE_ID_COLUMN_NAME} missing). ", token_state
+
     logging.info(f"Follower stats sync proceeding for org_urn: {org_urn}")
     try:
         api_follower_stats = get_linkedin_follower_stats(client_id, token_dict, org_urn)
+
         if not api_follower_stats:
             logging.info(f"Follower Stats sync: No stats found via API for org {org_urn}.")
             follower_stats_sync_message = "Follower Stats: None found via API. "
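The hunk that follows keys every stat on (org URN, stat type, category) and reads organic/paid counts from the FOLLOWER_STATS_* column constants used by sync_logic.py, so each entry returned by get_linkedin_follower_stats is assumed to be a flat dict roughly like the example below (the literal field names are placeholders for whatever those constants resolve to):

# Illustrative record only; the real keys come from the FOLLOWER_STATS_* constants.
example_monthly_gain = {
    "organization_urn": "urn:li:organization:12345",  # FOLLOWER_STATS_ORG_URN_COLUMN
    "follower_count_type": "follower_gains_monthly",  # FOLLOWER_STATS_TYPE_COLUMN
    "category_name": "2024-05-01",                    # FOLLOWER_STATS_CATEGORY_COLUMN (month as a date)
    "follower_count_organic": 42,                     # FOLLOWER_STATS_ORGANIC_COLUMN
    "follower_count_paid": 7,                         # FOLLOWER_STATS_PAID_COLUMN
}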
@@ -308,84 +324,192 @@ def sync_linkedin_follower_stats(token_state):
             attempt_logged = True
             return follower_stats_sync_message, token_state
 
-        new_stats_to_upload = []
-        # Logic for comparing API stats with existing Bubble stats (monthly gains and demographics)
-        # Monthly Gains
-        api_monthly_gains = [s for s in api_follower_stats if s.get(FOLLOWER_STATS_TYPE_COLUMN) == 'follower_gains_monthly']
-        existing_monthly_gain_dates = set()
-        if not bubble_follower_stats_df_orig.empty:
-            bubble_monthly_df = bubble_follower_stats_df_orig[bubble_follower_stats_df_orig[FOLLOWER_STATS_TYPE_COLUMN] == 'follower_gains_monthly']
-            if FOLLOWER_STATS_CATEGORY_COLUMN in bubble_monthly_df.columns:
-                # Convert to string for consistent comparison, assuming API data also provides date as string or convertible
-                existing_monthly_gain_dates = set(pd.to_datetime(bubble_monthly_df[FOLLOWER_STATS_CATEGORY_COLUMN], errors='coerce').dt.strftime('%Y-%m-%d').dropna())
-
-        for gain_stat in api_monthly_gains:
-            api_date_str = pd.to_datetime(gain_stat.get(FOLLOWER_STATS_CATEGORY_COLUMN), errors='coerce').strftime('%Y-%m-%d')
-            if api_date_str != 'NaT' and api_date_str not in existing_monthly_gain_dates:
-                new_stats_to_upload.append(gain_stat)
-
-        # Demographics (overwrite logic: if API has it, and it's different or not present in Bubble, upload)
-        api_demographics = [s for s in api_follower_stats if s.get(FOLLOWER_STATS_TYPE_COLUMN) != 'follower_gains_monthly']
-        # Create a map of existing demographics for quick lookup
-        # Key: (org_urn, type, category), Value: (organic_count, paid_count)
-        existing_demographics_map = {}
-        if not bubble_follower_stats_df_orig.empty:
-            bubble_demographics_df = bubble_follower_stats_df_orig[bubble_follower_stats_df_orig[FOLLOWER_STATS_TYPE_COLUMN] != 'follower_gains_monthly']
-            required_cols_demo = [
-                FOLLOWER_STATS_ORG_URN_COLUMN, FOLLOWER_STATS_TYPE_COLUMN,
-                FOLLOWER_STATS_CATEGORY_COLUMN, FOLLOWER_STATS_ORGANIC_COLUMN,
-                FOLLOWER_STATS_PAID_COLUMN
-            ]
-            if not bubble_demographics_df.empty and all(col in bubble_demographics_df.columns for col in required_cols_demo):
-                for _, row in bubble_demographics_df.iterrows():
-                    key = (
-                        str(row[FOLLOWER_STATS_ORG_URN_COLUMN]),
-                        str(row[FOLLOWER_STATS_TYPE_COLUMN]),
-                        str(row[FOLLOWER_STATS_CATEGORY_COLUMN]) # Category can be various things
-                    )
-                    existing_demographics_map[key] = (
-                        row[FOLLOWER_STATS_ORGANIC_COLUMN], row[FOLLOWER_STATS_PAID_COLUMN]
-                    )
-
-        for demo_stat in api_demographics:
-            key = (
-                str(demo_stat.get(FOLLOWER_STATS_ORG_URN_COLUMN)),
-                str(demo_stat.get(FOLLOWER_STATS_TYPE_COLUMN)),
-                str(demo_stat.get(FOLLOWER_STATS_CATEGORY_COLUMN))
-            )
-            api_counts = (
-                demo_stat.get(FOLLOWER_STATS_ORGANIC_COLUMN, 0),
-                demo_stat.get(FOLLOWER_STATS_PAID_COLUMN, 0)
-            )
-            # If key not in existing OR counts are different, then it's new/changed
-            if key not in existing_demographics_map or existing_demographics_map[key] != api_counts:
-                new_stats_to_upload.append(demo_stat)
-
-        if not new_stats_to_upload:
-            logging.info(f"Follower Stats sync: Data for org {org_urn} is up-to-date or no changes found after comparison.")
-            follower_stats_sync_message = "Follower Stats: Data up-to-date or no changes. "
-            token_state = _log_sync_attempt(org_urn, LOG_SUBJECT_FOLLOWER_STATS, token_state)
-            attempt_logged = True
-            return follower_stats_sync_message, token_state
-
-        bulk_upload_to_bubble(new_stats_to_upload, BUBBLE_FOLLOWER_STATS_TABLE_NAME)
-        logging.info(f"Successfully uploaded {len(new_stats_to_upload)} follower stat entries to Bubble for org {org_urn}.")
-
-        # Update token_state's follower stats DataFrame
-        temp_df = pd.concat([bubble_follower_stats_df_orig, pd.DataFrame(new_stats_to_upload)], ignore_index=True)
-        # For monthly gains, keep last entry per org/date (category)
-        monthly_part = temp_df[temp_df[FOLLOWER_STATS_TYPE_COLUMN] == 'follower_gains_monthly'].drop_duplicates(
-            subset=[FOLLOWER_STATS_ORG_URN_COLUMN, FOLLOWER_STATS_CATEGORY_COLUMN],
-            keep='last'
-        )
-        # For demographics, keep last entry per org/type/category
-        demographics_part = temp_df[temp_df[FOLLOWER_STATS_TYPE_COLUMN] != 'follower_gains_monthly'].drop_duplicates(
-            subset=[FOLLOWER_STATS_ORG_URN_COLUMN, FOLLOWER_STATS_TYPE_COLUMN, FOLLOWER_STATS_CATEGORY_COLUMN],
-            keep='last'
-        )
-        token_state["bubble_follower_stats_df"] = pd.concat([monthly_part, demographics_part], ignore_index=True)
-        follower_stats_sync_message = f"Follower Stats: Synced {len(new_stats_to_upload)} entries. "
+        stats_for_bulk_upload = []
+        records_to_update_via_patch = [] # List of tuples: (bubble_id, fields_to_update_dict)
+
+        # --- Prepare maps for existing data in Bubble for efficient lookup ---
+        # Key: (org_urn, type, category_identifier), Value: (organic, paid, bubble_record_id)
+        # For monthly gains, category_identifier is the formatted date string.
+        # For demographics, category_identifier is the FOLLOWER_STATS_CATEGORY_COLUMN value.
+        existing_stats_map = {}
+        stats_required_cols = [
+            FOLLOWER_STATS_ORG_URN_COLUMN, FOLLOWER_STATS_TYPE_COLUMN,
+            FOLLOWER_STATS_CATEGORY_COLUMN, FOLLOWER_STATS_ORGANIC_COLUMN, # Assuming these apply to monthly too
+            FOLLOWER_STATS_PAID_COLUMN, # Assuming these apply to monthly too
+            BUBBLE_UNIQUE_ID_COLUMN_NAME
+        ]
+
+        if not bubble_follower_stats_df_orig.empty and all(col in bubble_follower_stats_df_orig.columns for col in stats_required_cols):
+            for _, row in bubble_follower_stats_df_orig.iterrows():
+                category_identifier = str(row[FOLLOWER_STATS_CATEGORY_COLUMN])
+                # For monthly gains, ensure category (date) is consistently formatted if needed
+                if row[FOLLOWER_STATS_TYPE_COLUMN] == 'follower_gains_monthly':
+                    try:
+                        category_identifier = pd.to_datetime(row[FOLLOWER_STATS_CATEGORY_COLUMN], errors='coerce').strftime('%Y-%m-%d')
+                        if category_identifier == 'NaT': # Handle parsing errors
+                            logging.warning(f"Could not parse date for existing monthly gain: {row[FOLLOWER_STATS_CATEGORY_COLUMN]}. Skipping this entry for map.")
+                            continue
+                    except Exception: # Catch any other parsing issues
+                        logging.warning(f"Error parsing date for existing monthly gain: {row[FOLLOWER_STATS_CATEGORY_COLUMN]}. Skipping this entry for map.")
+                        continue
+
+                key = (
+                    str(row[FOLLOWER_STATS_ORG_URN_COLUMN]),
+                    str(row[FOLLOWER_STATS_TYPE_COLUMN]),
+                    category_identifier
+                )
+                existing_stats_map[key] = (
+                    row[FOLLOWER_STATS_ORGANIC_COLUMN], # Assuming monthly gains have this
+                    row[FOLLOWER_STATS_PAID_COLUMN], # Assuming monthly gains have this
+                    row[BUBBLE_UNIQUE_ID_COLUMN_NAME]
+                )
+        elif not bubble_follower_stats_df_orig.empty:
+            logging.warning(f"Follower Stats: Data in Bubble is missing one or more required columns for update logic: {stats_required_cols}. Will treat all API stats as new if not matched by key elements.")
+
+        # --- Process all stats from API (monthly gains and demographics) ---
+        for stat_from_api in api_follower_stats:
+            api_type = str(stat_from_api.get(FOLLOWER_STATS_TYPE_COLUMN))
+            api_category_raw = stat_from_api.get(FOLLOWER_STATS_CATEGORY_COLUMN)
+
+            api_category_identifier = str(api_category_raw)
+            if api_type == 'follower_gains_monthly':
+                try:
+                    api_category_identifier = pd.to_datetime(api_category_raw, errors='coerce').strftime('%Y-%m-%d')
+                    if api_category_identifier == 'NaT':
+                        logging.warning(f"Could not parse date from API for monthly gain: {api_category_raw}. Skipping this API stat.")
+                        continue
+                except Exception:
+                    logging.warning(f"Error parsing date from API for monthly gain: {api_category_raw}. Skipping this API stat.")
+                    continue
+
+            key = (
+                str(stat_from_api.get(FOLLOWER_STATS_ORG_URN_COLUMN)),
+                api_type,
+                api_category_identifier
+            )
+
+            # Assuming monthly gains also have organic/paid counts.
+            # If they have different count fields, these need to be specified.
+            # For simplicity, using FOLLOWER_STATS_ORGANIC_COLUMN and FOLLOWER_STATS_PAID_COLUMN.
+            # If monthly gains only have a single 'count' field, adjust logic accordingly.
+            api_organic_count = stat_from_api.get(FOLLOWER_STATS_ORGANIC_COLUMN, 0)
+            api_paid_count = stat_from_api.get(FOLLOWER_STATS_PAID_COLUMN, 0)
+
+            if key not in existing_stats_map:
+                # This stat category/month is entirely new, add for bulk creation
+                stats_for_bulk_upload.append(stat_from_api)
+            else:
+                # Stat category/month exists, check if counts need updating
+                existing_organic, existing_paid, bubble_id = existing_stats_map[key]
+                fields_to_update_in_bubble = {}
+
+                if api_organic_count != existing_organic:
+                    fields_to_update_in_bubble[FOLLOWER_STATS_ORGANIC_COLUMN] = api_organic_count
+
+                if api_paid_count != existing_paid:
+                    fields_to_update_in_bubble[FOLLOWER_STATS_PAID_COLUMN] = api_paid_count
+
+                if fields_to_update_in_bubble: # If there's at least one field to update
+                    records_to_update_via_patch.append((bubble_id, fields_to_update_in_bubble))
+
+        # --- Perform Bubble Operations ---
+        num_bulk_uploaded = 0
+        if stats_for_bulk_upload:
+            if bulk_upload_to_bubble(stats_for_bulk_upload, BUBBLE_FOLLOWER_STATS_TABLE_NAME):
+                num_bulk_uploaded = len(stats_for_bulk_upload)
+                logging.info(f"Successfully bulk-uploaded {num_bulk_uploaded} new follower stat entries to Bubble for org {org_urn}.")
+            else:
+                logging.error(f"Failed to bulk-upload {len(stats_for_bulk_upload)} new follower stat entries for org {org_urn}.")
+
+        num_patched_updated = 0
+        if records_to_update_via_patch:
+            for bubble_id, fields_to_update in records_to_update_via_patch:
+                if update_record_in_bubble(BUBBLE_FOLLOWER_STATS_TABLE_NAME, bubble_id, fields_to_update):
+                    num_patched_updated += 1
+                else:
+                    logging.error(f"Failed to update record {bubble_id} via PATCH for follower stats for org {org_urn}.")
+            logging.info(f"Attempted to update {len(records_to_update_via_patch)} follower stat entries via PATCH, {num_patched_updated} succeeded for org {org_urn}.")
+
+        if not stats_for_bulk_upload and not records_to_update_via_patch:
+            logging.info(f"Follower Stats sync: Data for org {org_urn} is up-to-date or no changes met update criteria.")
+            follower_stats_sync_message = "Follower Stats: Data up-to-date or no qualifying changes. "
+        else:
+            follower_stats_sync_message = f"Follower Stats: Synced (New: {num_bulk_uploaded}, Updated: {num_patched_updated}). "
+
+        # --- Update token_state's follower stats DataFrame ---
+        current_data_for_state_df = bubble_follower_stats_df_orig.copy()
+
+        if records_to_update_via_patch and num_patched_updated > 0:
+            # Create a temporary map of successful updates for quick lookup
+            successful_updates_map = {
+                bubble_id: fields for i, (bubble_id, fields) in enumerate(records_to_update_via_patch) if i < num_patched_updated
+            }
+            if successful_updates_map: # only proceed if there were successful updates to reflect
+                for index, row in current_data_for_state_df.iterrows():
+                    bubble_id_from_df = row.get(BUBBLE_UNIQUE_ID_COLUMN_NAME)
+                    if bubble_id_from_df in successful_updates_map:
+                        fields_updated = successful_updates_map[bubble_id_from_df]
+                        for col, value in fields_updated.items():
+                            current_data_for_state_df.loc[index, col] = value
+
+        if stats_for_bulk_upload and num_bulk_uploaded > 0:
+            # Only consider successfully uploaded new records
+            successfully_created_stats = [s for i, s in enumerate(stats_for_bulk_upload) if i < num_bulk_uploaded]
+            if successfully_created_stats:
+                newly_created_df = pd.DataFrame(successfully_created_stats)
+                if not newly_created_df.empty:
+                    for col in current_data_for_state_df.columns:
+                        if col not in newly_created_df.columns:
+                            newly_created_df[col] = pd.NA # Use pd.NA for missing values
+                    # Align columns before concat to avoid issues with differing column orders or types
+                    aligned_newly_created_df = newly_created_df.reindex(columns=current_data_for_state_df.columns).fillna(pd.NA)
+                    current_data_for_state_df = pd.concat([current_data_for_state_df, aligned_newly_created_df], ignore_index=True)
+
+        if not current_data_for_state_df.empty:
+            # Deduplication logic (important after combining original, patched, and new data)
+            # Ensure consistent primary key for deduplication across types
+            # For monthly gains, primary key is (org_urn, type='follower_gains_monthly', category=date_str)
+            # For demographics, primary key is (org_urn, type, category)
+
+            # To handle this, we can sort by a hypothetical 'last_modified_indicator' if we had one,
+            # or rely on 'keep=last' after ensuring data is ordered such that API data (potentially newer) comes later.
+            # The concat order (original, then new) and then drop_duplicates with keep='last' on identifying keys is standard.
+
+            # We need to define unique keys for each type to drop duplicates correctly.
+            # The current deduplication splits by type and then applies different subsets. This should still work.
+
+            monthly_part = current_data_for_state_df[current_data_for_state_df[FOLLOWER_STATS_TYPE_COLUMN] == 'follower_gains_monthly']
+            if not monthly_part.empty:
+                # Ensure category is consistently formatted for monthly gains before deduplication
+                monthly_part_copy = monthly_part.copy() # To avoid SettingWithCopyWarning
+                monthly_part_copy[FOLLOWER_STATS_CATEGORY_COLUMN] = pd.to_datetime(monthly_part_copy[FOLLOWER_STATS_CATEGORY_COLUMN], errors='coerce').dt.strftime('%Y-%m-%d')
+                monthly_part = monthly_part_copy.drop_duplicates(
+                    subset=[FOLLOWER_STATS_ORG_URN_COLUMN, FOLLOWER_STATS_TYPE_COLUMN, FOLLOWER_STATS_CATEGORY_COLUMN],
+                    keep='last'
+                )
+
+            demographics_part = current_data_for_state_df[current_data_for_state_df[FOLLOWER_STATS_TYPE_COLUMN] != 'follower_gains_monthly']
+            if not demographics_part.empty:
+                demo_subset_cols = [FOLLOWER_STATS_ORG_URN_COLUMN, FOLLOWER_STATS_TYPE_COLUMN, FOLLOWER_STATS_CATEGORY_COLUMN]
+                if all(col in demographics_part.columns for col in demo_subset_cols):
+                    demographics_part = demographics_part.drop_duplicates(
+                        subset=demo_subset_cols,
+                        keep='last'
+                    )
+                else:
+                    logging.warning("Follower Stats: Missing columns for demographic deduplication in token_state update. Skipping.")
+
+            if monthly_part.empty and demographics_part.empty:
+                token_state["bubble_follower_stats_df"] = pd.DataFrame(columns=bubble_follower_stats_df_orig.columns)
+            elif monthly_part.empty:
+                token_state["bubble_follower_stats_df"] = demographics_part.reset_index(drop=True) if not demographics_part.empty else pd.DataFrame(columns=bubble_follower_stats_df_orig.columns)
+            elif demographics_part.empty:
+                token_state["bubble_follower_stats_df"] = monthly_part.reset_index(drop=True) if not monthly_part.empty else pd.DataFrame(columns=bubble_follower_stats_df_orig.columns)
+            else:
+                token_state["bubble_follower_stats_df"] = pd.concat([monthly_part, demographics_part], ignore_index=True)
+        else:
+            token_state["bubble_follower_stats_df"] = pd.DataFrame(columns=bubble_follower_stats_df_orig.columns)
 
     except ValueError as ve:
         logging.error(f"ValueError during follower stats sync for {org_urn}: {ve}", exc_info=True)
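Stripped of the DataFrame bookkeeping and the monthly-date normalization, the create-versus-patch decision introduced in the hunk above reduces to the sketch below (a simplified restatement, with placeholder column names standing in for the configured constants):

# Simplified sketch of the routing decision added above.
# existing_stats_map: {(org_urn, type, category): (organic, paid, bubble_id)}
def route_api_stats(api_follower_stats, existing_stats_map,
                    org_col="organization_urn", type_col="follower_count_type",
                    cat_col="category_name", organic_col="follower_count_organic",
                    paid_col="follower_count_paid"):
    to_create, to_patch = [], []
    for stat in api_follower_stats:
        key = (str(stat.get(org_col)), str(stat.get(type_col)), str(stat.get(cat_col)))
        if key not in existing_stats_map:
            to_create.append(stat)  # brand-new category/month -> bulk create
            continue
        organic, paid, bubble_id = existing_stats_map[key]
        changes = {}
        if stat.get(organic_col, 0) != organic:
            changes[organic_col] = stat.get(organic_col, 0)
        if stat.get(paid_col, 0) != paid:
            changes[paid_col] = stat.get(paid_col, 0)
        if changes:
            to_patch.append((bubble_id, changes))  # existing record with changed counts -> PATCH
    return to_create, to_patch

Records routed to to_create go through bulk_upload_to_bubble; to_patch entries are sent one at a time to update_record_in_bubble.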
@@ -395,7 +519,8 @@ def sync_linkedin_follower_stats(token_state):
         follower_stats_sync_message = f"Follower Stats: Unexpected error ({type(e).__name__}). "
     finally:
         if not attempt_logged and org_urn:
-            token_state = _log_sync_attempt(org_urn, LOG_SUBJECT_FOLLOWER_STATS, token_state)
+            token_state = _log_sync_attempt(org_urn, LOG_SUBJECT_FOLLOWER_STATS, token_state)
+
     return follower_stats_sync_message, token_state
 
 
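One detail of the in-memory refresh worth noting: the drop_duplicates(..., keep='last') step keeps a single row per (org, type, category) key after the refreshed data is combined, with the later (fresher) row winning when keys collide. A toy illustration with placeholder column names:

# Toy illustration of the keep='last' rule used for the token_state DataFrame refresh.
import pandas as pd

df = pd.DataFrame([
    {"org": "urn:li:organization:1", "type": "follower_gains_monthly", "category": "2024-05-01", "organic": 10},
    {"org": "urn:li:organization:1", "type": "follower_gains_monthly", "category": "2024-05-01", "organic": 12},  # fresher row
])
deduped = df.drop_duplicates(subset=["org", "type", "category"], keep="last")
print(deduped.to_dict("records"))  # the row with organic=12 survives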