GuglielmoTor commited on
Commit
47cf8c3
·
verified ·
1 Parent(s): c01589c

Delete services/sync_logic.py

Browse files
Files changed (1) hide show
  1. services/sync_logic.py +0 -584
services/sync_logic.py DELETED
@@ -1,584 +0,0 @@
1
- # sync_logic.py
2
- """
3
- Handles the logic for syncing LinkedIn data: posts, mentions, and follower statistics.
4
- Fetches data from LinkedIn APIs, uploads to Bubble, and logs sync attempts.
5
- """
6
- import pandas as pd
7
- import logging
8
- import html
9
- from datetime import timezone # Python's datetime
10
-
11
- # Assuming Bubble_API_Calls contains bulk_upload_to_bubble
12
- from apis.Bubble_API_Calls import bulk_upload_to_bubble, fetch_linkedin_posts_data_from_bubble, update_record_in_bubble
13
- # Assuming Linkedin_Data_API_Calls contains all necessary LinkedIn data fetching and processing functions
14
- from apis.Linkedin_Data_API_Calls import (
15
- fetch_linkedin_posts_core,
16
- fetch_comments,
17
- analyze_sentiment, # For post comments
18
- compile_detailed_posts,
19
- prepare_data_for_bubble, # For posts, stats, comments
20
- fetch_linkedin_mentions_core,
21
- analyze_mentions_sentiment, # For individual mentions
22
- compile_detailed_mentions, # Compiles to user-specified format
23
- prepare_mentions_for_bubble # Prepares user-specified format for Bubble
24
- )
25
- # Assuming linkedin_follower_stats.py contains get_linkedin_follower_stats
26
- from apis.linkedin_follower_stats import get_linkedin_follower_stats
27
-
28
- # Assuming config.py contains all necessary constants
29
- from config import (
30
- LINKEDIN_POST_URN_KEY, BUBBLE_POST_URN_COLUMN_NAME, BUBBLE_POSTS_TABLE_NAME,
31
- BUBBLE_POST_STATS_TABLE_NAME, BUBBLE_POST_COMMENTS_TABLE_NAME,
32
- BUBBLE_MENTIONS_TABLE_NAME, BUBBLE_MENTIONS_ID_COLUMN_NAME, BUBBLE_MENTIONS_DATE_COLUMN_NAME,
33
- DEFAULT_MENTIONS_INITIAL_FETCH_COUNT, DEFAULT_MENTIONS_UPDATE_FETCH_COUNT,
34
- BUBBLE_FOLLOWER_STATS_TABLE_NAME, FOLLOWER_STATS_TYPE_COLUMN, FOLLOWER_STATS_CATEGORY_COLUMN,
35
- FOLLOWER_STATS_ORG_URN_COLUMN, FOLLOWER_STATS_ORGANIC_COLUMN, FOLLOWER_STATS_PAID_COLUMN,
36
- LINKEDIN_CLIENT_ID_ENV_VAR, # Though client_id is usually passed in token_state
37
- # NEW constants for logging
38
- BUBBLE_OPERATIONS_LOG_TABLE_NAME, BUBBLE_OPERATIONS_LOG_DATE_COLUMN,
39
- BUBBLE_OPERATIONS_LOG_SUBJECT_COLUMN, BUBBLE_OPERATIONS_LOG_ORG_URN_COLUMN,
40
- LOG_SUBJECT_POSTS, LOG_SUBJECT_MENTIONS, LOG_SUBJECT_FOLLOWER_STATS,
41
- BUBBLE_UNIQUE_ID_COLUMN_NAME
42
- )
43
-
44
- def _log_sync_attempt(org_urn, subject, token_state):
45
- """
46
- Logs a sync attempt to the Bubble operations log table and updates
47
- the operations log DataFrame in token_state.
48
- """
49
- logging.info(f"Logging sync attempt for subject: {subject}, org_urn: {org_urn}")
50
- if not org_urn:
51
- logging.warning("Cannot log sync attempt: org_urn is missing.")
52
- return token_state
53
- try:
54
- log_entry_data = {
55
- BUBBLE_OPERATIONS_LOG_DATE_COLUMN: pd.Timestamp.now(tz='UTC').isoformat(),
56
- BUBBLE_OPERATIONS_LOG_SUBJECT_COLUMN: subject,
57
- BUBBLE_OPERATIONS_LOG_ORG_URN_COLUMN: org_urn
58
- }
59
-
60
- # Ensure data types are what Bubble expects, e.g., date as string
61
- # bulk_upload_to_bubble should handle dicts with basic types.
62
- upload_payload = [log_entry_data]
63
-
64
- bulk_upload_to_bubble(upload_payload, BUBBLE_OPERATIONS_LOG_TABLE_NAME)
65
- logging.info(f"Successfully logged sync attempt for {subject} to Bubble table '{BUBBLE_OPERATIONS_LOG_TABLE_NAME}'.")
66
-
67
- # Update token_state with the new log entry to keep it fresh
68
- current_log_df = token_state.get("bubble_operations_log_df", pd.DataFrame())
69
- new_log_entry_df = pd.DataFrame(upload_payload) # DataFrame from the same data we uploaded
70
-
71
- # Ensure date column is datetime before concat if it exists and is not empty
72
- if not new_log_entry_df.empty and BUBBLE_OPERATIONS_LOG_DATE_COLUMN in new_log_entry_df.columns:
73
- new_log_entry_df[BUBBLE_OPERATIONS_LOG_DATE_COLUMN] = pd.to_datetime(new_log_entry_df[BUBBLE_OPERATIONS_LOG_DATE_COLUMN], errors='coerce', utc=True)
74
-
75
- if not current_log_df.empty and BUBBLE_OPERATIONS_LOG_DATE_COLUMN in current_log_df.columns:
76
- # Ensure existing log df date column is also datetime
77
- if not pd.api.types.is_datetime64_any_dtype(current_log_df[BUBBLE_OPERATIONS_LOG_DATE_COLUMN]):
78
- current_log_df[BUBBLE_OPERATIONS_LOG_DATE_COLUMN] = pd.to_datetime(current_log_df[BUBBLE_OPERATIONS_LOG_DATE_COLUMN], errors='coerce', utc=True)
79
-
80
- updated_log_df = pd.concat([current_log_df, new_log_entry_df], ignore_index=True)
81
- # To ensure the get_last_sync_attempt_date always gets the absolute latest,
82
- # we can sort and drop duplicates, keeping the last.
83
- # However, simply appending and letting max() find the latest is also fine.
84
- # For robustness, let's sort and keep the latest for each subject/org combo if multiple logs were made rapidly.
85
- if not updated_log_df.empty and all(col in updated_log_df.columns for col in [BUBBLE_OPERATIONS_LOG_DATE_COLUMN, BUBBLE_OPERATIONS_LOG_SUBJECT_COLUMN, BUBBLE_OPERATIONS_LOG_ORG_URN_COLUMN]):
86
- updated_log_df = updated_log_df.sort_values(by=BUBBLE_OPERATIONS_LOG_DATE_COLUMN).drop_duplicates(
87
- subset=[BUBBLE_OPERATIONS_LOG_SUBJECT_COLUMN, BUBBLE_OPERATIONS_LOG_ORG_URN_COLUMN],
88
- keep='last'
89
- )
90
- token_state["bubble_operations_log_df"] = updated_log_df
91
- logging.info(f"Updated 'bubble_operations_log_df' in token_state after logging {subject}.")
92
-
93
- except Exception as e:
94
- logging.error(f"Failed to log sync attempt for {subject} or update token_state: {e}", exc_info=True)
95
- return token_state
96
-
97
-
98
- def _sync_linkedin_posts_internal(token_state, fetch_count_for_posts_api):
99
- """Internal logic for syncing LinkedIn posts."""
100
- # This function is called by orchestrator only if fetch_count_for_posts_api > 0
101
- # So, an attempt to sync posts is indeed happening.
102
-
103
- logging.info(f"Posts sync: Starting fetch for {fetch_count_for_posts_api} posts.")
104
- client_id = token_state.get("client_id")
105
- token_dict = token_state.get("token")
106
- org_urn = token_state.get('org_urn')
107
- bubble_posts_df_orig = token_state.get("bubble_posts_df", pd.DataFrame()).copy()
108
- posts_sync_message = ""
109
- attempt_logged = False # Flag to ensure log happens once
110
-
111
- try:
112
- # Basic checks before API call
113
- if not all([client_id, token_dict, org_urn]):
114
- posts_sync_message = "Posts: Config error (client_id, token, or org_urn missing). "
115
- logging.error(f"Posts sync: Prerequisite missing - client_id: {'OK' if client_id else 'Missing'}, token: {'OK' if token_dict else 'Missing'}, org_urn: {'OK' if org_urn else 'Missing'}")
116
- # Log attempt even if config error, as state_manager decided a sync *should* occur
117
- token_state = _log_sync_attempt(org_urn, LOG_SUBJECT_POSTS, token_state)
118
- attempt_logged = True
119
- return posts_sync_message, token_state
120
-
121
- processed_raw_posts, stats_map, _ = fetch_linkedin_posts_core(client_id, token_dict, org_urn, count=fetch_count_for_posts_api)
122
-
123
- if not processed_raw_posts:
124
- posts_sync_message = "Posts: None found via API. "
125
- logging.info("Posts sync: No raw posts returned from API.")
126
- # Log attempt as API was called
127
- token_state = _log_sync_attempt(org_urn, LOG_SUBJECT_POSTS, token_state)
128
- attempt_logged = True
129
- return posts_sync_message, token_state
130
-
131
- existing_post_urns = set()
132
- if not bubble_posts_df_orig.empty and BUBBLE_POST_URN_COLUMN_NAME in bubble_posts_df_orig.columns:
133
- existing_post_urns = set(bubble_posts_df_orig[BUBBLE_POST_URN_COLUMN_NAME].dropna().astype(str))
134
-
135
- new_raw_posts = [p for p in processed_raw_posts if str(p.get(LINKEDIN_POST_URN_KEY)) not in existing_post_urns]
136
-
137
- if not new_raw_posts:
138
- posts_sync_message = "Posts: All fetched already in Bubble. "
139
- logging.info("Posts sync: All fetched posts were already found in Bubble.")
140
- # Log attempt as API was called and processed
141
- token_state = _log_sync_attempt(org_urn, LOG_SUBJECT_POSTS, token_state)
142
- attempt_logged = True
143
- return posts_sync_message, token_state
144
-
145
- logging.info(f"Posts sync: Processing {len(new_raw_posts)} new raw posts.")
146
- post_urns_to_process = [p[LINKEDIN_POST_URN_KEY] for p in new_raw_posts if p.get(LINKEDIN_POST_URN_KEY)]
147
-
148
- all_comments_data = fetch_comments(client_id, token_dict, post_urns_to_process, stats_map)
149
- sentiments_per_post = analyze_sentiment(all_comments_data)
150
- detailed_new_posts = compile_detailed_posts(new_raw_posts, stats_map, sentiments_per_post)
151
- li_posts, li_post_stats, li_post_comments = prepare_data_for_bubble(detailed_new_posts, all_comments_data)
152
-
153
- if li_posts:
154
- bulk_upload_to_bubble(li_posts, BUBBLE_POSTS_TABLE_NAME)
155
- updated_posts_df = pd.concat([bubble_posts_df_orig, pd.DataFrame(li_posts)], ignore_index=True)
156
- token_state["bubble_posts_df"] = updated_posts_df.drop_duplicates(subset=[BUBBLE_POST_URN_COLUMN_NAME], keep='last')
157
- logging.info(f"Posts sync: Uploaded {len(li_posts)} new posts to Bubble.")
158
-
159
- if li_post_stats:
160
- bulk_upload_to_bubble(li_post_stats, BUBBLE_POST_STATS_TABLE_NAME)
161
- logging.info(f"Posts sync: Uploaded {len(li_post_stats)} post_stats entries.")
162
- if li_post_comments:
163
- bulk_upload_to_bubble(li_post_comments, BUBBLE_POST_COMMENTS_TABLE_NAME)
164
- logging.info(f"Posts sync: Uploaded {len(li_post_comments)} post_comments entries.")
165
- posts_sync_message = f"Posts: Synced {len(li_posts)} new. "
166
- else:
167
- posts_sync_message = "Posts: No new ones to upload after processing. "
168
- logging.info("Posts sync: No new posts were prepared for Bubble upload.")
169
-
170
- except ValueError as ve:
171
- posts_sync_message = f"Posts Error: {html.escape(str(ve))}. "
172
- logging.error(f"Posts sync: ValueError: {ve}", exc_info=True)
173
- except Exception as e:
174
- logging.exception("Posts sync: Unexpected error during processing.")
175
- posts_sync_message = f"Posts: Unexpected error ({type(e).__name__}). "
176
- finally:
177
- # Log the sync attempt if it hasn't been logged already (e.g. due to early exit)
178
- # and if basic conditions (org_urn) for logging are met.
179
- if not attempt_logged and org_urn:
180
- token_state = _log_sync_attempt(org_urn, LOG_SUBJECT_POSTS, token_state)
181
- return posts_sync_message, token_state
182
-
183
-
184
- def sync_linkedin_mentions(token_state):
185
- """Fetches new LinkedIn mentions and uploads them to Bubble, if scheduled by state_manager."""
186
- logging.info("Starting LinkedIn mentions sync process check.")
187
-
188
- if not token_state.get("mentions_should_sync_now", False):
189
- logging.info("Mentions sync: Not scheduled by state_manager based on operations log. Skipping.")
190
- return "Mentions: Sync not currently required by schedule. ", token_state
191
-
192
- logging.info("Mentions sync: Proceeding as scheduled by state_manager.")
193
-
194
- if not token_state or not token_state.get("token"):
195
- logging.error("Mentions sync: Access denied. No LinkedIn token.")
196
- # Still log an attempt if org_urn is available, as a sync was scheduled
197
- org_urn_for_log = token_state.get('org_urn') if token_state else None
198
- if org_urn_for_log:
199
- token_state = _log_sync_attempt(org_urn_for_log, LOG_SUBJECT_MENTIONS, token_state)
200
- return "Mentions: No token. ", token_state
201
-
202
- client_id = token_state.get("client_id")
203
- token_dict = token_state.get("token")
204
- org_urn = token_state.get('org_urn')
205
- bubble_mentions_df_orig = token_state.get("bubble_mentions_df", pd.DataFrame()).copy()
206
- mentions_sync_message = ""
207
- attempt_logged = False
208
-
209
- if not org_urn or not client_id or client_id == "ENV VAR MISSING":
210
- logging.error("Mentions sync: Configuration error (Org URN or Client ID missing).")
211
- if org_urn: # Log if possible
212
- token_state = _log_sync_attempt(org_urn, LOG_SUBJECT_MENTIONS, token_state)
213
- attempt_logged = True
214
- return "Mentions: Config error. ", token_state
215
-
216
- # Determine fetch count: initial if no mentions data, update otherwise
217
- fetch_count_for_mentions_api = DEFAULT_MENTIONS_INITIAL_FETCH_COUNT \
218
- if bubble_mentions_df_orig.empty else DEFAULT_MENTIONS_UPDATE_FETCH_COUNT
219
- logging.info(f"Mentions sync: Fetch count set to {fetch_count_for_mentions_api}.")
220
-
221
- try:
222
- processed_raw_mentions = fetch_linkedin_mentions_core(client_id, token_dict, org_urn, count=fetch_count_for_mentions_api)
223
- if not processed_raw_mentions:
224
- logging.info("Mentions sync: No new mentions found via API.")
225
- mentions_sync_message = "Mentions: None found via API. "
226
- token_state = _log_sync_attempt(org_urn, LOG_SUBJECT_MENTIONS, token_state)
227
- attempt_logged = True
228
- return mentions_sync_message, token_state
229
-
230
- existing_mention_ids = set()
231
- if not bubble_mentions_df_orig.empty and BUBBLE_MENTIONS_ID_COLUMN_NAME in bubble_mentions_df_orig.columns:
232
- existing_mention_ids = set(bubble_mentions_df_orig[BUBBLE_MENTIONS_ID_COLUMN_NAME].dropna().astype(str))
233
-
234
- sentiments_map = analyze_mentions_sentiment(processed_raw_mentions)
235
- all_compiled_mentions = compile_detailed_mentions(processed_raw_mentions, sentiments_map)
236
-
237
- new_compiled_mentions_to_upload = [
238
- m for m in all_compiled_mentions if str(m.get("id")) not in existing_mention_ids
239
- ]
240
-
241
- if not new_compiled_mentions_to_upload:
242
- logging.info("Mentions sync: All fetched mentions are already in Bubble.")
243
- mentions_sync_message = "Mentions: All fetched already in Bubble. "
244
- token_state = _log_sync_attempt(org_urn, LOG_SUBJECT_MENTIONS, token_state)
245
- attempt_logged = True
246
- return mentions_sync_message, token_state
247
-
248
- bubble_ready_mentions = prepare_mentions_for_bubble(new_compiled_mentions_to_upload)
249
- if bubble_ready_mentions:
250
- bulk_upload_to_bubble(bubble_ready_mentions, BUBBLE_MENTIONS_TABLE_NAME)
251
- logging.info(f"Successfully uploaded {len(bubble_ready_mentions)} new mentions to Bubble.")
252
- updated_mentions_df = pd.concat([bubble_mentions_df_orig, pd.DataFrame(bubble_ready_mentions)], ignore_index=True)
253
- token_state["bubble_mentions_df"] = updated_mentions_df.drop_duplicates(subset=[BUBBLE_MENTIONS_ID_COLUMN_NAME], keep='last')
254
- mentions_sync_message = f"Mentions: Synced {len(bubble_ready_mentions)} new. "
255
- else:
256
- logging.info("Mentions sync: No new mentions were prepared for Bubble upload.")
257
- mentions_sync_message = "Mentions: No new ones to upload. "
258
-
259
- except ValueError as ve:
260
- logging.error(f"ValueError during mentions sync: {ve}", exc_info=True)
261
- mentions_sync_message = f"Mentions Error: {html.escape(str(ve))}. "
262
- except Exception as e:
263
- logging.exception("Unexpected error in sync_linkedin_mentions.")
264
- mentions_sync_message = f"Mentions: Unexpected error ({type(e).__name__}). "
265
- finally:
266
- if not attempt_logged and org_urn:
267
- token_state = _log_sync_attempt(org_urn, LOG_SUBJECT_MENTIONS, token_state)
268
- return mentions_sync_message, token_state
269
-
270
-
271
- def _clean_key_component(component_value, is_category_identifier=False):
272
- """
273
- Helper to consistently clean key components.
274
- For non-date category identifiers, converts to lowercase for case-insensitive matching.
275
- """
276
- if pd.isna(component_value) or component_value is None:
277
- return "NONE_VALUE" # Consistent placeholder for None/NaN
278
-
279
- cleaned_value = str(component_value).strip()
280
- if is_category_identifier: # Apply lowercasing only to general category text, not dates or URNs/Types
281
- return cleaned_value.lower()
282
- return cleaned_value
283
-
284
-
285
- def sync_linkedin_follower_stats(token_state):
286
- """
287
- Fetches new/updated LinkedIn follower statistics and uploads/updates them in Bubble,
288
- if scheduled by state_manager. Includes detailed logging for debugging key mismatches.
289
- """
290
- logging.info("DEBUG: Starting LinkedIn follower stats sync process check.")
291
-
292
- if not token_state.get("fs_should_sync_now", False):
293
- logging.info("DEBUG: Follower Stats sync: Not scheduled by state_manager. Skipping.")
294
- return "Follower Stats: Sync not currently required by schedule. ", token_state
295
-
296
- logging.info("DEBUG: Follower Stats sync: Proceeding as scheduled by state_manager.")
297
-
298
- if not token_state or not token_state.get("token"):
299
- logging.error("DEBUG: Follower Stats sync: Access denied. No LinkedIn token.")
300
- org_urn_for_log = token_state.get('org_urn') if token_state else None
301
- if org_urn_for_log:
302
- token_state = _log_sync_attempt(org_urn_for_log, LOG_SUBJECT_FOLLOWER_STATS, token_state)
303
- return "Follower Stats: No token. ", token_state
304
-
305
- client_id = token_state.get("client_id")
306
- token_dict = token_state.get("token")
307
- org_urn = token_state.get('org_urn')
308
- bubble_follower_stats_df_orig = token_state.get("bubble_follower_stats_df", pd.DataFrame()).copy()
309
-
310
- follower_stats_sync_message = ""
311
- attempt_logged = False
312
-
313
- if not org_urn or not client_id or client_id == "ENV VAR MISSING":
314
- logging.error("DEBUG: Follower Stats sync: Configuration error (Org URN or Client ID missing).")
315
- if org_urn:
316
- token_state = _log_sync_attempt(org_urn, LOG_SUBJECT_FOLLOWER_STATS, token_state)
317
- attempt_logged = True
318
- return "Follower Stats: Config error. ", token_state
319
-
320
- if not bubble_follower_stats_df_orig.empty and BUBBLE_UNIQUE_ID_COLUMN_NAME not in bubble_follower_stats_df_orig.columns:
321
- logging.error(f"DEBUG: Follower Stats sync: Critical error - '{BUBBLE_UNIQUE_ID_COLUMN_NAME}' column missing in bubble_follower_stats_df. Cannot proceed with updates.")
322
- if org_urn:
323
- token_state = _log_sync_attempt(org_urn, LOG_SUBJECT_FOLLOWER_STATS, token_state)
324
- attempt_logged = True
325
- return f"Follower Stats: Config error ({BUBBLE_UNIQUE_ID_COLUMN_NAME} missing). ", token_state
326
-
327
- logging.info(f"DEBUG: Follower stats sync proceeding for org_urn: {org_urn}")
328
- try:
329
- api_follower_stats = get_linkedin_follower_stats(client_id, token_dict, org_urn)
330
-
331
- if not api_follower_stats: # This is a list of dicts
332
- logging.info(f"DEBUG: Follower Stats sync: No stats found via API for org {org_urn}. API returned: {api_follower_stats}")
333
- follower_stats_sync_message = "Follower Stats: None found via API. "
334
- token_state = _log_sync_attempt(org_urn, LOG_SUBJECT_FOLLOWER_STATS, token_state)
335
- attempt_logged = True
336
- return follower_stats_sync_message, token_state
337
-
338
- stats_for_bulk_upload = []
339
- records_to_update_via_patch = []
340
-
341
- existing_stats_map = {}
342
- stats_required_cols = [
343
- FOLLOWER_STATS_ORG_URN_COLUMN, FOLLOWER_STATS_TYPE_COLUMN,
344
- FOLLOWER_STATS_CATEGORY_COLUMN, FOLLOWER_STATS_ORGANIC_COLUMN,
345
- FOLLOWER_STATS_PAID_COLUMN, BUBBLE_UNIQUE_ID_COLUMN_NAME
346
- ]
347
-
348
- logging.info("DEBUG: Populating existing_stats_map from Bubble data...")
349
- if not bubble_follower_stats_df_orig.empty and all(col in bubble_follower_stats_df_orig.columns for col in stats_required_cols):
350
- for index, row in bubble_follower_stats_df_orig.iterrows():
351
- org_urn_val = _clean_key_component(row[FOLLOWER_STATS_ORG_URN_COLUMN])
352
- type_val = _clean_key_component(row[FOLLOWER_STATS_TYPE_COLUMN])
353
- category_raw_val = row[FOLLOWER_STATS_CATEGORY_COLUMN]
354
- bubble_id_val = row.get(BUBBLE_UNIQUE_ID_COLUMN_NAME)
355
-
356
- if pd.isna(bubble_id_val):
357
- logging.warning(f"DEBUG: Row index {index} from Bubble data has missing Bubble ID ('{BUBBLE_UNIQUE_ID_COLUMN_NAME}'). Cannot use for updates. Data: {row.to_dict()}")
358
- continue
359
-
360
- category_identifier = ""
361
- if type_val == 'follower_gains_monthly': # Type is already cleaned
362
- parsed_date = pd.to_datetime(category_raw_val, errors='coerce')
363
- if pd.NaT is parsed_date or pd.isna(parsed_date):
364
- logging.warning(f"DEBUG: Could not parse date for existing monthly gain: '{category_raw_val}' from Bubble row index {index}. Skipping for map.")
365
- continue
366
- category_identifier = parsed_date.strftime('%Y-%m-%d') # Date format, not lowercased
367
- else:
368
- # Apply lowercasing for general text categories for case-insensitive matching
369
- category_identifier = _clean_key_component(category_raw_val, is_category_identifier=True)
370
-
371
- key = (org_urn_val, type_val, category_identifier)
372
-
373
- # Ensure counts are numeric when storing in map
374
- existing_organic_count = pd.to_numeric(row[FOLLOWER_STATS_ORGANIC_COLUMN], errors='coerce')
375
- existing_paid_count = pd.to_numeric(row[FOLLOWER_STATS_PAID_COLUMN], errors='coerce')
376
- existing_organic_count = 0 if pd.isna(existing_organic_count) else int(existing_organic_count)
377
- existing_paid_count = 0 if pd.isna(existing_paid_count) else int(existing_paid_count)
378
-
379
- existing_stats_map[key] = (
380
- existing_organic_count,
381
- existing_paid_count,
382
- str(bubble_id_val) # Ensure Bubble ID is string
383
- )
384
- logging.debug(f"DEBUG: Added to existing_stats_map: Key={key}, BubbleID={str(bubble_id_val)}, OrgCounts={existing_organic_count}, PaidCounts={existing_paid_count}")
385
-
386
- elif not bubble_follower_stats_df_orig.empty:
387
- logging.warning(f"DEBUG: Follower Stats: Bubble data is missing one or more required columns for map: {stats_required_cols}.")
388
- else:
389
- logging.info("DEBUG: Follower Stats: Bubble_follower_stats_df_orig is empty. existing_stats_map will be empty.")
390
-
391
- logging.info(f"DEBUG: Processing {len(api_follower_stats)} stats from API...")
392
- for i, stat_from_api in enumerate(api_follower_stats): # api_follower_stats is a list of dicts
393
- api_org_urn = _clean_key_component(stat_from_api.get(FOLLOWER_STATS_ORG_URN_COLUMN))
394
- api_type = _clean_key_component(stat_from_api.get(FOLLOWER_STATS_TYPE_COLUMN))
395
- api_category_raw = stat_from_api.get(FOLLOWER_STATS_CATEGORY_COLUMN)
396
-
397
- api_category_identifier = ""
398
- if api_type == 'follower_gains_monthly': # API type is already cleaned
399
- parsed_date = pd.to_datetime(api_category_raw, errors='coerce')
400
- if pd.NaT is parsed_date or pd.isna(parsed_date):
401
- logging.warning(f"DEBUG: API stat index {i}: Could not parse date for API monthly gain: '{api_category_raw}'. Skipping.")
402
- continue
403
- api_category_identifier = parsed_date.strftime('%Y-%m-%d') # Date format, not lowercased
404
- else:
405
- # Apply lowercasing for general text categories for case-insensitive matching
406
- api_category_identifier = _clean_key_component(api_category_raw, is_category_identifier=True)
407
-
408
- key_from_api = (api_org_urn, api_type, api_category_identifier)
409
- logging.debug(f"DEBUG: API stat index {i}: Generated Key={key_from_api}, RawData={stat_from_api}")
410
-
411
- # Ensure API counts are numeric
412
- api_organic_count = pd.to_numeric(stat_from_api.get(FOLLOWER_STATS_ORGANIC_COLUMN), errors='coerce')
413
- api_paid_count = pd.to_numeric(stat_from_api.get(FOLLOWER_STATS_PAID_COLUMN), errors='coerce')
414
- api_organic_count = 0 if pd.isna(api_organic_count) else int(api_organic_count)
415
- api_paid_count = 0 if pd.isna(api_paid_count) else int(api_paid_count)
416
-
417
-
418
- if key_from_api not in existing_stats_map:
419
- logging.info(f"DEBUG: API stat index {i}: Key={key_from_api} NOT FOUND in existing_stats_map. Adding for BULK UPLOAD.")
420
- stats_for_bulk_upload.append(stat_from_api)
421
- else:
422
- existing_organic, existing_paid, bubble_id = existing_stats_map[key_from_api] # Counts are already int from map
423
- logging.info(f"DEBUG: API stat index {i}: Key={key_from_api} FOUND in existing_stats_map. BubbleID={bubble_id}. ExistingCounts(O/P): {existing_organic}/{existing_paid}. APICounts(O/P): {api_organic_count}/{api_paid_count}.")
424
-
425
- fields_to_update_in_bubble = {}
426
- if api_organic_count > existing_organic:
427
- fields_to_update_in_bubble[FOLLOWER_STATS_ORGANIC_COLUMN] = api_organic_count
428
- logging.debug(f"DEBUG: API stat index {i}: Organic count update: API({api_organic_count}) > Bubble({existing_organic}) for BubbleID {bubble_id}")
429
-
430
- if api_paid_count > existing_paid:
431
- fields_to_update_in_bubble[FOLLOWER_STATS_PAID_COLUMN] = api_paid_count
432
- logging.debug(f"DEBUG: API stat index {i}: Paid count update: API({api_paid_count}) > Bubble({existing_paid}) for BubbleID {bubble_id}")
433
-
434
- if fields_to_update_in_bubble:
435
- records_to_update_via_patch.append((bubble_id, fields_to_update_in_bubble))
436
- logging.info(f"DEBUG: API stat index {i}: Queued for PATCH update. BubbleID={bubble_id}, Updates={fields_to_update_in_bubble}")
437
- else:
438
- logging.info(f"DEBUG: API stat index {i}: Counts are not greater or equal. No update needed for BubbleID={bubble_id}.")
439
-
440
- num_bulk_uploaded = 0
441
- if stats_for_bulk_upload:
442
- logging.info(f"DEBUG: Attempting to bulk upload {len(stats_for_bulk_upload)} new follower stat entries.")
443
- if bulk_upload_to_bubble(stats_for_bulk_upload, BUBBLE_FOLLOWER_STATS_TABLE_NAME):
444
- num_bulk_uploaded = len(stats_for_bulk_upload)
445
- logging.info(f"Successfully bulk-uploaded {num_bulk_uploaded} new follower stat entries to Bubble for org {org_urn}.")
446
- else:
447
- logging.error(f"Failed to bulk-upload {len(stats_for_bulk_upload)} new follower stat entries for org {org_urn}.")
448
-
449
- num_patched_updated = 0
450
- if records_to_update_via_patch:
451
- logging.info(f"DEBUG: Attempting to PATCH update {len(records_to_update_via_patch)} follower stat entries.")
452
- successfully_patched_ids_and_data_temp = [] # To store what was actually successful for token_state update
453
- for bubble_id, fields_to_update in records_to_update_via_patch:
454
- if update_record_in_bubble(BUBBLE_FOLLOWER_STATS_TABLE_NAME, bubble_id, fields_to_update):
455
- num_patched_updated += 1
456
- successfully_patched_ids_and_data_temp.append({'bubble_id': bubble_id, 'fields': fields_to_update})
457
- else:
458
- logging.error(f"Failed to update record {bubble_id} via PATCH for follower stats for org {org_urn}.")
459
- logging.info(f"Attempted to update {len(records_to_update_via_patch)} follower stat entries via PATCH, {num_patched_updated} succeeded for org {org_urn}.")
460
-
461
- if not stats_for_bulk_upload and not records_to_update_via_patch:
462
- logging.info(f"DEBUG: Follower Stats sync: Data for org {org_urn} is up-to-date or no changes met update criteria.")
463
- follower_stats_sync_message = "Follower Stats: Data up-to-date or no qualifying changes. "
464
- else:
465
- follower_stats_sync_message = f"Follower Stats: Synced (New: {num_bulk_uploaded}, Updated: {num_patched_updated}). "
466
-
467
- # --- Update token_state's follower stats DataFrame ---
468
- current_data_for_state_df = bubble_follower_stats_df_orig.copy()
469
-
470
- if num_patched_updated > 0: # Check against actual successful patches
471
- for item in successfully_patched_ids_and_data_temp: # Iterate over successfully patched items
472
- bubble_id = item['bubble_id']
473
- fields_updated = item['fields']
474
- idx = current_data_for_state_df[current_data_for_state_df[BUBBLE_UNIQUE_ID_COLUMN_NAME] == bubble_id].index
475
- if not idx.empty:
476
- for col, value in fields_updated.items():
477
- current_data_for_state_df.loc[idx, col] = value
478
-
479
- if num_bulk_uploaded > 0: # Check against actual successful bulk uploads
480
- successfully_created_stats = stats_for_bulk_upload[:num_bulk_uploaded] # Slice based on success count
481
- if successfully_created_stats:
482
- newly_created_df = pd.DataFrame(successfully_created_stats)
483
- if not newly_created_df.empty:
484
- for col in current_data_for_state_df.columns:
485
- if col not in newly_created_df.columns:
486
- newly_created_df[col] = pd.NA
487
- aligned_newly_created_df = newly_created_df.reindex(columns=current_data_for_state_df.columns).fillna(pd.NA)
488
- current_data_for_state_df = pd.concat([current_data_for_state_df, aligned_newly_created_df], ignore_index=True)
489
-
490
- if not current_data_for_state_df.empty:
491
- monthly_part = current_data_for_state_df[current_data_for_state_df[FOLLOWER_STATS_TYPE_COLUMN] == 'follower_gains_monthly'].copy()
492
- if not monthly_part.empty:
493
- # Ensure FOLLOWER_STATS_CATEGORY_COLUMN is string before strftime, after to_datetime
494
- monthly_part.loc[:, FOLLOWER_STATS_CATEGORY_COLUMN] = pd.to_datetime(monthly_part[FOLLOWER_STATS_CATEGORY_COLUMN], errors='coerce').dt.strftime('%Y-%m-%d')
495
- monthly_part = monthly_part.drop_duplicates(
496
- subset=[FOLLOWER_STATS_ORG_URN_COLUMN, FOLLOWER_STATS_TYPE_COLUMN, FOLLOWER_STATS_CATEGORY_COLUMN],
497
- keep='last'
498
- )
499
-
500
- demographics_part = current_data_for_state_df[current_data_for_state_df[FOLLOWER_STATS_TYPE_COLUMN] != 'follower_gains_monthly'].copy()
501
- if not demographics_part.empty:
502
- # For demographics, category is already cleaned (and lowercased) if it was text
503
- # Ensure all subset columns exist before drop_duplicates
504
- demo_subset_cols = [FOLLOWER_STATS_ORG_URN_COLUMN, FOLLOWER_STATS_TYPE_COLUMN, FOLLOWER_STATS_CATEGORY_COLUMN]
505
- if all(col in demographics_part.columns for col in demo_subset_cols):
506
- # Clean the category column here again to match the key generation for demographics
507
- demographics_part.loc[:, FOLLOWER_STATS_CATEGORY_COLUMN] = demographics_part[FOLLOWER_STATS_CATEGORY_COLUMN].apply(lambda x: _clean_key_component(x, is_category_identifier=True))
508
- demographics_part = demographics_part.drop_duplicates(
509
- subset=demo_subset_cols,
510
- keep='last'
511
- )
512
- else:
513
- logging.warning(f"DEBUG: Demographics part missing one of {demo_subset_cols} for deduplication.")
514
-
515
- if monthly_part.empty and demographics_part.empty:
516
- token_state["bubble_follower_stats_df"] = pd.DataFrame(columns=bubble_follower_stats_df_orig.columns)
517
- elif monthly_part.empty: # only demographics_part has data or is empty
518
- token_state["bubble_follower_stats_df"] = demographics_part.reset_index(drop=True) if not demographics_part.empty else pd.DataFrame(columns=bubble_follower_stats_df_orig.columns)
519
- elif demographics_part.empty: # only monthly_part has data or is empty
520
- token_state["bubble_follower_stats_df"] = monthly_part.reset_index(drop=True) if not monthly_part.empty else pd.DataFrame(columns=bubble_follower_stats_df_orig.columns)
521
- else: # both have data
522
- token_state["bubble_follower_stats_df"] = pd.concat([monthly_part, demographics_part], ignore_index=True)
523
- else: # if current_data_for_state_df ended up empty
524
- token_state["bubble_follower_stats_df"] = pd.DataFrame(columns=bubble_follower_stats_df_orig.columns)
525
-
526
-
527
- except ValueError as ve:
528
- logging.error(f"DEBUG: ValueError during follower stats sync for {org_urn}: {ve}", exc_info=True)
529
- follower_stats_sync_message = f"Follower Stats Error: {html.escape(str(ve))}. "
530
- except Exception as e: # Catch any other unexpected error
531
- logging.exception(f"DEBUG: Unexpected error in sync_linkedin_follower_stats for {org_urn}.") # .exception logs stack trace
532
- follower_stats_sync_message = f"Follower Stats: Unexpected error ({type(e).__name__}). "
533
- finally:
534
- if not attempt_logged and org_urn: # Ensure log attempt happens if not already logged due to early exit
535
- token_state = _log_sync_attempt(org_urn, LOG_SUBJECT_FOLLOWER_STATS, token_state)
536
-
537
- return follower_stats_sync_message, token_state
538
-
539
-
540
- def sync_all_linkedin_data_orchestrator(token_state):
541
- """Orchestrates the syncing of all LinkedIn data types (Posts, Mentions, Follower Stats)."""
542
- logging.info("Starting sync_all_linkedin_data_orchestrator process.")
543
- if not token_state or not token_state.get("token"):
544
- logging.error("Sync All: Access denied. LinkedIn token not available.")
545
- return "<p style='color:red; text-align:center;'>❌ Access denied. LinkedIn token not available.</p>", token_state
546
-
547
- org_urn = token_state.get('org_urn')
548
- client_id = token_state.get("client_id")
549
-
550
- posts_sync_message = ""
551
- mentions_sync_message = ""
552
- follower_stats_sync_message = ""
553
-
554
- if not org_urn:
555
- logging.error("Sync All: Org URN missing in token_state.")
556
- return "<p style='color:red;'>❌ Config error: Org URN missing.</p>", token_state
557
- if not client_id or client_id == "ENV VAR MISSING":
558
- logging.error("Sync All: Client ID missing or not set in token_state.")
559
- return "<p style='color:red;'>❌ Config error: Client ID missing.</p>", token_state
560
-
561
- # --- Sync Posts ---
562
- fetch_count_for_posts_api = token_state.get('fetch_count_for_api', 0)
563
- if fetch_count_for_posts_api == 0:
564
- # This means state_manager determined no post sync is needed based on log
565
- posts_sync_message = "Posts: Sync not currently required by schedule. "
566
- logging.info("Posts sync: Skipped as fetch_count_for_posts_api is 0 (determined by state_manager).")
567
-
568
- else:
569
- posts_sync_message, token_state = _sync_linkedin_posts_internal(token_state, fetch_count_for_posts_api)
570
- # _sync_linkedin_posts_internal now handles its own logging internally
571
-
572
- # --- Sync Mentions ---
573
- # sync_linkedin_mentions will check token_state.get("mentions_should_sync_now")
574
- # and log its attempt internally.
575
- mentions_sync_message, token_state = sync_linkedin_mentions(token_state)
576
-
577
- # --- Sync Follower Stats ---
578
- # sync_linkedin_follower_stats will check token_state.get("fs_should_sync_now")
579
- # and log its attempt internally.
580
- follower_stats_sync_message, token_state = sync_linkedin_follower_stats(token_state)
581
-
582
- logging.info(f"Sync process complete. Messages: Posts: [{posts_sync_message.strip()}], Mentions: [{mentions_sync_message.strip()}], Follower Stats: [{follower_stats_sync_message.strip()}]")
583
- final_message = f"<p style='color:green; text-align:center;'>✅ Sync Attempted. {posts_sync_message} {mentions_sync_message} {follower_stats_sync_message}</p>"
584
- return final_message, token_state