# sync_logic.py
"""
Handles the logic for syncing LinkedIn data: posts, mentions, and follower statistics.
Fetches data from LinkedIn APIs, uploads to Bubble, and logs sync attempts.
"""
import pandas as pd
import logging
import html

# Assuming Bubble_API_Calls contains bulk_upload_to_bubble
from Bubble_API_Calls import bulk_upload_to_bubble, fetch_linkedin_posts_data_from_bubble, update_record_in_bubble
# Assuming Linkedin_Data_API_Calls contains all necessary LinkedIn data fetching and processing functions
from Linkedin_Data_API_Calls import (
    fetch_linkedin_posts_core,
    fetch_comments,
    analyze_sentiment, # For post comments
    compile_detailed_posts,
    prepare_data_for_bubble, # For posts, stats, comments
    fetch_linkedin_mentions_core,
    analyze_mentions_sentiment, # For individual mentions
    compile_detailed_mentions, # Compiles to user-specified format
    prepare_mentions_for_bubble # Prepares user-specified format for Bubble
)
# Assuming linkedin_follower_stats.py contains get_linkedin_follower_stats
from linkedin_follower_stats import get_linkedin_follower_stats

# Assuming config.py contains all necessary constants
from config import (
    LINKEDIN_POST_URN_KEY, BUBBLE_POST_URN_COLUMN_NAME, BUBBLE_POSTS_TABLE_NAME,
    BUBBLE_POST_STATS_TABLE_NAME, BUBBLE_POST_COMMENTS_TABLE_NAME,
    BUBBLE_MENTIONS_TABLE_NAME, BUBBLE_MENTIONS_ID_COLUMN_NAME, BUBBLE_MENTIONS_DATE_COLUMN_NAME,
    DEFAULT_MENTIONS_INITIAL_FETCH_COUNT, DEFAULT_MENTIONS_UPDATE_FETCH_COUNT,
    BUBBLE_FOLLOWER_STATS_TABLE_NAME, FOLLOWER_STATS_TYPE_COLUMN, FOLLOWER_STATS_CATEGORY_COLUMN,
    FOLLOWER_STATS_ORG_URN_COLUMN, FOLLOWER_STATS_ORGANIC_COLUMN, FOLLOWER_STATS_PAID_COLUMN,
    LINKEDIN_CLIENT_ID_ENV_VAR, # Though client_id is usually passed in token_state
    # NEW constants for logging
    BUBBLE_OPERATIONS_LOG_TABLE_NAME, BUBBLE_OPERATIONS_LOG_DATE_COLUMN,
    BUBBLE_OPERATIONS_LOG_SUBJECT_COLUMN, BUBBLE_OPERATIONS_LOG_ORG_URN_COLUMN,
    LOG_SUBJECT_POSTS, LOG_SUBJECT_MENTIONS, LOG_SUBJECT_FOLLOWER_STATS,
    BUBBLE_UNIQUE_ID_COLUMN_NAME 
)

def _log_sync_attempt(org_urn, subject, token_state):
    """
    Logs a sync attempt to the Bubble operations log table and updates
    the operations log DataFrame in token_state.
    """
    logging.info(f"Logging sync attempt for subject: {subject}, org_urn: {org_urn}")
    if not org_urn:
        logging.warning("Cannot log sync attempt: org_urn is missing.")
        return token_state
    try:
        log_entry_data = {
            BUBBLE_OPERATIONS_LOG_DATE_COLUMN: pd.Timestamp.now(tz='UTC').isoformat(),
            BUBBLE_OPERATIONS_LOG_SUBJECT_COLUMN: subject,
            BUBBLE_OPERATIONS_LOG_ORG_URN_COLUMN: org_urn
        }
        
        # Ensure data types are what Bubble expects, e.g., date as string
        # bulk_upload_to_bubble should handle dicts with basic types.
        upload_payload = [log_entry_data]

        bulk_upload_to_bubble(upload_payload, BUBBLE_OPERATIONS_LOG_TABLE_NAME)
        logging.info(f"Successfully logged sync attempt for {subject} to Bubble table '{BUBBLE_OPERATIONS_LOG_TABLE_NAME}'.")

        # Update token_state with the new log entry to keep it fresh
        current_log_df = token_state.get("bubble_operations_log_df", pd.DataFrame())
        new_log_entry_df = pd.DataFrame(upload_payload) # DataFrame from the same data we uploaded

        # Ensure date column is datetime before concat if it exists and is not empty
        if not new_log_entry_df.empty and BUBBLE_OPERATIONS_LOG_DATE_COLUMN in new_log_entry_df.columns:
             new_log_entry_df[BUBBLE_OPERATIONS_LOG_DATE_COLUMN] = pd.to_datetime(new_log_entry_df[BUBBLE_OPERATIONS_LOG_DATE_COLUMN], errors='coerce', utc=True)
        
        if not current_log_df.empty and BUBBLE_OPERATIONS_LOG_DATE_COLUMN in current_log_df.columns:
            # Ensure existing log df date column is also datetime
            if not pd.api.types.is_datetime64_any_dtype(current_log_df[BUBBLE_OPERATIONS_LOG_DATE_COLUMN]):
                current_log_df[BUBBLE_OPERATIONS_LOG_DATE_COLUMN] = pd.to_datetime(current_log_df[BUBBLE_OPERATIONS_LOG_DATE_COLUMN], errors='coerce', utc=True)
        
        updated_log_df = pd.concat([current_log_df, new_log_entry_df], ignore_index=True)
        # Sort by date and keep only the latest entry per (subject, org_urn) pair,
        # so get_last_sync_attempt_date always sees the most recent attempt even if
        # several logs were written in rapid succession.
        if not updated_log_df.empty and all(col in updated_log_df.columns for col in [BUBBLE_OPERATIONS_LOG_DATE_COLUMN, BUBBLE_OPERATIONS_LOG_SUBJECT_COLUMN, BUBBLE_OPERATIONS_LOG_ORG_URN_COLUMN]):
            updated_log_df = updated_log_df.sort_values(by=BUBBLE_OPERATIONS_LOG_DATE_COLUMN).drop_duplicates(
                subset=[BUBBLE_OPERATIONS_LOG_SUBJECT_COLUMN, BUBBLE_OPERATIONS_LOG_ORG_URN_COLUMN],
                keep='last'
            )
        token_state["bubble_operations_log_df"] = updated_log_df
        logging.info(f"Updated 'bubble_operations_log_df' in token_state after logging {subject}.")

    except Exception as e:
        logging.error(f"Failed to log sync attempt for {subject} or update token_state: {e}", exc_info=True)
    return token_state
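
# Illustrative shape of the payload _log_sync_attempt uploads (column names are
# the config constants imported above; the URN below is a hypothetical example):
#   [{BUBBLE_OPERATIONS_LOG_DATE_COLUMN:    "2024-05-01T10:00:00+00:00",
#     BUBBLE_OPERATIONS_LOG_SUBJECT_COLUMN: LOG_SUBJECT_POSTS,
#     BUBBLE_OPERATIONS_LOG_ORG_URN_COLUMN: "urn:li:organization:000000"}]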


def _sync_linkedin_posts_internal(token_state, fetch_count_for_posts_api):
    """Internal logic for syncing LinkedIn posts."""
    # Called by the orchestrator only when fetch_count_for_posts_api > 0,
    # so a posts sync attempt is definitely underway.

    logging.info(f"Posts sync: Starting fetch for {fetch_count_for_posts_api} posts.")
    client_id = token_state.get("client_id")
    token_dict = token_state.get("token")
    org_urn = token_state.get('org_urn')
    bubble_posts_df_orig = token_state.get("bubble_posts_df", pd.DataFrame()).copy()
    posts_sync_message = ""
    attempt_logged = False # Flag to ensure log happens once

    try:
        # Basic checks before API call
        if not all([client_id, token_dict, org_urn]):
            posts_sync_message = "Posts: Config error (client_id, token, or org_urn missing). "
            logging.error(f"Posts sync: Prerequisite missing - client_id: {'OK' if client_id else 'Missing'}, token: {'OK' if token_dict else 'Missing'}, org_urn: {'OK' if org_urn else 'Missing'}")
            # Log attempt even if config error, as state_manager decided a sync *should* occur
            token_state = _log_sync_attempt(org_urn, LOG_SUBJECT_POSTS, token_state)
            attempt_logged = True
            return posts_sync_message, token_state

        processed_raw_posts, stats_map, _ = fetch_linkedin_posts_core(client_id, token_dict, org_urn, count=fetch_count_for_posts_api)

        if not processed_raw_posts:
            posts_sync_message = "Posts: None found via API. "
            logging.info("Posts sync: No raw posts returned from API.")
            # Log attempt as API was called
            token_state = _log_sync_attempt(org_urn, LOG_SUBJECT_POSTS, token_state)
            attempt_logged = True
            return posts_sync_message, token_state

        existing_post_urns = set()
        if not bubble_posts_df_orig.empty and BUBBLE_POST_URN_COLUMN_NAME in bubble_posts_df_orig.columns:
            existing_post_urns = set(bubble_posts_df_orig[BUBBLE_POST_URN_COLUMN_NAME].dropna().astype(str))

        new_raw_posts = [p for p in processed_raw_posts if str(p.get(LINKEDIN_POST_URN_KEY)) not in existing_post_urns]

        if not new_raw_posts:
            posts_sync_message = "Posts: All fetched already in Bubble. "
            logging.info("Posts sync: All fetched posts were already found in Bubble.")
            # Log attempt as API was called and processed
            token_state = _log_sync_attempt(org_urn, LOG_SUBJECT_POSTS, token_state)
            attempt_logged = True
            return posts_sync_message, token_state

        logging.info(f"Posts sync: Processing {len(new_raw_posts)} new raw posts.")
        post_urns_to_process = [p[LINKEDIN_POST_URN_KEY] for p in new_raw_posts if p.get(LINKEDIN_POST_URN_KEY)]

        all_comments_data = fetch_comments(client_id, token_dict, post_urns_to_process, stats_map)
        sentiments_per_post = analyze_sentiment(all_comments_data)
        detailed_new_posts = compile_detailed_posts(new_raw_posts, stats_map, sentiments_per_post)
        li_posts, li_post_stats, li_post_comments = prepare_data_for_bubble(detailed_new_posts, all_comments_data)

        if li_posts:
            bulk_upload_to_bubble(li_posts, BUBBLE_POSTS_TABLE_NAME)
            updated_posts_df = pd.concat([bubble_posts_df_orig, pd.DataFrame(li_posts)], ignore_index=True)
            token_state["bubble_posts_df"] = updated_posts_df.drop_duplicates(subset=[BUBBLE_POST_URN_COLUMN_NAME], keep='last')
            logging.info(f"Posts sync: Uploaded {len(li_posts)} new posts to Bubble.")

            if li_post_stats:
                bulk_upload_to_bubble(li_post_stats, BUBBLE_POST_STATS_TABLE_NAME)
                logging.info(f"Posts sync: Uploaded {len(li_post_stats)} post_stats entries.")
            if li_post_comments:
                bulk_upload_to_bubble(li_post_comments, BUBBLE_POST_COMMENTS_TABLE_NAME)
                logging.info(f"Posts sync: Uploaded {len(li_post_comments)} post_comments entries.")
            posts_sync_message = f"Posts: Synced {len(li_posts)} new. "
        else:
            posts_sync_message = "Posts: No new ones to upload after processing. "
            logging.info("Posts sync: No new posts were prepared for Bubble upload.")

    except ValueError as ve:
        posts_sync_message = f"Posts Error: {html.escape(str(ve))}. "
        logging.error(f"Posts sync: ValueError: {ve}", exc_info=True)
    except Exception as e:
        logging.exception("Posts sync: Unexpected error during processing.")
        posts_sync_message = f"Posts: Unexpected error ({type(e).__name__}). "
    finally:
        # Log the sync attempt if it hasn't been logged already (e.g. due to early exit)
        # and if basic conditions (org_urn) for logging are met.
        if not attempt_logged and org_urn:
            token_state = _log_sync_attempt(org_urn, LOG_SUBJECT_POSTS, token_state)
    return posts_sync_message, token_state


def sync_linkedin_mentions(token_state):
    """Fetches new LinkedIn mentions and uploads them to Bubble, if scheduled by state_manager."""
    logging.info("Starting LinkedIn mentions sync process check.")
    
    if not token_state.get("mentions_should_sync_now", False):
        logging.info("Mentions sync: Not scheduled by state_manager based on operations log. Skipping.")
        return "Mentions: Sync not currently required by schedule. ", token_state

    logging.info("Mentions sync: Proceeding as scheduled by state_manager.")
    
    if not token_state or not token_state.get("token"):
        logging.error("Mentions sync: Access denied. No LinkedIn token.")
        # Still log an attempt if org_urn is available, as a sync was scheduled
        org_urn_for_log = token_state.get('org_urn') if token_state else None
        if org_urn_for_log:
             token_state = _log_sync_attempt(org_urn_for_log, LOG_SUBJECT_MENTIONS, token_state)
        return "Mentions: No token. ", token_state

    client_id = token_state.get("client_id")
    token_dict = token_state.get("token")
    org_urn = token_state.get('org_urn')
    bubble_mentions_df_orig = token_state.get("bubble_mentions_df", pd.DataFrame()).copy()
    mentions_sync_message = ""
    attempt_logged = False

    if not org_urn or not client_id or client_id == "ENV VAR MISSING":
        logging.error("Mentions sync: Configuration error (Org URN or Client ID missing).")
        if org_urn: # Log if possible
            token_state = _log_sync_attempt(org_urn, LOG_SUBJECT_MENTIONS, token_state)
            attempt_logged = True
        return "Mentions: Config error. ", token_state

    # Determine fetch count: initial if no mentions data, update otherwise
    fetch_count_for_mentions_api = DEFAULT_MENTIONS_INITIAL_FETCH_COUNT \
        if bubble_mentions_df_orig.empty else DEFAULT_MENTIONS_UPDATE_FETCH_COUNT
    logging.info(f"Mentions sync: Fetch count set to {fetch_count_for_mentions_api}.")

    try:
        processed_raw_mentions = fetch_linkedin_mentions_core(client_id, token_dict, org_urn, count=fetch_count_for_mentions_api)
        if not processed_raw_mentions:
            logging.info("Mentions sync: No new mentions found via API.")
            mentions_sync_message = "Mentions: None found via API. "
            token_state = _log_sync_attempt(org_urn, LOG_SUBJECT_MENTIONS, token_state)
            attempt_logged = True
            return mentions_sync_message, token_state

        existing_mention_ids = set()
        if not bubble_mentions_df_orig.empty and BUBBLE_MENTIONS_ID_COLUMN_NAME in bubble_mentions_df_orig.columns:
            existing_mention_ids = set(bubble_mentions_df_orig[BUBBLE_MENTIONS_ID_COLUMN_NAME].dropna().astype(str))

        sentiments_map = analyze_mentions_sentiment(processed_raw_mentions)
        all_compiled_mentions = compile_detailed_mentions(processed_raw_mentions, sentiments_map)

        new_compiled_mentions_to_upload = [
            m for m in all_compiled_mentions if str(m.get("id")) not in existing_mention_ids
        ]

        if not new_compiled_mentions_to_upload:
            logging.info("Mentions sync: All fetched mentions are already in Bubble.")
            mentions_sync_message = "Mentions: All fetched already in Bubble. "
            token_state = _log_sync_attempt(org_urn, LOG_SUBJECT_MENTIONS, token_state)
            attempt_logged = True
            return mentions_sync_message, token_state

        bubble_ready_mentions = prepare_mentions_for_bubble(new_compiled_mentions_to_upload)
        if bubble_ready_mentions:
            bulk_upload_to_bubble(bubble_ready_mentions, BUBBLE_MENTIONS_TABLE_NAME)
            logging.info(f"Successfully uploaded {len(bubble_ready_mentions)} new mentions to Bubble.")
            updated_mentions_df = pd.concat([bubble_mentions_df_orig, pd.DataFrame(bubble_ready_mentions)], ignore_index=True)
            token_state["bubble_mentions_df"] = updated_mentions_df.drop_duplicates(subset=[BUBBLE_MENTIONS_ID_COLUMN_NAME], keep='last')
            mentions_sync_message = f"Mentions: Synced {len(bubble_ready_mentions)} new. "
        else:
            logging.info("Mentions sync: No new mentions were prepared for Bubble upload.")
            mentions_sync_message = "Mentions: No new ones to upload. "
            
    except ValueError as ve:
        logging.error(f"ValueError during mentions sync: {ve}", exc_info=True)
        mentions_sync_message = f"Mentions Error: {html.escape(str(ve))}. "
    except Exception as e:
        logging.exception("Unexpected error in sync_linkedin_mentions.")
        mentions_sync_message = f"Mentions: Unexpected error ({type(e).__name__}). "
    finally:
        if not attempt_logged and org_urn:
            token_state = _log_sync_attempt(org_urn, LOG_SUBJECT_MENTIONS, token_state)
    return mentions_sync_message, token_state


def _clean_key_component(component_value, is_category_identifier=False):
    """
    Helper to consistently clean key components.
    For non-date category identifiers, converts to lowercase for case-insensitive matching.
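
    Illustrative doctest (hypothetical inputs):
        >>> _clean_key_component("  Engineering  ", is_category_identifier=True)
        'engineering'
        >>> _clean_key_component(None)
        'NONE_VALUE'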
    """
    if pd.isna(component_value) or component_value is None:
        return "NONE_VALUE" # Consistent placeholder for None/NaN
    
    cleaned_value = str(component_value).strip()
    if is_category_identifier: # Apply lowercasing only to general category text, not dates or URNs/Types
        return cleaned_value.lower()
    return cleaned_value


def sync_linkedin_follower_stats(token_state):
    """
    Fetches new/updated LinkedIn follower statistics and uploads/updates them in Bubble,
    if scheduled by state_manager. Includes detailed logging for debugging key mismatches.
    """
    logging.info("DEBUG: Starting LinkedIn follower stats sync process check.")

    if not token_state.get("fs_should_sync_now", False):
        logging.info("DEBUG: Follower Stats sync: Not scheduled by state_manager. Skipping.")
        return "Follower Stats: Sync not currently required by schedule. ", token_state
        
    logging.info("DEBUG: Follower Stats sync: Proceeding as scheduled by state_manager.")

    if not token_state or not token_state.get("token"):
        logging.error("DEBUG: Follower Stats sync: Access denied. No LinkedIn token.")
        org_urn_for_log = token_state.get('org_urn') if token_state else None
        if org_urn_for_log:
            token_state = _log_sync_attempt(org_urn_for_log, LOG_SUBJECT_FOLLOWER_STATS, token_state)
        return "Follower Stats: No token. ", token_state

    client_id = token_state.get("client_id")
    token_dict = token_state.get("token")
    org_urn = token_state.get('org_urn')
    bubble_follower_stats_df_orig = token_state.get("bubble_follower_stats_df", pd.DataFrame()).copy()
    
    follower_stats_sync_message = ""
    attempt_logged = False

    if not org_urn or not client_id or client_id == "ENV VAR MISSING":
        logging.error("DEBUG: Follower Stats sync: Configuration error (Org URN or Client ID missing).")
        if org_urn:
            token_state = _log_sync_attempt(org_urn, LOG_SUBJECT_FOLLOWER_STATS, token_state)
            attempt_logged = True
        return "Follower Stats: Config error. ", token_state
    
    if not bubble_follower_stats_df_orig.empty and BUBBLE_UNIQUE_ID_COLUMN_NAME not in bubble_follower_stats_df_orig.columns:
        logging.error(f"DEBUG: Follower Stats sync: Critical error - '{BUBBLE_UNIQUE_ID_COLUMN_NAME}' column missing in bubble_follower_stats_df. Cannot proceed with updates.")
        if org_urn:
             token_state = _log_sync_attempt(org_urn, LOG_SUBJECT_FOLLOWER_STATS, token_state)
             attempt_logged = True
        return f"Follower Stats: Config error ({BUBBLE_UNIQUE_ID_COLUMN_NAME} missing). ", token_state

    logging.info(f"DEBUG: Follower stats sync proceeding for org_urn: {org_urn}")
    try:
        api_follower_stats = get_linkedin_follower_stats(client_id, token_dict, org_urn)
        
        if not api_follower_stats: # This is a list of dicts
            logging.info(f"DEBUG: Follower Stats sync: No stats found via API for org {org_urn}. API returned: {api_follower_stats}")
            follower_stats_sync_message = "Follower Stats: None found via API. "
            token_state = _log_sync_attempt(org_urn, LOG_SUBJECT_FOLLOWER_STATS, token_state)
            attempt_logged = True
            return follower_stats_sync_message, token_state

        stats_for_bulk_upload = []
        records_to_update_via_patch = []

        existing_stats_map = {}
        stats_required_cols = [
            FOLLOWER_STATS_ORG_URN_COLUMN, FOLLOWER_STATS_TYPE_COLUMN,
            FOLLOWER_STATS_CATEGORY_COLUMN, FOLLOWER_STATS_ORGANIC_COLUMN,
            FOLLOWER_STATS_PAID_COLUMN, BUBBLE_UNIQUE_ID_COLUMN_NAME
        ]

        logging.info("DEBUG: Populating existing_stats_map from Bubble data...")
        if not bubble_follower_stats_df_orig.empty and all(col in bubble_follower_stats_df_orig.columns for col in stats_required_cols):
            for index, row in bubble_follower_stats_df_orig.iterrows():
                org_urn_val = _clean_key_component(row[FOLLOWER_STATS_ORG_URN_COLUMN])
                type_val = _clean_key_component(row[FOLLOWER_STATS_TYPE_COLUMN])
                category_raw_val = row[FOLLOWER_STATS_CATEGORY_COLUMN]
                bubble_id_val = row.get(BUBBLE_UNIQUE_ID_COLUMN_NAME)

                if pd.isna(bubble_id_val):
                    logging.warning(f"DEBUG: Row index {index} from Bubble data has missing Bubble ID ('{BUBBLE_UNIQUE_ID_COLUMN_NAME}'). Cannot use for updates. Data: {row.to_dict()}")
                    continue

                category_identifier = ""
                if type_val == 'follower_gains_monthly': # Type is already cleaned
                    parsed_date = pd.to_datetime(category_raw_val, errors='coerce')
                    if pd.isna(parsed_date):  # pd.isna also covers pd.NaT
                        logging.warning(f"DEBUG: Could not parse date for existing monthly gain: '{category_raw_val}' from Bubble row index {index}. Skipping for map.")
                        continue
                    category_identifier = parsed_date.strftime('%Y-%m-%d') # Date format, not lowercased
                else:
                    # Apply lowercasing for general text categories for case-insensitive matching
                    category_identifier = _clean_key_component(category_raw_val, is_category_identifier=True) 
                
                key = (org_urn_val, type_val, category_identifier)
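                # e.g. ("urn:li:organization:000000", "follower_gains_monthly", "2024-04-01")
                # or, for a demographic type, a lowercased text category such as
                # ("urn:li:organization:000000", "<demographic_type>", "software development")
                # (all values here are hypothetical illustrations)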
                
                # Ensure counts are numeric when storing in map
                existing_organic_count = pd.to_numeric(row[FOLLOWER_STATS_ORGANIC_COLUMN], errors='coerce')
                existing_paid_count = pd.to_numeric(row[FOLLOWER_STATS_PAID_COLUMN], errors='coerce')
                existing_organic_count = 0 if pd.isna(existing_organic_count) else int(existing_organic_count)
                existing_paid_count = 0 if pd.isna(existing_paid_count) else int(existing_paid_count)

                existing_stats_map[key] = (
                    existing_organic_count,
                    existing_paid_count,
                    str(bubble_id_val) # Ensure Bubble ID is string
                )
                logging.debug(f"DEBUG: Added to existing_stats_map: Key={key}, BubbleID={str(bubble_id_val)}, OrgCounts={existing_organic_count}, PaidCounts={existing_paid_count}")

        elif not bubble_follower_stats_df_orig.empty:
            logging.warning(f"DEBUG: Follower Stats: Bubble data is missing one or more required columns for map: {stats_required_cols}.")
        else:
            logging.info("DEBUG: Follower Stats: Bubble_follower_stats_df_orig is empty. existing_stats_map will be empty.")

        logging.info(f"DEBUG: Processing {len(api_follower_stats)} stats from API...")
        for i, stat_from_api in enumerate(api_follower_stats): # api_follower_stats is a list of dicts
            api_org_urn = _clean_key_component(stat_from_api.get(FOLLOWER_STATS_ORG_URN_COLUMN))
            api_type = _clean_key_component(stat_from_api.get(FOLLOWER_STATS_TYPE_COLUMN))
            api_category_raw = stat_from_api.get(FOLLOWER_STATS_CATEGORY_COLUMN)
            
            api_category_identifier = ""
            if api_type == 'follower_gains_monthly': # API type is already cleaned
                parsed_date = pd.to_datetime(api_category_raw, errors='coerce')
                if pd.isna(parsed_date):  # pd.isna also covers pd.NaT
                    logging.warning(f"DEBUG: API stat index {i}: Could not parse date for API monthly gain: '{api_category_raw}'. Skipping.")
                    continue
                api_category_identifier = parsed_date.strftime('%Y-%m-%d') # Date format, not lowercased
            else:
                # Apply lowercasing for general text categories for case-insensitive matching
                api_category_identifier = _clean_key_component(api_category_raw, is_category_identifier=True)

            key_from_api = (api_org_urn, api_type, api_category_identifier)
            logging.debug(f"DEBUG: API stat index {i}: Generated Key={key_from_api}, RawData={stat_from_api}")
            
            # Ensure API counts are numeric
            api_organic_count = pd.to_numeric(stat_from_api.get(FOLLOWER_STATS_ORGANIC_COLUMN), errors='coerce')
            api_paid_count = pd.to_numeric(stat_from_api.get(FOLLOWER_STATS_PAID_COLUMN), errors='coerce')
            api_organic_count = 0 if pd.isna(api_organic_count) else int(api_organic_count)
            api_paid_count = 0 if pd.isna(api_paid_count) else int(api_paid_count)


            if key_from_api not in existing_stats_map:
                logging.info(f"DEBUG: API stat index {i}: Key={key_from_api} NOT FOUND in existing_stats_map. Adding for BULK UPLOAD.")
                stats_for_bulk_upload.append(stat_from_api)
            else:
                existing_organic, existing_paid, bubble_id = existing_stats_map[key_from_api] # Counts are already int from map
                logging.info(f"DEBUG: API stat index {i}: Key={key_from_api} FOUND in existing_stats_map. BubbleID={bubble_id}. ExistingCounts(O/P): {existing_organic}/{existing_paid}. APICounts(O/P): {api_organic_count}/{api_paid_count}.")
                
                fields_to_update_in_bubble = {}
                if api_organic_count > existing_organic:
                    fields_to_update_in_bubble[FOLLOWER_STATS_ORGANIC_COLUMN] = api_organic_count
                    logging.debug(f"DEBUG: API stat index {i}: Organic count update: API({api_organic_count}) > Bubble({existing_organic}) for BubbleID {bubble_id}")
                
                if api_paid_count > existing_paid:
                    fields_to_update_in_bubble[FOLLOWER_STATS_PAID_COLUMN] = api_paid_count
                    logging.debug(f"DEBUG: API stat index {i}: Paid count update: API({api_paid_count}) > Bubble({existing_paid}) for BubbleID {bubble_id}")
                
                if fields_to_update_in_bubble:
                    records_to_update_via_patch.append((bubble_id, fields_to_update_in_bubble))
                    logging.info(f"DEBUG: API stat index {i}: Queued for PATCH update. BubbleID={bubble_id}, Updates={fields_to_update_in_bubble}")
                else:
                    logging.info(f"DEBUG: API stat index {i}: Counts are not greater or equal. No update needed for BubbleID={bubble_id}.")
        
        num_bulk_uploaded = 0
        if stats_for_bulk_upload:
            logging.info(f"DEBUG: Attempting to bulk upload {len(stats_for_bulk_upload)} new follower stat entries.")
            if bulk_upload_to_bubble(stats_for_bulk_upload, BUBBLE_FOLLOWER_STATS_TABLE_NAME):
                num_bulk_uploaded = len(stats_for_bulk_upload)
                logging.info(f"Successfully bulk-uploaded {num_bulk_uploaded} new follower stat entries to Bubble for org {org_urn}.")
            else:
                logging.error(f"Failed to bulk-upload {len(stats_for_bulk_upload)} new follower stat entries for org {org_urn}.")
        
        num_patched_updated = 0
        if records_to_update_via_patch:
            logging.info(f"DEBUG: Attempting to PATCH update {len(records_to_update_via_patch)} follower stat entries.")
            successfully_patched_ids_and_data_temp = [] # To store what was actually successful for token_state update
            for bubble_id, fields_to_update in records_to_update_via_patch:
                if update_record_in_bubble(BUBBLE_FOLLOWER_STATS_TABLE_NAME, bubble_id, fields_to_update):
                    num_patched_updated += 1
                    successfully_patched_ids_and_data_temp.append({'bubble_id': bubble_id, 'fields': fields_to_update})
                else:
                    logging.error(f"Failed to update record {bubble_id} via PATCH for follower stats for org {org_urn}.")
            logging.info(f"Attempted to update {len(records_to_update_via_patch)} follower stat entries via PATCH, {num_patched_updated} succeeded for org {org_urn}.")

        if not stats_for_bulk_upload and not records_to_update_via_patch:
            logging.info(f"DEBUG: Follower Stats sync: Data for org {org_urn} is up-to-date or no changes met update criteria.")
            follower_stats_sync_message = "Follower Stats: Data up-to-date or no qualifying changes. "
        else:
            follower_stats_sync_message = f"Follower Stats: Synced (New: {num_bulk_uploaded}, Updated: {num_patched_updated}). "

        # --- Update token_state's follower stats DataFrame ---
        current_data_for_state_df = bubble_follower_stats_df_orig.copy()

        if num_patched_updated > 0: # Check against actual successful patches
            for item in successfully_patched_ids_and_data_temp: # Iterate over successfully patched items
                bubble_id = item['bubble_id']
                fields_updated = item['fields']
                idx = current_data_for_state_df[current_data_for_state_df[BUBBLE_UNIQUE_ID_COLUMN_NAME] == bubble_id].index
                if not idx.empty:
                    for col, value in fields_updated.items():
                        current_data_for_state_df.loc[idx, col] = value
        
        if num_bulk_uploaded > 0: # Check against actual successful bulk uploads
            successfully_created_stats = stats_for_bulk_upload[:num_bulk_uploaded] # Slice based on success count
            if successfully_created_stats:
                newly_created_df = pd.DataFrame(successfully_created_stats)
                if not newly_created_df.empty:
                    for col in current_data_for_state_df.columns:
                        if col not in newly_created_df.columns:
                            newly_created_df[col] = pd.NA 
                    aligned_newly_created_df = newly_created_df.reindex(columns=current_data_for_state_df.columns).fillna(pd.NA)
                    current_data_for_state_df = pd.concat([current_data_for_state_df, aligned_newly_created_df], ignore_index=True)

        if not current_data_for_state_df.empty:
            monthly_part = current_data_for_state_df[current_data_for_state_df[FOLLOWER_STATS_TYPE_COLUMN] == 'follower_gains_monthly'].copy()
            if not monthly_part.empty:
                # Normalize the category column to 'YYYY-MM-DD' date strings (to_datetime, then strftime)
                monthly_part.loc[:, FOLLOWER_STATS_CATEGORY_COLUMN] = pd.to_datetime(monthly_part[FOLLOWER_STATS_CATEGORY_COLUMN], errors='coerce').dt.strftime('%Y-%m-%d')
                monthly_part = monthly_part.drop_duplicates(
                    subset=[FOLLOWER_STATS_ORG_URN_COLUMN, FOLLOWER_STATS_TYPE_COLUMN, FOLLOWER_STATS_CATEGORY_COLUMN],
                    keep='last'
                )

            demographics_part = current_data_for_state_df[current_data_for_state_df[FOLLOWER_STATS_TYPE_COLUMN] != 'follower_gains_monthly'].copy()
            if not demographics_part.empty:
                # For demographics, category is already cleaned (and lowercased) if it was text
                # Ensure all subset columns exist before drop_duplicates
                demo_subset_cols = [FOLLOWER_STATS_ORG_URN_COLUMN, FOLLOWER_STATS_TYPE_COLUMN, FOLLOWER_STATS_CATEGORY_COLUMN]
                if all(col in demographics_part.columns for col in demo_subset_cols):
                     # Clean the category column here again to match the key generation for demographics
                    demographics_part.loc[:, FOLLOWER_STATS_CATEGORY_COLUMN] = demographics_part[FOLLOWER_STATS_CATEGORY_COLUMN].apply(lambda x: _clean_key_component(x, is_category_identifier=True))
                    demographics_part = demographics_part.drop_duplicates(
                        subset=demo_subset_cols,
                        keep='last'
                    )
                else:
                    logging.warning(f"DEBUG: Demographics part missing one of {demo_subset_cols} for deduplication.")
            
            if monthly_part.empty and demographics_part.empty:
                token_state["bubble_follower_stats_df"] = pd.DataFrame(columns=bubble_follower_stats_df_orig.columns)
            elif monthly_part.empty: # only demographics_part has data
                token_state["bubble_follower_stats_df"] = demographics_part.reset_index(drop=True)
            elif demographics_part.empty: # only monthly_part has data
                token_state["bubble_follower_stats_df"] = monthly_part.reset_index(drop=True)
            else: # both have data
                token_state["bubble_follower_stats_df"] = pd.concat([monthly_part, demographics_part], ignore_index=True)
        else: # if current_data_for_state_df ended up empty
            token_state["bubble_follower_stats_df"] = pd.DataFrame(columns=bubble_follower_stats_df_orig.columns)


    except ValueError as ve:
        logging.error(f"DEBUG: ValueError during follower stats sync for {org_urn}: {ve}", exc_info=True)
        follower_stats_sync_message = f"Follower Stats Error: {html.escape(str(ve))}. "
    except Exception as e: # Catch any other unexpected error
        logging.exception(f"DEBUG: Unexpected error in sync_linkedin_follower_stats for {org_urn}.") # .exception logs stack trace
        follower_stats_sync_message = f"Follower Stats: Unexpected error ({type(e).__name__}). "
    finally:
        if not attempt_logged and org_urn: # Ensure log attempt happens if not already logged due to early exit
            token_state = _log_sync_attempt(org_urn, LOG_SUBJECT_FOLLOWER_STATS, token_state)
            
    return follower_stats_sync_message, token_state


def sync_all_linkedin_data_orchestrator(token_state):
    """Orchestrates the syncing of all LinkedIn data types (Posts, Mentions, Follower Stats)."""
    logging.info("Starting sync_all_linkedin_data_orchestrator process.")
    if not token_state or not token_state.get("token"):
        logging.error("Sync All: Access denied. LinkedIn token not available.")
        return "<p style='color:red; text-align:center;'>❌ Access denied. LinkedIn token not available.</p>", token_state

    org_urn = token_state.get('org_urn')
    client_id = token_state.get("client_id")

    posts_sync_message = ""
    mentions_sync_message = ""
    follower_stats_sync_message = ""

    if not org_urn:
        logging.error("Sync All: Org URN missing in token_state.")
        return "<p style='color:red;'>❌ Config error: Org URN missing.</p>", token_state
    if not client_id or client_id == "ENV VAR MISSING":
        logging.error("Sync All: Client ID missing or not set in token_state.")
        return "<p style='color:red;'>❌ Config error: Client ID missing.</p>", token_state

    # --- Sync Posts ---
    fetch_count_for_posts_api = token_state.get('fetch_count_for_api', 0)
    if fetch_count_for_posts_api == 0:
        # This means state_manager determined no post sync is needed based on log
        posts_sync_message = "Posts: Sync not currently required by schedule. "
        logging.info("Posts sync: Skipped as fetch_count_for_posts_api is 0 (determined by state_manager).")

    else:
        posts_sync_message, token_state = _sync_linkedin_posts_internal(token_state, fetch_count_for_posts_api)
        # _sync_linkedin_posts_internal now handles its own logging internally

    # --- Sync Mentions ---
    # sync_linkedin_mentions will check token_state.get("mentions_should_sync_now")
    # and log its attempt internally.
    mentions_sync_message, token_state = sync_linkedin_mentions(token_state)

    # --- Sync Follower Stats ---
    # sync_linkedin_follower_stats will check token_state.get("fs_should_sync_now")
    # and log its attempt internally.
    follower_stats_sync_message, token_state = sync_linkedin_follower_stats(token_state)

    logging.info(f"Sync process complete. Messages: Posts: [{posts_sync_message.strip()}], Mentions: [{mentions_sync_message.strip()}], Follower Stats: [{follower_stats_sync_message.strip()}]")
    final_message = f"<p style='color:green; text-align:center;'>βœ… Sync Attempted. {posts_sync_message} {mentions_sync_message} {follower_stats_sync_message}</p>"
    return final_message, token_state
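

# Illustrative usage sketch (an assumption, not part of the production flow):
# state_manager normally populates token_state before the orchestrator runs.
# All values below are hypothetical placeholders; with the sync flags off and
# fetch_count_for_api at 0, this exercises only the scheduling checks and
# makes no API calls.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    example_token_state = {
        "token": {"access_token": "PLACEHOLDER"},   # hypothetical token dict
        "client_id": "PLACEHOLDER_CLIENT_ID",       # hypothetical client id
        "org_urn": "urn:li:organization:000000",    # hypothetical org URN
        "fetch_count_for_api": 0,                   # 0 = posts sync not scheduled
        "mentions_should_sync_now": False,
        "fs_should_sync_now": False,
    }
    status_html, example_token_state = sync_all_linkedin_data_orchestrator(example_token_state)
    print(status_html)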