GuglielmoTor commited on
Commit
17bd31e
·
verified ·
1 Parent(s): 5c5c0fc

Update sync_logic.py

Browse files
Files changed (1) hide show
  1. sync_logic.py +198 -103
sync_logic.py CHANGED
@@ -1,15 +1,15 @@
1
  # sync_logic.py
2
  """
3
  Handles the logic for syncing LinkedIn data: posts, mentions, and follower statistics.
4
- Fetches data from LinkedIn APIs and uploads to Bubble.
5
  """
6
  import pandas as pd
7
  import logging
8
  import html
 
9
 
10
  # Assuming Bubble_API_Calls contains bulk_upload_to_bubble
11
- from Bubble_API_Calls import bulk_upload_to_bubble
12
-
13
  # Assuming Linkedin_Data_API_Calls contains all necessary LinkedIn data fetching and processing functions
14
  from Linkedin_Data_API_Calls import (
15
  fetch_linkedin_posts_core,
@@ -33,25 +33,98 @@ from config import (
33
  DEFAULT_MENTIONS_INITIAL_FETCH_COUNT, DEFAULT_MENTIONS_UPDATE_FETCH_COUNT,
34
  BUBBLE_FOLLOWER_STATS_TABLE_NAME, FOLLOWER_STATS_TYPE_COLUMN, FOLLOWER_STATS_CATEGORY_COLUMN,
35
  FOLLOWER_STATS_ORG_URN_COLUMN, FOLLOWER_STATS_ORGANIC_COLUMN, FOLLOWER_STATS_PAID_COLUMN,
36
- LINKEDIN_CLIENT_ID_ENV_VAR
 
 
 
 
37
  )
38
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
39
 
40
  def _sync_linkedin_posts_internal(token_state, fetch_count_for_posts_api):
41
  """Internal logic for syncing LinkedIn posts."""
 
 
 
42
  logging.info(f"Posts sync: Starting fetch for {fetch_count_for_posts_api} posts.")
43
  client_id = token_state.get("client_id")
44
  token_dict = token_state.get("token")
45
  org_urn = token_state.get('org_urn')
46
  bubble_posts_df_orig = token_state.get("bubble_posts_df", pd.DataFrame()).copy()
47
  posts_sync_message = ""
 
48
 
49
  try:
 
 
 
 
 
 
 
 
 
50
  processed_raw_posts, stats_map, _ = fetch_linkedin_posts_core(client_id, token_dict, org_urn, count=fetch_count_for_posts_api)
51
 
52
  if not processed_raw_posts:
53
  posts_sync_message = "Posts: None found via API. "
54
  logging.info("Posts sync: No raw posts returned from API.")
 
 
 
55
  return posts_sync_message, token_state
56
 
57
  existing_post_urns = set()
@@ -63,15 +136,17 @@ def _sync_linkedin_posts_internal(token_state, fetch_count_for_posts_api):
63
  if not new_raw_posts:
64
  posts_sync_message = "Posts: All fetched already in Bubble. "
65
  logging.info("Posts sync: All fetched posts were already found in Bubble.")
 
 
 
66
  return posts_sync_message, token_state
67
 
68
  logging.info(f"Posts sync: Processing {len(new_raw_posts)} new raw posts.")
69
  post_urns_to_process = [p[LINKEDIN_POST_URN_KEY] for p in new_raw_posts if p.get(LINKEDIN_POST_URN_KEY)]
70
 
71
  all_comments_data = fetch_comments(client_id, token_dict, post_urns_to_process, stats_map)
72
- sentiments_per_post = analyze_sentiment(all_comments_data) # Assumes analysis of comments for posts
73
  detailed_new_posts = compile_detailed_posts(new_raw_posts, stats_map, sentiments_per_post)
74
-
75
  li_posts, li_post_stats, li_post_comments = prepare_data_for_bubble(detailed_new_posts, all_comments_data)
76
 
77
  if li_posts:
@@ -97,59 +172,59 @@ def _sync_linkedin_posts_internal(token_state, fetch_count_for_posts_api):
97
  except Exception as e:
98
  logging.exception("Posts sync: Unexpected error during processing.")
99
  posts_sync_message = f"Posts: Unexpected error ({type(e).__name__}). "
 
 
 
 
 
100
  return posts_sync_message, token_state
101
 
102
 
103
  def sync_linkedin_mentions(token_state):
104
- """Fetches new LinkedIn mentions and uploads them to Bubble."""
105
- logging.info("Starting LinkedIn mentions sync process.")
 
 
 
 
 
 
 
106
  if not token_state or not token_state.get("token"):
107
  logging.error("Mentions sync: Access denied. No LinkedIn token.")
 
 
 
 
108
  return "Mentions: No token. ", token_state
109
 
110
  client_id = token_state.get("client_id")
111
  token_dict = token_state.get("token")
112
  org_urn = token_state.get('org_urn')
113
- # Work with a copy, original df in token_state will be updated at the end
114
  bubble_mentions_df_orig = token_state.get("bubble_mentions_df", pd.DataFrame()).copy()
 
 
115
 
116
  if not org_urn or not client_id or client_id == "ENV VAR MISSING":
117
  logging.error("Mentions sync: Configuration error (Org URN or Client ID missing).")
 
 
 
118
  return "Mentions: Config error. ", token_state
119
 
120
- fetch_count_for_mentions_api = 0
121
- mentions_sync_is_needed_now = False
122
- if bubble_mentions_df_orig.empty:
123
- mentions_sync_is_needed_now = True
124
- fetch_count_for_mentions_api = DEFAULT_MENTIONS_INITIAL_FETCH_COUNT
125
- logging.info("Mentions sync needed: Bubble DF empty. Fetching initial count.")
126
- else:
127
- if BUBBLE_MENTIONS_DATE_COLUMN_NAME not in bubble_mentions_df_orig.columns or \
128
- bubble_mentions_df_orig[BUBBLE_MENTIONS_DATE_COLUMN_NAME].isnull().all():
129
- mentions_sync_is_needed_now = True
130
- fetch_count_for_mentions_api = DEFAULT_MENTIONS_INITIAL_FETCH_COUNT
131
- logging.info(f"Mentions sync needed: Date column '{BUBBLE_MENTIONS_DATE_COLUMN_NAME}' missing or all null. Fetching initial count.")
132
- else:
133
- # Use a copy for date checks to avoid SettingWithCopyWarning if any modification were made
134
- mentions_df_check = bubble_mentions_df_orig.copy()
135
- mentions_df_check[BUBBLE_MENTIONS_DATE_COLUMN_NAME] = pd.to_datetime(mentions_df_check[BUBBLE_MENTIONS_DATE_COLUMN_NAME], errors='coerce', utc=True)
136
- last_mention_date_utc = mentions_df_check[BUBBLE_MENTIONS_DATE_COLUMN_NAME].dropna().max()
137
- if pd.isna(last_mention_date_utc) or \
138
- (pd.Timestamp('now', tz='UTC').normalize() - last_mention_date_utc.normalize()).days >= 7:
139
- mentions_sync_is_needed_now = True
140
- fetch_count_for_mentions_api = DEFAULT_MENTIONS_UPDATE_FETCH_COUNT
141
- logging.info(f"Mentions sync needed: Last mention date {last_mention_date_utc} is old or invalid. Fetching update count.")
142
-
143
- if not mentions_sync_is_needed_now:
144
- logging.info("Mentions data is fresh based on current check. No API fetch needed for mentions.")
145
- return "Mentions: Up-to-date. ", token_state
146
-
147
- logging.info(f"Mentions sync proceeding. Fetch count: {fetch_count_for_mentions_api}")
148
  try:
149
  processed_raw_mentions = fetch_linkedin_mentions_core(client_id, token_dict, org_urn, count=fetch_count_for_mentions_api)
150
  if not processed_raw_mentions:
151
  logging.info("Mentions sync: No new mentions found via API.")
152
- return "Mentions: None found via API. ", token_state
 
 
 
153
 
154
  existing_mention_ids = set()
155
  if not bubble_mentions_df_orig.empty and BUBBLE_MENTIONS_ID_COLUMN_NAME in bubble_mentions_df_orig.columns:
@@ -164,7 +239,10 @@ def sync_linkedin_mentions(token_state):
164
 
165
  if not new_compiled_mentions_to_upload:
166
  logging.info("Mentions sync: All fetched mentions are already in Bubble.")
167
- return "Mentions: All fetched already in Bubble. ", token_state
 
 
 
168
 
169
  bubble_ready_mentions = prepare_mentions_for_bubble(new_compiled_mentions_to_upload)
170
  if bubble_ready_mentions:
@@ -172,108 +250,103 @@ def sync_linkedin_mentions(token_state):
172
  logging.info(f"Successfully uploaded {len(bubble_ready_mentions)} new mentions to Bubble.")
173
  updated_mentions_df = pd.concat([bubble_mentions_df_orig, pd.DataFrame(bubble_ready_mentions)], ignore_index=True)
174
  token_state["bubble_mentions_df"] = updated_mentions_df.drop_duplicates(subset=[BUBBLE_MENTIONS_ID_COLUMN_NAME], keep='last')
175
- return f"Mentions: Synced {len(bubble_ready_mentions)} new. ", token_state
176
  else:
177
  logging.info("Mentions sync: No new mentions were prepared for Bubble upload.")
178
- return "Mentions: No new ones to upload. ", token_state
 
179
  except ValueError as ve:
180
  logging.error(f"ValueError during mentions sync: {ve}", exc_info=True)
181
- return f"Mentions Error: {html.escape(str(ve))}. ", token_state
182
  except Exception as e:
183
  logging.exception("Unexpected error in sync_linkedin_mentions.")
184
- return f"Mentions: Unexpected error ({type(e).__name__}). ", token_state
 
 
 
 
185
 
186
 
187
  def sync_linkedin_follower_stats(token_state):
188
- """Fetches new LinkedIn follower statistics and uploads them to Bubble."""
189
- logging.info("Starting LinkedIn follower stats sync process.")
 
 
 
 
 
 
 
190
  if not token_state or not token_state.get("token"):
191
  logging.error("Follower Stats sync: Access denied. No LinkedIn token.")
 
 
 
192
  return "Follower Stats: No token. ", token_state
193
 
194
  client_id = token_state.get("client_id")
195
  token_dict = token_state.get("token")
196
  org_urn = token_state.get('org_urn')
197
  bubble_follower_stats_df_orig = token_state.get("bubble_follower_stats_df", pd.DataFrame()).copy()
 
 
198
 
199
  if not org_urn or not client_id or client_id == "ENV VAR MISSING":
200
  logging.error("Follower Stats sync: Configuration error (Org URN or Client ID missing).")
 
 
 
201
  return "Follower Stats: Config error. ", token_state
202
-
203
- follower_stats_sync_is_needed_now = False
204
- if bubble_follower_stats_df_orig.empty:
205
- follower_stats_sync_is_needed_now = True
206
- logging.info("Follower stats sync needed: Bubble DF is empty.")
207
- else:
208
- monthly_gains_df_check = bubble_follower_stats_df_orig[bubble_follower_stats_df_orig[FOLLOWER_STATS_TYPE_COLUMN] == 'follower_gains_monthly'].copy()
209
- if monthly_gains_df_check.empty or FOLLOWER_STATS_CATEGORY_COLUMN not in monthly_gains_df_check.columns:
210
- follower_stats_sync_is_needed_now = True
211
- logging.info("Follower stats sync needed: Monthly gains data missing or date column absent.")
212
- else:
213
- monthly_gains_df_check.loc[:, FOLLOWER_STATS_CATEGORY_COLUMN] = pd.to_datetime(monthly_gains_df_check[FOLLOWER_STATS_CATEGORY_COLUMN], errors='coerce').dt.normalize()
214
- last_gain_date = monthly_gains_df_check[FOLLOWER_STATS_CATEGORY_COLUMN].dropna().max()
215
-
216
- if pd.isna(last_gain_date):
217
- follower_stats_sync_is_needed_now = True
218
- logging.info("Follower stats sync needed: No valid dates in monthly gains after conversion for check.")
219
- else:
220
- if last_gain_date.tzinfo is None or last_gain_date.tzinfo.utcoffset(last_gain_date) is None:
221
- last_gain_date = last_gain_date.tz_localize('UTC')
222
- else:
223
- last_gain_date = last_gain_date.tz_convert('UTC')
224
-
225
- start_of_current_month = pd.Timestamp('now', tz='UTC').normalize().replace(day=1)
226
- if last_gain_date < start_of_current_month:
227
- follower_stats_sync_is_needed_now = True
228
- logging.info(f"Follower stats sync needed: Last gain date {last_gain_date} is old or invalid.")
229
-
230
- if bubble_follower_stats_df_orig[bubble_follower_stats_df_orig[FOLLOWER_STATS_TYPE_COLUMN] != 'follower_gains_monthly'].empty:
231
- follower_stats_sync_is_needed_now = True
232
- logging.info("Follower stats sync needed: Demographic data (non-monthly) is missing.")
233
-
234
- if not follower_stats_sync_is_needed_now:
235
- logging.info("Follower stats data is fresh based on current check. No API fetch needed.")
236
- return "Follower Stats: Data up-to-date. ", token_state
237
-
238
  logging.info(f"Follower stats sync proceeding for org_urn: {org_urn}")
239
  try:
240
  api_follower_stats = get_linkedin_follower_stats(client_id, token_dict, org_urn)
241
  if not api_follower_stats:
242
  logging.info(f"Follower Stats sync: No stats found via API for org {org_urn}.")
243
- return "Follower Stats: None found via API. ", token_state
 
 
 
244
 
245
  new_stats_to_upload = []
 
 
246
  api_monthly_gains = [s for s in api_follower_stats if s.get(FOLLOWER_STATS_TYPE_COLUMN) == 'follower_gains_monthly']
247
  existing_monthly_gain_dates = set()
248
  if not bubble_follower_stats_df_orig.empty:
249
  bubble_monthly_df = bubble_follower_stats_df_orig[bubble_follower_stats_df_orig[FOLLOWER_STATS_TYPE_COLUMN] == 'follower_gains_monthly']
250
  if FOLLOWER_STATS_CATEGORY_COLUMN in bubble_monthly_df.columns:
251
- existing_monthly_gain_dates = set(bubble_monthly_df[FOLLOWER_STATS_CATEGORY_COLUMN].astype(str).unique())
 
252
 
253
  for gain_stat in api_monthly_gains:
254
- if str(gain_stat.get(FOLLOWER_STATS_CATEGORY_COLUMN)) not in existing_monthly_gain_dates:
 
255
  new_stats_to_upload.append(gain_stat)
256
-
 
257
  api_demographics = [s for s in api_follower_stats if s.get(FOLLOWER_STATS_TYPE_COLUMN) != 'follower_gains_monthly']
 
 
258
  existing_demographics_map = {}
259
  if not bubble_follower_stats_df_orig.empty:
260
  bubble_demographics_df = bubble_follower_stats_df_orig[bubble_follower_stats_df_orig[FOLLOWER_STATS_TYPE_COLUMN] != 'follower_gains_monthly']
261
- if not bubble_demographics_df.empty and \
262
- all(col in bubble_demographics_df.columns for col in [
263
- FOLLOWER_STATS_ORG_URN_COLUMN, FOLLOWER_STATS_TYPE_COLUMN,
264
- FOLLOWER_STATS_CATEGORY_COLUMN, FOLLOWER_STATS_ORGANIC_COLUMN,
265
- FOLLOWER_STATS_PAID_COLUMN
266
- ]):
267
  for _, row in bubble_demographics_df.iterrows():
268
  key = (
269
  str(row[FOLLOWER_STATS_ORG_URN_COLUMN]),
270
  str(row[FOLLOWER_STATS_TYPE_COLUMN]),
271
- str(row[FOLLOWER_STATS_CATEGORY_COLUMN])
272
  )
273
  existing_demographics_map[key] = (
274
- row[FOLLOWER_STATS_ORGANIC_COLUMN],
275
- row[FOLLOWER_STATS_PAID_COLUMN]
276
  )
 
277
  for demo_stat in api_demographics:
278
  key = (
279
  str(demo_stat.get(FOLLOWER_STATS_ORG_URN_COLUMN)),
@@ -284,33 +357,46 @@ def sync_linkedin_follower_stats(token_state):
284
  demo_stat.get(FOLLOWER_STATS_ORGANIC_COLUMN, 0),
285
  demo_stat.get(FOLLOWER_STATS_PAID_COLUMN, 0)
286
  )
 
287
  if key not in existing_demographics_map or existing_demographics_map[key] != api_counts:
288
  new_stats_to_upload.append(demo_stat)
289
 
 
290
  if not new_stats_to_upload:
291
- logging.info(f"Follower Stats sync: Data for org {org_urn} is up-to-date or no changes found.")
292
- return "Follower Stats: Data up-to-date or no changes. ", token_state
 
 
 
293
 
294
  bulk_upload_to_bubble(new_stats_to_upload, BUBBLE_FOLLOWER_STATS_TABLE_NAME)
295
  logging.info(f"Successfully uploaded {len(new_stats_to_upload)} follower stat entries to Bubble for org {org_urn}.")
296
 
 
297
  temp_df = pd.concat([bubble_follower_stats_df_orig, pd.DataFrame(new_stats_to_upload)], ignore_index=True)
 
298
  monthly_part = temp_df[temp_df[FOLLOWER_STATS_TYPE_COLUMN] == 'follower_gains_monthly'].drop_duplicates(
299
  subset=[FOLLOWER_STATS_ORG_URN_COLUMN, FOLLOWER_STATS_CATEGORY_COLUMN],
300
  keep='last'
301
  )
 
302
  demographics_part = temp_df[temp_df[FOLLOWER_STATS_TYPE_COLUMN] != 'follower_gains_monthly'].drop_duplicates(
303
  subset=[FOLLOWER_STATS_ORG_URN_COLUMN, FOLLOWER_STATS_TYPE_COLUMN, FOLLOWER_STATS_CATEGORY_COLUMN],
304
  keep='last'
305
  )
306
  token_state["bubble_follower_stats_df"] = pd.concat([monthly_part, demographics_part], ignore_index=True)
307
- return f"Follower Stats: Synced {len(new_stats_to_upload)} entries. ", token_state
 
308
  except ValueError as ve:
309
  logging.error(f"ValueError during follower stats sync for {org_urn}: {ve}", exc_info=True)
310
- return f"Follower Stats Error: {html.escape(str(ve))}. ", token_state
311
  except Exception as e:
312
  logging.exception(f"Unexpected error in sync_linkedin_follower_stats for {org_urn}.")
313
- return f"Follower Stats: Unexpected error ({type(e).__name__}). ", token_state
 
 
 
 
314
 
315
 
316
  def sync_all_linkedin_data_orchestrator(token_state):
@@ -321,7 +407,7 @@ def sync_all_linkedin_data_orchestrator(token_state):
321
  return "<p style='color:red; text-align:center;'>❌ Access denied. LinkedIn token not available.</p>", token_state
322
 
323
  org_urn = token_state.get('org_urn')
324
- client_id = token_state.get("client_id") # Client ID should be in token_state from process_and_store_bubble_token
325
 
326
  posts_sync_message = ""
327
  mentions_sync_message = ""
@@ -330,22 +416,31 @@ def sync_all_linkedin_data_orchestrator(token_state):
330
  if not org_urn:
331
  logging.error("Sync All: Org URN missing in token_state.")
332
  return "<p style='color:red;'>❌ Config error: Org URN missing.</p>", token_state
333
- if not client_id or client_id == "ENV VAR MISSING": # Check client_id from token_state
334
  logging.error("Sync All: Client ID missing or not set in token_state.")
335
  return "<p style='color:red;'>❌ Config error: Client ID missing.</p>", token_state
336
 
337
  # --- Sync Posts ---
338
  fetch_count_for_posts_api = token_state.get('fetch_count_for_api', 0)
339
  if fetch_count_for_posts_api == 0:
340
- posts_sync_message = "Posts: Already up-to-date. "
341
- logging.info("Posts sync: Skipped as fetch_count_for_posts_api is 0.")
 
 
 
 
342
  else:
343
  posts_sync_message, token_state = _sync_linkedin_posts_internal(token_state, fetch_count_for_posts_api)
 
344
 
345
  # --- Sync Mentions ---
 
 
346
  mentions_sync_message, token_state = sync_linkedin_mentions(token_state)
347
 
348
  # --- Sync Follower Stats ---
 
 
349
  follower_stats_sync_message, token_state = sync_linkedin_follower_stats(token_state)
350
 
351
  logging.info(f"Sync process complete. Messages: Posts: [{posts_sync_message.strip()}], Mentions: [{mentions_sync_message.strip()}], Follower Stats: [{follower_stats_sync_message.strip()}]")
 
1
  # sync_logic.py
2
  """
3
  Handles the logic for syncing LinkedIn data: posts, mentions, and follower statistics.
4
+ Fetches data from LinkedIn APIs, uploads to Bubble, and logs sync attempts.
5
  """
6
  import pandas as pd
7
  import logging
8
  import html
9
+ from datetime import timezone # Python's datetime
10
 
11
  # Assuming Bubble_API_Calls contains bulk_upload_to_bubble
12
+ from Bubble_API_Calls import bulk_upload_to_bubble, fetch_linkedin_posts_data_from_bubble # Added fetch for log update
 
13
  # Assuming Linkedin_Data_API_Calls contains all necessary LinkedIn data fetching and processing functions
14
  from Linkedin_Data_API_Calls import (
15
  fetch_linkedin_posts_core,
 
33
  DEFAULT_MENTIONS_INITIAL_FETCH_COUNT, DEFAULT_MENTIONS_UPDATE_FETCH_COUNT,
34
  BUBBLE_FOLLOWER_STATS_TABLE_NAME, FOLLOWER_STATS_TYPE_COLUMN, FOLLOWER_STATS_CATEGORY_COLUMN,
35
  FOLLOWER_STATS_ORG_URN_COLUMN, FOLLOWER_STATS_ORGANIC_COLUMN, FOLLOWER_STATS_PAID_COLUMN,
36
+ LINKEDIN_CLIENT_ID_ENV_VAR, # Though client_id is usually passed in token_state
37
+ # NEW constants for logging
38
+ BUBBLE_OPERATIONS_LOG_TABLE_NAME, BUBBLE_OPERATIONS_LOG_DATE_COLUMN,
39
+ BUBBLE_OPERATIONS_LOG_SUBJECT_COLUMN, BUBBLE_OPERATIONS_LOG_ORG_URN_COLUMN,
40
+ LOG_SUBJECT_POSTS, LOG_SUBJECT_MENTIONS, LOG_SUBJECT_FOLLOWER_STATS
41
  )
42
 
43
+ def _log_sync_attempt(org_urn, subject, token_state):
44
+ """
45
+ Logs a sync attempt to the Bubble operations log table and updates
46
+ the operations log DataFrame in token_state.
47
+ """
48
+ logging.info(f"Logging sync attempt for subject: {subject}, org_urn: {org_urn}")
49
+ if not org_urn:
50
+ logging.warning("Cannot log sync attempt: org_urn is missing.")
51
+ return token_state
52
+ try:
53
+ log_entry_data = {
54
+ BUBBLE_OPERATIONS_LOG_DATE_COLUMN: pd.Timestamp.now(tz='UTC').isoformat(),
55
+ BUBBLE_OPERATIONS_LOG_SUBJECT_COLUMN: subject,
56
+ BUBBLE_OPERATIONS_LOG_ORG_URN_COLUMN: org_urn
57
+ }
58
+
59
+ # Ensure data types are what Bubble expects, e.g., date as string
60
+ # bulk_upload_to_bubble should handle dicts with basic types.
61
+ upload_payload = [log_entry_data]
62
+
63
+ bulk_upload_to_bubble(upload_payload, BUBBLE_OPERATIONS_LOG_TABLE_NAME)
64
+ logging.info(f"Successfully logged sync attempt for {subject} to Bubble table '{BUBBLE_OPERATIONS_LOG_TABLE_NAME}'.")
65
+
66
+ # Update token_state with the new log entry to keep it fresh
67
+ current_log_df = token_state.get("bubble_operations_log_df", pd.DataFrame())
68
+ new_log_entry_df = pd.DataFrame(upload_payload) # DataFrame from the same data we uploaded
69
+
70
+ # Ensure date column is datetime before concat if it exists and is not empty
71
+ if not new_log_entry_df.empty and BUBBLE_OPERATIONS_LOG_DATE_COLUMN in new_log_entry_df.columns:
72
+ new_log_entry_df[BUBBLE_OPERATIONS_LOG_DATE_COLUMN] = pd.to_datetime(new_log_entry_df[BUBBLE_OPERATIONS_LOG_DATE_COLUMN], errors='coerce', utc=True)
73
+
74
+ if not current_log_df.empty and BUBBLE_OPERATIONS_LOG_DATE_COLUMN in current_log_df.columns:
75
+ # Ensure existing log df date column is also datetime
76
+ if not pd.api.types.is_datetime64_any_dtype(current_log_df[BUBBLE_OPERATIONS_LOG_DATE_COLUMN]):
77
+ current_log_df[BUBBLE_OPERATIONS_LOG_DATE_COLUMN] = pd.to_datetime(current_log_df[BUBBLE_OPERATIONS_LOG_DATE_COLUMN], errors='coerce', utc=True)
78
+
79
+ updated_log_df = pd.concat([current_log_df, new_log_entry_df], ignore_index=True)
80
+ # To ensure the get_last_sync_attempt_date always gets the absolute latest,
81
+ # we can sort and drop duplicates, keeping the last.
82
+ # However, simply appending and letting max() find the latest is also fine.
83
+ # For robustness, let's sort and keep the latest for each subject/org combo if multiple logs were made rapidly.
84
+ if not updated_log_df.empty and all(col in updated_log_df.columns for col in [BUBBLE_OPERATIONS_LOG_DATE_COLUMN, BUBBLE_OPERATIONS_LOG_SUBJECT_COLUMN, BUBBLE_OPERATIONS_LOG_ORG_URN_COLUMN]):
85
+ updated_log_df = updated_log_df.sort_values(by=BUBBLE_OPERATIONS_LOG_DATE_COLUMN).drop_duplicates(
86
+ subset=[BUBBLE_OPERATIONS_LOG_SUBJECT_COLUMN, BUBBLE_OPERATIONS_LOG_ORG_URN_COLUMN],
87
+ keep='last'
88
+ )
89
+ token_state["bubble_operations_log_df"] = updated_log_df
90
+ logging.info(f"Updated 'bubble_operations_log_df' in token_state after logging {subject}.")
91
+
92
+ except Exception as e:
93
+ logging.error(f"Failed to log sync attempt for {subject} or update token_state: {e}", exc_info=True)
94
+ return token_state
95
+
96
 
97
  def _sync_linkedin_posts_internal(token_state, fetch_count_for_posts_api):
98
  """Internal logic for syncing LinkedIn posts."""
99
+ # This function is called by orchestrator only if fetch_count_for_posts_api > 0
100
+ # So, an attempt to sync posts is indeed happening.
101
+
102
  logging.info(f"Posts sync: Starting fetch for {fetch_count_for_posts_api} posts.")
103
  client_id = token_state.get("client_id")
104
  token_dict = token_state.get("token")
105
  org_urn = token_state.get('org_urn')
106
  bubble_posts_df_orig = token_state.get("bubble_posts_df", pd.DataFrame()).copy()
107
  posts_sync_message = ""
108
+ attempt_logged = False # Flag to ensure log happens once
109
 
110
  try:
111
+ # Basic checks before API call
112
+ if not all([client_id, token_dict, org_urn]):
113
+ posts_sync_message = "Posts: Config error (client_id, token, or org_urn missing). "
114
+ logging.error(f"Posts sync: Prerequisite missing - client_id: {'OK' if client_id else 'Missing'}, token: {'OK' if token_dict else 'Missing'}, org_urn: {'OK' if org_urn else 'Missing'}")
115
+ # Log attempt even if config error, as state_manager decided a sync *should* occur
116
+ token_state = _log_sync_attempt(org_urn, LOG_SUBJECT_POSTS, token_state)
117
+ attempt_logged = True
118
+ return posts_sync_message, token_state
119
+
120
  processed_raw_posts, stats_map, _ = fetch_linkedin_posts_core(client_id, token_dict, org_urn, count=fetch_count_for_posts_api)
121
 
122
  if not processed_raw_posts:
123
  posts_sync_message = "Posts: None found via API. "
124
  logging.info("Posts sync: No raw posts returned from API.")
125
+ # Log attempt as API was called
126
+ token_state = _log_sync_attempt(org_urn, LOG_SUBJECT_POSTS, token_state)
127
+ attempt_logged = True
128
  return posts_sync_message, token_state
129
 
130
  existing_post_urns = set()
 
136
  if not new_raw_posts:
137
  posts_sync_message = "Posts: All fetched already in Bubble. "
138
  logging.info("Posts sync: All fetched posts were already found in Bubble.")
139
+ # Log attempt as API was called and processed
140
+ token_state = _log_sync_attempt(org_urn, LOG_SUBJECT_POSTS, token_state)
141
+ attempt_logged = True
142
  return posts_sync_message, token_state
143
 
144
  logging.info(f"Posts sync: Processing {len(new_raw_posts)} new raw posts.")
145
  post_urns_to_process = [p[LINKEDIN_POST_URN_KEY] for p in new_raw_posts if p.get(LINKEDIN_POST_URN_KEY)]
146
 
147
  all_comments_data = fetch_comments(client_id, token_dict, post_urns_to_process, stats_map)
148
+ sentiments_per_post = analyze_sentiment(all_comments_data)
149
  detailed_new_posts = compile_detailed_posts(new_raw_posts, stats_map, sentiments_per_post)
 
150
  li_posts, li_post_stats, li_post_comments = prepare_data_for_bubble(detailed_new_posts, all_comments_data)
151
 
152
  if li_posts:
 
172
  except Exception as e:
173
  logging.exception("Posts sync: Unexpected error during processing.")
174
  posts_sync_message = f"Posts: Unexpected error ({type(e).__name__}). "
175
+ finally:
176
+ # Log the sync attempt if it hasn't been logged already (e.g. due to early exit)
177
+ # and if basic conditions (org_urn) for logging are met.
178
+ if not attempt_logged and org_urn:
179
+ token_state = _log_sync_attempt(org_urn, LOG_SUBJECT_POSTS, token_state)
180
  return posts_sync_message, token_state
181
 
182
 
183
  def sync_linkedin_mentions(token_state):
184
+ """Fetches new LinkedIn mentions and uploads them to Bubble, if scheduled by state_manager."""
185
+ logging.info("Starting LinkedIn mentions sync process check.")
186
+
187
+ if not token_state.get("mentions_should_sync_now", False):
188
+ logging.info("Mentions sync: Not scheduled by state_manager based on operations log. Skipping.")
189
+ return "Mentions: Sync not currently required by schedule. ", token_state
190
+
191
+ logging.info("Mentions sync: Proceeding as scheduled by state_manager.")
192
+
193
  if not token_state or not token_state.get("token"):
194
  logging.error("Mentions sync: Access denied. No LinkedIn token.")
195
+ # Still log an attempt if org_urn is available, as a sync was scheduled
196
+ org_urn_for_log = token_state.get('org_urn') if token_state else None
197
+ if org_urn_for_log:
198
+ token_state = _log_sync_attempt(org_urn_for_log, LOG_SUBJECT_MENTIONS, token_state)
199
  return "Mentions: No token. ", token_state
200
 
201
  client_id = token_state.get("client_id")
202
  token_dict = token_state.get("token")
203
  org_urn = token_state.get('org_urn')
 
204
  bubble_mentions_df_orig = token_state.get("bubble_mentions_df", pd.DataFrame()).copy()
205
+ mentions_sync_message = ""
206
+ attempt_logged = False
207
 
208
  if not org_urn or not client_id or client_id == "ENV VAR MISSING":
209
  logging.error("Mentions sync: Configuration error (Org URN or Client ID missing).")
210
+ if org_urn: # Log if possible
211
+ token_state = _log_sync_attempt(org_urn, LOG_SUBJECT_MENTIONS, token_state)
212
+ attempt_logged = True
213
  return "Mentions: Config error. ", token_state
214
 
215
+ # Determine fetch count: initial if no mentions data, update otherwise
216
+ fetch_count_for_mentions_api = DEFAULT_MENTIONS_INITIAL_FETCH_COUNT \
217
+ if bubble_mentions_df_orig.empty else DEFAULT_MENTIONS_UPDATE_FETCH_COUNT
218
+ logging.info(f"Mentions sync: Fetch count set to {fetch_count_for_mentions_api}.")
219
+
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
220
  try:
221
  processed_raw_mentions = fetch_linkedin_mentions_core(client_id, token_dict, org_urn, count=fetch_count_for_mentions_api)
222
  if not processed_raw_mentions:
223
  logging.info("Mentions sync: No new mentions found via API.")
224
+ mentions_sync_message = "Mentions: None found via API. "
225
+ token_state = _log_sync_attempt(org_urn, LOG_SUBJECT_MENTIONS, token_state)
226
+ attempt_logged = True
227
+ return mentions_sync_message, token_state
228
 
229
  existing_mention_ids = set()
230
  if not bubble_mentions_df_orig.empty and BUBBLE_MENTIONS_ID_COLUMN_NAME in bubble_mentions_df_orig.columns:
 
239
 
240
  if not new_compiled_mentions_to_upload:
241
  logging.info("Mentions sync: All fetched mentions are already in Bubble.")
242
+ mentions_sync_message = "Mentions: All fetched already in Bubble. "
243
+ token_state = _log_sync_attempt(org_urn, LOG_SUBJECT_MENTIONS, token_state)
244
+ attempt_logged = True
245
+ return mentions_sync_message, token_state
246
 
247
  bubble_ready_mentions = prepare_mentions_for_bubble(new_compiled_mentions_to_upload)
248
  if bubble_ready_mentions:
 
250
  logging.info(f"Successfully uploaded {len(bubble_ready_mentions)} new mentions to Bubble.")
251
  updated_mentions_df = pd.concat([bubble_mentions_df_orig, pd.DataFrame(bubble_ready_mentions)], ignore_index=True)
252
  token_state["bubble_mentions_df"] = updated_mentions_df.drop_duplicates(subset=[BUBBLE_MENTIONS_ID_COLUMN_NAME], keep='last')
253
+ mentions_sync_message = f"Mentions: Synced {len(bubble_ready_mentions)} new. "
254
  else:
255
  logging.info("Mentions sync: No new mentions were prepared for Bubble upload.")
256
+ mentions_sync_message = "Mentions: No new ones to upload. "
257
+
258
  except ValueError as ve:
259
  logging.error(f"ValueError during mentions sync: {ve}", exc_info=True)
260
+ mentions_sync_message = f"Mentions Error: {html.escape(str(ve))}. "
261
  except Exception as e:
262
  logging.exception("Unexpected error in sync_linkedin_mentions.")
263
+ mentions_sync_message = f"Mentions: Unexpected error ({type(e).__name__}). "
264
+ finally:
265
+ if not attempt_logged and org_urn:
266
+ token_state = _log_sync_attempt(org_urn, LOG_SUBJECT_MENTIONS, token_state)
267
+ return mentions_sync_message, token_state
268
 
269
 
270
  def sync_linkedin_follower_stats(token_state):
271
+ """Fetches new LinkedIn follower statistics and uploads them to Bubble, if scheduled."""
272
+ logging.info("Starting LinkedIn follower stats sync process check.")
273
+
274
+ if not token_state.get("fs_should_sync_now", False):
275
+ logging.info("Follower Stats sync: Not scheduled by state_manager. Skipping.")
276
+ return "Follower Stats: Sync not currently required by schedule. ", token_state
277
+
278
+ logging.info("Follower Stats sync: Proceeding as scheduled by state_manager.")
279
+
280
  if not token_state or not token_state.get("token"):
281
  logging.error("Follower Stats sync: Access denied. No LinkedIn token.")
282
+ org_urn_for_log = token_state.get('org_urn') if token_state else None
283
+ if org_urn_for_log:
284
+ token_state = _log_sync_attempt(org_urn_for_log, LOG_SUBJECT_FOLLOWER_STATS, token_state)
285
  return "Follower Stats: No token. ", token_state
286
 
287
  client_id = token_state.get("client_id")
288
  token_dict = token_state.get("token")
289
  org_urn = token_state.get('org_urn')
290
  bubble_follower_stats_df_orig = token_state.get("bubble_follower_stats_df", pd.DataFrame()).copy()
291
+ follower_stats_sync_message = ""
292
+ attempt_logged = False
293
 
294
  if not org_urn or not client_id or client_id == "ENV VAR MISSING":
295
  logging.error("Follower Stats sync: Configuration error (Org URN or Client ID missing).")
296
+ if org_urn:
297
+ token_state = _log_sync_attempt(org_urn, LOG_SUBJECT_FOLLOWER_STATS, token_state)
298
+ attempt_logged = True
299
  return "Follower Stats: Config error. ", token_state
300
+
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
301
  logging.info(f"Follower stats sync proceeding for org_urn: {org_urn}")
302
  try:
303
  api_follower_stats = get_linkedin_follower_stats(client_id, token_dict, org_urn)
304
  if not api_follower_stats:
305
  logging.info(f"Follower Stats sync: No stats found via API for org {org_urn}.")
306
+ follower_stats_sync_message = "Follower Stats: None found via API. "
307
+ token_state = _log_sync_attempt(org_urn, LOG_SUBJECT_FOLLOWER_STATS, token_state)
308
+ attempt_logged = True
309
+ return follower_stats_sync_message, token_state
310
 
311
  new_stats_to_upload = []
312
+ # Logic for comparing API stats with existing Bubble stats (monthly gains and demographics)
313
+ # Monthly Gains
314
  api_monthly_gains = [s for s in api_follower_stats if s.get(FOLLOWER_STATS_TYPE_COLUMN) == 'follower_gains_monthly']
315
  existing_monthly_gain_dates = set()
316
  if not bubble_follower_stats_df_orig.empty:
317
  bubble_monthly_df = bubble_follower_stats_df_orig[bubble_follower_stats_df_orig[FOLLOWER_STATS_TYPE_COLUMN] == 'follower_gains_monthly']
318
  if FOLLOWER_STATS_CATEGORY_COLUMN in bubble_monthly_df.columns:
319
+ # Convert to string for consistent comparison, assuming API data also provides date as string or convertible
320
+ existing_monthly_gain_dates = set(pd.to_datetime(bubble_monthly_df[FOLLOWER_STATS_CATEGORY_COLUMN], errors='coerce').dt.strftime('%Y-%m-%d').dropna())
321
 
322
  for gain_stat in api_monthly_gains:
323
+ api_date_str = pd.to_datetime(gain_stat.get(FOLLOWER_STATS_CATEGORY_COLUMN), errors='coerce').strftime('%Y-%m-%d')
324
+ if api_date_str != 'NaT' and api_date_str not in existing_monthly_gain_dates:
325
  new_stats_to_upload.append(gain_stat)
326
+
327
+ # Demographics (overwrite logic: if API has it, and it's different or not present in Bubble, upload)
328
  api_demographics = [s for s in api_follower_stats if s.get(FOLLOWER_STATS_TYPE_COLUMN) != 'follower_gains_monthly']
329
+ # Create a map of existing demographics for quick lookup
330
+ # Key: (org_urn, type, category), Value: (organic_count, paid_count)
331
  existing_demographics_map = {}
332
  if not bubble_follower_stats_df_orig.empty:
333
  bubble_demographics_df = bubble_follower_stats_df_orig[bubble_follower_stats_df_orig[FOLLOWER_STATS_TYPE_COLUMN] != 'follower_gains_monthly']
334
+ required_cols_demo = [
335
+ FOLLOWER_STATS_ORG_URN_COLUMN, FOLLOWER_STATS_TYPE_COLUMN,
336
+ FOLLOWER_STATS_CATEGORY_COLUMN, FOLLOWER_STATS_ORGANIC_COLUMN,
337
+ FOLLOWER_STATS_PAID_COLUMN
338
+ ]
339
+ if not bubble_demographics_df.empty and all(col in bubble_demographics_df.columns for col in required_cols_demo):
340
  for _, row in bubble_demographics_df.iterrows():
341
  key = (
342
  str(row[FOLLOWER_STATS_ORG_URN_COLUMN]),
343
  str(row[FOLLOWER_STATS_TYPE_COLUMN]),
344
+ str(row[FOLLOWER_STATS_CATEGORY_COLUMN]) # Category can be various things
345
  )
346
  existing_demographics_map[key] = (
347
+ row[FOLLOWER_STATS_ORGANIC_COLUMN], row[FOLLOWER_STATS_PAID_COLUMN]
 
348
  )
349
+
350
  for demo_stat in api_demographics:
351
  key = (
352
  str(demo_stat.get(FOLLOWER_STATS_ORG_URN_COLUMN)),
 
357
  demo_stat.get(FOLLOWER_STATS_ORGANIC_COLUMN, 0),
358
  demo_stat.get(FOLLOWER_STATS_PAID_COLUMN, 0)
359
  )
360
+ # If key not in existing OR counts are different, then it's new/changed
361
  if key not in existing_demographics_map or existing_demographics_map[key] != api_counts:
362
  new_stats_to_upload.append(demo_stat)
363
 
364
+
365
  if not new_stats_to_upload:
366
+ logging.info(f"Follower Stats sync: Data for org {org_urn} is up-to-date or no changes found after comparison.")
367
+ follower_stats_sync_message = "Follower Stats: Data up-to-date or no changes. "
368
+ token_state = _log_sync_attempt(org_urn, LOG_SUBJECT_FOLLOWER_STATS, token_state)
369
+ attempt_logged = True
370
+ return follower_stats_sync_message, token_state
371
 
372
  bulk_upload_to_bubble(new_stats_to_upload, BUBBLE_FOLLOWER_STATS_TABLE_NAME)
373
  logging.info(f"Successfully uploaded {len(new_stats_to_upload)} follower stat entries to Bubble for org {org_urn}.")
374
 
375
+ # Update token_state's follower stats DataFrame
376
  temp_df = pd.concat([bubble_follower_stats_df_orig, pd.DataFrame(new_stats_to_upload)], ignore_index=True)
377
+ # For monthly gains, keep last entry per org/date (category)
378
  monthly_part = temp_df[temp_df[FOLLOWER_STATS_TYPE_COLUMN] == 'follower_gains_monthly'].drop_duplicates(
379
  subset=[FOLLOWER_STATS_ORG_URN_COLUMN, FOLLOWER_STATS_CATEGORY_COLUMN],
380
  keep='last'
381
  )
382
+ # For demographics, keep last entry per org/type/category
383
  demographics_part = temp_df[temp_df[FOLLOWER_STATS_TYPE_COLUMN] != 'follower_gains_monthly'].drop_duplicates(
384
  subset=[FOLLOWER_STATS_ORG_URN_COLUMN, FOLLOWER_STATS_TYPE_COLUMN, FOLLOWER_STATS_CATEGORY_COLUMN],
385
  keep='last'
386
  )
387
  token_state["bubble_follower_stats_df"] = pd.concat([monthly_part, demographics_part], ignore_index=True)
388
+ follower_stats_sync_message = f"Follower Stats: Synced {len(new_stats_to_upload)} entries. "
389
+
390
  except ValueError as ve:
391
  logging.error(f"ValueError during follower stats sync for {org_urn}: {ve}", exc_info=True)
392
+ follower_stats_sync_message = f"Follower Stats Error: {html.escape(str(ve))}. "
393
  except Exception as e:
394
  logging.exception(f"Unexpected error in sync_linkedin_follower_stats for {org_urn}.")
395
+ follower_stats_sync_message = f"Follower Stats: Unexpected error ({type(e).__name__}). "
396
+ finally:
397
+ if not attempt_logged and org_urn:
398
+ token_state = _log_sync_attempt(org_urn, LOG_SUBJECT_FOLLOWER_STATS, token_state)
399
+ return follower_stats_sync_message, token_state
400
 
401
 
402
  def sync_all_linkedin_data_orchestrator(token_state):
 
407
  return "<p style='color:red; text-align:center;'>❌ Access denied. LinkedIn token not available.</p>", token_state
408
 
409
  org_urn = token_state.get('org_urn')
410
+ client_id = token_state.get("client_id")
411
 
412
  posts_sync_message = ""
413
  mentions_sync_message = ""
 
416
  if not org_urn:
417
  logging.error("Sync All: Org URN missing in token_state.")
418
  return "<p style='color:red;'>❌ Config error: Org URN missing.</p>", token_state
419
+ if not client_id or client_id == "ENV VAR MISSING":
420
  logging.error("Sync All: Client ID missing or not set in token_state.")
421
  return "<p style='color:red;'>❌ Config error: Client ID missing.</p>", token_state
422
 
423
  # --- Sync Posts ---
424
  fetch_count_for_posts_api = token_state.get('fetch_count_for_api', 0)
425
  if fetch_count_for_posts_api == 0:
426
+ # This means state_manager determined no post sync is needed based on log
427
+ posts_sync_message = "Posts: Sync not currently required by schedule. "
428
+ logging.info("Posts sync: Skipped as fetch_count_for_posts_api is 0 (determined by state_manager).")
429
+ # Log an "attempt" to sync posts which resulted in a skip due to schedule.
430
+ # This keeps the log fresh, indicating a check was made.
431
+ token_state = _log_sync_attempt(org_urn, LOG_SUBJECT_POSTS, token_state)
432
  else:
433
  posts_sync_message, token_state = _sync_linkedin_posts_internal(token_state, fetch_count_for_posts_api)
434
+ # _sync_linkedin_posts_internal now handles its own logging internally
435
 
436
  # --- Sync Mentions ---
437
+ # sync_linkedin_mentions will check token_state.get("mentions_should_sync_now")
438
+ # and log its attempt internally.
439
  mentions_sync_message, token_state = sync_linkedin_mentions(token_state)
440
 
441
  # --- Sync Follower Stats ---
442
+ # sync_linkedin_follower_stats will check token_state.get("fs_should_sync_now")
443
+ # and log its attempt internally.
444
  follower_stats_sync_message, token_state = sync_linkedin_follower_stats(token_state)
445
 
446
  logging.info(f"Sync process complete. Messages: Posts: [{posts_sync_message.strip()}], Mentions: [{mentions_sync_message.strip()}], Follower Stats: [{follower_stats_sync_message.strip()}]")